Kalpokoch commited on
Commit
c560b3c
·
verified ·
1 Parent(s): 52aa2ef

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +127 -22
app/app.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import os
2
  import json
3
  import asyncio
@@ -5,7 +6,7 @@ import logging
5
  import uuid
6
  import re
7
  from typing import Dict, List, Optional
8
- from datetime import datetime
9
  from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
10
  from pydantic import BaseModel
11
  from llama_cpp import Llama
@@ -31,9 +32,11 @@ class QueuedRequest:
31
  self.request_id = request_id
32
  self.question = question
33
  self.timestamp = timestamp
34
- self.status = "queued" # queued, processing, completed, failed, timeout
35
  self.result: Optional[Dict] = None
36
  self.error: Optional[str] = None
 
 
37
 
38
  class RequestQueue:
39
  def __init__(self, max_size: int = 15):
@@ -42,9 +45,14 @@ class RequestQueue:
42
  self.completed_requests: Dict[str, QueuedRequest] = {}
43
  self.max_size = max_size
44
  self.lock = asyncio.Lock()
 
 
45
 
46
  async def add_request(self, request_id: str, question: str) -> Dict:
47
  async with self.lock:
 
 
 
48
  if len(self.queue) >= self.max_size:
49
  return {
50
  "status": "queue_full",
@@ -90,27 +98,57 @@ class RequestQueue:
90
  async def complete_request(self, request_id: str, result: Dict = None, error: str = None):
91
  async with self.lock:
92
  if self.processing and self.processing.request_id == request_id:
93
- if result:
 
 
 
 
94
  self.processing.result = result
95
  self.processing.status = "completed"
96
  elif error:
97
  self.processing.error = error
98
  self.processing.status = "failed"
99
 
100
- # Store completed request for result retrieval
101
  self.completed_requests[request_id] = self.processing
102
  self.processing = None
103
-
104
- # Clean up old completed requests (keep only last 50)
105
- if len(self.completed_requests) > 50:
106
- oldest_keys = list(self.completed_requests.keys())[:-50]
107
- for key in oldest_keys:
108
- del self.completed_requests[key]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
 
110
  async def get_request_status(self, request_id: str) -> Optional[Dict]:
111
  async with self.lock:
 
 
 
112
  # Check if currently processing
113
  if self.processing and self.processing.request_id == request_id:
 
 
 
 
 
 
 
 
114
  return {
115
  "status": self.processing.status,
116
  "message": "Your request is currently being processed.",
@@ -120,9 +158,18 @@ class RequestQueue:
120
  # Check completed requests
121
  if request_id in self.completed_requests:
122
  req = self.completed_requests[request_id]
 
 
 
 
 
 
 
 
 
123
  return {
124
  "status": req.status,
125
- "message": "Request completed." if req.status == "completed" else "Request failed.",
126
  "result": req.result,
127
  "error": req.error
128
  }
@@ -130,6 +177,7 @@ class RequestQueue:
130
  # Check queue
131
  for i, req in enumerate(self.queue):
132
  if req.request_id == request_id:
 
133
  return {
134
  "status": "queued",
135
  "message": f"Your request is #{i+1} in queue.",
@@ -139,6 +187,21 @@ class RequestQueue:
139
 
140
  return None
141
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
142
  async def get_queue_info(self) -> Dict:
143
  async with self.lock:
144
  return {
@@ -242,9 +305,28 @@ async def process_queued_requests():
242
  if next_request:
243
  logger.info(f"Processing queued request: {next_request.request_id}")
244
  try:
 
 
 
 
 
 
 
 
 
245
  result = await process_chat_request(next_request.question, next_request.request_id)
246
- await request_queue.complete_request(next_request.request_id, result=result)
247
- logger.info(f"Completed request: {next_request.request_id}")
 
 
 
 
 
 
 
 
 
 
248
  except Exception as e:
249
  error_msg = f"Error processing request: {str(e)}"
250
  logger.error(f"Failed to process request {next_request.request_id}: {e}", exc_info=True)
@@ -425,14 +507,37 @@ async def chat(query: Query, request: Request):
425
  @app.get("/status/{request_id}")
426
  async def get_request_status(request_id: str):
427
  """Check the status of a specific request"""
428
- status = await request_queue.get_request_status(request_id)
429
- if not status:
430
- raise HTTPException(status_code=404, detail="Request not found")
431
-
432
- return {
433
- "request_id": request_id,
434
- **status
435
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
436
 
437
  @app.get("/queue")
438
  async def get_queue_status():
@@ -441,7 +546,7 @@ async def get_queue_status():
441
 
442
  @app.post("/feedback")
443
  async def collect_feedback(feedback: Feedback, request: Request):
444
- adapter = get_logger_adapter(request)
445
  feedback_log = {
446
  "type": "USER_FEEDBACK",
447
  "request_id": feedback.request_id,
 
1
+ # app.py - Updated with better request handling
2
  import os
3
  import json
4
  import asyncio
 
6
  import uuid
7
  import re
8
  from typing import Dict, List, Optional
9
+ from datetime import datetime, timedelta
10
  from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
11
  from pydantic import BaseModel
12
  from llama_cpp import Llama
 
32
  self.request_id = request_id
33
  self.question = question
34
  self.timestamp = timestamp
35
+ self.status = "queued" # queued, processing, completed, failed, timeout, cancelled
36
  self.result: Optional[Dict] = None
37
  self.error: Optional[str] = None
38
+ self.cancelled = False # Track if request was cancelled
39
+ self.last_accessed = datetime.now() # Track when status was last checked
40
 
41
  class RequestQueue:
42
  def __init__(self, max_size: int = 15):
 
45
  self.completed_requests: Dict[str, QueuedRequest] = {}
46
  self.max_size = max_size
47
  self.lock = asyncio.Lock()
48
+ self.cleanup_interval = 300 # 5 minutes
49
+ self.max_completed_age = 600 # 10 minutes
50
 
51
  async def add_request(self, request_id: str, question: str) -> Dict:
52
  async with self.lock:
53
+ # Clean up old requests periodically
54
+ await self._cleanup_old_requests()
55
+
56
  if len(self.queue) >= self.max_size:
57
  return {
58
  "status": "queue_full",
 
98
  async def complete_request(self, request_id: str, result: Dict = None, error: str = None):
99
  async with self.lock:
100
  if self.processing and self.processing.request_id == request_id:
101
+ if self.processing.cancelled:
102
+ # Don't store results for cancelled requests
103
+ self.processing.status = "cancelled"
104
+ logger.info(f"Request {request_id} was cancelled, not storing result")
105
+ elif result:
106
  self.processing.result = result
107
  self.processing.status = "completed"
108
  elif error:
109
  self.processing.error = error
110
  self.processing.status = "failed"
111
 
112
+ # Store completed request for result retrieval (even cancelled ones briefly)
113
  self.completed_requests[request_id] = self.processing
114
  self.processing = None
115
+
116
+ async def cancel_request(self, request_id: str) -> bool:
117
+ """Cancel a request if it exists in queue or is processing"""
118
+ async with self.lock:
119
+ # Check if it's currently processing
120
+ if self.processing and self.processing.request_id == request_id:
121
+ self.processing.cancelled = True
122
+ logger.info(f"Marked processing request {request_id} as cancelled")
123
+ return True
124
+
125
+ # Check if it's in queue
126
+ for i, req in enumerate(self.queue):
127
+ if req.request_id == request_id:
128
+ cancelled_req = self.queue.pop(i)
129
+ cancelled_req.status = "cancelled"
130
+ cancelled_req.cancelled = True
131
+ self.completed_requests[request_id] = cancelled_req
132
+ logger.info(f"Cancelled queued request {request_id}")
133
+ return True
134
+
135
+ return False
136
 
137
  async def get_request_status(self, request_id: str) -> Optional[Dict]:
138
  async with self.lock:
139
+ # Update last accessed time for any request we're checking
140
+ current_time = datetime.now()
141
+
142
  # Check if currently processing
143
  if self.processing and self.processing.request_id == request_id:
144
+ self.processing.last_accessed = current_time
145
+ if self.processing.cancelled:
146
+ return {
147
+ "status": "cancelled",
148
+ "message": "Request was cancelled.",
149
+ "result": None,
150
+ "error": "Request cancelled by user"
151
+ }
152
  return {
153
  "status": self.processing.status,
154
  "message": "Your request is currently being processed.",
 
158
  # Check completed requests
159
  if request_id in self.completed_requests:
160
  req = self.completed_requests[request_id]
161
+ req.last_accessed = current_time
162
+
163
+ status_messages = {
164
+ "completed": "Request completed.",
165
+ "failed": "Request failed.",
166
+ "cancelled": "Request was cancelled.",
167
+ "timeout": "Request timed out."
168
+ }
169
+
170
  return {
171
  "status": req.status,
172
+ "message": status_messages.get(req.status, "Request processed."),
173
  "result": req.result,
174
  "error": req.error
175
  }
 
177
  # Check queue
178
  for i, req in enumerate(self.queue):
179
  if req.request_id == request_id:
180
+ req.last_accessed = current_time
181
  return {
182
  "status": "queued",
183
  "message": f"Your request is #{i+1} in queue.",
 
187
 
188
  return None
189
 
190
+ async def _cleanup_old_requests(self):
191
+ """Clean up old completed requests and abandoned requests"""
192
+ current_time = datetime.now()
193
+ cutoff_time = current_time - timedelta(seconds=self.max_completed_age)
194
+
195
+ # Clean up old completed requests
196
+ to_remove = []
197
+ for request_id, req in self.completed_requests.items():
198
+ if req.last_accessed < cutoff_time:
199
+ to_remove.append(request_id)
200
+
201
+ for request_id in to_remove:
202
+ del self.completed_requests[request_id]
203
+ logger.info(f"Cleaned up old request: {request_id}")
204
+
205
  async def get_queue_info(self) -> Dict:
206
  async with self.lock:
207
  return {
 
305
  if next_request:
306
  logger.info(f"Processing queued request: {next_request.request_id}")
307
  try:
308
+ # Check if request was cancelled before processing
309
+ if next_request.cancelled:
310
+ logger.info(f"Skipping cancelled request: {next_request.request_id}")
311
+ await request_queue.complete_request(
312
+ next_request.request_id,
313
+ error="Request was cancelled"
314
+ )
315
+ continue
316
+
317
  result = await process_chat_request(next_request.question, next_request.request_id)
318
+
319
+ # Check again if request was cancelled during processing
320
+ if next_request.cancelled:
321
+ logger.info(f"Request was cancelled during processing: {next_request.request_id}")
322
+ await request_queue.complete_request(
323
+ next_request.request_id,
324
+ error="Request was cancelled during processing"
325
+ )
326
+ else:
327
+ await request_queue.complete_request(next_request.request_id, result=result)
328
+ logger.info(f"Completed request: {next_request.request_id}")
329
+
330
  except Exception as e:
331
  error_msg = f"Error processing request: {str(e)}"
332
  logger.error(f"Failed to process request {next_request.request_id}: {e}", exc_info=True)
 
507
  @app.get("/status/{request_id}")
508
  async def get_request_status(request_id: str):
509
  """Check the status of a specific request"""
510
+ try:
511
+ status = await request_queue.get_request_status(request_id)
512
+ if not status:
513
+ raise HTTPException(status_code=404, detail="Request not found")
514
+
515
+ return {
516
+ "request_id": request_id,
517
+ **status
518
+ }
519
+ except Exception as e:
520
+ logger.error(f"Error checking status for {request_id}: {e}")
521
+ raise HTTPException(status_code=500, detail="Error checking request status")
522
+
523
+ @app.delete("/cancel/{request_id}")
524
+ async def cancel_request(request_id: str):
525
+ """Cancel a specific request"""
526
+ try:
527
+ cancelled = await request_queue.cancel_request(request_id)
528
+ if not cancelled:
529
+ raise HTTPException(status_code=404, detail="Request not found or cannot be cancelled")
530
+
531
+ return {
532
+ "status": "cancelled",
533
+ "message": f"Request {request_id} has been cancelled",
534
+ "request_id": request_id
535
+ }
536
+ except HTTPException:
537
+ raise
538
+ except Exception as e:
539
+ logger.error(f"Error cancelling request {request_id}: {e}")
540
+ raise HTTPException(status_code=500, detail="Error cancelling request")
541
 
542
  @app.get("/queue")
543
  async def get_queue_status():
 
546
 
547
  @app.post("/feedback")
548
  async def collect_feedback(feedback: Feedback, request: Request):
549
+ adapter = get_logger_adapter(request.state.request_id)
550
  feedback_log = {
551
  "type": "USER_FEEDBACK",
552
  "request_id": feedback.request_id,