Mirrowel committed on
Commit
54f0f2c
·
1 Parent(s): 33e95e6

feat: Implement request logging for API responses and enhance streaming response handling

Browse files
src/proxy_app/main.py CHANGED
@@ -7,11 +7,14 @@ from dotenv import load_dotenv
7
  import logging
8
  from pathlib import Path
9
  import sys
 
 
10
 
11
  # Add the 'src' directory to the Python path to allow importing 'rotating_api_key_client'
12
  sys.path.append(str(Path(__file__).resolve().parent.parent))
13
 
14
  from rotator_library import RotatingClient, PROVIDER_PLUGINS
 
15
 
16
  # Configure logging
17
  logging.basicConfig(level=logging.INFO)
@@ -20,6 +23,7 @@ logging.basicConfig(level=logging.INFO)
20
  load_dotenv()
21
 
22
  # --- Configuration ---
 
23
  PROXY_API_KEY = os.getenv("PROXY_API_KEY")
24
  if not PROXY_API_KEY:
25
  raise ValueError("PROXY_API_KEY environment variable not set.")
@@ -62,29 +66,111 @@ async def verify_api_key(auth: str = Depends(api_key_header)):
62
  raise HTTPException(status_code=401, detail="Invalid or missing API Key")
63
  return auth
64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
  @app.post("/v1/chat/completions")
66
  async def chat_completions(
67
- request: Request,
68
  client: RotatingClient = Depends(get_rotating_client),
69
- _=Depends(verify_api_key)
70
  ):
71
  """
72
  OpenAI-compatible endpoint powered by the RotatingClient.
73
- Handles both streaming and non-streaming responses.
74
  """
75
  try:
76
- data = await request.json()
77
- is_streaming = data.get("stream", False)
78
-
79
- response = await client.acompletion(**data)
80
 
81
  if is_streaming:
82
- return StreamingResponse(response, media_type="text/event-stream")
 
 
 
 
83
  else:
 
 
 
 
 
 
 
84
  return response
85
 
86
  except Exception as e:
87
  logging.error(f"Request failed after all retries: {e}")
 
 
 
 
 
 
 
 
 
 
 
88
  raise HTTPException(status_code=500, detail=str(e))
89
 
90
  @app.get("/")
 
7
  import logging
8
  from pathlib import Path
9
  import sys
10
+ import json
11
+ from typing import AsyncGenerator, Any
12
 
13
  # Add the 'src' directory to the Python path to allow importing 'rotating_api_key_client'
14
  sys.path.append(str(Path(__file__).resolve().parent.parent))
15
 
16
  from rotator_library import RotatingClient, PROVIDER_PLUGINS
17
+ from .request_logger import log_request_response
18
 
19
  # Configure logging
20
  logging.basicConfig(level=logging.INFO)
 
23
  load_dotenv()
24
 
25
  # --- Configuration ---
26
+ ENABLE_REQUEST_LOGGING = False # Set to False to disable request/response logging
27
  PROXY_API_KEY = os.getenv("PROXY_API_KEY")
28
  if not PROXY_API_KEY:
29
  raise ValueError("PROXY_API_KEY environment variable not set.")
 
66
  raise HTTPException(status_code=401, detail="Invalid or missing API Key")
67
  return auth
68
 
69
async def streaming_response_wrapper(
    request_data: dict,
    response_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    """
    Pass an SSE stream through unchanged while collecting its chunks, then
    reconstruct a single chat-completion object and log it after the stream ends.

    Args:
        request_data: Parsed JSON body of the originating request.
        response_stream: Upstream SSE stream of "data: ..." lines.

    Yields:
        Each chunk of the upstream stream, byte-for-byte.
    """
    response_chunks = []
    full_response = {}
    try:
        async for chunk_str in response_stream:
            # Forward first: logging work must never delay the client.
            yield chunk_str
            # Collect JSON payloads from "data: ..." SSE lines for logging.
            if chunk_str.strip() and chunk_str.startswith("data:"):
                content = chunk_str[len("data:"):].strip()
                if content != "[DONE]":
                    try:
                        response_chunks.append(json.loads(content))
                    except json.JSONDecodeError:
                        # Ignore non-JSON chunks (keep-alives, comments) if any.
                        pass
    finally:
        # Runs even if the client disconnects mid-stream, so partial
        # responses are still logged.
        if response_chunks:
            full_content = "".join(
                choice["delta"]["content"]
                for chunk in response_chunks
                if "choices" in chunk and chunk["choices"]
                for choice in chunk["choices"]
                if "delta" in choice and "content" in choice["delta"] and choice["delta"]["content"]
            )

            # Prefer the finish_reason the provider actually sent over a guess;
            # also pick up a usage block if the provider streamed one.
            finish_reason = "stop"  # fallback when no chunk reports one
            usage = None
            for chunk in response_chunks:
                for choice in chunk.get("choices", []):
                    if choice.get("finish_reason"):
                        finish_reason = choice["finish_reason"]
                if chunk.get("usage"):
                    usage = chunk["usage"]

            # Take metadata from the first chunk and construct a single choice object.
            first_chunk = response_chunks[0]
            final_choice = {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": full_content,
                },
                "finish_reason": finish_reason,
            }

            full_response = {
                "id": first_chunk.get("id"),
                "object": "chat.completion",  # final object is a completion, not a chunk
                "created": first_chunk.get("created"),
                "model": first_chunk.get("model"),
                "choices": [final_choice],
                "usage": usage,  # only present if the provider streamed a usage chunk
            }

        if ENABLE_REQUEST_LOGGING:
            log_request_response(
                request_data=request_data,
                response_data=full_response,
                is_streaming=True
            )
128
+
129
@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request,
    client: RotatingClient = Depends(get_rotating_client),
    _ = Depends(verify_api_key)  # auth-only dependency; value unused
):
    """
    OpenAI-compatible endpoint powered by the RotatingClient.
    Handles both streaming and non-streaming responses and logs them.

    Raises:
        HTTPException: 500 with the underlying error message when the
            rotating client fails after all retries (401 is raised earlier
            by the verify_api_key dependency).
    """
    try:
        request_data = await request.json()
        # Streaming is driven by the client's own "stream" flag, per the
        # OpenAI chat-completions API.
        is_streaming = request_data.get("stream", False)

        response = await client.acompletion(**request_data)

        if is_streaming:
            # Wrap the streaming response to enable logging after it's complete
            return StreamingResponse(
                streaming_response_wrapper(request_data, response),
                media_type="text/event-stream"
            )
        else:
            # For non-streaming, log immediately.
            if ENABLE_REQUEST_LOGGING:
                log_request_response(
                    request_data=request_data,
                    # NOTE(review): assumes the client returns a pydantic-style
                    # object exposing .dict() — confirm against RotatingClient.
                    response_data=response.dict(),
                    is_streaming=False
                )
            return response

    except Exception as e:
        logging.error(f"Request failed after all retries: {e}")
        # Optionally log the failed request.
        if ENABLE_REQUEST_LOGGING:
            # Re-reading the body here relies on Starlette caching it after
            # the first read; parse failures fall back to a placeholder.
            try:
                request_data = await request.json()
            except json.JSONDecodeError:
                request_data = {"error": "Could not parse request body"}
            log_request_response(
                request_data=request_data,
                response_data={"error": str(e)},
                is_streaming=request_data.get("stream", False)
            )
        raise HTTPException(status_code=500, detail=str(e))
175
 
176
  @app.get("/")
src/proxy_app/request_logger.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

# Log files land in <repo-root>/logs (three levels up from this module).
# parents=True so a missing intermediate directory doesn't crash at import.
LOGS_DIR = Path(__file__).resolve().parent.parent.parent / "logs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)


def log_request_response(request_data: dict, response_data: dict, is_streaming: bool) -> None:
    """
    Write one request/response pair to a uniquely named JSON file in LOGS_DIR.

    Args:
        request_data: Parsed JSON body of the incoming request.
        response_data: The (possibly reconstructed) response payload.
        is_streaming: Whether the response was delivered as a stream.

    Never raises: any failure is logged and swallowed so logging cannot
    take down the main application.
    """
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = LOGS_DIR / f"{timestamp}_{uuid.uuid4()}.json"

        log_content = {
            "request": request_data,
            "response": response_data,
            "is_streaming": is_streaming,
        }

        # default=str keeps the log from being silently dropped when a payload
        # contains a non-JSON-serializable value; ensure_ascii=False preserves
        # non-ASCII content readably.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(log_content, f, indent=4, default=str, ensure_ascii=False)

    except Exception as e:
        # In case of logging failure, we don't want to crash the main application.
        logger.error("Error logging request/response: %s", e)