# app.py — "Update app.py" by Dooratre (commit 539396e, verified)
from flask import Flask, request
import requests
import json
import os
import uuid
from db import fetch_json_from_github
from qwen import get_qwen_response
from upload import upload_image_to_cdn, upload_audio_to_cdn
from admin_messages import extract_admin_message, save_admin_message
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)

# Cache for user profiles to reduce API calls.
# Keyed by "<sender_id>_<first 10 chars of page token>"; lives for the
# lifetime of the process and is never evicted.
user_profile_cache = {}

# Per-sender conversation state: {"history": [...], "system_prompt": str}.
chat_histories = {}

# Base URL handed to the CDN upload helpers.
BASE_URL = "https://chat.qwen.ai"

# SECURITY: credentials should not live in source control. Each secret can be
# supplied through the environment; the literal is kept only as a
# backward-compatible fallback so existing deployments keep working.
# NOTE(review): the committed values should be rotated and the fallbacks removed.
AUTH_TOKEN = os.environ.get(
    "QWEN_AUTH_TOKEN",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6ImRjOGQyYzY4LWZjNmEtNDEwYy05NWZjLWQ5MDBmNTM4ZTMwMiIsImV4cCI6MTc0NjM1OTExMH0.ms8Agbit2ObnTMJyeW_dUbk-tV_ON_dc-RjDKCDLAwA",
)

# Relay endpoint that delivers replies to the end user.
AI_SERVICE_URL = "https://aoamrnuwara.pythonanywhere.com/send-message"

# Used both as the Bearer token for AI_SERVICE_URL and as the expected
# hub.verify_token during webhook verification.
AI_SERVICE_TOKEN = os.environ.get("AI_SERVICE_TOKEN", "123400")

# Proxy API endpoint URL (resolves a sender id to a display name).
PROXY_API_URL = "https://aoamrnuwara.pythonanywhere.com/get-user-profile"
@app.route("/webhook", methods=["GET"])
def verify_webhook():
    """Answer the webhook verification handshake (GET /webhook).

    Reads ``hub.mode``, ``hub.verify_token`` and ``hub.challenge`` from the
    query string.

    Returns:
        The challenge string when the mode is ``"subscribe"`` and the token
        equals ``AI_SERVICE_TOKEN``; ``("Missing hub.challenge", 400)`` when
        the token matches but no challenge was supplied; otherwise
        ``("Verification failed", 403)``.
    """
    mode = request.args.get("hub.mode")
    token = request.args.get("hub.verify_token")
    challenge = request.args.get("hub.challenge")

    if mode == "subscribe" and token == AI_SERVICE_TOKEN:
        if challenge is None:
            # Returning None from a view makes Flask raise a TypeError
            # (HTTP 500); report the malformed request explicitly instead.
            return "Missing hub.challenge", 400
        return challenge

    print(f"Verification failed: mode={mode}, token={token}")  # Debug log
    return "Verification failed", 403
def get_user_profile(sender_id, page_token):
    """Return the sender's display name, falling back to ``"User"``.

    The name is looked up through the proxy API at ``PROXY_API_URL`` and
    memoised in ``user_profile_cache`` after a successful lookup. Any
    failure (timeout, network error, non-200 status, unsuccessful payload)
    is logged and the default name is returned without being cached.

    Args:
        sender_id: Identifier of the user who sent the message.
        page_token: Page access token; forwarded to the proxy API and
            partially used in the cache key.

    Returns:
        The user's full name, or ``"User"`` when it cannot be resolved.
    """
    # Use first 10 chars of token for cache key
    cache_key = f"{sender_id}_{page_token[:10]}"
    if cache_key in user_profile_cache:
        print(f"Using cached profile for {sender_id}")
        return user_profile_cache[cache_key]

    # Default fallback name
    user_full_name = "User"

    try:
        proxy_params = {
            "sender_id": sender_id,
            "page_token": page_token,
        }
        # Timeout prevents a slow proxy from stalling the webhook.
        proxy_response = requests.get(PROXY_API_URL, params=proxy_params, timeout=5)

        if proxy_response.status_code != 200:
            print(f"Proxy API returned status code: {proxy_response.status_code}")
        else:
            proxy_data = proxy_response.json()
            if proxy_data.get("success", False):
                # `or` also covers a present-but-null/empty full_name, which
                # would otherwise be cached and rendered as "None: ..." later.
                user_full_name = proxy_data.get("full_name") or "User"
                # Cache the successful result
                user_profile_cache[cache_key] = user_full_name
            else:
                print(f"Proxy API returned error: {proxy_data.get('error', 'Unknown error')}")
    except requests.exceptions.Timeout:
        print(f"Timeout when calling proxy API for user {sender_id}")
    except requests.exceptions.RequestException as e:
        print(f"Error calling proxy API: {str(e)}")
    except Exception as e:
        # Best-effort lookup: never let a profile failure break message handling.
        print(f"Unexpected error getting user profile: {str(e)}")

    return user_full_name
def _load_page_credentials():
    """Fetch the page registry from GitHub and index it by ``page_id``.

    Returns an empty dict when the fetch fails, which causes every page in
    the incoming payload to be skipped.
    """
    try:
        github_response = fetch_json_from_github()
        if github_response["success"]:
            credentials = {page['page_id']: page for page in github_response["data"]["pages"]}
            print("Successfully loaded page credentials from GitHub.")
            return credentials
        print(f"Error loading page credentials: {github_response['message']}")
    except Exception as e:
        print(f"Unexpected error while fetching credentials: {e}")
    return {}


def _page_is_serviceable(page_id, page_config):
    """Return True when the page has a token, is ON and holds a live subscription."""
    from datetime import datetime

    # Check if page exists in credentials
    if not page_config.get('page_token'):
        print(f"Page ID {page_id} not found in credentials. Skipping...")
        return False

    # Check page status - if OFF, skip processing
    if page_config.get('status', 'OFF') != "ON":
        print(f"Page ID {page_id} is turned OFF. Skipping...")
        return False

    subscription = page_config.get('subscription', 'NONE')
    expiration = page_config.get('expiration', 'NONE')

    # Check expiration date for any subscription type that has one
    if subscription in ("Basic", "Plus") and expiration != "NONE":
        try:
            expiration_date = datetime.strptime(expiration, "%Y-%m-%d").date()
        except (ValueError, TypeError) as e:
            # An unparseable date is treated as "do not serve".
            print(f"Error checking expiration date: {str(e)}")
            return False
        if datetime.now().date() > expiration_date:
            print(f"Page ID {page_id} has an expired {subscription} subscription. Skipping...")
            return False

    # If no subscription, skip processing
    if subscription == "NONE":
        print(f"Page ID {page_id} has no subscription. Skipping...")
        return False

    return True


def _upload_attachment(url, suffix, uploader, label):
    """Download ``url`` to a temp file, upload it via ``uploader``, return the CDN URL.

    ``label`` ("Image"/"Audio") only selects the log prefix. Returns None on
    any failure. The temp file is always removed.
    """
    # Initialised before the try so the finally clause never touches an
    # unbound (or stale) path when the download itself fails.
    temp_path = None
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        temp_path = f"temp_{uuid.uuid4()}{suffix}"
        with open(temp_path, "wb") as f:
            f.write(response.content)
        cdn_url = uploader(BASE_URL, AUTH_TOKEN, temp_path)
        if cdn_url:
            print(f"{label} CDN URL: {cdn_url}")
            return cdn_url
    except Exception as e:
        print(f"{label} processing error: {str(e)}")
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
    return None


def _build_content(subscription, user_full_name, text_content, image_cdn_urls, audio_cdn_urls):
    """Assemble the message parts for the AI, or return None to skip the event.

    For "Plus": images take precedence over audio, which takes precedence
    over text. For "Basic": text only.
    """
    content = []
    if subscription == "Plus":
        if image_cdn_urls:
            content.append({"type": "text", "text": f"{user_full_name} sent an image"})
            content.extend({"type": "image", "image": cdn_url} for cdn_url in image_cdn_urls)
        elif audio_cdn_urls:
            content.append({"type": "text", "text": f"{user_full_name} sent an audio file"})
            content.extend({"type": "audio", "audio": cdn_url} for cdn_url in audio_cdn_urls)
        elif text_content:
            content.append({"type": "text", "text": f"{user_full_name}: {text_content}"})
        else:
            return None
    elif subscription == "Basic":
        if text_content:
            content.append({"type": "text", "text": f"{user_full_name}: {text_content}"})
        else:
            return None
    # NOTE(review): any other subscription value yields an empty content
    # list that is still forwarded to the AI — confirm this is intended.
    return content


def _send_reply(recipient_id, message_text, page_token):
    """POST a text reply to the relay service; raises on network or HTTP errors."""
    headers = {
        "Authorization": f"Bearer {AI_SERVICE_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "recipient_id": recipient_id,
        "message_text": message_text,
        "page_access_token": page_token,
    }
    send_response = requests.post(AI_SERVICE_URL, json=payload, headers=headers, timeout=10)
    send_response.raise_for_status()


def _process_messaging_event(messaging_event, page_id, page_token, subscription, system_prompt):
    """Handle one messaging event that carries a ``message`` object."""
    message = messaging_event["message"]

    # Echo events describe messages sent BY the page; replying to them would
    # make the bot answer itself.
    if message.get("is_echo"):
        return

    sender_id = messaging_event["sender"]["id"]

    # Get user profile (falls back to "User" on any lookup failure)
    user_full_name = get_user_profile(sender_id, page_token)

    # Get or create chat history for this user, then refresh the prompt so
    # edits to the page configuration take effect on the next message.
    user_history = chat_histories.setdefault(sender_id, {
        'history': [],
        'system_prompt': system_prompt,
    })
    user_history['system_prompt'] = system_prompt

    image_cdn_urls = []
    audio_cdn_urls = []
    attachments = message.get("attachments", [])
    text_content = message.get("text", "")

    # Process attachments based on subscription level
    if subscription == "Plus":
        # Plus subscribers can send images and audio
        for attachment in attachments:
            attachment_type = attachment.get("type")
            attachment_url = (attachment.get("payload") or {}).get("url")
            if not attachment_url:
                continue
            if attachment_type == "image":
                cdn_url = _upload_attachment(attachment_url, ".jpg", upload_image_to_cdn, "Image")
                if cdn_url:
                    image_cdn_urls.append(cdn_url)
            elif attachment_type == "audio":
                # Keep the original file extension when one is provided.
                original_filename = attachment.get("name") or "audio.mp3"
                file_extension = os.path.splitext(original_filename)[1].lower() or ".mp3"
                cdn_url = _upload_attachment(attachment_url, file_extension, upload_audio_to_cdn, "Audio")
                if cdn_url:
                    audio_cdn_urls.append(cdn_url)
    elif subscription == "Basic":
        # Basic subscribers can only send text
        if attachments:
            print(f"Basic subscriber tried to send attachments. Skipping...")
            return

    # Create content based on subscription level
    content = _build_content(subscription, user_full_name, text_content, image_cdn_urls, audio_cdn_urls)
    if content is None:
        return

    history = user_history['history']
    history.append({"role": "user", "content": content})
    # Ensure only the last 5 messages are retained in the chat history
    del history[:-5]

    try:
        # Generate AI response using the dynamically updated system prompt
        ai_response = get_qwen_response(
            system_prompt=user_history['system_prompt'],
            chat_history=history,
        )
        admin_message, cleaned_response = extract_admin_message(ai_response)

        # Always store the original response (admin tags included) in history
        history.append({
            "role": "assistant",
            "content": [{"type": "text", "text": ai_response}],
        })

        if admin_message:
            print(f"Admin message detected: {admin_message}")
            save_admin_message(page_id, admin_message, sender_id, user_full_name)
            # The user only sees the response with admin tags removed
            response_to_send = cleaned_response
        else:
            response_to_send = ai_response

        try:
            _send_reply(sender_id, response_to_send, page_token)
            print(f"Response sent successfully to {sender_id}")
        except Exception as e:
            print(f"Error sending response to user: {str(e)}")
    except Exception as e:
        print(f"Error generating AI response: {str(e)}")
        # Send a fallback message if AI response generation fails
        try:
            _send_reply(
                sender_id,
                "I'm sorry, I'm having trouble processing your request right now. Please try again later.",
                page_token,
            )
        except Exception as send_err:
            print(f"Error sending fallback message: {str(send_err)}")


@app.route("/webhook", methods=["POST"])
def handle_message():
    """Process an incoming webhook delivery (POST /webhook).

    For every entry in the payload the page configuration is reloaded from
    GitHub, the page's status and subscription are validated, and each
    messaging event containing a ``message`` is answered through the AI
    service.

    Returns:
        ``("OK", 200)`` once all entries were processed, or
        ``("Invalid payload", 400)`` when the body is not a JSON object.
    """
    # silent=True: a missing or non-JSON body yields None instead of raising.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return "Invalid payload", 400

    # Fetch the latest JSON data from GitHub for dynamic updates
    page_credentials = _load_page_credentials()

    for entry in body.get("entry") or []:
        page_id = entry.get("id")
        page_config = page_credentials.get(page_id, {})
        if not _page_is_serviceable(page_id, page_config):
            continue

        page_token = page_config['page_token']
        subscription = page_config.get('subscription', 'NONE')
        # Fetch the system prompt dynamically for each request
        system_prompt = page_config.get('system', 'Default system prompt')

        # Entries without a "messaging" list are skipped, not treated as errors.
        for messaging_event in entry.get("messaging") or []:
            if "message" in messaging_event:
                _process_messaging_event(
                    messaging_event, page_id, page_token, subscription, system_prompt
                )

    return "OK", 200
if __name__ == "__main__":
    # SECURITY: debug mode enables the Werkzeug interactive debugger, which
    # allows arbitrary code execution; combined with host 0.0.0.0 it would be
    # reachable from the network. Debug is therefore opt-in via FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)