Spaces:
Sleeping
Sleeping
| from flask import Flask, request | |
| import requests | |
| import json | |
| import os | |
| import uuid | |
| from db import fetch_json_from_github | |
| from qwen import get_qwen_response | |
| from upload import upload_image_to_cdn, upload_audio_to_cdn | |
| from admin_messages import extract_admin_message, save_admin_message | |
| app = Flask(__name__) | |
| # Cache for user profiles to reduce API calls | |
| user_profile_cache = {} | |
| chat_histories = {} | |
| BASE_URL = "https://chat.qwen.ai" | |
| AUTH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6ImRjOGQyYzY4LWZjNmEtNDEwYy05NWZjLWQ5MDBmNTM4ZTMwMiIsImV4cCI6MTc0NjM1OTExMH0.ms8Agbit2ObnTMJyeW_dUbk-tV_ON_dc-RjDKCDLAwA" | |
| AI_SERVICE_URL = "https://aoamrnuwara.pythonanywhere.com/send-message" | |
| AI_SERVICE_TOKEN = "123400" | |
| # Proxy API endpoint URL | |
| PROXY_API_URL = "https://aoamrnuwara.pythonanywhere.com/get-user-profile" | |
| def verify_webhook(): | |
| mode = request.args.get("hub.mode") | |
| token = request.args.get("hub.verify_token") | |
| challenge = request.args.get("hub.challenge") | |
| if mode == "subscribe" and token == AI_SERVICE_TOKEN: | |
| return challenge | |
| else: | |
| print(f"Verification failed: mode={mode}, token={token}") # Debug log | |
| return "Verification failed", 403 | |
| def get_user_profile(sender_id, page_token): | |
| """Get user profile with fallback mechanisms""" | |
| # Check cache first | |
| cache_key = f"{sender_id}_{page_token[:10]}" # Use first 10 chars of token for cache key | |
| if cache_key in user_profile_cache: | |
| print(f"Using cached profile for {sender_id}") | |
| return user_profile_cache[cache_key] | |
| # Default fallback name | |
| user_full_name = "User" | |
| try: | |
| proxy_params = { | |
| "sender_id": sender_id, | |
| "page_token": page_token | |
| } | |
| # Add timeout to prevent hanging | |
| proxy_response = requests.get(PROXY_API_URL, params=proxy_params, timeout=5) | |
| # Even if the request fails with non-200 status, continue with default name | |
| if proxy_response.status_code == 200: | |
| proxy_data = proxy_response.json() | |
| if proxy_data.get("success", False): | |
| user_full_name = proxy_data.get("full_name", "User") | |
| # Cache the successful result | |
| user_profile_cache[cache_key] = user_full_name | |
| else: | |
| print(f"Proxy API returned error: {proxy_data.get('error', 'Unknown error')}") | |
| else: | |
| print(f"Proxy API returned status code: {proxy_response.status_code}") | |
| except requests.exceptions.Timeout: | |
| print(f"Timeout when calling proxy API for user {sender_id}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error calling proxy API: {str(e)}") | |
| except Exception as e: | |
| print(f"Unexpected error getting user profile: {str(e)}") | |
| return user_full_name | |
| def handle_message(): | |
| body = request.get_json() | |
| # Fetch the latest JSON data from GitHub for dynamic updates | |
| try: | |
| github_response = fetch_json_from_github() | |
| if github_response["success"]: | |
| PAGE_CREDENTIALS = {page['page_id']: page for page in github_response["data"]["pages"]} | |
| print("Successfully loaded page credentials from GitHub.") | |
| else: | |
| print(f"Error loading page credentials: {github_response['message']}") | |
| PAGE_CREDENTIALS = {} | |
| except Exception as e: | |
| print(f"Unexpected error while fetching credentials: {e}") | |
| PAGE_CREDENTIALS = {} | |
| for entry in body["entry"]: | |
| page_id = entry["id"] | |
| page_config = PAGE_CREDENTIALS.get(page_id, {}) | |
| page_token = page_config.get('page_token') | |
| # Check if page exists in credentials | |
| if not page_token: | |
| print(f"Page ID {page_id} not found in credentials. Skipping...") | |
| continue | |
| # Check page status - if OFF, skip processing | |
| page_status = page_config.get('status', 'OFF') | |
| if page_status != "ON": | |
| print(f"Page ID {page_id} is turned OFF. Skipping...") | |
| continue | |
| # Check subscription status | |
| subscription = page_config.get('subscription', 'NONE') | |
| expiration = page_config.get('expiration', 'NONE') | |
| # Check expiration date for any subscription type that has one | |
| if subscription in ["Basic", "Plus"] and expiration != "NONE": | |
| from datetime import datetime | |
| try: | |
| expiration_date = datetime.strptime(expiration, "%Y-%m-%d") | |
| current_date = datetime.now() | |
| if current_date.date() > expiration_date.date(): | |
| print(f"Page ID {page_id} has an expired {subscription} subscription. Skipping...") | |
| continue | |
| except Exception as e: | |
| print(f"Error checking expiration date: {str(e)}") | |
| # If there's an error parsing the date, we should skip processing | |
| continue | |
| # If no subscription, skip processing | |
| if subscription == "NONE": | |
| print(f"Page ID {page_id} has no subscription. Skipping...") | |
| continue | |
| # Fetch the system prompt dynamically for each request | |
| system_prompt = page_config.get('system', 'Default system prompt') | |
| for messaging_event in entry["messaging"]: | |
| if "message" in messaging_event: | |
| sender_id = messaging_event["sender"]["id"] | |
| message = messaging_event["message"] | |
| # Get user profile with improved error handling | |
| user_full_name = get_user_profile(sender_id, page_token) | |
| # Get or create chat history for this user | |
| user_history = chat_histories.setdefault(sender_id, { | |
| 'history': [], | |
| 'system_prompt': system_prompt # Dynamically update system prompt | |
| }) | |
| # Update the system prompt dynamically | |
| user_history['system_prompt'] = system_prompt | |
| image_cdn_urls = [] | |
| audio_cdn_urls = [] | |
| attachments = message.get("attachments", []) | |
| text_content = message.get("text", "") | |
| # Process attachments based on subscription level | |
| if subscription == "Plus": | |
| # Plus subscribers can send images and audio | |
| for attachment in attachments: | |
| if attachment["type"] == "image": | |
| image_url = attachment["payload"]["url"] | |
| try: | |
| img_response = requests.get(image_url, timeout=10) | |
| img_response.raise_for_status() | |
| temp_path = f"temp_{uuid.uuid4()}.jpg" | |
| with open(temp_path, "wb") as f: | |
| f.write(img_response.content) | |
| cdn_url = upload_image_to_cdn(BASE_URL, AUTH_TOKEN, temp_path) | |
| if cdn_url: | |
| image_cdn_urls.append(cdn_url) | |
| print(f"Image CDN URL: {cdn_url}") | |
| except Exception as e: | |
| print(f"Image processing error: {str(e)}") | |
| finally: | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| elif attachment["type"] == "audio": | |
| audio_url = attachment["payload"]["url"] | |
| try: | |
| audio_response = requests.get(audio_url, timeout=10) | |
| audio_response.raise_for_status() | |
| # Extract file extension from the original filename | |
| original_filename = attachment.get("name", "audio.mp3") | |
| file_extension = os.path.splitext(original_filename)[1].lower() | |
| if not file_extension: | |
| file_extension = ".mp3" # Default to MP3 | |
| temp_path = f"temp_{uuid.uuid4()}{file_extension}" | |
| with open(temp_path, "wb") as f: | |
| f.write(audio_response.content) | |
| cdn_url = upload_audio_to_cdn(BASE_URL, AUTH_TOKEN, temp_path) | |
| if cdn_url: | |
| audio_cdn_urls.append(cdn_url) | |
| print(f"Audio CDN URL: {cdn_url}") | |
| except Exception as e: | |
| print(f"Audio processing error: {str(e)}") | |
| finally: | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| elif subscription == "Basic": | |
| # Basic subscribers can only send text | |
| if attachments: | |
| print(f"Basic subscriber tried to send attachments. Skipping...") | |
| continue | |
| # Create content based on subscription level | |
| content = [] | |
| if subscription == "Plus": | |
| if image_cdn_urls: | |
| content.append({"type": "text", "text": f"{user_full_name} sent an image"}) | |
| for cdn_url in image_cdn_urls: | |
| content.append({"type": "image", "image": cdn_url}) | |
| elif audio_cdn_urls: | |
| content.append({"type": "text", "text": f"{user_full_name} sent an audio file"}) | |
| for cdn_url in audio_cdn_urls: | |
| content.append({"type": "audio", "audio": cdn_url}) | |
| elif text_content: | |
| content.append({"type": "text", "text": f"{user_full_name}: {text_content}"}) | |
| else: | |
| continue | |
| elif subscription == "Basic": | |
| if text_content: | |
| content.append({"type": "text", "text": f"{user_full_name}: {text_content}"}) | |
| else: | |
| continue | |
| # Append the user's message to the chat history | |
| user_history['history'].append({ | |
| "role": "user", | |
| "content": content | |
| }) | |
| # Ensure only the last 5 messages are retained in the chat history | |
| while len(user_history['history']) > 5: | |
| user_history['history'].pop(0) | |
| try: | |
| # Generate AI response using the dynamically updated system prompt | |
| ai_response = get_qwen_response( | |
| system_prompt=user_history['system_prompt'], | |
| chat_history=user_history['history'] | |
| ) | |
| # Check if the AI response contains an admin message | |
| admin_message, cleaned_response = extract_admin_message(ai_response) | |
| # Always add the original response to chat history (with admin tags if present) | |
| user_history['history'].append({ | |
| "role": "assistant", | |
| "content": [{"type": "text", "text": ai_response}] | |
| }) | |
| # If there's an admin message, save it to the GitHub JSON file | |
| if admin_message: | |
| print(f"Admin message detected: {admin_message}") | |
| save_admin_message(page_id, admin_message, sender_id, user_full_name) | |
| # Send the cleaned response (without admin tags) to the user | |
| response_to_send = cleaned_response | |
| else: | |
| # No admin message, send the original response | |
| response_to_send = ai_response | |
| # Send the appropriate response back to the user | |
| try: | |
| headers = { | |
| "Authorization": f"Bearer {AI_SERVICE_TOKEN}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "recipient_id": sender_id, | |
| "message_text": response_to_send, | |
| "page_access_token": page_token | |
| } | |
| send_response = requests.post(AI_SERVICE_URL, json=payload, headers=headers, timeout=10) | |
| send_response.raise_for_status() | |
| print(f"Response sent successfully to {sender_id}") | |
| except Exception as e: | |
| print(f"Error sending response to user: {str(e)}") | |
| except Exception as e: | |
| print(f"Error generating AI response: {str(e)}") | |
| # Send a fallback message if AI response generation fails | |
| try: | |
| headers = { | |
| "Authorization": f"Bearer {AI_SERVICE_TOKEN}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "recipient_id": sender_id, | |
| "message_text": "I'm sorry, I'm having trouble processing your request right now. Please try again later.", | |
| "page_access_token": page_token | |
| } | |
| requests.post(AI_SERVICE_URL, json=payload, headers=headers, timeout=10) | |
| except Exception as send_err: | |
| print(f"Error sending fallback message: {str(send_err)}") | |
| return "OK", 200 | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860, debug=True) |