Chrunos committed
Commit d57c6f7 · verified · 1 Parent(s): b67a466

Update app.py

Files changed (1)
app.py +67 -158
app.py CHANGED
@@ -2,17 +2,15 @@ import os
 import re
 import logging
 import uuid
-import time
-from datetime import datetime, timezone, timedelta # Added timedelta
-from typing import Optional, Dict, Any, List # Added List
-from collections import defaultdict, deque # Added defaultdict and deque
+from datetime import datetime, timezone
+from typing import Optional, Dict, Any

-from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request # Added Request
+from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel, Field

-import openai
-import google.generativeai as genai
+import openai # For your custom API
+import google.generativeai as genai # For Gemini API
 from google.generativeai.types import GenerationConfig

 # --- Logging Configuration ---
@@ -29,108 +27,7 @@ CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
 DEFAULT_GEMINI_MODEL = "gemini-1.5-flash-latest"
 GEMINI_REQUEST_TIMEOUT_SECONDS = 300

-# Rate limiting dictionary
-class RateLimiter:
-    def __init__(self, max_requests: int, time_window: timedelta):
-        self.max_requests = max_requests
-        self.time_window = time_window
-        self.requests: Dict[str, list] = defaultdict(list)
-
-    def _cleanup_old_requests(self, user_ip: str) -> None:
-        """Remove requests that are outside the time window."""
-        current_time = time.time()
-        self.requests[user_ip] = [
-            timestamp for timestamp in self.requests[user_ip]
-            if current_time - timestamp < self.time_window.total_seconds()
-        ]
-
-    def is_rate_limited(self, user_ip: str) -> bool:
-        """Check if the user has exceeded their rate limit."""
-        self._cleanup_old_requests(user_ip)
-
-        # Get current count after cleanup
-        current_count = len(self.requests[user_ip])
-
-        # Add current request timestamp (incrementing the count)
-        current_time = time.time()
-        self.requests[user_ip].append(current_time)
-
-        # Check if user has exceeded the maximum requests
-        return (current_count + 1) > self.max_requests
-
-    def get_current_count(self, user_ip: str) -> int:
-        """Get the current request count for an IP."""
-        self._cleanup_old_requests(user_ip)
-        return len(self.requests[user_ip])
-
-# Initialize rate limiter for the /chat endpoint
-# Max 12 requests per day, per IP.
-rate_limiter = RateLimiter(
-    max_requests=3,
-    time_window=timedelta(days=1)
-)
-
-def get_user_ip(request: Request) -> str:
-    """Helper function to get user's IP address."""
-    forwarded = request.headers.get("X-Forwarded-For")
-    if forwarded:
-        return forwarded.split(",")[0]
-    return request.client.host
-
-
-class ApiRotator:
-    def __init__(self, apis):
-        self.apis = apis
-        self.last_successful_index = None
-
-    def get_prioritized_apis(self):
-        if self.last_successful_index is not None:
-            # Move the last successful API to the front
-            rotated_apis = (
-                [self.apis[self.last_successful_index]] +
-                self.apis[:self.last_successful_index] +
-                self.apis[self.last_successful_index+1:]
-            )
-            return rotated_apis
-        return self.apis
-
-    def update_last_successful(self, index):
-        self.last_successful_index = index
-
-
-# --- API Rotator Class (as provided by you) ---
-class ApiRotator:
-    def __init__(self, apis: List[Any]): # Assuming apis is a list of API configurations
-        self.apis = apis
-        self.last_successful_index: Optional[int] = None
-        logger.info(f"ApiRotator initialized with {len(apis)} APIs.")
-
-    def get_prioritized_apis(self) -> List[Any]:
-        if self.last_successful_index is not None and 0 <= self.last_successful_index < len(self.apis):
-            # Move the last successful API to the front
-            rotated_apis = (
-                [self.apis[self.last_successful_index]] +
-                self.apis[:self.last_successful_index] +
-                self.apis[self.last_successful_index + 1:]
-            )
-            # logger.debug("Rotated APIs based on last successful index.")
-            return rotated_apis
-        # logger.debug("Returning APIs in original or non-rotated order.")
-        return list(self.apis) # Return a copy
-
-    def update_last_successful(self, api_config_used: Any):
-        try:
-            index = self.apis.index(api_config_used)
-            self.last_successful_index = index
-            # logger.info(f"Updated last successful API to index: {index}")
-        except ValueError:
-            # This might happen if api_config_used is not in the original list,
-            # or if the list was modified.
-            logger.warning(f"Could not find API config in rotator to update last successful index.")
-            self.last_successful_index = None # Reset if not found
-
-
-# --- In-Memory Task Storage (for Gemini async tasks) ---
+# --- In-Memory Task Storage ---
 tasks_db: Dict[str, Dict[str, Any]] = {}

 # --- Pydantic Models ---
@@ -156,23 +53,24 @@ class TaskStatusResponse(BaseModel):
     last_updated_at: datetime
     result: Optional[str] = None
     error: Optional[str] = None
+    # request_params: Optional[Dict[str, Any]] = None # Optionally return original params
+

 # --- FastAPI App Initialization ---
 app = FastAPI(
-    title="Dual Chat & Async Gemini API with Rate Limiting",
-    description="Provides rate-limited direct chat and asynchronous tasks for Gemini API.",
-    version="2.1.0"
+    title="Dual Chat & Async Gemini API",
+    description="Provides direct chat via custom API and asynchronous tasks for Gemini API.",
+    version="2.0.0"
 )

-# --- Helper Functions (Regex from previous step) ---
+# --- Helper Functions ---
 def is_video_url_for_gemini(url: Optional[str]) -> bool:
     if not url:
         return False
     youtube_regex = (
         r'(https_?://)?(www\.)?'
-        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
-        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
-    )
+        '(youtube|youtu|youtube-nocookie)\.(com|be)/'
+        '(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})')
     googleusercontent_youtube_regex = r'https_?://googleusercontent\.com/youtube\.com/\w+'
     return re.match(youtube_regex, url) is not None or \
            re.match(googleusercontent_youtube_regex, url) is not None
@@ -184,37 +82,54 @@ async def process_gemini_request_background(
     requested_gemini_model: str,
     gemini_key_to_use: str
 ):
-    # (Gemini background processing logic remains the same as before)
     logger.info(f"[Task {task_id}] Starting background Gemini processing. Model: {requested_gemini_model}, URL: {input_url}")
     tasks_db[task_id]["status"] = "PROCESSING"
     tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)

     try:
         genai.configure(api_key=gemini_key_to_use)
+
         model_instance = genai.GenerativeModel(model_name=requested_gemini_model)
+
         content_parts = [{"text": user_message}]
         if input_url and is_video_url_for_gemini(input_url):
             logger.info(f"[Task {task_id}] Adding video URL to Gemini content: {input_url}")
-            content_parts.append({"file_data": {"mime_type": "video/youtube", "file_uri": input_url}})
+            content_parts.append({
+                "file_data": {
+                    "mime_type": "video/youtube", # Or let Gemini infer
+                    "file_uri": input_url
+                }
+            })

         gemini_contents = [{"parts": content_parts}]
+
         generation_config = GenerationConfig(candidate_count=1)
         request_options = {"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}

         logger.info(f"[Task {task_id}] Sending request to Gemini API...")
         response = await model_instance.generate_content_async(
-            gemini_contents, stream=False, generation_config=generation_config, request_options=request_options
+            gemini_contents,
+            stream=False, # Collect full response for async task
+            generation_config=generation_config,
+            request_options=request_options
         )

+        # Assuming response.text contains the full aggregated text
+        # If using a model version that streams even for non-stream call, aggregate it:
         full_response_text = ""
         if hasattr(response, 'text') and response.text:
             full_response_text = response.text
-        elif hasattr(response, 'parts'):
+        elif hasattr(response, 'parts'): # Check for newer API structures if .text is not primary
             for part in response.parts:
-                if hasattr(part, 'text'): full_response_text += part.text
-        else:
+                if hasattr(part, 'text'):
+                    full_response_text += part.text
+        else: # Fallback for safety if structure is unexpected or if it's an iterable of chunks
+            # This part might need adjustment based on actual non-streaming response object
+            # For now, assuming generate_content_async with stream=False gives a response with .text
+            # or we need to iterate if it's still a stream internally for some models
             logger.warning(f"[Task {task_id}] Gemini response structure not as expected or empty. Response: {response}")

+
         if not full_response_text and response.prompt_feedback and response.prompt_feedback.block_reason:
             block_reason_name = response.prompt_feedback.block_reason.name if hasattr(response.prompt_feedback.block_reason, 'name') else str(response.prompt_feedback.block_reason)
             logger.warning(f"[Task {task_id}] Gemini content blocked: {block_reason_name}")
@@ -238,21 +153,9 @@ async def process_gemini_request_background(

 # --- API Endpoints ---

-# Modified /chat endpoint with rate limiting
 @app.post("/chat", response_class=StreamingResponse)
-async def direct_chat(payload: ChatPayload, request: Request): # Added 'request: Request'
-    user_ip = get_user_ip(request)
-    logger.info(f"Direct chat request received from IP: {user_ip}. Temp: {payload.temperature}, Msg: '{payload.message[:50]}...'")
-
-    if rate_limiter.is_rate_limited(user_ip):
-        current_count = rate_limiter.get_current_count(user_ip)
-        raise HTTPException(
-            status_code=429,
-            detail={
-                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
-                "url": "https://t.me/chrunoss"
-            }
-        )
+async def direct_chat(payload: ChatPayload):
+    logger.info(f"Direct chat request received. Temperature: {payload.temperature}, Message: '{payload.message[:50]}...'")

     custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
     custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
@@ -266,7 +169,7 @@ async def direct_chat(payload: ChatPayload, request: Request): # Added 'request:

     async def custom_api_streamer():
         try:
-            logger.info(f"IP {user_ip}: Sending request to Custom API for /chat.")
+            logger.info("Sending request to Custom API for /chat.")
             stream = client.chat.completions.create(
                 model=custom_api_model,
                 temperature=payload.temperature,
@@ -282,43 +185,50 @@ async def direct_chat(payload: ChatPayload, request: Request): # Added 'request:
                 if content_to_yield:
                     yield content_to_yield
         except Exception as e:
-            logger.error(f"IP {user_ip}: Error during Custom API call for /chat: {e}", exc_info=True)
-            yield f"Error processing with Custom API: {str(e)}" # Consider a more generic error for client
+            logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True)
+            yield f"Error processing with Custom API: {str(e)}"
     return StreamingResponse(custom_api_streamer(), media_type="text/plain")


 @app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
-async def submit_gemini_task(request_payload: GeminiTaskRequest, background_tasks: BackgroundTasks, http_request: Request): # Renamed request to request_payload and added http_request
-    user_ip = get_user_ip(http_request) # Potentially rate limit this endpoint too if needed
+async def submit_gemini_task(request: GeminiTaskRequest, background_tasks: BackgroundTasks):
     task_id = str(uuid.uuid4())
-    logger.info(f"IP {user_ip}: Received Gemini task submission. Assigning Task ID: {task_id}. Msg: '{request_payload.message[:50]}...'")
+    logger.info(f"Received Gemini task submission. Assigning Task ID: {task_id}. Message: '{request.message[:50]}...'")

-    gemini_api_key_from_request = request_payload.api_key
+    gemini_api_key_from_request = request.api_key
     gemini_api_key_secret = os.getenv("GEMINI_API_KEY")
-    key_to_use = gemini_api_key_from_request #or gemini_api_key_secret
+    key_to_use = gemini_api_key_from_request or gemini_api_key_secret

     if not key_to_use:
-        logger.error(f"[Task {task_id}] IP {user_ip}: Gemini API Key missing for task submission.")
+        logger.error(f"[Task {task_id}] Gemini API Key missing for task submission.")
         raise HTTPException(status_code=400, detail="Gemini API Key required.")

-    requested_model = request_payload.gemini_model or DEFAULT_GEMINI_MODEL
+    requested_model = request.gemini_model or DEFAULT_GEMINI_MODEL

     current_time = datetime.now(timezone.utc)
     tasks_db[task_id] = {
-        "status": "PENDING", "result": None, "error": None,
-        "submitted_at": current_time, "last_updated_at": current_time,
-        "request_params": request_payload.model_dump()
+        "status": "PENDING",
+        "result": None,
+        "error": None,
+        "submitted_at": current_time,
+        "last_updated_at": current_time,
+        "request_params": request.model_dump() # Store original request
     }

     background_tasks.add_task(
-        process_gemini_request_background, task_id, request_payload.message,
-        request_payload.url, requested_model, key_to_use
+        process_gemini_request_background,
+        task_id,
+        request.message,
+        request.url,
+        requested_model,
+        key_to_use
     )

-    logger.info(f"[Task {task_id}] IP {user_ip}: Task submitted to background processing.")
+    logger.info(f"[Task {task_id}] Task submitted to background processing.")
     return TaskSubmissionResponse(
-        task_id=task_id, status="PENDING",
-        task_detail_url=f"/gemini/task/{task_id}"
+        task_id=task_id,
+        status="PENDING",
+        task_detail_url=f"/gemini/task/{task_id}" # Provide the URL to poll
     )


@@ -342,7 +252,6 @@ async def get_gemini_task_status(task_id: str = Path(..., description="The ID of
     )

 @app.get("/")
-async def read_root(request: Request): # Added request
-    user_ip = get_user_ip(request)
-    logger.info(f"IP {user_ip}: Root endpoint '/' accessed (health check).")
-    return {"message": "API for Direct Chat and Async Gemini Tasks with Rate Limiting is running."}
+async def read_root():
+    logger.info("Root endpoint '/' accessed (health check).")
+    return {"message": "API for Direct Chat and Async Gemini Tasks is running."}
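For reference, a minimal client sketch (not part of this commit) showing how the endpoints above could be exercised once the app is running. The base URL is assumed to be http://localhost:8000, and the payload field names (message, url, api_key, gemini_model for /gemini/submit_task; message, temperature for /chat) are inferred from how the handlers read them, so adjust them to the actual Pydantic models.

# Hypothetical usage sketch; endpoint paths come from the diff above,
# while the base URL and payload field names are assumptions.
import time
import requests

BASE = "http://localhost:8000"  # assumed host

# Submit an async Gemini task and poll the task_detail_url it returns.
resp = requests.post(f"{BASE}/gemini/submit_task", json={
    "message": "Summarize this video",
    "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # any YouTube URL
    "api_key": "YOUR_GEMINI_API_KEY",   # optional if GEMINI_API_KEY is set server-side
    "gemini_model": "gemini-1.5-flash-latest",
})
resp.raise_for_status()
task = resp.json()

while True:
    status = requests.get(f"{BASE}{task['task_detail_url']}").json()
    if status["status"] not in ("PENDING", "PROCESSING"):  # terminal state reached
        print(status.get("result") or status.get("error"))
        break
    time.sleep(5)

# /chat streams plain text back from the custom API.
with requests.post(f"{BASE}/chat", json={"message": "Hello", "temperature": 0.7}, stream=True) as r:
    for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="")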