Chrunos committed on
Commit
13b4461
·
verified ·
1 Parent(s): a2b62db

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +285 -390
app.py CHANGED
@@ -1,259 +1,64 @@
1
  import os
2
- import json
3
- import requests
4
- from datetime import datetime, timedelta
5
- from typing import List, Dict, Optional
6
- from fastapi import FastAPI, Request, HTTPException, Depends
7
- from fastapi.middleware.cors import CORSMiddleware
8
- from fastapi.responses import StreamingResponse # <-- Import StreamingResponse
9
- import asyncio # <-- Import asyncio
10
- from openai import OpenAI
11
  import logging
 
12
  import time
 
13
  from collections import defaultdict
14
-
15
-
16
- # --- Security Helper Functions ---
17
- def verify_origin(request: Request):
18
- """Verify that the request comes from an allowed origin for /chat endpoint"""
19
- origin = request.headers.get("origin")
20
- referer = request.headers.get("referer")
21
-
22
- allowed_origins = [
23
- "https://chrunos.com",
24
- "https://www.chrunos.com"
25
- ]
26
-
27
- # Allow localhost for development (you can remove this in production)
28
- if origin and any(origin.startswith(local) for local in ["http://localhost:", "http://127.0.0.1:"]):
29
- return True
30
-
31
- # Check origin header
32
- if origin in allowed_origins:
33
- return True
34
-
35
- # Check referer header as fallback
36
- if referer and any(referer.startswith(allowed) for allowed in allowed_origins):
37
- return True
38
-
39
- raise HTTPException(
40
- status_code=403,
41
- detail="Access denied: This endpoint is only accessible from chrunos.com"
42
- )
43
-
44
- # --- Configure Logging ---
45
- logging.basicConfig(level=logging.INFO)
46
  logger = logging.getLogger(__name__)
47
 
48
- # --- Load API Keys from Environment Variables ---
49
- GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
50
- GOOGLE_CX = os.getenv("GOOGLE_CX")
51
- LLM_API_KEY = os.getenv("LLM_API_KEY")
52
- LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api-15i2e8ze256bvfn6.aistudio-app.com/v1")
53
-
54
- # --- Enhanced System Prompt ---
55
- SYSTEM_PROMPT_WITH_SEARCH = """You are an intelligent AI assistant with access to real-time web search capabilities.
56
-
57
- IMPORTANT: When search tools are available to you, you should USE them for any query that could benefit from current, recent, or specific factual information, even if you have some knowledge about the topic from your training data.
58
-
59
- **When to use search (be proactive about using search when available):**
60
- - Any mention of recent events, current affairs, or "latest" information
61
- - Specific facts that could have changed since your training
62
- - Statistics, prices, scores, or numerical data
63
- - News, announcements, or current status of anything
64
- - When the user explicitly asks for current information
65
- - Any factual query where fresh information would be valuable
66
-
67
- **Response Guidelines:**
68
- 1. Use search tools when they're available and relevant to the query
69
- 2. Synthesize information from multiple sources when possible
70
- 3. Clearly indicate when information comes from search results
71
- 4. Provide comprehensive, well-structured answers
72
- 5. Cite sources appropriately
73
- 6. If search results are contradictory, mention the discrepancy
74
-
75
- **Current Context**: Today's date is {current_date}. Prioritize recent information when available.
76
-
77
- Remember: When in doubt about whether to search, lean towards using the search tool for more accurate and current information."""
78
-
79
- SYSTEM_PROMPT_NO_SEARCH = """You are an intelligent AI assistant. Provide helpful, accurate, and comprehensive responses based on your training data.
80
-
81
- When you don't have current information about recent events or changing data, acknowledge this limitation and suggest that the user might want to search for the most up-to-date information.
82
-
83
- **Current Context**: Today's date is {current_date}, but your knowledge has a cutoff date and may not include the most recent information."""
84
-
85
- # --- Enhanced Web Search Tool Implementation ---
86
- def Google_Search_tool(queries: List[str], num_results: int = 5) -> List[Dict]:
87
- """
88
- Enhanced Google Custom Search with better error handling and result formatting
89
- """
90
- if not GOOGLE_API_KEY or not GOOGLE_CX:
91
- logger.error("GOOGLE_API_KEY or GOOGLE_CX environment variables not set.")
92
- return []
93
-
94
- if not queries or not queries[0].strip():
95
- logger.warning("Empty search query provided")
96
- return []
97
-
98
- query = queries[0].strip()
99
- logger.info(f"Executing Google Custom Search for: '{query}'")
100
-
101
- search_url = "https://www.googleapis.com/customsearch/v1"
102
- params = {
103
- "key": GOOGLE_API_KEY,
104
- "cx": GOOGLE_CX,
105
- "q": query,
106
- "num": min(num_results, 10), # Google API max is 10
107
- "dateRestrict": "m6" # Prioritize results from last 6 months for freshness
108
- }
109
-
110
- try:
111
- response = requests.get(search_url, params=params, timeout=15)
112
- response.raise_for_status()
113
- search_results = response.json()
114
-
115
- if "items" not in search_results:
116
- logger.warning(f"No search results found for query: '{query}'")
117
- return []
118
-
119
- # Enhanced result parsing with better data validation
120
- parsed_results = []
121
- for item in search_results.get("items", []):
122
- title = item.get("title", "").strip()
123
- url = item.get("link", "").strip()
124
- snippet = item.get("snippet", "").strip()
125
-
126
- # Skip results with missing essential information
127
- if not title or not url or not snippet:
128
- continue
129
-
130
- # Extract publication date if available
131
- pub_date = None
132
- if "pagemap" in item and "metatags" in item["pagemap"]:
133
- for meta in item["pagemap"]["metatags"]:
134
- if "article:published_time" in meta:
135
- pub_date = meta["article:published_time"]
136
- break
137
-
138
- parsed_results.append({
139
- "source_title": title,
140
- "url": url,
141
- "snippet": snippet,
142
- "published_date": pub_date,
143
- "domain": url.split('/')[2] if '/' in url else url
144
- })
145
-
146
- logger.info(f"Successfully parsed {len(parsed_results)} search results")
147
- return parsed_results
148
-
149
- except requests.exceptions.Timeout:
150
- logger.error("Google search request timed out")
151
- return []
152
- except requests.exceptions.RequestException as e:
153
- logger.error(f"Error during Google search request: {e}")
154
- return []
155
- except Exception as e:
156
- logger.error(f"Unexpected error in Google Search_tool: {e}")
157
- return []
158
-
159
- def format_search_results_for_llm(search_results: List[Dict]) -> str:
160
- """
161
- Format search results with enhanced context for better LLM understanding
162
- """
163
- if not search_results:
164
- return "No relevant search results were found for this query."
165
-
166
- current_date = datetime.now().strftime("%Y-%m-%d")
167
- formatted_results = [f"Search Results (Retrieved on {current_date}):\n"]
168
-
169
- for i, result in enumerate(search_results, 1):
170
- formatted_result = f"\n--- Result {i} ---"
171
- formatted_result += f"\nTitle: {result['source_title']}"
172
- formatted_result += f"\nSource: {result['domain']}"
173
- formatted_result += f"\nURL: {result['url']}"
174
-
175
- if result.get('published_date'):
176
- formatted_result += f"\nPublished: {result['published_date']}"
177
-
178
- formatted_result += f"\nContent: {result['snippet']}"
179
- formatted_results.append(formatted_result)
180
-
181
- formatted_results.append(f"\n--- End of Search Results ---\n")
182
- formatted_results.append("Please synthesize this information to provide a comprehensive answer to the user's question. If the search results contain conflicting information, please note the discrepancy. Always cite your sources when using information from the search results.")
183
-
184
- return "\n".join(formatted_results)
185
-
186
- # --- FastAPI Application Setup ---
187
- app = FastAPI(title="AI Chatbot with Enhanced Search", version="2.0.0")
188
-
189
- app.add_middleware(
190
- CORSMiddleware,
191
- allow_origins=[
192
- "https://chrunos.com",
193
- "https://www.chrunos.com",
194
- "http://localhost:3000", # For local development
195
- "http://localhost:8000", # For local development
196
- ],
197
- allow_credentials=True,
198
- allow_methods=["GET", "POST", "OPTIONS"],
199
- allow_headers=["*"],
200
- )
201
 
202
- # --- OpenAI Client Initialization ---
203
- if not LLM_API_KEY or not LLM_BASE_URL:
204
- logger.error("LLM_API_KEY or LLM_BASE_URL not configured")
205
- client = None
206
- else:
207
- client = OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL)
208
- logger.info("OpenAI client initialized successfully")
209
-
210
- # --- Enhanced Tool Definition ---
211
- available_tools = [
212
- {
213
- "type": "function",
214
- "function": {
215
- "name": "Google Search",
216
- "description": "REQUIRED for current information: Performs a Google search for recent events, current data, latest news, statistics, prices, or any information that changes frequently. Use this tool proactively when the user's query could benefit from up-to-date information, even if you have some relevant knowledge from training data.",
217
- "parameters": {
218
- "type": "object",
219
- "properties": {
220
- "query": {
221
- "type": "string",
222
- "description": "The search query. Be specific and include relevant keywords. For recent events, include time-related terms like 'latest', '2024', 'recent', etc."
223
- }
224
- },
225
- "required": ["query"]
226
- }
227
- }
228
- }
229
- ]
230
-
231
- def should_use_search(message: str) -> bool:
232
- """
233
- Intelligent decision making for when to enable search based on message content
234
- """
235
- search_indicators = [
236
- "latest", "recent", "current", "now", "today", "this year", "2024", "2025",
237
- "news", "update", "what's happening", "status", "price", "stock",
238
- "weather", "score", "results", "announcement", "release"
239
- ]
240
-
241
- factual_indicators = [
242
- "who is", "what is", "where is", "when did", "how many", "statistics",
243
- "data", "information about", "tell me about", "facts about"
244
- ]
245
-
246
- message_lower = message.lower()
247
-
248
- # Strong indicators for search
249
- if any(indicator in message_lower for indicator in search_indicators):
250
- return True
251
-
252
- # Moderate indicators for search (factual queries)
253
- if any(indicator in message_lower for indicator in factual_indicators):
254
- return True
255
-
256
- return False
257
 
258
  # Rate limiting dictionary
259
  class RateLimiter:
@@ -290,7 +95,7 @@ class RateLimiter:
290
  return len(self.requests[user_ip])
291
 
292
 
293
- # Initialize rate limiter with 50 requests per day
294
  rate_limiter = RateLimiter(
295
  max_requests=50,
296
  time_window=timedelta(days=1)
@@ -324,9 +129,110 @@ class ApiRotator:
324
  self.last_successful_index = index
325
 
326
 
327
- # --- Enhanced Chatbot Endpoint (with Streaming) ---
328
- @app.post("/chat")
329
- async def chat_endpoint(request: Request, _: None = Depends(verify_origin)):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
330
  user_ip = get_user_ip(request)
331
 
332
  if rate_limiter.is_rate_limited(user_ip):
@@ -338,148 +244,137 @@ async def chat_endpoint(request: Request, _: None = Depends(verify_origin)):
338
  "url": "https://t.me/chrunoss"
339
  }
340
  )
341
- if not client:
342
- raise HTTPException(status_code=500, detail="LLM client not configured")
343
-
344
- try:
345
- data = await request.json()
346
- user_message = data.get("message", "").strip()
347
- use_search = data.get("use_search")
348
- if use_search is None:
349
- use_search = data.get("user_search")
350
-
351
- temperature = data.get("temperature", 0.7)
352
- if not isinstance(temperature, (int, float)) or not 0 <= temperature <= 2:
353
- temperature = 0.7
354
 
355
- conversation_history = data.get("history", [])
356
-
357
- if not user_message:
358
- raise HTTPException(status_code=400, detail="No message provided")
359
-
360
- if use_search is None:
361
- use_search = should_use_search(user_message)
362
 
363
- # --- Message and Tool Call Preparation (Same as before) ---
364
- current_date = datetime.now().strftime("%Y-%m-%d")
365
- system_content = SYSTEM_PROMPT_WITH_SEARCH.format(current_date=current_date) if use_search else SYSTEM_PROMPT_NO_SEARCH.format(current_date=current_date)
366
- system_message = {"role": "system", "content": system_content}
367
- messages = [system_message] + conversation_history + [{"role": "user", "content": user_message}]
368
-
369
- llm_kwargs = {
370
- "model": "unsloth/Qwen3-30B-A3B-GGUF",
371
- "temperature": temperature,
372
- "messages": messages,
373
- "max_tokens": 2000
374
- }
375
-
376
- if use_search:
377
- llm_kwargs["tools"] = available_tools
378
- llm_kwargs["tool_choice"] = "auto"
379
-
380
- # First LLM call (for tool decision) - This part remains blocking
381
- llm_response = client.chat.completions.create(**llm_kwargs)
382
- tool_calls = llm_response.choices[0].message.tool_calls
383
- source_links = []
384
-
385
- if tool_calls:
386
- logger.info(f"Processing {len(tool_calls)} tool calls")
387
- tool_outputs = []
388
 
389
- for tool_call in tool_calls:
390
- if tool_call.function.name == "Google Search":
391
- try:
392
- function_args = json.loads(tool_call.function.arguments)
393
- search_query = function_args.get("query", "").strip()
 
 
394
 
395
- if search_query:
396
- search_results = Google_Search_tool([search_query], num_results=5)
397
- for result in search_results:
398
- source_links.append({
399
- "title": result["source_title"],
400
- "url": result["url"],
401
- "domain": result["domain"]
402
- })
403
-
404
- formatted_results = format_search_results_for_llm(search_results)
405
- tool_outputs.append({
406
- "tool_call_id": tool_call.id,
407
- "output": formatted_results
408
- })
409
- except Exception as e:
410
- logger.error(f"Error processing tool call: {e}")
411
 
412
- messages.append(llm_response.choices[0].message)
413
- for output_item in tool_outputs:
414
- messages.append({
415
- "role": "tool",
416
- "tool_call_id": output_item["tool_call_id"],
417
- "content": output_item["output"]
418
- })
419
-
420
- # --- MODIFICATION FOR STREAMING ---
421
-
422
- async def response_generator():
423
- """This async generator streams the final response."""
424
- # First, yield metadata (like sources) as a single event
425
- initial_data = {
426
- "sources": source_links,
427
- "search_used": bool(tool_calls),
428
- }
429
- yield f"data: {json.dumps(initial_data)}\n\n"
430
-
431
- # This is the final API call that will actually be streamed
432
- stream = client.chat.completions.create(
433
- model="unsloth/Qwen3-30B-A3B-GGUF",
434
- temperature=temperature,
435
- messages=messages,
436
- max_tokens=2000,
437
- stream=True # <-- Enable streaming from the AI
438
- )
439
-
440
- try:
441
- for chunk in stream:
442
- content = chunk.choices[0].delta.content
443
- if content:
444
- # Yield each piece of content in SSE format
445
- chunk_data = {"response_chunk": content}
446
- yield f"data: {json.dumps(chunk_data)}\n\n"
447
- await asyncio.sleep(0) # Give up control to the event loop
448
- finally:
449
- # Signal the end of the stream to the client
450
- yield "data: [DONE]\n\n"
451
-
452
- # Return the StreamingResponse, which FastAPI will handle.
453
- return StreamingResponse(response_generator(), media_type="text/event-stream")
454
-
455
- except HTTPException:
456
- raise
457
- except json.JSONDecodeError:
458
- logger.error("Invalid JSON in request body")
459
- raise HTTPException(status_code=400, detail="Invalid JSON in request body")
460
- except Exception as e:
461
- logger.error(f"Unexpected error in /chat endpoint: {e}")
462
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
463
 
464
- # --- Health Check Endpoint ---
465
- @app.get("/")
466
- async def root():
467
- return {
468
- "message": "Enhanced AI Chatbot API is running",
469
- "version": "2.0.0",
470
- "features": ["Google Search Integration", "Intelligent Search Decision", "Enhanced Prompting", "Streaming Response"],
471
- "timestamp": datetime.now().isoformat()
472
- }
473
 
474
- # --- Health Check Endpoint ---
475
- @app.get("/health")
476
- async def health_check():
477
- health_status = {
478
- "status": "healthy",
479
- "timestamp": datetime.now().isoformat(),
480
- "services": {
481
- "llm_client": client is not None,
482
- "Google Search": bool(GOOGLE_API_KEY and GOOGLE_CX)
483
- }
 
 
 
 
 
 
 
 
484
  }
485
- return health_status
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
+ import re
 
 
 
 
 
 
 
 
3
  import logging
4
+ import uuid
5
  import time
6
+ from datetime import datetime, timezone, timedelta
7
  from collections import defaultdict
8
+ from typing import Optional, Dict, Any
9
+ import asyncio
10
+ from concurrent.futures import ThreadPoolExecutor
11
+
12
+ from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
13
+ from fastapi.responses import StreamingResponse
14
+ from pydantic import BaseModel, Field
15
+
16
+ import openai # For your custom API
17
+ import google.generativeai as genai # For Gemini API
18
+ from google.generativeai.types import GenerationConfig
19
+
20
+ # --- Logging Configuration ---
21
+ logging.basicConfig(
22
+ level=logging.INFO,
23
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
24
+ datefmt='%Y-%m-%d %H:%M:%S'
25
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  logger = logging.getLogger(__name__)
27
 
28
+ # --- Configuration ---
29
+ CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
30
+ CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
31
+ DEFAULT_GEMINI_MODEL = "gemini-2.0-flash"
32
+ GEMINI_REQUEST_TIMEOUT_SECONDS = 300
33
+
34
+ # --- In-Memory Task Storage ---
35
+ tasks_db: Dict[str, Dict[str, Any]] = {}
36
+
37
+ # --- Pydantic Models ---
38
class ChatPayload(BaseModel):
    # Request body accepted by the POST /chat endpoint.

    # Text of the user's message; forwarded as the single "user" turn.
    message: str
    # Sampling temperature; validated to lie in [0.0, 1.0], defaults to 0.6.
    temperature: float = Field(default=0.6, ge=0.0, le=1.0)
41
+
42
class GeminiTaskRequest(BaseModel):
    # Request body accepted by the POST /gemini/submit_task endpoint.

    # Prompt text sent to Gemini.
    message: str
    # Optional URL; only attached to the prompt when it looks like a YouTube link.
    url: Optional[str] = Field(default=None)
    # Optional model override; the endpoint falls back to DEFAULT_GEMINI_MODEL.
    gemini_model: Optional[str] = Field(default=None)
    # Optional per-request API key.
    api_key: Optional[str] = Field(
        default=None,
        description="Gemini API Key (optional; uses Space secret if not provided)",
    )
47
+
48
class TaskSubmissionResponse(BaseModel):
    # Response returned right after a Gemini task has been queued.

    task_id: str  # identifier generated for the queued task
    status: str  # task state at submission time
    task_detail_url: str  # relative URL the client polls for the result
52
+
53
class TaskStatusResponse(BaseModel):
    # Snapshot of a task record as returned by GET /gemini/task/{task_id}.

    task_id: str
    status: str  # e.g. "PENDING", "PROCESSING", "COMPLETED" or "FAILED"
    submitted_at: datetime
    last_updated_at: datetime
    # Generated text; stays None unless the task completed successfully.
    result: Optional[str] = Field(default=None)
    # Failure description; stays None unless the task failed.
    error: Optional[str] = Field(default=None)
    # request_params: Optional[Dict[str, Any]] = None # Optionally return original params
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
63
  # Rate limiting dictionary
64
  class RateLimiter:
 
95
  return len(self.requests[user_ip])
96
 
97
 
98
+ # Initialize rate limiter with 50 requests per day
99
  rate_limiter = RateLimiter(
100
  max_requests=50,
101
  time_window=timedelta(days=1)
 
129
  self.last_successful_index = index
130
 
131
 
132
+ # --- FastAPI App Initialization ---
133
+ app = FastAPI(
134
+ title="Dual Chat & Async Gemini API",
135
+ description="Made by Cody from chrunos.com.",
136
+ version="2.0.0"
137
+ )
138
+
139
+ # --- Helper Functions ---
140
def is_video_url_for_gemini(url: Optional[str]) -> bool:
    """Return True when *url* looks like a YouTube video link.

    Two shapes are accepted, both anchored at the start of the string:

    * a regular YouTube URL (``youtube.com``, ``youtu.be``,
      ``youtube-nocookie.com``) with an optional scheme and ``www.`` prefix,
      followed by an 11-character video id;
    * a ``googleusercontent.com/youtube.com/<id>`` proxy URL.

    Args:
        url: Candidate URL; ``None`` or an empty string yields ``False``.

    Returns:
        ``True`` if either pattern matches, ``False`` otherwise.
    """
    if not url:
        return False

    # The scheme is "https?" (http or https). The previous "https_?" only
    # matched "https" / "https_", so plain http:// links were rejected.
    youtube_regex = (
        r'(https?://)?(www\.)?'
        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )
    googleusercontent_youtube_regex = r'https?://googleusercontent\.com/youtube\.com/\w+'

    return any(
        re.match(pattern, url) is not None
        for pattern in (youtube_regex, googleusercontent_youtube_regex)
    )
154
+
155
def _extract_gemini_text(task_id: str, response: Any) -> str:
    """Return the generated text of a Gemini response, or "" if there is none.

    The ``text`` / ``parts`` quick accessors of google-generativeai raise
    ``ValueError`` when the response carries no usable candidate (for example
    when the prompt was blocked). ``hasattr`` only swallows ``AttributeError``,
    so that case is caught here and reported as "no text". This lets the
    caller inspect ``prompt_feedback`` instead of failing with a generic error.
    """
    try:
        if hasattr(response, 'text') and response.text:
            return response.text
        if hasattr(response, 'parts'):
            return "".join(
                part.text for part in response.parts if hasattr(part, 'text')
            )
    except ValueError as exc:
        logger.warning(f"[Task {task_id}] Gemini response has no readable text: {exc}")
        return ""

    # Neither accessor exists: unexpected response object.
    logger.warning(f"[Task {task_id}] Gemini response structure not as expected or empty. Response: {response}")
    return ""


async def process_gemini_request_background(
    task_id: str,
    user_message: str,
    input_url: Optional[str],
    requested_gemini_model: str,
    gemini_key_to_use: str
):
    """Run one Gemini request and record its outcome in ``tasks_db``.

    The task entry ``tasks_db[task_id]`` must already exist. Its ``status``
    moves to "PROCESSING" and then to "COMPLETED" (with ``result``) or
    "FAILED" (with ``error``). ``last_updated_at`` is refreshed at the start
    and again when processing ends, whatever the outcome.

    Args:
        task_id: Key of the task record in ``tasks_db``.
        user_message: Prompt text sent to Gemini.
        input_url: Optional URL; attached as video input only when
            ``is_video_url_for_gemini`` accepts it.
        requested_gemini_model: Name of the Gemini model to instantiate.
        gemini_key_to_use: API key passed to ``genai.configure``.

    Exceptions are not propagated: any error is logged and stored on the task.
    """
    logger.info(f"[Task {task_id}] Starting background Gemini processing. Model: {requested_gemini_model}, URL: {input_url}")
    tasks_db[task_id]["status"] = "PROCESSING"
    tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)

    try:
        # NOTE(review): genai.configure appears to set module-wide state, so
        # concurrent tasks using different keys may interfere — confirm.
        genai.configure(api_key=gemini_key_to_use)

        model_instance = genai.GenerativeModel(model_name=requested_gemini_model)

        content_parts = [{"text": user_message}]
        if input_url and is_video_url_for_gemini(input_url):
            logger.info(f"[Task {task_id}] Adding video URL to Gemini content: {input_url}")
            content_parts.append({
                "file_data": {
                    "mime_type": "video/youtube",  # Or let Gemini infer
                    "file_uri": input_url
                }
            })

        gemini_contents = [{"parts": content_parts}]

        generation_config = GenerationConfig(candidate_count=1)
        request_options = {"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}

        logger.info(f"[Task {task_id}] Sending request to Gemini API...")
        response = await model_instance.generate_content_async(
            gemini_contents,
            stream=False,  # Collect the full response for the async task
            generation_config=generation_config,
            request_options=request_options
        )

        full_response_text = _extract_gemini_text(task_id, response)

        if not full_response_text and response.prompt_feedback and response.prompt_feedback.block_reason:
            block_reason = response.prompt_feedback.block_reason
            # Enum-like reasons expose .name; fall back to str() otherwise.
            block_reason_name = block_reason.name if hasattr(block_reason, 'name') else str(block_reason)
            logger.warning(f"[Task {task_id}] Gemini content blocked: {block_reason_name}")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = f"Content blocked by Gemini due to: {block_reason_name}"
        elif full_response_text:
            logger.info(f"[Task {task_id}] Gemini processing successful. Result length: {len(full_response_text)}")
            tasks_db[task_id]["status"] = "COMPLETED"
            tasks_db[task_id]["result"] = full_response_text
        else:
            logger.warning(f"[Task {task_id}] Gemini processing completed but no text content found and no block reason.")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = "Gemini returned no content and no specific block reason."

    except Exception as e:
        # Top-level boundary of the background task: record the failure so
        # that pollers see it instead of a task stuck in "PROCESSING".
        logger.error(f"[Task {task_id}] Error during Gemini background processing: {e}", exc_info=True)
        tasks_db[task_id]["status"] = "FAILED"
        tasks_db[task_id]["error"] = str(e)
    finally:
        tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
230
+
231
+ # --- API Endpoints ---
232
+
233
+ @app.post("/chat", response_class=StreamingResponse)
234
+ async def direct_chat(payload: ChatPayload, request: Request):
235
+ logger.info(f"Direct chat request received. Temperature: {payload.temperature}, Message: '{payload.message[:50]}...'")
236
  user_ip = get_user_ip(request)
237
 
238
  if rate_limiter.is_rate_limited(user_ip):
 
244
  "url": "https://t.me/chrunoss"
245
  }
246
  )
247
+ custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
248
+ custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
249
+ custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)
250
+
251
+ if not custom_api_key_secret:
252
+ logger.error("Custom API key ('CUSTOM_API_SECRET_KEY') is not configured for /chat.")
253
+ raise HTTPException(status_code=500, detail="Custom API key not configured.")
254
+
255
+ async def custom_api_streamer():
256
+ client = None
257
+ try:
258
+ logger.info("Sending request to Custom API for /chat.")
 
259
 
260
+ # Use AsyncOpenAI with proper configuration
261
+ from openai import AsyncOpenAI
262
+ client = AsyncOpenAI(
263
+ api_key=custom_api_key_secret,
264
+ base_url=custom_api_base_url,
265
+ timeout=60.0 # Longer timeout for gemma3:27b model
266
+ )
267
 
268
+ stream = await client.chat.completions.create(
269
+ model=custom_api_model,
270
+ temperature=payload.temperature,
271
+ messages=[{"role": "user", "content": payload.message}],
272
+ stream=True
273
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
 
275
+ async for chunk in stream:
276
+ try:
277
+ # Exact same logic as your working code
278
+ if hasattr(chunk.choices[0].delta, "reasoning_content") and chunk.choices[0].delta.reasoning_content:
279
+ yield chunk.choices[0].delta.reasoning_content
280
+ elif chunk.choices[0].delta.content is not None: # Handle None explicitly
281
+ yield chunk.choices[0].delta.content
282
 
283
+ except (IndexError, AttributeError) as e:
284
+ # Skip malformed chunks silently (some APIs send empty chunks)
285
+ continue
286
+ except Exception as e:
287
+ logger.warning(f"Skipping chunk due to error: {e}")
288
+ continue
289
+
290
+ except Exception as e:
291
+ logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True)
 
 
 
 
 
 
 
292
 
293
+ # Handle specific connection errors with retry suggestion
294
+ if "peer closed connection" in str(e) or "incomplete chunked read" in str(e):
295
+ yield "Connection interrupted. Please try again."
296
+ else:
297
+ yield f"Error processing with Custom API: {str(e)}"
298
+
299
+ finally:
300
+ if client:
301
+ try:
302
+ await client.close()
303
+ except Exception as cleanup_error:
304
+ logger.warning(f"Error closing OpenAI client: {cleanup_error}")
305
+
306
+ return StreamingResponse(
307
+ custom_api_streamer(),
308
+ media_type="text/plain",
309
+ headers={
310
+ "Cache-Control": "no-cache",
311
+ "Connection": "keep-alive",
312
+ }
313
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
 
315
@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(request: GeminiTaskRequest, background_tasks: BackgroundTasks):
    """Queue a Gemini request for background processing.

    A new task record is created in ``tasks_db`` with status "PENDING" and
    ``process_gemini_request_background`` is scheduled through FastAPI's
    ``BackgroundTasks``.

    Args:
        request: Task parameters (message, optional URL, model and API key).
        background_tasks: FastAPI background task scheduler.

    Returns:
        TaskSubmissionResponse with the task id, its initial status and the
        relative URL to poll.

    Raises:
        HTTPException: 400 when neither the request nor the
            ``GEMINI_API_KEY`` environment variable supplies an API key.
    """
    task_id = str(uuid.uuid4())
    logger.info(f"Received Gemini task submission. Assigning Task ID: {task_id}. Message: '{request.message[:50]}...'")

    # Prefer the caller's key and fall back to the configured secret, as the
    # api_key field description promises. Previously the environment value
    # was read but never used.
    key_to_use = request.api_key or os.getenv("GEMINI_API_KEY")

    if not key_to_use:
        logger.error(f"[Task {task_id}] Gemini API Key missing for task submission.")
        raise HTTPException(status_code=400, detail="Gemini API Key required.")

    requested_model = request.gemini_model or DEFAULT_GEMINI_MODEL

    current_time = datetime.now(timezone.utc)
    tasks_db[task_id] = {
        "status": "PENDING",
        "result": None,
        "error": None,
        "submitted_at": current_time,
        "last_updated_at": current_time,
        # NOTE(review): this dump includes api_key when the caller sent one;
        # consider excluding it if request_params is ever exposed.
        "request_params": request.model_dump()  # Store original request
    }

    background_tasks.add_task(
        process_gemini_request_background,
        task_id,
        request.message,
        request.url,
        requested_model,
        key_to_use
    )

    logger.info(f"[Task {task_id}] Task submitted to background processing.")
    return TaskSubmissionResponse(
        task_id=task_id,
        status="PENDING",
        task_detail_url=f"/gemini/task/{task_id}"  # URL the client polls
    )
355
+
356
+
357
+
358
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
    """Return the stored state of a previously submitted Gemini task.

    Looks the task up in ``tasks_db`` and reports its status, timestamps and,
    when present, its result or error.

    Raises:
        HTTPException: 404 when ``task_id`` is missing (or maps to an empty
            record) in ``tasks_db``.
    """
    logger.info(f"Status query for Task ID: {task_id}")

    record = tasks_db.get(task_id)
    if record:
        current_status = record["status"]
        logger.info(f"[Task {task_id}] Current status: {current_status}")
        return TaskStatusResponse(
            task_id=task_id,
            status=current_status,
            submitted_at=record["submitted_at"],
            last_updated_at=record["last_updated_at"],
            result=record.get("result"),
            error=record.get("error"),
            # request_params=record.get("request_params") # Optionally include original params
        )

    logger.warning(f"Task ID not found: {task_id}")
    raise HTTPException(status_code=404, detail="Task ID not found.")
376
+
377
@app.get("/")
async def read_root():
    """Log the access and return a static "service is running" message."""
    logger.info("Root endpoint '/' accessed (health check).")
    status_payload = {
        "message": "API for Direct Chat and Async Gemini Tasks is running.",
    }
    return status_payload