tecuts committed on
Commit
9601d52
·
verified ·
1 Parent(s): 13046df

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +83 -284
app.py CHANGED
@@ -2,7 +2,6 @@ import os
2
  import json
3
  import asyncio
4
  import requests
5
- import re
6
  from datetime import datetime
7
  from typing import List, Dict, Optional
8
  from fastapi import FastAPI, Request, HTTPException, Depends
@@ -49,131 +48,80 @@ GOOGLE_CX = os.getenv("GOOGLE_CX")
49
  LLM_API_KEY = os.getenv("LLM_API_KEY")
50
  LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api-15i2e8ze256bvfn6.aistudio-app.com/v1")
51
 
52
- # --- Enhanced System Prompts ---
53
- SYSTEM_PROMPT_WITH_SEARCH = """You are an intelligent AI assistant with access to real-time web search capabilities.
54
-
55
- IMPORTANT: When you need current information, recent events, or specific facts that might be outdated, you should explicitly request a search by including the phrase "SEARCH_NEEDED:" followed by your search query in your response.
56
-
57
- For example:
58
- - If asked about recent news: "SEARCH_NEEDED: latest news about [topic]"
59
- - If asked about current events: "SEARCH_NEEDED: current status of [event]"
60
- - If asked about recent developments: "SEARCH_NEEDED: recent developments in [field]"
61
-
62
- **Response Guidelines:**
63
- 1. Use search for queries that need current, recent, or specific factual information
64
- 2. Be proactive in identifying when search is needed
65
- 3. Synthesize information from multiple sources when search results are provided
66
- 4. Clearly indicate when information comes from search results
67
- 5. Provide comprehensive, well-structured answers
68
- 6. Cite sources appropriately
69
-
70
  Current date: {current_date}"""
71
 
72
  SYSTEM_PROMPT_NO_SEARCH = """You are an intelligent AI assistant. Provide helpful, accurate, and comprehensive responses based on your training data.
73
  Current date: {current_date}"""
74
 
75
- # --- Optimized Web Search Tool ---
76
- async def google_search_tool_async(query: str, num_results: int = 3) -> List[Dict]:
77
- """
78
- Async Google Custom Search - reduced results for faster response
79
- """
80
  if not GOOGLE_API_KEY or not GOOGLE_CX or not query.strip():
81
  return []
82
 
83
- logger.info(f"Executing search for: '{query}'")
84
 
85
- search_url = "https://www.googleapis.com/customsearch/v1"
86
  params = {
87
  "key": GOOGLE_API_KEY,
88
  "cx": GOOGLE_CX,
89
  "q": query.strip(),
90
- "num": min(num_results, 5), # Reduced for speed
91
- "dateRestrict": "m3" # Last 3 months for freshness
92
  }
93
 
94
  try:
95
- # Run in thread pool to avoid blocking
96
  loop = asyncio.get_event_loop()
97
  response = await loop.run_in_executor(
98
  None,
99
- lambda: requests.get(search_url, params=params, timeout=10)
 
 
 
 
100
  )
101
  response.raise_for_status()
102
- search_results = response.json()
103
-
104
- if "items" not in search_results:
105
- return []
106
 
107
- parsed_results = []
108
- for item in search_results.get("items", [])[:num_results]: # Limit results
109
  title = item.get("title", "").strip()
110
  url = item.get("link", "").strip()
111
  snippet = item.get("snippet", "").strip()
112
 
113
  if title and url and snippet:
114
- parsed_results.append({
115
- "source_title": title,
116
  "url": url,
117
  "snippet": snippet,
118
  "domain": url.split('/')[2] if '/' in url else url
119
  })
120
-
121
- logger.info(f"Retrieved {len(parsed_results)} search results")
122
- return parsed_results
123
 
124
  except Exception as e:
125
- logger.error(f"Search error: {e}")
126
  return []
127
 
128
- def format_search_results_compact(search_results: List[Dict]) -> str:
129
- """Compact formatting for faster processing"""
130
- if not search_results:
131
- return "No search results found."
132
-
133
- formatted = ["Search Results:"]
134
- for i, result in enumerate(search_results, 1):
135
- formatted.append(f"\n{i}. {result['source_title']}")
136
- formatted.append(f" Source: {result['domain']}")
137
- formatted.append(f" Content: {result['snippet']}")
138
-
139
- return "\n".join(formatted)
140
-
141
- # --- Check if query needs search ---
142
- def should_search(query: str, use_search: bool) -> Optional[str]:
143
- """Determine if a query needs search and extract search terms"""
144
- if not use_search:
145
- return None
146
-
147
- # Keywords that typically require current information
148
- current_keywords = [
149
- 'today', 'recent', 'latest', 'current', 'now', 'this year', '2024', '2025',
150
- 'news', 'happening', 'update', 'development', 'status', 'price', 'stock',
151
- 'weather', 'score', 'result', 'election', 'covid', 'pandemic'
152
- ]
153
-
154
- query_lower = query.lower()
155
-
156
- # Check for current-info keywords
157
- if any(keyword in query_lower for keyword in current_keywords):
158
- return query
159
 
160
- # Check for questions about specific companies, products, or events
161
- question_patterns = [
162
- r'what.*happened.*',
163
- r'when.*did.*',
164
- r'how.*is.*doing',
165
- r'what.*the.*status',
166
- r'is.*still.*',
167
- r'has.*been.*',
168
- ]
169
-
170
- if any(re.search(pattern, query_lower) for pattern in question_patterns):
171
- return query
172
 
173
- return None
174
 
175
  # --- FastAPI Application Setup ---
176
- app = FastAPI(title="Streaming AI Chatbot", version="2.1.0")
177
 
178
  app.add_middleware(
179
  CORSMiddleware,
@@ -194,227 +142,71 @@ if not LLM_API_KEY or not LLM_BASE_URL:
194
  client = None
195
  else:
196
  client = OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL)
197
- logger.info("OpenAI client initialized successfully")
198
 
199
- # --- Tool Definition (keeping for potential future use) ---
200
- available_tools = [
201
- {
202
- "type": "function",
203
- "function": {
204
- "name": "google_search",
205
- "description": "Search Google for current information, recent events, or specific facts.",
206
- "parameters": {
207
- "type": "object",
208
- "properties": {
209
- "query": {
210
- "type": "string",
211
- "description": "Search query with relevant keywords"
212
- }
213
- },
214
- "required": ["query"]
215
- }
216
- }
217
- }
218
- ]
219
-
220
- # --- Enhanced Streaming Response Generator ---
221
  async def generate_streaming_response(messages: List[Dict], use_search: bool, temperature: float, original_query: str):
222
- """Generate streaming response with intelligent search triggering"""
223
 
224
  try:
225
  source_links = []
226
- search_performed = False
227
 
228
- # Check if we should proactively search
229
- proactive_search_query = should_search(original_query, use_search)
230
- if proactive_search_query:
231
- logger.info(f"Proactive search triggered for: {proactive_search_query}")
232
- yield f"data: {json.dumps({'type': 'status', 'data': 'Searching for current information...'})}\n\n"
 
233
 
234
- search_results = await google_search_tool_async(proactive_search_query, 4)
235
  if search_results:
236
- search_context = format_search_results_compact(search_results)
 
 
 
 
 
 
 
 
237
 
238
  # Add search context to messages
239
- enhanced_messages = messages + [{
240
  "role": "system",
241
- "content": f"Recent search results for your reference:\n\n{search_context}\n\nPlease use this information to provide a comprehensive and up-to-date response."
242
  }]
243
 
244
- for result in search_results:
245
- source_links.append({
246
- "title": result["source_title"],
247
- "url": result["url"],
248
- "domain": result["domain"]
249
- })
250
-
251
- search_performed = True
252
- messages = enhanced_messages
253
 
254
- # Initial LLM call with streaming
255
  llm_kwargs = {
256
- "model": "unsloth/Qwen3-30B-A3B-GGUF",
257
  "temperature": temperature,
258
  "messages": messages,
259
- "max_tokens": 2000,
260
- "stream": True
 
261
  }
262
 
263
- # Try function calling as backup (in case model supports it)
264
- if use_search and not search_performed:
265
- llm_kwargs["tools"] = available_tools
266
- llm_kwargs["tool_choice"] = "auto"
267
-
268
- response_content = ""
269
- tool_calls_data = []
270
-
271
- yield f"data: {json.dumps({'type': 'status', 'data': 'Generating response...'})}\n\n"
272
-
273
  # Stream the response
274
  stream = client.chat.completions.create(**llm_kwargs)
275
 
276
  for chunk in stream:
277
- delta = chunk.choices[0].delta
278
-
279
- # Handle content streaming
280
- if delta.content:
281
- content_chunk = delta.content
282
- response_content += content_chunk
283
-
284
- # Check for search requests in the content
285
- if use_search and not search_performed and "SEARCH_NEEDED:" in content_chunk:
286
- # Extract search query from the content
287
- search_match = re.search(r'SEARCH_NEEDED:\s*(.+?)(?:\n|$)', content_chunk)
288
- if search_match:
289
- search_query = search_match.group(1).strip()
290
- logger.info(f"Search requested by model: {search_query}")
291
-
292
- # Don't yield this chunk yet, we'll search first
293
- continue
294
-
295
- yield f"data: {json.dumps({'type': 'content', 'data': content_chunk})}\n\n"
296
-
297
- # Handle tool calls (backup method)
298
- if delta.tool_calls:
299
- for tool_call in delta.tool_calls:
300
- if len(tool_calls_data) <= tool_call.index:
301
- tool_calls_data.extend([{"id": "", "function": {"name": "", "arguments": ""}}
302
- for _ in range(tool_call.index + 1 - len(tool_calls_data))])
303
-
304
- if tool_call.id:
305
- tool_calls_data[tool_call.index]["id"] = tool_call.id
306
- if tool_call.function.name:
307
- tool_calls_data[tool_call.index]["function"]["name"] = tool_call.function.name
308
- if tool_call.function.arguments:
309
- tool_calls_data[tool_call.index]["function"]["arguments"] += tool_call.function.arguments
310
 
311
- # Handle model-requested search
312
- if use_search and not search_performed and "SEARCH_NEEDED:" in response_content:
313
- search_matches = re.findall(r'SEARCH_NEEDED:\s*(.+?)(?:\n|$)', response_content)
314
- if search_matches:
315
- yield f"data: {json.dumps({'type': 'status', 'data': 'Performing requested search...'})}\n\n"
316
-
317
- # Execute all requested searches
318
- search_tasks = [google_search_tool_async(query.strip()) for query in search_matches]
319
- search_results_list = await asyncio.gather(*search_tasks, return_exceptions=True)
320
-
321
- all_results = []
322
- for results in search_results_list:
323
- if isinstance(results, list):
324
- all_results.extend(results)
325
-
326
- if all_results:
327
- search_context = format_search_results_compact(all_results)
328
-
329
- for result in all_results:
330
- source_links.append({
331
- "title": result["source_title"],
332
- "url": result["url"],
333
- "domain": result["domain"]
334
- })
335
-
336
- # Generate new response with search results
337
- search_messages = messages + [{
338
- "role": "system",
339
- "content": f"Search Results:\n\n{search_context}\n\nPlease provide a comprehensive response based on these search results."
340
- }]
341
-
342
- final_stream = client.chat.completions.create(
343
- model="unsloth/Qwen3-30B-A3B-GGUF",
344
- temperature=temperature,
345
- messages=search_messages,
346
- max_tokens=2000,
347
- stream=True
348
- )
349
-
350
- for chunk in final_stream:
351
- if chunk.choices[0].delta.content:
352
- content = chunk.choices[0].delta.content
353
- yield f"data: {json.dumps({'type': 'content', 'data': content})}\n\n"
354
-
355
- search_performed = True
356
-
357
- # Process function-based tool calls (backup method)
358
- elif tool_calls_data and any(tc["function"]["name"] for tc in tool_calls_data):
359
- yield f"data: {json.dumps({'type': 'status', 'data': 'Executing search tools...'})}\n\n"
360
-
361
- search_tasks = []
362
- for tool_call in tool_calls_data:
363
- if tool_call["function"]["name"] == "google_search":
364
- try:
365
- args = json.loads(tool_call["function"]["arguments"])
366
- query = args.get("query", "").strip()
367
- if query:
368
- search_tasks.append(google_search_tool_async(query))
369
- logger.info(f"Function call search: {query}")
370
- except json.JSONDecodeError:
371
- continue
372
-
373
- if search_tasks:
374
- search_results_list = await asyncio.gather(*search_tasks, return_exceptions=True)
375
-
376
- all_results = []
377
- for results in search_results_list:
378
- if isinstance(results, list):
379
- all_results.extend(results)
380
- for result in results:
381
- source_links.append({
382
- "title": result["source_title"],
383
- "url": result["url"],
384
- "domain": result["domain"]
385
- })
386
-
387
- if all_results:
388
- search_context = format_search_results_compact(all_results)
389
-
390
- search_messages = messages + [{
391
- "role": "system",
392
- "content": f"{search_context}\n\nPlease provide a comprehensive response based on the search results above."
393
- }]
394
-
395
- final_stream = client.chat.completions.create(
396
- model="unsloth/Qwen3-30B-A3B-GGUF",
397
- temperature=temperature,
398
- messages=search_messages,
399
- max_tokens=2000,
400
- stream=True
401
- )
402
-
403
- for chunk in final_stream:
404
- if chunk.choices[0].delta.content:
405
- content = chunk.choices[0].delta.content
406
- yield f"data: {json.dumps({'type': 'content', 'data': content})}\n\n"
407
-
408
- search_performed = True
409
-
410
- # Send sources and completion
411
  if source_links:
412
  yield f"data: {json.dumps({'type': 'sources', 'data': source_links})}\n\n"
413
 
414
- yield f"data: {json.dumps({'type': 'done', 'data': {'search_used': search_performed}})}\n\n"
 
415
 
416
  except Exception as e:
417
- logger.error(f"Streaming error: {e}")
418
  yield f"data: {json.dumps({'type': 'error', 'data': str(e)})}\n\n"
419
 
420
  # --- Streaming Chat Endpoint ---
@@ -427,7 +219,7 @@ async def chat_stream_endpoint(request: Request, _: None = Depends(verify_origin
427
  data = await request.json()
428
  user_message = data.get("message", "").strip()
429
  use_search = data.get("use_search", False)
430
- temperature = max(0, min(2, data.get("temperature", 0.7)))
431
  conversation_history = data.get("history", [])
432
 
433
  if not user_message:
@@ -438,7 +230,7 @@ async def chat_stream_endpoint(request: Request, _: None = Depends(verify_origin
438
  system_content = (SYSTEM_PROMPT_WITH_SEARCH if use_search else SYSTEM_PROMPT_NO_SEARCH).format(current_date=current_date)
439
  messages = [{"role": "system", "content": system_content}] + conversation_history + [{"role": "user", "content": user_message}]
440
 
441
- logger.info(f"Stream request - search: {use_search}, temp: {temperature}, query: {user_message[:50]}...")
442
 
443
  return StreamingResponse(
444
  generate_streaming_response(messages, use_search, temperature, user_message),
@@ -446,12 +238,19 @@ async def chat_stream_endpoint(request: Request, _: None = Depends(verify_origin
446
  headers={
447
  "Cache-Control": "no-cache",
448
  "Connection": "keep-alive",
449
- "X-Accel-Buffering": "no"
 
450
  }
451
  )
452
 
453
  except json.JSONDecodeError:
454
  raise HTTPException(status_code=400, detail="Invalid JSON")
455
  except Exception as e:
456
- logger.error(f"Stream endpoint error: {e}")
457
- raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
2
  import json
3
  import asyncio
4
  import requests
 
5
  from datetime import datetime
6
  from typing import List, Dict, Optional
7
  from fastapi import FastAPI, Request, HTTPException, Depends
 
48
  LLM_API_KEY = os.getenv("LLM_API_KEY")
49
  LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api-15i2e8ze256bvfn6.aistudio-app.com/v1")
50
 
51
+ # --- Simplified System Prompts ---
52
+ SYSTEM_PROMPT_WITH_SEARCH = """You are an intelligent AI assistant with access to current web search results.
53
+ Use the provided search results to give accurate, up-to-date responses.
54
+ Always reference and cite the search results when relevant.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  Current date: {current_date}"""
56
 
57
  SYSTEM_PROMPT_NO_SEARCH = """You are an intelligent AI assistant. Provide helpful, accurate, and comprehensive responses based on your training data.
58
  Current date: {current_date}"""
59
 
60
# --- Fast Web Search Tool ---
async def fast_google_search(query: str, num_results: int = 4) -> List[Dict]:
    """Run a Google Custom Search without blocking the event loop.

    Args:
        query: Search terms; blank/whitespace-only queries are skipped.
        num_results: Desired result count; clamped to Google's 1-10 API range.

    Returns:
        A list of dicts with "title", "url", "snippet", and "domain" keys,
        or an empty list when credentials are missing or the request fails.
    """
    if not GOOGLE_API_KEY or not GOOGLE_CX or not query.strip():
        return []

    logger.info(f"Searching: '{query}'")

    params = {
        "key": GOOGLE_API_KEY,
        "cx": GOOGLE_CX,
        "q": query.strip(),
        # Google's Custom Search JSON API accepts 1-10 results per request;
        # an unclamped value is rejected with a 400 error.
        "num": max(1, min(num_results, 10)),
        "dateRestrict": "m6"  # Last 6 months
    }

    try:
        # requests is synchronous, so run it in the default thread pool.
        # get_running_loop() is the correct call inside a coroutine
        # (get_event_loop() is deprecated here since Python 3.10).
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.get(
                "https://www.googleapis.com/customsearch/v1",
                params=params,
                timeout=12
            )
        )
        response.raise_for_status()
        data = response.json()

        results = []
        for item in data.get("items", [])[:num_results]:
            title = item.get("title", "").strip()
            url = item.get("link", "").strip()
            snippet = item.get("snippet", "").strip()

            # Skip entries missing any field the frontend relies on.
            if title and url and snippet:
                results.append({
                    "title": title,
                    "url": url,
                    "snippet": snippet,
                    "domain": url.split('/')[2] if '/' in url else url
                })

        logger.info(f"Found {len(results)} results")
        return results

    except Exception as e:
        # Best-effort: a failed search degrades to "no results", never a crash.
        logger.error(f"Search failed: {e}")
        return []
109
 
110
def format_search_context(results: List[Dict]) -> str:
    """Render search results as a compact plain-text context block.

    Each result contributes a numbered title line plus its source domain
    and snippet. Returns a fixed placeholder string when *results* is empty.
    """
    if not results:
        return "No search results available."

    lines = ["=== SEARCH RESULTS ==="]
    for index, entry in enumerate(results, start=1):
        lines.extend([
            f"\n[{index}] {entry['title']}",
            f"Source: {entry['domain']}",
            f"Content: {entry['snippet']}",
        ])

    return "\n".join(lines)
122
 
123
  # --- FastAPI Application Setup ---
124
+ app = FastAPI(title="Streaming AI Chatbot", version="2.2.0")
125
 
126
  app.add_middleware(
127
  CORSMiddleware,
 
142
  client = None
143
  else:
144
  client = OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL)
145
+ logger.info("OpenAI client initialized")
146
 
147
# --- Optimized Streaming Response Generator ---
async def generate_streaming_response(messages: List[Dict], use_search: bool, temperature: float, original_query: str):
    """Stream an SSE chat response, optionally augmented with live search.

    Yields "data: <json>\n\n" events with type: status, content, sources,
    done, or error.

    Args:
        messages: Chat history including the system prompt.
        use_search: When True, a search is always performed first.
        temperature: Sampling temperature forwarded to the LLM.
        original_query: Raw user message, used verbatim as the search query.
    """
    try:
        source_links = []

        # ALWAYS search when use_search is True
        if use_search:
            yield f"data: {json.dumps({'type': 'status', 'data': 'Searching...'})}\n\n"

            # Fast search execution
            search_results = await fast_google_search(original_query, 4)

            if search_results:
                search_context = format_search_context(search_results)

                # Prepare source links for the frontend
                source_links = [{
                    "title": result["title"],
                    "url": result["url"],
                    "domain": result["domain"]
                } for result in search_results]

                # Inject the search context as an extra system message so the
                # model grounds its answer in the retrieved snippets.
                messages = messages + [{
                    "role": "system",
                    "content": f"{search_context}\n\nBased on the search results above, provide a comprehensive and accurate response."
                }]

                logger.info(f"Added {len(search_results)} search results to context")

        yield f"data: {json.dumps({'type': 'status', 'data': 'Generating response...'})}\n\n"

        llm_kwargs = {
            "model": "unsloth/Qwen3-30B-A3B-GGUF",
            "temperature": temperature,
            "messages": messages,
            "max_tokens": 2500,  # Response length cap
            "stream": True,
            "top_p": 0.9,
        }

        # NOTE(review): the OpenAI stream is iterated synchronously, which
        # blocks the event loop between chunks — consider run_in_executor
        # or the async client if this becomes a throughput problem.
        stream = client.chat.completions.create(**llm_kwargs)

        for chunk in stream:
            # Guard against keep-alive chunks that carry no choices/content;
            # an unguarded chunk.choices[0] would abort the whole stream.
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                yield f"data: {json.dumps({'type': 'content', 'data': content})}\n\n"

        # Send sources if available
        if source_links:
            yield f"data: {json.dumps({'type': 'sources', 'data': source_links})}\n\n"

        # Completion marker reports whether search actually contributed.
        yield f"data: {json.dumps({'type': 'done', 'data': {'search_used': use_search and bool(source_links)}})}\n\n"

    except Exception as e:
        # logger.exception preserves the traceback for debugging.
        logger.exception(f"Response generation failed: {e}")
        yield f"data: {json.dumps({'type': 'error', 'data': str(e)})}\n\n"
211
 
212
  # --- Streaming Chat Endpoint ---
 
219
  data = await request.json()
220
  user_message = data.get("message", "").strip()
221
  use_search = data.get("use_search", False)
222
+ temperature = max(0.1, min(1.5, data.get("temperature", 0.7))) # Optimized range
223
  conversation_history = data.get("history", [])
224
 
225
  if not user_message:
 
230
  system_content = (SYSTEM_PROMPT_WITH_SEARCH if use_search else SYSTEM_PROMPT_NO_SEARCH).format(current_date=current_date)
231
  messages = [{"role": "system", "content": system_content}] + conversation_history + [{"role": "user", "content": user_message}]
232
 
233
+ logger.info(f"Request: search={use_search}, temp={temperature}")
234
 
235
  return StreamingResponse(
236
  generate_streaming_response(messages, use_search, temperature, user_message),
 
238
  headers={
239
  "Cache-Control": "no-cache",
240
  "Connection": "keep-alive",
241
+ "X-Accel-Buffering": "no",
242
+ "Access-Control-Allow-Origin": "*" # For faster preflight
243
  }
244
  )
245
 
246
  except json.JSONDecodeError:
247
  raise HTTPException(status_code=400, detail="Invalid JSON")
248
  except Exception as e:
249
+ logger.error(f"Endpoint error: {e}")
250
+ raise HTTPException(status_code=500, detail=str(e))
251
+
252
# --- Health Check Endpoint ---
@app.get("/health")
async def health_check():
    """Lightweight liveness probe returning service status and a timestamp."""
    from datetime import timezone  # local import: file imports only `datetime`
    # Timezone-aware UTC avoids the ambiguity of a naive server-local time.
    return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}