from flask import Flask, request import requests import json import os import uuid from db import fetch_json_from_github from qwen import get_qwen_response from upload import upload_image_to_cdn, upload_audio_to_cdn from admin_messages import extract_admin_message, save_admin_message app = Flask(__name__) # Cache for user profiles to reduce API calls user_profile_cache = {} chat_histories = {} BASE_URL = "https://chat.qwen.ai" AUTH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6ImRjOGQyYzY4LWZjNmEtNDEwYy05NWZjLWQ5MDBmNTM4ZTMwMiIsImV4cCI6MTc0NjM1OTExMH0.ms8Agbit2ObnTMJyeW_dUbk-tV_ON_dc-RjDKCDLAwA" AI_SERVICE_URL = "https://aoamrnuwara.pythonanywhere.com/send-message" AI_SERVICE_TOKEN = "123400" # Proxy API endpoint URL PROXY_API_URL = "https://aoamrnuwara.pythonanywhere.com/get-user-profile" @app.route("/webhook", methods=["GET"]) def verify_webhook(): mode = request.args.get("hub.mode") token = request.args.get("hub.verify_token") challenge = request.args.get("hub.challenge") if mode == "subscribe" and token == AI_SERVICE_TOKEN: return challenge else: print(f"Verification failed: mode={mode}, token={token}") # Debug log return "Verification failed", 403 def get_user_profile(sender_id, page_token): """Get user profile with fallback mechanisms""" # Check cache first cache_key = f"{sender_id}_{page_token[:10]}" # Use first 10 chars of token for cache key if cache_key in user_profile_cache: print(f"Using cached profile for {sender_id}") return user_profile_cache[cache_key] # Default fallback name user_full_name = "User" try: proxy_params = { "sender_id": sender_id, "page_token": page_token } # Add timeout to prevent hanging proxy_response = requests.get(PROXY_API_URL, params=proxy_params, timeout=5) # Even if the request fails with non-200 status, continue with default name if proxy_response.status_code == 200: proxy_data = proxy_response.json() if proxy_data.get("success", False): user_full_name = proxy_data.get("full_name", "User") # Cache the successful result user_profile_cache[cache_key] = user_full_name else: print(f"Proxy API returned error: {proxy_data.get('error', 'Unknown error')}") else: print(f"Proxy API returned status code: {proxy_response.status_code}") except requests.exceptions.Timeout: print(f"Timeout when calling proxy API for user {sender_id}") except requests.exceptions.RequestException as e: print(f"Error calling proxy API: {str(e)}") except Exception as e: print(f"Unexpected error getting user profile: {str(e)}") return user_full_name @app.route("/webhook", methods=["POST"]) def handle_message(): body = request.get_json() # Fetch the latest JSON data from GitHub for dynamic updates try: github_response = fetch_json_from_github() if github_response["success"]: PAGE_CREDENTIALS = {page['page_id']: page for page in github_response["data"]["pages"]} print("Successfully loaded page credentials from GitHub.") else: print(f"Error loading page credentials: {github_response['message']}") PAGE_CREDENTIALS = {} except Exception as e: print(f"Unexpected error while fetching credentials: {e}") PAGE_CREDENTIALS = {} for entry in body["entry"]: page_id = entry["id"] page_config = PAGE_CREDENTIALS.get(page_id, {}) page_token = page_config.get('page_token') # Check if page exists in credentials if not page_token: print(f"Page ID {page_id} not found in credentials. Skipping...") continue # Check page status - if OFF, skip processing page_status = page_config.get('status', 'OFF') if page_status != "ON": print(f"Page ID {page_id} is turned OFF. Skipping...") continue # Check subscription status subscription = page_config.get('subscription', 'NONE') expiration = page_config.get('expiration', 'NONE') # Check expiration date for any subscription type that has one if subscription in ["Basic", "Plus"] and expiration != "NONE": from datetime import datetime try: expiration_date = datetime.strptime(expiration, "%Y-%m-%d") current_date = datetime.now() if current_date.date() > expiration_date.date(): print(f"Page ID {page_id} has an expired {subscription} subscription. Skipping...") continue except Exception as e: print(f"Error checking expiration date: {str(e)}") # If there's an error parsing the date, we should skip processing continue # If no subscription, skip processing if subscription == "NONE": print(f"Page ID {page_id} has no subscription. Skipping...") continue # Fetch the system prompt dynamically for each request system_prompt = page_config.get('system', 'Default system prompt') for messaging_event in entry["messaging"]: if "message" in messaging_event: sender_id = messaging_event["sender"]["id"] message = messaging_event["message"] # Get user profile with improved error handling user_full_name = get_user_profile(sender_id, page_token) # Get or create chat history for this user user_history = chat_histories.setdefault(sender_id, { 'history': [], 'system_prompt': system_prompt # Dynamically update system prompt }) # Update the system prompt dynamically user_history['system_prompt'] = system_prompt image_cdn_urls = [] audio_cdn_urls = [] attachments = message.get("attachments", []) text_content = message.get("text", "") # Process attachments based on subscription level if subscription == "Plus": # Plus subscribers can send images and audio for attachment in attachments: if attachment["type"] == "image": image_url = attachment["payload"]["url"] try: img_response = requests.get(image_url, timeout=10) img_response.raise_for_status() temp_path = f"temp_{uuid.uuid4()}.jpg" with open(temp_path, "wb") as f: f.write(img_response.content) cdn_url = upload_image_to_cdn(BASE_URL, AUTH_TOKEN, temp_path) if cdn_url: image_cdn_urls.append(cdn_url) print(f"Image CDN URL: {cdn_url}") except Exception as e: print(f"Image processing error: {str(e)}") finally: if os.path.exists(temp_path): os.remove(temp_path) elif attachment["type"] == "audio": audio_url = attachment["payload"]["url"] try: audio_response = requests.get(audio_url, timeout=10) audio_response.raise_for_status() # Extract file extension from the original filename original_filename = attachment.get("name", "audio.mp3") file_extension = os.path.splitext(original_filename)[1].lower() if not file_extension: file_extension = ".mp3" # Default to MP3 temp_path = f"temp_{uuid.uuid4()}{file_extension}" with open(temp_path, "wb") as f: f.write(audio_response.content) cdn_url = upload_audio_to_cdn(BASE_URL, AUTH_TOKEN, temp_path) if cdn_url: audio_cdn_urls.append(cdn_url) print(f"Audio CDN URL: {cdn_url}") except Exception as e: print(f"Audio processing error: {str(e)}") finally: if os.path.exists(temp_path): os.remove(temp_path) elif subscription == "Basic": # Basic subscribers can only send text if attachments: print(f"Basic subscriber tried to send attachments. Skipping...") continue # Create content based on subscription level content = [] if subscription == "Plus": if image_cdn_urls: content.append({"type": "text", "text": f"{user_full_name} sent an image"}) for cdn_url in image_cdn_urls: content.append({"type": "image", "image": cdn_url}) elif audio_cdn_urls: content.append({"type": "text", "text": f"{user_full_name} sent an audio file"}) for cdn_url in audio_cdn_urls: content.append({"type": "audio", "audio": cdn_url}) elif text_content: content.append({"type": "text", "text": f"{user_full_name}: {text_content}"}) else: continue elif subscription == "Basic": if text_content: content.append({"type": "text", "text": f"{user_full_name}: {text_content}"}) else: continue # Append the user's message to the chat history user_history['history'].append({ "role": "user", "content": content }) # Ensure only the last 5 messages are retained in the chat history while len(user_history['history']) > 5: user_history['history'].pop(0) try: # Generate AI response using the dynamically updated system prompt ai_response = get_qwen_response( system_prompt=user_history['system_prompt'], chat_history=user_history['history'] ) # Check if the AI response contains an admin message admin_message, cleaned_response = extract_admin_message(ai_response) # Always add the original response to chat history (with admin tags if present) user_history['history'].append({ "role": "assistant", "content": [{"type": "text", "text": ai_response}] }) # If there's an admin message, save it to the GitHub JSON file if admin_message: print(f"Admin message detected: {admin_message}") save_admin_message(page_id, admin_message, sender_id, user_full_name) # Send the cleaned response (without admin tags) to the user response_to_send = cleaned_response else: # No admin message, send the original response response_to_send = ai_response # Send the appropriate response back to the user try: headers = { "Authorization": f"Bearer {AI_SERVICE_TOKEN}", "Content-Type": "application/json" } payload = { "recipient_id": sender_id, "message_text": response_to_send, "page_access_token": page_token } send_response = requests.post(AI_SERVICE_URL, json=payload, headers=headers, timeout=10) send_response.raise_for_status() print(f"Response sent successfully to {sender_id}") except Exception as e: print(f"Error sending response to user: {str(e)}") except Exception as e: print(f"Error generating AI response: {str(e)}") # Send a fallback message if AI response generation fails try: headers = { "Authorization": f"Bearer {AI_SERVICE_TOKEN}", "Content-Type": "application/json" } payload = { "recipient_id": sender_id, "message_text": "I'm sorry, I'm having trouble processing your request right now. Please try again later.", "page_access_token": page_token } requests.post(AI_SERVICE_URL, json=payload, headers=headers, timeout=10) except Exception as send_err: print(f"Error sending fallback message: {str(send_err)}") return "OK", 200 if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True)