CheckMat / chatmock /session.py
aiqknow's picture
Upload 97 files
35205e8 verified
from __future__ import annotations
import copy
import hashlib
import json
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List
_LOCK = threading.Lock()
_FINGERPRINT_TO_UUID: Dict[str, str] = {}
_ORDER: List[str] = []
_MAX_ENTRIES = 10000
_RESPONSES_SESSION_STATE: Dict[str, "_ResponsesSessionState"] = {}
_RESPONSES_ORDER: List[str] = []
@dataclass(frozen=True)
class PreparedResponsesRequest:
payload: Dict[str, Any]
session_id: str
@dataclass
class _ResponsesSessionState:
last_request_payload: Dict[str, Any] | None = None
last_response_id: str | None = None
last_response_items: List[Dict[str, Any]] = field(default_factory=list)
inflight_request_payload: Dict[str, Any] | None = None
inflight_track_result: bool = False
inflight_response_id: str | None = None
inflight_response_items: List[Dict[str, Any]] = field(default_factory=list)
def _canonicalize_first_user_message(input_items: List[Dict[str, Any]]) -> Dict[str, Any] | None:
"""
Extract the first stable user message from Responses input items. Good use for a fingerprint for prompt caching.
"""
for item in input_items:
if not isinstance(item, dict):
continue
if item.get("type") != "message":
continue
role = item.get("role")
if role != "user":
continue
content = item.get("content")
if not isinstance(content, list):
continue
norm_content = []
for part in content:
if not isinstance(part, dict):
continue
ptype = part.get("type")
if ptype == "input_text":
text = part.get("text") if isinstance(part.get("text"), str) else ""
if text:
norm_content.append({"type": "input_text", "text": text})
elif ptype == "input_image":
url = part.get("image_url") if isinstance(part.get("image_url"), str) else None
if url:
norm_content.append({"type": "input_image", "image_url": url})
if norm_content:
return {"type": "message", "role": "user", "content": norm_content}
return None
def canonicalize_prefix(instructions: str | None, input_items: List[Dict[str, Any]]) -> str:
prefix: Dict[str, Any] = {}
if isinstance(instructions, str) and instructions.strip():
prefix["instructions"] = instructions.strip()
first_user = _canonicalize_first_user_message(input_items)
if first_user is not None:
prefix["first_user_message"] = first_user
return json.dumps(prefix, sort_keys=True, separators=(",", ":"))
def _fingerprint(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
def _remember(fp: str, sid: str) -> None:
if fp in _FINGERPRINT_TO_UUID:
return
_FINGERPRINT_TO_UUID[fp] = sid
_ORDER.append(fp)
if len(_ORDER) > _MAX_ENTRIES:
oldest = _ORDER.pop(0)
_FINGERPRINT_TO_UUID.pop(oldest, None)
def _remember_responses_session(session_id: str) -> _ResponsesSessionState:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
state = _ResponsesSessionState()
_RESPONSES_SESSION_STATE[session_id] = state
_RESPONSES_ORDER.append(session_id)
if len(_RESPONSES_ORDER) > _MAX_ENTRIES:
oldest = _RESPONSES_ORDER.pop(0)
_RESPONSES_SESSION_STATE.pop(oldest, None)
return state
def _request_without_input(payload: Dict[str, Any]) -> Dict[str, Any]:
clone = copy.deepcopy(payload)
clone["input"] = []
clone.pop("previous_response_id", None)
return clone
def _input_list(payload: Dict[str, Any]) -> List[Dict[str, Any]] | None:
raw = payload.get("input")
if not isinstance(raw, list):
return None
return [item for item in copy.deepcopy(raw) if isinstance(item, dict)]
def _conversation_output_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
reusable: List[Dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
item_type = item.get("type")
if item_type == "reasoning":
continue
reusable.append(copy.deepcopy(item))
return reusable
def _clear_reuse_state(state: _ResponsesSessionState) -> None:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
state.inflight_request_payload = None
state.inflight_track_result = False
state.inflight_response_id = None
state.inflight_response_items = []
def _clear_inflight(state: _ResponsesSessionState) -> None:
state.inflight_request_payload = None
state.inflight_track_result = False
state.inflight_response_id = None
state.inflight_response_items = []
def ensure_session_id(
instructions: str | None,
input_items: List[Dict[str, Any]],
client_supplied: str | None = None,
) -> str:
if isinstance(client_supplied, str) and client_supplied.strip():
return client_supplied.strip()
canon = canonicalize_prefix(instructions, input_items)
fp = _fingerprint(canon)
with _LOCK:
if fp in _FINGERPRINT_TO_UUID:
return _FINGERPRINT_TO_UUID[fp]
sid = str(uuid.uuid4())
_remember(fp, sid)
return sid
def prepare_responses_request_for_session(
session_id: str,
payload: Dict[str, Any],
*,
allow_previous_response_id: bool = True,
) -> PreparedResponsesRequest:
full_payload = copy.deepcopy(payload)
outbound_payload = copy.deepcopy(payload)
explicit_previous_response_id = (
isinstance(full_payload.get("previous_response_id"), str)
and bool(full_payload.get("previous_response_id").strip())
)
with _LOCK:
state = _remember_responses_session(session_id)
if explicit_previous_response_id:
_clear_reuse_state(state)
return PreparedResponsesRequest(
payload=outbound_payload,
session_id=session_id,
)
request_input = _input_list(full_payload)
if (
allow_previous_response_id
and
state.last_request_payload is not None
and state.last_response_id
and request_input is not None
and _request_without_input(state.last_request_payload) == _request_without_input(full_payload)
):
baseline: List[Dict[str, Any]] = []
previous_input = _input_list(state.last_request_payload)
if previous_input is not None:
baseline.extend(previous_input)
baseline.extend(copy.deepcopy(state.last_response_items))
baseline_len = len(baseline)
if request_input[:baseline_len] == baseline and baseline_len <= len(request_input):
outbound_payload["input"] = copy.deepcopy(request_input[baseline_len:])
outbound_payload["previous_response_id"] = state.last_response_id
state.inflight_request_payload = full_payload
state.inflight_track_result = True
state.inflight_response_id = None
state.inflight_response_items = []
return PreparedResponsesRequest(
payload=outbound_payload,
session_id=session_id,
)
def note_responses_stream_event(session_id: str, event: Dict[str, Any]) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
if not isinstance(event, dict):
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
kind = event.get("type")
if kind == "response.created":
response = event.get("response")
if isinstance(response, dict) and isinstance(response.get("id"), str):
state.inflight_response_id = response.get("id")
return
if kind == "response.output_item.done":
item = event.get("item")
if isinstance(item, dict):
state.inflight_response_items.append(copy.deepcopy(item))
return
if kind == "response.completed":
response = event.get("response")
response_id = None
response_items: List[Dict[str, Any]] = copy.deepcopy(state.inflight_response_items)
if isinstance(response, dict):
if isinstance(response.get("id"), str):
response_id = response.get("id")
output = response.get("output")
if isinstance(output, list) and output:
response_items = [copy.deepcopy(item) for item in output if isinstance(item, dict)]
if not response_id:
response_id = state.inflight_response_id
if state.inflight_track_result and state.inflight_request_payload is not None and response_id:
state.last_request_payload = copy.deepcopy(state.inflight_request_payload)
state.last_response_id = response_id
state.last_response_items = _conversation_output_items(response_items)
else:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
_clear_inflight(state)
return
if kind in ("response.failed", "error"):
_clear_reuse_state(state)
def note_responses_final_response(session_id: str, response_obj: Dict[str, Any]) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
if not isinstance(response_obj, dict):
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
response_id = response_obj.get("id") if isinstance(response_obj.get("id"), str) else None
output = response_obj.get("output")
output_items = [copy.deepcopy(item) for item in output if isinstance(item, dict)] if isinstance(output, list) else []
if state.inflight_track_result and state.inflight_request_payload is not None and response_id:
state.last_request_payload = copy.deepcopy(state.inflight_request_payload)
state.last_response_id = response_id
state.last_response_items = _conversation_output_items(output_items)
else:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
_clear_inflight(state)
def clear_responses_reuse_state(session_id: str) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
_clear_reuse_state(state)
def reset_session_state() -> None:
with _LOCK:
_FINGERPRINT_TO_UUID.clear()
_ORDER.clear()
_RESPONSES_SESSION_STATE.clear()
_RESPONSES_ORDER.clear()