# Sozo Athena — Flask backend (Hugging Face Space): Firebase + Gemini + Deepgram.
| import os | |
| import io | |
| import uuid | |
| import re | |
| import time | |
| import json | |
| import traceback | |
| import math | |
| import requests | |
| import threading | |
| import logging | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timedelta | |
| from flask import Flask, request, jsonify, Response | |
| from flask_cors import CORS | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| from PIL import Image | |
| from google import genai | |
| from google.genai import types | |
# -----------------------------------------------------------------------------
# 0. LOGGING CONFIGURATION
# -----------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("SOZO_ATHENA")  # module-wide logger used by every handler below
# -----------------------------------------------------------------------------
# 1. CONFIGURATION & INITIALIZATION
# -----------------------------------------------------------------------------
app = Flask(__name__)
CORS(app)  # allow cross-origin calls from the web client
# --- Firebase Initialization ---
try:
    logger.info("Initializing Firebase Admin SDK...")
    # Service-account credentials arrive as a JSON string in the FIREBASE env var.
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("The FIREBASE environment variable is not set.")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    firebase_storage_bucket = os.environ.get("Firebase_Storage")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {
        'databaseURL': firebase_db_url,
        'storageBucket': firebase_storage_bucket
    })
    logger.info("Firebase Admin SDK initialized successfully.")
except Exception as e:
    # The whole service depends on Firebase; fail fast at startup.
    logger.error(f"FATAL: Error initializing Firebase: {e}")
    exit(1)
bucket = storage.bucket()  # default Cloud Storage bucket (images + narration audio)
db_ref = db.reference()    # root of the Realtime Database
# --- Google GenAI Client Initialization (Gemini 3.0 Flash) ---
try:
    logger.info("Initializing Google GenAI Client...")
    # The Gemini API key is read from the 'Gemini' environment variable.
    api_key = os.environ.get("Gemini")
    if not api_key:
        raise ValueError("The 'Gemini' API key is not set.")
    client = genai.Client(api_key=api_key)
    logger.info("Google GenAI (Gemini 3.0) Client initialized successfully.")
except Exception as e:
    # As with Firebase, the AI client is essential; abort startup without it.
    logger.error(f"FATAL: Error initializing GenAI Client: {e}")
    exit(1)
# Model Constants — synthesis and briefing currently use the same model.
ATHENA_MODEL = "gemini-3-flash"
BRIEFING_MODEL = "gemini-3-flash"
# Grounding API Keys
WOLFRAM_APP_ID = os.environ.get("WOLFRAM_APP_ID")  # optional; grounding degrades gracefully without it
OPENALEX_MAILTO = os.environ.get("OPENALEX_MAILTO", "rairo@sozofix.tech")  # contact for OpenAlex polite pool
| # ----------------------------------------------------------------------------- | |
| # 2. HELPER FUNCTIONS & GROUNDING | |
| # ----------------------------------------------------------------------------- | |
def verify_token(auth_header):
    """Resolve a 'Bearer <idToken>' Authorization header to a Firebase uid.

    Returns the uid string, or None when the header is absent/malformed or
    the token fails verification.
    """
    if not auth_header or not auth_header.startswith('Bearer '):
        logger.warning("Missing or invalid Authorization header.")
        return None
    id_token = auth_header.split('Bearer ')[1]
    try:
        claims = auth.verify_id_token(id_token)
        return claims['uid']
    except Exception as exc:
        logger.error(f"Token verification failed: {exc}")
        return None
def verify_admin(auth_header):
    """Like verify_token, but additionally require the user's is_admin flag.

    Returns the uid on success; raises PermissionError when the token is
    invalid/missing or the user is not an admin.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError('Invalid or missing token')
    profile = db_ref.child(f'users/{uid}').get()
    if not (profile and profile.get('is_admin', False)):
        logger.warning(f"Admin access denied for user: {uid}")
        raise PermissionError('Admin access required')
    return uid
def upload_to_storage(data_bytes, destination_blob_name, content_type):
    """Upload raw bytes to Cloud Storage and return a public URL, or None on failure."""
    try:
        target = bucket.blob(destination_blob_name)
        target.upload_from_string(data_bytes, content_type=content_type)
        target.make_public()
        return target.public_url
    except Exception as exc:
        logger.error(f"Failed to upload to Firebase Storage: {exc}")
        return None
def query_wolfram_alpha(query):
    """Fact-check a query against the Wolfram|Alpha Short Answers API.

    Returns the short textual answer on success, or one of three sentinel
    strings: app id missing, non-200 from the API, or a request failure.
    """
    logger.info(f"Querying Wolfram Alpha: {query}")
    if not WOLFRAM_APP_ID:
        logger.warning("WOLFRAM_APP_ID missing.")
        return "Wolfram|Alpha grounding unavailable."
    try:
        # FIX: let requests URL-encode the parameters. The query was previously
        # interpolated raw into the URL, which broke on spaces and '&'/'+'.
        response = requests.get(
            "http://api.wolframalpha.com/v1/result",
            params={"appid": WOLFRAM_APP_ID, "i": query},
            timeout=5,
        )
        if response.status_code == 200:
            logger.info("Wolfram Alpha check successful.")
            return response.text
        logger.warning(f"Wolfram Alpha returned status {response.status_code}")
        return "Fact-check pending."
    except Exception as e:
        logger.error(f"Wolfram Alpha query failed: {e}")
        return "Grounding timeout."
def query_openalex(topic):
    """Fetch up to three scholarly works matching `topic` from OpenAlex.

    Returns a list of {title, url, year} dicts; an empty list on any failure.
    """
    logger.info(f"Querying OpenAlex for topic: {topic}")
    try:
        # FIX: params= URL-encodes the topic (was raw string interpolation).
        resp = requests.get(
            "https://api.openalex.org/works",
            params={"search": topic, "mailto": OPENALEX_MAILTO},
            timeout=5,
        ).json()
        results = resp.get('results', [])
        logger.info(f"OpenAlex found {len(results)} results.")
        # FIX: use .get so one malformed record no longer discards all results
        # (hard indexing raised KeyError and fell through to `return []`).
        return [
            {
                "title": r.get('title'),
                "url": r.get('doi') or r.get('id'),
                "year": r.get('publication_year'),
            }
            for r in results[:3]
        ]
    except Exception as e:
        logger.error(f"OpenAlex query failed: {e}")
        return []
| # ----------------------------------------------------------------------------- | |
| # 3. ASYNCHRONOUS VOICE ORCHESTRATION (ATHENA VOICE) | |
| # ----------------------------------------------------------------------------- | |
def generate_single_narration(text, uid, epiphany_id, layer_name):
    """Deepgram Aura-Luna TTS generation for a single narrative layer.

    Returns (layer_name, public_audio_url), or (layer_name, None) on any
    failure so the caller can still assemble a partial result.
    """
    try:
        logger.info(f"Generating TTS for layer: {layer_name}")
        api_key = os.environ.get("DEEPGRAM_API_KEY")
        if not api_key:
            logger.error("DEEPGRAM_API_KEY is not set.")
            return layer_name, None
        DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-luna-en"
        headers = {"Authorization": f"Token {api_key}", "Content-Type": "text/plain"}
        # FIX: added timeout — an unbounded POST could hang a worker thread
        # forever and stall the whole ThreadPoolExecutor fan-out.
        response = requests.post(
            DEEPGRAM_URL, headers=headers, data=text.encode('utf-8'), timeout=60
        )
        response.raise_for_status()
        audio_path = f"users/{uid}/epiphanies/{epiphany_id}/narrations/{layer_name}.mp3"
        url = upload_to_storage(response.content, audio_path, 'audio/mpeg')
        logger.info(f"TTS Uploaded: {url}")
        return layer_name, url
    except Exception as e:
        logger.error(f"TTS Error [{layer_name}]: {e}")
        return layer_name, None
def generate_all_narrations_async(data_dict, uid, epiphany_id):
    """Fan out TTS generation for the four narrative layers across worker threads.

    Returns a mapping of layer name -> audio URL (None for layers that failed).
    """
    logger.info(f"Starting parallel TTS generation for epiphany: {epiphany_id}")
    layer_names = ('genesis', 'scientific_core', 'engineering_edge', 'cross_pollination')
    narration_urls = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [
            pool.submit(generate_single_narration, data_dict[name], uid, epiphany_id, name)
            for name in layer_names
        ]
        for task in pending:
            layer, audio_url = task.result()
            narration_urls[layer] = audio_url
    return narration_urls
| # ----------------------------------------------------------------------------- | |
| # 4. EPIPHANY SYNTHESIS (THE DISCOVERY ENDPOINTS) | |
| # ----------------------------------------------------------------------------- | |
def generate_epiphany():
    """Full discovery pipeline: image -> identify -> ground -> synthesize -> TTS -> persist.

    Requires a Bearer token and a multipart 'image' file; costs 1 credit
    ("spark"). Returns the persisted epiphany record with 201, or a JSON
    error payload (401/402/400/500).
    """
    logger.info(">>> START generate_epiphany request")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    logger.info(f"User validated: {uid}")
    user_ref = db_ref.child(f'users/{uid}')
    user_data = user_ref.get()
    # Paywall: each epiphany costs one spark.
    if not user_data or user_data.get('credits', 0) < 1:
        logger.warning(f"User {uid} has insufficient credits.")
        return jsonify({'error': 'Insufficient sparks for an Epiphany.'}), 402
    if 'image' not in request.files:
        logger.error("No image file provided in request.")
        return jsonify({'error': 'Visual input is required.'}), 400
    image_file = request.files['image']
    image_bytes = image_file.read()  # raw bytes kept for the Step-5 storage upload
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
        logger.info("Image bytes successfully converted to PIL.")
        # Step 1: Rapid Identification — one short multimodal call to name the subject.
        logger.info("Step 1: Running Rapid Identification...")
        id_prompt = "Identify this precisely. If biological, include the Latin name. If mechanical, include the inventor if known. Reply with ONLY the name."
        id_response = client.models.generate_content(model=ATHENA_MODEL, contents=[id_prompt, pil_image])
        subject = id_response.text.strip()
        logger.info(f"Subject Identified: {subject}")
        # Step 2: Grounding — NOTE(review): despite the "Parallel Dispatch" log
        # text, these two calls run sequentially.
        logger.info("Step 2: Grounding Dispatch...")
        physics_fact = query_wolfram_alpha(f"constants of {subject}")
        papers = query_openalex(subject)
        # Step 3: Synthesis — ask for structured JSON in Feynman style.
        logger.info("Step 3: Generating Feynman Synthesis JSON...")
        synthesis_prompt = f"""
        Act as Athena, the Systems Synthesizer. Reveal the first principles of '{subject}'.
        Style: Richard Feynman. Simple analogies, profound scientific truths.
        Grounding Context: {physics_fact}
        Output JSON:
        {{
        "title": "Epic 3-word title",
        "genesis": "The origin and the core 'Aha!' moment of this system.",
        "scientific_core": "The first principles of physics/biology at play here.",
        "engineering_edge": "Detailed analysis of material, stress, or evolutionary advantage.",
        "cross_pollination": "How this principle applies to a completely different field."
        }}
        """
        synth_response = client.models.generate_content(
            model=ATHENA_MODEL,
            contents=[synthesis_prompt, pil_image],
            config=types.GenerateContentConfig(response_mime_type='application/json')
        )
        # Log raw response for debugging 500s
        raw_text = synth_response.text
        logger.info(f"Raw Synthesis Response: {raw_text}")
        # Cleaning up markdown code blocks if the model wrapped its JSON anyway
        if raw_text.startswith("```json"):
            raw_text = raw_text.replace("```json", "").replace("```", "").strip()
        data = json.loads(raw_text)
        epiphany_id = str(uuid.uuid4())
        logger.info(f"Epiphany ID generated: {epiphany_id}")
        # Step 4: Parallel Audio Generation — one TTS clip per narrative layer.
        logger.info("Step 4: Running Parallel TTS...")
        narration_urls = generate_all_narrations_async(data, uid, epiphany_id)
        # Step 5: Persistence — original image plus the assembled record.
        logger.info("Step 5: Finalizing record and uploading image...")
        image_url = upload_to_storage(image_bytes, f"users/{uid}/epiphanies/{epiphany_id}/vision.jpg", 'image/jpeg')
        epiphany_record = {
            "epiphanyId": epiphany_id,
            "uid": uid,
            "title": data.get('title', 'Unknown Discovery'),
            "subject": subject,
            "imageURL": image_url,
            "layers": {
                "genesis": {"text": data.get('genesis', ''), "audio": narration_urls.get('genesis')},
                "scientific_core": {"text": data.get('scientific_core', ''), "audio": narration_urls.get('scientific_core')},
                "engineering_edge": {"text": data.get('engineering_edge', ''), "audio": narration_urls.get('engineering_edge')},
                "cross_pollination": {"text": data.get('cross_pollination', ''), "audio": narration_urls.get('cross_pollination')}
            },
            "grounding": {"physics": physics_fact, "papers": papers},
            "createdAt": datetime.utcnow().isoformat()
        }
        db_ref.child(f'epiphanies/{epiphany_id}').set(epiphany_record)
        # NOTE(review): read-modify-write of 'credits' is not transactional —
        # concurrent requests from one user could double-spend; consider an
        # RTDB transaction if that matters.
        user_ref.update({'credits': user_data.get('credits', 0) - 1})
        logger.info(f"EP-GEN SUCCESS: {epiphany_id}")
        return jsonify(epiphany_record), 201
    except json.JSONDecodeError as je:
        logger.error(f"JSON Parsing Error: {je}. Raw output was: {synth_response.text}")
        return jsonify({'error': 'AI generated invalid JSON. Try again.'}), 500
    except Exception as e:
        logger.error(f"Generate Epiphany Global Error: {e}")
        logger.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
def deep_dive():
    """Analyze a close-up image detail with the Engineering Lead persona.

    Expects a Bearer token and a multipart 'image' file; deducts one spark
    and returns {'analysis': str}.
    """
    logger.info(">>> START deep-dive request")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    # FIX: validate input up front instead of letting request.files['image']
    # raise (which produced an HTML 400, inconsistent with this JSON API).
    if 'image' not in request.files:
        logger.error("No image file provided in request.")
        return jsonify({'error': 'Visual input is required.'}), 400
    image_file = request.files['image']
    try:
        pil_image = Image.open(io.BytesIO(image_file.read())).convert('RGB')
        dive_prompt = "Act as an Engineering Lead. In 50 words, explain the microscopic or mechanical significance of this specific detail."
        res = client.models.generate_content(model=ATHENA_MODEL, contents=[dive_prompt, pil_image])
        logger.info("Deep dive reasoning completed.")
        user_ref = db_ref.child(f'users/{uid}')
        # FIX: user_ref.get() can be None for a missing profile; the chained
        # .get() previously raised AttributeError and turned success into a 500.
        user_data = user_ref.get() or {}
        user_ref.update({'credits': max(0, user_data.get('credits', 0) - 1)})
        return jsonify({"analysis": res.text.strip()}), 200
    except Exception as e:
        logger.error(f"Deep Dive Error: {e}")
        return jsonify({'error': str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # 5. THE CHIRON MENTOR (INTERACTION & MEMORY) | |
| # ----------------------------------------------------------------------------- | |
def get_chiron_briefing():
    """Build a short memory brief for the Chiron mentor from the user's latest epiphany."""
    logger.info(">>> START get_chiron_briefing request")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    try:
        # Most recent epiphany owned by this user (empty dict when none exist).
        last_epiphany = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).limit_to_last(1).get() or {}
        context_data = ""
        if last_epiphany:
            eid = next(iter(last_epiphany))
            record = last_epiphany[eid]
            paper_titles = [p['title'] for p in record.get('grounding', {}).get('papers', [])]
            context_data = f"Current focus: {record['subject']}. Recent papers unlocked: {', '.join(paper_titles)}."
        logger.info(f"Generating briefing with context: {context_data}")
        brief_prompt = f"""
        Prep Chiron, the Socratic Mentor.
        User Context: {context_data}
        Write a 4-sentence brief for Chiron. He must reference the scientific papers or topics the user just explored.
        """
        response = client.models.generate_content(model=BRIEFING_MODEL, contents=[brief_prompt])
        return jsonify({"memory_summary": response.text.strip(), "grounding_context": context_data}), 200
    except Exception as e:
        logger.error(f"Call Briefing Error: {e}")
        return jsonify({'error': str(e)}), 500
def log_call_usage():
    """Deduct sparks for a completed mentor call and archive its transcript.

    Cost is 3 sparks per started minute; the balance is floored at zero.
    Returns {'success': True, 'remainingCredits': int}.
    """
    logger.info(">>> START log_call_usage request")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    # FIX: get_json() returns None for a missing/non-JSON body, and the
    # subsequent data.get crashed *outside* the try block (raw Flask 500).
    data = request.get_json(silent=True) or {}
    duration = data.get("durationSeconds", 0)
    transcript = data.get("transcript", "")
    cost = math.ceil(duration / 60) * 3  # 3 sparks per started minute
    logger.info(f"Call ended for {uid}. Duration: {duration}s, Cost: {cost} sparks.")
    try:
        user_ref = db_ref.child(f'users/{uid}')
        # FIX: guard against a missing profile (get() -> None -> AttributeError).
        user_data = user_ref.get() or {}
        new_bal = max(0, user_data.get('credits', 0) - cost)
        user_ref.update({'credits': new_bal})
        if transcript:
            db_ref.child(f'transcripts/{uid}').push({
                "text": transcript,
                "createdAt": datetime.utcnow().isoformat()
            })
            logger.info("Transcript saved to database.")
        return jsonify({"success": True, "remainingCredits": new_bal}), 200
    except Exception as e:
        logger.error(f"Usage Logging Error: {e}")
        return jsonify({'error': str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # 6. ADMIN, FEEDBACK & CREDITS | |
| # ----------------------------------------------------------------------------- | |
def submit_feedback():
    """Persist a feedback entry from the authenticated user (status starts 'open')."""
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    data = request.get_json()
    try:
        entry = {
            "userId": uid,
            "message": data.get('message'),
            "type": data.get('type', 'general'),
            "status": "open",
            "createdAt": datetime.utcnow().isoformat()
        }
        db_ref.child('feedback').push().set(entry)
        logger.info(f"Feedback received from user {uid}.")
        return jsonify({"success": True}), 201
    except Exception as e:
        logger.error(f"Feedback Submission Error: {e}")
        return jsonify({'error': str(e)}), 500
def request_credits():
    """Log a pending spark top-up request (default 50) for admin review."""
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Unauthorized'}), 401
    data = request.get_json()
    try:
        pending_request = {
            "userId": uid,
            "requested_amount": data.get('amount', 50),
            "status": "pending",
            "createdAt": datetime.utcnow().isoformat()
        }
        db_ref.child('credit_requests').push().set(pending_request)
        logger.info(f"Spark request logged for user {uid}.")
        return jsonify({"success": True}), 201
    except Exception as e:
        logger.error(f"Credit Request Error: {e}")
        return jsonify({'error': str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # 7. AUTHENTICATION & PROFILE | |
| # ----------------------------------------------------------------------------- | |
def signup():
    """Idempotently create the RTDB profile for a newly authenticated user.

    Existing users get their current record back (200); new users are seeded
    with 30 Sparks (201). The uid always comes from the verified token,
    never from the request body.
    """
    logger.info(">>> START signup sync")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Invalid or expired token'}), 401
    try:
        user_ref = db_ref.child(f'users/{uid}')
        user_data = user_ref.get()
        if user_data:
            logger.info(f"User {uid} already exists, skipping initialization.")
            return jsonify({'uid': uid, **user_data}), 200
        # FIX: get_json() returns None for a missing/non-JSON body; the old
        # data.get(...) then raised and surfaced as a cryptic 400.
        data = request.get_json(silent=True) or {}
        new_user_record = {
            'email': data.get('email'),
            'displayName': data.get('displayName', 'Seeker'),
            'credits': 30,
            'is_admin': False,
            'createdAt': datetime.utcnow().isoformat()
        }
        user_ref.set(new_user_record)
        logger.info(f"New seeker initialized: {uid} with 30 Sparks.")
        return jsonify({'success': True, 'uid': uid, **new_user_record}), 201
    except Exception as e:
        logger.error(f"Signup Sync Error: {e}")
        return jsonify({'error': str(e)}), 400
def social_signin():
    """Ensure a profile exists for a social-auth user and return it (seeds 30 Sparks)."""
    uid = verify_token(request.headers.get('Authorization'))
    if not uid: return jsonify({'error': 'Invalid token'}), 401
    user_ref = db_ref.child(f'users/{uid}')
    profile = user_ref.get()
    if not profile:
        logger.info(f"Initializing social user: {uid}")
        # Pull identity details from Firebase Auth rather than the request body.
        fb_user = auth.get_user(uid)
        profile = {
            'email': fb_user.email,
            'displayName': fb_user.display_name or 'Seeker',
            'credits': 30,
            'is_admin': False,
            'createdAt': datetime.utcnow().isoformat()
        }
        user_ref.set(profile)
    return jsonify({'uid': uid, **profile}), 200
def get_profile():
    """Return the authenticated user's raw profile record (JSON null if absent)."""
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    profile = db_ref.child(f'users/{uid}').get()
    return jsonify(profile)
def list_epiphanies():
    """List every epiphany record owned by the authenticated user."""
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    logger.info(f"Listing epiphanies for user {uid}.")
    matches = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).get() or {}
    return jsonify([record for record in matches.values()])
# -----------------------------------------------------------------------------
# 8. MAIN EXECUTION
# -----------------------------------------------------------------------------
# NOTE(review): no @app.route decorators are visible in this file — the view
# functions above are presumably registered elsewhere (or in a part of the
# file not shown); verify before deploying.
if __name__ == '__main__':
    # Standard HF Port 7860; the PORT env var overrides it.
    logger.info("Starting Sozo Athena Server on port 7860...")
    app.run(debug=False, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))