improved validations

This commit is contained in:
cpu
2025-03-26 08:52:30 +01:00
parent f500c00896
commit ce7ab594e2

84
app.py
View File

@@ -69,48 +69,36 @@ class FlicButtonHandler:
raise raise
def _decode_vapid_private_key(self): def _decode_vapid_private_key(self):
""" """Load and validate VAPID private key with strict formatting."""
Final robust VAPID private key loader
Handles all possible key formats and provides detailed debugging
"""
try: try:
# Get and clean the key from environment
env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip().strip('"\'') env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip().strip('"\'')
# Debug output # Convert to consistent PEM format
logger.debug(f"Raw env key length: {len(env_key)}")
logger.debug(f"Key starts with: {env_key[:50]}")
# Convert to PEM format
if '\\n' in env_key: if '\\n' in env_key:
# Handle escaped newlines (from .env file)
private_pem = env_key.replace('\\n', '\n') private_pem = env_key.replace('\\n', '\n')
elif '-----BEGIN PRIVATE KEY-----' in env_key:
# Already in PEM format
private_pem = env_key
else: else:
# Assume base64 encoded private_pem = env_key
private_pem = base64.urlsafe_b64decode(env_key).decode('utf-8')
# Ensure proper PEM format # Ensure proper PEM headers
if not private_pem.startswith('-----BEGIN PRIVATE KEY-----'): if not private_pem.startswith('-----BEGIN PRIVATE KEY-----'):
private_pem = f"-----BEGIN PRIVATE KEY-----\n{private_pem}\n-----END PRIVATE KEY-----" private_pem = f"-----BEGIN PRIVATE KEY-----\n{private_pem}\n-----END PRIVATE KEY-----"
# Final validation # Validate by loading
try: key = serialization.load_pem_private_key(
key = serialization.load_pem_private_key( private_pem.encode('utf-8'),
private_pem.encode('utf-8'), password=None
password=None )
)
logger.debug("VAPID private key successfully loaded") # Return standardized PEM
return private_pem return key.private_bytes(
except Exception as e: encoding=serialization.Encoding.PEM,
logger.error(f"Key validation failed: {str(e)}") format=serialization.PrivateFormat.PKCS8,
raise ValueError(f"Invalid private key format") from e encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
except Exception as e: except Exception as e:
logger.error(f"VAPID key loading failed: {str(e)}") logger.error(f"VAPID key loading failed: {str(e)}")
raise raise ValueError("Invalid VAPID private key format") from e
def load_subscriptions(self) -> List[Dict]: def load_subscriptions(self) -> List[Dict]:
"""Load web push subscriptions from file.""" """Load web push subscriptions from file."""
@@ -132,32 +120,50 @@ class FlicButtonHandler:
logger.error(f"Error saving subscriptions: {e}") logger.error(f"Error saving subscriptions: {e}")
async def send_push_notification(self, subscription: Dict, message: str): async def send_push_notification(self, subscription: Dict, message: str):
"""Send a web push notification with proper key handling.""" """Send a web push notification with robust key handling."""
try: try:
if not self.subscriptions: if not self.subscriptions:
logger.warning("No subscriptions available") logger.warning("No subscriptions available")
return return
# Debug output # Convert PEM key to bytes right before use
logger.debug(f"Using VAPID key: {self.vapid_private_key[:50]}...") try:
logger.debug(f"Subscription endpoint: {subscription['endpoint'][:50]}...") private_key = serialization.load_pem_private_key(
self.vapid_private_key.encode('utf-8'),
password=None
)
# Re-serialize to ensure clean format
vapid_private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
except Exception as e:
logger.error(f"Key conversion failed: {str(e)}")
raise
# Extract domain for aud claim
endpoint = subscription['endpoint']
aud = endpoint.split('/fcm/send')[0] if 'fcm.googleapis.com' in endpoint else endpoint.split('/send')[0]
logger.debug(f"Using aud: {aud}")
logger.debug(f"Key length: {len(vapid_private_key)}")
webpush( webpush(
subscription_info=subscription, subscription_info=subscription,
data=message, data=message,
vapid_private_key=self.vapid_private_key, vapid_private_key=vapid_private_key,
vapid_claims={ vapid_claims={
"sub": os.getenv('VAPID_CLAIM_EMAIL', 'mailto:your-email@example.com'), "sub": os.getenv('VAPID_CLAIM_EMAIL', 'mailto:your-email@example.com'),
"aud": subscription['endpoint'].split('/send/')[0] + "/" "aud": aud
} }
) )
logger.info("Push notification sent successfully") logger.info("Push notification sent successfully")
except WebPushException as e: except WebPushException as e:
logger.error(f"Push notification error: {str(e)}") logger.error(f"Push failed: {str(e)}")
if 'Invalid JWT' in str(e): if 'Invalid JWT' in str(e):
logger.error("VAPID key validation failed - check key format") logger.error("VAPID key validation failed")
self.subscriptions = [s for s in self.subscriptions if s != subscription] raise
self.save_subscriptions()
async def handle_button1(self): async def handle_button1(self):
"""Handle first button action - e.g., Home Lights On""" """Handle first button action - e.g., Home Lights On"""