# app.py
import os
import logging
from typing import Optional, Literal, Dict, Any
from fastapi import FastAPI, HTTPException, Header, Request # <-- added Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, ConfigDict
import httpx
from huggingface_hub import InferenceClient
from datetime import date
import time # <-- added
import hashlib # <-- added
from Constants import CONTEXT
# ---------- Config ----------
# Root logging at INFO; handlers created by getLogger inherit this level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voice-agent")
# Location of the Markdown profile that seeds the agent's answer context.
PROFILE_MD_PATH = os.path.join("Data", "profile_data.md")


def load_profile_md() -> str:
    """Return the profile Markdown, or an empty string when the file is absent."""
    try:
        with open(PROFILE_MD_PATH, "r", encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        return ""


# Loaded once at import time; /api/health reports whether it is non-empty.
PROFILE_MD = load_profile_md()
# Captured at startup so the prompt can reference "today".
TODAY_DATE = date.today()
def build_prompt(question: str) -> str:
    """Single-message prompt so it works reliably across providers.

    Interpolates today's date, the full profile Markdown (PROFILE_MD) and
    the caller's question into one instruction block; callers send it as a
    single user message rather than a system/user pair.
    """
    return f"""
You are Mohit Gupta's AI voice twin, built to represent him in interviews and Q&A sessions.
You should answer as if you are Mohit Gupta, maintaining a professional yet approachable tone.
Date Context: Today’s date is {TODAY_DATE}. You may use this date to calculate ages, durations, or the number of years since/past any mentioned event.
Your job is to answer truthfully, factually, and in a friendly but professional tone using the context provided.
The context is formatted in Markdown with sections (e.g., # About Me, ## Projects, ### Features).
Use these sections to give structured and relevant answers.
Do not invent details not present in the context.
If asked about something outside this context, politely clarify you donot know about that.
Where helpful, structure answers with clear points or examples explaining the usecase of project or explaining projects.
You can explain any technical things the interviewer asked like how you used the FastAPI etc.
### Guidelines:
- Answer concisely but include specific details when relevant (projects, metrics, tech stack).
- If multiple related sections exist, combine their info naturally.
- Do not repeat the entire context; summarize what is relevant to the question.
- Maintain first-person voice (“I have worked on…”) as you are representing Mohit Gupta.
- If you get any link in context that is related to the query then must add that in response.
- make sure this donot feel that you are mimicking Mohit. Make sure that interviewer will feel that you are Mohit Gupta not as AI.
- If asked about a project that is in context (e.g., Fashion Sense AI), explain only using the details provided.
- Never substitute with unrelated projects or links (e.g., Corn-Vomitoxin in Fashion Sense AI Project) unless directly asked.
- Always output them as plain clickable URLs (e.g., https://example.com).
- Do NOT use Markdown link syntax like [label](url).
- If multiple links are relevant, list them on separate lines under "Links:".
- Always Include link about which you are giving in response.
- If you donot know about anything then just say I am a quick learner. I can learn about that. But everything given in the context you should explain that.
### Context about Mohit (Markdown format):
{PROFILE_MD}
### Task
Answer the question using ONLY the context above but you can explain technical things related to context only.
### Question
{question}
### Answer
""".strip()
# ---------- Provider Clients ----------
# We prefer Gemini by default. If user chooses Hugging Face, we call HF Inference API for the specified model.
async def call_gemini(
    api_key: str,
    model: str,
    prompt: str,
    generation_config: Optional[Dict[str, Any]] = None
) -> str:
    """
    Calls Google Gemini via the official python SDK if available; falls back to REST if not.
    We DON'T log the API key.

    Raises HTTPException(502) when Gemini returns no usable text, or the
    last collected error when every REST attempt fails.
    """
    generation_config = generation_config or {"temperature": 0.2, "max_output_tokens": CONTEXT}
    try:
        # Prefer python SDK (google-generativeai)
        import google.generativeai as genai  # type: ignore
        genai.configure(api_key=api_key)
        gm = genai.GenerativeModel(model)
        # NOTE(review): this SDK call is synchronous and blocks the event
        # loop while the request is in flight — consider asyncio.to_thread.
        resp = gm.generate_content(prompt, generation_config=generation_config)
        # SDK returns .text on success; may carry safety blocks otherwise.
        text = getattr(resp, "text", None) or ""
        if not text:
            # Try to surface blocked / empty output reasons
            raise HTTPException(502, "Gemini returned empty response.")
        return text.strip()
    except ModuleNotFoundError:
        # Fallback to REST (models may differ in REST naming, e.g., "models/gemini-1.5-flash")
        # We’ll try both forms automatically.
        model_names = [model, f"models/{model}"]
        last_err = None
        for m in model_names:
            url = f"https://generativelanguage.googleapis.com/v1beta/{m}:generateContent"
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": generation_config,
            }
            headers = {"x-goog-api-key": api_key}
            try:
                async with httpx.AsyncClient(timeout=60) as client:
                    r = await client.post(url, json=payload, headers=headers)
                    if r.status_code == 200:
                        data = r.json()
                        # Extract first candidate text
                        candidates = (data.get("candidates") or [])
                        if not candidates:
                            raise HTTPException(502, f"Gemini returned no candidates: {data}")
                        parts = candidates[0].get("content", {}).get("parts", [])
                        text = "".join(p.get("text", "") for p in parts).strip()
                        if not text:
                            raise HTTPException(502, "Gemini returned empty text.")
                        return text
                    else:
                        last_err = HTTPException(r.status_code, f"Gemini error: {r.text}")
            except Exception as e:
                # Also catches the HTTPExceptions raised just above, so each
                # failed model name is recorded and the next one is tried.
                last_err = e
        # If we got here, all attempts failed
        raise last_err or HTTPException(502, "Gemini request failed")
async def call_hf_chat(hf_api_key: str, model: str, messages, *, provider: str | None = "auto",
                       max_tokens: int = 1024, temperature: float = 0.2) -> str:
    """
    Uses Hugging Face Inference Providers (OpenAI-compatible chat completions).

    NOTE(review): InferenceClient is synchronous, so this coroutine awaits
    nothing and blocks the event loop for the duration of the request —
    consider AsyncInferenceClient.
    """
    client = InferenceClient(api_key=hf_api_key, provider=provider, timeout=120)
    resp = client.chat.completions.create(
        model=model,
        messages=messages,  # [{"role":"user","content":"..."}] OR multimodal structure
        max_tokens=max_tokens,
        temperature=temperature,
        stream=False,
    )
    # hf client returns OpenAI-style response
    # NOTE(review): recent huggingface_hub versions return attribute-style
    # message objects; confirm dict-style ["content"] access still works.
    return resp.choices[0].message["content"].strip()
# ---------- FastAPI ----------
app = FastAPI(title="Voice Agent API", version="0.2.0")
# Open CORS so the static frontend can call the API from any origin.
# NOTE(review): browsers ignore allow_credentials=True combined with
# allow_origins=["*"] — confirm whether credentials are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten for prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatIn(BaseModel):
    """Request body for /api/chat and /api/debug/prompt."""
    question: str = Field(..., min_length=1, description="User question/prompt", examples=["Summarize my projects briefly."])
    # Opaque client-chosen session identifier; not read by the handlers in this file.
    session_id: Optional[str] = Field(None, examples=["demo-1"])
    # Which provider to use — default Gemini
    provider: Optional[Literal["gemini", "huggingface"]] = "gemini"
    # Optional: model override per provider
    model: Optional[str] = Field(
        None,
        examples=["gemini-1.5-flash", "google/gemma-3-27b-it"]
    )
    # Per-request API keys (frontend supplies these)
    gemini_api_key: Optional[str] = None
    hf_api_key: Optional[str] = None
    model_config = ConfigDict(json_schema_extra={
        "examples": [{
            "question": "Give me a one-line intro about yourself.",
            "provider": "gemini",
            "model": "gemini-1.5-flash",
            "gemini_api_key": "YOUR_GEMINI_KEY"
        }]
    })
class ChatOut(BaseModel):
    """Response body for /api/chat."""
    # Model answer text (or a fallback apology when the provider returns empty).
    answer: str
class DebugPromptOut(BaseModel):
    """Shape of the /api/debug/prompt payload.

    NOTE(review): not attached to any route via response_model in this file;
    debug_prompt returns a plain dict of the same shape — confirm intent.
    """
    length: int
    preview: str
@app.get("/")
def root():
    """Liveness/info endpoint returning a static banner payload."""
    banner = {
        "ok": True,
        "message": "M.A.R.S.H.A.L (Mohit’s AgenticAI Representation System for Humanized Assistance and Legacy) Voice Agent API (Gemini / Hugging Face)",
        "Created By": "Mohit Gupta",
    }
    return JSONResponse(banner)
@app.get("/api/health")
def health():
    """Server status and profile presence. No external calls are made here."""
    providers = {"gemini": "supported", "huggingface": "supported"}
    return {
        "ok": True,
        "profile_loaded": bool(PROFILE_MD),
        "default_context_chars": len(PROFILE_MD),
        "providers": providers,
    }
@app.post("/api/chat", response_model=ChatOut, tags=["Chat"], summary="Ask the agent")
async def chat(
    payload: ChatIn,
    # optional: accept keys via headers (frontend can send them this way instead of JSON)
    x_gemini_api_key: Optional[str] = Header(None),
    x_hf_api_key: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),  # e.g. "Bearer hf_xxx"
):
    """Answer a question with the selected provider (Gemini by default).

    API keys are resolved body > header (> Authorization bearer for HF) > env
    and are never logged. Raises 400 for a blank question or a missing key.
    """
    question = payload.question.strip()
    if not question:
        raise HTTPException(400, "Question is required.")
    prompt = build_prompt(question)
    provider = payload.provider or "gemini"
    if provider == "gemini":
        model = payload.model or os.getenv("DEFAULT_GEMINI_MODEL", "gemini-1.5-flash")
        # choose key from body > header > env
        gemini_key = payload.gemini_api_key or x_gemini_api_key or os.getenv("GEMINI_API_KEY")
        if not gemini_key:
            raise HTTPException(400, "Gemini API key is required (send gemini_api_key or X-Gemini-Api-Key).")
        text = await call_gemini(gemini_key, model, prompt)
        return ChatOut(answer=text or "Sorry, I didn't catch that.")
    elif provider == "huggingface":
        model = payload.model or os.getenv("DEFAULT_HF_MODEL", "google/gemma-3-27b-it")
        # key precedence: body > header > Authorization: Bearer > env
        bearer = None
        if authorization and authorization.lower().startswith("bearer "):
            bearer = authorization.split(" ", 1)[1].strip()
        hf_key = payload.hf_api_key or x_hf_api_key or bearer or os.getenv("HF_API_KEY")
        if not hf_key:
            raise HTTPException(400, "Hugging Face API key is required (send hf_api_key, X-Hf-Api-Key, or Authorization: Bearer).")
        # Bug fix: reuse `prompt` (built from the *stripped* question) instead of
        # rebuilding it from the raw payload — keeps the two branches consistent.
        messages = [{"role": "user", "content": prompt}]
        text = await call_hf_chat(hf_key, model, messages, provider="auto")
        return ChatOut(answer=text or "Sorry, I didn't catch that.")
    else:
        # Unreachable in practice: ChatIn constrains provider via Literal.
        raise HTTPException(400, f"Unknown provider: {provider}")
# Optional: peek at the exact prompt we send (for debugging)
@app.post("/api/debug/prompt")
def debug_prompt(payload: ChatIn):
    """Return the length and full text of the prompt that would be sent."""
    prompt_text = build_prompt(payload.question or "")
    return {"length": len(prompt_text), "preview": prompt_text}
# --------------------------------------------------------------------
# ------------------------ Analytics (NEW) ----------------------------
# --------------------------------------------------------------------
# Persistent view counter + per-visit log using Upstash Redis (REST).
# No secrets exposed to frontend; FE calls these endpoints only.
# Credentials for Upstash's Redis REST API; analytics endpoints return 503 when unset.
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL", "")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN", "")
# When true (the default), visits whose User-Agent matches BOT_SIGS are not recorded.
ANALYTICS_SKIP_BOTS = os.getenv("ANALYTICS_SKIP_BOTS", "true").lower() == "true"
# Redis keys
K_TOTAL = "analytics:visits:count"    # INCR'd once per tracked visit
K_STREAM = "analytics:visits:stream"  # stream of per-visit XADD entries


def K_UNIQUE_TODAY(day: str) -> str:
    """Key of the per-day HyperLogLog used for unique-visitor counting."""
    return f"analytics:unique:{day}"  # HyperLogLog


# Case-insensitive substrings checked against the User-Agent header.
BOT_SIGS = ("bot", "crawler", "spider", "facebookexternalhit", "slurp")
class VisitIn(BaseModel):
    """Request body for /analytics/visit."""
    # Page path being visited; track_visit substitutes "/" when empty.
    path: str
    # Optional document referrer reported by the frontend.
    referrer: Optional[str] = None
def _assert_upstash_ready() -> None:
    """Raise 503 unless both Upstash REST credentials are configured."""
    configured = UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN
    if not configured:
        raise HTTPException(503, "Analytics datastore not configured.")
async def _redis_cmd(cmd: list[str]) -> Any:
    """
    Call Upstash Redis via /pipeline.
    Body must be a JSON array of arrays: [[ "CMD", "arg1", ... ]]
    Returns the single command's 'result'.

    Raises HTTPException 503 when credentials are missing, and 502 for HTTP
    failures, malformed responses, or per-command Redis errors.
    """
    _assert_upstash_ready()
    url = UPSTASH_REDIS_REST_URL.rstrip("/") + "/pipeline"
    headers = {
        "Authorization": "Bearer " + UPSTASH_REDIS_REST_TOKEN,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = [cmd]  # <-- IMPORTANT: raw array, not {"commands": [cmd]}
    async with httpx.AsyncClient(timeout=10) as client:
        r = await client.post(url, headers=headers, json=payload)
        try:
            r.raise_for_status()
        except Exception:
            # Log the response body (never the token) before surfacing a 502.
            logger.error("Upstash error (%s): %s", r.status_code, r.text)
            raise HTTPException(status_code=502, detail=f"Upstash error: {r.text}")
        data = r.json()  # e.g., [{"result":"PONG","status":200}]
        if not isinstance(data, list) or not data:
            raise HTTPException(502, f"Upstash unexpected response: {data!r}")
        entry = data[0]
        # Per-command failures arrive inline, not as HTTP-level errors.
        if "error" in entry:
            raise HTTPException(502, f"Upstash error: {entry['error']}")
        return entry.get("result")
from fastapi import status
@app.get("/analytics/selftest", tags=["analytics"])
async def analytics_selftest():
    """
    Runs a minimal write/read/delete cycle on Upstash to prove connectivity + payload shape.

    Returns {"ok": True, "pong": ..., "kv": ...} on success; on failure a 502
    payload naming the stage that broke and the error text.
    """
    try:
        pong = await _redis_cmd(["PING"])
    except Exception as e:
        return JSONResponse({"ok": False, "stage": "PING", "error": str(e)}, status_code=status.HTTP_502_BAD_GATEWAY)
    try:
        # Short-lived key (60s PX) so a crashed selftest cannot leave residue.
        await _redis_cmd(["SET", "analytics:selftest", "ok", "PX", "60000"])
        val = await _redis_cmd(["GET", "analytics:selftest"])
        await _redis_cmd(["DEL", "analytics:selftest"])
    except Exception as e:
        # Bug fix: `e` was captured but never reported, unlike the PING branch.
        return JSONResponse({"ok": False, "stage": "SET/GET/DEL", "pong": pong, "error": str(e)}, status_code=status.HTTP_502_BAD_GATEWAY)
    return {"ok": True, "pong": pong, "kv": val}
def _client_ip(request: Request, x_forwarded_for: Optional[str]) -> str:
if x_forwarded_for:
return x_forwarded_for.split(",")[0].strip()
return request.client.host if request.client else "0.0.0.0"
def _hash_ip(ip: str) -> str:
return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:32]
@app.post("/analytics/visit", tags=["analytics"])
async def track_visit(
    payload: VisitIn,
    request: Request,
    user_agent: Optional[str] = Header(None, alias="User-Agent"),
    xff: Optional[str] = Header(None, alias="X-Forwarded-For")
):
    """
    Track a visit:
      - INCR total counter
      - PFADD into today's HyperLogLog for uniques
      - XADD into a stream with ts/path/ref/ua/ip_hash

    Only a truncated hash of the client IP is stored, never the raw IP.
    Returns {"ok": True, "total": <running total>}; bot visits are skipped
    when ANALYTICS_SKIP_BOTS is enabled.
    """
    # Optional bot filter
    if ANALYTICS_SKIP_BOTS and user_agent and any(sig in user_agent.lower() for sig in BOT_SIGS):
        return {"ok": True, "skipped": "bot"}
    ip_hash = _hash_ip(_client_ip(request, xff))
    now_ms = int(time.time() * 1000)
    day = time.strftime("%Y%m%d")
    # Increment total
    total = await _redis_cmd(["INCR", K_TOTAL])
    # Unique per-day (HyperLogLog: constant memory, approximate count)
    await _redis_cmd(["PFADD", K_UNIQUE_TODAY(day), ip_hash])
    # Append to stream (cap with MAXLEN if desired)
    fields = {
        "ts": str(now_ms),
        "path": payload.path or "/",
        "ref": payload.referrer or "",
        "ua": user_agent or "",
        "ip_hash": ip_hash,
    }
    # Flatten {k: v, ...} into [k1, v1, k2, v2, ...] as XADD expects.
    flat = [part for pair in fields.items() for part in pair]
    await _redis_cmd(["XADD", K_STREAM, "*", *flat])
    return {"ok": True, "total": int(total)}
@app.get("/analytics/summary", tags=["analytics"])
async def analytics_summary():
    """
    Returns:
      - total: persistent total views
      - unique_today: HyperLogLog-based unique visitors for today
    """
    day = time.strftime("%Y%m%d")
    raw_total = await _redis_cmd(["GET", K_TOTAL])
    uniques = await _redis_cmd(["PFCOUNT", K_UNIQUE_TODAY(day)])
    return {
        "total": int(raw_total) if raw_total else 0,
        "unique_today": int(uniques),
    }
@app.get("/analytics/visitors", tags=["analytics"])
async def analytics_visitors(limit: int = 50, cursor: Optional[str] = None):
    """
    Page through most recent visitors using XREVRANGE.
    - cursor: stream ID returned as next_cursor by the previous page
    - returns: items[], next_cursor
    """
    args = ["XREVRANGE", K_STREAM]
    if cursor:
        # Bug fix: the docstring promised an *exclusive* upper bound but the
        # cursor was passed inclusively, so each page repeated the previous
        # page's last entry. "(" makes the bound exclusive (Redis >= 6.2).
        args.extend([f"({cursor}", "-"])
    else:
        args.extend(["+", "-"])
    args.extend(["COUNT", str(limit)])
    res = await _redis_cmd(args)  # [[id, [k1,v1,k2,v2,...]], ...]
    items = []
    for entry_id, kv in (res or []):
        # Fold the flat [k1, v1, k2, v2, ...] field list back into a dict.
        items.append({"id": entry_id, **dict(zip(kv[0::2], kv[1::2]))})
    next_cursor = items[-1]["id"] if items else None
    return {"items": items, "next_cursor": next_cursor}
# --------------------------------------------------------------------
# ---------------------- End Analytics (NEW) -------------------------
# --------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn
    # Dev entry point: auto-reloading server on uvicorn's default host/port.
    uvicorn.run("app:app", reload=True)