import json
import time
from collections.abc import Iterator

import httpx

from open_cortex.runtime.events import RuntimeEvent
from open_cortex.runtime.metrics import fetch_runtime_snapshot
from open_cortex.runtime.messages import ChatMessage, to_llama_messages

CHAT_URL = "http://127.0.0.1:8080/v1/chat/completions"
METRICS_URL = "http://127.0.0.1:8080/metrics"
SLOTS_URL = "http://127.0.0.1:8080/slots"


def _raise_for_status_with_body(response: httpx.Response) -> None:
    """Raise for a failed response, loading its body first when possible.

    Raises:
        httpx.HTTPStatusError: propagated from ``response.raise_for_status()``.
    """
    if response.is_error:
        try:
            # The response comes from ``client.stream(...)``, so the body has
            # not been read yet; load it so it is available alongside the
            # status error.
            response.read()
        except httpx.HTTPError:
            # Deliberately best-effort: failing to read the body must not
            # mask the status error raised below.
            pass
    response.raise_for_status()


def _working_memory_percent(
    context_tokens: int | None,
    context_size: int | None,
) -> float | None:
    """Return context usage as a percentage capped at 100, rounded to 0.1.

    Returns ``None`` when ``context_tokens`` is ``None`` or when
    ``context_size`` is ``None`` or zero.
    """
    if context_tokens is None or not context_size:
        return None
    return round(min(100.0, context_tokens / context_size * 100), 1)


def _detect_repetition(text: str) -> bool:
    """Report whether the end of ``text`` repeats something seen earlier.

    Whitespace runs are collapsed to single spaces first. Texts shorter than
    120 characters are never flagged. For each window size (96, 72, 48, 32)
    the last ``window`` characters are searched for in the text preceding
    them; a window is skipped unless the text is longer than twice its size.
    """
    normalized = " ".join(text.split())
    if len(normalized) < 120:
        return False

    for window in (96, 72, 48, 32):
        if len(normalized) <= window * 2:
            continue
        tail = normalized[-window:]
        # Search only the part before the tail so the tail cannot match itself.
        if tail in normalized[:-window]:
            return True
    return False


def stream_chat_events(message: list[ChatMessage]) -> Iterator[RuntimeEvent]:
    """Stream a chat completion from ``CHAT_URL`` as runtime events.

    Events are yielded in this order:

    1. ``request_started`` before the HTTP request is sent.
    2. ``first_token`` for the first non-empty content delta, carrying the
       time to first token and a runtime snapshot fetched at that moment.
    3. ``token`` for every later non-empty content delta.
    4. ``request_completed`` once the stream ends, but only if a chunk with a
       ``usage`` object was received.

    ``generated_tokens`` on the streaming events counts non-empty content
    deltas; the ``request_completed`` event reports the server's own
    ``completion_tokens`` instead.

    Args:
        message: Conversation messages, converted with ``to_llama_messages``.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    request_body = {
        "messages": to_llama_messages(message),
        "temperature": 0.2,
        "max_tokens": 1024,
        "stream": True,
        "stream_options": {"include_usage": True},
        "timings_per_token": True,
    }

    request_started = time.perf_counter()
    first_token_at = None
    generated_tokens = 0
    generated_text = ""
    final_stats = None
    base_context_tokens = None
    context_size = None

    yield RuntimeEvent(
        kind="request_started",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
    )

    with httpx.Client(timeout=120.0, trust_env=False) as client:
        with client.stream("POST", CHAT_URL, json=request_body) as response:
            _raise_for_status_with_body(response)

            for line in response.iter_lines():
                if not line.startswith("data: "):
                    continue
                data = line.removeprefix("data: ")
                if data == "[DONE]":
                    break

                event = json.loads(data)
                choices = event.get("choices", [])
                content = (
                    choices[0].get("delta", {}).get("content")
                    if choices
                    else None
                )

                if content:
                    now = time.perf_counter()
                    generated_tokens += 1
                    generated_text += content
                    repetition_detected = _detect_repetition(generated_text)

                    if first_token_at is None:
                        # First visible token: record TTFT and capture the
                        # runtime snapshot that seeds the context accounting.
                        first_token_at = now
                        kind = "first_token"
                        ttft_ms = (first_token_at - request_started) * 1000
                        elapsed_ms = 0.0
                        live_tps = None
                        snapshot = fetch_runtime_snapshot(
                            client,
                            METRICS_URL,
                            SLOTS_URL,
                        )
                        base_context_tokens = (
                            snapshot.slot_context_tokens[0]
                            if snapshot.slot_context_tokens
                            else None
                        )
                        context_size = snapshot.slot_context_size
                    else:
                        kind = "token"
                        ttft_ms = None
                        snapshot = None
                        elapsed_ms = (now - first_token_at) * 1000
                        live_tps = (
                            generated_tokens / (elapsed_ms / 1000)
                            if elapsed_ms > 0
                            else None
                        )

                    # Live estimate: context at first token plus the deltas
                    # generated so far.
                    context_tokens = (
                        base_context_tokens + generated_tokens
                        if base_context_tokens is not None
                        else None
                    )

                    yield RuntimeEvent(
                        kind=kind,
                        text_delta=content,
                        ttft_ms=ttft_ms,
                        snapshot=snapshot,
                        generated_tokens=generated_tokens,
                        elapsed_ms=elapsed_ms,
                        live_tps=live_tps,
                        repetition_detected=repetition_detected,
                        context_tokens=context_tokens,
                        context_size=context_size,
                        working_memory_percent=_working_memory_percent(
                            context_tokens,
                            context_size,
                        ),
                    )

                # Checked independently of ``choices`` so a usage chunk with
                # an empty ``choices`` list is still recorded.
                if event.get("usage"):
                    final_stats = event

    if final_stats is None:
        return

    usage = final_stats["usage"]
    # A usage chunk without a ``timings`` object must not abort the stream
    # with a KeyError; missing throughput fields keep RuntimeEvent's defaults.
    timings = final_stats.get("timings") or {}
    throughput = {
        field: timings[key]
        for field, key in (
            ("prompt_tps", "prompt_per_second"),
            ("decode_tps", "predicted_per_second"),
        )
        if key in timings
    }
    context_tokens = usage["prompt_tokens"] + usage["completion_tokens"]

    yield RuntimeEvent(
        kind="request_completed",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
        prompt_tokens=usage["prompt_tokens"],
        completion_tokens=usage["completion_tokens"],
        generated_tokens=usage["completion_tokens"],
        repetition_detected=_detect_repetition(generated_text),
        context_tokens=context_tokens,
        context_size=context_size,
        working_memory_percent=_working_memory_percent(
            context_tokens,
            context_size,
        ),
        **throughput,
    )