Athena-API / main.py
rairo's picture
Update main.py
f6ae6a2 verified
raw
history blame
21 kB
import os
import io
import uuid
import re
import time
import json
import traceback
import math
import requests
import threading
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
import firebase_admin
from firebase_admin import credentials, db, storage, auth
from PIL import Image
from google import genai
from google.genai import types
# -----------------------------------------------------------------------------
# 0. LOGGING CONFIGURATION
# -----------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("SOZO_ATHENA")
# -----------------------------------------------------------------------------
# 1. CONFIGURATION & INITIALIZATION
# -----------------------------------------------------------------------------
app = Flask(__name__)
CORS(app)
# --- Firebase Initialization ---
try:
logger.info("Initializing Firebase Admin SDK...")
credentials_json_string = os.environ.get("FIREBASE")
if not credentials_json_string:
raise ValueError("The FIREBASE environment variable is not set.")
credentials_json = json.loads(credentials_json_string)
firebase_db_url = os.environ.get("Firebase_DB")
firebase_storage_bucket = os.environ.get("Firebase_Storage")
cred = credentials.Certificate(credentials_json)
firebase_admin.initialize_app(cred, {
'databaseURL': firebase_db_url,
'storageBucket': firebase_storage_bucket
})
logger.info("Firebase Admin SDK initialized successfully.")
except Exception as e:
logger.error(f"FATAL: Error initializing Firebase: {e}")
exit(1)
bucket = storage.bucket()
db_ref = db.reference()
# --- Google GenAI Client Initialization (Gemini 3.0 Flash) ---
try:
logger.info("Initializing Google GenAI Client...")
api_key = os.environ.get("Gemini")
if not api_key:
raise ValueError("The 'Gemini' API key is not set.")
client = genai.Client(api_key=api_key)
logger.info("Google GenAI (Gemini 3.0) Client initialized successfully.")
except Exception as e:
logger.error(f"FATAL: Error initializing GenAI Client: {e}")
exit(1)
# Model Constants
ATHENA_MODEL = "gemini-3-flash"
BRIEFING_MODEL = "gemini-3-flash"
# Grounding API Keys
WOLFRAM_APP_ID = os.environ.get("WOLFRAM_APP_ID")
OPENALEX_MAILTO = os.environ.get("OPENALEX_MAILTO", "rairo@sozofix.tech")
# -----------------------------------------------------------------------------
# 2. HELPER FUNCTIONS & GROUNDING
# -----------------------------------------------------------------------------
def verify_token(auth_header):
if not auth_header or not auth_header.startswith('Bearer '):
logger.warning("Missing or invalid Authorization header.")
return None
token = auth_header.split('Bearer ')[1]
try:
decoded_token = auth.verify_id_token(token)
return decoded_token['uid']
except Exception as e:
logger.error(f"Token verification failed: {e}")
return None
def verify_admin(auth_header):
uid = verify_token(auth_header)
if not uid: raise PermissionError('Invalid or missing token')
user_data = db_ref.child(f'users/{uid}').get()
if not user_data or not user_data.get('is_admin', False):
logger.warning(f"Admin access denied for user: {uid}")
raise PermissionError('Admin access required')
return uid
def upload_to_storage(data_bytes, destination_blob_name, content_type):
try:
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(data_bytes, content_type=content_type)
blob.make_public()
return blob.public_url
except Exception as e:
logger.error(f"Failed to upload to Firebase Storage: {e}")
return None
def query_wolfram_alpha(query):
logger.info(f"Querying Wolfram Alpha: {query}")
if not WOLFRAM_APP_ID:
logger.warning("WOLFRAM_APP_ID missing.")
return "Wolfram|Alpha grounding unavailable."
try:
url = f"http://api.wolframalpha.com/v1/result?appid={WOLFRAM_APP_ID}&i={query}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
logger.info("Wolfram Alpha check successful.")
return response.text
else:
logger.warning(f"Wolfram Alpha returned status {response.status_code}")
return "Fact-check pending."
except Exception as e:
logger.error(f"Wolfram Alpha query failed: {e}")
return "Grounding timeout."
def query_openalex(topic):
logger.info(f"Querying OpenAlex for topic: {topic}")
try:
url = f"https://api.openalex.org/works?search={topic}&mailto={OPENALEX_MAILTO}"
resp = requests.get(url, timeout=5).json()
results = resp.get('results', [])
logger.info(f"OpenAlex found {len(results)} results.")
return [{"title": r['title'], "url": r['doi'] or r['id'], "year": r['publication_year']} for r in results[:3]]
except Exception as e:
logger.error(f"OpenAlex query failed: {e}")
return []
# -----------------------------------------------------------------------------
# 3. ASYNCHRONOUS VOICE ORCHESTRATION (ATHENA VOICE)
# -----------------------------------------------------------------------------
def generate_single_narration(text, uid, epiphany_id, layer_name):
"""Deepgram Aura-Luna generation for a single layer."""
try:
logger.info(f"Generating TTS for layer: {layer_name}")
api_key = os.environ.get("DEEPGRAM_API_KEY")
if not api_key:
logger.error("DEEPGRAM_API_KEY is not set.")
return layer_name, None
DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-luna-en"
headers = {"Authorization": f"Token {api_key}", "Content-Type": "text/plain"}
response = requests.post(DEEPGRAM_URL, headers=headers, data=text.encode('utf-8'))
response.raise_for_status()
audio_path = f"users/{uid}/epiphanies/{epiphany_id}/narrations/{layer_name}.mp3"
url = upload_to_storage(response.content, audio_path, 'audio/mpeg')
logger.info(f"TTS Uploaded: {url}")
return layer_name, url
except Exception as e:
logger.error(f"TTS Error [{layer_name}]: {e}")
return layer_name, None
def generate_all_narrations_async(data_dict, uid, epiphany_id):
"""Uses ThreadPoolExecutor to generate all 4 layers in parallel."""
logger.info(f"Starting parallel TTS generation for epiphany: {epiphany_id}")
layers = ['genesis', 'scientific_core', 'engineering_edge', 'cross_pollination']
results = {}
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(generate_single_narration, data_dict[l], uid, epiphany_id, l) for l in layers]
for f in futures:
layer, url = f.result()
results[layer] = url
return results
# -----------------------------------------------------------------------------
# 4. EPIPHANY SYNTHESIS (THE DISCOVERY ENDPOINTS)
# -----------------------------------------------------------------------------
@app.route('/api/epiphany/generate', methods=['POST'])
def generate_epiphany():
logger.info(">>> START generate_epiphany request")
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
logger.info(f"User validated: {uid}")
user_ref = db_ref.child(f'users/{uid}')
user_data = user_ref.get()
if not user_data or user_data.get('credits', 0) < 1:
logger.warning(f"User {uid} has insufficient credits.")
return jsonify({'error': 'Insufficient sparks for an Epiphany.'}), 402
if 'image' not in request.files:
logger.error("No image file provided in request.")
return jsonify({'error': 'Visual input is required.'}), 400
image_file = request.files['image']
image_bytes = image_file.read()
try:
pil_image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
logger.info("Image bytes successfully converted to PIL.")
# Step 1: Rapid Identification
logger.info("Step 1: Running Rapid Identification...")
id_prompt = "Identify this precisely. If biological, include the Latin name. If mechanical, include the inventor if known. Reply with ONLY the name."
id_response = client.models.generate_content(model=ATHENA_MODEL, contents=[id_prompt, pil_image])
subject = id_response.text.strip()
logger.info(f"Subject Identified: {subject}")
# Step 2: Grounding (Parallel Dispatch)
logger.info("Step 2: Grounding Dispatch...")
physics_fact = query_wolfram_alpha(f"constants of {subject}")
papers = query_openalex(subject)
# Step 3: Synthesis (Feynman Technique)
logger.info("Step 3: Generating Feynman Synthesis JSON...")
synthesis_prompt = f"""
Act as Athena, the Systems Synthesizer. Reveal the first principles of '{subject}'.
Style: Richard Feynman. Simple analogies, profound scientific truths.
Grounding Context: {physics_fact}
Output JSON:
{{
"title": "Epic 3-word title",
"genesis": "The origin and the core 'Aha!' moment of this system.",
"scientific_core": "The first principles of physics/biology at play here.",
"engineering_edge": "Detailed analysis of material, stress, or evolutionary advantage.",
"cross_pollination": "How this principle applies to a completely different field."
}}
"""
synth_response = client.models.generate_content(
model=ATHENA_MODEL,
contents=[synthesis_prompt, pil_image],
config=types.GenerateContentConfig(response_mime_type='application/json')
)
# Log raw response for debugging 500s
raw_text = synth_response.text
logger.info(f"Raw Synthesis Response: {raw_text}")
# Cleaning up markdown code blocks if present
if raw_text.startswith("```json"):
raw_text = raw_text.replace("```json", "").replace("```", "").strip()
data = json.loads(raw_text)
epiphany_id = str(uuid.uuid4())
logger.info(f"Epiphany ID generated: {epiphany_id}")
# Step 4: Parallel Audio Generation
logger.info("Step 4: Running Parallel TTS...")
narration_urls = generate_all_narrations_async(data, uid, epiphany_id)
# Step 5: Persistence
logger.info("Step 5: Finalizing record and uploading image...")
image_url = upload_to_storage(image_bytes, f"users/{uid}/epiphanies/{epiphany_id}/vision.jpg", 'image/jpeg')
epiphany_record = {
"epiphanyId": epiphany_id,
"uid": uid,
"title": data.get('title', 'Unknown Discovery'),
"subject": subject,
"imageURL": image_url,
"layers": {
"genesis": {"text": data.get('genesis', ''), "audio": narration_urls.get('genesis')},
"scientific_core": {"text": data.get('scientific_core', ''), "audio": narration_urls.get('scientific_core')},
"engineering_edge": {"text": data.get('engineering_edge', ''), "audio": narration_urls.get('engineering_edge')},
"cross_pollination": {"text": data.get('cross_pollination', ''), "audio": narration_urls.get('cross_pollination')}
},
"grounding": {"physics": physics_fact, "papers": papers},
"createdAt": datetime.utcnow().isoformat()
}
db_ref.child(f'epiphanies/{epiphany_id}').set(epiphany_record)
user_ref.update({'credits': user_data.get('credits', 0) - 1})
logger.info(f"EP-GEN SUCCESS: {epiphany_id}")
return jsonify(epiphany_record), 201
except json.JSONDecodeError as je:
logger.error(f"JSON Parsing Error: {je}. Raw output was: {synth_response.text}")
return jsonify({'error': 'AI generated invalid JSON. Try again.'}), 500
except Exception as e:
logger.error(f"Generate Epiphany Global Error: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': str(e)}), 500
@app.route('/api/epiphany/deep-dive', methods=['POST'])
def deep_dive():
logger.info(">>> START deep-dive request")
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
image_file = request.files['image']
try:
pil_image = Image.open(io.BytesIO(image_file.read())).convert('RGB')
dive_prompt = "Act as an Engineering Lead. In 50 words, explain the microscopic or mechanical significance of this specific detail."
res = client.models.generate_content(model=ATHENA_MODEL, contents=[dive_prompt, pil_image])
logger.info("Deep dive reasoning completed.")
user_ref = db_ref.child(f'users/{uid}')
user_ref.update({'credits': max(0, user_ref.get().get('credits', 0) - 1)})
return jsonify({"analysis": res.text.strip()}), 200
except Exception as e:
logger.error(f"Deep Dive Error: {e}")
return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 5. THE CHIRON MENTOR (INTERACTION & MEMORY)
# -----------------------------------------------------------------------------
@app.route('/api/user/call-briefing', methods=['GET'])
def get_chiron_briefing():
logger.info(">>> START get_chiron_briefing request")
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
try:
last_epiphany = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).limit_to_last(1).get() or {}
context_data = ""
if last_epiphany:
eid = list(last_epiphany.keys())[0]
e_data = last_epiphany[eid]
papers = e_data.get('grounding', {}).get('papers', [])
paper_titles = [p['title'] for p in papers]
context_data = f"Current focus: {e_data['subject']}. Recent papers unlocked: {', '.join(paper_titles)}."
logger.info(f"Generating briefing with context: {context_data}")
brief_prompt = f"""
Prep Chiron, the Socratic Mentor.
User Context: {context_data}
Write a 4-sentence brief for Chiron. He must reference the scientific papers or topics the user just explored.
"""
response = client.models.generate_content(model=BRIEFING_MODEL, contents=[brief_prompt])
return jsonify({"memory_summary": response.text.strip(), "grounding_context": context_data}), 200
except Exception as e:
logger.error(f"Call Briefing Error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/log-call-usage', methods=['POST'])
def log_call_usage():
logger.info(">>> START log_call_usage request")
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json()
duration = data.get("durationSeconds", 0)
transcript = data.get("transcript", "")
cost = math.ceil(duration / 60) * 3
logger.info(f"Call ended for {uid}. Duration: {duration}s, Cost: {cost} sparks.")
try:
user_ref = db_ref.child(f'users/{uid}')
user_data = user_ref.get()
new_bal = max(0, user_data.get('credits', 0) - cost)
user_ref.update({'credits': new_bal})
if transcript:
db_ref.child(f'transcripts/{uid}').push({
"text": transcript,
"createdAt": datetime.utcnow().isoformat()
})
logger.info("Transcript saved to database.")
return jsonify({"success": True, "remainingCredits": new_bal}), 200
except Exception as e:
logger.error(f"Usage Logging Error: {e}")
return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 6. ADMIN, FEEDBACK & CREDITS
# -----------------------------------------------------------------------------
@app.route('/api/feedback', methods=['POST'])
def submit_feedback():
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json()
try:
fb_ref = db_ref.child('feedback').push()
fb_ref.set({
"userId": uid,
"message": data.get('message'),
"type": data.get('type', 'general'),
"status": "open",
"createdAt": datetime.utcnow().isoformat()
})
logger.info(f"Feedback received from user {uid}.")
return jsonify({"success": True}), 201
except Exception as e:
logger.error(f"Feedback Submission Error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/user/request-credits', methods=['POST'])
def request_credits():
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json()
try:
req_ref = db_ref.child('credit_requests').push()
req_ref.set({
"userId": uid,
"requested_amount": data.get('amount', 50),
"status": "pending",
"createdAt": datetime.utcnow().isoformat()
})
logger.info(f"Spark request logged for user {uid}.")
return jsonify({"success": True}), 201
except Exception as e:
logger.error(f"Credit Request Error: {e}")
return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 7. AUTHENTICATION & PROFILE
# -----------------------------------------------------------------------------
@app.route('/api/auth/signup', methods=['POST'])
def signup():
logger.info(">>> START signup sync")
uid = verify_token(request.headers.get('Authorization'))
if not uid:
return jsonify({'error': 'Invalid or expired token'}), 401
try:
user_ref = db_ref.child(f'users/{uid}')
user_data = user_ref.get()
if user_data:
logger.info(f"User {uid} already exists, skipping initialization.")
return jsonify({'uid': uid, **user_data}), 200
data = request.get_json()
new_user_record = {
'email': data.get('email'),
'displayName': data.get('displayName', 'Seeker'),
'credits': 30,
'is_admin': False,
'createdAt': datetime.utcnow().isoformat()
}
user_ref.set(new_user_record)
logger.info(f"New seeker initialized: {uid} with 30 Sparks.")
return jsonify({'success': True, 'uid': uid, **new_user_record}), 201
except Exception as e:
logger.error(f"Signup Sync Error: {e}")
return jsonify({'error': str(e)}), 400
@app.route('/api/auth/social-signin', methods=['POST'])
def social_signin():
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Invalid token'}), 401
user_ref = db_ref.child(f'users/{uid}')
user_data = user_ref.get()
if not user_data:
logger.info(f"Initializing social user: {uid}")
firebase_user = auth.get_user(uid)
user_data = {
'email': firebase_user.email,
'displayName': firebase_user.display_name or 'Seeker',
'credits': 30,
'is_admin': False,
'createdAt': datetime.utcnow().isoformat()
}
user_ref.set(user_data)
return jsonify({'uid': uid, **user_data}), 200
@app.route('/api/user/profile', methods=['GET'])
def get_profile():
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
return jsonify(db_ref.child(f'users/{uid}').get())
@app.route('/api/epiphanies', methods=['GET'])
def list_epiphanies():
uid = verify_token(request.headers.get('Authorization'))
if not uid: return jsonify({'error': 'Unauthorized'}), 401
logger.info(f"Listing epiphanies for user {uid}.")
results = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).get() or {}
return jsonify(list(results.values()))
# -----------------------------------------------------------------------------
# 8. MAIN EXECUTION
# -----------------------------------------------------------------------------
if __name__ == '__main__':
# Standard HF Port 7860
logger.info("Starting Sozo Athena Server on port 7860...")
app.run(debug=False, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))