# app.py
#
# Voice-agent API: answers interview-style questions as "Mohit Gupta" using a
# markdown profile as grounding context, via Gemini (default) or Hugging Face.
# Also hosts a small Upstash-Redis-backed analytics API (visit counter,
# per-day uniques via HyperLogLog, and a visit stream).
import hashlib
import logging
import os
import time
from datetime import date
from typing import Any, Dict, Literal, Optional

import httpx
from fastapi import FastAPI, Header, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from huggingface_hub import InferenceClient
from pydantic import BaseModel, ConfigDict, Field

from Constants import CONTEXT

# ---------- Config ----------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voice-agent")

PROFILE_MD_PATH = os.path.join("Data", "profile_data.md")


def load_profile_md() -> str:
    """Return the grounding-context markdown, or '' when the file is absent."""
    if os.path.exists(PROFILE_MD_PATH):
        with open(PROFILE_MD_PATH, "r", encoding="utf-8") as f:
            return f.read()
    return ""


PROFILE_MD = load_profile_md()

# NOTE: kept for backward compatibility only. build_prompt() recomputes the
# date per call, so a server running across midnight never reports yesterday.
TODAY_DATE = date.today()


def build_prompt(question: str) -> str:
    """Single-message prompt so it works reliably across providers."""
    # FIX: previously interpolated module-level TODAY_DATE, which was frozen at
    # import time; recompute so long-running servers always use today's date.
    today = date.today()
    return f"""
You are Mohit Gupta's AI voice twin, built to represent him in interviews and Q&A sessions.
You should answer as if you are Mohit Gupta, maintaining a professional yet approachable tone.

Date Context: Today’s date is {today}. You may use this date to calculate ages, durations, or the number of years since/past any mentioned event.

Your job is to answer truthfully, factually, and in a friendly but professional tone using the context provided.
The context is formatted in Markdown with sections (e.g., # About Me, ## Projects, ### Features).
Use these sections to give structured and relevant answers.
Do not invent details not present in the context. If asked about something outside this context, politely clarify you donot know about that.
Where helpful, structure answers with clear points or examples explaining the usecase of project or explaining projects. You can explain any technical things the interviewer asked like how you used the FastAPI etc.

### Guidelines:
- Answer concisely but include specific details when relevant (projects, metrics, tech stack).
- If multiple related sections exist, combine their info naturally.
- Do not repeat the entire context; summarize what is relevant to the question.
- Maintain first-person voice (“I have worked on…”) as you are representing Mohit Gupta.
- If you get any link in context that is related to the query then must add that in response.
- make sure this donot feel that you are mimicking Mohit. Make sure that interviewer will feel that you are Mohit Gupta not as AI.
- If asked about a project that is in context (e.g., Fashion Sense AI), explain only using the details provided.
- Never substitute with unrelated projects or links (e.g., Corn-Vomitoxin in Fashion Sense AI Project) unless directly asked.
- Always output them as plain clickable URLs (e.g., https://example.com).
- Do NOT use Markdown link syntax like [label](url).
- If multiple links are relevant, list them on separate lines under "Links:".
- Always Include link about which you are giving in response.
- If you donot know about anything then just say I am a quick learner. I can learn about that. But everything given in the context you should explain that.

### Context about Mohit (Markdown format):
{PROFILE_MD}

### Task
Answer the question using ONLY the context above but you can explain technical things related to context only.

### Question
{question}

### Answer
""".strip()


# ---------- Provider Clients ----------
# We prefer Gemini by default. If user chooses Hugging Face, we call HF
# Inference API for the specified model.

async def call_gemini(
    api_key: str,
    model: str,
    prompt: str,
    generation_config: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Call Google Gemini via the official python SDK if available; fall back to
    REST if the SDK is not installed. We DON'T log the API key.

    Raises:
        HTTPException: on provider errors or empty/blocked output.
    """
    generation_config = generation_config or {"temperature": 0.2, "max_output_tokens": CONTEXT}
    try:
        # Prefer python SDK (google-generativeai)
        import google.generativeai as genai  # type: ignore

        genai.configure(api_key=api_key)
        gm = genai.GenerativeModel(model)
        resp = gm.generate_content(prompt, generation_config=generation_config)
        # SDK returns .text on success; may carry safety blocks otherwise.
        text = getattr(resp, "text", None) or ""
        if not text:
            # Surface blocked / empty output reasons as a gateway error.
            raise HTTPException(502, "Gemini returned empty response.")
        return text.strip()
    except ModuleNotFoundError:
        # Fallback to REST (models may differ in REST naming, e.g.,
        # "models/gemini-1.5-flash"). We try both forms automatically.
        model_names = [model, f"models/{model}"]
        last_err: Optional[BaseException] = None
        for m in model_names:
            url = f"https://generativelanguage.googleapis.com/v1beta/{m}:generateContent"
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": generation_config,
            }
            headers = {"x-goog-api-key": api_key}
            try:
                async with httpx.AsyncClient(timeout=60) as client:
                    r = await client.post(url, json=payload, headers=headers)
                    if r.status_code == 200:
                        data = r.json()
                        # Extract first candidate text
                        candidates = data.get("candidates") or []
                        if not candidates:
                            raise HTTPException(502, f"Gemini returned no candidates: {data}")
                        parts = candidates[0].get("content", {}).get("parts", [])
                        text = "".join(p.get("text", "") for p in parts).strip()
                        if not text:
                            raise HTTPException(502, "Gemini returned empty text.")
                        return text
                    else:
                        last_err = HTTPException(r.status_code, f"Gemini error: {r.text}")
            except Exception as e:
                # Remember the failure and try the next model-name form.
                last_err = e
        # If we got here, all attempts failed
        raise last_err or HTTPException(502, "Gemini request failed")


async def call_hf_chat(
    hf_api_key: str,
    model: str,
    messages,
    *,
    provider: str | None = "auto",
    max_tokens: int = 1024,
    temperature: float = 0.2,
) -> str:
    """
    Use Hugging Face Inference Providers (OpenAI-compatible chat completions).

    NOTE(review): InferenceClient is synchronous, so despite the `async def`
    this call blocks the event loop for the request's duration — consider
    anyio.to_thread.run_sync if throughput matters. Confirm before changing.
    """
    client = InferenceClient(api_key=hf_api_key, provider=provider, timeout=120)
    resp = client.chat.completions.create(
        model=model,
        messages=messages,  # [{"role":"user","content":"..."}] OR multimodal structure
        max_tokens=max_tokens,
        temperature=temperature,
        stream=False,
    )
    # hf client returns OpenAI-style response
    return resp.choices[0].message["content"].strip()


# ---------- FastAPI ----------
app = FastAPI(title="Voice Agent API", version="0.2.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten for prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ChatIn(BaseModel):
    """Request body for /api/chat (and /api/debug/prompt)."""

    question: str = Field(
        ..., min_length=1, description="User question/prompt",
        examples=["Summarize my projects briefly."],
    )
    session_id: Optional[str] = Field(None, examples=["demo-1"])
    # Which provider to use — default Gemini
    provider: Optional[Literal["gemini", "huggingface"]] = "gemini"
    # Optional: model override per provider
    model: Optional[str] = Field(
        None, examples=["gemini-1.5-flash", "google/gemma-3-27b-it"]
    )
    # Per-request API keys (frontend supplies these)
    gemini_api_key: Optional[str] = None
    hf_api_key: Optional[str] = None

    model_config = ConfigDict(json_schema_extra={
        "examples": [{
            "question": "Give me a one-line intro about yourself.",
            "provider": "gemini",
            "model": "gemini-1.5-flash",
            "gemini_api_key": "YOUR_GEMINI_KEY",
        }]
    })


class ChatOut(BaseModel):
    """Response body for /api/chat."""

    answer: str


class DebugPromptOut(BaseModel):
    """Shape of /api/debug/prompt output (length + prompt text)."""

    length: int
    preview: str


@app.get("/")
def root():
    """Landing endpoint with a human-readable service description."""
    return JSONResponse({
        "ok": True,
        "message": "M.A.R.S.H.A.L (Mohit’s AgenticAI Representation System for Humanized Assistance and Legacy) Voice Agent API (Gemini / Hugging Face)",
        "Created By": "Mohit Gupta",
    })


@app.get("/api/health")
def health():
    """Server status & profile presence. No external calls here."""
    return {
        "ok": True,
        "profile_loaded": bool(PROFILE_MD),
        "default_context_chars": len(PROFILE_MD),
        "providers": {"gemini": "supported", "huggingface": "supported"},
    }


@app.post("/api/chat", response_model=ChatOut, tags=["Chat"], summary="Ask the agent")
async def chat(
    payload: ChatIn,
    # optional: accept keys via headers (frontend can send them this way instead of JSON)
    x_gemini_api_key: Optional[str] = Header(None),
    x_hf_api_key: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),  # e.g. "Bearer hf_xxx"
):
    """
    Answer a question with the selected provider.

    Key resolution order: request body > header > environment variable.
    Raises 400 on missing question/key or unknown provider; provider failures
    surface as the HTTPException raised by the underlying call.
    """
    question = payload.question.strip()
    if not question:
        raise HTTPException(400, "Question is required.")

    prompt = build_prompt(question)
    provider = payload.provider or "gemini"

    if provider == "gemini":
        model = payload.model or os.getenv("DEFAULT_GEMINI_MODEL", "gemini-1.5-flash")
        # choose key from body > header > env
        gemini_key = payload.gemini_api_key or x_gemini_api_key or os.getenv("GEMINI_API_KEY")
        if not gemini_key:
            raise HTTPException(400, "Gemini API key is required (send gemini_api_key or X-Gemini-Api-Key).")
        text = await call_gemini(gemini_key, model, prompt)
        return ChatOut(answer=text or "Sorry, I didn't catch that.")

    elif provider == "huggingface":
        model = payload.model or os.getenv("DEFAULT_HF_MODEL", "google/gemma-3-27b-it")
        hf_key = (
            payload.hf_api_key
            or x_hf_api_key
            or (authorization.split(" ", 1)[1].strip()
                if authorization and authorization.lower().startswith("bearer ")
                else None)
            or os.getenv("HF_API_KEY")
        )
        if not hf_key:
            raise HTTPException(400, "Hugging Face API key is required (send hf_api_key, X-Hf-Api-Key, or Authorization: Bearer).")
        # FIX: reuse the prompt built from the *stripped* question, matching
        # the Gemini branch (previously rebuilt from the raw payload.question).
        messages = [{"role": "user", "content": prompt}]
        text = await call_hf_chat(hf_key, model, messages, provider="auto")
        return ChatOut(answer=text or "Sorry, I didn't catch that.")

    else:
        raise HTTPException(400, f"Unknown provider: {provider}")


# Optional: peek at the exact prompt we send (for debugging)
@app.post("/api/debug/prompt")
def debug_prompt(payload: ChatIn):
    """Return the length and full text of the prompt we would send."""
    p = build_prompt(payload.question or "")
    return {"length": len(p), "preview": p}


# --------------------------------------------------------------------
# ------------------------ Analytics (NEW) ----------------------------
# --------------------------------------------------------------------
# Persistent view counter + per-visit log using Upstash Redis (REST).
# No secrets exposed to frontend; FE calls these endpoints only.

UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL", "")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN", "")
ANALYTICS_SKIP_BOTS = os.getenv("ANALYTICS_SKIP_BOTS", "true").lower() == "true"

# Redis keys
K_TOTAL = "analytics:visits:count"
K_STREAM = "analytics:visits:stream"


def K_UNIQUE_TODAY(day: str) -> str:
    """Key of the per-day HyperLogLog of unique visitor hashes."""
    return f"analytics:unique:{day}"  # HyperLogLog


BOT_SIGS = ("bot", "crawler", "spider", "facebookexternalhit", "slurp")


class VisitIn(BaseModel):
    """Request body for /analytics/visit."""

    path: str
    referrer: Optional[str] = None


def _assert_upstash_ready():
    """Raise 503 unless both Upstash URL and token are configured."""
    if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
        raise HTTPException(503, "Analytics datastore not configured.")


async def _redis_cmd(cmd: list[str]) -> Any:
    """
    Call Upstash Redis via /pipeline.
    Body must be a JSON array of arrays: [[ "CMD", "arg1", ... ]]
    Returns the single command's 'result'.

    Raises:
        HTTPException(503): datastore not configured.
        HTTPException(502): HTTP failure, malformed response, or Redis error.
    """
    _assert_upstash_ready()
    url = UPSTASH_REDIS_REST_URL.rstrip("/") + "/pipeline"
    headers = {
        "Authorization": "Bearer " + UPSTASH_REDIS_REST_TOKEN,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = [cmd]  # <-- IMPORTANT: raw array, not {"commands": [cmd]}
    async with httpx.AsyncClient(timeout=10) as client:
        r = await client.post(url, headers=headers, json=payload)
    try:
        r.raise_for_status()
    except Exception:
        logger.error("Upstash error (%s): %s", r.status_code, r.text)
        raise HTTPException(status_code=502, detail=f"Upstash error: {r.text}")
    data = r.json()  # e.g., [{"result":"PONG","status":200}]
    if not isinstance(data, list) or not data:
        raise HTTPException(502, f"Upstash unexpected response: {data!r}")
    entry = data[0]
    if "error" in entry:
        raise HTTPException(502, f"Upstash error: {entry['error']}")
    return entry.get("result")


@app.get("/analytics/selftest", tags=["analytics"])
async def analytics_selftest():
    """
    Run a minimal write/read/delete cycle on Upstash to prove connectivity
    + payload shape.
    """
    try:
        pong = await _redis_cmd(["PING"])
    except Exception as e:
        return JSONResponse(
            {"ok": False, "stage": "PING", "error": str(e)},
            status_code=status.HTTP_502_BAD_GATEWAY,
        )
    try:
        await _redis_cmd(["SET", "analytics:selftest", "ok", "PX", "60000"])
        val = await _redis_cmd(["GET", "analytics:selftest"])
        await _redis_cmd(["DEL", "analytics:selftest"])
    except Exception as e:
        # FIX: previously `e` was captured but never reported, hiding the
        # failure reason from the caller (PING stage already reported it).
        return JSONResponse(
            {"ok": False, "stage": "SET/GET/DEL", "pong": pong, "error": str(e)},
            status_code=status.HTTP_502_BAD_GATEWAY,
        )
    return {"ok": True, "pong": pong, "kv": val}


def _client_ip(request: Request, x_forwarded_for: Optional[str]) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else socket peer."""
    if x_forwarded_for:
        return x_forwarded_for.split(",")[0].strip()
    return request.client.host if request.client else "0.0.0.0"


def _hash_ip(ip: str) -> str:
    """Truncated SHA-256 of the IP, so raw addresses are never stored."""
    return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:32]


@app.post("/analytics/visit", tags=["analytics"])
async def track_visit(
    payload: VisitIn,
    request: Request,
    user_agent: Optional[str] = Header(None, alias="User-Agent"),
    xff: Optional[str] = Header(None, alias="X-Forwarded-For"),
):
    """
    Track a visit:
    - INCR total counter
    - PFADD into today's HyperLogLog for uniques
    - XADD into a stream with ts/path/ref/ua/ip_hash
    """
    # Optional bot filter
    if ANALYTICS_SKIP_BOTS and user_agent and any(sig in user_agent.lower() for sig in BOT_SIGS):
        return {"ok": True, "skipped": "bot"}

    ip = _client_ip(request, xff)
    ip_hash = _hash_ip(ip)
    now_ms = int(time.time() * 1000)
    day = time.strftime("%Y%m%d")

    # Increment total
    total = await _redis_cmd(["INCR", K_TOTAL])
    # Unique per-day
    await _redis_cmd(["PFADD", K_UNIQUE_TODAY(day), ip_hash])

    # Append to stream (cap with MAXLEN if desired)
    fields = {
        "ts": str(now_ms),
        "path": payload.path or "/",
        "ref": payload.referrer or "",
        "ua": user_agent or "",
        "ip_hash": ip_hash,
    }
    xs = []
    for k, v in fields.items():
        xs.extend([k, v])
    await _redis_cmd(["XADD", K_STREAM, "*", *xs])
    return {"ok": True, "total": int(total)}


@app.get("/analytics/summary", tags=["analytics"])
async def analytics_summary():
    """
    Returns:
    - total: persistent total views
    - unique_today: HyperLogLog-based unique visitors for today
    """
    day = time.strftime("%Y%m%d")
    total_raw = await _redis_cmd(["GET", K_TOTAL])
    total = int(total_raw) if total_raw else 0
    unique_today = await _redis_cmd(["PFCOUNT", K_UNIQUE_TODAY(day)])
    return {"total": total, "unique_today": int(unique_today)}


@app.get("/analytics/visitors", tags=["analytics"])
async def analytics_visitors(limit: int = 50, cursor: Optional[str] = None):
    """
    Page through most recent visitors using XREVRANGE.
    - cursor: exclusive upper-bound stream ID for next page
    - returns: items[], next_cursor
    """
    args = ["XREVRANGE", K_STREAM]
    if cursor:
        # FIX: prefix with "(" for an exclusive bound (Redis >= 6.2), so the
        # entry at `cursor` is not repeated as the first item of each page.
        args.extend([f"({cursor}", "-"])
    else:
        args.extend(["+", "-"])
    args.extend(["COUNT", str(limit)])

    res = await _redis_cmd(args)  # [[id, [k1,v1,k2,v2,...]], ...]
    items = []
    next_cursor = None
    for i, row in enumerate(res or []):
        _id = row[0]
        kv = row[1]
        d = dict(zip(kv[0::2], kv[1::2]))
        items.append({"id": _id, **d})
        if i == len(res) - 1:
            next_cursor = _id
    return {"items": items, "next_cursor": next_cursor}


# --------------------------------------------------------------------
# ---------------------- End Analytics (NEW) -------------------------
# --------------------------------------------------------------------

if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app:app", reload=True)