Spaces:
Running
Running
"""
backend/app/main.py — HF Spaces-compatible version

Key differences from local dev:
1. PORT = 7860 (HF default) — configurable via $PORT env var
2. Supports a bundled same-origin frontend, plus external UI origins when configured
3. All data dir creation is in-memory safe (dirs reset on restart)
4. Daemon uses asyncio.create_task, not threading — safer in HF's container
5. Lifespan has singleton guard so --reload doesn't double-start services
6. /health supports HEAD (HF health checker uses HEAD)
"""
| from __future__ import annotations | |
| import asyncio | |
| from datetime import datetime | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import subprocess | |
| import time | |
| import uuid | |
| from contextlib import asynccontextmanager | |
| from typing import Optional, List, Dict, Any, Union | |
| from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Form | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
# Configure root logging at import time so every module logger shares one format.
# The separator between logger name and message is an em dash; the previous
# literal contained a mis-decoded character.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s — %(message)s",
)
logger = logging.getLogger(__name__)
# HTTP methods listed for the frontend proxy (the consumer is outside this section).
_FRONTEND_PROXY_METHODS = "GET HEAD POST PUT PATCH DELETE OPTIONS".split()

# Lower-case header names: the hop-by-hop set plus content-encoding and
# content-length. Presumably stripped when relaying proxied responses —
# confirm at the use site.
_HOP_BY_HOP_HEADERS = set(
    (
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
        "content-encoding",
        "content-length",
    )
)
# Model catalogue entries; every entry shares the same "object", "created"
# and "owned_by" values and differs only in id and description.
_JANUS_PROVIDER_MODELS = [
    {
        "id": model_id,
        "object": "model",
        "created": 1776500000,
        "owned_by": "janus",
        "description": description,
    }
    for model_id, description in (
        (
            "janus-chat",
            "General Janus cognitive chat model backed by routing, memory, simulation, and verification.",
        ),
        (
            "janus-reasoner",
            "Janus reasoning mode with stronger deliberation and simulation for uncertain tasks.",
        ),
        (
            "janus-markets",
            "Janus market-aware mode with seeded global market and company knowledge.",
        ),
        (
            "janus-embed",
            "Deterministic Janus embedding model for semantic lookup and retrieval workflows.",
        ),
    )
]
# ── Singleton guards ───────────────────────────────────────────────────────
# Set to True by lifespan() the first time it runs its once-only startup section.
_started = False
# Long-lived objects registered during startup; lifespan() iterates this dict at shutdown.
_services: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start Janus services, hand control to the server, then stop them.

    Each startup stage runs inside its own ``try``/``except`` so that a
    failing component is logged and the remaining stages still run.
    Database initialization and the daemon task are guarded by the
    module-level ``_started`` flag so they run at most once per process.

    Background tasks and services that need cleanup are stored in
    ``_services``; the loop after ``yield`` cancels, stops, or terminates
    each of them.

    Args:
        app: The FastAPI application. Service handles are attached to
            ``app.state`` for use by request handlers.
    """
    global _started

    # 1. Ensure runtime dirs exist (they will be empty after HF Space restart)
    _ensure_dirs()

    # 2. Log configuration warnings — never crash on missing optional keys
    _log_config_warnings()

    # 3. Response cache (always on)
    try:
        from app.services.response_cache import ResponseCache

        _services["cache"] = ResponseCache()
        app.state.cache = _services["cache"]
        logger.info("ResponseCache ready")
    except Exception as e:
        logger.warning("ResponseCache unavailable: %s", e)
        app.state.cache = None

    # 4. Compile LangGraph — degrade to 503 on /run rather than crash
    app.state.graph = None
    app.state.graph_error = "none"
    try:
        from app.graph import get_compiled_graph

        app.state.graph = get_compiled_graph()
        logger.info("LangGraph pipeline compiled OK")
    except Exception as e:
        import traceback

        app.state.graph_error = f"{e}\n{traceback.format_exc()}"
        logger.error("LangGraph build FAILED: %s — /run will 503", e)

    # 4b. Core cognition services used by the live request path
    try:
        from app.services.adaptive_intelligence import adaptive_intelligence
        from app.services.context_engine import context_engine
        from app.services.memory_manager import memory_manager
        from app.services.reflex_layer import reflex_layer
        from app.services.self_reflection import self_reflection

        app.state.adaptive = adaptive_intelligence
        app.state.context_engine = context_engine
        app.state.memory_manager = memory_manager
        app.state.reflex_layer = reflex_layer
        app.state.self_reflection = self_reflection
        logger.info("Core cognition services ready")
    except Exception as e:
        logger.error("Core cognition service init failed: %s", e)

    # 4c. Learning services for case-level experience accumulation
    app.state.learning_engine = None
    try:
        from app.config import get_config
        from app.routers import learning as learning_router_module

        learning_config = get_config()
        learning_router_module.init_learning_services(learning_config)
        app.state.learning_engine = learning_router_module.learning_engine
        if learning_config.learning_enabled:
            learning_router_module.start_scheduler_background()
        logger.info("Learning services ready")
    except Exception as e:
        logger.error("Learning services failed to initialize: %s", e)

    # 4d. Observation, curation, and classification services
    try:
        from app.services.curation import curator, hf_pusher
        from app.services.domain_classifier import domain_classifier
        from app.services.observation import get_tracer, scorer
        from app.services.query_classifier import QueryClassifier

        app.state.curator = curator
        app.state.domain_classifier = domain_classifier
        app.state.hf_pusher = hf_pusher
        app.state.query_classifier = QueryClassifier()
        app.state.trace_scorer = scorer
        app.state.tracer = get_tracer()
        logger.info("Observation and classification services ready")
    except Exception as e:
        logger.error("Observation/classification init failed: %s", e)

    # 4e. Kaggle Training and Persistence (HF Spaces)
    try:
        from app.services.model_training_scheduler import ModelTrainingScheduler
        from app.services.curation.persistence_manager import PersistenceManager
        from app.services.metrics_collector import MetricsCollector

        persistence = PersistenceManager()
        # Pull data on startup to survive restarts
        persistence.download_all()
        scheduler = ModelTrainingScheduler()
        app.state.training_scheduler = scheduler
        app.state.metrics_collector = MetricsCollector()
        # Start training schedule in background. The task is stored in
        # _services because the event loop keeps only a weak reference to
        # tasks: an unreferenced task may be garbage-collected mid-run.
        # Storing it also lets the shutdown loop cancel it.
        _services["training_schedule_task"] = asyncio.create_task(
            scheduler.start_schedule(interval_hours=168)
        )
        logger.info("Kaggle training and persistence services ready")
    except Exception as e:
        logger.error("Kaggle/Persistence init failed: %s", e)

    try:
        await _start_frontend_server()
    except Exception as e:
        logger.error("Bundled frontend failed to start: %s", e)

    # 5. DB & Daemon — exactly once even if uvicorn reloads
    if not _started:
        _started = True

        # Database Initialization (Tables + pgvector)
        try:
            from app.db.session import init_db
            from app.core.runtime_state import runtime_state

            try:
                await init_db()
                runtime_state.db_mode = "postgres"
                runtime_state.db_ready = True
                logger.info("Database initialized successfully (Tables + Extension).")
            except Exception as e:
                # Record the failure so the rest of the app can see it.
                runtime_state.db_mode = "degraded"
                runtime_state.db_ready = False
                runtime_state.reason = str(e)
                logger.error("Database initialization failed: %s", e)
        except Exception as e:
            logger.error("Platform-level initialization failed: %s", e)

        try:
            from app.services.daemon import JanusDaemon

            daemon = JanusDaemon()
            # Daemon is now async and handles internal loops with await asyncio.sleep
            daemon_task = asyncio.create_task(daemon.run())
            _services["daemon_task"] = daemon_task
            _services["daemon"] = daemon
            logger.info("Daemon async task started")
        except Exception as e:
            logger.error("Daemon failed to start: %s", e)

    logger.info("Janus ready on port %s", os.getenv("PORT", "7860"))

    yield  # ← server is live

    # Shutdown: iterate over a snapshot so awaiting inside the loop cannot
    # race with a concurrent mutation of _services.
    for name, svc in list(_services.items()):
        try:
            if asyncio.isfuture(svc):
                # Tasks and futures are cancelled.
                svc.cancel()
            elif asyncio.iscoroutine(svc):
                # A bare coroutine object has no cancel(); close() disposes of it.
                svc.close()
            elif hasattr(svc, "stop"):
                stop = svc.stop()
                if asyncio.iscoroutine(stop):
                    await stop
            elif hasattr(svc, "poll") and hasattr(svc, "terminate"):
                # Process-like handle: terminate, then kill if it does not exit in time.
                if svc.poll() is None:
                    svc.terminate()
                    try:
                        svc.wait(timeout=10)
                    except Exception:
                        svc.kill()
        except Exception as e:
            logger.error("Shutdown error for %s: %s", name, e)
def _ensure_dirs():
    """Make sure the runtime data directories exist.

    Delegates to ``app.config.ensure_data_dirs``. If that import or call
    raises, a warning is logged and a fixed fallback set of directories is
    created under ``<this file's directory>/data``.
    """
    try:
        from app.config import ensure_data_dirs

        ensure_data_dirs()
    except Exception as exc:
        logger.warning("ensure_data_dirs() failed, using minimal dir set: %s", exc)
    else:
        return

    from pathlib import Path

    data_root = Path(__file__).parent / "data"
    fallback_dirs = (
        "memory",
        "simulations",
        "logs",
        "knowledge",
        "skills",
        "prompt_versions",
        "learning",
        "adaptive",
        "cache",
        "sentinel",
        "sentinel/pending_patches",
    )
    for relative in fallback_dirs:
        data_root.joinpath(relative).mkdir(parents=True, exist_ok=True)
def _log_config_warnings():
    """Log warnings for missing configuration; never raises on a missing key.

    Checks, in order:
      * the API key for the provider named by ``PRIMARY_PROVIDER``
        (default ``huggingface``; unknown providers fall back to the
        Hugging Face key name),
      * ``TAVILY_API_KEY``,
      * at least one of the market-data keys,
      * ``HF_STORE_REPO`` when ``SPACE_ID`` is set.
    """
    # Normalise so values such as "OpenAI" or " groq " still hit the map.
    provider = (os.getenv("PRIMARY_PROVIDER") or "huggingface").strip().lower()
    key_map = {
        "huggingface": "HUGGINGFACE_API_KEY",
        "openrouter": "OPENROUTER_API_KEY",
        "openai": "OPENAI_API_KEY",
        "groq": "GROQ_API_KEY",
        "gemini": "GEMINI_API_KEY",
    }
    key_name = key_map.get(provider, "HUGGINGFACE_API_KEY")
    if not os.getenv(key_name):
        logger.warning(
            "⚠ %s is not set in Space Secrets — LLM calls will fail", key_name
        )

    if not os.getenv("TAVILY_API_KEY"):
        logger.warning("⚠ TAVILY_API_KEY not set — web search disabled")

    market_keys = ("ALPHAVANTAGE_API_KEY", "FINNHUB_API_KEY", "FMP_API_KEY")
    if not any(os.getenv(name) for name in market_keys):
        logger.warning(
            "⚠ No market data API key set — historical charts will use yfinance only"
        )

    if os.getenv("SPACE_ID") and not os.getenv("HF_STORE_REPO"):
        logger.warning(
            "⚠ Running on HF Space but HF_STORE_REPO not set. "
            "All memory/cases/skills will be LOST on every restart. "
            "Create a private dataset repo and add HF_STORE_REPO=username/janus-memory to Secrets."
        )
def _normalize_route(route: Optional[dict]) -> dict:
    """Return a copy of *route* with domain and execution-mode fields filled in.

    ``domain`` and ``domain_pack`` default to whichever of the two is set
    (``domain_pack`` wins), or ``"general"``. ``execution_mode`` is derived
    from the ``requires_simulation`` / ``requires_finance_data`` flags only
    when it is absent. The input mapping is not modified.
    """
    result = dict(route) if route else {}
    fallback_domain = result.get("domain_pack") or result.get("domain") or "general"
    for field in ("domain", "domain_pack"):
        if field not in result:
            result[field] = fallback_domain

    if "execution_mode" in result:
        return result

    if result.get("requires_simulation"):
        mode = "simulation"
    elif result.get("requires_finance_data"):
        mode = "finance"
    else:
        mode = "standard"
    result["execution_mode"] = mode
    return result
def _merge_context(base: dict, incoming: Optional[dict]) -> dict:
    """Recursively overlay *incoming* onto *base*.

    When *incoming* is empty or ``None`` the *base* object itself is
    returned. Otherwise a shallow copy of *base* is updated: keys whose
    values are dicts on both sides are merged recursively, every other
    key is overwritten by the incoming value.
    """
    if not incoming:
        return base

    combined = {**base}
    for name, new_value in incoming.items():
        current = combined.get(name)
        if isinstance(current, dict) and isinstance(new_value, dict):
            combined[name] = _merge_context(current, new_value)
        else:
            combined[name] = new_value
    return combined
def _time_of_day() -> str:
    """Name the part of day for the current local hour.

    05-11 is "morning", 12-16 "afternoon", 17-21 "evening", and every
    other hour "late night".
    """
    current_hour = datetime.now().hour
    windows = (
        (5, 12, "morning"),
        (12, 17, "afternoon"),
        (17, 22, "evening"),
    )
    for start, end, label in windows:
        if start <= current_hour < end:
            return label
    return "late night"
def _build_runtime_context(app: FastAPI, user_input: str, requested: Optional[dict]) -> dict:
    """Assemble the per-request context dict passed to the reflex layer and the graph.

    Starts from ``context_engine.build_context(user_input)`` and adds or
    replaces the ``system_self``, ``self_reflection``, ``user_persona``,
    ``memory``, ``adaptive_intelligence`` (when available), ``daemon`` and
    ``environment`` sections. The caller-supplied *requested* dict is merged
    last via ``_merge_context`` and therefore overrides computed values.

    Args:
        app: Application whose ``state.adaptive`` is consulted if present.
        user_input: The user's query text.
        requested: Optional context overrides from the request body.

    Returns:
        The merged context dict.
    """
    # Imported lazily inside the function rather than at module import time.
    from app.services.context_engine import context_engine
    from app.services.memory_manager import memory_manager
    from app.services.self_reflection import self_reflection
    from app.services.user_analyzer import user_analyzer
    context = context_engine.build_context(user_input)
    user_state = user_analyzer.analyze_query(user_input)
    # The daemon is only present if lifespan() managed to start it.
    daemon = _services.get("daemon")
    daemon_thoughts = list(getattr(daemon, "_pending_thoughts", [])[:3]) if daemon else []
    existing_thoughts = context.get("system_self", {}).get("pending_thoughts", [])
    # De-duplicate thoughts by their text; daemon thoughts come first and so win ties.
    # Assumes each thought is a dict with a "thought" key — TODO confirm.
    thought_map = {}
    for thought in [*daemon_thoughts, *existing_thoughts]:
        text = thought.get("thought", "")
        if text and text not in thought_map:
            thought_map[text] = thought
    recent_discoveries = []
    if daemon and hasattr(daemon, "curiosity"):
        try:
            recent_discoveries = daemon.curiosity.get_discoveries(limit=3)
        except Exception:
            # Discoveries are optional; fall back to an empty list.
            recent_discoveries = []
    total_cases = memory_manager.total_cases() if hasattr(memory_manager, "total_cases") else 0
    gaps = self_reflection.get_gaps()[:5]
    context["system_self"] = {
        **context.get("system_self", {}),
        "pending_thoughts": list(thought_map.values())[:5],
        "recent_discoveries": recent_discoveries,
        "capabilities": [
            "research",
            "simulation",
            "planning",
            "verification",
            "financial analysis",
        ],
        # Reasons of the first three gaps, skipping gaps without a reason.
        "weaknesses": [gap.get("reason", "") for gap in gaps[:3] if gap.get("reason")],
        "total_cases_analyzed": total_cases,
        "uptime": f"{getattr(daemon, 'cycle_count', 0)} daemon cycles" if daemon else "live session",
    }
    context["self_reflection"] = {
        "opinions": self_reflection.get_opinions()[:5],
        "corrections": self_reflection.get_corrections()[:5],
        "gaps": gaps,
        "self_model": getattr(self_reflection, "self_model", {}),
    }
    context["user_persona"] = user_state
    context["memory"] = {
        "similar_cases": memory_manager.find_similar(user_input, top_k=5)
    }
    adaptive = getattr(app.state, "adaptive", None)
    if adaptive and hasattr(adaptive, "get_context_for_query"):
        try:
            context["adaptive_intelligence"] = adaptive.get_context_for_query(
                user_input, "general"
            )
        except Exception as e:
            logger.debug("Adaptive context unavailable: %s", e)
    context["daemon"] = {
        "running": daemon is not None,
        "cycle_count": getattr(daemon, "cycle_count", 0),
        "circadian_phase": daemon.circadian.get_current_phase().value
        if daemon and hasattr(daemon, "circadian")
        else "offline",
    }
    context["environment"] = {"time_of_day": _time_of_day()}
    # Caller-supplied context is merged last so it takes precedence.
    return _merge_context(context, requested)
def _build_case_outputs(result: dict) -> list[dict]:
    """Flatten the per-agent sections of *result* into a list of output records.

    The ``research``, ``planner``, ``verifier`` and ``final`` sections are
    visited in that order (``final`` is reported as agent ``synthesizer``).
    Sections that are missing, empty, or not dicts are skipped. Each record
    carries the agent name, a summary string, a float confidence, and the
    original section under ``details``.
    """
    agent_sources = (
        ("research", "research"),
        ("planner", "planner"),
        ("verifier", "verifier"),
        ("synthesizer", "final"),
    )
    records = []
    for agent, section_key in agent_sources:
        section = result.get(section_key)
        if not (isinstance(section, dict) and section):
            continue

        # First truthy value among the candidate summary fields, else "".
        summary = ""
        for field in ("summary", "response", "estimated_output"):
            candidate = section.get(field)
            if candidate:
                summary = candidate
                break

        records.append(
            {
                "agent": agent,
                "summary": str(summary),
                "confidence": float(section.get("confidence") or 0.0),
                "details": section,
            }
        )
    return records
def _build_routing_path(case_payload: dict) -> str:
    """Describe the stages a case went through as a ``" > "``-joined string.

    The path always starts at ``switchboard`` and ends with the four core
    stages; ``mirofish`` is inserted when the payload has a truthy
    ``simulation`` section, otherwise ``finance`` when it has a truthy
    ``finance`` section.
    """
    if case_payload.get("simulation"):
        branch = ("mirofish",)
    elif case_payload.get("finance"):
        branch = ("finance",)
    else:
        branch = ()
    stages = ("switchboard", *branch, "research", "planner", "verifier", "synthesizer")
    return " > ".join(stages)
def _build_tool_results(case_payload: dict) -> list[dict]:
    """Summarise each populated tool section as ``{tool, status, confidence}``.

    Sections are visited in the order simulation, finance, research,
    planner, verifier; missing, empty, or non-dict sections are skipped.
    Status comes from the section's ``status`` field (default ``"ok"``),
    except that the verifier maps ``passed`` to ``"ok"``/``"warning"`` and
    a planner whose ``estimated_output`` starts with "error" is ``"error"``.
    """
    collected = []
    for tool in ("simulation", "finance", "research", "planner", "verifier"):
        section = case_payload.get(tool)
        if not (isinstance(section, dict) and section):
            continue

        if tool == "verifier":
            status = "ok" if section.get("passed", True) else "warning"
        else:
            status = section.get("status", "ok")

        if tool == "planner":
            estimate = str(section.get("estimated_output", "")).lower()
            if estimate.startswith("error"):
                status = "error"

        collected.append(
            {
                "tool": tool,
                "status": status,
                "confidence": float(section.get("confidence") or 0.0),
            }
        )
    return collected
def _collect_case_errors(case_payload: dict) -> list[str]:
    """Gather human-readable error strings from the sections of a case.

    For each populated dict section an entry is added when its ``status``
    is ``"error"``. The planner additionally reports an
    ``estimated_output`` that starts with "error", and the final section
    reports every caveat mentioning "fail" or "error" (case-insensitive).
    """
    found: list[str] = []
    for section in ("research", "planner", "verifier", "finance", "simulation", "final"):
        data = case_payload.get(section)
        if not (isinstance(data, dict) and data):
            continue

        if data.get("status") == "error":
            found.append(f"{section}: {data.get('reason', 'unknown error')}")

        if section == "planner":
            estimate = data.get("estimated_output", "")
            if str(estimate).lower().startswith("error"):
                found.append(f"planner: {estimate}")
        elif section == "final":
            for caveat in data.get("caveats", []):
                lowered = str(caveat).lower()
                if any(marker in lowered for marker in ("fail", "error")):
                    found.append(f"final: {caveat}")
    return found
def _record_observation_trace(app: FastAPI, case_payload: dict) -> dict:
    """Score and log an observation trace for a finished case.

    Builds a trace record from *case_payload*, scores it with
    ``app.state.trace_scorer``, logs it through ``app.state.tracer`` and,
    when a curator is configured, passes it to ``curator.curate_trace``.

    Args:
        app: Application whose ``state`` may hold ``tracer``,
            ``trace_scorer``, ``query_classifier`` and ``curator``.
        case_payload: The case response dict being recorded.

    Returns:
        A dict with ``trace_id``, ``trace_score`` and
        ``trace_score_breakdown`` (plus ``curation`` when curation
        succeeded), or an empty dict when the tracer or scorer is missing.
    """
    tracer = getattr(app.state, "tracer", None)
    trace_scorer = getattr(app.state, "trace_scorer", None)
    query_classifier = getattr(app.state, "query_classifier", None)
    curator = getattr(app.state, "curator", None)
    # Tracing is optional: without both a tracer and a scorer there is nothing to record.
    if tracer is None or trace_scorer is None:
        return {}
    user_input = case_payload.get("user_input", "")
    query_type = "unknown"
    detected_domain = case_payload.get("route", {}).get("domain", "general")
    if query_classifier and hasattr(query_classifier, "classify"):
        try:
            # classify() is unpacked as a 3-tuple; the middle element is unused here.
            query_type_result, _, query_meta = query_classifier.classify(user_input)
            query_type = getattr(query_type_result, "value", str(query_type_result))
            # Only let the classifier override the domain when routing left it at "general".
            if detected_domain == "general" and query_meta.get("detected_domain"):
                detected_domain = query_meta.get("detected_domain")
        except Exception as e:
            logger.debug("Query classification failed for trace: %s", e)
    trace_data = {
        "query": user_input,
        "query_type": query_type,
        "domain": detected_domain,
        "routing_path": _build_routing_path(case_payload),
        "provider_used": os.getenv("PRIMARY_PROVIDER", "unknown"),
        "output": case_payload.get("final_answer", ""),
        "output_length": len(case_payload.get("final_answer", "")),
        # elapsed_seconds is converted to whole milliseconds.
        "latency_ms": int(float(case_payload.get("elapsed_seconds", 0) or 0) * 1000),
        "confidence": float(case_payload.get("final", {}).get("confidence", 0.0) or 0.0),
        "tool_results": _build_tool_results(case_payload),
        "data_sources": case_payload.get("final", {}).get("data_sources", []),
        "errors": _collect_case_errors(case_payload),
        "cached": False,
    }
    scoring = trace_scorer.score(trace_data)
    trace_data["score"] = scoring.get("score", 0.0)
    trace_data["score_breakdown"] = scoring.get("breakdown", {})
    trace_id = tracer.log_trace(trace_data)
    trace_info = {
        "trace_id": trace_id,
        "trace_score": trace_data["score"],
        "trace_score_breakdown": trace_data["score_breakdown"],
    }
    if curator is not None:
        try:
            trace_info["curation"] = curator.curate_trace({**trace_data, "trace_id": trace_id})
        except Exception as e:
            # Curation is best-effort; the trace itself has already been logged.
            logger.error("Trace curation failed: %s", e)
    return trace_info
def _apply_post_run_learning(app: FastAPI, case_payload: dict, runtime_context: dict) -> None:
    """Feed a finished case into the tracing, memory and learning services.

    Mutates *case_payload* by merging in the trace info returned by
    ``_record_observation_trace``. Every downstream update (curiosity,
    pending thoughts, persistence, memory indexing, context, self-reflection,
    self-training, adaptive and learning engines) runs in its own
    ``try``/``except`` so one failing service is logged without blocking the
    others.

    Args:
        app: Application whose ``state.adaptive`` / ``state.learning_engine``
            are used when present.
        case_payload: The case response dict; updated in place.
        runtime_context: The context dict the case was executed with.
    """
    from app.memory import save_case
    from app.services.context_engine import context_engine
    from app.services.memory_manager import memory_manager
    from app.services.self_reflection import self_reflection
    final = case_payload.get("final", {})
    final_answer = case_payload.get("final_answer", "")
    elapsed = float(case_payload.get("elapsed_seconds", 0) or 0)
    trace_info = _record_observation_trace(app, case_payload)
    case_payload.update(trace_info)
    # Quality is the better of the synthesizer's own confidence and the trace score.
    quality_score = max(
        float(final.get("confidence", 0.0) or 0.0),
        float(trace_info.get("trace_score", 0.0) or 0.0),
    )
    topic = runtime_context.get("current_topic")
    daemon = _services.get("daemon")
    if daemon and getattr(daemon, "curiosity", None) and topic and topic != "general query":
        try:
            # Interest grows with quality but is capped at 0.08 per case.
            daemon.curiosity.add_interest(topic, score=min(0.08, 0.03 + quality_score * 0.05))
        except Exception as e:
            logger.error("Curiosity interest update failed: %s", e)
    # Low-quality answers on a concrete topic queue a follow-up thought.
    if topic and topic != "general query" and quality_score < 0.45:
        try:
            context_engine.add_pending_thought(
                f"I still feel uncertain about {topic} and should revisit it with better evidence.",
                priority=0.7,
                source="post_run_doubt",
            )
        except Exception as e:
            logger.error("Pending thought update failed: %s", e)
    case_id = case_payload.get("case_id")
    if case_id:
        try:
            save_case(case_id, case_payload)
        except Exception as e:
            logger.error("Case persistence failed: %s", e)
    # NOTE(review): the source's indentation was lost; this block is assumed to
    # run regardless of case_id — confirm it is not meant to sit inside the
    # `if case_id:` above.
    try:
        memory_manager.add_case(
            {
                **case_payload,
                "quality_score": quality_score,
                "domain": case_payload.get("route", {}).get("domain", "general"),
            }
        )
    except Exception as e:
        logger.error("Memory indexing failed: %s", e)
    try:
        context_engine.update_after_interaction(user_input=case_payload.get("user_input", ""), response=final_answer, context=runtime_context)
    except Exception as e:
        logger.error("Context update failed: %s", e)
    try:
        self_reflection.reflect_on_response(
            user_input=case_payload.get("user_input", ""),
            response=final_answer,
            confidence=float(final.get("confidence", 0.0) or 0.0),
            data_sources=final.get("data_sources", []),
            gaps=case_payload.get("research", {}).get("gaps", []),
            elapsed=elapsed,
        )
    except Exception as e:
        logger.error("Self-reflection update failed: %s", e)
    try:
        # Imported inside the try so a missing module is logged like any other failure.
        from app.services.self_training import self_training_engine
        training_stats = self_training_engine.train_on_response(
            user_input=case_payload.get("user_input", ""),
            response=final_answer,
            confidence=float(final.get("confidence", 0.0) or 0.0),
            data_sources=final.get("data_sources", []),
            elapsed=elapsed,
            prompt_name="synthesizer",
        )
        logger.info(f"Self-training cycle {training_stats.get('training_cycle')} complete. Prompt score: {training_stats.get('prompt_score')}")
    except Exception as e:
        logger.error("Self-training engine failed: %s", e)
    adaptive = getattr(app.state, "adaptive", None)
    if adaptive and hasattr(adaptive, "learn_from_case"):
        try:
            adaptive.learn_from_case(case_payload, elapsed)
        except Exception as e:
            logger.error("Adaptive learning failed: %s", e)
    learning_engine = getattr(app.state, "learning_engine", None)
    if learning_engine and hasattr(learning_engine, "learn_from_case"):
        try:
            learning_engine.learn_from_case(case_payload)
        except Exception as e:
            logger.error("Learning engine case update failed: %s", e)
def _message_content_to_text(content) -> str:
    """Reduce a chat message ``content`` value to plain text.

    Strings are returned unchanged. Lists are treated as content parts:
    dict parts contribute their ``text`` field when they are of type
    "text" or carry a ``text`` key (other dict parts are ignored), and
    non-dict parts are stringified; non-empty pieces are joined with
    newlines. Anything else is stringified, with falsy values becoming "".
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content or "")

    fragments = []
    for piece in content:
        if not isinstance(piece, dict):
            fragments.append(str(piece))
        elif piece.get("type") == "text" or "text" in piece:
            fragments.append(str(piece.get("text", "")))
    return "\n".join(filter(None, fragments))
def _extract_user_input_from_messages(messages: list[dict]) -> str:
    """Pick the text to treat as the user's query from a chat transcript.

    Looks at the last 12 messages and returns the stripped text of the
    most recent non-empty one whose role is "user" (a missing role counts
    as "user"). If none qualifies, the stripped text of the very last
    message is returned; an empty transcript yields "".
    """
    if not messages:
        return ""

    latest_user_text = ""
    for entry in messages[-12:]:
        text = _message_content_to_text(entry.get("content", "")).strip()
        if text and str(entry.get("role", "user")) == "user":
            latest_user_text = text

    if latest_user_text:
        return latest_user_text
    return _message_content_to_text(messages[-1].get("content", "")).strip()
def _render_message_history(messages: list[dict]) -> list[dict]:
    """Return the last 12 messages as ``{"role", "content"}`` dicts of plain text.

    Content is flattened with ``_message_content_to_text`` and stripped;
    messages whose text is empty are dropped. A missing role defaults to
    "user".
    """
    recent = (
        (
            str(entry.get("role", "user")),
            _message_content_to_text(entry.get("content", "")).strip(),
        )
        for entry in messages[-12:]
    )
    return [{"role": role, "content": text} for role, text in recent if text]
def _approx_tokens(text: str) -> int:
    """Estimate a token count as one token per four stripped characters, minimum 1."""
    stripped = (text or "").strip()
    estimate = len(stripped) // 4
    return estimate if estimate > 1 else 1
def _provider_context_from_body(body: dict) -> dict:
    """Build the ``provider_facade`` context section from a request body.

    Records the requested model (default "janus-chat"), temperature
    (default 0.7), token limit (``max_tokens`` or ``max_completion_tokens``),
    the non-empty text of all system messages, the rendered conversation,
    and the raw message count.
    """
    messages = body.get("messages") or []

    system_texts = []
    for message in messages:
        if message.get("role") != "system":
            continue
        text = _message_content_to_text(message.get("content", ""))
        if text:
            system_texts.append(text)

    facade = {
        "model": body.get("model", "janus-chat"),
        "temperature": body.get("temperature", 0.7),
        "max_tokens": body.get("max_tokens") or body.get("max_completion_tokens"),
        "system_messages": system_texts,
        "conversation": _render_message_history(messages),
        "raw_message_count": len(messages),
    }
    return {"provider_facade": facade}
async def _execute_case_request(app: FastAPI, body: dict) -> dict:
    """Run one user query through the reflex layer or the full graph.

    The query is read from ``body["user_input"]`` (or ``body["query"]``).
    If the reflex layer produces a result it is returned immediately after
    updating the context engine; otherwise ``run_case`` is awaited and the
    response is passed through ``_apply_post_run_learning``.

    Args:
        app: The FastAPI application, forwarded to the context and learning helpers.
        body: Request payload; may also carry a ``context`` dict of overrides.

    Returns:
        The case response dict (route, per-agent sections, final answer,
        elapsed seconds, and for graph runs also ``outputs`` and any fields
        added by post-run learning).

    Raises:
        HTTPException: 400 when no user input is supplied.
    """
    from app.graph import run_case
    from app.services.reflex_layer import reflex_layer
    user_input = (body.get("user_input") or body.get("query") or "").strip()
    if not user_input:
        raise HTTPException(status_code=400, detail="Missing user_input")
    started_at = time.perf_counter()
    runtime_context = _build_runtime_context(app, user_input, body.get("context"))
    # A truthy reflex result short-circuits the graph.
    reflex_result = reflex_layer.respond(user_input, runtime_context)
    if reflex_result:
        reflex_answer = reflex_result.get("final_answer", "")
        try:
            from app.services.context_engine import context_engine
            context_engine.update_after_interaction(
                user_input=user_input,
                response=reflex_answer,
                context=runtime_context,
            )
        except Exception as e:
            logger.error("Reflex context update failed: %s", e)
        # Note: the reflex path returns without outputs/model_* fields and
        # without calling _apply_post_run_learning.
        return {
            "case_id": reflex_result.get("case_id"),
            "user_input": user_input,
            "route": _normalize_route(reflex_result.get("route")),
            "research": reflex_result.get("research", {}),
            "planner": reflex_result.get("planner", {}),
            "verifier": reflex_result.get("verifier", {}),
            "simulation": reflex_result.get("simulation"),
            "finance": reflex_result.get("finance"),
            "final": {
                **reflex_result.get("final", {}),
                "response": reflex_answer,
            },
            "final_answer": reflex_answer,
            "elapsed_seconds": round(time.perf_counter() - started_at, 1),
        }
    result = await run_case(user_input, runtime_context)
    final = result.get("final", {})
    research_data = result.get("research", {})
    response = {
        "case_id": result.get("case_id"),
        "user_input": user_input,
        "route": _normalize_route(result.get("route")),
        "research": research_data,
        "planner": result.get("planner", {}),
        "verifier": result.get("verifier", {}),
        "simulation": result.get("simulation"),
        "finance": result.get("finance"),
        "final": final,
        # Prefer the synthesizer's response text, then its summary.
        "final_answer": final.get("response") or final.get("summary") or "",
        "model_enhanced": research_data.get("model_enhanced", False),
        "model_insights": research_data.get("model_insights", []),
        "elapsed_seconds": round(time.perf_counter() - started_at, 1),
    }
    response["outputs"] = _build_case_outputs(response)
    # Mutates `response` in place (adds trace fields) before it is returned.
    _apply_post_run_learning(app, response, runtime_context)
    return response
def _check_provider_auth(request: Request) -> None:
    """Enforce the optional ``JANUS_API_KEY`` bearer token on a request.

    When ``JANUS_API_KEY`` is unset or blank, authentication is disabled
    and the function returns immediately.

    Args:
        request: Incoming request; its ``authorization`` header is inspected.

    Raises:
        HTTPException: 401 when the header is not a Bearer credential, or
            when the supplied token does not match the configured key.
    """
    import hmac

    expected = os.getenv("JANUS_API_KEY", "").strip()
    if not expected:
        return

    auth = request.headers.get("authorization", "")
    # HTTP auth scheme names are case-insensitive, so accept "bearer" too.
    scheme, separator, credentials = auth.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Missing Bearer token")

    provided = credentials.strip()
    # Constant-time comparison avoids leaking key prefixes through timing.
    # Bytes are compared because compare_digest rejects non-ASCII str inputs.
    if not hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")
def _extract_ticker_symbol(text: str) -> Optional[str]:
    """Guess a ticker symbol mentioned in *text*.

    Known company names are matched first, as whole words and
    case-insensitively, in the order they appear in the lookup table.
    If none matches, the first standalone run of 2-5 upper-case letters
    in the original text is returned.

    Args:
        text: Free-form user text; ``None`` or "" yields ``None``.

    Returns:
        The ticker symbol, or ``None`` when nothing looks like one.
    """
    import re

    company_map = {
        "nvidia": "NVDA",
        "apple": "AAPL",
        "microsoft": "MSFT",
        "amazon": "AMZN",
        "alphabet": "GOOGL",
        "google": "GOOGL",
        "meta": "META",
        "facebook": "META",
        "tesla": "TSLA",
        "tsmc": "TSM",
        "asml": "ASML",
        "jpmorgan": "JPM",
        "reliance": "RELIANCE",
        "infosys": "INFY",
    }
    raw = text or ""

    # Match on whole words so that e.g. "pineapple" or "metadata" do not
    # resolve to AAPL / META the way a plain substring test would.
    words = set(re.findall(r"[a-z0-9]+", raw.lower()))
    for company, ticker in company_map.items():
        if company in words:
            return ticker

    symbol = re.search(r"\b[A-Z]{2,5}\b", raw)
    return symbol.group(0) if symbol else None
def _select_provider_tool_call(
    user_input: str, route: dict, tools: list[dict], tool_choice
) -> Optional[dict]:
    """Choose one function tool for the request and synthesise its call.

    Selection:
      * no tools, ``tool_choice == "none"``, or no ``type == "function"``
        tools -> ``None``;
      * a dict ``tool_choice`` naming a function -> that function, or
        ``None`` if it is not among the tools;
      * otherwise each function is scored by keyword overlap between the
        query and the function's name/description, with +2 bonuses for a
        matching route domain, finance hints, and simulation hints. The
        first highest-scoring function wins; with ``tool_choice == "auto"``
        a best score of 0 or less yields ``None``.

    Arguments are filled heuristically from parameter names (query-like ->
    the user input, domain/topic -> route domain, intent-like -> route
    intent, ticker/symbol -> extracted ticker, company -> user input). If
    nothing matched, the first declared parameter receives the user input.

    Returns:
        A tool-call dict with ``id``, ``type`` and ``function`` (name plus
        JSON-encoded arguments), or ``None``.
    """
    import re

    if not tools or tool_choice == "none":
        return None

    functions = [
        tool for tool in tools if isinstance(tool, dict) and tool.get("type") == "function"
    ]
    if not functions:
        return None

    explicit_name = None
    if isinstance(tool_choice, dict):
        explicit_name = (tool_choice.get("function") or {}).get("name")

    if explicit_name:
        # `or {}` guards against tools declared with "function": null.
        chosen = next(
            (
                tool
                for tool in functions
                if (tool.get("function") or {}).get("name") == explicit_name
            ),
            None,
        )
        if chosen is None:
            return None
    else:
        query_words = {
            token
            for token in re.findall(r"[a-z0-9_]+", (user_input or "").lower())
            if len(token) >= 3
        }
        domain = str(route.get("domain") or "").lower()
        wants_finance = bool(route.get("requires_finance_data"))
        wants_simulation = bool(route.get("requires_simulation"))

        scored = []
        for tool in functions:
            fn = tool.get("function") or {}
            haystack = f"{fn.get('name', '')} {fn.get('description', '')}".lower()
            overlap = sum(1 for word in query_words if word in haystack)
            if domain and domain in haystack:
                overlap += 2
            if wants_finance and any(
                hint in haystack for hint in ("finance", "market", "stock", "ticker")
            ):
                overlap += 2
            if wants_simulation and any(
                hint in haystack for hint in ("simulate", "forecast", "scenario")
            ):
                overlap += 2
            scored.append((overlap, tool))

        # max() returns the first maximal element, matching a stable
        # descending sort followed by taking index 0.
        best_score, chosen = max(scored, key=lambda item: item[0])
        if tool_choice == "auto" and best_score <= 0:
            return None

    fn = chosen.get("function") or {}
    function_name = fn.get("name") or "janus_tool"
    properties = (fn.get("parameters") or {}).get("properties") or {}

    arguments = {}
    ticker = _extract_ticker_symbol(user_input)
    for key in properties:
        lowered = key.lower()
        if any(token in lowered for token in ("query", "question", "prompt", "input", "task", "request")):
            arguments[key] = user_input
        elif any(token in lowered for token in ("domain", "topic")):
            arguments[key] = route.get("domain", "general")
        elif any(token in lowered for token in ("intent", "goal", "reason")):
            arguments[key] = route.get("intent", user_input)
        elif any(token in lowered for token in ("ticker", "symbol")):
            arguments[key] = ticker or user_input
        elif "company" in lowered:
            arguments[key] = user_input

    # Fall back to feeding the raw input into the first declared parameter.
    if not arguments and properties:
        first_key = next(iter(properties))
        arguments[first_key] = user_input

    return {
        "id": f"call_{uuid.uuid4().hex}",
        "type": "function",
        "function": {
            "name": function_name,
            "arguments": json.dumps(arguments, ensure_ascii=False),
        },
    }
| def _split_stream_text(text: str, target_size: int = 80) -> list[str]: | |
| words = (text or "").split() | |
| if not words: | |
| return [""] | |
| chunks = [] | |
| current = [] | |
| current_len = 0 | |
| for word in words: | |
| if current and current_len + len(word) + 1 > target_size: | |
| chunks.append(" ".join(current)) | |
| current = [word] | |
| current_len = len(word) | |
| else: | |
| current.append(word) | |
| current_len += len(word) + (1 if current_len else 0) | |
| if current: | |
| chunks.append(" ".join(current)) | |
| return chunks | |
| def _sse_event(payload: dict, event: Optional[str] = None) -> str: | |
| prefix = f"event: {event}\n" if event else "" | |
| return prefix + f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" | |
| async def _execute_provider_tool_call( | |
| app: FastAPI, tool_call: dict, user_input: str, route: dict | |
| ) -> dict: | |
| function = tool_call.get("function", {}) or {} | |
| name = function.get("name", "") | |
| try: | |
| arguments = json.loads(function.get("arguments", "{}")) | |
| except Exception: | |
| arguments = {} | |
| query = ( | |
| arguments.get("query") | |
| or arguments.get("question") | |
| or arguments.get("input") | |
| or arguments.get("task") | |
| or user_input | |
| ) | |
| def _run_sync() -> dict: | |
| if name in {"get_stock_quote", "get_market_quote", "ticker_intelligence"}: | |
| from app.domain_packs.finance.market_data import get_company_overview, get_quote, search_symbol | |
| symbol = ( | |
| arguments.get("ticker") | |
| or arguments.get("symbol") | |
| or _extract_ticker_symbol(query) | |
| ) | |
| if not symbol and query: | |
| results = search_symbol(query) | |
| symbol = (results[0] or {}).get("1. symbol") if results else None | |
| symbol = (symbol or "").upper() | |
| quote = get_quote(symbol) if symbol else {} | |
| overview = get_company_overview(symbol) if symbol else {} | |
| return { | |
| "tool": name, | |
| "symbol": symbol, | |
| "quote": quote, | |
| "overview": { | |
| "name": overview.get("Name"), | |
| "sector": overview.get("Sector"), | |
| "industry": overview.get("Industry"), | |
| "market_cap": overview.get("MarketCapitalization"), | |
| "pe_ratio": overview.get("PERatio"), | |
| "analyst_target": overview.get("AnalystTargetPrice"), | |
| }, | |
| } | |
| if name in {"search_market_symbols", "search_symbol"}: | |
| from app.domain_packs.finance.market_data import search_symbol | |
| results = search_symbol(query) | |
| return { | |
| "tool": name, | |
| "query": query, | |
| "results": [ | |
| { | |
| "symbol": item.get("1. symbol"), | |
| "name": item.get("2. name"), | |
| "region": item.get("4. region"), | |
| "currency": item.get("8. currency"), | |
| } | |
| for item in results[:8] | |
| ], | |
| } | |
| if name in {"search_memory", "memory_search", "find_similar_cases"}: | |
| memory_manager = getattr(app.state, "memory_manager", None) | |
| limit = int(arguments.get("limit", 5) or 5) | |
| results = ( | |
| memory_manager.find_similar(query, top_k=limit) | |
| if memory_manager and hasattr(memory_manager, "find_similar") | |
| else [] | |
| ) | |
| return {"tool": name, "query": query, "results": results} | |
| if name in {"get_company_news", "search_finance_news", "get_top_headlines"}: | |
| from app.domain_packs.finance.news import ( | |
| get_company_news, | |
| get_top_headlines, | |
| search_news, | |
| ) | |
| limit = int(arguments.get("limit", 5) or 5) | |
| symbol = arguments.get("ticker") or arguments.get("symbol") or _extract_ticker_symbol(query) | |
| company = arguments.get("company") or query | |
| if name == "get_top_headlines": | |
| category = arguments.get("category") or "business" | |
| articles = get_top_headlines(category=category, page_size=limit) | |
| return { | |
| "tool": name, | |
| "category": category, | |
| "articles": articles[:limit], | |
| } | |
| if name == "get_company_news" and symbol: | |
| articles = get_company_news(company, days_back=7, symbol=symbol) | |
| else: | |
| articles = search_news(query, page_size=limit) | |
| return { | |
| "tool": name, | |
| "query": query, | |
| "symbol": symbol, | |
| "articles": articles[:limit], | |
| } | |
| if name in {"deep_web_research", "public_web_research", "web_research"}: | |
| from app.services.external_sources import deep_web_research_bundle | |
| limit = int(arguments.get("limit", 4) or 4) | |
| follow_links = int(arguments.get("follow_links", 1) or 1) | |
| bundle = deep_web_research_bundle( | |
| query, max_results=limit, follow_links=follow_links | |
| ) | |
| results = bundle.get("results", []) | |
| synthesis = bundle.get("synthesis", {}) | |
| return { | |
| "tool": name, | |
| "query": query, | |
| "summary": synthesis.get("summary", ""), | |
| "key_points": synthesis.get("key_points", []), | |
| "avg_credibility": synthesis.get("avg_credibility", 0.0), | |
| "top_sources": synthesis.get("top_sources", []), | |
| "query_variants": bundle.get("query_variants", []), | |
| "results": results[:limit], | |
| } | |
| if name in {"market_web_brief", "company_web_brief", "market_research_brief"}: | |
| from app.domain_packs.finance.market_data import get_company_overview, get_quote | |
| from app.domain_packs.finance.news import get_company_news | |
| from app.services.external_sources import deep_web_research_bundle | |
| symbol = ( | |
| arguments.get("ticker") | |
| or arguments.get("symbol") | |
| or _extract_ticker_symbol(query) | |
| ) | |
| symbol = (symbol or "").upper() | |
| company = arguments.get("company") or query | |
| quote = get_quote(symbol) if symbol else {} | |
| overview = get_company_overview(symbol) if symbol else {} | |
| news = get_company_news(company, days_back=7, symbol=symbol)[:5] if symbol else [] | |
| bundle = deep_web_research_bundle(query, max_results=4, follow_links=1) | |
| web_results = bundle.get("results", []) | |
| synthesis = bundle.get("synthesis", {}) | |
| top_sources = [ | |
| { | |
| "title": item.get("title"), | |
| "url": item.get("url"), | |
| "credibility_score": item.get("credibility_score", 0.0), | |
| "credibility_reason": item.get("credibility_reason", "unknown"), | |
| } | |
| for item in web_results[:4] | |
| ] | |
| return { | |
| "tool": name, | |
| "symbol": symbol, | |
| "company": company, | |
| "quote": quote, | |
| "overview": { | |
| "name": overview.get("Name"), | |
| "sector": overview.get("Sector"), | |
| "industry": overview.get("Industry"), | |
| "market_cap": overview.get("MarketCapitalization"), | |
| "pe_ratio": overview.get("PERatio"), | |
| "analyst_target": overview.get("AnalystTargetPrice"), | |
| }, | |
| "news": news, | |
| "web_results": web_results, | |
| "summary": synthesis.get("summary", ""), | |
| "key_points": synthesis.get("key_points", []), | |
| "avg_credibility": synthesis.get("avg_credibility", 0.0), | |
| "query_variants": bundle.get("query_variants", []), | |
| "top_sources": top_sources, | |
| } | |
| if name in {"analyze_finance_text", "finance_text_analysis"}: | |
| from app.domain_packs.finance.entity_resolver import extract_entities | |
| from app.domain_packs.finance.event_analyzer import analyze_event_impact, detect_event_type | |
| from app.domain_packs.finance.rumor_detector import detect_rumor_indicators | |
| from app.domain_packs.finance.scam_detector import detect_scam_indicators | |
| from app.domain_packs.finance.source_checker import aggregate_source_scores | |
| from app.domain_packs.finance.stance_detector import ( | |
| analyze_price_action_language, | |
| detect_stance, | |
| ) | |
| from app.domain_packs.finance.ticker_resolver import extract_tickers | |
| text = arguments.get("text") or query | |
| sources = arguments.get("sources") or [] | |
| tickers = extract_tickers(text) | |
| entities = extract_entities(text) | |
| stance = detect_stance(text) | |
| price_action = analyze_price_action_language(text) | |
| scam = detect_scam_indicators(text) | |
| rumor = detect_rumor_indicators(text) | |
| events = detect_event_type(text) | |
| event_impact = analyze_event_impact(text, events) | |
| source_assessment = aggregate_source_scores(sources) if sources else None | |
| return { | |
| "tool": name, | |
| "tickers": tickers, | |
| "entities": [e for e in entities if e.get("confidence", 0) >= 0.7], | |
| "stance": stance, | |
| "price_action": price_action, | |
| "scam_detection": scam, | |
| "rumor_detection": rumor, | |
| "event_impact": event_impact, | |
| "source_assessment": source_assessment, | |
| } | |
| if name in {"search_knowledge", "knowledge_search"}: | |
| from app.memory import knowledge_store | |
| limit = int(arguments.get("limit", 5) or 5) | |
| domain = arguments.get("domain") or route.get("domain", "general") | |
| results = knowledge_store.search(query, domain=domain, top_k=limit) | |
| return {"tool": name, "query": query, "domain": domain, "results": results} | |
| if name in {"classify_domain", "domain_classify"}: | |
| domain_classifier = getattr(app.state, "domain_classifier", None) | |
| result = domain_classifier.classify(query) if domain_classifier else None | |
| top_domains = ( | |
| domain_classifier.get_top_domains(query, top_n=3) | |
| if domain_classifier and hasattr(domain_classifier, "get_top_domains") | |
| else [] | |
| ) | |
| return { | |
| "tool": name, | |
| "query": query, | |
| "domain": result.domain.value if result else "general", | |
| "confidence": result.confidence if result else 0.5, | |
| "keywords_found": result.keywords_found if result else [], | |
| "top_domains": [ | |
| {"domain": domain.value, "confidence": confidence} | |
| for domain, confidence in top_domains | |
| ], | |
| } | |
| if name in {"run_simulation", "simulate_scenario"}: | |
| from app.services.simulation_engine import simulation_engine | |
| simulation = simulation_engine.run_simulation( | |
| query, | |
| context={ | |
| "provider_tool": True, | |
| "route": route, | |
| "tool_call_id": tool_call.get("id"), | |
| }, | |
| ) | |
| synthesis = simulation.get("synthesis", {}) | |
| return { | |
| "tool": name, | |
| "simulation_id": simulation.get("simulation_id"), | |
| "most_likely": synthesis.get("most_likely"), | |
| "scenarios": synthesis.get("scenarios", [])[:3], | |
| "confidence": synthesis.get("confidence", 0.0), | |
| } | |
| if name in {"chat_with_simulation", "simulation_followup"}: | |
| from app.services.simulation_engine import simulation_engine | |
| sim_id = arguments.get("simulation_id") or arguments.get("sim_id") | |
| message = arguments.get("message") or arguments.get("question") or query | |
| response = simulation_engine.chat_with_simulation(sim_id, message) if sim_id else {"error": "Missing simulation_id"} | |
| return {"tool": name, **response} | |
| if name in {"get_watchlist_status", "watchlist_status"}: | |
| daemon = _services.get("daemon") | |
| results = ( | |
| daemon.market_watcher.get_watchlist_status() | |
| if daemon and hasattr(daemon, "market_watcher") | |
| else [] | |
| ) | |
| return {"tool": name, "results": results} | |
| if name in {"get_domain_report", "domain_report"}: | |
| domain_classifier = getattr(app.state, "domain_classifier", None) | |
| memory_manager = getattr(app.state, "memory_manager", None) | |
| classification = domain_classifier.classify(query) if domain_classifier else None | |
| domain = arguments.get("domain") or (classification.domain.value if classification else route.get("domain", "general")) | |
| domain_stats = ( | |
| memory_manager.get_domain_stats().get(domain, {}) | |
| if memory_manager and hasattr(memory_manager, "get_domain_stats") | |
| else {} | |
| ) | |
| frequent_patterns = ( | |
| [p for p in memory_manager.get_frequent_patterns(min_freq=2) if p.get("term")] | |
| if memory_manager and hasattr(memory_manager, "get_frequent_patterns") | |
| else [] | |
| ) | |
| return { | |
| "tool": name, | |
| "domain": domain, | |
| "classification": { | |
| "domain": classification.domain.value if classification else domain, | |
| "confidence": classification.confidence if classification else 0.5, | |
| "keywords_found": classification.keywords_found if classification else [], | |
| }, | |
| "domain_stats": domain_stats, | |
| "frequent_patterns": frequent_patterns[:10], | |
| } | |
| raise ValueError(f"Unsupported Janus tool: {name}") | |
| return await asyncio.to_thread(_run_sync) | |
| def _summarize_provider_tool_execution(execution: dict, user_input: str) -> str: | |
| """Fallback summarizer if the main reasoning model fails.""" | |
| tool = execution.get("tool") | |
| # ββ ZeroTrust Guardian & MMSA Fusion ββββββββββββββββββββββββ | |
| if execution.get("guardian_score") is not None or "dissonance_score" in execution: | |
| risk = execution.get("guardian_score") or execution.get("deception_probability", 0) * 100 | |
| action = execution.get("safe_action", "Proceed with extreme caution.") | |
| return ( | |
| f"Janus completed a Multimodal Dissonance scan. " | |
| f"Risk Index: {risk:.1f}%. " | |
| f"Forensic Conclusion: {execution.get('reason', 'Evidence synthesis complete.')} " | |
| f"Recommended Safe Action: {action}" | |
| ) | |
| if tool in {"get_stock_quote", "get_market_quote", "ticker_intelligence"}: | |
| symbol = execution.get("symbol") or "the requested company" | |
| quote = execution.get("quote", {}) | |
| overview = execution.get("overview", {}) | |
| price = quote.get("05. price") | |
| change_pct = quote.get("10. change percent") | |
| market_cap = overview.get("market_cap") | |
| pe_ratio = overview.get("pe_ratio") | |
| target = overview.get("analyst_target") | |
| parts = [f"Janus fetched market data for {symbol}."] | |
| if price is not None: | |
| parts.append(f"Price: {price}.") | |
| if change_pct not in (None, ""): | |
| parts.append(f"Change percent: {change_pct}.") | |
| if market_cap: | |
| parts.append(f"Market cap: {market_cap}.") | |
| if pe_ratio: | |
| parts.append(f"PE ratio: {pe_ratio}.") | |
| if target: | |
| parts.append(f"Analyst target: {target}.") | |
| parts.append("Use this as grounding, not as standalone investment advice.") | |
| return " ".join(parts) | |
| if tool in {"search_market_symbols", "search_symbol"}: | |
| results = execution.get("results", []) | |
| if not results: | |
| return "Janus could not find a market symbol for that query." | |
| top = results[0] | |
| return ( | |
| f"Janus found {len(results)} matching symbols. " | |
| f"Top match: {top.get('symbol')} for {top.get('name')} in {top.get('region')}." | |
| ) | |
| if tool in {"get_company_news", "search_finance_news", "get_top_headlines"}: | |
| articles = execution.get("articles", []) | |
| if not articles: | |
| return "Janus found no relevant finance news articles for that request." | |
| titles = [article.get("title", "") for article in articles[:3] if article.get("title")] | |
| topic = execution.get("symbol") or execution.get("query") or execution.get("category") or "the request" | |
| return ( | |
| f"Janus gathered {len(articles)} relevant finance news items for {topic}. " | |
| f"Top headlines: {'; '.join(titles)}." | |
| ) | |
| if tool in {"news_market_web_brief", "research_sweep", "intel_sweep"}: | |
| top = (execution.get("top_sources") or [{}])[0] | |
| points = execution.get("key_points") or [] | |
| point_text = " ".join(p.get("point", "") for p in points[:2]) | |
| return ( | |
| f"Janus synthesized a multimodal brief. " | |
| f"Top Source: {top.get('domain', 'primary index')}. " | |
| f"Key Findings: {point_text[:320] if point_text else 'Synthesis awaiting deeper model reasoning.'}" | |
| ) | |
| if tool in {"market_web_brief", "company_web_brief", "market_research_brief"}: | |
| symbol = execution.get("symbol") or execution.get("company") or "the index" | |
| parts = [f"Janus generated a market intelligence brief for {symbol}."] | |
| if execution.get("avg_credibility"): | |
| parts.append(f"Source credibility: {execution.get('avg_credibility'):.2f}.") | |
| return " ".join(parts) | |
| if tool in {"analyze_finance_text", "finance_text_analysis"}: | |
| stance = (execution.get("stance") or {}).get("stance", "neutral") | |
| scam_score = (execution.get("scam_detection") or {}).get("scam_score", 0) | |
| rumor_score = (execution.get("rumor_detection") or {}).get("rumor_score", 0) | |
| events = (execution.get("event_impact") or {}).get("summary") or "No major event impact detected." | |
| return ( | |
| f"Janus analyzed the finance text. Stance: {stance}. " | |
| f"Scam score: {scam_score}. Rumor score: {rumor_score}. {events}" | |
| ) | |
| if tool in {"search_memory", "memory_search", "find_similar_cases"}: | |
| results = execution.get("results", []) | |
| if not results: | |
| return "Janus found no closely related prior cases in memory." | |
| top = results[0] | |
| return ( | |
| f"Janus found {len(results)} similar past cases. " | |
| f"Closest match: {top.get('query')} with similarity {top.get('score')}." | |
| ) | |
| if tool in {"search_knowledge", "knowledge_search"}: | |
| results = execution.get("results", []) | |
| if not results: | |
| return "Janus found no matching knowledge entries for that query." | |
| top = results[0] | |
| headline = top.get("title") or top.get("topic") or "knowledge entry" | |
| return ( | |
| f"Janus found {len(results)} matching knowledge entries. " | |
| f"Top result: {headline}." | |
| ) | |
| if tool in {"classify_domain", "domain_classify"}: | |
| return ( | |
| f"Janus classified this query as {execution.get('domain', 'general')} " | |
| f"with confidence {execution.get('confidence', 0.0)}." | |
| ) | |
| if tool in {"run_simulation", "simulate_scenario"}: | |
| scenarios = execution.get("scenarios", []) | |
| return ( | |
| f"Janus ran simulation {execution.get('simulation_id')}. " | |
| f"Most likely outcome: {execution.get('most_likely', 'unknown')}. " | |
| f"Generated {len(scenarios)} scenarios." | |
| ) | |
| if tool in {"chat_with_simulation", "simulation_followup"}: | |
| response = execution.get("response") or execution.get("error") or "No simulation follow-up response." | |
| return f"Janus consulted the saved simulation. {response}" | |
| if tool in {"get_watchlist_status", "watchlist_status"}: | |
| results = execution.get("results", []) | |
| return f"Janus returned status for {len(results)} watchlist instruments." | |
| if tool in {"get_domain_report", "domain_report"}: | |
| domain = execution.get("domain", "general") | |
| count = (execution.get("domain_stats") or {}).get("count", 0) | |
| patterns = execution.get("frequent_patterns", []) | |
| return ( | |
| f"Janus built a domain report for {domain}. " | |
| f"Known cases in that domain: {count}. " | |
| f"Tracked patterns: {len(patterns)}." | |
| ) | |
| return f"Janus successfully executed the {tool} routine for: {user_input}." | |
def _reason_over_tool_execution(user_input: str, execution: dict) -> str:
    """Turn a tool execution result into the final natural-language answer.

    The model from ``app.agents._model.call_model`` is asked to summarize the
    result. If that fails or returns fewer than 10 characters, the templated
    summary from ``_summarize_provider_tool_execution`` is returned; if that
    summary is itself unavailable, a generic sentence is used.

    Args:
        user_input: Original user request.
        execution: Result dict of the executed tool.

    Returns:
        A non-empty answer string.
    """
    # The fallback must exist before the model is tried, and computing it
    # must never abort the request: an unexpected result shape should degrade
    # to the generic sentence rather than raise.
    try:
        fallback = _summarize_provider_tool_execution(execution, user_input)
    except Exception:
        logger.warning("Tool-result summarizer failed; using generic fallback", exc_info=True)
        fallback = ""
    if not fallback or len(fallback.strip()) < 5:
        fallback = f"Janus has processed the following signal: {user_input}. Analysis complete."

    try:
        from app.agents._model import call_model

        messages = [
            {
                "role": "system",
                "content": "You are Janus, the Multimodal Intelligence Sentinel. Summarize the tool execution results naturally.",
            },
            {
                "role": "user",
                "content": (
                    f"User request:\n{user_input}\n\n"
                    f"Executed tool result:\n{json.dumps(execution, ensure_ascii=False, indent=2)}\n\n"
                    "Provide a high-fidelity final answer."
                ),
            },
        ]
        result = call_model(messages)
        cleaned = (result or "").strip()
        # Very short output is treated as a failed generation.
        if not cleaned or len(cleaned) < 10:
            return fallback
        return cleaned
    except Exception:
        # Best-effort: any model/serialization failure falls back to the
        # templated summary, but leave a trace for operators.
        logger.warning("Model reasoning over tool result failed; using fallback", exc_info=True)
        return fallback
| def _stable_hash_int(text: str) -> int: | |
| import hashlib | |
| digest = hashlib.sha256(text.encode("utf-8")).digest() | |
| return int.from_bytes(digest[:8], "big", signed=False) | |
| def _embed_text(text: str, dimensions: int = 256) -> list[float]: | |
| import re | |
| dim = max(16, min(int(dimensions or 256), 2048)) | |
| vector = [0.0] * dim | |
| tokens = re.findall(r"[a-z0-9_]+", (text or "").lower()) | |
| if not tokens: | |
| return vector | |
| for token in tokens: | |
| slot = _stable_hash_int(token) % dim | |
| weight = 1.0 + min(len(token), 12) / 12.0 | |
| vector[slot] += weight | |
| norm = math.sqrt(sum(value * value for value in vector)) or 1.0 | |
| return [round(value / norm, 6) for value in vector] | |
def _build_chat_completion_response(
    model: str, case_response: dict, tool_call: Optional[dict] = None
) -> dict:
    """Assemble a non-streaming ``chat.completion`` response body.

    Args:
        model: Model id echoed back in the response.
        case_response: Case result; ``final_answer`` becomes the assistant
            content and ``user_input`` feeds the prompt-token estimate.
        tool_call: When given, the assistant message carries this tool call
            (with ``content`` set to ``None``) and ``finish_reason`` is
            ``"tool_calls"`` instead of ``"stop"``.

    Returns:
        The response dict, including approximate token usage and a ``janus``
        section with case/route/trace metadata.
    """
    answer = case_response.get("final_answer", "")
    prompt_tokens = _approx_tokens(case_response.get("user_input", ""))
    completion_tokens = _approx_tokens(answer)

    message = {"role": "assistant"}
    if tool_call:
        message["content"] = None
        message["tool_calls"] = [tool_call]
    else:
        message["content"] = answer
    finish_reason = "tool_calls" if tool_call else "stop"

    usage = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }
    janus_meta = {
        "case_id": case_response.get("case_id"),
        "route": case_response.get("route", {}),
        "trace_id": case_response.get("trace_id"),
        "trace_score": case_response.get("trace_score"),
    }
    choice = {"index": 0, "message": message, "finish_reason": finish_reason}
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [choice],
        "usage": usage,
        "janus": janus_meta,
    }
def _build_responses_api_response(
    model: str, case_response: dict, tool_call: Optional[dict] = None
) -> dict:
    """Assemble a non-streaming ``response`` object body.

    Args:
        model: Model id echoed back in the response.
        case_response: Case result; ``final_answer`` becomes the output text
            and ``user_input`` feeds the input-token estimate.
        tool_call: When given, the single output item is a ``function_call``
            built from it; otherwise it is an assistant ``message``.

    Returns:
        The response dict with ``output``, approximate ``usage`` and a
        ``janus`` metadata section.
    """
    answer = case_response.get("final_answer", "")
    input_tokens = _approx_tokens(case_response.get("user_input", ""))
    output_tokens = _approx_tokens(answer)

    if tool_call:
        item = {
            "id": f"fc_{uuid.uuid4().hex}",
            "type": "function_call",
            "call_id": tool_call.get("id"),
            "name": tool_call.get("function", {}).get("name"),
            "arguments": tool_call.get("function", {}).get("arguments", "{}"),
        }
    else:
        item = {
            "id": f"msg_{uuid.uuid4().hex}",
            "type": "message",
            "role": "assistant",
            "content": [{"type": "output_text", "text": answer}],
        }

    usage = {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
    }
    janus_meta = {
        "case_id": case_response.get("case_id"),
        "route": case_response.get("route", {}),
        "trace_id": case_response.get("trace_id"),
    }
    return {
        "id": f"resp_{uuid.uuid4().hex}",
        "object": "response",
        "created_at": int(time.time()),
        "model": model,
        "output": [item],
        "usage": usage,
        "janus": janus_meta,
    }
async def _stream_chat_completion_response(
    model: str, case_response: dict, tool_call: Optional[dict] = None
):
    """Yield a chat completion as ``chat.completion.chunk`` SSE frames.

    Frame order: a role frame, then either one tool-call frame or one content
    frame per text chunk of ``final_answer``, then a frame carrying the
    finish reason (``"tool_calls"`` or ``"stop"``), then ``data: [DONE]``.

    Args:
        model: Model id echoed in every chunk.
        case_response: Case result supplying ``final_answer``.
        tool_call: Optional tool call to stream instead of text.
    """
    stream_id = f"chatcmpl-{uuid.uuid4().hex}"
    created = int(time.time())

    def _frame(delta: dict, finish_reason) -> str:
        # All chunks share the same envelope; only delta/finish_reason vary.
        return _sse_event(
            {
                "id": stream_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [
                    {"index": 0, "delta": delta, "finish_reason": finish_reason}
                ],
            }
        )

    yield _frame({"role": "assistant"}, None)

    if tool_call:
        yield _frame({"tool_calls": [{**tool_call, "index": 0}]}, None)
        closing_reason = "tool_calls"
    else:
        for piece in _split_stream_text(case_response.get("final_answer", "")):
            yield _frame({"content": piece}, None)
        closing_reason = "stop"

    yield _frame({}, closing_reason)
    yield "data: [DONE]\n\n"
async def _stream_responses_api_response(
    model: str, case_response: dict, tool_call: Optional[dict] = None
):
    """Yield a Responses-style result as named SSE events.

    Emits ``response.created``, then either a single
    ``response.output_item.added`` event describing the function call or one
    ``response.output_text.delta`` event per text chunk of ``final_answer``,
    and finally ``response.completed``.

    Args:
        model: Model id echoed in the created/completed events.
        case_response: Case result supplying ``final_answer``.
        tool_call: Optional tool call to emit instead of text deltas.
    """
    response_id = f"resp_{uuid.uuid4().hex}"
    created = int(time.time())

    opening = {
        "id": response_id,
        "object": "response",
        "created_at": created,
        "model": model,
    }
    yield _sse_event(opening, event="response.created")

    if not tool_call:
        for piece in _split_stream_text(case_response.get("final_answer", "")):
            yield _sse_event(
                {"response_id": response_id, "delta": piece},
                event="response.output_text.delta",
            )
    else:
        call_item = {
            "id": f"fc_{uuid.uuid4().hex}",
            "type": "function_call",
            "call_id": tool_call.get("id"),
            "name": tool_call.get("function", {}).get("name"),
            "arguments": tool_call.get("function", {}).get("arguments", "{}"),
        }
        yield _sse_event(
            {"response_id": response_id, "item": call_item},
            event="response.output_item.added",
        )

    closing = {
        "id": response_id,
        "object": "response",
        "created_at": created,
        "model": model,
        "status": "completed",
    }
    yield _sse_event(closing, event="response.completed")
| def _frontend_server_path() -> Optional[str]: | |
| import pathlib | |
| frontend_dir = os.getenv("NEXT_STANDALONE_DIR", "").strip() | |
| if not frontend_dir: | |
| return None | |
| server_js = pathlib.Path(frontend_dir) / "server.js" | |
| if not server_js.exists(): | |
| logger.warning("Bundled frontend missing: %s", server_js) | |
| return None | |
| return str(server_js) | |
async def _wait_for_frontend(port: str, attempts: int = 40, delay: float = 0.5) -> bool:
    """Poll the local frontend until it answers or the attempts run out.

    Issues ``GET http://127.0.0.1:{port}/`` up to ``attempts`` times, sleeping
    ``delay`` seconds after each unsuccessful try. Any response with a status
    below 500 counts as ready; request errors count as "not ready yet".

    Returns:
        ``True`` once the frontend responds, ``False`` if it never does.
    """
    import httpx

    probe_url = f"http://127.0.0.1:{port}/"
    async with httpx.AsyncClient(timeout=2.0) as client:
        for _attempt in range(attempts):
            try:
                ready = (await client.get(probe_url)).status_code < 500
            except Exception:
                # Connection refused / timeout while the server is booting.
                ready = False
            if ready:
                return True
            await asyncio.sleep(delay)
    return False
async def _start_frontend_server():
    """Launch the bundled frontend as a child process and wait for it.

    Does nothing when no ``server.js`` is available or a frontend process is
    already recorded in ``_services``. The child runs ``NODE_BIN`` (default
    ``node``) with ``PORT`` set to ``NEXT_INTERNAL_PORT`` (default ``3000``),
    ``HOSTNAME`` set to ``127.0.0.1`` and ``NODE_ENV`` defaulting to
    ``production``. The process handle is stored under
    ``_services["frontend_process"]`` and readiness is logged.
    """
    server_js = _frontend_server_path()
    if not server_js or _services.get("frontend_process"):
        return

    port = os.getenv("NEXT_INTERNAL_PORT", "3000")
    child_env = dict(os.environ)
    child_env.update(PORT=port, HOSTNAME="127.0.0.1")
    if "NODE_ENV" not in child_env:
        child_env["NODE_ENV"] = "production"

    node_binary = os.getenv("NODE_BIN", "node")
    _services["frontend_process"] = subprocess.Popen(
        [node_binary, server_js],
        cwd=os.path.dirname(server_js),
        env=child_env,
    )

    became_ready = await _wait_for_frontend(port)
    if became_ready:
        logger.info("Bundled frontend started on internal port %s", port)
    else:
        logger.warning("Bundled frontend did not become ready on port %s", port)
async def _proxy_frontend_request(request, path: str = ""):
    """Forward ``request`` to the bundled frontend and relay its response.

    The target is ``http://127.0.0.1:{NEXT_INTERNAL_PORT}/{path}`` (port
    defaults to ``3000``) with the original query string. ``Host`` and the
    headers listed in ``_HOP_BY_HOP_HEADERS`` are stripped in both directions.
    Redirects are not followed; they are passed back to the client.

    Args:
        request: Incoming request (method, headers, query and body are used).
        path: Path below the frontend root to request.

    Returns:
        The proxied response; a 404 JSON response when no frontend is
        configured; a 502 JSON response when the frontend cannot be reached.
    """
    import httpx
    from fastapi.responses import JSONResponse, Response

    if _frontend_server_path() is None:
        return JSONResponse(
            status_code=404, content={"detail": "Frontend not configured"}
        )

    target = f"http://127.0.0.1:{os.getenv('NEXT_INTERNAL_PORT', '3000')}/"
    if path:
        target += path
    if request.url.query:
        target += f"?{request.url.query}"

    filtered_request_headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in {"host", *_HOP_BY_HOP_HEADERS}
    }
    body = await request.body()

    try:
        async with httpx.AsyncClient(follow_redirects=False, timeout=60.0) as client:
            proxied = await client.request(
                request.method,
                target,
                content=body,
                headers=filtered_request_headers,
            )
    except httpx.HTTPError as e:
        logger.error("Frontend proxy failed: %s", e)
        return JSONResponse(
            status_code=502, content={"detail": "Bundled frontend unavailable"}
        )

    # Set-Cookie is handled separately below: a plain dict can hold only one
    # value per name, and httpx's ``items()`` comma-joins repeated headers,
    # which corrupts cookies when the frontend sets more than one.
    response_headers = {
        key: value
        for key, value in proxied.headers.items()
        if key.lower() not in _HOP_BY_HOP_HEADERS and key.lower() != "set-cookie"
    }
    response = Response(
        content=proxied.content,
        status_code=proxied.status_code,
        headers=response_headers,
        media_type=proxied.headers.get("content-type"),
    )
    for cookie in proxied.headers.get_list("set-cookie"):
        response.headers.append("set-cookie", cookie)
    return response
| def create_app() -> FastAPI: | |
| app = FastAPI( | |
| title="Janus", | |
| description="Cognitive Intelligence Interface", | |
| version="1.0.0", | |
| lifespan=lifespan, | |
| docs_url="/docs", | |
| redoc_url="/redoc", | |
| ) | |
| # ββ CORS β same-origin by default, configurable for external UIs βββββ | |
| raw_origins = os.getenv("ALLOWED_ORIGINS", "") | |
| allowed_origins = [o.strip() for o in raw_origins.split(",") if o.strip()] | |
| # Always include HF Space patterns + localhost for dev | |
| hf_space_id = os.getenv("SPACE_ID", "") | |
| if hf_space_id: | |
| # HF Space URLs follow pattern: https://{owner}-{space-name}.hf.space | |
| owner = hf_space_id.split("/")[0] if "/" in hf_space_id else hf_space_id | |
| allowed_origins.extend( | |
| [ | |
| f"https://{owner.lower()}-*.hf.space", # wildcard for all spaces from same owner | |
| f"https://huggingface.co", | |
| ] | |
| ) | |
| # Always allow localhost for local dev/testing | |
| allowed_origins.extend( | |
| [ | |
| "http://localhost:3000", | |
| "http://localhost:3001", | |
| "http://127.0.0.1:3000", | |
| ] | |
| ) | |
| # If no specific origins configured, allow all (appropriate for public APIs) | |
| if not allowed_origins or os.getenv("CORS_ALLOW_ALL", "false").lower() == "true": | |
| allowed_origins = ["*"] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=allowed_origins, | |
| allow_credentials=allowed_origins != ["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
# ── Routers — always on ──────────────────────────────────────────────
from app.routers.finance import router as finance_router
from app.routers.analyze import router as analyze_router
from app.routers.history import router as history_router
from app.routers.feedback import router as feedback_router
from app.routers.websocket import router as websocket_router

# Registration order is kept identical to the import order above.
for _core_router in (
    finance_router,
    analyze_router,
    history_router,
    feedback_router,
    websocket_router,
):
    app.include_router(_core_router)

# Sentinel (always on, checks internal feature flag for logic)
from app.routers.sentinel import router as sentinel_router
from app.routers.status import router as status_router
from app.routers.voice import router as voice_router

for _aux_router in (sentinel_router, status_router, voice_router):
    app.include_router(_aux_router)
# ── Health (supports HEAD for HF health checker) ─────────────────────
async def health(request=None):
    """Return a lightweight health summary.

    The status is "ok" when ``app.state.graph`` is set and "degraded"
    otherwise. ``request`` is accepted for signature compatibility and is
    not used.
    """
    state = getattr(app, "state", None)
    graph_ok = getattr(state, "graph", None) is not None
    return {
        "status": "ok" if graph_ok else "degraded",
        "graph": "ready" if graph_ok else "failed",
        "space": os.getenv("SPACE_ID", "local"),
        "version": "1.0.0",
        # Falls back to the string "none" when no graph error was recorded.
        "error_detail": getattr(state, "graph_error", "none"),
    }
async def get_model_metrics(request: Request):
    """Return the metrics collector's stats.

    Raises:
        HTTPException: 503 when ``app.state.metrics_collector`` is missing
            or falsy.
    """
    metrics = getattr(request.app.state, "metrics_collector", None)
    if metrics:
        return metrics.get_stats()
    raise HTTPException(status_code=503, detail="Metrics collector not initialized")
async def health_graph_error():
    """Return whatever ``app.graph.graph_status()`` reports."""
    from app.graph import graph_status as _graph_status

    return _graph_status()
async def health_deep():
    """Return an extended health report.

    Combines graph readiness (from ``app.state.graph``) with feature flags,
    data-source availability and persistence settings, all read from
    environment variables.
    """

    def flag(name, default):
        # A flag counts as enabled only for the exact string "true".
        return os.getenv(name, default) == "true"

    def is_set(name):
        return bool(os.getenv(name))

    graph_ok = getattr(getattr(app, "state", None), "graph", None) is not None
    on_space = os.getenv("SPACE_ID", "") != ""

    features = {
        "simulation": flag("SIMULATION_ENABLED", "true"),
        "sentinel": flag("SENTINEL_ENABLED", "true"),
        "learning": True,  # Enabled by persistence manager
        "adaptive": flag("ADAPTIVE_INTELLIGENCE_ENABLED", "false"),
        "training": is_set("KAGGLE_CONFIG"),
        "curiosity": flag("CURIOSITY_ENGINE_ENABLED", "false"),
    }
    data_sources = {
        "yfinance": True,
        "alphavantage": is_set("ALPHAVANTAGE_API_KEY"),
        "finnhub": is_set("FINNHUB_API_KEY"),
        "fmp": is_set("FMP_API_KEY"),
        "eodhd": is_set("EODHD_API_KEY"),
        "tavily": is_set("TAVILY_API_KEY"),
        # Either spelling of the NewsAPI key variable is accepted.
        "newsapi": is_set("NEWS_API_KEY") or is_set("NEWSAPI_KEY"),
        "kaggle": is_set("KAGGLE_CONFIG"),
    }
    persistence = {
        "hf_store": is_set("HF_STORE_REPO"),
        # Ephemeral = running on a Space without a configured store repo.
        "ephemeral": on_space and not is_set("HF_STORE_REPO"),
    }
    return {
        "status": "ok" if graph_ok else "degraded",
        "space": os.getenv("SPACE_ID", "local"),
        "features": features,
        "data_sources": data_sources,
        "persistence": persistence,
    }
async def run_query(body: dict, background_tasks=None):
    """Run a case request and return its result.

    Failures are converted into JSON error responses: an ``HTTPException``
    keeps its status code and detail, any other exception is logged and
    returned as a 500 with the exception text. ``background_tasks`` is
    accepted but not used.
    """
    from fastapi.responses import JSONResponse

    try:
        result = await _execute_case_request(app, body)
    except HTTPException as http_err:
        return JSONResponse(
            status_code=http_err.status_code,
            content={"detail": http_err.detail},
        )
    except Exception as err:
        logger.error("Pipeline error: %s", err)
        return JSONResponse(status_code=500, content={"detail": str(err)})
    return result
async def provider_models(request: Request):
    """Return the provider model catalogue after the provider auth check."""
    _check_provider_auth(request)
    listing = {"object": "list", "data": _JANUS_PROVIDER_MODELS}
    return listing
async def provider_model_detail(model_id: str, request: Request):
    """Return the catalogue entry whose ``id`` equals ``model_id``.

    Responds with a 404 JSON error body when no entry matches.
    """
    from fastapi.responses import JSONResponse

    _check_provider_auth(request)
    for candidate in _JANUS_PROVIDER_MODELS:
        if candidate["id"] == model_id:
            return candidate
    return JSONResponse(
        status_code=404,
        content={"error": {"message": "Model not found", "type": "invalid_request_error"}},
    )
async def provider_embeddings(request: Request, body: dict):
    """Embed one or more inputs and return an embeddings list payload.

    ``body["input"]`` may be a string, a list (string items are used as-is,
    other items are converted with ``_message_content_to_text``), or any
    other value, which is converted the same way. Blank inputs are dropped.

    Returns a 400 JSON error when no usable input remains or when
    ``dimensions`` is not a positive integer.
    """
    from fastapi.responses import JSONResponse

    _check_provider_auth(request)

    def bad_request(message):
        return JSONResponse(
            status_code=400,
            content={"error": {"message": message, "type": "invalid_request_error"}},
        )

    model = body.get("model", "janus-embed")
    input_payload = body.get("input", "")

    # A malformed "dimensions" is a client error, not a server crash.
    try:
        dimensions = int(body.get("dimensions", 256) or 256)
    except (TypeError, ValueError):
        return bad_request("Invalid dimensions: expected a positive integer")
    if dimensions < 1:
        return bad_request("Invalid dimensions: expected a positive integer")

    if isinstance(input_payload, str):
        texts = [input_payload]
    elif isinstance(input_payload, list):
        texts = [
            item if isinstance(item, str) else _message_content_to_text(item).strip()
            for item in input_payload
        ]
    else:
        texts = [_message_content_to_text(input_payload).strip()]

    # Drop entries that are empty or whitespace-only.
    texts = [text for text in texts if str(text).strip()]
    if not texts:
        return bad_request("Missing input")

    data = []
    total_tokens = 0
    for index, text in enumerate(texts):
        embedding = _embed_text(text, dimensions=dimensions)
        total_tokens += _approx_tokens(text)
        data.append({"object": "embedding", "index": index, "embedding": embedding})

    return {
        "object": "list",
        "data": data,
        "model": model,
        "usage": {"prompt_tokens": total_tokens, "total_tokens": total_tokens},
    }
async def provider_chat_completions(request: Request, body: dict):
    """Handle a chat-completions style request.

    Flow, as implemented below:
      1. Extract the user input from ``body["messages"]`` (400 if empty).
      2. Run the case pipeline and select an optional tool call.
      3. If ``janus_execute_tools`` is set and a tool call was selected,
         execute it, summarize the result and substitute that summary as
         the final answer.
      4. Return a streaming or a regular response depending on ``stream``.

    Errors raised inside the pipeline are returned as JSON error bodies.
    """
    from fastapi.responses import JSONResponse, StreamingResponse

    _check_provider_auth(request)
    model = body.get("model", "janus-chat")
    messages = body.get("messages") or []
    user_input = _extract_user_input_from_messages(messages)
    if not user_input:
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "Missing messages/user content", "type": "invalid_request_error"}},
        )

    try:
        case_request = {
            "user_input": user_input,
            "context": _provider_context_from_body(body),
        }
        case_response = await _execute_case_request(app, case_request)
        tool_call = _select_provider_tool_call(
            user_input,
            case_response.get("route", {}),
            body.get("tools") or [],
            body.get("tool_choice", "auto"),
        )

        # Plain path: no server-side tool execution requested or possible.
        if not (body.get("janus_execute_tools") and tool_call):
            if body.get("stream"):
                return StreamingResponse(
                    _stream_chat_completion_response(model, case_response, tool_call),
                    media_type="text/event-stream",
                )
            return _build_chat_completion_response(model, case_response, tool_call)

        execution = await _execute_provider_tool_call(
            app, tool_call, user_input, case_response.get("route", {})
        )
        if body.get("janus_reason_over_tools", True):
            # Runs in a worker thread via asyncio.to_thread.
            tool_summary = await asyncio.to_thread(
                _reason_over_tool_execution, user_input, execution
            )
        else:
            tool_summary = _summarize_provider_tool_execution(execution, user_input)

        # Same case payload, with the tool summary as the final answer.
        final_section = {**case_response.get("final", {}), "response": tool_summary}
        executed_case_response = {
            **case_response,
            "final": final_section,
            "final_answer": tool_summary,
        }
        if body.get("stream"):
            return StreamingResponse(
                _stream_chat_completion_response(model, executed_case_response),
                media_type="text/event-stream",
            )
        response = _build_chat_completion_response(model, executed_case_response)
        response.setdefault("janus", {})["executed_tools"] = [execution]
        return response
    except HTTPException as http_err:
        return JSONResponse(
            status_code=http_err.status_code,
            content={"error": {"message": http_err.detail, "type": "invalid_request_error"}},
        )
    except Exception as err:
        logger.error("Provider chat completion error: %s", err)
        return JSONResponse(
            status_code=500,
            content={"error": {"message": str(err), "type": "server_error"}},
        )
async def provider_responses(request: Request, body: dict):
    """Handle a responses-API style request.

    ``body["input"]`` may be a string (stripped), a list (treated as a
    message list) or any other value (converted to text). An empty result
    yields a 400. The pipeline, optional tool execution and streaming
    handling otherwise mirror the steps coded below; errors raised inside
    the pipeline are returned as JSON error bodies.
    """
    from fastapi.responses import JSONResponse, StreamingResponse

    _check_provider_auth(request)
    model = body.get("model", "janus-chat")
    input_payload = body.get("input", "")

    if isinstance(input_payload, str):
        user_input = input_payload.strip()
    elif isinstance(input_payload, list):
        user_input = _extract_user_input_from_messages(input_payload)
    else:
        user_input = _message_content_to_text(input_payload).strip()

    if not user_input:
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "Missing input", "type": "invalid_request_error"}},
        )

    try:
        # Only list inputs are forwarded as message history.
        history = input_payload if isinstance(input_payload, list) else []
        context = {
            **_provider_context_from_body({"model": model, "messages": history}),
            "responses_api": {"instructions": body.get("instructions", "")},
        }
        case_response = await _execute_case_request(
            app, {"user_input": user_input, "context": context}
        )
        tool_call = _select_provider_tool_call(
            user_input,
            case_response.get("route", {}),
            body.get("tools") or [],
            body.get("tool_choice", "auto"),
        )

        # Plain path: no server-side tool execution requested or possible.
        if not (body.get("janus_execute_tools") and tool_call):
            if body.get("stream"):
                return StreamingResponse(
                    _stream_responses_api_response(model, case_response, tool_call),
                    media_type="text/event-stream",
                )
            return _build_responses_api_response(model, case_response, tool_call)

        execution = await _execute_provider_tool_call(
            app, tool_call, user_input, case_response.get("route", {})
        )
        if body.get("janus_reason_over_tools", True):
            tool_summary = await asyncio.to_thread(
                _reason_over_tool_execution, user_input, execution
            )
        else:
            tool_summary = _summarize_provider_tool_execution(execution, user_input)

        # Same case payload, with the tool summary as the final answer.
        final_section = {**case_response.get("final", {}), "response": tool_summary}
        executed_case_response = {
            **case_response,
            "final": final_section,
            "final_answer": tool_summary,
        }
        if body.get("stream"):
            return StreamingResponse(
                _stream_responses_api_response(model, executed_case_response),
                media_type="text/event-stream",
            )
        response = _build_responses_api_response(model, executed_case_response)
        response.setdefault("janus", {})["executed_tools"] = [execution]
        return response
    except HTTPException as http_err:
        return JSONResponse(
            status_code=http_err.status_code,
            content={"error": {"message": http_err.detail, "type": "invalid_request_error"}},
        )
    except Exception as err:
        logger.error("Provider responses API error: %s", err)
        return JSONResponse(
            status_code=500,
            content={"error": {"message": str(err), "type": "server_error"}},
        )
async def list_cases():
    """Return up to 50 saved cases from the case store, with their count."""
    from app.services.case_store import list_cases as list_saved_cases

    saved = list_saved_cases(limit=50)
    return {"cases": saved, "count": len(saved)}
async def config_status():
    """Report provider, Space id and persistence settings from the environment."""
    env = os.getenv
    summary = {
        "primary_provider": env("PRIMARY_PROVIDER", "huggingface"),
        "space_id": env("SPACE_ID", "local"),
    }
    # True whenever HF_STORE_REPO holds a non-empty value.
    summary["persistent_store"] = bool(env("HF_STORE_REPO"))
    return summary
# ── Silence HF Space internal log-viewer poll ────────────────────────
async def root(request: Request, logs: str = None):
    """Serve the root path.

    Proxies to the bundled frontend when ``_frontend_server_path()`` returns
    a path; otherwise answers with a small JSON status. ``logs`` is accepted
    but not used.
    """
    frontend_available = _frontend_server_path() is not None
    if not frontend_available:
        return {"status": "ok", "service": "Janus"}
    return await _proxy_frontend_request(request)
# ── Daemon routes ────────────────────────────────────────────────────
async def daemon_status():
    """Return the daemon status augmented with the router health report."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"running": False, "message": "Daemon not started"}

    from app.agents.smart_router import get_router_status

    report = daemon.get_status()
    report["router_health"] = get_router_status()
    return report
async def daemon_trigger():
    """Force a daemon cycle.

    Sets ``daemon._force_cycles`` and calls ``daemon.trigger_cycle()`` when
    the daemon has that attribute; otherwise the id is "legacy_trigger".
    """
    daemon = _services.get("daemon")
    if not daemon:
        return {"error": "Daemon not available"}

    def legacy_trigger():
        return "legacy_trigger"

    daemon._force_cycles = True
    trigger = getattr(daemon, "trigger_cycle", legacy_trigger)
    daemon_id = trigger()
    return {
        "status": "triggered",
        "id": daemon_id,
        "message": "Global daemon cycle forced.",
    }
async def daemon_analyze_dissonance(
    file: UploadFile = File(..., description="Audio file"),
    transcript: str = Form(...),
    video: Optional[UploadFile] = File(None, description="Optional Video file for visual dissonance"),
):
    """Analyze audio vs transcript for emotional conflict.

    The uploads are written to temporary files, passed to
    ``mmsa_engine.analyze`` and always removed afterwards, including when
    saving the second upload or the analysis itself fails.
    """
    from app.services.mmsa_engine import mmsa_engine
    import tempfile
    import shutil
    from pathlib import Path

    temp_paths = []

    def save_upload(upload):
        """Copy an upload to a named temp file and return its path."""
        # An upload may arrive without a filename; fall back to no suffix.
        suffix = Path(upload.filename or "").suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Register the path first so it is cleaned up even if the copy fails.
            temp_paths.append(tmp.name)
            shutil.copyfileobj(upload.file, tmp)
        return tmp.name

    try:
        audio_path = save_upload(file)
        video_path = save_upload(video) if video else None
        return mmsa_engine.analyze(audio_path, transcript, video_path)
    finally:
        for path in temp_paths:
            if os.path.exists(path):
                os.remove(path)
async def daemon_analyze_url(
    url: str = Form(...),
    transcript: str = Form(...)
):
    """Analyze a YouTube or Stream URL for emotional conflict."""
    from app.services.mmsa_engine import mmsa_engine as engine

    report = engine.analyze_url(url, transcript)
    return report
async def guardian_analyze_file(file: UploadFile = File(...)):
    """Analyze a screenshot or PDF for scam journey patterns.

    The upload is written to a temporary file that is removed afterwards.
    Files with a ``.pdf`` extension go to ``analyze_document``; everything
    else (including uploads without a filename) is treated as an image.
    """
    from app.services.guardian_sensory import guardian_sensory
    import shutil
    import tempfile

    # An upload may arrive without a filename; treat that as "no extension".
    suffix = os.path.splitext(file.filename or "")[1].lower()
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp_path = tmp.name

    try:
        # Copy inside the try so the temp file is removed if the copy fails.
        with open(tmp_path, "wb") as sink:
            shutil.copyfileobj(file.file, sink)
        if suffix == ".pdf":
            return guardian_sensory.analyze_document(tmp_path)
        # Assume image for screenshot analysis
        return guardian_sensory.analyze_screenshot(tmp_path)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
async def guardian_analyze_url(url: str, transcript: Optional[str] = None):
    """Universal URL Probe: Fuses Phishing Heuristics with MMSA Dissonance for Video.

    Always runs the base URL analysis. When the URL contains "youtube.com"
    or "youtu.be", the MMSA report is merged into the safety report unless
    it contains an "error" key.
    """
    from app.services.guardian_sensory import guardian_sensory
    from app.services.mmsa_engine import mmsa_engine

    # 1. Base URL Forensics (LinkBrain)
    safety_report = guardian_sensory.analyze_url(url)

    # 2. Multimodal Dissonance (MMSA) if YouTube
    looks_like_youtube = "youtube.com" in url or "youtu.be" in url
    if not looks_like_youtube:
        return safety_report

    mmsa_report = mmsa_engine.analyze_url(
        url, transcript or "Autonomous scan β no manual transcript provided."
    )
    if "error" in mmsa_report:
        return safety_report

    # Fuse reports: keep the higher of the two risk estimates.
    safety_report["details"]["mmsa"] = mmsa_report
    highest_risk = max(
        safety_report["risk_score"], mmsa_report.get("deception_probability", 0)
    )
    safety_report["risk_score"] = float(highest_risk)
    safety_report["reason"] += f" | MMSA Detection: {mmsa_report.get('reliability_tier')} confidence dissonance detected."
    safety_report["safe_action"] = mmsa_report.get(
        "safe_action", safety_report["safe_action"]
    )
    return safety_report
async def daemon_calibrate_dissonance():
    """Trigger threshold calibration and generate Accuracy Report."""
    from app.services.mmsa_engine import mmsa_engine as engine

    calibration = engine.calibrate()
    return calibration
async def daemon_alerts(limit: int = 20, min_severity: str = "low"):
    """Return alerts from the daemon's signal queue.

    Falls back to the queue's stats if fetching alerts raises, and to an
    empty list when no daemon is registered.
    """
    daemon = _services.get("daemon")
    if not daemon:
        return []
    try:
        alerts = daemon.signal_queue.get_alerts(limit=limit, min_severity=min_severity)
    except Exception:
        return daemon.signal_queue.get_stats()
    return alerts
async def daemon_adaptive_status():
    """Return the adaptive engine's full report, or a not-active notice."""
    adaptive = getattr(app.state, "adaptive", None)
    if not adaptive:
        return {"running": False, "message": "Adaptive engine not active"}
    return adaptive.get_full_intelligence_report()
async def trigger_adaptive_now():
    """Start an adaptive evolution cycle in the background.

    Returns immediately with a "triggered" status, or an error payload when
    no adaptive engine with ``run_evolution_cycle`` is available.
    """
    adaptive = getattr(app.state, "adaptive", None)
    if not (adaptive and hasattr(adaptive, "run_evolution_cycle")):
        return {"error": "Adaptive evolution not available"}

    # Offload to task
    task = asyncio.create_task(adaptive.run_evolution_cycle())

    # The event loop only keeps weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes.
    # Hold a strong reference until the task completes.
    pending = getattr(app.state, "_adaptive_background_tasks", None)
    if pending is None:
        pending = set()
        app.state._adaptive_background_tasks = pending
    pending.add(task)
    task.add_done_callback(pending.discard)

    return {
        "status": "triggered",
        "message": "Adaptive evolution cycle started in background.",
    }
async def daemon_watchlist():
    """Return the market watcher's watchlist status.

    Falls back to the raw watchlist if the status call raises, and to an
    empty list when no daemon is registered.
    """
    daemon = _services.get("daemon")
    if not daemon:
        return []
    try:
        status = daemon.market_watcher.get_watchlist_status()
    except Exception:
        return {"watchlist": daemon.market_watcher.watchlist}
    return status
async def daemon_events(limit: int = 20, event_type: str = None):
    """Return detected events.

    With ``event_type`` the detector is queried by type (``limit`` is not
    passed along in that case); otherwise the most recent ``limit`` events
    are returned. Errors are reported inside the payload.
    """
    daemon = _services.get("daemon")
    if not daemon:
        return {"events": []}
    try:
        detector = daemon.event_detector
        if not event_type:
            return detector.get_recent_events(limit=limit)
        return detector.get_events_by_type(event_type)
    except Exception as err:
        return {"events": [], "error": str(err)}
async def daemon_circadian():
    """Return the circadian status, or just the current phase as a fallback."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"running": False}
    try:
        return daemon.circadian.get_status()
    except Exception:
        # Degraded answer: only the current phase value.
        current_phase = daemon.circadian.get_current_phase()
        return {"phase": current_phase.value}
async def daemon_curiosity():
    """Return the curiosity engine status; errors are reported in the payload."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"running": False}
    try:
        status = daemon.curiosity.get_status()
    except Exception as err:
        return {"running": True, "error": str(err)}
    return status
async def curiosity_discoveries(limit: int = 10):
    """Return up to ``limit`` curiosity discoveries, or [] on any failure."""
    daemon = _services.get("daemon")
    if not daemon:
        return []
    try:
        found = daemon.curiosity.get_discoveries(limit=limit)
    except Exception:
        return []
    return found
async def curiosity_interests():
    """Return the curiosity engine's interests, or {} on any failure."""
    daemon = _services.get("daemon")
    if not daemon:
        return {}
    try:
        interests = daemon.curiosity.get_interests()
    except Exception:
        return {}
    return interests
async def trigger_curiosity_now():
    """Run a curiosity cycle now and store its report on the daemon."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"error": "Daemon not running"}
    try:
        cycle_report = daemon.curiosity.run_curiosity_cycle()
        daemon.last_curiosity_cycle = cycle_report
    except Exception as err:
        return {"error": str(err)}
    return cycle_report
async def daemon_dreams():
    """Return the daemon's last dream report, or a placeholder if none exists."""
    daemon = _services.get("daemon")
    if not daemon or not daemon.last_dream:
        return {"dreams": [], "message": "No dream cycle run yet"}
    return daemon.last_dream
async def trigger_dream_now():
    """Run a dream cycle now and store its report on the daemon."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"error": "Daemon not running"}
    try:
        dream_report = daemon.dream_processor.run_dream_cycle()
        daemon.last_dream = dream_report
    except Exception as err:
        return {"error": str(err)}
    return dream_report
async def pending_thoughts():
    """Return the first ten pending daemon thoughts plus the total count."""
    daemon = _services.get("daemon")
    if not daemon:
        return {"pending_thoughts": [], "count": 0}
    backlog = getattr(daemon, "_pending_thoughts", [])
    return {"pending_thoughts": backlog[:10], "count": len(backlog)}
async def get_context(query: str = ""):
    """Return runtime context for a query, or a system snapshot.

    With a non-empty ``query`` the response holds the runtime context built
    for that query and the number of recent signals. Without one, it holds
    a snapshot built for the context engine's last topic (or "system
    state") plus counters from the daemon, memory manager, adaptive engine
    and guardian services.
    """
    daemon = _services.get("daemon")
    context_engine = getattr(app.state, "context_engine", None)

    # Best-effort read of the ten most recent queued signals.
    signals = []
    if daemon:
        try:
            if hasattr(daemon.signal_queue, "_queue"):
                signals = list(daemon.signal_queue._queue)[-10:]
        except Exception as err:
            # Signals are optional here; record the failure instead of hiding it.
            logger.debug("Could not read recent daemon signals: %s", err)

    if query:
        return {
            "context": _build_runtime_context(app, query, None),
            "recent_signals": len(signals),
        }

    adaptive = getattr(app.state, "adaptive", None)
    memory_manager = getattr(app.state, "memory_manager", None)
    from app.services.scam_graph import scam_graph
    from app.services.guardian_interceptor import guardian_interceptor

    # `query` is empty on this path, so only the fallbacks apply.
    snapshot_query = getattr(context_engine, "_last_topic", "") or "system state"

    # The attribute may be missing or None before the first curiosity cycle.
    last_cycle = (getattr(daemon, "last_curiosity_cycle", None) or {}) if daemon else {}

    has_memory = bool(memory_manager)
    return {
        "context": "ok",
        "snapshot": _build_runtime_context(app, snapshot_query, None),
        "recent_signals": len(signals),
        "pending_thoughts": len(
            getattr(context_engine, "get_pending_thoughts", lambda: [])()
        ),
        "recent_discoveries": len(last_cycle.get("discoveries", [])),
        "memory_cases": memory_manager.total_cases()
        if has_memory and hasattr(memory_manager, "total_cases")
        else 0,
        "memory_patterns": memory_manager.get_frequent_patterns(min_freq=2)[:5]
        if has_memory and hasattr(memory_manager, "get_frequent_patterns")
        else [],
        # NOTE(review): read as an attribute here but called as a method on
        # memory_manager — confirm `adaptive.total_cases` is not a method.
        "adaptive_cases": adaptive.total_cases if adaptive else 0,
        "guardian": {
            "active_interventions": len(guardian_interceptor.active_interventions),
            "graph_nodes": len(scam_graph.graph.nodes),
        },
    }
# ── Memory routes ────────────────────────────────────────────────────
async def memory_stats():
    """Return combined knowledge-store, case-store and memory-manager stats.

    Any failure yields a zeroed payload that carries the error text.
    """
    try:
        from app.memory import knowledge_store
        from app.services.case_store import memory_stats as get_case_memory_stats

        memory_manager = getattr(app.state, "memory_manager", None)
        stats = (
            knowledge_store.get_stats()
            if hasattr(knowledge_store, "get_stats")
            else {}
        )
        case_stats = get_case_memory_stats()
        total_links = stats.get("total_links", 0)

        def from_manager(method_name, fallback, **kwargs):
            """Call a memory-manager method if present, else return fallback."""
            if memory_manager and hasattr(memory_manager, method_name):
                return getattr(memory_manager, method_name)(**kwargs)
            return fallback

        return {
            "queries": stats.get("total_queries", 0),
            "entities": stats.get("total_entities", 0),
            "links": total_links,
            # Previously always mirrored "links"; prefer a dedicated insight
            # count when the store reports one, else keep the old value.
            "insights": stats.get("total_insights", total_links),
            "domains": stats.get("domain_counts", {}),
            "total_cases": case_stats.get("total_cases", 0),
            "latest_case_id": case_stats.get("latest_case_id"),
            "disk_bytes": case_stats.get("disk_bytes", 0),
            "indexed_case_memory": from_manager("total_cases", 0),
            "domain_stats": from_manager("get_domain_stats", {}),
            "frequent_patterns": from_manager(
                "get_frequent_patterns", [], min_freq=2
            )[:10],
        }
    except Exception as e:
        return {
            "queries": 0,
            "entities": 0,
            "links": 0,
            "insights": 0,
            "domains": {},
            "total_cases": 0,
            "error": str(e),
        }
async def memory_queries(limit: int = 20):
    """Return recent knowledge-store queries, or [] if unavailable or failing."""
    try:
        from app.memory import knowledge_store

        if not hasattr(knowledge_store, "get_recent_queries"):
            return []
        return knowledge_store.get_recent_queries(limit=limit)
    except Exception:
        return []
# ── Intelligence / Cache routes ──────────────────────────────────────
async def intelligence_report():
    """Return an aggregated report across the optional runtime services.

    Each section is filled from a service found in ``_services`` or on
    ``app.state`` and defaults to an empty value when that service is
    absent. The sentinel, memory and guardian sections are collected inside
    their own try blocks so a failure there does not fail the report.
    """
    cache = _services.get("cache")
    daemon = _services.get("daemon")
    adaptive = getattr(app.state, "adaptive", None)
    learning_engine = getattr(app.state, "learning_engine", None)
    memory_manager = getattr(app.state, "memory_manager", None)
    self_reflection = getattr(app.state, "self_reflection", None)
    tracer = getattr(app.state, "tracer", None)
    curator = getattr(app.state, "curator", None)

    sentinel_status = {}
    try:
        from app.routers.sentinel import engine as sentinel_engine

        sentinel_status = sentinel_engine.get_status()
    except Exception:
        sentinel_status = {}

    # Memory stats
    memory_summary = {}
    try:
        if memory_manager and hasattr(memory_manager, "total_cases"):
            memory_summary = {
                "total_cases": memory_manager.total_cases(),
                "domain_stats": memory_manager.get_domain_stats(),
                "frequent_patterns": memory_manager.get_frequent_patterns(min_freq=2)[:10],
            }
    except Exception as e:
        logger.warning("Failed to get memory stats for report: %s", e)

    # Guardian stats. The guardian modules are imported only here, inside the
    # try block, so an import failure degrades this section instead of
    # failing the whole report.
    guardian_stats = {}
    try:
        from app.services.scam_graph import scam_graph
        from app.services.guardian_interceptor import guardian_interceptor

        guardian_stats = {
            "intervention_threshold": guardian_interceptor.intervention_threshold,
            "active_interventions": len(guardian_interceptor.active_interventions),
            "graph_nodes": len(scam_graph.graph.nodes),
            "graph_edges": len(scam_graph.graph.edges),
        }
    except Exception as e:
        logger.warning("Failed to get guardian stats for report: %s", e)

    return {
        "status": "ok",
        "cache_stats": cache.get_stats()
        if cache and hasattr(cache, "get_stats")
        else {},
        "daemon_cycles": daemon.cycle_count if daemon else 0,
        "daemon_status": daemon.get_status() if daemon and hasattr(daemon, "get_status") else {},
        "adaptive_report": adaptive.get_full_intelligence_report()
        if adaptive and hasattr(adaptive, "get_full_intelligence_report")
        else {},
        "learning_status": learning_engine.get_status()
        if learning_engine and hasattr(learning_engine, "get_status")
        else {},
        "memory": memory_summary,
        "self_reflection": {
            "dataset": self_reflection.get_dataset_stats(),
            "top_gaps": self_reflection.get_gaps()[:5],
            "top_opinions": self_reflection.get_opinions()[:5],
        }
        if self_reflection and hasattr(self_reflection, "get_dataset_stats")
        else {},
        "observation": tracer.get_stats() if tracer and hasattr(tracer, "get_stats") else {},
        "curation": curator.get_stats() if curator and hasattr(curator, "get_stats") else {},
        "sentinel": sentinel_status,
        "guardian": guardian_stats,
        "space": os.getenv("SPACE_ID", "local"),
    }
async def intelligence_domain(domain: str):
    """Return expertise, memory stats, traces, examples and cases for a domain.

    Recent cases are taken from the 50 latest saved cases, filtered by the
    case's route domain (falling back to its top-level domain, then
    "general") and capped at ten entries.
    """
    from app.services.case_store import list_cases as list_saved_cases

    adaptive = getattr(app.state, "adaptive", None)
    memory_manager = getattr(app.state, "memory_manager", None)
    tracer = getattr(app.state, "tracer", None)
    curator = getattr(app.state, "curator", None)

    expertise = {}
    if adaptive and hasattr(adaptive, "domain_expertise"):
        domain_expertise = adaptive.domain_expertise.get(domain)
        if domain_expertise:
            expertise = domain_expertise.get_expertise_summary()

    recent_cases = []
    for case in list_saved_cases(limit=50, full=True):
        case_domain = case.get("route", {}).get("domain", case.get("domain", "general"))
        if case_domain != domain:
            continue
        answer = case.get("final_answer") or case.get("final", {}).get("response", "")
        recent_cases.append(
            {
                "case_id": case.get("case_id"),
                "user_input": case.get("user_input", ""),
                # Answer preview is truncated to 240 characters.
                "final_answer": str(answer)[:240],
                "saved_at": case.get("saved_at"),
                "trace_score": case.get("trace_score"),
            }
        )
        if len(recent_cases) >= 10:
            break

    if memory_manager and hasattr(memory_manager, "get_domain_stats"):
        domain_memory = memory_manager.get_domain_stats().get(domain, {})
    else:
        domain_memory = {}

    if tracer and hasattr(tracer, "get_traces"):
        traces = tracer.get_traces(limit=10, domain=domain)
    else:
        traces = []

    if curator and hasattr(curator, "get_curated_examples"):
        examples = curator.get_curated_examples(limit=10, domain=domain)
    else:
        examples = []

    return {
        "domain": domain,
        "adaptive_expertise": expertise,
        "memory_stats": domain_memory,
        "recent_traces": traces,
        "curated_examples": examples,
        "recent_cases": recent_cases,
    }
async def cache_stats():
    """Return cache statistics, or an 'unavailable' placeholder."""
    cache = _services.get("cache")
    if not (cache and hasattr(cache, "get_stats")):
        return {"entries": 0, "hit_rate": 0, "status": "unavailable"}
    return cache.get_stats()
async def cache_cleanup():
    """Purge expired cache entries and report the removal count with fresh stats."""
    cache = _services.get("cache")
    if not (cache and hasattr(cache, "cleanup_expired")):
        return {"expired_removed": 0}
    removed = cache.cleanup_expired()
    return {"expired_removed": removed, "cache_stats": cache.get_stats()}
async def intelligence_save():
    """Persist the adaptive engine, context engine and self-reflection state.

    Each component is saved only if it is present on ``app.state`` and has
    the expected save method; the response lists which ones were saved.
    """
    # (response key, app.state attribute, save method), in save order.
    targets = (
        ("adaptive", "adaptive", "save"),
        ("context", "context_engine", "_save"),
        ("self_reflection", "self_reflection", "_save"),
    )
    saved = {}
    for label, state_attr, method_name in targets:
        component = getattr(app.state, state_attr, None)
        if component and hasattr(component, method_name):
            getattr(component, method_name)()
            saved[label] = True
    return {"status": "ok", "saved": saved}
# ── Prompts routes ───────────────────────────────────────────────────
async def list_prompts_route():
    """List the ``*.txt`` prompt files in the ``prompts`` directory next to this file."""
    import pathlib

    prompts_dir = pathlib.Path(__file__).parent / "prompts"
    if not prompts_dir.exists():
        return []
    return [
        {"name": entry.stem, "file": entry.name}
        for entry in prompts_dir.glob("*.txt")
    ]
async def get_prompt_route(name: str):
    """Return the content of the prompt file ``prompts/<name>.txt``.

    ``name`` comes from the request, so it must be a plain file name: empty
    names, "." / ".." and names containing path separators or NUL bytes are
    answered like a missing prompt. Responds with a 404 JSON body when the
    prompt does not exist.
    """
    import pathlib

    def not_found():
        # Uses the module-level JSONResponse import.
        return JSONResponse(status_code=404, content={"detail": "Prompt not found"})

    # Reject anything that could resolve outside the prompts directory.
    unsafe = (
        not name
        or name in {".", ".."}
        or any(token in name for token in ("/", "\\", "\x00"))
    )
    if unsafe:
        return not_found()

    prompts_dir = pathlib.Path(__file__).parent / "prompts"
    path = prompts_dir / f"{name}.txt"
    if not path.exists():
        return not_found()
    return {"name": name, "content": path.read_text(encoding="utf-8")}
async def update_prompt_route(name: str, payload: dict):
    """Write ``payload["content"]`` to the prompt file ``prompts/<name>.txt``.

    The prompts directory is created if needed. ``name`` comes from the
    request, so it must be a plain file name.

    Raises:
        HTTPException: 400 when ``name`` is empty, "." / "..", or contains a
            path separator or NUL byte.
    """
    import pathlib

    # Never let a request-supplied name point outside the prompts directory.
    unsafe = (
        not name
        or name in {".", ".."}
        or any(token in name for token in ("/", "\\", "\x00"))
    )
    if unsafe:
        raise HTTPException(status_code=400, detail="Invalid prompt name")

    prompts_dir = pathlib.Path(__file__).parent / "prompts"
    prompts_dir.mkdir(exist_ok=True)
    path = prompts_dir / f"{name}.txt"
    path.write_text(payload.get("content", ""), encoding="utf-8")
    return {"message": "Prompt updated", "name": name}
# ── Self / Reflection routes ─────────────────────────────────────────
async def self_report():
    """Return a self-reflection summary.

    Combines opinions, corrections, gaps, dataset stats and the self model
    with pending thoughts from the context engine and curiosity / dream
    status from the daemon. Any failure yields empty lists plus the error
    text.
    """
    try:
        from app.services.self_reflection import self_reflection

        daemon = _services.get("daemon")
        context_engine = getattr(app.state, "context_engine", None)

        report = {
            "opinions": self_reflection.get_opinions()[:10],
            "corrections": self_reflection.get_corrections()[:5],
            "gaps": self_reflection.get_gaps()[:5],
        }

        if hasattr(self_reflection, "get_dataset_stats"):
            report["dataset"] = self_reflection.get_dataset_stats()
        else:
            report["dataset"] = {}

        report["self_model"] = getattr(self_reflection, "self_model", {})

        if context_engine and hasattr(context_engine, "get_pending_thoughts"):
            report["pending_thoughts"] = context_engine.get_pending_thoughts()[:5]
        else:
            report["pending_thoughts"] = []

        if daemon and hasattr(daemon, "curiosity"):
            report["curiosity"] = daemon.curiosity.get_status()
        else:
            report["curiosity"] = {}

        if daemon and hasattr(daemon, "dream_processor"):
            report["dreams"] = daemon.dream_processor.get_status()
        else:
            report["dreams"] = {}

        return report
    except Exception as err:
        return {"opinions": [], "corrections": [], "gaps": [], "error": str(err)}
async def self_opinions(topic: str = None):
    """Return self-reflection opinions for ``topic``; empty list on failure."""
    try:
        from app.services.self_reflection import self_reflection

        opinions = self_reflection.get_opinions(topic)
    except Exception:
        return {"opinions": []}
    return {"opinions": opinions}
async def self_corrections(topic: str = None):
    """Return self-reflection corrections, filtered by ``topic`` when given."""
    try:
        from app.services.self_reflection import self_reflection

        if topic:
            corrections = self_reflection.get_corrections(topic)
        else:
            corrections = self_reflection.get_corrections()
    except Exception:
        return {"corrections": []}
    return {"corrections": corrections}
async def self_gaps():
    """Return the self-reflection gaps; empty list on failure."""
    try:
        from app.services.self_reflection import self_reflection

        gaps = self_reflection.get_gaps()
    except Exception:
        return {"gaps": []}
    return {"gaps": gaps}
async def self_dataset():
    """Return self-reflection dataset stats, or {} if unavailable or failing."""
    try:
        from app.services.self_reflection import self_reflection

        if not hasattr(self_reflection, "get_dataset_stats"):
            return {}
        return self_reflection.get_dataset_stats()
    except Exception:
        return {}
# ── Extended Cases routes ────────────────────────────────────────────
async def case_detail(case_id: str):
    """Return the stored case for ``case_id``, or a 404 JSON body."""
    from fastapi.responses import JSONResponse
    from app.services.case_store import get_case

    record = get_case(case_id)
    if not record:
        return JSONResponse(status_code=404, content={"detail": "Case not found"})
    return record
async def case_delete(case_id: str):
    """Delete a stored case; respond 404 when the store reports no deletion."""
    from fastapi.responses import JSONResponse
    from app.services.case_store import delete_case

    removed = delete_case(case_id)
    if not removed:
        return JSONResponse(status_code=404, content={"detail": "Case not found"})
    return {"deleted": True, "case_id": case_id}
async def case_raw(case_id: str):
    """Return the stored case as a pretty-printed JSON string.

    The payload is ``{"raw": <json text>}`` with two-space indentation and
    non-ASCII characters left unescaped. A falsy lookup result is answered
    with a 404 JSON response.
    """
    from fastapi.responses import JSONResponse
    from app.services.case_store import get_case

    record = get_case(case_id)
    if not record:
        return JSONResponse(status_code=404, content={"detail": "Case not found"})

    import json

    serialized = json.dumps(record, indent=2, ensure_ascii=False)
    return {"raw": serialized}
# ── Agents / Pipeline / Debug routes ──────────────────────────────────
async def list_agents():
    """Return whatever ``agent_registry.list_agents`` reports."""
    # Imported under an alias because this handler shares the registry
    # function's name.
    from app.services.agent_registry import list_agents as registry_list_agents

    registered = registry_list_agents()
    return registered
async def agent_detail(agent_name: str):
    """Return the registry entry for ``agent_name`` plus status and prompt preview.

    The response is the registry entry with two keys set on top of it:
    ``"status"`` (always ``"active"``) and ``"prompt_preview"`` (the first
    400 characters of the prompt named by the entry's ``prompt_name``, or
    ``None`` when there is no prompt name or loading fails). An unknown
    agent is answered with a 404 JSON response.
    """
    from fastapi.responses import JSONResponse
    from app.services.agent_registry import get_agent

    entry = get_agent(agent_name)
    if not entry:
        return JSONResponse(status_code=404, content={"detail": "Agent not found"})

    preview = None
    name_of_prompt = entry.get("prompt_name")
    if name_of_prompt:
        try:
            from app.config import load_prompt

            preview = load_prompt(name_of_prompt)[:400]
        except Exception:
            # The preview is optional; a missing or unreadable prompt is not an error.
            preview = None

    payload = {**entry}
    payload["status"] = "active"
    payload["prompt_preview"] = preview
    return payload
async def pipeline_stats():
    """Summarise which pipeline services are attached to the running app.

    Reports whether ``app.state.graph`` is set, which optional services are
    present (daemon, adaptive, learning, tracing), the memory manager's case
    count, the tracer's stats, and the ``SPACE_ID`` environment value
    (``"local"`` when unset). Missing services yield ``0`` / ``{}`` defaults.
    """
    state = app.state
    memory_manager = getattr(state, "memory_manager", None)
    tracer = getattr(state, "tracer", None)
    daemon = _services.get("daemon")

    service_flags = {
        "daemon": daemon is not None,
        "adaptive": getattr(state, "adaptive", None) is not None,
        "learning": getattr(state, "learning_engine", None) is not None,
        "tracing": tracer is not None,
    }

    if memory_manager and hasattr(memory_manager, "total_cases"):
        case_count = memory_manager.total_cases()
    else:
        case_count = 0

    if tracer and hasattr(tracer, "get_stats"):
        trace_stats = tracer.get_stats()
    else:
        trace_stats = {}

    return {
        "graph_ready": getattr(state, "graph", None) is not None,
        "services": service_flags,
        "memory_cases": case_count,
        "trace_stats": trace_stats,
        "space": os.getenv("SPACE_ID", "local"),
    }
async def debug_state(case_id: str):
    """Return a debugging snapshot for a stored case.

    The snapshot contains the case itself, a runtime context preview built
    from its ``user_input``, up to five similar cases from the memory manager
    (when available), and the matching trace. The trace is searched only
    among the 200 entries returned by ``tracer.get_traces(limit=200)``.
    An unknown case is answered with a 404 JSON response.
    """
    from fastapi.responses import JSONResponse
    from app.services.case_store import get_case

    case = get_case(case_id)
    if not case:
        return JSONResponse(status_code=404, content={"detail": "Case not found"})

    memory_manager = getattr(app.state, "memory_manager", None)
    tracer = getattr(app.state, "tracer", None)

    matched_trace = None
    wanted_trace_id = case.get("trace_id")
    if wanted_trace_id and tracer and hasattr(tracer, "get_traces"):
        for candidate in tracer.get_traces(limit=200):
            if candidate.get("trace_id") == wanted_trace_id:
                matched_trace = candidate
                break

    user_input = case.get("user_input", "")
    context_preview = _build_runtime_context(app, user_input, None)
    if memory_manager and hasattr(memory_manager, "find_similar"):
        similar_cases = memory_manager.find_similar(user_input, top_k=5)
    else:
        similar_cases = []

    return {
        "case_id": case_id,
        "case": case,
        "context_preview": context_preview,
        "similar_cases": similar_cases,
        "trace": matched_trace,
    }
# ── Traces / Curation / Domain routes ─────────────────────────────────
async def list_traces(limit: int = 50, score_min: float = 0.0, domain: str = None):
    """Return traces from the app's tracer together with their count.

    ``limit``, ``score_min`` and ``domain`` are forwarded to
    ``tracer.get_traces``. Without a usable tracer the list is empty.
    """
    tracer = getattr(app.state, "tracer", None)
    if tracer and hasattr(tracer, "get_traces"):
        traces = tracer.get_traces(limit=limit, score_min=score_min, domain=domain)
    else:
        traces = []
    return {"traces": traces, "count": len(traces)}
async def traces_stats():
    """Return ``tracer.get_stats()``, or ``{"total_traces": 0}`` without a tracer."""
    tracer = getattr(app.state, "tracer", None)
    if tracer and hasattr(tracer, "get_stats"):
        return tracer.get_stats()
    return {"total_traces": 0}
async def curation_examples(limit: int = 50, domain: str = None, query_type: str = None):
    """Return curated examples from the app's curator together with their count.

    ``limit``, ``domain`` and ``query_type`` are forwarded to
    ``curator.get_curated_examples``. Without a usable curator the list is empty.
    """
    curator = getattr(app.state, "curator", None)
    if curator and hasattr(curator, "get_curated_examples"):
        examples = curator.get_curated_examples(
            limit=limit, domain=domain, query_type=query_type
        )
    else:
        examples = []
    return {"examples": examples, "count": len(examples)}
async def curation_stats():
    """Return ``curator.get_stats()``, or zeroed counters without a curator."""
    curator = getattr(app.state, "curator", None)
    if curator and hasattr(curator, "get_stats"):
        return curator.get_stats()
    return {"curated_count": 0, "rejected_count": 0}
async def curation_push(limit: int = 500):
    """Trigger ``hf_pusher.push_curated_dataset(limit=limit)`` and return its result.

    Returns ``{"error": "HF dataset pusher unavailable"}`` when the app has no
    pusher or the pusher lacks ``push_curated_dataset``.
    """
    hf_pusher = getattr(app.state, "hf_pusher", None)
    pusher_ready = hf_pusher and hasattr(hf_pusher, "push_curated_dataset")
    if not pusher_ready:
        return {"error": "HF dataset pusher unavailable"}
    return hf_pusher.push_curated_dataset(limit=limit)
async def domain_classify(query: str = ""):
    """Classify ``query`` by domain and query type.

    A blank query returns ``{"detail": "Missing query"}`` without touching any
    classifier. Otherwise the domain classifier and query classifier attached
    to ``app.state`` are consulted (each is optional). When the domain
    classifier resolves to ``"general"`` (or has no top domains) but the query
    classifier's metadata carries a non-general ``detected_domain``, that
    detected domain takes over the resolved domain / top-domain list.
    """
    if not query.strip():
        return {"detail": "Missing query"}

    domain_classifier = getattr(app.state, "domain_classifier", None)
    query_classifier = getattr(app.state, "query_classifier", None)

    domain_result = None
    if domain_classifier and hasattr(domain_classifier, "classify"):
        domain_result = domain_classifier.classify(query)

    # query_result is indexed as (query type, confidence, metadata dict) below.
    query_result = None
    if query_classifier and hasattr(query_classifier, "classify"):
        query_result = query_classifier.classify(query)

    top_domains = []
    if domain_classifier and hasattr(domain_classifier, "get_top_domains"):
        top_domains = domain_classifier.get_top_domains(query, top_n=3)

    top_domains_payload = [
        {"domain": entry[0].value, "confidence": entry[1]} for entry in top_domains
    ]

    # Non-general domain suggested by the query classifier's metadata, if any.
    hinted_domain = None
    if query_result:
        detected = query_result[2].get("detected_domain")
        if detected and detected != "general":
            hinted_domain = detected

    if hinted_domain and (not top_domains or top_domains[0][0].value == "general"):
        top_domains_payload = [
            {"domain": hinted_domain, "confidence": query_result[1]}
        ]

    if domain_result:
        resolved_domain = domain_result.domain.value
        resolved_confidence = domain_result.confidence
    else:
        resolved_domain = "general"
        resolved_confidence = 0.5

    if resolved_domain == "general" and hinted_domain:
        resolved_domain = hinted_domain
        resolved_confidence = max(resolved_confidence, query_result[1])

    if query_result:
        query_type = getattr(query_result[0], "value", "unknown")
        query_type_confidence = query_result[1]
        query_metadata = query_result[2]
    else:
        query_type = "unknown"
        query_type_confidence = 0.0
        query_metadata = {}

    return {
        "domain": resolved_domain,
        "confidence": resolved_confidence,
        "keywords_found": domain_result.keywords_found if domain_result else [],
        "reasoning": domain_result.reasoning if domain_result else "classifier unavailable",
        "query_type": query_type,
        "query_type_confidence": query_type_confidence,
        "query_metadata": query_metadata,
        "top_domains": top_domains_payload,
    }
async def domain_confidence(query: str = ""):
    """Return per-domain confidence scores as ``{"domains": {...}}``.

    With a blank query the payload is the memory manager's
    ``get_domain_stats()`` (or ``{}`` when unavailable). Otherwise every key
    of ``domain_classifier.domain_keywords`` is scored with
    ``get_domain_confidence``; domains whose scoring raises are omitted.
    """
    domain_classifier = getattr(app.state, "domain_classifier", None)
    memory_manager = getattr(app.state, "memory_manager", None)

    if not query.strip():
        if memory_manager and hasattr(memory_manager, "get_domain_stats"):
            return {"domains": memory_manager.get_domain_stats()}
        return {"domains": {}}

    if not (domain_classifier and hasattr(domain_classifier, "domain_keywords")):
        return {"domains": {}}

    scores = {}
    for domain_type in domain_classifier.domain_keywords.keys():
        try:
            score = domain_classifier.get_domain_confidence(query, domain_type)
            scores[domain_type.value] = score
        except Exception:
            # One failing domain must not hide the scores of the others.
            continue
    return {"domains": scores}
async def domain_top(query: str = "", limit: int = 5):
    """Return the top domains for ``query`` as ``{"top_domains": [...]}``.

    With a blank query the list comes from the memory manager's domain stats,
    sorted by their ``count`` (descending) and cut to ``limit``. Otherwise
    the domain classifier's ``get_top_domains`` is used. If that yields
    nothing or leads with ``"general"``, the query classifier's non-general
    ``detected_domain`` (when present) is returned as the single top domain.
    """
    domain_classifier = getattr(app.state, "domain_classifier", None)
    query_classifier = getattr(app.state, "query_classifier", None)
    memory_manager = getattr(app.state, "memory_manager", None)

    if not query.strip():
        ranked = []
        if memory_manager and hasattr(memory_manager, "get_domain_stats"):
            domain_stats = memory_manager.get_domain_stats()
            entries = [
                {"domain": name, **stats} for name, stats in domain_stats.items()
            ]
            entries.sort(key=lambda entry: entry.get("count", 0), reverse=True)
            ranked = entries[:limit]
        return {"top_domains": ranked}

    if domain_classifier and hasattr(domain_classifier, "get_top_domains"):
        top_domains = domain_classifier.get_top_domains(query, top_n=limit)
    else:
        top_domains = []

    # Nested checks keep the original short-circuit order: the leading domain
    # is only inspected when a usable query classifier exists.
    if query_classifier and hasattr(query_classifier, "classify"):
        if not top_domains or top_domains[0][0].value == "general":
            try:
                _, fallback_confidence, metadata = query_classifier.classify(query)
                detected_domain = metadata.get("detected_domain")
                if detected_domain and detected_domain != "general":
                    return {
                        "top_domains": [
                            {"domain": detected_domain, "confidence": fallback_confidence}
                        ]
                    }
            except Exception:
                # The fallback is optional; keep the domain classifier's answer.
                pass

    ranked_payload = []
    for domain, confidence in top_domains:
        ranked_payload.append({"domain": domain.value, "confidence": confidence})
    return {"top_domains": ranked_payload}
async def health_features():
    """Report which optional features are enabled via environment flags.

    Each flag is read from the environment and compared to ``"true"``
    case-insensitively, matching how the optional routers in this file are
    gated (``os.getenv(...).lower() == "true"``). Previously the comparison
    was case-sensitive, so ``SIMULATION_ENABLED=True`` mounted the simulation
    router while this endpoint reported the feature as disabled.

    Returns:
        dict with boolean ``simulation``, ``sentinel``, ``learning`` and
        ``curiosity`` entries. Simulation and sentinel default to enabled;
        learning and curiosity default to disabled.
    """

    def _flag(name: str, default: str) -> bool:
        # Same normalisation as the router gating: lower-case, then compare.
        return os.getenv(name, default).lower() == "true"

    return {
        "simulation": _flag("SIMULATION_ENABLED", "true"),
        "sentinel": _flag("SENTINEL_ENABLED", "true"),
        "learning": _flag("LEARNING_ENABLED", "false"),
        "curiosity": _flag("CURIOSITY_ENGINE_ENABLED", "false"),
    }
# ── Optional routers ──────────────────────────────────────────────────
# Each router is mounted only when its environment flag is "true"
# (case-insensitive). A failing import or registration is logged and skipped
# so the rest of the app still starts.
simulation_enabled = os.getenv("SIMULATION_ENABLED", "true").lower() == "true"
if simulation_enabled:
    try:
        from app.routers.simulation import router as sim_router

        app.include_router(sim_router)
    except Exception as exc:
        logger.warning("Simulation router unavailable: %s", exc)

learning_enabled = os.getenv("LEARNING_ENABLED", "false").lower() == "true"
if learning_enabled:
    try:
        from app.routers.learning import router as learning_router

        app.include_router(learning_router, tags=["learning"])
    except Exception as exc:
        logger.warning("Learning router unavailable: %s", exc)

# ── Prometheus metrics (optional but useful for HF Space monitoring) ──
try:
    from prometheus_fastapi_instrumentator import Instrumentator

    instrumentator = Instrumentator()
    instrumentator.instrument(app).expose(app)
except ImportError:
    # Instrumentation is optional; without the package the app runs unmetered.
    pass
async def frontend_catchall(request: Request, full_path: str):
    """Forward the request and its path to ``_proxy_frontend_request``."""
    proxied = await _proxy_frontend_request(request, full_path)
    return proxied
| return app | |
app = create_app()

# ── Entry point for HF Spaces ─────────────────────────────────────────────
# HF Spaces expects the server to bind on 0.0.0.0:7860; $PORT overrides the
# port when set.
if __name__ == "__main__":
    import uvicorn

    listen_port = int(os.getenv("PORT", "7860"))
    server_options = {
        "host": "0.0.0.0",
        "port": listen_port,
        "log_level": "info",
        "reload": False,  # Never reload in production/HF Space
        "workers": 1,  # Single worker: HF free tier has limited RAM
    }
    uvicorn.run("app.main:app", **server_options)