rewritten

This commit is contained in:
cpu
2025-03-26 09:36:58 +01:00
parent b87e30f6b4
commit fc7f4f4b7a
2 changed files with 23 additions and 26 deletions

35
app.py
View File

@@ -73,21 +73,15 @@ class FlicButtonHandler:
"""Load and strictly validate VAPID private key.""" """Load and strictly validate VAPID private key."""
try: try:
# Get and clean the key # Get and clean the key
env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip().strip('"\'') env_key = os.getenv('VAPID_PRIVATE_KEY', '').strip()
# Convert to clean PEM format # Reconstruct PEM format if missing headers
if '\\n' in env_key: if not env_key.startswith('-----BEGIN PRIVATE KEY-----'):
private_pem = env_key.replace('\\n', '\n') env_key = f"-----BEGIN PRIVATE KEY-----\n{env_key}\n-----END PRIVATE KEY-----"
else:
private_pem = env_key # Strict validation and key preparation
# Ensure proper PEM headers
if not private_pem.startswith('-----BEGIN PRIVATE KEY-----'):
private_pem = f"-----BEGIN PRIVATE KEY-----\n{private_pem}\n-----END PRIVATE KEY-----"
# Strict validation
key = serialization.load_pem_private_key( key = serialization.load_pem_private_key(
private_pem.encode('utf-8'), env_key.encode('utf-8'),
password=None, password=None,
backend=default_backend() backend=default_backend()
) )
@@ -124,14 +118,16 @@ class FlicButtonHandler:
async def send_push_notification(self, subscription: Dict, message: str): async def send_push_notification(self, subscription: Dict, message: str):
try: try:
# Get endpoint base for aud claim # Determine audience (aud) claim for VAPID
endpoint = subscription['endpoint'] endpoint = subscription['endpoint']
aud = endpoint.split('/send')[0] if '/send' in endpoint else endpoint.split('/fcm/send')[0] aud = (endpoint.split('/send')[0] if '/send' in endpoint
else endpoint.split('/fcm/send')[0])
logger.debug(f"Sending to: {endpoint[:50]}...") logger.debug(f"Sending to: {endpoint[:50]}...")
logger.debug(f"Using aud: {aud}") logger.debug(f"Using aud: {aud}")
webpush( # Perform web push
result = webpush(
subscription_info=subscription, subscription_info=subscription,
data=message, data=message,
vapid_private_key=self.vapid_private_key, vapid_private_key=self.vapid_private_key,
@@ -145,6 +141,8 @@ class FlicButtonHandler:
return True return True
except Exception as e: except Exception as e:
logger.error(f"Push failed: {str(e)}") logger.error(f"Push failed: {str(e)}")
logger.error(f"Endpoint details: {subscription['endpoint']}")
logger.error(f"Keys: {subscription.get('keys', 'No keys found')}")
return False return False
async def handle_button1(self): async def handle_button1(self):
@@ -174,8 +172,9 @@ class FlicButtonHandler:
success_count = 0 success_count = 0
for subscription in self.subscriptions: for subscription in self.subscriptions:
try: try:
await self.send_push_notification(subscription, message) success = await self.send_push_notification(subscription, message)
success_count += 1 if success:
success_count += 1
except Exception as e: except Exception as e:
logger.error(f"Failed to send to {subscription['endpoint'][:30]}...: {str(e)}") logger.error(f"Failed to send to {subscription['endpoint'][:30]}...: {str(e)}")

View File

@@ -3,6 +3,7 @@ import os
import base64 import base64
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
def generate_vapid_keys(): def generate_vapid_keys():
""" """
@@ -29,17 +30,17 @@ def generate_vapid_keys():
if need_new_keys: if need_new_keys:
# Generate EC private key # Generate EC private key
private_key = ec.generate_private_key(ec.SECP256R1()) private_key = ec.generate_private_key(ec.SECP256R1(), backend=default_backend())
# Serialize private key to PEM format # Serialize private key to PEM format, but keep it clean
private_pem = private_key.private_bytes( private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
).decode('utf-8') ).decode('utf-8')
# Format for .env file (replace newlines with \n) # Clean up PEM formatting for .env file
env_private_key = private_pem.strip().replace('\n', '\\n') private_pem_clean = private_pem.replace('-----BEGIN PRIVATE KEY-----\n', '').replace('\n-----END PRIVATE KEY-----\n', '').replace('\n', '')
# Get public key # Get public key
public_key = private_key.public_key() public_key = private_key.public_key()
@@ -49,15 +50,12 @@ def generate_vapid_keys():
) )
# Store keys # Store keys
env_vars['VAPID_PRIVATE_KEY'] = env_private_key # Single-line format env_vars['VAPID_PRIVATE_KEY'] = private_pem_clean
env_vars['VAPID_PUBLIC_KEY'] = base64.urlsafe_b64encode(public_key_bytes).decode('utf-8') env_vars['VAPID_PUBLIC_KEY'] = base64.urlsafe_b64encode(public_key_bytes).decode('utf-8')
print("New VAPID keys generated in .env-compatible format.") print("New VAPID keys generated in .env-compatible format.")
else: else:
print("Existing VAPID keys found - no changes made.") print("Existing VAPID keys found - no changes made.")
# Verify existing key format
if '-----BEGIN PRIVATE KEY-----' not in env_vars['VAPID_PRIVATE_KEY']:
print("Warning: Existing private key doesn't appear to be in PEM format!")
# Ensure we have all required configuration variables with defaults if missing # Ensure we have all required configuration variables with defaults if missing
defaults = { defaults = {