again validations

This commit is contained in:
cpu
2025-03-26 08:59:03 +01:00
parent ce7ab594e2
commit faa32510df

59
app.py
View File

@@ -69,11 +69,12 @@ class FlicButtonHandler:
raise raise
def _decode_vapid_private_key(self): def _decode_vapid_private_key(self):
"""Load and validate VAPID private key with strict formatting.""" """Load and strictly validate VAPID private key."""
try: try:
# Get and clean the key
env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip().strip('"\'') env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip().strip('"\'')
# Convert to consistent PEM format # Convert to clean PEM format
if '\\n' in env_key: if '\\n' in env_key:
private_pem = env_key.replace('\\n', '\n') private_pem = env_key.replace('\\n', '\n')
else: else:
@@ -83,13 +84,14 @@ class FlicButtonHandler:
if not private_pem.startswith('-----BEGIN PRIVATE KEY-----'): if not private_pem.startswith('-----BEGIN PRIVATE KEY-----'):
private_pem = f"-----BEGIN PRIVATE KEY-----\n{private_pem}\n-----END PRIVATE KEY-----" private_pem = f"-----BEGIN PRIVATE KEY-----\n{private_pem}\n-----END PRIVATE KEY-----"
# Validate by loading # Strict validation
key = serialization.load_pem_private_key( key = serialization.load_pem_private_key(
private_pem.encode('utf-8'), private_pem.encode('utf-8'),
password=None password=None,
backend=default_backend()
) )
# Return standardized PEM # Return in strict PEM format
return key.private_bytes( return key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
@@ -97,8 +99,8 @@ class FlicButtonHandler:
).decode('utf-8') ).decode('utf-8')
except Exception as e: except Exception as e:
logger.error(f"VAPID key loading failed: {str(e)}") logger.error(f"CRITICAL: Invalid VAPID key - {str(e)}")
raise ValueError("Invalid VAPID private key format") from e raise RuntimeError("Invalid VAPID private key configuration") from e
def load_subscriptions(self) -> List[Dict]: def load_subscriptions(self) -> List[Dict]:
"""Load web push subscriptions from file.""" """Load web push subscriptions from file."""
@@ -120,50 +122,29 @@ class FlicButtonHandler:
logger.error(f"Error saving subscriptions: {e}") logger.error(f"Error saving subscriptions: {e}")
async def send_push_notification(self, subscription: Dict, message: str): async def send_push_notification(self, subscription: Dict, message: str):
"""Send a web push notification with robust key handling."""
try: try:
if not self.subscriptions: # Get endpoint base for aud claim
logger.warning("No subscriptions available")
return
# Convert PEM key to bytes right before use
try:
private_key = serialization.load_pem_private_key(
self.vapid_private_key.encode('utf-8'),
password=None
)
# Re-serialize to ensure clean format
vapid_private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
except Exception as e:
logger.error(f"Key conversion failed: {str(e)}")
raise
# Extract domain for aud claim
endpoint = subscription['endpoint'] endpoint = subscription['endpoint']
aud = endpoint.split('/fcm/send')[0] if 'fcm.googleapis.com' in endpoint else endpoint.split('/send')[0] aud = endpoint.split('/send')[0] if '/send' in endpoint else endpoint.split('/fcm/send')[0]
logger.debug(f"Sending to: {endpoint[:50]}...")
logger.debug(f"Using aud: {aud}") logger.debug(f"Using aud: {aud}")
logger.debug(f"Key length: {len(vapid_private_key)}")
webpush( webpush(
subscription_info=subscription, subscription_info=subscription,
data=message, data=message,
vapid_private_key=vapid_private_key, vapid_private_key=self.vapid_private_key,
vapid_claims={ vapid_claims={
"sub": os.getenv('VAPID_CLAIM_EMAIL', 'mailto:your-email@example.com'), "sub": os.getenv('VAPID_CLAIM_EMAIL'),
"aud": aud "aud": aud + "/" # Ensure trailing slash
} },
ttl=86400 # 24 hour expiration
) )
logger.info("Push notification sent successfully") logger.info("Push notification sent successfully")
except WebPushException as e: return True
except Exception as e:
logger.error(f"Push failed: {str(e)}") logger.error(f"Push failed: {str(e)}")
if 'Invalid JWT' in str(e): return False
logger.error("VAPID key validation failed")
raise
async def handle_button1(self): async def handle_button1(self):
"""Handle first button action - e.g., Home Lights On""" """Handle first button action - e.g., Home Lights On"""