Spaces:
Sleeping
Sleeping
# app.py
import json
import os
import re
import threading
import time
from datetime import datetime, timedelta
from typing import Dict

import requests
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ---------- Configuration (environment-driven) ----------
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    # Startup warning only; call_gemini() raises a 500 if the key is still
    # missing when a request actually arrives.
    print("WARNING: GEMINI_API_KEY not set. Set it in Space secrets or env.")

GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")  # change if needed
GEMINI_BASE = os.environ.get("GEMINI_BASE", "https://generativelanguage.googleapis.com/v1beta")
GENERATE_PATH = f"{GEMINI_BASE}/models/{GEMINI_MODEL}:generateContent"

# Rate limit settings (in-memory). Tune these for your expected traffic.
RATE_LIMIT_REQUESTS = int(os.environ.get("RATE_LIMIT_REQUESTS", "12"))  # requests per window per IP
RATE_LIMIT_WINDOW = int(os.environ.get("RATE_LIMIT_WINDOW", "60"))  # window size in seconds

# Retry settings for Gemini calls
MAX_RETRIES = 3
RETRY_BACKOFF = 1.5  # backoff multiplier applied after each failed attempt

# ---------- Simple in-memory rate limiter ----------
# Structure: { ip: [timestamp1, timestamp2, ...] }
# Not persisted; resets on every restart and is per-process only.
rate_table: Dict[str, list] = {}
rate_lock = threading.Lock()
def clean_old(ip: str) -> int:
    """Drop timestamps outside the rate-limit window for *ip*.

    Returns the number of timestamps remaining in the window.

    NOTE: acquires ``rate_lock`` itself — callers must NOT already hold the
    lock, because ``threading.Lock`` is not reentrant.
    """
    cutoff = time.time() - RATE_LIMIT_WINDOW
    with rate_lock:
        kept = [t for t in rate_table.get(ip, []) if t >= cutoff]
        rate_table[ip] = kept
        return len(kept)
def consume_token(ip: str) -> bool:
    """Record one request for *ip*; return False when the window limit is hit.

    BUG FIX: the original called clean_old() while already holding
    ``rate_lock``; clean_old() acquires the same non-reentrant
    ``threading.Lock``, so every request deadlocked. The pruning is now done
    inline under a single lock acquisition.
    """
    now = time.time()
    cutoff = now - RATE_LIMIT_WINDOW
    with rate_lock:
        timestamps = [t for t in rate_table.get(ip, []) if t >= cutoff]
        if len(timestamps) >= RATE_LIMIT_REQUESTS:
            rate_table[ip] = timestamps
            return False
        timestamps.append(now)
        rate_table[ip] = timestamps
        return True
# ---------- FastAPI setup ----------
app = FastAPI(title="Hackathon Idea Generator (Gemini backend)")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # change to your frontend domain in production
    # NOTE(review): browsers reject credentialed requests against a wildcard
    # origin; if cookies/auth headers are actually needed, pin the origin list.
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)
# ---------- Request / Response models ----------
class GenerateRequest(BaseModel):
    # Optional topic seed; an empty string falls back to a generic topic
    # in the /generate handler.
    custom_prompt: str = ""


class DetailRequest(BaseModel):
    # Id and title of the idea (as returned by /generate) to expand.
    idea_id: int
    idea_title: str
# ---------- Utility: call Gemini REST generateContent ----------
def call_gemini(prompt_text: str, max_output_tokens: int = 1024, temperature: float = 0.7):
    """Call the Gemini generateContent REST endpoint with retries.

    Retries up to MAX_RETRIES times with exponential backoff on timeouts,
    connection errors, 429 and 5xx responses. Returns the raw response JSON.

    Raises:
        HTTPException(500) when no API key is configured,
        HTTPException(<status>) on a non-retryable Gemini error,
        HTTPException(502) when all retries are exhausted.
    """
    if not GEMINI_API_KEY:
        raise HTTPException(status_code=500, detail="GEMINI_API_KEY not configured on server.")
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": GEMINI_API_KEY,
    }
    body = {
        "contents": [
            {
                "parts": [
                    {"text": prompt_text}
                ]
            }
        ],
        # BUG FIX: tuning params must be nested under "generationConfig" for
        # the v1beta generateContent API; sending them top-level is rejected.
        "generationConfig": {
            "temperature": temperature,
            "maxOutputTokens": max_output_tokens,
        },
    }
    backoff = 1.0
    for _attempt in range(MAX_RETRIES):
        try:
            resp = requests.post(GENERATE_PATH, headers=headers, json=body, timeout=60)
        except requests.RequestException:
            # Timeout / connection error: retry with exponential backoff.
            time.sleep(backoff)
            backoff *= RETRY_BACKOFF
            continue
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code in (429, 500, 502, 503, 504):
            # Transient rate-limit / server errors: retry.
            time.sleep(backoff)
            backoff *= RETRY_BACKOFF
            continue
        # Non-retryable error: surface Gemini's message to the client.
        try:
            d = resp.json()
            message = d.get("error", d)
        except Exception:
            message = resp.text
        raise HTTPException(status_code=resp.status_code, detail=f"Gemini API error: {message}")
    raise HTTPException(status_code=502, detail="Gemini API unavailable after retries.")
# ---------- Helpers to extract text from Gemini response ----------
def extract_text_from_gemini(resp_json):
    """Best-effort extraction of generated text from a Gemini response payload.

    Handles the current API shape (``candidates[0].content`` is a dict with a
    ``parts`` list), older list-shaped content, ``outputs``-style payloads,
    a minimal ``{"text": ...}`` form, and finally falls back to returning the
    stringified JSON so callers always get a string.
    """

    def _join_parts(content):
        """Return concatenated text from a content dict/list, or None."""
        # Current API: content is {"parts": [{"text": ...}, ...]}.
        if isinstance(content, dict):
            content = content.get("parts", [])
        if isinstance(content, list):
            texts = []
            for item in content:
                if isinstance(item, dict):
                    if "text" in item:
                        texts.append(item["text"])
                    elif isinstance(item.get("parts"), list):
                        # Nested shape: content is a list of {"parts": [...]}.
                        texts.extend(p["text"] for p in item["parts"]
                                     if isinstance(p, dict) and "text" in p)
            if texts:
                return "".join(texts)
        return None

    try:
        candidates = resp_json.get("candidates")
        if isinstance(candidates, list) and candidates:
            text = _join_parts(candidates[0].get("content"))
            if text is not None:
                return text
            # Fallback: some payloads carry {"output": {"text": ...}}.
            output = candidates[0].get("output")
            if isinstance(output, dict):
                return output.get("text", "")
        outputs = resp_json.get("outputs")
        if isinstance(outputs, list) and outputs:
            text = _join_parts(outputs[0].get("content", []))
            if text is not None:
                return text
        # Minimal form: {"text": "..."}.
        if "text" in resp_json:
            return resp_json["text"]
    except Exception:
        # Unknown shape; fall through to the stringified-JSON fallback.
        pass
    return json.dumps(resp_json)
# ---------- JSON extraction helper ----------
def extract_json_from_text(text: str):
    """Pull the first JSON object (or array) out of free-form model output.

    Returns the parsed object, an array wrapped as ``{"ideas": [...]}``, or
    None when nothing parseable is found.

    BUG FIX: the original returned None as soon as an object-shaped span
    failed to parse, so the documented array fallback was unreachable; it
    also used bare ``except:`` clauses.
    """
    m = re.search(r'\{[\s\S]*\}', text)
    if m:
        try:
            return json.loads(m.group())
        except json.JSONDecodeError:
            # Object span wasn't valid JSON; fall through and try an array.
            pass
    m2 = re.search(r'\[[\s\S]*\]', text)
    if m2:
        try:
            arr = json.loads(m2.group())
        except json.JSONDecodeError:
            return None
        return {"ideas": arr}
    return None
# ---------- Endpoints ----------
# NOTE(review): the route decorators were missing in the mangled source, so
# none of these handlers was registered; paths reconstructed — confirm them.
@app.get("/")
def root():
    """Health check: reports service status and the configured model name."""
    return {"status": "ok", "model": GEMINI_MODEL}
# NOTE(review): route decorator reconstructed (missing in mangled source) —
# confirm the intended path.
@app.get("/stats")
def stats():
    """Expose the rate-limit configuration. Not persisted across restarts."""
    return {"rate_limit_requests": RATE_LIMIT_REQUESTS, "rate_limit_window_seconds": RATE_LIMIT_WINDOW}
# NOTE(review): route decorator reconstructed (missing in mangled source) —
# confirm the intended path.
@app.post("/generate")
async def generate(req: GenerateRequest, request: Request):
    """Generate 3 hackathon ideas as JSON via Gemini; rate limited per IP.

    Raises HTTPException(429) on rate-limit breach and HTTPException(502)
    when the model response cannot be parsed as JSON.
    """
    client_ip = request.client.host if request.client else "unknown"
    if not consume_token(client_ip):
        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                            detail=f"Rate limit exceeded. Max {RATE_LIMIT_REQUESTS} requests per {RATE_LIMIT_WINDOW}s.")
    # Compose system + user prompt. Keep it compact to reduce tokens.
    system_prefix = (
        "[SYSTEM] You are a JSON API that returns exactly 3 hackathon ideas in valid JSON. "
        "Return only JSON and nothing else. "
    )
    user_prompt = f"Topic: {req.custom_prompt.strip() or 'innovative technology'}"
    prompt_text = (
        f"{system_prefix}\n{user_prompt}\n\nReturn JSON with format:\n"
        "{ \"ideas\": [ { \"id\":1, \"title\":\"...\", \"elevator\":\"...\", \"overview\":\"...\", \"primary_tech_stack\":[], \"difficulty\":\"Medium\", \"time_estimate_hours\":24 }, ... ], \"best_pick_id\": 2, \"best_pick_reason\": \"...\" }"
    )
    resp = call_gemini(prompt_text, max_output_tokens=1024, temperature=0.7)
    generated_text = extract_text_from_gemini(resp)
    parsed = extract_json_from_text(generated_text)
    if not parsed:
        # Return debug payload for easier local debugging.
        raise HTTPException(status_code=502, detail={"error": "PARSE_ERROR", "raw": generated_text[:1000], "gemini_raw": resp})
    return parsed
# NOTE(review): route decorator reconstructed (missing in mangled source) —
# confirm the intended path.
@app.post("/details")
async def details(req: DetailRequest, request: Request):
    """Expand one idea into a 48-hour implementation plan; rate limited per IP.

    Raises HTTPException(429) on rate-limit breach and HTTPException(502)
    when the model response cannot be parsed as JSON.
    """
    client_ip = request.client.host if request.client else "unknown"
    if not consume_token(client_ip):
        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                            detail=f"Rate limit exceeded. Max {RATE_LIMIT_REQUESTS} requests per {RATE_LIMIT_WINDOW}s.")
    prompt_text = (
        "[SYSTEM] You are a JSON API that returns a full 48-hour implementation plan in JSON. "
        "Return only JSON.\n"
        f"Project title: {req.idea_title}\n\n"
        "Return format: {\"id\": <id>, \"title\": \"...\", \"mermaid_architecture\":\"...\", \"phases\": [ {\"name\":\"MVP\",\"time_hours\":20,\"tasks\":[..],\"deliverables\":[..]} , ... ], \"critical_code_snippets\":[...], \"ui_components\": [...], \"risks_and_mitigations\":[...] }"
    )
    resp = call_gemini(prompt_text, max_output_tokens=1400, temperature=0.7)
    generated_text = extract_text_from_gemini(resp)
    parsed = extract_json_from_text(generated_text)
    if not parsed:
        raise HTTPException(status_code=502, detail={"error": "PARSE_ERROR", "raw": generated_text[:1000], "gemini_raw": resp})
    return parsed