""" core.py — LLM factory, PII-safe logging, thread-safe SSE event bus. """ import json import os import queue import threading from pathlib import Path from typing import Any, Dict, Optional from dotenv import load_dotenv from langchain_openai import ChatOpenAI ROOT_DIR = Path(__file__).parent LOG_DIR = ROOT_DIR / "logs" LOG_DIR.mkdir(exist_ok=True) load_dotenv() # ── Thread-safe SSE event bus ───────────────────────────────────────────────── # We use a threading.local so each Flask request gets its own queue, # BUT the parallel worker threads spawned by phase1/phase4 need to # inherit the same queue from their parent thread. # Solution: store the queue in a module-level dict keyed by "root thread id". _queue_registry: Dict[int, queue.Queue] = {} _registry_lock = threading.Lock() _thread_to_root: Dict[int, int] = {} # child thread → root request thread def set_event_queue(q: queue.Queue) -> None: """Called once per Flask request (in the request's own thread).""" tid = threading.get_ident() with _registry_lock: _queue_registry[tid] = q _thread_to_root[tid] = tid # root maps to itself def register_child_thread(root_tid: int) -> None: """ Called at the start of each parallel worker thread so it can find the correct SSE queue to push events into. """ tid = threading.get_ident() with _registry_lock: _thread_to_root[tid] = root_tid def get_event_queue() -> Optional[queue.Queue]: tid = threading.get_ident() with _registry_lock: root = _thread_to_root.get(tid, tid) return _queue_registry.get(root) def clear_event_queue() -> None: tid = threading.get_ident() with _registry_lock: _queue_registry.pop(tid, None) # Clean up child mappings for this root to_del = [k for k, v in _thread_to_root.items() if v == tid] for k in to_del: _thread_to_root.pop(k, None) def push_event(event_type: str, data: Dict[str, Any]) -> None: q = get_event_queue() if q: q.put({"type": event_type, "data": data}) # ── App config ──────────────────────────────────────────────────────────────── def is_deterministic() -> bool: return os.getenv("AISA_DETERMINISTIC", "0") == "1" def get_llm(model: str = "gpt-4o-mini") -> ChatOpenAI: temperature = 0.0 if is_deterministic() else 0.4 return ChatOpenAI(model=model, temperature=temperature) # ── PII-safe logging ────────────────────────────────────────────────────────── _SENSITIVE_KEYS = {"api_key", "headers", "text", "summary", "description", "query", "user_text"} _log_lock = threading.Lock() def log_step(agent: str, event: str, payload: Dict[str, Any]) -> None: safe: Dict[str, Any] = {} for k, v in payload.items(): if k in _SENSITIVE_KEYS: continue if isinstance(v, str) and len(v) > 200: v = v[:200] + "…[truncated]" safe[k] = v record = {"agent": agent, "event": event, "payload": safe} # Disk write (thread-safe via lock) try: with _log_lock: with (LOG_DIR / "agent_steps.log").open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") except OSError: pass # SSE push push_event("agent_step", {"agent": agent, "event": event, "payload": safe})