# adeem6's picture
# Update core.py
# 6f817a0 verified
"""
core.py — LLM factory, PII-safe logging, thread-safe SSE event bus.
"""
import json
import os
import queue
import threading
from pathlib import Path
from typing import Any, Dict, Optional
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
ROOT_DIR = Path(__file__).parent
LOG_DIR = ROOT_DIR / "logs"
LOG_DIR.mkdir(exist_ok=True)
load_dotenv()
# ── Thread-safe SSE event bus ─────────────────────────────────────────────────
# We use a threading.local so each Flask request gets its own queue,
# BUT the parallel worker threads spawned by phase1/phase4 need to
# inherit the same queue from their parent thread.
# Solution: store the queue in a module-level dict keyed by "root thread id".
_queue_registry: Dict[int, queue.Queue] = {}
_registry_lock = threading.Lock()
_thread_to_root: Dict[int, int] = {} # child thread → root request thread
def set_event_queue(q: queue.Queue) -> None:
    """Register *q* as the SSE queue for the calling (request) thread."""
    current = threading.get_ident()
    with _registry_lock:
        _thread_to_root[current] = current  # a root thread is its own root
        _queue_registry[current] = q
def register_child_thread(root_tid: int) -> None:
    """
    Map the calling worker thread onto *root_tid* so that events it
    pushes are routed to the root request thread's SSE queue.
    """
    with _registry_lock:
        _thread_to_root[threading.get_ident()] = root_tid
def get_event_queue() -> Optional[queue.Queue]:
    """Return the SSE queue owned by this thread's root request, or None."""
    me = threading.get_ident()
    with _registry_lock:
        # Unregistered threads fall back to themselves as root.
        return _queue_registry.get(_thread_to_root.get(me, me))
def clear_event_queue() -> None:
    """Drop this root thread's queue plus any child-thread mappings to it."""
    root = threading.get_ident()
    with _registry_lock:
        _queue_registry.pop(root, None)
        # Collect first, then delete — can't mutate the dict mid-iteration.
        stale = [child for child, owner in _thread_to_root.items() if owner == root]
        for child in stale:
            _thread_to_root.pop(child, None)
def push_event(event_type: str, data: Dict[str, Any]) -> None:
    """
    Push one SSE event onto the current request's queue.

    No-op when the calling thread has no registered queue (e.g. code
    running outside a Flask request).

    Args:
        event_type: Event name sent to the SSE client.
        data: JSON-serializable event payload.
    """
    q = get_event_queue()
    # Explicit None check: a queue.Queue's truthiness is not defined by its
    # contents, so `if q:` only worked by accident — `is not None` states
    # the actual intent and is robust to queue-like objects with __len__.
    if q is not None:
        q.put({"type": event_type, "data": data})
# ── App config ────────────────────────────────────────────────────────────────
def is_deterministic() -> bool:
    """Return True when the AISA_DETERMINISTIC env var is set to "1"."""
    flag = os.getenv("AISA_DETERMINISTIC", "0")
    return flag == "1"
def get_llm(model: str = "gpt-4o-mini") -> ChatOpenAI:
    """Build a ChatOpenAI client: temperature 0.0 in deterministic mode, 0.4 otherwise."""
    temp = 0.4
    if is_deterministic():
        temp = 0.0
    return ChatOpenAI(model=model, temperature=temp)
# ── PII-safe logging ──────────────────────────────────────────────────────────
_SENSITIVE_KEYS = {"api_key", "headers", "text", "summary", "description",
"query", "user_text"}
_log_lock = threading.Lock()
def _sanitize(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a PII-scrubbed copy of *payload*: keys in _SENSITIVE_KEYS are
    dropped, nested dicts are scrubbed recursively, and long strings are
    truncated to 200 characters.
    """
    safe: Dict[str, Any] = {}
    for k, v in payload.items():
        if k in _SENSITIVE_KEYS:
            continue
        if isinstance(v, dict):
            # Recurse: the original only filtered top-level keys, so a
            # sensitive key inside a nested dict leaked into the log.
            v = _sanitize(v)
        elif isinstance(v, str) and len(v) > 200:
            v = v[:200] + "…[truncated]"
        safe[k] = v
    return safe

def log_step(agent: str, event: str, payload: Dict[str, Any]) -> None:
    """
    Record one agent step to the log file and the SSE stream, PII-scrubbed.

    Args:
        agent: Name of the agent emitting the step.
        event: Short event label for the step.
        payload: Arbitrary step data; a sanitized copy is logged/pushed.
    """
    safe = _sanitize(payload)
    record = {"agent": agent, "event": event, "payload": safe}
    # Disk write (thread-safe via lock). Best-effort: a failing disk must
    # not break the request, so OSError is deliberately swallowed.
    try:
        with _log_lock:
            with (LOG_DIR / "agent_steps.log").open("a", encoding="utf-8") as f:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
    except OSError:
        pass
    # SSE push
    push_event("agent_step", {"agent": agent, "event": event, "payload": safe})