Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import unittest | |
| from unittest import mock | |
| import json | |
| from services.config import config | |
| from services.protocol import openai_v1_chat_complete, openai_v1_response | |
| from services.protocol.chat_completion_cache import chat_completion_cache | |
| from services.protocol.conversation import iter_conversation_payloads, sanitize_output_text | |
class ChatCompletionCacheTests(unittest.TestCase):
    """Tests for the ``openai_v1_chat_complete`` and ``openai_v1_response`` handlers.

    Covers repeated-request caching (streamed and non-streamed), removal of
    adjacent duplicate messages, the shape of the reported usage details,
    stripping of annotation markup from output text, and the system message
    added when a request declares tools.
    """

    def setUp(self) -> None:
        """Save the current cache settings, install test settings, empty the cache."""
        self.old_cache_settings = config.data.get("chat_completion_cache")
        test_settings = {
            "enabled": True,
            "ttl_seconds": 60,
            "max_entries": 32,
            "dedupe_inflight": True,
            "stream_cache": True,
            "normalize_messages": True,
            "drop_adjacent_duplicates": True,
            "drop_assistant_history": False,
        }
        config.data["chat_completion_cache"] = test_settings
        chat_completion_cache.clear()

    def tearDown(self) -> None:
        """Put back the settings saved by ``setUp`` and empty the cache again."""
        previous = self.old_cache_settings
        if previous is not None:
            config.data["chat_completion_cache"] = previous
        else:
            # Nothing was stored before the test, so remove the key entirely.
            config.data.pop("chat_completion_cache", None)
        chat_completion_cache.clear()

    def test_repeated_non_stream_text_completion_uses_cache(self) -> None:
        """Two identical non-stream requests call the upstream collector only once."""
        upstream_calls = []

        def counting_collect_text(_backend, _request):
            # The call number is baked into the answer so a second upstream
            # call would produce a different text.
            upstream_calls.append(None)
            return f"cached answer {len(upstream_calls)}"

        request_body = {
            "model": "auto",
            "messages": [{"role": "user", "content": "cache this exact prompt"}],
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.text_backend",
            return_value=object(),
        )
        collect_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.collect_text",
            side_effect=counting_collect_text,
        )
        with backend_patch, collect_patch:
            first_response = openai_v1_chat_complete.handle(request_body)
            second_response = openai_v1_chat_complete.handle(request_body)

        self.assertEqual(len(upstream_calls), 1)
        first_text = first_response["choices"][0]["message"]["content"]
        second_text = second_response["choices"][0]["message"]["content"]
        self.assertEqual(first_text, second_text)

    def test_repeated_stream_text_completion_replays_cached_chunks(self) -> None:
        """A repeated streaming request yields the same chunks without a new upstream stream."""
        upstream_calls = []

        def counting_stream_text_deltas(_backend, _request):
            # Generator body: the call is only recorded once iteration starts.
            upstream_calls.append(None)
            yield "streamed"
            yield " answer"

        request_body = {
            "model": "auto",
            "stream": True,
            "messages": [{"role": "user", "content": "stream cache this exact prompt"}],
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.text_backend",
            return_value=object(),
        )
        stream_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.stream_text_deltas",
            side_effect=counting_stream_text_deltas,
        )
        with backend_patch, stream_patch:
            first_chunks = list(openai_v1_chat_complete.handle(request_body))
            second_chunks = list(openai_v1_chat_complete.handle(request_body))

        self.assertEqual(len(upstream_calls), 1)
        self.assertEqual(first_chunks, second_chunks)

        pieces = []
        for chunk in second_chunks:
            delta = chunk["choices"][0]["delta"]
            pieces.append(str(delta.get("content") or ""))
        self.assertEqual("".join(pieces), "streamed answer")

    def test_adjacent_duplicate_messages_are_removed_before_upstream_call(self) -> None:
        """The upstream collector receives the messages with the adjacent duplicate dropped."""
        forwarded_messages = []

        def recording_collect_text(_backend, request):
            forwarded_messages.extend(request.messages or [])
            return "ok"

        request_body = {
            "model": "auto",
            "messages": [
                {"role": "user", "content": "repeat me"},
                {"role": "user", "content": "repeat me"},
                {"role": "assistant", "content": "old answer"},
                {"role": "user", "content": "next prompt"},
            ],
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.text_backend",
            return_value=object(),
        )
        collect_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.collect_text",
            side_effect=recording_collect_text,
        )
        with backend_patch, collect_patch:
            openai_v1_chat_complete.handle(request_body)

        expected_messages = [
            {"role": "user", "content": "repeat me"},
            {"role": "assistant", "content": "old answer"},
            {"role": "user", "content": "next prompt"},
        ]
        self.assertEqual(forwarded_messages, expected_messages)

    def test_chat_completion_usage_includes_cached_tokens(self) -> None:
        """Chat-completion usage carries zeroed cached-token and reasoning-token details."""
        request_body = {
            "model": "auto",
            "messages": [{"role": "user", "content": "usage shape"}],
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.text_backend",
            return_value=object(),
        )
        collect_patch = mock.patch(
            "services.protocol.openai_v1_chat_complete.collect_text",
            return_value="ok",
        )
        with backend_patch, collect_patch:
            response = openai_v1_chat_complete.handle(request_body)

        prompt_details = response["usage"]["prompt_tokens_details"]
        self.assertEqual(prompt_details["cached_tokens"], 0)
        completion_details = response["usage"]["completion_tokens_details"]
        self.assertEqual(completion_details["reasoning_tokens"], 0)

    def test_responses_completed_usage_includes_cached_tokens(self) -> None:
        """Responses usage carries zeroed cached-token and reasoning-token details."""
        request_body = {
            "model": "auto",
            "input": "usage shape",
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_response.text_backend",
            return_value=object(),
        )
        stream_patch = mock.patch(
            "services.protocol.openai_v1_response.stream_text_deltas",
            return_value=iter(["ok"]),
        )
        with backend_patch, stream_patch:
            response = openai_v1_response.handle(request_body)

        input_details = response["usage"]["input_tokens_details"]
        self.assertEqual(input_details["cached_tokens"], 0)
        output_details = response["usage"]["output_tokens_details"]
        self.assertEqual(output_details["reasoning_tokens"], 0)

    def test_repeated_responses_text_request_uses_cache(self) -> None:
        """A repeated streaming responses request yields identical events from one upstream stream."""
        upstream_calls = []

        def counting_stream_text_deltas(_backend, _request):
            # Generator body: the call is only recorded once iteration starts.
            upstream_calls.append(None)
            yield f"response cache {len(upstream_calls)}"

        request_body = {
            "model": "auto",
            "input": "cache this responses prompt",
            "stream": True,
        }
        backend_patch = mock.patch(
            "services.protocol.openai_v1_response.text_backend",
            return_value=object(),
        )
        stream_patch = mock.patch(
            "services.protocol.openai_v1_response.stream_text_deltas",
            side_effect=counting_stream_text_deltas,
        )
        with backend_patch, stream_patch:
            first_events = list(openai_v1_response.handle(request_body))
            second_events = list(openai_v1_response.handle(request_body))

        self.assertEqual(len(upstream_calls), 1)
        self.assertEqual(first_events, second_events)

    def test_output_sanitizer_removes_chatgpt_annotation_markup(self) -> None:
        """``sanitize_output_text`` rewrites a url annotation and drops a cite annotation.

        The input marks annotations with the U+E200 / U+E202 / U+E201
        characters; the expected output contains none of them.
        """
        annotated = (
            "Repo: \ue200url\ue202basketikun/chatgpt2api"
            "\ue202https://github.com/basketikun/chatgpt2api\ue201 "
            "details \ue200cite\ue202turn0search0\ue201."
        )
        cleaned = sanitize_output_text(annotated)
        self.assertEqual(
            cleaned,
            "Repo: basketikun/chatgpt2api (https://github.com/basketikun/chatgpt2api) details .",
        )

    def test_stream_sanitizer_does_not_emit_partial_annotation_or_repeat_prefix(self) -> None:
        """An annotation split across two append events never leaks markers into deltas."""
        events = [
            {"p": "/message/content/parts/0", "o": "append", "v": "Repo: \ue200url\ue202chat"},
            {"p": "/message/content/parts/0", "o": "append", "v": "gpt2api\ue202turn0search0\ue201 done \ue200cite\ue202turn0\ue201."},
            "[DONE]",
        ]

        # Dict events are serialised to JSON; the terminator string is passed through.
        payloads = []
        for event in events:
            if isinstance(event, dict):
                payloads.append(json.dumps(event, ensure_ascii=False))
            else:
                payloads.append(event)

        deltas = []
        for parsed in iter_conversation_payloads(iter(payloads)):
            if parsed.get("type") == "conversation.delta":
                deltas.append(str(parsed.get("delta") or ""))

        self.assertEqual("".join(deltas), "Repo: chatgpt2api done .")
        markers = ("\ue200", "\ue202", "\ue201")
        leaked = any(marker in delta for delta in deltas for marker in markers)
        self.assertFalse(leaked)

    def test_responses_tools_add_honest_no_tool_guard(self) -> None:
        """A request declaring tools gets a leading system message about local tools."""
        request_body = {
            "model": "auto",
            "input": "run echo hi",
            "tools": [{"type": "function", "name": "shell"}],
        }
        model, messages = openai_v1_response.text_response_parts(request_body)

        self.assertEqual(model, "auto")
        leading_message = messages[0]
        self.assertEqual(leading_message["role"], "system")
        self.assertIn("cannot execute local tools", str(messages[0]["content"]))
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()