Spaces:
Sleeping
Sleeping
| import json | |
| import time | |
| from collections.abc import Iterator | |
| import httpx | |
| from open_cortex.runtime.events import RuntimeEvent | |
| from open_cortex.runtime.metrics import fetch_runtime_snapshot | |
| from open_cortex.runtime.messages import ChatMessage, to_llama_messages | |
# Root of the local inference server these endpoints belong to (presumably
# llama.cpp's server, given the /slots endpoint and llama-specific options).
_SERVER_ROOT = "http://127.0.0.1:8080"
CHAT_URL = f"{_SERVER_ROOT}/v1/chat/completions"
METRICS_URL = f"{_SERVER_ROOT}/metrics"
SLOTS_URL = f"{_SERVER_ROOT}/slots"
def _raise_for_status_with_body(response: httpx.Response) -> None:
    """Call ``response.raise_for_status()``, loading the body first on 4xx/5xx.

    A streamed httpx response does not load its body automatically. For error
    statuses the body is read before raising, so the handler of the resulting
    ``httpx.HTTPStatusError`` can inspect ``exc.response``'s content. Reading
    is best-effort: an ``httpx.HTTPError`` raised while reading is ignored so
    that the status error is still the one that propagates.
    """
    if not response.is_error:
        response.raise_for_status()
        return
    try:
        response.read()
    except httpx.HTTPError:
        pass  # best-effort body read; fall through to the status error
    response.raise_for_status()
| def _working_memory_percent( | |
| context_tokens: int | None, | |
| context_size: int | None, | |
| ) -> float | None: | |
| if context_tokens is None or not context_size: | |
| return None | |
| return round(min(100.0, context_tokens / context_size * 100), 1) | |
| def _detect_repetition(text: str) -> bool: | |
| normalized = " ".join(text.split()) | |
| if len(normalized) < 120: | |
| return False | |
| for window in (96, 72, 48, 32): | |
| if len(normalized) <= window * 2: | |
| continue | |
| tail = normalized[-window:] | |
| if normalized[:-window].count(tail) >= 1: | |
| return True | |
| return False | |
def _sse_data(line: str) -> str | None:
    """Return the value of a server-sent-events ``data`` line, else None.

    The SSE format allows ``data:`` to be followed by one optional space,
    which is removed. Any other line (comments, ``event:``/``id:`` fields,
    blank keep-alives) yields None.
    """
    if not line.startswith("data:"):
        return None
    value = line[len("data:"):]
    return value[1:] if value.startswith(" ") else value


def _context_tokens(
    base_context_tokens: int | None,
    generated_tokens: int,
) -> int | None:
    """Return ``base_context_tokens + generated_tokens``, or None if the base is unknown."""
    if base_context_tokens is None:
        return None
    return base_context_tokens + generated_tokens


def _completion_event(
    final_stats: dict,
    generated_text: str,
    context_size: int | None,
) -> RuntimeEvent:
    """Build the ``request_completed`` event from the stream's usage chunk.

    Raises:
        KeyError: if the chunk lacks ``usage``/``timings`` or any field read here.
    """
    usage = final_stats["usage"]
    timings = final_stats["timings"]
    # Uses the server's usage counts rather than the per-chunk count that the
    # live token events are based on.
    context_tokens = usage["prompt_tokens"] + usage["completion_tokens"]
    return RuntimeEvent(
        kind="request_completed",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
        prompt_tokens=usage["prompt_tokens"],
        completion_tokens=usage["completion_tokens"],
        prompt_tps=timings["prompt_per_second"],
        decode_tps=timings["predicted_per_second"],
        generated_tokens=usage["completion_tokens"],
        repetition_detected=_detect_repetition(generated_text),
        context_tokens=context_tokens,
        context_size=context_size,
        working_memory_percent=_working_memory_percent(
            context_tokens,
            context_size,
        ),
    )


def stream_chat_events(message: list[ChatMessage]) -> Iterator[RuntimeEvent]:
    """Stream a chat completion from ``CHAT_URL`` as ``RuntimeEvent`` objects.

    Events are yielded in this order:

    * ``request_started`` -- before the HTTP request is sent.
    * ``first_token`` -- for the first non-empty content delta. Carries the
      time-to-first-token and a runtime snapshot fetched at that moment.
    * ``token`` -- for each later non-empty content delta, with elapsed time
      since the first token and a live tokens-per-second figure.
    * ``request_completed`` -- after the stream ends, only if some chunk
      carried a truthy ``usage`` field.

    Args:
        message: Conversation history, converted with ``to_llama_messages``.

    Raises:
        httpx.HTTPStatusError: If the server responds with a non-success status.
        httpx.HTTPError: On transport errors or timeouts.
        json.JSONDecodeError: If a ``data`` line is not valid JSON.
        KeyError: If the usage chunk lacks the expected fields.
        Exceptions from ``fetch_runtime_snapshot`` propagate unchanged.
    """
    request_body = {
        "messages": to_llama_messages(message),
        "temperature": 0.2,
        "max_tokens": 1024,
        "stream": True,
        "stream_options": {"include_usage": True},
        "timings_per_token": True,
    }
    first_token_at: float | None = None
    generated_tokens = 0
    generated_text = ""
    final_stats = None
    base_context_tokens = None
    context_size = None

    yield RuntimeEvent(
        kind="request_started",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
    )

    # trust_env=False: ignore proxy/netrc settings from the environment; the
    # server is on the loopback address.
    with httpx.Client(timeout=120.0, trust_env=False) as client:
        # Start the TTFT clock only once the consumer has resumed this
        # generator after "request_started". Time spent handling that event
        # is not part of the server's time to first token.
        request_started = time.perf_counter()
        with client.stream("POST", CHAT_URL, json=request_body) as response:
            _raise_for_status_with_body(response)
            for line in response.iter_lines():
                data = _sse_data(line)
                if data is None:
                    continue
                if data == "[DONE]":
                    break
                event = json.loads(data)
                choices = event.get("choices", [])
                content = (
                    choices[0].get("delta", {}).get("content") if choices else None
                )
                if content:
                    now = time.perf_counter()
                    # Each non-empty content delta is counted as one token.
                    generated_tokens += 1
                    generated_text += content
                    repetition_detected = _detect_repetition(generated_text)
                    if first_token_at is None:
                        first_token_at = now
                        ttft_ms = (now - request_started) * 1000
                        # NOTE(review): first_token_at is set before this fetch,
                        # so its latency is included in later elapsed_ms/live_tps.
                        snapshot = fetch_runtime_snapshot(
                            client,
                            METRICS_URL,
                            SLOTS_URL,
                        )
                        base_context_tokens = (
                            snapshot.slot_context_tokens[0]
                            if snapshot.slot_context_tokens
                            else None
                        )
                        context_size = snapshot.slot_context_size
                        context_tokens = _context_tokens(
                            base_context_tokens, generated_tokens
                        )
                        yield RuntimeEvent(
                            kind="first_token",
                            text_delta=content,
                            ttft_ms=ttft_ms,
                            snapshot=snapshot,
                            generated_tokens=generated_tokens,
                            elapsed_ms=0.0,
                            live_tps=None,
                            repetition_detected=repetition_detected,
                            context_tokens=context_tokens,
                            context_size=context_size,
                            working_memory_percent=_working_memory_percent(
                                context_tokens,
                                context_size,
                            ),
                        )
                    else:
                        elapsed_ms = (now - first_token_at) * 1000
                        live_tps = (
                            generated_tokens / (elapsed_ms / 1000)
                            if elapsed_ms > 0
                            else None
                        )
                        context_tokens = _context_tokens(
                            base_context_tokens, generated_tokens
                        )
                        yield RuntimeEvent(
                            kind="token",
                            text_delta=content,
                            ttft_ms=None,
                            snapshot=None,
                            generated_tokens=generated_tokens,
                            elapsed_ms=elapsed_ms,
                            live_tps=live_tps,
                            repetition_detected=repetition_detected,
                            context_tokens=context_tokens,
                            context_size=context_size,
                            working_memory_percent=_working_memory_percent(
                                context_tokens,
                                context_size,
                            ),
                        )
                # With include_usage, the usage chunk can arrive with an empty
                # "choices" list, so this check is independent of the content
                # handling above.
                if event.get("usage"):
                    final_stats = event

    if final_stats is not None:
        yield _completion_event(final_stats, generated_text, context_size)