Spaces:
Running
Running
| import os | |
| import io | |
| import uuid | |
| import re | |
| import time | |
| import json | |
| import traceback | |
| import math | |
| import requests | |
| import threading | |
| import logging | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timedelta | |
| from flask import Flask, request, jsonify, Response | |
| from flask_cors import CORS | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| from PIL import Image | |
| from google import genai | |
| from google.genai import types | |
| # ----------------------------------------------------------------------------- | |
| # 0. LOGGING CONFIGURATION | |
| # ----------------------------------------------------------------------------- | |
# Root logging setup: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger used by the functions in this module.
logger = logging.getLogger("SOZO_ATHENA")
| # ----------------------------------------------------------------------------- | |
| # 1. CONFIGURATION & INITIALIZATION | |
| # ----------------------------------------------------------------------------- | |
# The Flask application object; CORS is enabled on it with flask_cors defaults.
app = Flask(import_name=__name__)
CORS(app=app)
| # --- Firebase Initialization --- | |
# Initialise the Firebase Admin SDK from environment configuration:
#   FIREBASE          - service-account key as a JSON string
#   Firebase_DB       - Realtime Database URL
#   Firebase_Storage  - Cloud Storage bucket name
# Any failure here is fatal: the process exits with status 1.
try:
    logger.info("Initializing Firebase Admin SDK...")
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("The FIREBASE environment variable is not set.")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    firebase_storage_bucket = os.environ.get("Firebase_Storage")
    if not firebase_db_url or not firebase_storage_bucket:
        raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {
        'databaseURL': firebase_db_url,
        'storageBucket': firebase_storage_bucket,
    })
    logger.info("Firebase Admin SDK initialized successfully.")
except Exception as e:
    logger.error(f"FATAL: Error initializing Firebase: {e}")
    # `exit()` is an interactive helper injected by the `site` module and is
    # not guaranteed to exist; raising SystemExit is the portable equivalent.
    raise SystemExit(1)
# Module-wide handles used by the storage and database helpers below.
bucket = storage.bucket()
db_ref = db.reference()
| # --- Google GenAI Client Initialization (Gemini 3.0 Ecosystem) --- | |
# Build the shared Google GenAI client from the 'Gemini' environment variable.
# A missing key or a constructor failure is fatal: the process exits with
# status 1.
try:
    logger.info("Initializing Google GenAI Client (Titaness Paradigm)...")
    api_key = os.environ.get("Gemini")
    if not api_key:
        raise ValueError("The 'Gemini' API key is not set.")
    client = genai.Client(api_key=api_key)
    logger.info("Google GenAI Client initialized successfully.")
except Exception as e:
    logger.error(f"FATAL: Error initializing GenAI Client: {e}")
    # `exit()` comes from the `site` module and may be absent; SystemExit is
    # always available and has the same effect.
    raise SystemExit(1)
| # Model Constants | |
# Model identifiers passed to client.models.generate_content().
ATHENA_FLASH = "gemini-3-flash-preview"
ATHENA_PRO = "gemini-3-pro-image-preview"
# Grounding / External API Keys (read once at import time).
WOLFRAM_APP_ID = os.getenv("WOLFRAM_APP_ID")
OPENALEX_MAILTO = os.getenv("OPENALEX_MAILTO", "rairo@sozofix.tech")
| # ----------------------------------------------------------------------------- | |
| # 2. HELPER FUNCTIONS & AUTHENTICATION | |
| # ----------------------------------------------------------------------------- | |
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS V10 CONSTANTS & LINTING | |
| # ----------------------------------------------------------------------------- | |
# Regexes that must NOT match generated component source: network calls,
# browser storage/cookies, navigation, and dynamic evaluation/loading.
FORBIDDEN_CODE_PATTERNS = [
    r"\bfetch\s*\(",
    r"\bXMLHttpRequest\b",
    r"\bWebSocket\b",
    r"\blocalStorage\b",
    r"\bsessionStorage\b",
    r"\bdocument\.cookie\b",
    r"\bwindow\.location\b",
    r"\beval\s*\(",
    r"\bnew\s+Function\s*\(",
    r"\bimport\s*\(",
    r"\brequire\s*\(",
]
# Regexes that MUST match generated component source.
REQUIRED_CODE_PATTERNS = [
    r"\bexport\s+default\s+Instrument\b",
]
# Vocabulary sets for manifests.
ALLOWED_INTERACTIONS = {
    "drag",
    "dial",
    "toggle",
    "tap",
    "hold",
    "keys",
}
ALLOWED_WIN_TYPES = {
    "zone_dwell",
    "threshold",
    "sequence",
    "match",
}
def _strip_json_fences(s: str) -> str:
    """Return *s* with a surrounding Markdown code fence removed.

    ``None`` and empty input yield ``""``. When the text contains a complete
    fenced block (optionally tagged ``json`` in any letter case) the content
    of the first such block is returned. When the text merely starts with an
    opening fence that is never closed, that opening marker is dropped.
    Anything else is returned stripped of surrounding whitespace.
    """
    s = (s or "").strip()
    if "```" not in s:
        return s
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", s, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()
    # Opening fence without a closing one: keep the payload, drop the marker.
    return re.sub(r"^```(?:json)?\s*", "", s, flags=re.IGNORECASE).strip()
def validate_manifest(m: dict) -> tuple[bool, str]:
    """Check that an instrument manifest has the required top-level keys.

    Returns ``(True, "ok")`` when *m* is a dict containing ``engine``,
    ``interaction`` and ``win_rule``; otherwise ``(False, reason)`` naming the
    first problem found. The values stored under those keys are not inspected.
    """
    if not isinstance(m, dict):
        return False, "Manifest is not an object"
    for key in ("engine", "interaction", "win_rule"):
        if key not in m:
            return False, f"Manifest missing '{key}'"
    return True, "ok"
def lint_component_code(code: str) -> tuple[bool, list]:
    """Statically vet generated component source.

    Returns ``(ok, errors)``. *code* must be a string of at least 50
    non-blank characters, match none of ``FORBIDDEN_CODE_PATTERNS``, match
    every entry of ``REQUIRED_CODE_PATTERNS`` and mention both ``onAction``
    and ``onWin``. Each violation adds one entry to ``errors``; forbidden
    matches name the offending pattern so a caller can act on the report.
    """
    if not isinstance(code, str) or len(code.strip()) < 50:
        return False, ["component_code missing/too short"]
    errors = []
    for pat in FORBIDDEN_CODE_PATTERNS:
        if re.search(pat, code):
            # Name the pattern: a bare "forbidden_pattern" gives no hint
            # about what has to be removed.
            errors.append(f"forbidden_pattern:{pat}")
    for pat in REQUIRED_CODE_PATTERNS:
        if not re.search(pat, code):
            errors.append("missing_export_default_Instrument")
    if "onAction" not in code:
        errors.append("missing_call:onAction")
    if "onWin" not in code:
        errors.append("missing_call:onWin")
    return not errors, errors
def verify_token(auth_header):
    """Verify a Firebase ID token taken from an ``Authorization`` header.

    Args:
        auth_header: Raw header value, expected as ``"Bearer <token>"``.

    Returns:
        The ``uid`` from the decoded token, or ``None`` when the header is
        missing or malformed, the token is empty, or verification fails.
    """
    prefix = 'Bearer '
    if not auth_header or not auth_header.startswith(prefix):
        logger.warning("Auth Header missing or malformed.")
        return None
    # Slice the prefix off instead of splitting on it, so a token that itself
    # contains the text "Bearer " is not truncated.
    token = auth_header[len(prefix):].strip()
    if not token:
        logger.warning("Auth Header missing or malformed.")
        return None
    try:
        decoded_token = auth.verify_id_token(token)
        return decoded_token['uid']
    except Exception as e:
        logger.error(f"Token verification failed: {e}")
        return None
def verify_admin(auth_header):
    """Return the caller's UID when their user record is flagged as admin.

    Raises:
        PermissionError: ``'Invalid token'`` when the token cannot be
            verified; ``'Admin access required'`` when ``users/<uid>`` is
            missing or its ``is_admin`` field is falsy.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError('Invalid token')
    profile = db_ref.child(f'users/{uid}').get()
    has_admin_flag = bool(profile) and profile.get('is_admin', False)
    if has_admin_flag:
        return uid
    logger.warning(f"Unauthorized Admin access attempt by UID: {uid}")
    raise PermissionError('Admin access required')
def upload_to_storage(data_bytes, destination_blob_name, content_type):
    """Store bytes in the module's storage bucket and make them public.

    Returns the blob's public URL, or ``None`` if any step raises (the error
    is logged).
    """
    try:
        target = bucket.blob(destination_blob_name)
        target.upload_from_string(data_bytes, content_type=content_type)
        target.make_public()
        public_link = target.public_url
    except Exception as e:
        logger.error(f"Firebase Storage Upload Error: {e}")
        return None
    return public_link
def query_wolfram_alpha(query):
    """Fetch a short computational answer from Wolfram Alpha for grounding.

    Args:
        query: Free-text question.

    Returns:
        The response body on HTTP 200. Otherwise one of three fixed
        placeholder strings: no app id configured, non-200 status, or a
        request failure (which is logged).
    """
    if not WOLFRAM_APP_ID:
        return "Grounded in first principles."
    try:
        # Let requests build the query string: the query is free text and may
        # contain spaces, '&', '#', '+' etc. that would corrupt a hand-built
        # URL. HTTPS keeps the app id out of plaintext traffic.
        response = requests.get(
            "https://api.wolframalpha.com/v1/result",
            params={"appid": WOLFRAM_APP_ID, "i": query},
            timeout=5,
        )
        if response.status_code == 200:
            return response.text
        return "Constants verifying..."
    except Exception as e:
        logger.error(f"Wolfram Alpha Query Failed: {e}")
        return "Grounding in progress."
| # ----------------------------------------------------------------------------- | |
| # 3. TITANESS MEDIA ENGINE (CONSOLIDATED MASTER BLUEPRINT + ASYNC AUDIO) | |
| # ----------------------------------------------------------------------------- | |
def generate_narration_task(text, uid, epiphany_id, layer_name):
    """Worker task: synthesise narration audio for one layer via Deepgram.

    Args:
        text: Narration text. Non-strings and strings shorter than five
            characters are skipped.
        uid: Owner id, used in the storage path.
        epiphany_id: Epiphany id, used in the storage path.
        layer_name: Layer key; also the audio file's base name.

    Returns:
        ``(layer_name, url)`` where ``url`` is the uploaded MP3's public URL,
        or ``None`` when narration was skipped, ``DEEPGRAM_API_KEY`` is unset,
        or any step failed (failures are logged, never raised).
    """
    # The text comes from model-generated JSON, so it is not guaranteed to be
    # a string; calling len() on e.g. a number would raise outside the try.
    if not isinstance(text, str) or len(text) < 5:
        return layer_name, None
    try:
        logger.info(f"[THREAD] Starting Narration: {layer_name}")
        api_key = os.environ.get("DEEPGRAM_API_KEY")
        if not api_key:
            return layer_name, None
        DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-luna-en"
        headers = {"Authorization": f"Token {api_key}", "Content-Type": "text/plain"}
        # Bound the call: without a timeout a stalled connection would block
        # this worker, and the request waiting on it, indefinitely.
        response = requests.post(
            DEEPGRAM_URL, headers=headers, data=text.encode('utf-8'), timeout=60
        )
        response.raise_for_status()
        path = f"users/{uid}/epiphanies/{epiphany_id}/audio/{layer_name}.mp3"
        url = upload_to_storage(response.content, path, 'audio/mpeg')
        return layer_name, url
    except Exception as e:
        logger.error(f"Narration Task Error [{layer_name}]: {e}")
        return layer_name, None
def generate_master_blueprint_task(subject, flattened_data, uid, epiphany_id):
    """Worker task: render and upload the 16:9 master blueprint image.

    Builds an image prompt from the first 100 characters of each of the four
    layer texts in *flattened_data*, asks the ATHENA_PRO model for a 4K 16:9
    image, and uploads the first inline image part as ``master_blueprint.png``.

    Returns the uploaded image's public URL, or ``None`` when the response
    carries no image part or any step raises (the error is logged).
    """
    try:
        logger.info(f"[THREAD] Starting Nano Banana Master Blueprint for: {subject}")
        # Short excerpts of each layer give the image model concrete details.
        genesis_excerpt = flattened_data.get('genesis', '')[:100]
        core_excerpt = flattened_data.get('scientific_core', '')[:100]
        edge_excerpt = flattened_data.get('engineering_edge', '')[:100]
        future_excerpt = flattened_data.get('cross_pollination', '')[:100]
        summary = (
            f"Genesis: {genesis_excerpt}... "
            f"Core: {core_excerpt}... "
            f"Edge: {edge_excerpt}... "
            f"Future: {future_excerpt}..."
        )
        prompt = (
            f"Generate a single 4K Master Technical Blueprint for '{subject}'. "
            f"Layout: A detailed four-quadrant schematic diagram. "
            f"Quadrant 1: Genesis (Origins). Quadrant 2: Scientific Core (The Physics). "
            f"Quadrant 3: Engineering Edge (Design Limits). Quadrant 4: Cross-Pollination (Future Tech). "
            f"Aesthetic: White-line schematic style on a dark midnight navy background. "
            f"Style: Leonardo Da Vinci meets modern 4K engineering schematics. High accuracy. "
            f"Details to include: {summary}"
        )
        image_settings = types.ImageConfig(aspect_ratio="16:9", image_size="4K")
        response = client.models.generate_content(
            model=ATHENA_PRO,
            contents=prompt,
            config=types.GenerateContentConfig(image_config=image_settings),
        )
        # Upload the first part that carries inline image data, if any.
        for part in response.parts:
            if part.inline_data:
                path = f"users/{uid}/epiphanies/{epiphany_id}/master_blueprint.png"
                return upload_to_storage(part.inline_data.data, path, 'image/png')
        return None
    except Exception as e:
        logger.error(f"Master Blueprint Thread Error: {e}")
        return None
| # ----------------------------------------------------------------------------- | |
| # 4. PRIMARY ENDPOINTS: GENERATE & THEIA SWEEP | |
| # ----------------------------------------------------------------------------- | |
def _is_public_http_url(url):
    """Return True when *url* is http(s) and its host resolves only to public IPs."""
    import ipaddress
    import socket
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
        host = parts.hostname
    except (ValueError, AttributeError, TypeError):
        return False
    if parts.scheme not in ("http", "https") or not host:
        return False
    try:
        resolved = socket.getaddrinfo(host, None)
    except (socket.gaierror, UnicodeError):
        return False
    for entry in resolved:
        # sockaddr[0] is the textual address; strip any IPv6 zone id.
        address = ipaddress.ip_address(entry[4][0].split('%')[0])
        if not address.is_global:
            return False
    return bool(resolved)


def image_proxy():
    """Proxies images to bypass CORS/AI Studio Preview blocks.

    Reads the ``url`` query parameter, fetches it server-side and relays the
    body with the upstream ``Content-Type`` (default ``image/jpeg``).

    Responses: 400 when ``url`` is missing or is not an http(s) URL on a
    public address; 500 with the error text when the fetch raises.

    NOTE(review): no route decorator is visible in this view; how this
    handler is registered with ``app`` should be confirmed.
    """
    image_url = request.args.get('url')
    if not image_url:
        return jsonify({'error': 'URL missing'}), 400
    # The URL is caller-controlled and no token is checked here, so refuse
    # anything that would make this server fetch loopback, private-network or
    # link-local (cloud metadata) addresses, or non-HTTP schemes.
    # NOTE(review): redirects are still followed and DNS can change between
    # this check and the fetch; pin the resolved address if stricter
    # guarantees are needed.
    if not _is_public_http_url(image_url):
        return jsonify({'error': 'URL not allowed'}), 400
    try:
        resp = requests.get(image_url, timeout=10)
        return Response(resp.content, content_type=resp.headers.get('Content-Type', 'image/jpeg'))
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def generate_epiphany():
    """The Titaness Core: Synthesis + Feynman Scholar + Consolidated Blueprint.

    Flow: authenticate; require at least 8 credits and an uploaded ``image``
    file; identify the subject with ATHENA_FLASH; build a grounded synthesis
    (four layers plus ``scholar`` sources); generate four narrations and one
    master blueprint in parallel; store the record under
    ``epiphanies/<id>``; deduct 8 credits.

    Responses: 201 with the stored record; 401 unauthenticated; 402 too few
    credits; 400 missing or unreadable image; 500 with the error text on any
    failure during generation.

    NOTE(review): no route decorator is visible in this view; how this
    handler is registered with ``app`` should be confirmed.
    """
    logger.info(">>> TITANESS GENERATION START")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    user_ref = db_ref.child(f'users/{uid}')
    user_data = user_ref.get() or {}
    # Cost: 8 Sparks (Consolidated Synthesis + 4K Blueprint)
    if (user_data.get('credits', 0) or 0) < 8:
        return jsonify({'error': 'Insufficient Sparks. (Full Synthesis requires 8 sparks)'}), 402
    if 'image' not in request.files:
        return jsonify({'error': 'Visual image is required.'}), 400
    image_bytes = request.files['image'].read()
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
    except Exception as e:
        # An undecodable upload is a client error, not a server crash.
        logger.warning(f"Rejected unreadable image upload: {e}")
        return jsonify({'error': 'Uploaded file is not a readable image.'}), 400
    try:
        # Step 1: Accurate Identification
        id_prompt = "Identify this precisely. Include specific model or species if visible. Reply with ONLY the name (max 5 words)."
        id_res = client.models.generate_content(model=ATHENA_FLASH, contents=[id_prompt, pil_image])
        subject = id_res.text.strip()
        logger.info(f"Subject Identified: {subject}")
        # Step 2: Grounded Feynman Synthesis + Research
        physics_fact = query_wolfram_alpha(f"physics laws and constants of {subject}")
        synthesis_prompt = f"""
Act as Athena. Analyze '{subject}' grounded in: {physics_fact}.
Style: Richard Feynman. Simple analogies, profound engineering truths.
Tasks:
1. Search the web (ArXiv, Patents, Journals) for 3 diverse sources about {subject} using Google Search.
2. Create 4 Discovery Layers (genesis, scientific_core, engineering_edge, cross_pollination).
3. For each research source, provide Title, URL, and a 2-sentence Feynman Summary.
MANDATORY JSON SCHEMA (FLAT):
{{
"title": "Epic Title",
"genesis": "The origin/evolutionary logic.",
"scientific_core": "The first principles of physics.",
"engineering_edge": "Detailed stress/composition analysis.",
"cross_pollination": "Interdisciplinary application.",
"scholar": [
{{"title": "Title", "url": "URL", "feynman_summary": "Summary"}}
]
}}
"""
        synth_res = client.models.generate_content(
            model=ATHENA_FLASH,
            contents=[synthesis_prompt, pil_image],
            config=types.GenerateContentConfig(
                tools=[{"google_search": {}}],
                response_mime_type='application/json'
            )
        )
        # --- SCHEMA SENTINEL: Handle JSON Code Blocks & Lists ---
        # Use the shared fence stripper: the previous inline regex required an
        # exact "```json\n...\n```" layout and crashed on any other fence.
        data = json.loads(_strip_json_fences(synth_res.text))
        if isinstance(data, list):
            data = data[0] if data else {}
        if not isinstance(data, dict):
            raise ValueError("Synthesis response was not a JSON object")
        # Flatten any AI nesting (e.g. data['epiphany']['genesis'])
        if isinstance(data.get("epiphany"), dict):
            data = data["epiphany"]
        elif isinstance(data.get("discovery_layers"), dict):
            data = data["discovery_layers"]
        layers_list = ['genesis', 'scientific_core', 'engineering_edge', 'cross_pollination']
        # Guard against missing keys
        for layer in layers_list:
            if layer not in data:
                data[layer] = f"Analyzing first principles of {layer.replace('_', ' ')}..."
        epiphany_id = str(uuid.uuid4())
        # Step 3: PARALLEL TITANESS MEDIA ENGINE
        logger.info(f"Step 3: Launching media threads for {epiphany_id}")
        audios = {}
        master_blueprint_url = None
        with ThreadPoolExecutor(max_workers=5) as executor:
            # 4 Audio Threads + 1 Master Blueprint Thread
            aud_futures = [
                executor.submit(generate_narration_task, data.get(layer), uid, epiphany_id, layer)
                for layer in layers_list
            ]
            blu_future = executor.submit(generate_master_blueprint_task, subject, data, uid, epiphany_id)
            for future in aud_futures:
                layer_key, audio_url = future.result()
                audios[layer_key] = audio_url
            master_blueprint_url = blu_future.result()
        # Step 4: Record Persistence
        image_url = upload_to_storage(image_bytes, f"users/{uid}/epiphanies/{epiphany_id}/vision.jpg", 'image/jpeg')
        epiphany_record = {
            "epiphanyId": epiphany_id,
            "uid": uid,
            "title": data.get('title', f"Insight: {subject}"),
            "subject": subject,
            "imageURL": image_url,
            "masterBlueprint": master_blueprint_url,
            "layers": {
                layer: {"text": data.get(layer, ""), "audio": audios.get(layer)} for layer in layers_list
            },
            "scholar": data.get('scholar', []),
            "createdAt": datetime.utcnow().isoformat()
        }
        db_ref.child(f'epiphanies/{epiphany_id}').set(epiphany_record)

        # Deduct inside a transaction: the balance read at the top of this
        # request is stale by now, and writing "stale - 8" would silently
        # undo any spending that happened in the meantime.
        def _spend_sparks(current):
            return max(0, (current or 0) - 8)

        user_ref.child('credits').transaction(_spend_sparks)
        logger.info(f"TITANESS SUCCESS: {epiphany_id}")
        return jsonify(epiphany_record), 201
    except Exception as e:
        logger.error(f"Global Epiphany Gen Error: {e}\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
def theia_sweep():
    """Standalone Theia Mode: Bounding Box Annotations with DB Persistence.

    Requires form field ``epiphanyId`` and an uploaded ``image`` file.
    Annotations already stored on the epiphany are returned without charging.
    Otherwise the caller needs at least 4 credits; the model's JSON list is
    stored under ``annotations`` and 4 credits are deducted.

    Responses: 200 with ``annotations``; 401 unauthenticated; 400 missing
    ``epiphanyId`` or image; 402 too few credits; 500 with the error text on
    failure.

    NOTE(review): the epiphany is neither required to exist nor checked for
    ownership by the caller; confirm whether that is intended.
    """
    logger.info(">>> THEIA SWEEP INITIATED")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    epiphany_id = request.form.get('epiphanyId')
    if not epiphany_id:
        return jsonify({'error': 'epiphanyId is required.'}), 400
    # 1. Check if annotations already exist to prevent double-charging
    epiphany_ref = db_ref.child(f'epiphanies/{epiphany_id}')
    existing_data = epiphany_ref.get() or {}
    if 'annotations' in existing_data:
        return jsonify({"annotations": existing_data['annotations'], "status": "already_stored"}), 200
    # 2. Check Sparks
    user_ref = db_ref.child(f'users/{uid}')
    user_data = user_ref.get() or {}
    if (user_data.get('credits', 0) or 0) < 4:
        return jsonify({'error': 'Need 4 Sparks for a Theia Sweep.'}), 402
    # Answer a missing upload explicitly, matching the other image endpoints.
    if 'image' not in request.files:
        return jsonify({'error': 'Visual image is required.'}), 400
    image_file = request.files['image']
    subject = existing_data.get('subject', 'Complex System')
    sweep_prompt = f"""
Theia Mode Activation: {subject}.
Use Python Code Execution for spatial deconstruction.
Identify every functional component. For each return:
1. label: name of the part.
2. coordinates: [ymin, xmin, ymax, xmax] (normalized 0-1000).
3. micro_epiphany: 20-word Feynman secret.
Return JSON list ONLY.
"""
    try:
        pil_image = Image.open(io.BytesIO(image_file.read())).convert('RGB')
        res = client.models.generate_content(
            model=ATHENA_FLASH,
            contents=[sweep_prompt, pil_image],
            config=types.GenerateContentConfig(
                tools=[types.Tool(code_execution=types.ToolCodeExecution)],
                response_mime_type='application/json'
            )
        )
        # The shared fence stripper tolerates fence layouts that the previous
        # exact-match regex crashed on.
        annotations = json.loads(_strip_json_fences(res.text))
        # 3. STORE IN DB & CHARGE
        epiphany_ref.update({"annotations": annotations})

        # Deduct atomically; the balance read above may be stale by now.
        def _spend_sparks(current):
            return max(0, (current or 0) - 4)

        user_ref.child('credits').transaction(_spend_sparks)
        return jsonify({"annotations": annotations}), 200
    except Exception as e:
        logger.error(f"Theia Sweep Error: {e}")
        return jsonify({'error': str(e)}), 500
def deep_dive():
    """Manual Zoom analysis (1 Spark).

    Requires an uploaded ``image`` file and at least 1 credit. Asks
    ATHENA_FLASH for a 50-word analysis of the image and deducts 1 credit.

    Responses: 200 with ``analysis``; 401 unauthenticated; 400 no image;
    402 no credits; 500 with the error text on failure.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    if 'image' not in request.files:
        return jsonify({'error': 'No image'}), 400
    user_ref = db_ref.child(f'users/{uid}')
    # Check the balance before the paid model call. Previously a caller with
    # zero credits still got the analysis, and a missing user record crashed
    # only after the model call had already run.
    user_data = user_ref.get() or {}
    if (user_data.get('credits', 0) or 0) < 1:
        return jsonify({'error': 'Insufficient Sparks.'}), 402
    try:
        pil_image = Image.open(io.BytesIO(request.files['image'].read())).convert('RGB')
        res = client.models.generate_content(
            model=ATHENA_FLASH,
            contents=["In 50 words Feynman style, reveal the hidden significance of this zoomed detail.", pil_image]
        )

        # Deduct 1 spark atomically, never going below zero.
        def _spend_spark(current):
            return max(0, (current or 0) - 1)

        user_ref.child('credits').transaction(_spend_spark)
        return jsonify({"analysis": res.text.strip()}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS ENGINE V6: THE AUTONOMOUS ARCHITECT (NO TEMPLATES) | |
| # ----------------------------------------------------------------------------- | |
def validate_odysseus_v6(t: dict) -> tuple[bool, str]:
    """Ensures the AI output has the basic components to build a world.

    Returns ``(True, "ok")`` when *t* has truthy ``entities`` and
    ``victory_logic`` entries, ``(False, "Incomplete World Definition")``
    when either is missing or empty, and ``(False, "Blueprint Error")`` when
    *t* does not support ``.get`` lookups (e.g. ``None`` or a list).
    """
    try:
        if not t.get("entities") or not t.get("victory_logic"):
            return False, "Incomplete World Definition"
        return True, "ok"
    except (AttributeError, TypeError):
        # Catch only "not a mapping" failures; a bare except would also
        # swallow KeyboardInterrupt and SystemExit.
        return False, "Blueprint Error"
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS ENGINE V9: THE SOCRATIC SIMULATOR (ESSENTIAL VERB ENGINE) | |
| # ----------------------------------------------------------------------------- | |
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS ENGINE V10: THE SOCRATIC INSTRUMENT (DYNAMIC CODE GEN) | |
| # ----------------------------------------------------------------------------- | |
def generate_trial():
    """
    Odysseus v10: The Self-Repairing Instrumentalist.
    Generates bespoke Vanilla React components with a 3-pass repair loop.
    Cost: 1 Spark.

    Request JSON: ``epiphanyId``, ``layerKey``, ``subject`` (all required).

    Responses: 200 with a previously stored trial; 201 with a newly forged
    one; 400 missing parameters; 401 unauthenticated; 402 no credits;
    409 another generation holds the lock; 422 no valid candidate after
    three attempts; 500 on unexpected failure.

    NOTE(review): no route decorator is visible in this view; how this
    handler is registered with ``app`` should be confirmed.
    """
    logger.info(">>> ODYSSEUS V10: GENERATING TRIAL")
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    payload = request.get_json(silent=True) or {}
    ep_id, layer_key, subject = payload.get("epiphanyId"), payload.get("layerKey"), payload.get("subject")
    if not all([ep_id, layer_key, subject]):
        return jsonify({"error": "Missing context parameters."}), 400
    # 1. Cache Check
    trial_path = f"epiphanies/{ep_id}/trials/{layer_key}"
    existing = db_ref.child(trial_path).get()
    if existing:
        return jsonify(existing), 200
    # Refuse early when the caller cannot pay, before any model call is made.
    user_ref = db_ref.child(f'users/{uid}')
    if (user_ref.child('credits').get() or 0) < 1:
        return jsonify({"error": "Insufficient Sparks"}), 402
    # 2. Concurrency Lock
    # NOTE(review): the get-then-set below is not atomic, and a lock left by a
    # crashed worker never expires; consider a transaction plus a timestamp
    # check if contention matters.
    lock_ref = db_ref.child(f"epiphanies/{ep_id}/trialLocks/{layer_key}")
    if lock_ref.get():
        return jsonify({"status": "locked"}), 409
    lock_ref.set({"uid": uid, "at": datetime.utcnow().isoformat()})
    try:
        # 3. Context Retrieval
        layer_obj = db_ref.child(f"epiphanies/{ep_id}/layers/{layer_key}").get() or {}
        ctx_text = layer_obj.get("text", "General scientific principles.")
        # 4. Forge & Repair Loop
        attempts, forged, last_raw, last_errs = 0, None, "", []
        base_prompt = f"""
You are Athena's Master Instrumentalist. Forge a tactile scientific Instrument for {subject}.
Layer Context: {ctx_text}
TASK: Write a React component named 'Instrument'.
REQUIREMENTS:
1. Vanilla React ONLY. No external libraries (No Matter.js). Use SVG/Canvas for visuals.
2. Tactile: The system must fail/decay if user does nothing.
3. Handshake: Must accept and call `props.onAction()` (on touch) and `props.onWin()` (on success).
4. Design: Midnight Navy #0B1120, Gold #D4AF37.
JSON SCHEMA:
{{
"engine": "odysseus_v10",
"instrument_name": "Title",
"mission_brief": "Goal",
"instrument_manifest": {{ "engine": "odysseus_v10", "interaction": "drag|tap|hold", "win_rule": {{ "type": "zone_dwell" }} }},
"component_code": "const Instrument = (props) => {{ ... }}; export default Instrument;",
"feynman_truth": "The law proven."
}}
"""
        while attempts < 3 and forged is None:
            attempts += 1
            current_prompt = base_prompt if attempts == 1 else f"REPAIR THIS CODE: {last_raw}\nERRORS: {last_errs}"
            res = client.models.generate_content(
                model=ATHENA_FLASH,
                contents=current_prompt,
                config=types.GenerateContentConfig(response_mime_type="application/json")
            )
            last_raw = _strip_json_fences(res.text)
            # A malformed reply is exactly what the repair pass is for, so
            # record the problem and retry instead of aborting the loop.
            try:
                candidate = json.loads(last_raw)
            except json.JSONDecodeError as parse_err:
                last_errs = [f"invalid_json:{parse_err}"]
                continue
            if not isinstance(candidate, dict):
                last_errs = ["response_not_object"]
                continue
            # Validation logic
            ok_m, manifest_msg = validate_manifest(candidate.get("instrument_manifest"))
            ok_c, last_errs = lint_component_code(candidate.get("component_code"))
            if not ok_m:
                # Feed the manifest problem into the next repair prompt too.
                last_errs = list(last_errs) + [f"manifest:{manifest_msg}"]
            if ok_m and ok_c:
                forged = candidate
        if forged is None:
            return jsonify({"status": "failed", "reason": "Compilation failure"}), 422
        # 5. Atomic Credit Deduction (1 Spark)
        # The transaction never produces a negative balance, so testing the
        # result for "< 0" cannot detect a refusal; record the outcome of the
        # last invocation of the update function instead.
        charge = {"ok": False}

        def credits_txn(cur):
            cur = cur or 0
            charge["ok"] = cur >= 1
            return cur - 1 if cur >= 1 else cur

        user_ref.child('credits').transaction(credits_txn)
        if not charge["ok"]:
            return jsonify({"error": "Insufficient Sparks"}), 402
        # 6. Finalize
        forged["createdAt"] = datetime.utcnow().isoformat()
        db_ref.child(trial_path).set(forged)
        logger.info(f"Odysseus v10 forged for {subject} in {attempts} attempts.")
        return jsonify(forged), 201
    except Exception as e:
        logger.error(f"Forging Global Error: {e}")
        return jsonify({"error": "The laws of physics failed to compile."}), 500
    finally:
        # Release the lock on every exit path taken after acquiring it.
        lock_ref.delete()
| # ----------------------------------------------------------------------------- | |
| # 5. THE CHIRON MENTOR & SYSTEM UTILS | |
| # ----------------------------------------------------------------------------- | |
def get_chiron_briefing():
    """Preps ElevenLabs Chiron with scientific context from the last Epiphany.

    Looks up one of the caller's epiphanies (query ordered by ``uid``,
    limited to the last match), builds a one-line context from its subject
    and first scholar title, and asks ATHENA_FLASH for a 4-sentence briefing.

    Responses: 200 with ``memory_summary``; 401 unauthenticated; 500 with the
    error text on failure.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    try:
        # Fetch last epiphany for context
        last_eps = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).limit_to_last(1).get() or {}
        context_str = "Beginning an academic journey."
        if last_eps:
            latest = next(iter(last_eps.values())) or {}
            # Tolerate records whose scholar entry is missing, empty or not
            # list-shaped, and entries that lack a title; previously any of
            # these turned the briefing into a 500.
            scholar = latest.get('scholar') or []
            first_paper = scholar[0] if isinstance(scholar, list) and scholar else None
            paper_title = first_paper.get('title') if isinstance(first_paper, dict) else None
            context_str = (
                f"Focus: {latest.get('subject', 'Unknown subject')}. "
                f"Papers Analyzed: {paper_title or 'General'}"
            )
        brief_prompt = f"Prep Chiron (Socratic Mentor). Context: {context_str}. Write a 4-sentence briefing for a voice call."
        res = client.models.generate_content(model=ATHENA_FLASH, contents=[brief_prompt])
        return jsonify({"memory_summary": res.text.strip()}), 200
    except Exception as e:
        logger.error(f"Chiron Briefing Error: {e}")
        return jsonify({'error': str(e)}), 500
def log_call_usage():
    """Handles Spark deduction (3 per min) for ElevenLabs sessions.

    Request JSON: ``durationSeconds`` (non-negative number, default 0) and an
    optional ``transcript``. The cost is 3 credits per started minute; the
    balance is floored at zero. A supplied transcript is appended under
    ``transcripts/<uid>``.

    Responses: 200 with ``remainingCredits``; 400 invalid duration;
    401 unauthenticated; 500 with the error text on failure.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # The duration is client-supplied. A negative value would yield a negative
    # cost and therefore *add* credits, so reject anything that is not a
    # finite, non-negative number.
    try:
        duration = float(data.get("durationSeconds", 0) or 0)
    except (TypeError, ValueError):
        return jsonify({'error': 'durationSeconds must be a non-negative number.'}), 400
    if not math.isfinite(duration) or duration < 0:
        return jsonify({'error': 'durationSeconds must be a non-negative number.'}), 400
    cost = math.ceil(duration / 60) * 3
    try:
        user_ref = db_ref.child(f'users/{uid}')

        # Deduct atomically so concurrent spending is not overwritten.
        def _spend_sparks(current):
            return max(0, (current or 0) - cost)

        new_bal = user_ref.child('credits').transaction(_spend_sparks)
        if data.get("transcript"):
            db_ref.child(f'transcripts/{uid}').push({
                "text": data["transcript"], "createdAt": datetime.utcnow().isoformat()
            })
        return jsonify({"success": True, "remainingCredits": new_bal}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # 6. ADMIN & FEEDBACK | |
| # ----------------------------------------------------------------------------- | |
def admin_dashboard():
    """Return aggregate counts for administrators.

    Responses: 200 with ``total_users``, ``total_epiphanies`` and
    ``pending_requests``; 403 when ``verify_admin`` rejects the caller;
    500 with the error text for any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization'))
        users = db_ref.child('users').get() or {}
        epiphanies = db_ref.child('epiphanies').get() or {}
        credit_reqs = db_ref.child('credit_requests').get() or {}
        pending = sum(
            1
            for req in credit_reqs.values()
            if isinstance(req, dict) and req.get('status') == 'pending'
        )
        return jsonify({
            "total_users": len(users),
            "total_epiphanies": len(epiphanies),
            "pending_requests": pending
        }), 200
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        # A database or server fault is not an authorisation failure, so it
        # must not be reported as 403.
        logger.error(f"Admin Dashboard Error: {e}")
        return jsonify({'error': str(e)}), 500
def submit_feedback():
    """Append a feedback message from the authenticated caller.

    Request JSON: ``message`` (required).

    Responses: 201 on success; 400 when the body is not a JSON object or
    ``message`` is empty; 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    # silent=True returns None instead of raising on a missing or invalid
    # body; previously a JSON `null` body crashed on `.get`.
    data = request.get_json(silent=True)
    message = data.get('message') if isinstance(data, dict) else None
    if not message:
        return jsonify({'error': 'message is required.'}), 400
    db_ref.child('feedback').push({
        "uid": uid, "message": message, "at": datetime.utcnow().isoformat()
    })
    return jsonify({"success": True}), 201
def request_sparks():
    """File a pending credit request for the authenticated caller.

    Responses: 201 on success; 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if uid:
        ticket = {
            "uid": uid,
            "status": "pending",
            "at": datetime.utcnow().isoformat(),
        }
        db_ref.child('credit_requests').push(ticket)
        return jsonify({"success": True}), 201
    return jsonify({'error': 'Unauthorized'}), 401
| # ----------------------------------------------------------------------------- | |
| # 7. AUTHENTICATION & USER PROFILE | |
| # ----------------------------------------------------------------------------- | |
def signup():
    """Create the caller's user record on first sign-up.

    When ``users/<uid>`` does not exist yet, a record is created from the
    request JSON (``email``, ``displayName`` defaulting to ``'Seeker'``) with
    30 credits and ``is_admin`` false. An existing record is left untouched.

    Responses: 201 with the user record plus ``uid``; 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    user_ref = db_ref.child(f'users/{uid}')
    profile = user_ref.get()
    if not profile:
        # Tolerate a missing or non-object body instead of crashing on `.get`.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        profile = {
            'email': data.get('email'), 'displayName': data.get('displayName', 'Seeker'),
            'credits': 30, 'is_admin': False, 'createdAt': datetime.utcnow().isoformat()
        }
        user_ref.set(profile)
    # Reuse the record already in hand: a second read could return None and
    # break the ** expansion.
    return jsonify({'uid': uid, **profile}), 201
def social_signin():
    """Return the caller's user record, creating it on first social sign-in.

    A new record takes ``email`` and ``display_name`` (default ``'Seeker'``)
    from the Firebase Auth user, with 30 credits and ``is_admin`` false.

    Responses: 200 with the user record plus ``uid``; 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    user_ref = db_ref.child(f'users/{uid}')
    profile = user_ref.get()
    if profile:
        return jsonify({'uid': uid, **profile}), 200
    auth_user = auth.get_user(uid)
    profile = {
        'email': auth_user.email,
        'displayName': auth_user.display_name or 'Seeker',
        'credits': 30,
        'is_admin': False,
        'createdAt': datetime.utcnow().isoformat(),
    }
    user_ref.set(profile)
    return jsonify({'uid': uid, **profile}), 200
def get_profile():
    """Return the stored ``users/<uid>`` record for the authenticated caller.

    Responses: the record as JSON; 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if uid:
        profile = db_ref.child(f'users/{uid}').get()
        return jsonify(profile)
    return jsonify({'error': 'Unauthorized'}), 401
def list_epiphanies():
    """Return all epiphany records whose ``uid`` equals the caller's.

    Responses: a JSON list (empty when there are none); 401 unauthenticated.
    """
    uid = verify_token(request.headers.get('Authorization'))
    if not uid:
        return jsonify({'error': 'Unauthorized'}), 401
    owned_query = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid)
    matches = owned_query.get() or {}
    return jsonify([record for record in matches.values()])
| # ----------------------------------------------------------------------------- | |
| # 8. MAIN EXECUTION | |
| # ----------------------------------------------------------------------------- | |
if __name__ == '__main__':
    # Resolve the port first so the startup message reports the port actually
    # used; the previous message always said 7860 even when PORT overrode it.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"Titaness Definitive Backbone Active on port {port}...")
    app.run(debug=False, host="0.0.0.0", port=port)