|
|
|
|
|
import os |
|
|
import io |
|
|
import json |
|
|
import re |
|
|
from datetime import datetime |
|
|
import pytz |
|
|
from typing import Optional |
|
|
|
|
|
from fastapi import FastAPI, UploadFile, File, Form, Request |
|
|
from fastapi.responses import JSONResponse, HTMLResponse |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
from google import genai |
|
|
from google.genai import types |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Credentials are read from the environment only. A literal key must never be
# committed as a fallback default: anyone with read access to the source could
# use it, so any key that was previously embedded here should be revoked.
API_KEY = os.environ.get("GENAI_API_KEY")
if not API_KEY:
    # Fail at startup with an actionable message instead of failing later on
    # the first model call.
    raise RuntimeError(
        "GENAI_API_KEY environment variable is not set; "
        "export it before starting the service."
    )

# Model name is overridable per deployment; defaults to gemini-2.5-flash.
MODEL = os.environ.get("GENAI_MODEL", "gemini-2.5-flash")

# Shared client used by every helper and endpoint in this module.
client = genai.Client(api_key=API_KEY)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def time_tool(location: str = "UTC") -> dict:
    """Report the current date and time for a coarse location hint.

    Any ``location`` containing "india" (case-insensitive) resolves to the
    ``Asia/Kolkata`` zone; every other value, including an empty one, uses UTC.

    Returns:
        A dict with ``date`` (YYYY-MM-DD), ``time_24`` (HH:MM:SS),
        ``time_12`` (12-hour clock with AM/PM) and ``timezone`` (the string
        form of the zone object that was used).
    """
    wants_india = bool(location) and "india" in location.lower()
    zone = pytz.timezone("Asia/Kolkata") if wants_india else pytz.utc
    current = datetime.now(zone)

    # Insertion order matches the documented key order of the result.
    layouts = {
        "date": "%Y-%m-%d",
        "time_24": "%H:%M:%S",
        "time_12": "%I:%M:%S %p",
    }
    snapshot = {key: current.strftime(layout) for key, layout in layouts.items()}
    snapshot["timezone"] = str(zone)
    return snapshot
|
|
|
|
|
def date_tool(location: str = "UTC") -> dict:
    """Report today's date for a coarse location hint.

    A ``location`` containing "india" (case-insensitive) selects the
    ``Asia/Kolkata`` zone; anything else, including an empty value, uses UTC.

    Returns:
        A dict with ``date`` formatted as "<weekday name>, DD-MM-YYYY" and
        ``timezone`` holding the string form of the zone object.
    """
    wants_india = bool(location) and "india" in location.lower()
    zone = pytz.timezone("Asia/Kolkata") if wants_india else pytz.utc
    today = datetime.now(zone)
    formatted = today.strftime("%A, %d-%m-%Y")
    return {"date": formatted, "timezone": str(zone)}
|
|
|
|
|
def math_tool(expression: str) -> dict:
    """Evaluate a plain arithmetic expression without executing arbitrary code.

    The expression is parsed into an AST and only numeric literals, unary
    ``+``/``-`` and the binary operators ``+ - * / // % **`` are evaluated.
    Anything else (names, attribute access, calls, containers, ...) is
    rejected. This replaces ``eval`` with emptied builtins, which can be
    bypassed through attribute access such as ``().__class__``.

    Args:
        expression: Arithmetic text such as ``"(1 + 2) * 3"``.

    Returns:
        ``{"expression": ..., "result": "<str of value>"}`` on success, or
        ``{"expression": ..., "error": "Could not evaluate expression."}``
        when the text is not valid, supported arithmetic (including division
        by zero and oversized powers).
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the estimated bit size of an integer power result, so
    # inputs like 9**9**9 are refused instead of hanging the process.
    max_power_bits = 10_000

    def _evaluate(node):
        """Recursively compute the value of a whitelisted AST node."""
        if isinstance(node, ast.Expression):
            return _evaluate(node.body)
        # type() check (not isinstance) deliberately excludes bool and complex.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = _evaluate(node.left)
            right = _evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > max_power_bits
            ):
                raise ValueError("power result too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("unsupported expression element")

    try:
        # compile()-style parsing does not strip leading whitespace the way
        # eval() does, so strip explicitly to keep inputs like " 2+2" valid.
        tree = ast.parse(expression.strip(), mode="eval")
        value = _evaluate(tree)
        return {"expression": expression, "result": str(value)}
    except Exception:
        # Same best-effort contract as before: any failure becomes an error dict.
        return {"expression": expression, "error": "Could not evaluate expression."}
|
|
|
|
|
def weather_tool(location: str) -> dict:
    """Return a placeholder weather report for ``location``.

    No weather service is contacted: the temperature is a fixed 25 C and the
    ``note`` field says so explicitly.
    """
    report = {"location": location}
    report.update(
        temperature=25,
        unit="C",
        note="dummy data; integrate a weather API for real results.",
    )
    return report
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_text(system_instruction: str, content: str) -> str:
    """Run one text generation call and return the stripped reply text.

    Args:
        system_instruction: System prompt passed through the generation config.
        content: The user-side content sent to the model.

    Returns:
        The response text with surrounding whitespace removed, or ``""`` when
        the response carries no text.
    """
    cfg = types.GenerateContentConfig(system_instruction=system_instruction)
    resp = client.models.generate_content(model=MODEL, config=cfg, contents=content)
    # The getattr default only covers a *missing* attribute; if ``text`` is
    # present but None, calling .strip() on it would raise AttributeError.
    return (getattr(resp, "text", None) or "").strip()
|
|
|
|
|
def grounded_search(query: str) -> str:
    """Answer ``query`` with the model's Google Search tool enabled.

    Args:
        query: The user question, sent to the model unchanged.

    Returns:
        The response text with surrounding whitespace removed, or ``""`` when
        the response carries no text.
    """
    grounding_tool = types.Tool(google_search=types.GoogleSearch())
    cfg = types.GenerateContentConfig(tools=[grounding_tool])
    resp = client.models.generate_content(model=MODEL, config=cfg, contents=query)
    # The getattr default only covers a *missing* attribute; if ``text`` is
    # present but None, calling .strip() on it would raise AttributeError.
    return (getattr(resp, "text", None) or "").strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re as _re |
|
|
# Whole-word, case-insensitive vocabulary that marks a query as factual
# (time/date, weather, or arithmetic wording).
FACTUAL_KEYWORDS = _re.compile(
    r"\b(time|date|today|now|what's the time|what is the time|weather|forecast|temperature|convert|calculate|solve|sum|add|subtract|multiply|divide|what is)\b",
    _re.IGNORECASE,
)

# A query made up solely of digits, dots, whitespace, + - * / and parentheses.
MATH_PATTERN = _re.compile(r"^[0-9\.\s\+\-\*\/\(\)]+$")

# Whole-word, case-insensitive verbs/phrases that suggest a calculation.
MATH_KEYWORDS = _re.compile(
    r"\b(calculate|solve|what is|evaluate|sum|add|subtract|multiply|divide)\b",
    _re.IGNORECASE,
)
|
|
|
|
|
def decide_tool(user_query: str) -> dict:
    """Pick the tool that should handle ``user_query``.

    Deterministic keyword rules are tried first, in priority order:
    greeting -> thermostat -> weather -> India time/date -> math ->
    generic factual (time or grounded search). Only when none match is the
    model asked to route, and any failure there falls back to ``chat``.

    Args:
        user_query: Raw user text.

    Returns:
        A dict with at least ``function_to_use``; rule-based and fallback
        results also carry a short ``reason``.
    """
    q = user_query.strip().lower()

    if _re.search(r"\bhello\b|\bhi\b|\bhey\b|\bgood morning\b|\bgood evening\b", q):
        return {"function_to_use": "chat", "reason": "Greeting detected by rule."}

    # Thermostat must be checked before weather: the weather rule fires on any
    # query containing "temperature", which would otherwise make the
    # "set temperature" alternative here unreachable.
    if _re.search(r"\bthermostat\b|\bset thermostat\b|\bset (?:the )?temperature\b", q):
        return {"function_to_use": "thermostat", "reason": "Thermostat control intent matched."}

    if "weather" in q or "forecast" in q or "temperature" in q:
        return {"function_to_use": "weather", "reason": "Weather-related keyword matched."}

    if "india" in q and _re.search(r"\btime\b|\bdate\b|\bnow\b|\bcurrent\b", q):
        if "time" in q:
            return {"function_to_use": "time", "reason": "Explicit 'time' + 'India' matched."}
        if "date" in q:
            return {"function_to_use": "date", "reason": "Explicit 'date' + 'India' matched."}
        # "now"/"current" without time/date falls through to the later rules.

    if MATH_PATTERN.match(user_query) or (MATH_KEYWORDS.search(q) and any(ch.isdigit() for ch in q)):
        return {"function_to_use": "math", "reason": "Math expression or math keywords detected."}

    if FACTUAL_KEYWORDS.search(q):
        if "time" in q and "india" not in q:
            return {"function_to_use": "time", "reason": "Time query detected; using deterministic time tool."}
        return {"function_to_use": "search", "reason": "Factual query matched; using grounded search."}

    system_instruction = (
        "\n"
        "You are a strict router assistant. Decide exactly one tool for this query and return only valid JSON with keys:\n"
        '{"function_to_use": "<one of: chat, search, time, date, math, weather, thermostat, science>", "reason": "short explanation"}\n'
        "Do not return anything else.\n"
    )
    try:
        resp = client.models.generate_content(
            model=MODEL,
            config=types.GenerateContentConfig(system_instruction=system_instruction),
            contents=user_query,
        )
        # ``text`` can be present but None; treat that as an empty reply.
        text = (getattr(resp, "text", None) or "").strip()
        # Tolerate a reply wrapped in a Markdown code fence (```json ... ```).
        fenced = _re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, flags=_re.S | _re.I)
        if fenced:
            text = fenced.group(1)
        parsed = json.loads(text)
        # Callers use dict methods on the result, so reject valid-JSON
        # non-objects (strings, lists) that merely contain the key text.
        if isinstance(parsed, dict) and "function_to_use" in parsed:
            return parsed
    except Exception:
        # Deliberate best-effort: routing must never fail the request, so any
        # SDK or JSON error degrades to the chat fallback below.
        pass

    return {"function_to_use": "chat", "reason": "Default fallback to chat."}
|
|
|
|
|
def teacher_polish(user_query: str, tool_name: str, tool_output) -> str:
    """Turn a raw tool result into the final user-facing wording.

    Builds a fixed "ICIS AI teacher" system prompt plus a three-line content
    block (query, tool name, JSON-serialised tool output) and delegates the
    model call to ``generate_text``.

    Args:
        user_query: The original user text.
        tool_name: Name of the tool that produced ``tool_output``.
        tool_output: JSON-serialisable result of that tool.

    Returns:
        Whatever ``generate_text`` returns for this prompt.
    """
    persona_prompt = (
        "You are ICIS AI teacher. Produce a concise (1-3 sentence) explanation in teacher tone.\n"
        "IF the tool_output contains numeric facts (dates, times, numbers), DO NOT change them; only rephrase and add a short real-life example.\n"
        "If the tool_output is an action confirmation (like thermostat status), confirm the action succinctly.\n"
        "Return only the final user-facing text."
    )
    # ensure_ascii=False keeps non-ASCII characters readable in the prompt.
    serialized = json.dumps(tool_output, ensure_ascii=False)
    briefing = f"User query: {user_query}\nTool used: {tool_name}\nTool output: {serialized}"
    return generate_text(system_instruction=persona_prompt, content=briefing)
|
|
|
|
|
def hub_handle(user_query: str):
    """Route a query to a tool, run it, and polish the result for the user.

    Steps: ``decide_tool`` picks the tool, the matching branch below builds
    ``tool_output``, and ``teacher_polish`` rewrites it into the final text.
    Unknown tool names fall through to the casual chat branch.

    Args:
        user_query: Raw user text.

    Returns:
        A dict with ``user_query``, ``decision`` (router output),
        ``tool_output`` (raw tool result) and ``final_response`` (polished text).
    """
    decision = decide_tool(user_query)
    tool_name = decision.get("function_to_use", "chat")
    tool_output = None

    if tool_name == "time":
        loc = "India" if "india" in user_query.lower() else "UTC"
        tool_output = time_tool(location=loc)
    elif tool_name == "date":
        loc = "India" if "india" in user_query.lower() else "UTC"
        tool_output = date_tool(location=loc)
    elif tool_name == "math":
        # Keep only arithmetic characters; if nothing is left, hand the raw
        # query to the tool so it reports the evaluation error itself.
        expr = _re.sub(r"[^0-9\.\+\-\*\/\(\)\s]", "", user_query).strip() or user_query
        tool_output = math_tool(expr)
    elif tool_name == "weather":
        # Location is whatever follows the word "in" at the end of the query.
        # \b stops "in" matching inside words (e.g. "rain today"), and the
        # optional trailing punctuation keeps "weather in Paris?" working.
        m = _re.search(
            r"\bin\s+([A-Za-z][A-Za-z\s]*?)\s*[?.!]*\s*$",
            user_query,
            flags=_re.I,
        )
        loc = m.group(1).strip() if m else "London"  # default when no location is given
        tool_output = weather_tool(loc)
    elif tool_name == "thermostat":
        # First run of digits is the target temperature; 20 when none is given.
        m = _re.search(r"(\d+)", user_query)
        temp = int(m.group(1)) if m else 20
        tool_output = {"status": "success", "set_to": temp}
    elif tool_name == "search":
        tool_output_text = grounded_search(user_query)
        tool_output = {"search_text": tool_output_text}
    elif tool_name == "science":
        system_inst = "You are an ICIS science teacher; explain succinctly in 2-3 sentences with a simple example."
        expl = generate_text(system_inst, user_query)
        tool_output = {"explanation": expl}
    else:
        # "chat" and any unrecognised tool name end up here.
        system_inst = "You are a friendly ICIS AI teacher, reply casually and briefly."
        reply = generate_text(system_inst, user_query)
        tool_output = {"reply": reply}

    final = teacher_polish(user_query=user_query, tool_name=tool_name, tool_output=tool_output)
    return {
        "user_query": user_query,
        "decision": decision,
        "tool_output": tool_output,
        "final_response": final,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def strip_markdown(md: Optional[str]) -> str:
    """Reduce Markdown-formatted text to plain text.

    Removes fenced code blocks and images, keeps link labels and inline-code
    contents, unwraps ``**bold**`` / ``*italic*`` emphasis, and drops heading,
    blockquote and bullet markers at the start of a line.

    Markers are only stripped where they act as Markdown syntax. Minus signs
    and asterisks inside prose or arithmetic (``-5``, ``5 * 3``, ``2**3``)
    are left untouched, so numeric facts in the text are not altered.

    Args:
        md: Markdown text; ``None`` or an empty value yields ``""``.

    Returns:
        The plain-text rendering with surrounding whitespace stripped.
    """
    if not md:
        return ""
    text = str(md)

    # Fenced code blocks (may span lines) are dropped entirely.
    text = re.sub(r"```.*?```", "", text, flags=re.S)
    # Images are dropped; they have no plain-text value.
    text = re.sub(r"!\[.*?\]\(.*?\)", "", text)
    # Links keep their label only.
    text = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", text)
    # Inline code keeps its contents.
    text = re.sub(r"`([^`]*)`", r"\1", text)

    # Emphasis: the opening marker must not follow a word character or '*',
    # and must be followed by non-space, so "2**3" and "5 * 3" never match.
    text = re.sub(r"(?<![\w*])\*\*(?=\S)(.+?)(?<=\S)\*\*(?!\*)", r"\1", text)
    text = re.sub(r"(?<![\w*])\*(?=\S)([^*\n]+?)(?<=\S)\*(?![\w*])", r"\1", text)

    # Line-leading block markers: "# " headings, ">" quotes, "- " / "* " / "+ "
    # bullets. Bullets require following whitespace so "-5" stays negative.
    text = re.sub(
        r"^[ \t]*(?:#{1,6}[ \t]+|(?:>[ \t]?)+|[-*+][ \t]+)",
        "",
        text,
        flags=re.M,
    )

    # Whitespace normalisation: trim trailing blanks before newlines, then
    # cap runs of newlines at two.
    text = re.sub(r"\s+\n", "\n", text)
    text = re.sub(r"\n{2,}", "\n\n", text)
    return text.strip()
|
|
|
|
|
def concise_text(plain: str, max_sentences: int = 2) -> str:
    """Keep only the leading sentences of ``plain``.

    Sentences are split on whitespace that follows ``.``, ``?`` or ``!``;
    the first ``max_sentences`` pieces are trimmed and re-joined with single
    spaces. Empty input yields ``""``.
    """
    if not plain:
        return ""

    sentences = re.split(r'(?<=[\.\?\!])\s+', plain.strip())
    # Slicing already covers the "fewer sentences than the limit" case.
    kept = sentences[:max_sentences]
    return " ".join(map(str.strip, kept)).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ASGI application object; the route decorators in this module register on it.
app = FastAPI(
    title="ICIS Mini-Hub",
)

# Template loader rooted at the "templates" directory (relative path).
templates = Jinja2Templates(
    directory="templates",
)
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` for the site root, passing the request as context."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
@app.post("/chat")
async def chat_endpoint(payload: dict):
    """Answer a chat query sent as JSON ``{"query": "<text>"}``.

    Responses:
        200: ``{"function_used": <tool name>, "response": <plain text, at most
             two sentences>}``.
        400: ``query`` is missing, not a string, or blank.
        500: the hub (routing, tool or model call) raised.
    """
    q = payload.get("query") if isinstance(payload, dict) else None
    # hub_handle calls string methods on the query, so anything that is not a
    # non-blank string is rejected here instead of crashing further down.
    if not isinstance(q, str) or not q.strip():
        return JSONResponse(status_code=400, content={"error": "Missing 'query' in JSON payload."})

    # NOTE: hub_handle is synchronous, so this call blocks the event loop for
    # the duration of the model round-trips.
    try:
        out = hub_handle(q)
    except Exception as e:
        # Mirror the other endpoints: report upstream failures as a JSON 500
        # rather than letting the exception escape the handler.
        return JSONResponse(status_code=500, content={"error": f"Chat handling failed: {str(e)}"})

    final_md = out.get("final_response", "")
    plain = strip_markdown(final_md)
    concise = concise_text(plain, max_sentences=2)
    function_used = out.get("decision", {}).get("function_to_use", "chat")
    return JSONResponse(content={
        "function_used": function_used,
        "response": concise,
    })
|
|
|
|
|
@app.post("/analyze_image")
async def analyze_image(file: UploadFile = File(...), prompt: str = Form(...)):
    """Describe an uploaded image according to ``prompt``.

    Responses:
        200: ``{"mode": "image", "response": <plain text, at most two sentences>}``.
        400: the upload's content type is not ``image/*`` or it cannot be opened.
        500: the model call raised; the exception text is included.
    """
    mime = file.content_type or ""
    if not mime.startswith("image/"):
        return JSONResponse(status_code=400, content={"error": "Uploaded file is not an image."})

    raw = await file.read()
    try:
        picture = Image.open(io.BytesIO(raw))
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Could not open image."})

    try:
        result = client.models.generate_content(model=MODEL, contents=[picture, prompt])
        markdown_text = getattr(result, "text", "")
    except Exception as exc:
        return JSONResponse(status_code=500, content={"error": f"GenAI image analysis failed: {str(exc)}"})

    # Same post-processing as the chat endpoint: plain text, two sentences max.
    summary = concise_text(strip_markdown(markdown_text), max_sentences=2)
    return JSONResponse(content={"mode": "image", "response": summary})
|
|
|
|
|
@app.post("/summarize_pdf")
async def summarize_pdf(file: UploadFile = File(...), prompt: str = Form(...)):
    """Summarise an uploaded PDF according to ``prompt``.

    Responses:
        200: ``{"mode": "pdf", "response": <plain text, at most two sentences>}``.
        400: the upload's content type is not exactly ``application/pdf``.
        500: building the request part or the model call raised; the
             exception text is included.
    """
    mime = file.content_type or ""
    if mime != "application/pdf":
        return JSONResponse(status_code=400, content={"error": "Uploaded file is not a PDF."})

    pdf_bytes = await file.read()
    try:
        document = types.Part.from_bytes(data=pdf_bytes, mime_type='application/pdf')
        result = client.models.generate_content(model=MODEL, contents=[document, prompt])
        markdown_text = getattr(result, "text", "")
    except Exception as exc:
        return JSONResponse(status_code=500, content={"error": f"GenAI PDF summarization failed: {str(exc)}"})

    # Same post-processing as the other endpoints: plain text, two sentences max.
    summary = concise_text(strip_markdown(markdown_text), max_sentences=2)
    return JSONResponse(content={"mode": "pdf", "response": summary})
|
|
|