# peninsula123's picture
# Prepare OpenCortex hackathon submission
# cb55577
# Raw
# History Blame Contribute Delete
# 7.25 kB
import json
import time
from collections.abc import Iterator
import httpx
from open_cortex.runtime.events import RuntimeEvent
from open_cortex.runtime.metrics import fetch_runtime_snapshot
from open_cortex.runtime.messages import ChatMessage, to_llama_messages
# Endpoints of the inference server on localhost port 8080.
# NOTE(review): presumably a llama.cpp server (the request body is built with
# `to_llama_messages`) — confirm against the deployment.

# Chat-completions endpoint that `stream_chat_events` POSTs to.
CHAT_URL = "http://127.0.0.1:8080/v1/chat/completions"
# Metrics endpoint handed to `fetch_runtime_snapshot`.
METRICS_URL = "http://127.0.0.1:8080/metrics"
# Slots endpoint handed to `fetch_runtime_snapshot`.
SLOTS_URL = "http://127.0.0.1:8080/slots"
def _raise_for_status_with_body(response: httpx.Response) -> None:
    """Call ``response.raise_for_status()``, reading an error body first.

    When the response reports an error status, its body is read before the
    status check. A failure while reading is ignored, so the status check
    still runs.

    NOTE(review): judging by the function name, the read is there so the body
    of a streamed error response is available to whoever handles the raised
    exception — confirm against callers.

    Args:
        response: The response to check.

    Raises:
        Whatever ``response.raise_for_status()`` raises.
    """
    has_error_status = response.is_error
    if has_error_status:
        try:
            response.read()
        except httpx.HTTPError:
            # Best effort only: an unreadable body must not mask the status
            # error raised below.
            pass

    response.raise_for_status()
def _working_memory_percent(
context_tokens: int | None,
context_size: int | None,
) -> float | None:
if context_tokens is None or not context_size:
return None
return round(min(100.0, context_tokens / context_size * 100), 1)
def _detect_repetition(text: str) -> bool:
normalized = " ".join(text.split())
if len(normalized) < 120:
return False
for window in (96, 72, 48, 32):
if len(normalized) <= window * 2:
continue
tail = normalized[-window:]
if normalized[:-window].count(tail) >= 1:
return True
return False
def _iter_sse_payloads(lines: Iterator[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each ``data: `` line until ``[DONE]``."""
    for line in lines:
        if not line.startswith("data: "):
            continue
        data = line.removeprefix("data: ")
        if data == "[DONE]":
            return
        yield json.loads(data)


def stream_chat_events(message: list[ChatMessage]) -> Iterator[RuntimeEvent]:
    """Stream a chat completion from ``CHAT_URL`` as a series of runtime events.

    Events are yielded in this order:

    1. ``request_started`` — before any network I/O.
    2. ``first_token`` — for the first chunk with non-empty content. Carries
       the time to first token (measured from just before ``request_started``
       was yielded) and a snapshot from ``fetch_runtime_snapshot``.
    3. ``token`` — for every later chunk with non-empty content. Carries the
       time elapsed since the first token and the resulting chunks-per-second
       rate.
    4. ``request_completed`` — after the stream ends, and only if some chunk
       carried a non-empty ``usage`` object. Rates are taken from that chunk's
       ``timings`` object and are None when it is absent.

    ``generated_tokens`` on token events counts content chunks, one per chunk.
    NOTE(review): this equals the token count only if the server sends one
    token per chunk — confirm.

    Args:
        message: The conversation, converted with ``to_llama_messages`` into
            the request's ``messages`` field.

    Yields:
        ``RuntimeEvent`` objects as described above.

    Raises:
        Propagates errors from ``httpx``, from
        ``_raise_for_status_with_body`` on an unsuccessful response, from
        ``json.loads`` on a malformed payload, and from
        ``fetch_runtime_snapshot``.
    """
    request_body = {
        "messages": to_llama_messages(message),
        "temperature": 0.2,
        "max_tokens": 1024,
        "stream": True,
        "stream_options": {"include_usage": True},
        "timings_per_token": True,
    }

    request_started = time.perf_counter()
    first_token_at = None
    generated_tokens = 0
    generated_text = ""
    final_stats = None
    base_context_tokens = None
    context_size = None

    yield RuntimeEvent(
        kind="request_started",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
    )

    with httpx.Client(timeout=120.0, trust_env=False) as client:
        with client.stream("POST", CHAT_URL, json=request_body) as response:
            _raise_for_status_with_body(response)

            for event in _iter_sse_payloads(response.iter_lines()):
                choices = event.get("choices", [])
                content = (
                    choices[0].get("delta", {}).get("content") if choices else None
                )

                if content:
                    now = time.perf_counter()
                    generated_tokens += 1
                    generated_text += content
                    repetition_detected = _detect_repetition(generated_text)

                    if first_token_at is None:
                        first_token_at = now
                        kind = "first_token"
                        ttft_ms = (first_token_at - request_started) * 1000
                        elapsed_ms = 0.0
                        live_tps = None
                        # Taken once per request; the values below are reused
                        # for every later token event.
                        snapshot = fetch_runtime_snapshot(
                            client,
                            METRICS_URL,
                            SLOTS_URL,
                        )
                        base_context_tokens = (
                            snapshot.slot_context_tokens[0]
                            if snapshot.slot_context_tokens
                            else None
                        )
                        context_size = snapshot.slot_context_size
                    else:
                        kind = "token"
                        ttft_ms = None
                        snapshot = None
                        elapsed_ms = (now - first_token_at) * 1000
                        live_tps = (
                            generated_tokens / (elapsed_ms / 1000)
                            if elapsed_ms > 0
                            else None
                        )

                    context_tokens = (
                        base_context_tokens + generated_tokens
                        if base_context_tokens is not None
                        else None
                    )
                    yield RuntimeEvent(
                        kind=kind,
                        text_delta=content,
                        ttft_ms=ttft_ms,
                        snapshot=snapshot,
                        generated_tokens=generated_tokens,
                        elapsed_ms=elapsed_ms,
                        live_tps=live_tps,
                        repetition_detected=repetition_detected,
                        context_tokens=context_tokens,
                        context_size=context_size,
                        working_memory_percent=_working_memory_percent(
                            context_tokens,
                            context_size,
                        ),
                    )

                # Checked for every chunk, including ones with no choices.
                if event.get("usage"):
                    final_stats = event

    if final_stats is None:
        return

    usage = final_stats["usage"]
    # A usage chunk without timings must not abort an otherwise complete
    # stream; report the rates as unknown instead.
    timings = final_stats.get("timings") or {}
    prompt_tokens = usage["prompt_tokens"]
    completion_tokens = usage["completion_tokens"]
    context_tokens = prompt_tokens + completion_tokens

    yield RuntimeEvent(
        kind="request_completed",
        text_delta="",
        ttft_ms=None,
        snapshot=None,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        prompt_tps=timings.get("prompt_per_second"),
        decode_tps=timings.get("predicted_per_second"),
        generated_tokens=completion_tokens,
        repetition_detected=_detect_repetition(generated_text),
        context_tokens=context_tokens,
        context_size=context_size,
        working_memory_percent=_working_memory_percent(
            context_tokens,
            context_size,
        ),
    )