whung99
fix: don't crash on missing GOOGLE_API_KEY at startup
87c2f5d
import asyncio
import json
import os
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from pathlib import Path
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from agent import chat_with_operator, create_chat_session, run_operator
from google_auth import get_auth_url, handle_callback, has_client_secret, is_authenticated, logout
from mock_data import MOCK_EMAILS, MOCK_EVENTS, MOCK_SEARCH
from tts import generate_speech
from urgency import load_model
load_dotenv()
# Warn if Gemini API key is missing (don't crash — HF Spaces sets secrets as env vars)
api_key = os.getenv("GOOGLE_API_KEY")
if not api_key or api_key == "your_key_here":
import warnings
warnings.warn("GOOGLE_API_KEY not set. Add it as a Secret in HuggingFace Space settings.")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
# Persistent chat session (created on first /api/chat call)
_chat_client = None
_chat_session = None
def get_chat_session():
global _chat_client, _chat_session
if _chat_session is None:
config = load_config()
_chat_client, _chat_session = create_chat_session(config["projects"])
return _chat_session
@asynccontextmanager
async def lifespan(app):
load_model()
print("HuggingFace model loaded.")
if has_client_secret():
print("OAuth client_secret.json found. Google API integration enabled.")
else:
print("No client_secret.json found. Running in mock/demo mode.")
yield
app = FastAPI(title="Oppy", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def load_config():
from pathlib import Path
config_path = Path(__file__).parent / "config.json"
with open(config_path) as f:
return json.load(f)
# ---- OAuth Endpoints ----
@app.get("/api/auth/status")
async def auth_status():
"""Check if user is authenticated with Google APIs."""
return {
"authenticated": is_authenticated(),
"oauth_available": has_client_secret(),
}
@app.get("/api/auth/login")
async def auth_login():
"""Redirect to Google OAuth consent screen."""
url = get_auth_url()
if not url:
return {"error": "OAuth not configured. Place client_secret.json in backend/."}
return RedirectResponse(url)
@app.get("/api/auth/callback")
async def auth_callback(code: str = Query(...)):
"""Handle Google OAuth callback."""
handle_callback(code)
return RedirectResponse(f"{FRONTEND_URL}?auth=success")
@app.get("/api/auth/logout")
async def auth_logout():
"""Clear Google credentials."""
logout()
return {"status": "logged_out"}
# ---- Core Endpoints ----
@app.get("/api/run")
async def run():
config = load_config()
queue = asyncio.Queue()
async def on_event(event: dict):
await queue.put(event)
async def generator():
task = asyncio.create_task(run_operator(config["projects"], on_event))
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=1.0)
yield {"event": event["type"], "data": json.dumps(event, ensure_ascii=False)}
if event["type"] == "brief":
break
except asyncio.TimeoutError:
if task.done():
break
continue
await task
return EventSourceResponse(generator())
class TTSRequest(BaseModel):
text: str
class ChatRequest(BaseModel):
message: str
@app.post("/api/tts")
async def tts(req: TTSRequest):
audio_bytes = generate_speech(req.text)
return Response(content=audio_bytes, media_type="audio/wav")
@app.post("/api/chat")
async def chat(req: ChatRequest):
session = get_chat_session()
queue = asyncio.Queue()
async def on_event(event: dict):
await queue.put(event)
async def generator():
task = asyncio.create_task(
chat_with_operator(req.message, session, on_event)
)
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=1.0)
yield {"event": event["type"], "data": json.dumps(event, ensure_ascii=False)}
if event["type"] == "chat_reply":
break
except asyncio.TimeoutError:
if task.done():
break
continue
await task
return EventSourceResponse(generator())
@app.get("/api/project/{project_id}/sources")
async def project_sources(project_id: str):
emails = MOCK_EMAILS.get(project_id, [])
events = MOCK_EVENTS.get(project_id, [])
search = MOCK_SEARCH.get(project_id, "")
return {
"emails": emails,
"events": events,
"search": search,
}
@app.get("/api/health")
async def health():
return {
"status": "ok",
"google_authenticated": is_authenticated(),
"oauth_available": has_client_secret(),
}
# --- Serve frontend static files (for Replit / production) ---
_static_dir = Path(__file__).parent / "static"
if _static_dir.exists():
app.mount("/assets", StaticFiles(directory=_static_dir / "assets"), name="assets")
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
"""Serve the React SPA for any non-API route."""
file_path = _static_dir / full_path
if file_path.is_file():
return FileResponse(file_path)
return FileResponse(_static_dir / "index.html")