from __future__ import annotations

import copy
import hashlib
import json
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Session bookkeeping: derives session ids from a fingerprint of the request
# prefix, and tracks per-session request/response history so a follow-up
# request can be rewritten as "previous_response_id + new input items".

# Held by every public function while it touches the registries below.
_LOCK = threading.Lock()

# Prefix fingerprint -> generated session id. _ORDER records insertion order
# so the oldest fingerprint is evicted first.
_FINGERPRINT_TO_UUID: Dict[str, str] = {}
_ORDER: List[str] = []

# Capacity bound applied to both registries.
_MAX_ENTRIES = 10000

# Session id -> reuse-tracking state. _RESPONSES_ORDER records insertion
# order so the oldest session is evicted first.
_RESPONSES_SESSION_STATE: Dict[str, "_ResponsesSessionState"] = {}
_RESPONSES_ORDER: List[str] = []


@dataclass(frozen=True)
class PreparedResponsesRequest:
    """Payload to send for a request, paired with the session it belongs to."""

    payload: Dict[str, Any]
    session_id: str


@dataclass
class _ResponsesSessionState:
    """Per-session record of the last completed exchange and the one in flight."""

    # Last exchange that completed while being tracked.
    last_request_payload: Dict[str, Any] | None = None
    last_response_id: str | None = None
    last_response_items: List[Dict[str, Any]] = field(default_factory=list)
    # Exchange currently awaiting its completion notification.
    inflight_request_payload: Dict[str, Any] | None = None
    inflight_track_result: bool = False
    inflight_response_id: str | None = None
    inflight_response_items: List[Dict[str, Any]] = field(default_factory=list)


def _normalize_user_content(content: Any) -> List[Dict[str, Any]]:
    """Reduce message content to its non-empty text and image-URL parts.

    A plain string is treated as a single ``input_text`` part. Anything that
    is neither a string nor a list yields an empty list.
    """
    if isinstance(content, str):
        return [{"type": "input_text", "text": content}] if content else []
    if not isinstance(content, list):
        return []

    normalized: List[Dict[str, Any]] = []
    for part in content:
        if not isinstance(part, dict):
            continue
        part_type = part.get("type")
        if part_type == "input_text":
            text = part.get("text")
            if isinstance(text, str) and text:
                normalized.append({"type": "input_text", "text": text})
        elif part_type == "input_image":
            url = part.get("image_url")
            if isinstance(url, str) and url:
                normalized.append({"type": "input_image", "image_url": url})
    return normalized


def _canonicalize_first_user_message(input_items: List[Dict[str, Any]]) -> Dict[str, Any] | None:
    """
    Extract the first stable user message from Responses input items.

    Good use for a fingerprint for prompt caching.

    Returns the first ``type == "message"`` / ``role == "user"`` item that has
    at least one non-empty text or image part, reduced to those parts, or
    ``None`` when no such item exists.
    """
    for item in input_items or []:
        if not isinstance(item, dict):
            continue
        if item.get("type") != "message" or item.get("role") != "user":
            continue
        norm_content = _normalize_user_content(item.get("content"))
        if norm_content:
            return {"type": "message", "role": "user", "content": norm_content}
    return None


def canonicalize_prefix(instructions: str | None, input_items: List[Dict[str, Any]]) -> str:
    """Serialize the stripped instructions and first user message as compact, key-sorted JSON.

    Blank instructions and a missing user message are omitted, so the result
    is ``"{}"`` when neither is present.
    """
    prefix: Dict[str, Any] = {}
    if isinstance(instructions, str) and instructions.strip():
        prefix["instructions"] = instructions.strip()
    first_user = _canonicalize_first_user_message(input_items)
    if first_user is not None:
        prefix["first_user_message"] = first_user
    return json.dumps(prefix, sort_keys=True, separators=(",", ":"))


def _fingerprint(s: str) -> str:
    """Return the hex SHA-256 digest of ``s`` encoded as UTF-8."""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


def _remember(fp: str, sid: str) -> None:
    """Record ``fp -> sid`` unless ``fp`` is already known; evict the oldest entry past the cap.

    Does no locking itself; ``ensure_session_id`` calls it with ``_LOCK`` held.
    """
    if fp in _FINGERPRINT_TO_UUID:
        return
    _FINGERPRINT_TO_UUID[fp] = sid
    _ORDER.append(fp)
    if len(_ORDER) > _MAX_ENTRIES:
        oldest = _ORDER.pop(0)
        _FINGERPRINT_TO_UUID.pop(oldest, None)


def _remember_responses_session(session_id: str) -> _ResponsesSessionState:
    """Return the state for ``session_id``, creating it (and evicting the oldest) if needed.

    Does no locking itself; ``prepare_responses_request_for_session`` calls it
    with ``_LOCK`` held.
    """
    state = _RESPONSES_SESSION_STATE.get(session_id)
    if state is None:
        state = _ResponsesSessionState()
        _RESPONSES_SESSION_STATE[session_id] = state
        _RESPONSES_ORDER.append(session_id)
        if len(_RESPONSES_ORDER) > _MAX_ENTRIES:
            oldest = _RESPONSES_ORDER.pop(0)
            _RESPONSES_SESSION_STATE.pop(oldest, None)
    return state


def _request_without_input(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return an independent copy of ``payload`` with ``input`` emptied and ``previous_response_id`` removed.

    Used to compare everything about two requests except their input items.
    """
    # Copy only what is kept: the input list can be large and is discarded.
    clone = {
        key: ([] if key == "input" else copy.deepcopy(value))
        for key, value in payload.items()
        if key != "previous_response_id"
    }
    clone.setdefault("input", [])
    return clone


def _input_list(payload: Dict[str, Any]) -> List[Dict[str, Any]] | None:
    """Return deep copies of the dict entries of ``payload["input"]``, or ``None`` if it is not a list."""
    raw = payload.get("input")
    if not isinstance(raw, list):
        return None
    return [copy.deepcopy(item) for item in raw if isinstance(item, dict)]


def _conversation_output_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return deep copies of the dict items in ``items``, leaving out those of type ``"reasoning"``."""
    return [
        copy.deepcopy(item)
        for item in items
        if isinstance(item, dict) and item.get("type") != "reasoning"
    ]


def _clear_inflight(state: _ResponsesSessionState) -> None:
    """Reset the in-flight fields of ``state`` to their defaults."""
    state.inflight_request_payload = None
    state.inflight_track_result = False
    state.inflight_response_id = None
    state.inflight_response_items = []


def _clear_reuse_state(state: _ResponsesSessionState) -> None:
    """Reset both the last-exchange and the in-flight fields of ``state``."""
    state.last_request_payload = None
    state.last_response_id = None
    state.last_response_items = []
    _clear_inflight(state)


def _commit_inflight_result(
    state: _ResponsesSessionState,
    response_id: str | None,
    response_items: List[Dict[str, Any]],
) -> None:
    """Promote the in-flight exchange to "last exchange", or forget the last one.

    The exchange is promoted only when it was being tracked, its request
    payload is known and ``response_id`` is non-empty. In-flight fields are
    cleared either way.
    """
    if state.inflight_track_result and state.inflight_request_payload is not None and response_id:
        state.last_request_payload = copy.deepcopy(state.inflight_request_payload)
        state.last_response_id = response_id
        state.last_response_items = _conversation_output_items(response_items)
    else:
        state.last_request_payload = None
        state.last_response_id = None
        state.last_response_items = []
    _clear_inflight(state)


def ensure_session_id(
    instructions: str | None,
    input_items: List[Dict[str, Any]],
    client_supplied: str | None = None,
) -> str:
    """Return a session id for the request.

    A non-blank ``client_supplied`` value wins and is returned stripped.
    Otherwise the id is looked up by the fingerprint of
    ``canonicalize_prefix(instructions, input_items)``; a new UUID4 string is
    generated and remembered the first time a fingerprint is seen.
    """
    if isinstance(client_supplied, str) and client_supplied.strip():
        return client_supplied.strip()

    fp = _fingerprint(canonicalize_prefix(instructions, input_items))
    with _LOCK:
        known = _FINGERPRINT_TO_UUID.get(fp)
        if known is not None:
            return known
        sid = str(uuid.uuid4())
        _remember(fp, sid)
        return sid


def prepare_responses_request_for_session(
    session_id: str,
    payload: Dict[str, Any],
    *,
    allow_previous_response_id: bool = True,
) -> PreparedResponsesRequest:
    """Build the payload to send for ``payload`` within ``session_id``.

    ``payload`` itself is never mutated; the returned payload is a copy.

    * If ``payload`` already carries a non-blank ``previous_response_id``, the
      session's reuse state is cleared and the payload is returned unchanged.
    * Otherwise, when reuse is allowed, the request matches the last tracked
      request apart from ``input``, and its input starts with the last
      request's input followed by the last response's non-reasoning items,
      the returned payload carries only the remaining input items plus
      ``previous_response_id`` set to the last response id.
    * In every non-explicit case the full request is recorded as in flight so
      a later completion notification can be committed.
    """
    full_payload = copy.deepcopy(payload)
    outbound_payload = copy.deepcopy(payload)
    supplied_previous = full_payload.get("previous_response_id")
    explicit_previous_response_id = isinstance(supplied_previous, str) and bool(
        supplied_previous.strip()
    )

    with _LOCK:
        state = _remember_responses_session(session_id)
        if explicit_previous_response_id:
            # The caller manages chaining; tracked history no longer applies.
            _clear_reuse_state(state)
            return PreparedResponsesRequest(
                payload=outbound_payload,
                session_id=session_id,
            )

        request_input = _input_list(full_payload)
        if (
            allow_previous_response_id
            and state.last_request_payload is not None
            and state.last_response_id
            and request_input is not None
            and _request_without_input(state.last_request_payload)
            == _request_without_input(full_payload)
        ):
            # baseline is only compared, never stored, so no extra copies.
            baseline: List[Dict[str, Any]] = []
            previous_input = _input_list(state.last_request_payload)
            if previous_input is not None:
                baseline.extend(previous_input)
            baseline.extend(state.last_response_items)
            baseline_len = len(baseline)
            if baseline_len <= len(request_input) and request_input[:baseline_len] == baseline:
                # request_input already holds private copies of the items.
                outbound_payload["input"] = request_input[baseline_len:]
                outbound_payload["previous_response_id"] = state.last_response_id

        state.inflight_request_payload = full_payload
        state.inflight_track_result = True
        state.inflight_response_id = None
        state.inflight_response_items = []
        return PreparedResponsesRequest(
            payload=outbound_payload,
            session_id=session_id,
        )


def note_responses_stream_event(session_id: str, event: Dict[str, Any]) -> None:
    """Update the session's in-flight state from one streamed event.

    Handled event types:

    * ``response.created`` - remember the response id.
    * ``response.output_item.done`` - collect a copy of the item.
    * ``response.completed`` - commit the exchange, preferring the id and the
      non-empty ``output`` list on the event over what was collected.
    * ``response.failed`` / ``error`` - clear all reuse state.

    Blank or non-string session ids, non-dict events, unknown sessions and
    other event types are ignored.
    """
    if not isinstance(session_id, str) or not session_id.strip():
        return
    if not isinstance(event, dict):
        return

    with _LOCK:
        state = _RESPONSES_SESSION_STATE.get(session_id)
        if state is None:
            return

        kind = event.get("type")
        if kind == "response.created":
            response = event.get("response")
            if isinstance(response, dict) and isinstance(response.get("id"), str):
                state.inflight_response_id = response["id"]
        elif kind == "response.output_item.done":
            item = event.get("item")
            if isinstance(item, dict):
                state.inflight_response_items.append(copy.deepcopy(item))
        elif kind == "response.completed":
            response = event.get("response")
            response_id = None
            # The commit helper copies the items it keeps, so the collected
            # list can be passed as-is.
            response_items = state.inflight_response_items
            if isinstance(response, dict):
                if isinstance(response.get("id"), str):
                    response_id = response["id"]
                output = response.get("output")
                if isinstance(output, list) and output:
                    response_items = [item for item in output if isinstance(item, dict)]
            _commit_inflight_result(
                state,
                response_id or state.inflight_response_id,
                response_items,
            )
        elif kind in ("response.failed", "error"):
            _clear_reuse_state(state)


def note_responses_final_response(session_id: str, response_obj: Dict[str, Any]) -> None:
    """Commit a complete (non-streamed) response object for the session.

    Uses ``response_obj["id"]`` when it is a string and the dict entries of
    ``response_obj["output"]`` when it is a list. Blank or non-string session
    ids, non-dict responses and unknown sessions are ignored.
    """
    if not isinstance(session_id, str) or not session_id.strip():
        return
    if not isinstance(response_obj, dict):
        return

    with _LOCK:
        state = _RESPONSES_SESSION_STATE.get(session_id)
        if state is None:
            return
        raw_id = response_obj.get("id")
        response_id = raw_id if isinstance(raw_id, str) else None
        output = response_obj.get("output")
        output_items = (
            [item for item in output if isinstance(item, dict)]
            if isinstance(output, list)
            else []
        )
        _commit_inflight_result(state, response_id, output_items)


def clear_responses_reuse_state(session_id: str) -> None:
    """Clear the reuse state of ``session_id``; a no-op for blank or unknown sessions."""
    if not isinstance(session_id, str) or not session_id.strip():
        return
    with _LOCK:
        state = _RESPONSES_SESSION_STATE.get(session_id)
        if state is None:
            return
        _clear_reuse_state(state)


def reset_session_state() -> None:
    """Empty every module-level registry (fingerprints and per-session state)."""
    with _LOCK:
        _FINGERPRINT_TO_UUID.clear()
        _ORDER.clear()
        _RESPONSES_SESSION_STATE.clear()
        _RESPONSES_ORDER.clear()