# Author: MohitGupta41 — "Adding Visitor Analytics Feature" (commit 3b5f918)
# app.py
import asyncio
import hashlib
import logging
import os
import time
from datetime import date
from typing import Any, Dict, Literal, Optional

import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from huggingface_hub import InferenceClient
from pydantic import BaseModel, ConfigDict, Field

from Constants import CONTEXT
# ---------- Config ----------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voice-agent")

# Default location of the Markdown profile served as prompt context.
PROFILE_MD_PATH = os.path.join("Data", "profile_data.md")

def load_profile_md(path: str = PROFILE_MD_PATH) -> str:
    """
    Return the profile Markdown at *path*, or "" when the file is missing.

    Generalized to accept an explicit path (default keeps the old behavior)
    so the loader can be reused/tested without touching the module constant.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    return ""

# Loaded once at startup; /api/health reports whether it was found.
PROFILE_MD = load_profile_md()
TODAY_DATE = date.today()
def build_prompt(question: str) -> str:
    """
    Render the full persona + context + question prompt as a single string.

    A single-message prompt (rather than a chat history) is used so it works
    reliably across providers.  The template interpolates TODAY_DATE (for age/
    duration arithmetic), PROFILE_MD (the Markdown profile loaded at startup),
    and the caller's question.  Leading/trailing whitespace is stripped.
    """
    return f"""
You are Mohit Gupta's AI voice twin, built to represent him in interviews and Q&A sessions.
You should answer as if you are Mohit Gupta, maintaining a professional yet approachable tone.
Date Context: Today’s date is {TODAY_DATE}. You may use this date to calculate ages, durations, or the number of years since/past any mentioned event.
Your job is to answer truthfully, factually, and in a friendly but professional tone using the context provided.
The context is formatted in Markdown with sections (e.g., # About Me, ## Projects, ### Features).
Use these sections to give structured and relevant answers.
Do not invent details not present in the context.
If asked about something outside this context, politely clarify you donot know about that.
Where helpful, structure answers with clear points or examples explaining the usecase of project or explaining projects.
You can explain any technical things the interviewer asked like how you used the FastAPI etc.
### Guidelines:
- Answer concisely but include specific details when relevant (projects, metrics, tech stack).
- If multiple related sections exist, combine their info naturally.
- Do not repeat the entire context; summarize what is relevant to the question.
- Maintain first-person voice (“I have worked on…”) as you are representing Mohit Gupta.
- If you get any link in context that is related to the query then must add that in response.
- make sure this donot feel that you are mimicking Mohit. Make sure that interviewer will feel that you are Mohit Gupta not as AI.
- If asked about a project that is in context (e.g., Fashion Sense AI), explain only using the details provided.
- Never substitute with unrelated projects or links (e.g., Corn-Vomitoxin in Fashion Sense AI Project) unless directly asked.
- Always output them as plain clickable URLs (e.g., https://example.com).
- Do NOT use Markdown link syntax like [label](url).
- If multiple links are relevant, list them on separate lines under "Links:".
- Always Include link about which you are giving in response.
- If you donot know about anything then just say I am a quick learner. I can learn about that. But everything given in the context you should explain that.
### Context about Mohit (Markdown format):
{PROFILE_MD}
### Task
Answer the question using ONLY the context above but you can explain technical things related to context only.
### Question
{question}
### Answer
""".strip()
# ---------- Provider Clients ----------
# We prefer Gemini by default. If user chooses Hugging Face, we call HF Inference API for the specified model.
async def call_gemini(
    api_key: str,
    model: str,
    prompt: str,
    generation_config: Optional[Dict[str, Any]] = None
) -> str:
    """
    Call Google Gemini and return the generated text.

    Prefers the official python SDK (google-generativeai) when installed and
    falls back to the REST API otherwise.  The API key is never logged.
    Raises HTTPException(502) when Gemini returns nothing usable.
    """
    generation_config = generation_config or {"temperature": 0.2, "max_output_tokens": CONTEXT}
    try:
        # Prefer python SDK (google-generativeai)
        import google.generativeai as genai  # type: ignore
        genai.configure(api_key=api_key)
        gm = genai.GenerativeModel(model)
        resp = gm.generate_content(prompt, generation_config=generation_config)
        # BUGFIX: resp.text is a PROPERTY that raises ValueError when the
        # candidate was blocked by safety filters or carries no parts;
        # getattr(resp, "text", None) only guards AttributeError, so the old
        # code could crash with an unhandled 500 here.
        try:
            text = resp.text or ""
        except ValueError:
            text = ""
        if not text:
            # Surface blocked / empty output as a clean 502.
            raise HTTPException(502, "Gemini returned empty response.")
        return text.strip()
    except ModuleNotFoundError:
        # Fallback to REST (models may differ in REST naming, e.g., "models/gemini-1.5-flash")
        # We’ll try both forms automatically.
        model_names = [model, f"models/{model}"]
        last_err = None
        for m in model_names:
            url = f"https://generativelanguage.googleapis.com/v1beta/{m}:generateContent"
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": generation_config,
            }
            headers = {"x-goog-api-key": api_key}
            try:
                async with httpx.AsyncClient(timeout=60) as client:
                    r = await client.post(url, json=payload, headers=headers)
                if r.status_code == 200:
                    data = r.json()
                    # Extract first candidate text
                    candidates = (data.get("candidates") or [])
                    if not candidates:
                        raise HTTPException(502, f"Gemini returned no candidates: {data}")
                    parts = candidates[0].get("content", {}).get("parts", [])
                    text = "".join(p.get("text", "") for p in parts).strip()
                    if not text:
                        raise HTTPException(502, "Gemini returned empty text.")
                    return text
                else:
                    last_err = HTTPException(r.status_code, f"Gemini error: {r.text}")
            except Exception as e:
                # Remember the failure and try the alternate model name form.
                last_err = e
        # If we got here, all attempts failed
        raise last_err or HTTPException(502, "Gemini request failed")
async def call_hf_chat(hf_api_key: str, model: str, messages, *, provider: str | None = "auto",
                       max_tokens: int = 1024, temperature: float = 0.2) -> str:
    """
    Query Hugging Face Inference Providers (OpenAI-compatible chat completions)
    and return the assistant's reply text.

    BUGFIX: InferenceClient is synchronous, so calling it directly inside this
    async def blocked the event loop for the whole request (timeout=120 s).
    The blocking call now runs in a worker thread via asyncio.to_thread so
    other requests keep being served while we wait.
    """
    def _blocking_call() -> str:
        client = InferenceClient(api_key=hf_api_key, provider=provider, timeout=120)
        resp = client.chat.completions.create(
            model=model,
            messages=messages,  # [{"role":"user","content":"..."}] OR multimodal structure
            max_tokens=max_tokens,
            temperature=temperature,
            stream=False,
        )
        # hf client returns OpenAI-style response
        return resp.choices[0].message["content"].strip()

    return await asyncio.to_thread(_blocking_call)
# ---------- FastAPI ----------
app = FastAPI(title="Voice Agent API", version="0.2.0")

# CORS is wide open for now; tighten allow_origins before production.
_cors_config = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_config)
class ChatIn(BaseModel):
    """Request body for /api/chat (and /api/debug/prompt)."""
    # The user's question; min_length=1 rejects empty strings at validation time.
    question: str = Field(..., min_length=1, description="User question/prompt", examples=["Summarize my projects briefly."])
    # Opaque session identifier supplied by the frontend (not used server-side here).
    session_id: Optional[str] = Field(None, examples=["demo-1"])
    # Which provider to use — default Gemini
    provider: Optional[Literal["gemini", "huggingface"]] = "gemini"
    # Optional: model override per provider
    model: Optional[str] = Field(
        None,
        examples=["gemini-1.5-flash", "google/gemma-3-27b-it"]
    )
    # Per-request API keys (frontend supplies these); body takes priority
    # over headers, which take priority over env vars (see /api/chat).
    gemini_api_key: Optional[str] = None
    hf_api_key: Optional[str] = None
    # Example payload shown in the OpenAPI docs.
    model_config = ConfigDict(json_schema_extra={
        "examples": [{
            "question": "Give me a one-line intro about yourself.",
            "provider": "gemini",
            "model": "gemini-1.5-flash",
            "gemini_api_key": "YOUR_GEMINI_KEY"
        }]
    })
class ChatOut(BaseModel):
    """Response body for /api/chat."""
    # Final answer text from the selected provider.
    answer: str
class DebugPromptOut(BaseModel):
    """Response shape for /api/debug/prompt."""
    # NOTE(review): /api/debug/prompt returns {"length","preview"} but does not
    # declare this as its response_model — appears unused; confirm before removing.
    length: int
    preview: str
@app.get("/")
def root():
    """Landing endpoint: service banner plus author credit."""
    banner = {
        "ok": True,
        "message": "M.A.R.S.H.A.L (Mohit’s AgenticAI Representation System for Humanized Assistance and Legacy) Voice Agent API (Gemini / Hugging Face)",
        "Created By": "Mohit Gupta",
    }
    return JSONResponse(banner)
@app.get("/api/health")
def health():
    """Liveness probe — no external calls, only local server state."""
    # Both providers are always advertised; keys are checked per-request.
    supported = {"gemini": "supported", "huggingface": "supported"}
    return {
        "ok": True,
        "profile_loaded": bool(PROFILE_MD),
        "default_context_chars": len(PROFILE_MD),
        "providers": supported,
    }
@app.post("/api/chat", response_model=ChatOut, tags=["Chat"], summary="Ask the agent")
async def chat(
    payload: ChatIn,
    # optional: accept keys via headers (frontend can send them this way instead of JSON)
    x_gemini_api_key: Optional[str] = Header(None),
    x_hf_api_key: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),  # e.g. "Bearer hf_xxx"
):
    """
    Answer a question with the selected provider (default: Gemini).

    API keys are resolved body > header > env var.  Raises 400 on a blank
    question, a missing key, or an unknown provider; provider failures
    propagate as HTTPException from the call_* helpers.
    """
    question = payload.question.strip()
    if not question:
        raise HTTPException(400, "Question is required.")
    # BUGFIX: build the prompt ONCE from the stripped question and reuse it in
    # both branches — the HF branch previously re-built it from the raw,
    # unstripped payload.question, diverging from the Gemini branch.
    prompt = build_prompt(question)
    provider = payload.provider or "gemini"
    if provider == "gemini":
        model = payload.model or os.getenv("DEFAULT_GEMINI_MODEL", "gemini-1.5-flash")
        # choose key from body > header > env
        gemini_key = payload.gemini_api_key or x_gemini_api_key or os.getenv("GEMINI_API_KEY")
        if not gemini_key:
            raise HTTPException(400, "Gemini API key is required (send gemini_api_key or X-Gemini-Api-Key).")
        text = await call_gemini(gemini_key, model, prompt)
        return ChatOut(answer=text or "Sorry, I didn't catch that.")
    elif provider == "huggingface":
        model = payload.model or os.getenv("DEFAULT_HF_MODEL", "google/gemma-3-27b-it")
        # key priority: body > X-Hf-Api-Key > "Authorization: Bearer ..." > env
        bearer = None
        if authorization and authorization.lower().startswith("bearer "):
            bearer = authorization.split(" ", 1)[1].strip()
        hf_key = payload.hf_api_key or x_hf_api_key or bearer or os.getenv("HF_API_KEY")
        if not hf_key:
            raise HTTPException(400, "Hugging Face API key is required (send hf_api_key, X-Hf-Api-Key, or Authorization: Bearer).")
        messages = [{"role": "user", "content": prompt}]
        text = await call_hf_chat(hf_key, model, messages, provider="auto")
        return ChatOut(answer=text or "Sorry, I didn't catch that.")
    else:
        raise HTTPException(400, f"Unknown provider: {provider}")
# Optional: peek at the exact prompt we send (for debugging)
@app.post("/api/debug/prompt")
def debug_prompt(payload: ChatIn):
    """Debug helper: render the exact prompt and report its length."""
    rendered = build_prompt(payload.question or "")
    return {"length": len(rendered), "preview": rendered}
# --------------------------------------------------------------------
# ------------------------ Analytics (NEW) ----------------------------
# --------------------------------------------------------------------
# Persistent view counter + per-visit log using Upstash Redis (REST).
# No secrets exposed to frontend; FE calls these endpoints only.
# Credentials come from the environment; when either is empty the analytics
# endpoints respond 503 (see _assert_upstash_ready).
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL", "")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN", "")
# Bot filtering is on by default; opt out with ANALYTICS_SKIP_BOTS=false.
ANALYTICS_SKIP_BOTS = os.getenv("ANALYTICS_SKIP_BOTS", "true").lower() == "true"
# Redis keys
K_TOTAL = "analytics:visits:count"    # INCR'd once per tracked visit
K_STREAM = "analytics:visits:stream"  # XADD stream of visit records
def K_UNIQUE_TODAY(day: str) -> str:
    """Key of the per-day unique-visitor structure (day formatted YYYYMMDD)."""
    return f"analytics:unique:{day}" # HyperLogLog
# Case-insensitive substrings that mark a User-Agent as a bot.
BOT_SIGS = ("bot", "crawler", "spider", "facebookexternalhit", "slurp")
class VisitIn(BaseModel):
    """Request body for /analytics/visit."""
    # Page path being visited; track_visit falls back to "/" when empty.
    path: str
    # Referrer URL from the frontend, if any.
    referrer: Optional[str] = None
def _assert_upstash_ready() -> None:
    """Fail fast with 503 when the Upstash REST credentials are not configured."""
    configured = bool(UPSTASH_REDIS_REST_URL) and bool(UPSTASH_REDIS_REST_TOKEN)
    if not configured:
        raise HTTPException(503, "Analytics datastore not configured.")
async def _redis_cmd(cmd: list[str]) -> Any:
    """
    Execute a single Redis command through Upstash's REST /pipeline endpoint.

    The request body must be a JSON array of command arrays: [["CMD", "arg", ...]].
    Returns the 'result' field of the one command's reply; raises HTTPException
    502 on transport errors, non-2xx statuses, or Upstash-reported errors, and
    503 when the datastore is not configured.
    """
    _assert_upstash_ready()
    endpoint = UPSTASH_REDIS_REST_URL.rstrip("/") + "/pipeline"
    req_headers = {
        "Authorization": "Bearer " + UPSTASH_REDIS_REST_TOKEN,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    body = [cmd]  # raw array of arrays — NOT {"commands": [cmd]}
    async with httpx.AsyncClient(timeout=10) as http:
        r = await http.post(endpoint, headers=req_headers, json=body)
    try:
        r.raise_for_status()
    except Exception:
        logger.error("Upstash error (%s): %s", r.status_code, r.text)
        raise HTTPException(status_code=502, detail=f"Upstash error: {r.text}")
    data = r.json()  # e.g. [{"result":"PONG","status":200}]
    if not isinstance(data, list) or not data:
        raise HTTPException(502, f"Upstash unexpected response: {data!r}")
    entry = data[0]
    if "error" in entry:
        raise HTTPException(502, f"Upstash error: {entry['error']}")
    return entry.get("result")
from fastapi import status
@app.get("/analytics/selftest", tags=["analytics"])
async def analytics_selftest():
    """
    Run a minimal write/read/delete cycle on Upstash to prove connectivity
    and payload shape.  Returns ok=False with the failing stage on error.
    """
    try:
        pong = await _redis_cmd(["PING"])
    except Exception as e:
        return JSONResponse({"ok": False, "stage": "PING", "error": str(e)}, status_code=status.HTTP_502_BAD_GATEWAY)
    try:
        # PX 60000 lets the key expire on its own even if DEL never runs.
        await _redis_cmd(["SET", "analytics:selftest", "ok", "PX", "60000"])
        val = await _redis_cmd(["GET", "analytics:selftest"])
        await _redis_cmd(["DEL", "analytics:selftest"])
    except Exception as e:
        # BUGFIX: `e` was captured but never reported, making failures in this
        # stage opaque; include it in the response like the PING stage does.
        return JSONResponse({"ok": False, "stage": "SET/GET/DEL", "pong": pong, "error": str(e)}, status_code=status.HTTP_502_BAD_GATEWAY)
    return {"ok": True, "pong": pong, "kv": val}
def _client_ip(request: Request, x_forwarded_for: Optional[str]) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else the socket peer."""
    if x_forwarded_for:
        first_hop = x_forwarded_for.split(",")[0]
        return first_hop.strip()
    if request.client:
        return request.client.host
    return "0.0.0.0"
def _hash_ip(ip: str) -> str:
return hashlib.sha256(ip.encode("utf-8")).hexdigest()[:32]
@app.post("/analytics/visit", tags=["analytics"])
async def track_visit(
    payload: VisitIn,
    request: Request,
    user_agent: Optional[str] = Header(None, alias="User-Agent"),
    xff: Optional[str] = Header(None, alias="X-Forwarded-For")
):
    """
    Record one visit:
      - INCR the persistent total counter
      - PFADD the hashed IP into today's HyperLogLog (unique visitors)
      - XADD a ts/path/ref/ua/ip_hash record onto the visit stream
    """
    # Cheap UA-substring bot filter (toggled by ANALYTICS_SKIP_BOTS).
    if ANALYTICS_SKIP_BOTS and user_agent:
        ua_lower = user_agent.lower()
        if any(sig in ua_lower for sig in BOT_SIGS):
            return {"ok": True, "skipped": "bot"}
    visitor_hash = _hash_ip(_client_ip(request, xff))
    now_ms = int(time.time() * 1000)
    day = time.strftime("%Y%m%d")
    # Increment total, then mark today's unique visitor.
    total = await _redis_cmd(["INCR", K_TOTAL])
    await _redis_cmd(["PFADD", K_UNIQUE_TODAY(day), visitor_hash])
    # Flatten the field dict into XADD's "k1 v1 k2 v2 ..." argument list.
    fields = {
        "ts": str(now_ms),
        "path": payload.path or "/",
        "ref": payload.referrer or "",
        "ua": user_agent or "",
        "ip_hash": visitor_hash,
    }
    flat = [token for pair in fields.items() for token in pair]
    await _redis_cmd(["XADD", K_STREAM, "*", *flat])
    return {"ok": True, "total": int(total)}
@app.get("/analytics/summary", tags=["analytics"])
async def analytics_summary():
    """
    Aggregate stats:
      - total: persistent all-time view counter
      - unique_today: HyperLogLog estimate of today's unique visitors
    """
    day = time.strftime("%Y%m%d")
    raw_total = await _redis_cmd(["GET", K_TOTAL])
    uniques = await _redis_cmd(["PFCOUNT", K_UNIQUE_TODAY(day)])
    return {
        "total": int(raw_total) if raw_total else 0,
        "unique_today": int(uniques),
    }
@app.get("/analytics/visitors", tags=["analytics"])
async def analytics_visitors(limit: int = 50, cursor: Optional[str] = None):
    """
    Page through most recent visitors using XREVRANGE.
    - cursor: the next_cursor returned by the previous page (exclusive)
    - returns: items[], next_cursor
    """
    args = ["XREVRANGE", K_STREAM]
    if cursor:
        # BUGFIX: XREVRANGE bounds are inclusive, so passing the raw cursor
        # repeated the previous page's last entry on every page.  The "("
        # prefix (Redis >= 6.2 exclusive ranges) makes the cursor exclusive.
        args.extend([f"({cursor}", "-"])
    else:
        args.extend(["+", "-"])
    args.extend(["COUNT", str(limit)])
    res = await _redis_cmd(args)  # [[id, [k1,v1,k2,v2,...]], ...]
    items = []
    next_cursor = None
    for i, row in enumerate(res or []):
        _id = row[0]
        kv = row[1]
        # Pair up the flat [k1, v1, k2, v2, ...] field list.
        d = dict(zip(kv[0::2], kv[1::2]))
        items.append({"id": _id, **d})
        if i == len(res) - 1:
            next_cursor = _id
    return {"items": items, "next_cursor": next_cursor}
# --------------------------------------------------------------------
# ---------------------- End Analytics (NEW) -------------------------
# --------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn
    # Dev entry point: auto-reload on file changes, uvicorn's default host/port.
    uvicorn.run("app:app", reload=True)