Spaces:
Running
Running
| import os | |
| import io | |
| import uuid | |
| import re | |
| import time | |
| import json | |
| import traceback | |
| import math | |
| import requests | |
| import threading | |
| import logging | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timedelta | |
| from flask import Flask, request, jsonify, Response | |
| from flask_cors import CORS | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| from PIL import Image | |
| from google import genai | |
| from google.genai import types | |
| # ----------------------------------------------------------------------------- | |
| # 0. LOGGING CONFIGURATION | |
| # ----------------------------------------------------------------------------- | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger("SOZO_ATHENA") | |
| # ----------------------------------------------------------------------------- | |
| # 1. CONFIGURATION & INITIALIZATION | |
| # ----------------------------------------------------------------------------- | |
| app = Flask(__name__) | |
| CORS(app) | |
| # --- Firebase Initialization --- | |
| try: | |
| logger.info("Initializing Firebase Admin SDK...") | |
| credentials_json_string = os.environ.get("FIREBASE") | |
| if not credentials_json_string: | |
| raise ValueError("The FIREBASE environment variable is not set.") | |
| credentials_json = json.loads(credentials_json_string) | |
| firebase_db_url = os.environ.get("Firebase_DB") | |
| firebase_storage_bucket = os.environ.get("Firebase_Storage") | |
| if not firebase_db_url or not firebase_storage_bucket: | |
| raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.") | |
| cred = credentials.Certificate(credentials_json) | |
| firebase_admin.initialize_app(cred, { | |
| 'databaseURL': firebase_db_url, | |
| 'storageBucket': firebase_storage_bucket | |
| }) | |
| logger.info("Firebase Admin SDK initialized successfully.") | |
| except Exception as e: | |
| logger.error(f"FATAL: Error initializing Firebase: {e}") | |
| exit(1) | |
| bucket = storage.bucket() | |
| db_ref = db.reference() | |
| # --- Google GenAI Client Initialization (Gemini 3.0 Ecosystem) --- | |
| try: | |
| logger.info("Initializing Google GenAI Client (Titaness Paradigm)...") | |
| api_key = os.environ.get("Gemini") | |
| if not api_key: | |
| raise ValueError("The 'Gemini' API key is not set.") | |
| client = genai.Client(api_key=api_key) | |
| logger.info("Google GenAI Client initialized successfully.") | |
| except Exception as e: | |
| logger.error(f"FATAL: Error initializing GenAI Client: {e}") | |
| exit(1) | |
| # Model Constants | |
| ATHENA_FLASH = "gemini-3-flash-preview" | |
| ATHENA_PRO = "gemini-3-pro-image-preview" | |
| # Grounding / External API Keys | |
| WOLFRAM_APP_ID = os.environ.get("WOLFRAM_APP_ID") | |
| OPENALEX_MAILTO = os.environ.get("OPENALEX_MAILTO", "rairo@sozofix.tech") | |
| # ----------------------------------------------------------------------------- | |
| # 2. HELPER FUNCTIONS & AUTHENTICATION | |
| # ----------------------------------------------------------------------------- | |
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS V10 CONSTANTS & LINTING | |
| # ----------------------------------------------------------------------------- | |
| FORBIDDEN_CODE_PATTERNS = [ | |
| r"\bfetch\s*\(", r"\bXMLHttpRequest\b", r"\bWebSocket\b", r"\blocalStorage\b", | |
| r"\bsessionStorage\b", r"\bdocument\.cookie\b", r"\bwindow\.location\b", | |
| r"\beval\s*\(", r"\bnew\s+Function\s*\(", r"\bimport\s*\(", r"\brequire\s*\(" | |
| ] | |
| REQUIRED_CODE_PATTERNS = [r"\bexport\s+default\s+Instrument\b"] | |
| ALLOWED_INTERACTIONS = {"drag", "dial", "toggle", "tap", "hold", "keys"} | |
| ALLOWED_WIN_TYPES = {"zone_dwell", "threshold", "sequence", "match"} | |
| def _strip_json_fences(s: str) -> str: | |
| s = (s or "").strip() | |
| if "```" in s: | |
| m = re.search(r"```(?:json)?\s*(.*?)\s*```", s, re.DOTALL) | |
| if m: return m.group(1).strip() | |
| return s | |
| def validate_manifest(m: dict) -> (bool, str): | |
| if not isinstance(m, dict): return False, "Manifest is not an object" | |
| for k in ("engine", "interaction", "win_rule"): | |
| if k not in m: return False, f"Manifest missing '{k}'" | |
| return True, "ok" | |
| def lint_component_code(code: str) -> (bool, list): | |
| errors = [] | |
| if not isinstance(code, str) or len(code.strip()) < 50: | |
| return False, ["component_code missing/too short"] | |
| for pat in FORBIDDEN_CODE_PATTERNS: | |
| if re.search(pat, code): errors.append(f"forbidden_pattern") | |
| for pat in REQUIRED_CODE_PATTERNS: | |
| if not re.search(pat, code): errors.append(f"missing_export_default_Instrument") | |
| if "onAction" not in code: errors.append("missing_call:onAction") | |
| if "onWin" not in code: errors.append("missing_call:onWin") | |
| return len(errors) == 0, errors | |
| def verify_token(auth_header): | |
| """Verifies Firebase ID token from the Authorization header.""" | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| logger.warning("Auth Header missing or malformed.") | |
| return None | |
| token = auth_header.split('Bearer ')[1] | |
| try: | |
| decoded_token = auth.verify_id_token(token) | |
| return decoded_token['uid'] | |
| except Exception as e: | |
| logger.error(f"Token verification failed: {e}") | |
| return None | |
| def verify_admin(auth_header): | |
| """Ensures the user has admin privileges in the Realtime Database.""" | |
| uid = verify_token(auth_header) | |
| if not uid: | |
| raise PermissionError('Invalid token') | |
| user_data = db_ref.child(f'users/{uid}').get() | |
| if not user_data or not user_data.get('is_admin', False): | |
| logger.warning(f"Unauthorized Admin access attempt by UID: {uid}") | |
| raise PermissionError('Admin access required') | |
| return uid | |
| def upload_to_storage(data_bytes, destination_blob_name, content_type): | |
| """Uploads bytes to Firebase Storage and returns the public URL.""" | |
| try: | |
| blob = bucket.blob(destination_blob_name) | |
| blob.upload_from_string(data_bytes, content_type=content_type) | |
| blob.make_public() | |
| return blob.public_url | |
| except Exception as e: | |
| logger.error(f"Firebase Storage Upload Error: {e}") | |
| return None | |
| def query_wolfram_alpha(query): | |
| """Fetches computational facts from Wolfram Alpha for grounding.""" | |
| if not WOLFRAM_APP_ID: | |
| return "Grounded in first principles." | |
| try: | |
| url = f"http://api.wolframalpha.com/v1/result?appid={WOLFRAM_APP_ID}&i={query}" | |
| response = requests.get(url, timeout=5) | |
| if response.status_code == 200: | |
| return response.text | |
| return "Constants verifying..." | |
| except Exception as e: | |
| logger.error(f"Wolfram Alpha Query Failed: {e}") | |
| return "Grounding in progress." | |
| # ----------------------------------------------------------------------------- | |
| # 3. TITANESS MEDIA ENGINE (CONSOLIDATED MASTER BLUEPRINT + ASYNC AUDIO) | |
| # ----------------------------------------------------------------------------- | |
| def generate_narration_task(text, uid, epiphany_id, layer_name): | |
| """Worker task for generating Deepgram audio in a thread.""" | |
| if not text or len(text) < 5: | |
| return layer_name, None | |
| try: | |
| logger.info(f"[THREAD] Starting Narration: {layer_name}") | |
| api_key = os.environ.get("DEEPGRAM_API_KEY") | |
| if not api_key: return layer_name, None | |
| DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-2-theia-en" | |
| headers = {"Authorization": f"Token {api_key}", "Content-Type": "text/plain"} | |
| response = requests.post(DEEPGRAM_URL, headers=headers, data=text.encode('utf-8')) | |
| response.raise_for_status() | |
| path = f"users/{uid}/epiphanies/{epiphany_id}/audio/{layer_name}.mp3" | |
| url = upload_to_storage(response.content, path, 'audio/mpeg') | |
| return layer_name, url | |
| except Exception as e: | |
| logger.error(f"Narration Task Error [{layer_name}]: {e}") | |
| return layer_name, None | |
| def generate_master_blueprint_task(subject, flattened_data, uid, epiphany_id): | |
| """Worker task for generating the 16:9 Consolidated Master Blueprint.""" | |
| try: | |
| logger.info(f"[THREAD] Starting Nano Banana Master Blueprint for: {subject}") | |
| # Build prompt context from synthesis | |
| summary = ( | |
| f"Genesis: {flattened_data.get('genesis', '')[:100]}... " | |
| f"Core: {flattened_data.get('scientific_core', '')[:100]}... " | |
| f"Edge: {flattened_data.get('engineering_edge', '')[:100]}... " | |
| f"Future: {flattened_data.get('cross_pollination', '')[:100]}..." | |
| ) | |
| prompt = ( | |
| f"Generate a single 4K Master Technical Blueprint for '{subject}'. " | |
| f"Layout: A detailed four-quadrant schematic diagram. " | |
| f"Quadrant 1: Genesis (Origins). Quadrant 2: Scientific Core (The Physics). " | |
| f"Quadrant 3: Engineering Edge (Design Limits). Quadrant 4: Cross-Pollination (Future Tech). " | |
| f"Aesthetic: White-line schematic style on a dark midnight navy background. " | |
| f"Style: Leonardo Da Vinci meets modern 4K engineering schematics. High accuracy. " | |
| f"Details to include: {summary}" | |
| ) | |
| response = client.models.generate_content( | |
| model=ATHENA_PRO, | |
| contents=prompt, | |
| config=types.GenerateContentConfig( | |
| image_config=types.ImageConfig(aspect_ratio="16:9", image_size="4K") | |
| ) | |
| ) | |
| image_parts = [part for part in response.parts if part.inline_data] | |
| if image_parts: | |
| image_bytes = image_parts[0].inline_data.data | |
| path = f"users/{uid}/epiphanies/{epiphany_id}/master_blueprint.png" | |
| return upload_to_storage(image_bytes, path, 'image/png') | |
| return None | |
| except Exception as e: | |
| logger.error(f"Master Blueprint Thread Error: {e}") | |
| return None | |
| #Prepare hi fidelity images to proper scale | |
| def prepare_vision_image(image_bytes): | |
| """ | |
| Resizes and optimizes the image to ensure it is under the 10MB Gemini limit. | |
| Returns a PIL Image object ready for the SDK. | |
| """ | |
| try: | |
| img = Image.open(io.BytesIO(image_bytes)) | |
| # 1. Convert to RGB to strip Alpha channels (saves 25% space) | |
| if img.mode != 'RGB': | |
| img = img.convert('RGB') | |
| # 2. If the file is huge, reduce the physical resolution. | |
| # 2000px is more than enough for Athena to see components. | |
| max_dimension = 2000 | |
| w, h = img.size | |
| if max(w, h) > max_dimension: | |
| scale = max_dimension / max(w, h) | |
| new_size = (int(w * scale), int(h * scale)) | |
| img = img.resize(new_size, Image.Resampling.LANCZOS) | |
| logger.info(f"Theia Vision: Resized from {w}x{h} to {new_size[0]}x{new_size[1]}") | |
| # 3. Final Quality Check: If the image is still dense, | |
| # we do an in-memory compression cycle to strip metadata. | |
| buffer = io.BytesIO() | |
| img.save(buffer, format="JPEG", quality=85, optimize=True) | |
| final_img = Image.open(buffer) | |
| final_size = len(buffer.getvalue()) | |
| logger.info(f"Theia Vision: Optimized payload size: {final_size / 1024 / 1024:.2f}MB") | |
| return final_img, final_size | |
| except Exception as e: | |
| logger.error(f"Theia Vision Scaler Failure: {e}") | |
| # Fallback to original | |
| return Image.open(io.BytesIO(image_bytes)) | |
| # ----------------------------------------------------------------------------- | |
| # 4. PRIMARY ENDPOINTS: GENERATE & THEIA SWEEP | |
| # ----------------------------------------------------------------------------- | |
| def image_proxy(): | |
| """Proxies images to bypass CORS/AI Studio Preview blocks.""" | |
| image_url = request.args.get('url') | |
| if not image_url: return jsonify({'error': 'URL missing'}), 400 | |
| try: | |
| resp = requests.get(image_url, timeout=10) | |
| return Response(resp.content, content_type=resp.headers.get('Content-Type', 'image/jpeg')) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def generate_epiphany(): | |
| """The Titaness Core: Synthesis + Feynman Scholar + Consolidated Blueprint.""" | |
| logger.info(">>> TITANESS GENERATION START") | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| user_ref = db_ref.child(f'users/{uid}') | |
| user_data = user_ref.get() or {} | |
| # Cost: 8 Sparks (Consolidated Synthesis + 4K Blueprint) | |
| if user_data.get('credits', 0) < 8: | |
| return jsonify({'error': 'Insufficient Sparks. (Full Synthesis requires 8 sparks)'}), 402 | |
| if 'image' not in request.files: | |
| return jsonify({'error': 'Visual image is required.'}), 400 | |
| image_file = request.files['image'] | |
| raw_bytes = image_file.read() | |
| # THE FIX: | |
| pil_image, scaled_size = prepare_vision_image(raw_bytes) | |
| logger.info(f"Titaness Input Size: {len(raw_bytes)/1024/1024:.2f}MB -> Scaled: {scaled_size/1024/1024:.2f}MB") | |
| try: | |
| # Step 1: Accurate Identification | |
| id_prompt = "Identify this precisely. Include specific model or species if visible. Reply with ONLY the name (max 5 words)." | |
| id_res = client.models.generate_content(model=ATHENA_FLASH, contents=[id_prompt, pil_image]) | |
| subject = id_res.text.strip() | |
| logger.info(f"Subject Identified: {subject}") | |
| # Step 2: Grounded Feynman Synthesis + Research | |
| physics_fact = query_wolfram_alpha(f"physics laws and constants of {subject}") | |
| synthesis_prompt = f""" | |
| Act as Athena. Analyze '{subject}' grounded in: {physics_fact}. | |
| Style: Richard Feynman. Simple analogies, profound engineering truths. | |
| Tasks: | |
| 1. Search the web (ArXiv, Patents, Journals) for 3 diverse sources about {subject} using Google Search. | |
| 2. Create 4 Discovery Layers (genesis, scientific_core, engineering_edge, cross_pollination). | |
| 3. For each research source, provide Title, URL, and a 2-sentence Feynman Summary. | |
| MANDATORY JSON SCHEMA (FLAT): | |
| {{ | |
| "title": "Epic Title", | |
| "genesis": "The origin/evolutionary logic.", | |
| "scientific_core": "The first principles of physics.", | |
| "engineering_edge": "Detailed stress/composition analysis.", | |
| "cross_pollination": "Interdisciplinary application.", | |
| "scholar": [ | |
| {{"title": "Title", "url": "URL", "feynman_summary": "Summary"}} | |
| ] | |
| }} | |
| """ | |
| synth_res = client.models.generate_content( | |
| model=ATHENA_FLASH, | |
| contents=[synthesis_prompt, pil_image], | |
| config=types.GenerateContentConfig( | |
| tools=[{"google_search": {}}], | |
| response_mime_type='application/json' | |
| ) | |
| ) | |
| # --- SCHEMA SENTINEL: Handle JSON Code Blocks & Lists --- | |
| raw_json = synth_res.text.strip() | |
| if "```json" in raw_json: | |
| raw_json = re.search(r'```json\n(.*?)\n```', raw_json, re.DOTALL).group(1) | |
| data = json.loads(raw_json) | |
| if isinstance(data, list): data = data[0] | |
| # Flatten any AI nesting (e.g. data['epiphany']['genesis']) | |
| if "epiphany" in data: data = data["epiphany"] | |
| elif "discovery_layers" in data: data = data["discovery_layers"] | |
| layers_list = ['genesis', 'scientific_core', 'engineering_edge', 'cross_pollination'] | |
| # Guard against missing keys | |
| for k in layers_list: | |
| if k not in data: data[k] = f"Analyzing first principles of {k.replace('_', ' ')}..." | |
| epiphany_id = str(uuid.uuid4()) | |
| # Step 3: PARALLEL TITANESS MEDIA ENGINE | |
| logger.info(f"Step 3: Launching media threads for {epiphany_id}") | |
| audios = {} | |
| master_blueprint_url = None | |
| with ThreadPoolExecutor(max_workers=5) as executor: | |
| # 4 Audio Threads + 1 Master Blueprint Thread | |
| aud_futures = [executor.submit(generate_narration_task, data.get(l), uid, epiphany_id, l) for l in layers_list] | |
| blu_future = executor.submit(generate_master_blueprint_task, subject, data, uid, epiphany_id) | |
| for f in aud_futures: | |
| k, v = f.result() | |
| audios[k] = v | |
| master_blueprint_url = blu_future.result() | |
| # Step 4: Record Persistence | |
| image_url = upload_to_storage(raw_bytes, f"users/{uid}/epiphanies/{epiphany_id}/vision.jpg", 'image/jpeg') | |
| epiphany_record = { | |
| "epiphanyId": epiphany_id, | |
| "uid": uid, | |
| "title": data.get('title', f"Insight: {subject}"), | |
| "subject": subject, | |
| "imageURL": image_url, | |
| "masterBlueprint": master_blueprint_url, | |
| "layers": { | |
| l: {"text": data.get(l, ""), "audio": audios.get(l)} for l in layers_list | |
| }, | |
| "scholar": data.get('scholar', []), | |
| "createdAt": datetime.utcnow().isoformat() | |
| } | |
| db_ref.child(f'epiphanies/{epiphany_id}').set(epiphany_record) | |
| user_ref.update({'credits': user_data.get('credits', 0) - 8}) | |
| logger.info(f"TITANESS SUCCESS: {epiphany_id}") | |
| return jsonify(epiphany_record), 201 | |
| except Exception as e: | |
| logger.error(f"Global Epiphany Gen Error: {e}\n{traceback.format_exc()}") | |
| return jsonify({'error': str(e)}), 500 | |
| def theia_sweep(): | |
| """Standalone Theia Mode: Bounding Box Annotations with Automatic Image Scaling.""" | |
| logger.info(">>> THEIA SWEEP INITIATED") | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| epiphany_id = request.form.get('epiphanyId') | |
| if not epiphany_id: return jsonify({'error': 'epiphanyId is required.'}), 400 | |
| # 1. Cache Check | |
| epiphany_ref = db_ref.child(f'epiphanies/{epiphany_id}') | |
| existing_data = epiphany_ref.get() or {} | |
| if 'annotations' in existing_data: | |
| logger.info(f"Theia: Returning cached annotations for {epiphany_id}") | |
| return jsonify({"annotations": existing_data['annotations'], "status": "cached"}), 200 | |
| # 2. Credit Check | |
| user_ref = db_ref.child(f'users/{uid}') | |
| user_data = user_ref.get() or {} | |
| if user_data.get('credits', 0) < 4: | |
| return jsonify({'error': 'Need 4 Sparks for a Theia Sweep.'}), 402 | |
| # 3. Image Handling with Scaler | |
| if 'image' not in request.files: | |
| return jsonify({'error': 'image file is required.'}), 400 | |
| image_file = request.files['image'] | |
| raw_bytes = image_file.read() | |
| # --- THE CRITICAL FIX --- | |
| # We process the raw bytes into a scaled/optimized PIL Image | |
| pil_image, scaled_size = prepare_vision_image(raw_bytes) | |
| logger.info(f"Theia Post-Scale Size: {scaled_size / (1024 * 1024):.2f} MB") | |
| subject = existing_data.get('subject', 'Complex System') | |
| sweep_prompt = f""" | |
| Theia Mode Activation: {subject}. | |
| Use Python Code Execution for spatial deconstruction. | |
| Identify every functional component. For each return: | |
| 1. label: name of the part. | |
| 2. coordinates: [ymin, xmin, ymax, xmax] (normalized 0-1000). | |
| 3. micro_epiphany: 20-word Feynman secret. | |
| Return JSON list ONLY. | |
| """ | |
| try: | |
| # Pass the optimized PIL image to the SDK | |
| res = client.models.generate_content( | |
| model=ATHENA_FLASH, | |
| contents=[sweep_prompt, pil_image], | |
| config=types.GenerateContentConfig( | |
| tools=[types.Tool(code_execution=types.ToolCodeExecution)], | |
| response_mime_type='application/json' | |
| ) | |
| ) | |
| raw_json = res.text.strip() | |
| if "```json" in raw_json: | |
| raw_json = re.search(r'```json\n(.*?)\n```', raw_json, re.DOTALL).group(1) | |
| elif "```" in raw_json: | |
| raw_json = raw_json.replace("```", "").strip() | |
| annotations = json.loads(raw_json) | |
| # 4. Persistence & Deduction | |
| epiphany_ref.update({"annotations": annotations}) | |
| user_ref.update({'credits': user_data.get('credits', 0) - 4}) | |
| logger.info(f"THEIA SUCCESS: {len(annotations)} annotations found.") | |
| return jsonify({"annotations": annotations}), 200 | |
| except Exception as e: | |
| logger.error(f"Theia Sweep Execution Error: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def deep_dive(): | |
| """Manual Zoom analysis (1 Spark).""" | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| if 'image' not in request.files: return jsonify({'error': 'No image'}), 400 | |
| try: | |
| pil_image = Image.open(io.BytesIO(request.files['image'].read())).convert('RGB') | |
| res = client.models.generate_content( | |
| model=ATHENA_FLASH, | |
| contents=["In 50 words Feynman style, reveal the hidden significance of this zoomed detail.", pil_image] | |
| ) | |
| # Deduct 1 spark | |
| user_ref = db_ref.child(f'users/{uid}') | |
| new_val = max(0, (user_ref.get().get('credits', 0) or 0) - 1) | |
| user_ref.update({'credits': new_val}) | |
| return jsonify({"analysis": res.text.strip()}), 200 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| # ----------------------------------------------------------------------------- | |
| # ODYSSEUS ENGINE V10.1: HARDENED GAME CONSOLE (SYNTAX GUARD + VICTORY HOOKS) | |
| # DIFFICULTY CALIBRATION UPDATE: Human-feasible challenges with generous tolerances | |
| # ----------------------------------------------------------------------------- | |
| # Enhanced Forbidden Patterns to prevent "Unexpected Token" errors in Babel | |
| FORBIDDEN_CODE_PATTERNS = [ | |
| r"<!--[\s\S]*?-->", # No HTML comments (Babel killer) | |
| r"\bonclick\b", # Must be onClick | |
| r"\bonmousedown\b", # Must be onMouseDown | |
| r"\bclass=", # Must be className | |
| r"\bfetch\s*\(", # Security | |
| r"\bXMLHttpRequest\b", | |
| r"\bWebSocket\b", | |
| r"\beval\s*\(", | |
| r"\bnew\s+Function\s*\(", | |
| ] | |
| REQUIRED_CODE_PATTERNS = [ | |
| r"\bexport\s+default\s+Instrument\b", | |
| r"props\.onAction", # Must implement handshake | |
| r"props\.onWin" # Must implement victory | |
| ] | |
| def lint_component_code(code: str) -> (bool, list): | |
| """Surgical syntax check for codegen reliability.""" | |
| errors = [] | |
| if not isinstance(code, str) or len(code.strip()) < 50: | |
| return False, ["component_code missing or too short"] | |
| for pat in FORBIDDEN_CODE_PATTERNS: | |
| if re.search(pat, code, re.IGNORECASE): | |
| errors.append(f"Syntax Violation: Found illegal pattern '{pat}'") | |
| for pat in REQUIRED_CODE_PATTERNS: | |
| if not re.search(pat, code): | |
| errors.append(f"Protocol Violation: Missing required pattern '{pat}'") | |
| # Ensure no raw HTML-style comments are present | |
| if "<!--" in code or "-->" in code: | |
| errors.append("Syntax Error: Use {/* */} for comments in JSX, not <!-- -->") | |
| return len(errors) == 0, errors | |
| def generate_trial(): | |
| """ | |
| Odysseus v10.1: The Hardened Instrumentalist. | |
| Architects bespoke, interactive React games with a 3-pass syntax repair loop. | |
| Cost: 1 Spark. | |
| DIFFICULTY TUNING: Now calibrated for human-achievable gameplay with forgiving mechanics. | |
| """ | |
| logger.info(">>> ODYSSEUS V10.1: FORGING HARDENED INSTRUMENT") | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({"error": "Unauthorized"}), 401 | |
| payload = request.get_json(silent=True) or {} | |
| ep_id, layer_key, subject = payload.get("epiphanyId"), payload.get("layerKey"), payload.get("subject") | |
| if not all([ep_id, layer_key, subject]): | |
| return jsonify({"error": "Missing context parameters."}), 400 | |
| # 1. Cache Check | |
| trial_path = f"epiphanies/{ep_id}/trials/{layer_key}" | |
| existing = db_ref.child(trial_path).get() | |
| if existing: return jsonify(existing), 200 | |
| # 2. Concurrency Lock | |
| lock_ref = db_ref.child(f"epiphanies/{ep_id}/trialLocks/{layer_key}") | |
| if lock_ref.get(): return jsonify({"status": "locked"}), 409 | |
| lock_ref.set({"uid": uid, "at": datetime.utcnow().isoformat()}) | |
| # 3. Load Scientific Context | |
| layer_obj = db_ref.child(f"epiphanies/{ep_id}/layers/{layer_key}").get() or {} | |
| ctx_text = layer_obj.get("text", "General scientific principles.") | |
| attempts, forged, last_raw, last_errs = 0, None, "", [] | |
| base_prompt = f""" | |
| You are Athena's Master Game Architect. Forge an interactive scientific 'Instrument' for {subject}. | |
| Context: {ctx_text} | |
| TASK: Write a React component named 'Instrument'. | |
| This is a GAME that requires user interaction to achieve a scientific victory. | |
| TECHNICAL CONSTRAINTS (STRICT): | |
| 1. REACT ONLY: Use React hooks (useState, useEffect, useRef). No external libs. Use SVG/Canvas. | |
| 2. HANDSHAKE: Component receives `{{ onAction, onWin }}` as props. | |
| - Call `props.onAction()` the moment the user interacts. | |
| - Call `props.onWin()` when the specific scientific goal is reached. | |
| 3. WIN CONDITION: You MUST define a logic state (e.g., matching a frequency) that triggers `onWin()`. | |
| 4. NO COMMENTS: Do not use HTML comments <!-- -->. Use {{/* */}} if needed. | |
| 5. CAMELCASE: Use className, onClick, onMouseDown, etc. | |
| 6. ENTROPY: The system must fail/decay if the user stops interacting. | |
| DIFFICULTY CALIBRATION (CRITICAL - HUMAN-FEASIBLE DESIGN): | |
| 7. GENEROUS TOLERANCES: Use ±10-20% margins for success conditions. "Close enough" = victory. | |
| - Example: Target frequency 440Hz? Accept 400-480Hz range. | |
| - Example: Alignment needed? Accept ±15px margin, not pixel-perfect. | |
| 8. HUMAN TIMING: Design for 300-800ms reaction windows. No frame-perfect precision required. | |
| 9. CLEAR FEEDBACK: Show visual progress indicators (color gradients, meters, proximity bars). | |
| - Player should always know: "Am I getting closer?" and "How close am I to winning?" | |
| 10. ACHIEVABLE IN 20-60 SECONDS: Average player with focus should win in under a minute. | |
| 11. FORGIVING MISTAKES: Don't reset to zero on errors. Allow recovery and course correction. | |
| 12. EXPLICIT SUCCESS METRICS: controls_guide must state exact winning thresholds. | |
| GOOD DIFFICULTY EXAMPLES: | |
| - "Stabilize the pendulum angle between -10° and +10° for 3 seconds" | |
| - "Match the wavelength within ±15nm (target: 550nm)" | |
| - "Keep reaction rate in green zone (60-80%) for 5 seconds" | |
| - "Align particles until overlap meter shows >75%" | |
| BAD DIFFICULTY EXAMPLES (AVOID): | |
| - "Match exactly 440.00000Hz" (too precise) | |
| - "Click within 50ms of the flash" (inhuman timing) | |
| - "Perfect circular motion required" (impossible with mouse/touch) | |
| JSON OUTPUT SCHEMA: | |
| {{ | |
| "engine": "odysseus_v10.1", | |
| "instrument_name": "Title", | |
| "mission_brief": "Goal with quantified success criteria.", | |
| "controls_guide": "Explicit manual: 'Drag slider to keep value between X and Y', 'Maintain green zone for Z seconds to win'.", | |
| "component_code": "const Instrument = (props) => {{ ... }}; export default Instrument;", | |
| "feynman_truth": "Scientific revelation." | |
| }} | |
| """ | |
| try: | |
| while attempts < 3 and not forged: | |
| attempts += 1 | |
| current_prompt = base_prompt if attempts == 1 else f"REPAIR LOGIC ERROR: {last_errs}\nPREVIOUS CODE: {last_raw}\n\nREMINDER: Make the game HUMANLY ACHIEVABLE with generous tolerances (±15% margins) and clear visual feedback showing progress." | |
| res = client.models.generate_content( | |
| model=ATHENA_FLASH, | |
| contents=current_prompt, | |
| config=types.GenerateContentConfig(response_mime_type="application/json") | |
| ) | |
| last_raw = _strip_json_fences(res.text) | |
| candidate = json.loads(last_raw) | |
| # Validation Step | |
| code = candidate.get("component_code", "") | |
| ok_c, last_errs = lint_component_code(code) | |
| if ok_c: | |
| forged = candidate | |
| break | |
| else: | |
| logger.warning(f"Attempt {attempts} failed syntax check: {last_errs}") | |
| if not forged: | |
| lock_ref.delete() | |
| return jsonify({"status": "failed", "errors": last_errs}), 422 | |
| # 4. Atomic Credit Check & Deduction (1 Spark) | |
| user_ref = db_ref.child(f'users/{uid}') | |
| def credits_txn(cur): | |
| if (cur or 0) < 1: return cur | |
| return cur - 1 | |
| updated_credits = user_ref.child('credits').transaction(credits_txn) | |
| if updated_credits is None: # Transaction aborted/failed sparks | |
| lock_ref.delete() | |
| return jsonify({"error": "Insufficient Sparks"}), 402 | |
| # 5. Persist & Return | |
| forged["createdAt"] = datetime.utcnow().isoformat() | |
| db_ref.child(trial_path).set(forged) | |
| lock_ref.delete() | |
| logger.info(f"Odysseus v10.1 success for {subject} in {attempts} attempt(s).") | |
| return jsonify(forged), 201 | |
| except Exception as e: | |
| lock_ref.delete() | |
| logger.error(f"v10.1 Global Failure: {e}") | |
| return jsonify({"error": "The laws of the universe failed to stabilize. Retry."}), 500 | |
| # ----------------------------------------------------------------------------- | |
| # 5. THE CHIRON MENTOR & SYSTEM UTILS | |
| # ----------------------------------------------------------------------------- | |
| def get_chiron_briefing(): | |
| """Preps ElevenLabs Chiron with scientific context from the last Epiphany.""" | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| try: | |
| # Fetch last epiphany for context | |
| last_eps = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).limit_to_last(1).get() or {} | |
| context_str = "Beginning an academic journey." | |
| if last_eps: | |
| eid = list(last_eps.keys())[0] | |
| e = last_eps[eid] | |
| scholar_sample = e.get('scholar', [])[:1] | |
| context_str = f"Focus: {e['subject']}. Papers Analyzed: {scholar_sample[0]['title'] if scholar_sample else 'General'}" | |
| brief_prompt = f"Prep Chiron (Socratic Mentor). Context: {context_str}. Write a 4-sentence briefing for a voice call." | |
| res = client.models.generate_content(model=ATHENA_FLASH, contents=[brief_prompt]) | |
| return jsonify({"memory_summary": res.text.strip()}), 200 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def log_call_usage(): | |
| """Handles Spark deduction (3 per min) for ElevenLabs sessions.""" | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| data = request.get_json() | |
| duration = data.get("durationSeconds", 0) | |
| cost = math.ceil(duration / 60) * 3 | |
| try: | |
| user_ref = db_ref.child(f'users/{uid}') | |
| current = user_ref.get().get('credits', 0) | |
| new_bal = max(0, current - cost) | |
| user_ref.update({'credits': new_bal}) | |
| if data.get("transcript"): | |
| db_ref.child(f'transcripts/{uid}').push({ | |
| "text": data["transcript"], "createdAt": datetime.utcnow().isoformat() | |
| }) | |
| return jsonify({"success": True, "remainingCredits": new_bal}), 200 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| # ----------------------------------------------------------------------------- | |
| # 6. ADMIN & FEEDBACK | |
| # ----------------------------------------------------------------------------- | |
| def admin_dashboard(): | |
| try: | |
| verify_admin(request.headers.get('Authorization')) | |
| users = db_ref.child('users').get() or {} | |
| epiphanies = db_ref.child('epiphanies').get() or {} | |
| credit_reqs = db_ref.child('credit_requests').get() or {} | |
| return jsonify({ | |
| "total_users": len(users), | |
| "total_epiphanies": len(epiphanies), | |
| "pending_requests": len([r for r in credit_reqs.values() if r.get('status') == 'pending']) | |
| }), 200 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 403 | |
| def submit_feedback(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| data = request.json | |
| db_ref.child('feedback').push({ | |
| "uid": uid, "message": data.get('message'), "at": datetime.utcnow().isoformat() | |
| }) | |
| return jsonify({"success": True}), 201 | |
| def request_sparks(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| db_ref.child('credit_requests').push({ | |
| "uid": uid, "status": "pending", "at": datetime.utcnow().isoformat() | |
| }) | |
| return jsonify({"success": True}), 201 | |
| # ----------------------------------------------------------------------------- | |
| # 7. AUTHENTICATION & USER PROFILE | |
| # ----------------------------------------------------------------------------- | |
| def signup(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| user_ref = db_ref.child(f'users/{uid}') | |
| if not user_ref.get(): | |
| data = request.json | |
| user_ref.set({ | |
| 'email': data.get('email'), 'displayName': data.get('displayName', 'Seeker'), | |
| 'credits': 30, 'is_admin': False, 'createdAt': datetime.utcnow().isoformat() | |
| }) | |
| return jsonify({'uid': uid, **user_ref.get()}), 201 | |
| def social_signin(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| user_ref = db_ref.child(f'users/{uid}') | |
| u = user_ref.get() | |
| if not u: | |
| fu = auth.get_user(uid) | |
| u = { | |
| 'email': fu.email, 'displayName': fu.display_name or 'Seeker', | |
| 'credits': 30, 'is_admin': False, 'createdAt': datetime.utcnow().isoformat() | |
| } | |
| user_ref.set(u) | |
| return jsonify({'uid': uid, **u}), 200 | |
| def get_profile(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| return jsonify(db_ref.child(f'users/{uid}').get()) | |
| def list_epiphanies(): | |
| uid = verify_token(request.headers.get('Authorization')) | |
| if not uid: return jsonify({'error': 'Unauthorized'}), 401 | |
| res = db_ref.child('epiphanies').order_by_child('uid').equal_to(uid).get() or {} | |
| return jsonify(list(res.values())) | |
| # ----------------------------------------------------------------------------- | |
| # 8. MAIN EXECUTION | |
| # ----------------------------------------------------------------------------- | |
| if __name__ == '__main__': | |
| logger.info("Titaness Definitive Backbone Active on port 7860...") | |
| app.run(debug=False, host="0.0.0.0", port=int(os.environ.get("PORT", 7860))) |