Kalpokoch committed · Commit 52aa2ef · verified · 1 Parent(s): 73fdab6

Update app/app.py

Files changed (1):
  1. app/app.py +237 -35
app/app.py CHANGED

@@ -4,7 +4,9 @@ import asyncio
 import logging
 import uuid
 import re
-from fastapi import FastAPI, HTTPException, Request
+from typing import Dict, List, Optional
+from datetime import datetime
+from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
 from pydantic import BaseModel
 from llama_cpp import Llama
 # Correctly reference the module within the 'app' package
@@ -21,6 +23,131 @@ class RequestIdAdapter(logging.LoggerAdapter):
 
 logger = logging.getLogger("app")
 
+# -----------------------------
+# ✅ Queue Management Classes
+# -----------------------------
+class QueuedRequest:
+    def __init__(self, request_id: str, question: str, timestamp: datetime):
+        self.request_id = request_id
+        self.question = question
+        self.timestamp = timestamp
+        self.status = "queued"  # queued, processing, completed, failed, timeout
+        self.result: Optional[Dict] = None
+        self.error: Optional[str] = None
+
+class RequestQueue:
+    def __init__(self, max_size: int = 15):
+        self.queue: List[QueuedRequest] = []
+        self.processing: Optional[QueuedRequest] = None
+        self.completed_requests: Dict[str, QueuedRequest] = {}
+        self.max_size = max_size
+        self.lock = asyncio.Lock()
+
+    async def add_request(self, request_id: str, question: str) -> Dict:
+        async with self.lock:
+            if len(self.queue) >= self.max_size:
+                return {
+                    "status": "queue_full",
+                    "message": f"Queue is full (max {self.max_size} requests). Please try again later.",
+                    "queue_position": None,
+                    "estimated_wait_time": None
+                }
+
+            queued_request = QueuedRequest(request_id, question, datetime.now())
+
+            # If nothing is processing and queue is empty, this request can be processed immediately
+            if not self.processing and not self.queue:
+                self.processing = queued_request
+                queued_request.status = "processing"
+                return {
+                    "status": "processing",
+                    "message": "Your request is being processed now.",
+                    "queue_position": 0,
+                    "estimated_wait_time": "0-2 minutes"
+                }
+
+            # Add to queue
+            self.queue.append(queued_request)
+            position = len(self.queue)
+            estimated_wait = f"{position * 2}-{(position + 1) * 2} minutes"
+
+            return {
+                "status": "queued",
+                "message": f"Using free CPU tier - can only process one request at a time. Your request is #{position} in queue and will be processed after current requests are completed.",
+                "queue_position": position,
+                "estimated_wait_time": estimated_wait
+            }
+
+    async def get_next_request(self) -> Optional[QueuedRequest]:
+        async with self.lock:
+            if self.queue:
+                next_request = self.queue.pop(0)
+                self.processing = next_request
+                next_request.status = "processing"
+                return next_request
+            return None
+
+    async def complete_request(self, request_id: str, result: Dict = None, error: str = None):
+        async with self.lock:
+            if self.processing and self.processing.request_id == request_id:
+                if result:
+                    self.processing.result = result
+                    self.processing.status = "completed"
+                elif error:
+                    self.processing.error = error
+                    self.processing.status = "failed"
+
+                # Store completed request for result retrieval
+                self.completed_requests[request_id] = self.processing
+                self.processing = None
+
+                # Clean up old completed requests (keep only last 50)
+                if len(self.completed_requests) > 50:
+                    oldest_keys = list(self.completed_requests.keys())[:-50]
+                    for key in oldest_keys:
+                        del self.completed_requests[key]
+
+    async def get_request_status(self, request_id: str) -> Optional[Dict]:
+        async with self.lock:
+            # Check if currently processing
+            if self.processing and self.processing.request_id == request_id:
+                return {
+                    "status": self.processing.status,
+                    "message": "Your request is currently being processed.",
+                    "result": self.processing.result
+                }
+
+            # Check completed requests
+            if request_id in self.completed_requests:
+                req = self.completed_requests[request_id]
+                return {
+                    "status": req.status,
+                    "message": "Request completed." if req.status == "completed" else "Request failed.",
+                    "result": req.result,
+                    "error": req.error
+                }
+
+            # Check queue
+            for i, req in enumerate(self.queue):
+                if req.request_id == request_id:
+                    return {
+                        "status": "queued",
+                        "message": f"Your request is #{i+1} in queue.",
+                        "queue_position": i + 1,
+                        "estimated_wait_time": f"{(i+1) * 2}-{(i+2) * 2} minutes"
+                    }
+
+            return None
+
+    async def get_queue_info(self) -> Dict:
+        async with self.lock:
+            return {
+                "queue_length": len(self.queue),
+                "currently_processing": self.processing.request_id if self.processing else None,
+                "max_queue_size": self.max_size,
+                "completed_requests_count": len(self.completed_requests)
+            }
+
 # -----------------------------
 # ✅ Configuration
 # -----------------------------
@@ -31,12 +158,16 @@ LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "120"))
 RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.3"))
 TOP_K_SEARCH = int(os.getenv("TOP_K_SEARCH", "4"))
 TOP_K_CONTEXT = int(os.getenv("TOP_K_CONTEXT", "2"))
+MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "15"))
 
 # -----------------------------
 # ✅ Initialize FastAPI App
 # -----------------------------
 app = FastAPI(title="NEEPCO DoP RAG Chatbot", version="2.1.0")
 
+# Initialize request queue
+request_queue = RequestQueue(max_size=MAX_QUEUE_SIZE)
+
 @app.middleware("http")
 async def add_request_id(request: Request, call_next):
     request_id = str(uuid.uuid4())
@@ -101,25 +232,40 @@ class Feedback(BaseModel):
     comment: str | None = None
 
 # -----------------------------
-# ✅ Endpoints
+# ✅ Background Processing
 # -----------------------------
-def get_logger_adapter(request: Request):
-    return RequestIdAdapter(logger, {'request_id': getattr(request.state, 'request_id', 'N/A')})
+async def process_queued_requests():
+    """Background task to process queued requests"""
+    while True:
+        try:
+            next_request = await request_queue.get_next_request()
+            if next_request:
+                logger.info(f"Processing queued request: {next_request.request_id}")
+                try:
+                    result = await process_chat_request(next_request.question, next_request.request_id)
+                    await request_queue.complete_request(next_request.request_id, result=result)
+                    logger.info(f"Completed request: {next_request.request_id}")
+                except Exception as e:
+                    error_msg = f"Error processing request: {str(e)}"
+                    logger.error(f"Failed to process request {next_request.request_id}: {e}", exc_info=True)
+                    await request_queue.complete_request(next_request.request_id, error=error_msg)
+            else:
+                # No requests in queue, wait a bit before checking again
+                await asyncio.sleep(2)
+        except Exception as e:
+            logger.error(f"Error in background processor: {e}", exc_info=True)
+            await asyncio.sleep(5)
 
-@app.get("/")
-async def root():
-    return {"status": "✅ Server is running."}
+# Start background processor
+@app.on_event("startup")
+async def startup_event():
+    asyncio.create_task(process_queued_requests())
 
-@app.get("/health")
-async def health_check():
-    status = {
-        "status": "ok",
-        "database_status": "ready" if db_ready else "error",
-        "model_status": "ready" if model_ready else "error"
-    }
-    if not db_ready or not model_ready:
-        raise HTTPException(status_code=503, detail=status)
-    return status
+# -----------------------------
+# Core Processing Function
+# -----------------------------
+def get_logger_adapter(request_id: str):
+    return RequestIdAdapter(logger, {'request_id': request_id})
 
 async def generate_llm_response(prompt: str, request_id: str):
     loop = asyncio.get_running_loop()
@@ -132,23 +278,23 @@ async def generate_llm_response(prompt: str, request_id: str):
         raise ValueError("Empty response from LLM")
     return answer
 
-@app.post("/chat")
-async def chat(query: Query, request: Request):
-    adapter = get_logger_adapter(request)
-    question_lower = query.question.strip().lower()
+async def process_chat_request(question: str, request_id: str) -> Dict:
+    """Core chat processing logic extracted for reuse"""
+    adapter = get_logger_adapter(request_id)
+    question_lower = question.strip().lower()
 
     # --- GREETING & INTRO HANDLING ---
     greeting_keywords = ["hello", "hi", "hey", "what can you do", "who are you"]
     if question_lower in greeting_keywords:
-        adapter.info(f"Handling a greeting or introductory query: '{query.question}'")
+        adapter.info(f"Handling a greeting or introductory query: '{question}'")
         intro_message = (
             "Hello! I am an AI assistant specifically trained on NEEPCO's Delegation of Powers (DoP) policy document. "
             "My purpose is to help you find accurate information and answer questions based on this specific dataset. "
             "I am currently running on a CPU-based environment. How can I assist you with the DoP policy today?"
         )
         return {
-            "request_id": getattr(request.state, 'request_id', 'N/A'),
-            "question": query.question,
+            "request_id": request_id,
+            "question": question,
            "context_used": "NA - Greeting",
            "answer": intro_message
        }
@@ -157,15 +303,16 @@ async def chat(query: Query, request: Request):
         adapter.error("Service unavailable due to initialization failure.")
         raise HTTPException(status_code=503, detail="Service is not ready. Please check logs.")
 
-    adapter.info(f"Received query: '{query.question}'")
+    adapter.info(f"Received query: '{question}'")
 
     # 1. Search Vector DB
-    search_results = db.search(query.question, top_k=TOP_K_SEARCH)
+    search_results = db.search(question, top_k=TOP_K_SEARCH)
 
     if not search_results:
         adapter.warning("No relevant context found in vector DB.")
         return {
-            "question": query.question,
+            "request_id": request_id,
+            "question": question,
             "context_used": "No relevant context found.",
             "answer": "Sorry, I could not find a relevant policy to answer that question. Please try rephrasing."
         }
@@ -181,7 +328,6 @@ async def chat(query: Query, request: Request):
     prompt = f"""<|system|>
 You are a precise and factual assistant for NEEPCO's Delegation of Powers (DoP) policy.
 Your task is to answer the user's question based ONLY on the provided context.
-
 - **Formatting Rule:** If the answer contains a list of items or steps, you **MUST** separate each item with a pipe symbol (`|`). For example: `First item|Second item|Third item`.
 - **Content Rule:** If the information is not in the provided context, you **MUST** reply with the exact phrase: "The provided policy context does not contain information on this topic."
 </s>
@@ -190,9 +336,8 @@ Your task is to answer the user's question based ONLY on the provided context.
 ```
 {context}
 ```
-
 ### Question:
-{query.question}
+{question}
 </s>
 <|assistant|>
 ### Detailed Answer:
@@ -203,7 +348,7 @@ Your task is to answer the user's question based ONLY on the provided context.
     try:
         adapter.info("Sending prompt to LLM for generation...")
         raw_answer = await asyncio.wait_for(
-            generate_llm_response(prompt, request.state.request_id),
+            generate_llm_response(prompt, request_id),
             timeout=LLM_TIMEOUT_SECONDS
         )
         adapter.info(f"LLM generation successful. Raw response: {raw_answer[:250]}...")
@@ -229,14 +374,71 @@ Your task is to answer the user's question based ONLY on the provided context.
         adapter.error(f"An unexpected error occurred during LLM generation: {e}", exc_info=True)
         answer = "Sorry, an unexpected error occurred while generating a response."
 
-    adapter.info(f"Final answer prepared. Returning to client.")
+    adapter.info(f"Final answer prepared. Returning result.")
     return {
-        "request_id": request.state.request_id,
-        "question": query.question,
+        "request_id": request_id,
+        "question": question,
         "context_used": context,
         "answer": answer
     }
 
+# -----------------------------
+# ✅ Endpoints
+# -----------------------------
+@app.get("/")
+async def root():
+    return {"status": "✅ Server is running."}
+
+@app.get("/health")
+async def health_check():
+    queue_info = await request_queue.get_queue_info()
+    status = {
+        "status": "ok",
+        "database_status": "ready" if db_ready else "error",
+        "model_status": "ready" if model_ready else "error",
+        "queue_info": queue_info
+    }
+    if not db_ready or not model_ready:
+        raise HTTPException(status_code=503, detail=status)
+    return status
+
+@app.post("/chat")
+async def chat(query: Query, request: Request):
+    """Add request to queue and return queue status"""
+    if not db_ready or not model_ready:
+        raise HTTPException(status_code=503, detail="Service is not ready. Please check logs.")
+
+    request_id = request.state.request_id
+    adapter = get_logger_adapter(request_id)
+
+    adapter.info(f"Received chat request: '{query.question}'")
+
+    # Add request to queue
+    queue_status = await request_queue.add_request(request_id, query.question)
+
+    return {
+        "request_id": request_id,
+        "question": query.question,
+        **queue_status
+    }
+
+@app.get("/status/{request_id}")
+async def get_request_status(request_id: str):
+    """Check the status of a specific request"""
+    status = await request_queue.get_request_status(request_id)
+    if not status:
+        raise HTTPException(status_code=404, detail="Request not found")
+
+    return {
+        "request_id": request_id,
+        **status
+    }
+
+@app.get("/queue")
+async def get_queue_status():
+    """Get current queue information"""
+    return await request_queue.get_queue_info()
+
 @app.post("/feedback")
 async def collect_feedback(feedback: Feedback, request: Request):
     adapter = get_logger_adapter(request)
@@ -250,4 +452,4 @@ async def collect_feedback(feedback: Feedback, request: Request):
         "comment": feedback.comment
     }
     adapter.info(json.dumps(feedback_log))
-    return {"status": "✅ Feedback recorded. Thank you!"}
+    return {"status": "✅ Feedback recorded. Thank you!"}