|
|
|
|
|
import os |
|
|
import logging |
|
|
from typing import Optional, Literal, Dict, Any |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Header, Request |
|
|
from fastapi.responses import JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel, Field, ConfigDict |
|
|
import httpx |
|
|
from huggingface_hub import InferenceClient |
|
|
from datetime import date |
|
|
import time |
|
|
import hashlib |
|
|
|
|
|
from Constants import CONTEXT |
|
|
|
|
|
|
|
|
# Configure root logging once at import time; all module logs flow through "voice-agent".
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("voice-agent")
|
|
|
|
|
# Markdown file that grounds the agent's answers about Mohit.
PROFILE_MD_PATH = os.path.join("Data", "profile_data.md")


def load_profile_md() -> str:
    """Return the profile markdown, or an empty string when the file is absent."""
    try:
        with open(PROFILE_MD_PATH, "r", encoding="utf-8") as fh:
            return fh.read()
    except FileNotFoundError:
        return ""
|
|
|
|
|
# Profile context is read once at import time; restart the server to pick up edits.
PROFILE_MD = load_profile_md()

# NOTE(review): captured at import time, so any prompt using this goes stale on a
# long-running server — consider computing date.today() per request instead.
TODAY_DATE = date.today()
|
|
|
|
|
def build_prompt(question: str) -> str:
    """Build the single flat prompt sent to the LLM.

    A single-message prompt (rather than separate system/user roles) works
    reliably across both Gemini and the Hugging Face providers.

    Args:
        question: The interviewer's question, interpolated verbatim.

    Returns:
        Persona instructions + guidelines + profile context + the question.
    """
    # Bug fix: compute the date per call instead of using module-level
    # TODAY_DATE, which is frozen at import time and goes stale on a
    # long-running server.
    today = date.today()
    return f"""
You are Mohit Gupta's AI voice twin, built to represent him in interviews and Q&A sessions.
You should answer as if you are Mohit Gupta, maintaining a professional yet approachable tone.
Date Context: Today’s date is {today}. You may use this date to calculate ages, durations, or the number of years since/past any mentioned event.

Your job is to answer truthfully, factually, and in a friendly but professional tone using the context provided.
The context is formatted in Markdown with sections (e.g., # About Me, ## Projects, ### Features).
Use these sections to give structured and relevant answers.
Do not invent details not present in the context.
If asked about something outside this context, politely clarify you do not know about that.
Where helpful, structure answers with clear points or examples explaining the use case of a project or explaining projects.
You can explain any technical things the interviewer asked like how you used the FastAPI etc.

### Guidelines:
- Answer concisely but include specific details when relevant (projects, metrics, tech stack).
- If multiple related sections exist, combine their info naturally.
- Do not repeat the entire context; summarize what is relevant to the question.
- Maintain first-person voice (“I have worked on…”) as you are representing Mohit Gupta.
- If you get any link in context that is related to the query then must add that in response.
- Make sure it does not feel like you are mimicking Mohit. The interviewer should feel that you are Mohit Gupta, not an AI.
- If asked about a project that is in context (e.g., Fashion Sense AI), explain only using the details provided.
- Never substitute with unrelated projects or links (e.g., Corn-Vomitoxin in Fashion Sense AI Project) unless directly asked.
- Always output them as plain clickable URLs (e.g., https://example.com).
- Do NOT use Markdown link syntax like [label](url).
- If multiple links are relevant, list them on separate lines under "Links:".
- Always include the link about which you are giving in response.
- If you do not know about anything then just say I am a quick learner. I can learn about that. But everything given in the context you should explain that.

### Context about Mohit (Markdown format):
{PROFILE_MD}

### Task
Answer the question using ONLY the context above but you can explain technical things related to context only.

### Question
{question}

### Answer
""".strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def call_gemini(
    api_key: str,
    model: str,
    prompt: str,
    generation_config: Optional[Dict[str, Any]] = None
) -> str:
    """
    Call Google Gemini and return the generated text.

    Prefers the official ``google.generativeai`` SDK when it is installed;
    falls back to the public REST endpoint via httpx when the module is
    missing. The API key is never logged.

    Args:
        api_key: Gemini API key (sent as ``x-goog-api-key`` on the REST path).
        model: Model name, with or without the ``models/`` prefix.
        prompt: Full single-message prompt string.
        generation_config: Optional generation overrides; defaults to low
            temperature with a max-token budget taken from ``CONTEXT``.

    Returns:
        The stripped response text.

    Raises:
        HTTPException: 502 on empty/malformed responses, or the upstream
            status code on a non-200 REST reply.
    """
    generation_config = generation_config or {"temperature": 0.2, "max_output_tokens": CONTEXT}

    try:
        # SDK path — only taken when google-generativeai is importable.
        import google.generativeai as genai
        genai.configure(api_key=api_key)
        gm = genai.GenerativeModel(model)
        resp = gm.generate_content(prompt, generation_config=generation_config)

        # resp.text can be absent/empty (e.g. a blocked or empty candidate).
        text = getattr(resp, "text", None) or ""
        if not text:
            raise HTTPException(502, "Gemini returned empty response.")
        return text.strip()
    except ModuleNotFoundError:
        # REST fallback. The bare model name is tried first, then the
        # "models/"-prefixed form required by the v1beta URL scheme.
        model_names = [model, f"models/{model}"]
        last_err = None
        for m in model_names:
            url = f"https://generativelanguage.googleapis.com/v1beta/{m}:generateContent"
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": generation_config,
            }
            headers = {"x-goog-api-key": api_key}
            try:
                async with httpx.AsyncClient(timeout=60) as client:
                    r = await client.post(url, json=payload, headers=headers)
                    if r.status_code == 200:
                        data = r.json()

                        # Extract the first candidate's concatenated text parts.
                        candidates = (data.get("candidates") or [])
                        if not candidates:
                            raise HTTPException(502, f"Gemini returned no candidates: {data}")
                        parts = candidates[0].get("content", {}).get("parts", [])
                        text = "".join(p.get("text", "") for p in parts).strip()
                        if not text:
                            raise HTTPException(502, "Gemini returned empty text.")
                        return text
                    else:
                        # Remember the upstream failure and try the next model name.
                        last_err = HTTPException(r.status_code, f"Gemini error: {r.text}")
            except Exception as e:
                # NOTE(review): this broad except also captures the HTTPExceptions
                # raised above for empty/missing candidates, so those errors are
                # retried against the next model name rather than raised
                # immediately — confirm this retry behavior is intended.
                last_err = e

        # All model-name variants failed; surface the last error seen.
        raise last_err or HTTPException(502, "Gemini request failed")
|
|
|
|
|
async def call_hf_chat(hf_api_key: str, model: str, messages, *, provider: str | None = "auto",
                       max_tokens: int = 1024, temperature: float = 0.2) -> str:
    """
    Call Hugging Face Inference Providers (OpenAI-compatible chat completions)
    and return the assistant reply text.

    Args:
        hf_api_key: Hugging Face API token.
        model: Model id, e.g. "google/gemma-3-27b-it".
        messages: OpenAI-style message dicts ({"role": ..., "content": ...}).
        provider: Inference provider selector; "auto" lets HF pick.
        max_tokens: Completion token budget.
        temperature: Sampling temperature.

    Returns:
        The stripped reply content ("" if the provider returned no content).
    """
    import asyncio

    def _invoke() -> str:
        # InferenceClient is synchronous; built fresh per call so the key and
        # provider are never shared across requests.
        client = InferenceClient(api_key=hf_api_key, provider=provider, timeout=120)
        resp = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=False,
        )
        content = resp.choices[0].message["content"]
        # Guard against a None content field instead of crashing on .strip().
        return (content or "").strip()

    # Bug fix: the previous code ran this blocking network call directly inside
    # the coroutine, stalling the event loop for up to the 120s timeout.
    return await asyncio.to_thread(_invoke)
|
|
|
|
|
|
|
|
app = FastAPI(title="Voice Agent API", version="0.2.0")

# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is not
# honored by browsers for credentialed requests (they require an explicit
# origin) — confirm whether credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
class ChatIn(BaseModel):
    # The user's question; must be non-empty (min_length=1).
    question: str = Field(..., min_length=1, description="User question/prompt", examples=["Summarize my projects briefly."])
    # Optional opaque session identifier (not currently used server-side).
    session_id: Optional[str] = Field(None, examples=["demo-1"])

    # Which backend handles the request; defaults to Gemini.
    provider: Optional[Literal["gemini", "huggingface"]] = "gemini"

    # Model override; when omitted, /api/chat falls back to the
    # DEFAULT_GEMINI_MODEL / DEFAULT_HF_MODEL environment variables.
    model: Optional[str] = Field(
        None,
        examples=["gemini-1.5-flash", "google/gemma-3-27b-it"]
    )

    # API keys may alternatively arrive via headers
    # (X-Gemini-Api-Key / X-Hf-Api-Key / Authorization: Bearer) or env vars.
    gemini_api_key: Optional[str] = None
    hf_api_key: Optional[str] = None

    # Example request body surfaced in the OpenAPI docs.
    model_config = ConfigDict(json_schema_extra={
        "examples": [{
            "question": "Give me a one-line intro about yourself.",
            "provider": "gemini",
            "model": "gemini-1.5-flash",
            "gemini_api_key": "YOUR_GEMINI_KEY"
        }]
    })
|
|
|
|
|
class ChatOut(BaseModel):
    # The model-generated answer returned by /api/chat.
    answer: str
|
|
|
|
|
# NOTE(review): not wired up as a response_model anywhere visible in this file
# (/api/debug/prompt returns a plain dict) — confirm it is still needed.
class DebugPromptOut(BaseModel):
    # Character length of the rendered prompt.
    length: int
    # The rendered prompt text itself.
    preview: str
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return JSONResponse({"ok": True, "message": "M.A.R.S.H.A.L (Mohit’s AgenticAI Representation System for Humanized Assistance and Legacy) Voice Agent API (Gemini / Hugging Face)", "Created By": "Mohit Gupta"}) |
|
|
|
|
|
@app.get("/api/health") |
|
|
def health(): |
|
|
|
|
|
return { |
|
|
"ok": True, |
|
|
"profile_loaded": bool(PROFILE_MD), |
|
|
"default_context_chars": len(PROFILE_MD), |
|
|
"providers": { |
|
|
"gemini": "supported", |
|
|
"huggingface": "supported" |
|
|
} |
|
|
} |
|
|
|
|
|
@app.post("/api/chat", response_model=ChatOut, tags=["Chat"], summary="Ask the agent") |
|
|
async def chat( |
|
|
payload: ChatIn, |
|
|
|
|
|
x_gemini_api_key: Optional[str] = Header(None), |
|
|
x_hf_api_key: Optional[str] = Header(None), |
|
|
authorization: Optional[str] = Header(None), |
|
|
): |
|
|
question = payload.question.strip() |
|
|
if not question: |
|
|
raise HTTPException(400, "Question is required.") |
|
|
|
|
|
prompt = build_prompt(question) |
|
|
|
|
|
provider = payload.provider or "gemini" |
|
|
if provider == "gemini": |
|
|
model = payload.model or os.getenv("DEFAULT_GEMINI_MODEL", "gemini-1.5-flash") |
|
|
|
|
|
gemini_key = payload.gemini_api_key or x_gemini_api_key or os.getenv("GEMINI_API_KEY") |
|
|
if not gemini_key: |
|
|
raise HTTPException(400, "Gemini API key is required (send gemini_api_key or X-Gemini-Api-Key).") |
|
|
text = await call_gemini(gemini_key, model, prompt) |
|
|
return ChatOut(answer=text or "Sorry, I didn't catch that.") |
|
|
elif provider == "huggingface": |
|
|
model = payload.model or os.getenv("DEFAULT_HF_MODEL", "google/gemma-3-27b-it") |
|
|
hf_key = payload.hf_api_key or x_hf_api_key or (authorization.split(" ",1)[1].strip() if authorization and authorization.lower().startswith("bearer ") else None) or os.getenv("HF_API_KEY") |
|
|
if not hf_key: |
|
|
raise HTTPException(400, "Hugging Face API key is required (send hf_api_key, X-Hf-Api-Key, or Authorization: Bearer).") |
|
|
|
|
|
messages = [{"role":"user","content": build_prompt(payload.question)}] |
|
|
text = await call_hf_chat(hf_key, model, messages, provider="auto") |
|
|
return ChatOut(answer=text or "Sorry, I didn't catch that.") |
|
|
|
|
|
else: |
|
|
raise HTTPException(400, f"Unknown provider: {provider}") |
|
|
|
|
|
|
|
|
@app.post("/api/debug/prompt") |
|
|
def debug_prompt(payload: ChatIn): |
|
|
p = build_prompt(payload.question or "") |
|
|
return {"length": len(p), "preview": p} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Upstash Redis REST credentials; analytics endpoints respond 503 when unset.
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL", "")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN", "")
# When true (the default), requests whose User-Agent matches BOT_SIGS are not counted.
ANALYTICS_SKIP_BOTS = os.getenv("ANALYTICS_SKIP_BOTS", "true").lower() == "true"


# Redis keys: a plain counter for total visits and a stream of visit events.
K_TOTAL = "analytics:visits:count"
K_STREAM = "analytics:visits:stream"
def K_UNIQUE_TODAY(day: str) -> str:
    # Per-day HyperLogLog key for unique-visitor estimation (day = "YYYYMMDD").
    return f"analytics:unique:{day}"


# Case-insensitive substrings that mark a User-Agent as a crawler/bot.
BOT_SIGS = ("bot", "crawler", "spider", "facebookexternalhit", "slurp")
|
|
|
|
|
class VisitIn(BaseModel):
    # Page path being visited (an empty value is recorded as "/").
    path: str
    # Optional referrer URL reported by the client.
    referrer: Optional[str] = None
|
|
|
|
|
def _assert_upstash_ready():
    """Raise 503 unless both Upstash REST credentials are configured."""
    configured = bool(UPSTASH_REDIS_REST_URL) and bool(UPSTASH_REDIS_REST_TOKEN)
    if not configured:
        raise HTTPException(503, "Analytics datastore not configured.")
|
|
|
|
|
async def _redis_cmd(cmd: list[str]) -> Any:
    """
    Execute a single Redis command against Upstash's REST /pipeline endpoint.

    The request body is a JSON array of command arrays: [["CMD", "arg1", ...]].
    Returns the lone command's 'result' field.

    Raises:
        HTTPException: 503 when Upstash is unconfigured; 502 on HTTP failures,
            malformed responses, or a command-level error.
    """
    _assert_upstash_ready()
    endpoint = UPSTASH_REDIS_REST_URL.rstrip("/") + "/pipeline"
    request_headers = {
        "Authorization": "Bearer " + UPSTASH_REDIS_REST_TOKEN,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    body = [cmd]
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(endpoint, headers=request_headers, json=body)
        try:
            response.raise_for_status()
        except Exception:
            # Log the upstream failure (token is never logged) and translate
            # it into a 502 for our caller.
            logger.error("Upstash error (%s): %s", response.status_code, response.text)
            raise HTTPException(status_code=502, detail=f"Upstash error: {response.text}")

        parsed = response.json()
        if not isinstance(parsed, list) or not parsed:
            raise HTTPException(502, f"Upstash unexpected response: {parsed!r}")
        first = parsed[0]
        if "error" in first:
            raise HTTPException(502, f"Upstash error: {first['error']}")
        return first.get("result")
|
|
|
|
|
from fastapi import status |
|
|
|
|
|
@app.get("/analytics/selftest", tags=["analytics"]) |
|
|
async def analytics_selftest(): |
|
|
""" |
|
|
Runs a minimal write/read/delete cycle on Upstash to prove connectivity + payload shape. |
|
|
""" |
|
|
try: |
|
|
pong = await _redis_cmd(["PING"]) |
|
|
except Exception as e: |
|
|
return JSONResponse({"ok": False, "stage": "PING", "error": str(e)}, status_code=status.HTTP_502_BAD_GATEWAY) |
|
|
|
|
|
try: |
|
|
await _redis_cmd(["SET", "analytics:selftest", "ok", "PX", "60000"]) |
|
|
val = await _redis_cmd(["GET", "analytics:selftest"]) |
|
|
await _redis_cmd(["DEL", "analytics:selftest"]) |
|
|
except Exception as e: |
|
|
return JSONResponse({"ok": False, "stage": "SET/GET/DEL", "pong": pong}, status_code=status.HTTP_502_BAD_GATEWAY) |
|
|
|
|
|
return {"ok": True, "pong": pong, "kv": val} |
|
|
|
|
|
def _client_ip(request: Request, x_forwarded_for: Optional[str]) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else the socket peer."""
    if x_forwarded_for:
        first_hop = x_forwarded_for.split(",")[0]
        return first_hop.strip()
    if request.client:
        return request.client.host
    return "0.0.0.0"
|
|
|
|
|
def _hash_ip(ip: str) -> str:
    """Pseudonymize an IP address: first 32 hex chars of its SHA-256 digest."""
    digest = hashlib.sha256(ip.encode("utf-8"))
    return digest.hexdigest()[:32]
|
|
|
|
|
@app.post("/analytics/visit", tags=["analytics"]) |
|
|
async def track_visit( |
|
|
payload: VisitIn, |
|
|
request: Request, |
|
|
user_agent: Optional[str] = Header(None, alias="User-Agent"), |
|
|
xff: Optional[str] = Header(None, alias="X-Forwarded-For") |
|
|
): |
|
|
""" |
|
|
Track a visit: |
|
|
- INCR total counter |
|
|
- PFADD into today's HyperLogLog for uniques |
|
|
- XADD into a stream with ts/path/ref/ua/ip_hash |
|
|
""" |
|
|
|
|
|
if ANALYTICS_SKIP_BOTS and user_agent and any(sig in user_agent.lower() for sig in BOT_SIGS): |
|
|
return {"ok": True, "skipped": "bot"} |
|
|
|
|
|
ip = _client_ip(request, xff) |
|
|
ip_hash = _hash_ip(ip) |
|
|
now_ms = int(time.time() * 1000) |
|
|
day = time.strftime("%Y%m%d") |
|
|
|
|
|
|
|
|
total = await _redis_cmd(["INCR", K_TOTAL]) |
|
|
|
|
|
|
|
|
await _redis_cmd(["PFADD", K_UNIQUE_TODAY(day), ip_hash]) |
|
|
|
|
|
|
|
|
fields = { |
|
|
"ts": str(now_ms), |
|
|
"path": payload.path or "/", |
|
|
"ref": payload.referrer or "", |
|
|
"ua": user_agent or "", |
|
|
"ip_hash": ip_hash, |
|
|
} |
|
|
xs = [] |
|
|
for k, v in fields.items(): |
|
|
xs.extend([k, v]) |
|
|
await _redis_cmd(["XADD", K_STREAM, "*", *xs]) |
|
|
|
|
|
return {"ok": True, "total": int(total)} |
|
|
|
|
|
@app.get("/analytics/summary", tags=["analytics"]) |
|
|
async def analytics_summary(): |
|
|
""" |
|
|
Returns: |
|
|
- total: persistent total views |
|
|
- unique_today: HyperLogLog-based unique visitors for today |
|
|
""" |
|
|
day = time.strftime("%Y%m%d") |
|
|
total_raw = await _redis_cmd(["GET", K_TOTAL]) |
|
|
total = int(total_raw) if total_raw else 0 |
|
|
unique_today = await _redis_cmd(["PFCOUNT", K_UNIQUE_TODAY(day)]) |
|
|
return {"total": total, "unique_today": int(unique_today)} |
|
|
|
|
|
@app.get("/analytics/visitors", tags=["analytics"]) |
|
|
async def analytics_visitors(limit: int = 50, cursor: Optional[str] = None): |
|
|
""" |
|
|
Page through most recent visitors using XREVRANGE. |
|
|
- cursor: exclusive upper-bound stream ID for next page |
|
|
- returns: items[], next_cursor |
|
|
""" |
|
|
args = ["XREVRANGE", K_STREAM] |
|
|
if cursor: |
|
|
args.extend([cursor, "-"]) |
|
|
else: |
|
|
args.extend(["+", "-"]) |
|
|
args.extend(["COUNT", str(limit)]) |
|
|
|
|
|
res = await _redis_cmd(args) |
|
|
items = [] |
|
|
next_cursor = None |
|
|
for i, row in enumerate(res or []): |
|
|
_id = row[0] |
|
|
kv = row[1] |
|
|
d = dict(zip(kv[0::2], kv[1::2])) |
|
|
items.append({"id": _id, **d}) |
|
|
if i == len(res) - 1: |
|
|
next_cursor = _id |
|
|
return {"items": items, "next_cursor": next_cursor} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run("app:app", reload=True) |