triflix commited on
Commit
2789e08
·
verified ·
1 Parent(s): dcdb630

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +241 -0
app.py ADDED
@@ -0,0 +1,241 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+ import os
3
+ import io
4
+ import json
5
+ import re
6
+ from datetime import datetime
7
+ from typing import Optional
8
+
9
+ import pytz
10
+ from fastapi import FastAPI, UploadFile, File, Form, Request
11
+ from fastapi.middleware.cors import CORSMiddleware
12
+ from fastapi.responses import JSONResponse, HTMLResponse
13
+ from fastapi.templating import Jinja2Templates
14
+ from PIL import Image
15
+ from google import genai
16
+ from google.genai import types
17
+
18
+ # -----------------------
19
+ # Config
20
+ # -----------------------
21
+ API_KEY = os.environ.get("GENAI_API_KEY", "") # set in env on HF
22
+ MODEL = os.environ.get("GENAI_MODEL", "gemini-2.5-flash")
23
+ client = genai.Client(api_key=API_KEY)
24
+
25
+ # -----------------------
26
+ # FastAPI app
27
+ # -----------------------
28
+ app = FastAPI(title="ICIS Mini-Hub")
29
+
30
+ # Allow all origins for Spaces; tighten in production
31
+ app.add_middleware(
32
+ CORSMiddleware,
33
+ allow_origins=["*"],
34
+ allow_credentials=True,
35
+ allow_methods=["*"],
36
+ allow_headers=["*"],
37
+ )
38
+
39
+ templates = Jinja2Templates(directory="templates")
40
+
41
+ # -----------------------
42
+ # Mini tools (unchanged semantics)
43
+ # -----------------------
44
+ def time_tool(location: str = "UTC") -> dict:
45
+ tz = pytz.timezone("Asia/Kolkata") if location and "india" in location.lower() else pytz.utc
46
+ now = datetime.now(tz)
47
+ return {"date": now.strftime("%Y-%m-%d"), "time_24": now.strftime("%H:%M:%S"), "time_12": now.strftime("%I:%M:%S %p"), "timezone": str(tz)}
48
+
49
+ def date_tool(location: str = "UTC") -> dict:
50
+ tz = pytz.timezone("Asia/Kolkata") if location and "india" in location.lower() else pytz.utc
51
+ now = datetime.now(tz)
52
+ return {"date": now.strftime("%A, %d-%m-%Y"), "timezone": str(tz)}
53
+
54
+ def math_tool(expression: str) -> dict:
55
+ try:
56
+ value = eval(expression, {"__builtins__": None}, {})
57
+ return {"expression": expression, "result": str(value)}
58
+ except Exception:
59
+ return {"expression": expression, "error": "Could not evaluate expression."}
60
+
61
+ def weather_tool(location: str) -> dict:
62
+ return {"location": location, "temperature": 25, "unit": "C", "note": "dummy data; integrate a weather API for real results."}
63
+
64
+ def generate_text(system_instruction: str, content: str) -> str:
65
+ cfg = types.GenerateContentConfig(system_instruction=system_instruction)
66
+ resp = client.models.generate_content(model=MODEL, config=cfg, contents=content)
67
+ return getattr(resp, "text", "").strip()
68
+
69
+ def grounded_search(query: str) -> str:
70
+ grounding_tool = types.Tool(google_search=types.GoogleSearch())
71
+ cfg = types.GenerateContentConfig(tools=[grounding_tool])
72
+ resp = client.models.generate_content(model=MODEL, config=cfg, contents=query)
73
+ return getattr(resp, "text", "").strip()
74
+
75
+ # -----------------------
76
+ # Router & polish (kept logic)
77
+ # -----------------------
78
+ FACTUAL_KEYWORDS = re.compile(
79
+ r"\b(time|date|today|now|what's the time|what is the time|weather|forecast|temperature|convert|calculate|solve|sum|add|subtract|multiply|divide|what is)\b",
80
+ flags=re.I
81
+ )
82
+ MATH_PATTERN = re.compile(r"^[0-9\.\s\+\-\*\/\(\)]+$")
83
+ MATH_KEYWORDS = re.compile(r"\b(calculate|solve|what is|evaluate|sum|add|subtract|multiply|divide)\b", flags=re.I)
84
+
85
+ def decide_tool(user_query: str) -> dict:
86
+ q = user_query.strip().lower()
87
+ if re.search(r"\bhello\b|\bhi\b|\bhey\b|\bgood morning\b|\bgood evening\b", q):
88
+ return {"function_to_use": "chat", "reason": "Greeting detected by rule."}
89
+ if "weather" in q or "forecast" in q or "temperature" in q:
90
+ return {"function_to_use": "weather", "reason": "Weather-related keyword matched."}
91
+ if re.search(r"\bthermostat\b|\bset thermostat\b|\bset temperature\b", q):
92
+ return {"function_to_use": "thermostat", "reason": "Thermostat control intent matched."}
93
+ if "india" in q and re.search(r"\btime\b|\bdate\b|\bnow\b|\bcurrent\b", q):
94
+ if "time" in q:
95
+ return {"function_to_use": "time", "reason": "Explicit 'time' + 'India' matched."}
96
+ if "date" in q:
97
+ return {"function_to_use": "date", "reason": "Explicit 'date' + 'India' matched."}
98
+ if MATH_PATTERN.match(user_query) or (re.search(MATH_KEYWORDS, q) and any(ch.isdigit() for ch in q)):
99
+ return {"function_to_use": "math", "reason": "Math expression or math keywords detected."}
100
+ if re.search(FACTUAL_KEYWORDS, q):
101
+ if "time" in q and "india" not in q:
102
+ return {"function_to_use": "time", "reason": "Time query detected; using deterministic time tool."}
103
+ return {"function_to_use": "search", "reason": "Factual query matched; using grounded search."}
104
+ system_instruction = """
105
+ You are a strict router assistant. Decide exactly one tool for this query and return only valid JSON with keys:
106
+ {"function_to_use": "<one of: chat, search, time, date, math, weather, thermostat, science>", "reason": "short explanation"}
107
+ Do not return anything else.
108
+ """
109
+ try:
110
+ resp = client.models.generate_content(model=MODEL, config=types.GenerateContentConfig(system_instruction=system_instruction), contents=user_query)
111
+ text = getattr(resp, "text", "").strip()
112
+ parsed = json.loads(text)
113
+ if "function_to_use" in parsed:
114
+ return parsed
115
+ except Exception:
116
+ pass
117
+ return {"function_to_use": "chat", "reason": "Default fallback to chat."}
118
+
119
+ def teacher_polish(user_query: str, tool_name: str, tool_output) -> str:
120
+ system_instruction = (
121
+ "You are ICIS AI teacher. Produce a concise (1-3 sentence) explanation in teacher tone.\n"
122
+ "IF the tool_output contains numeric facts (dates, times, numbers), DO NOT change them; only rephrase and add a short real-life example.\n"
123
+ "If the tool_output is an action confirmation (like thermostat status), confirm the action succinctly.\n"
124
+ "Return only the final user-facing text."
125
+ )
126
+ content = f"User query: {user_query}\nTool used: {tool_name}\nTool output: {json.dumps(tool_output, ensure_ascii=False)}"
127
+ return generate_text(system_instruction=system_instruction, content=content)
128
+
129
+ def hub_handle(user_query: str):
130
+ decision = decide_tool(user_query)
131
+ tool_name = decision.get("function_to_use", "chat")
132
+ tool_output = None
133
+ if tool_name == "time":
134
+ loc = "India" if "india" in user_query.lower() else "UTC"
135
+ tool_output = time_tool(location=loc)
136
+ elif tool_name == "date":
137
+ loc = "India" if "india" in user_query.lower() else "UTC"
138
+ tool_output = date_tool(location=loc)
139
+ elif tool_name == "math":
140
+ expr = re.sub(r"[^0-9\.\+\-\*\/\(\)\s]", "", user_query).strip() or user_query
141
+ tool_output = math_tool(expr)
142
+ elif tool_name == "weather":
143
+ m = re.search(r"in ([A-Za-z\s]+)$", user_query, flags=re.I)
144
+ loc = m.group(1).strip() if m else "London"
145
+ tool_output = weather_tool(loc)
146
+ elif tool_name == "thermostat":
147
+ m = re.search(r"(\d+)", user_query)
148
+ temp = int(m.group(1)) if m else 20
149
+ tool_output = {"status": "success", "set_to": temp}
150
+ elif tool_name == "search":
151
+ tool_output_text = grounded_search(user_query)
152
+ tool_output = {"search_text": tool_output_text}
153
+ elif tool_name == "science":
154
+ system_inst = "You are an ICIS science teacher; explain succinctly in 2-3 sentences with a simple example."
155
+ expl = generate_text(system_inst, user_query)
156
+ tool_output = {"explanation": expl}
157
+ else:
158
+ system_inst = "You are a friendly ICIS AI teacher, reply casually and briefly."
159
+ reply = generate_text(system_inst, user_query)
160
+ tool_output = {"reply": reply}
161
+ final = teacher_polish(user_query=user_query, tool_name=tool_name, tool_output=tool_output)
162
+ return {"user_query": user_query, "decision": decision, "tool_output": tool_output, "final_response": final}
163
+
164
+ # -----------------------
165
+ # Helpers: markdown -> plain & concise
166
+ # -----------------------
167
+ def strip_markdown(md: Optional[str]) -> str:
168
+ if not md:
169
+ return ""
170
+ text = str(md)
171
+ text = re.sub(r"```.*?```", "", text, flags=re.S)
172
+ text = re.sub(r"!\[.*?\]\(.*?\)", "", text)
173
+ text = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", text)
174
+ text = re.sub(r"`([^`]*)`", r"\1", text)
175
+ text = re.sub(r"(^|\s)[#>*\-]+\s*", r"\1", text)
176
+ text = re.sub(r"\s+\n", "\n", text)
177
+ text = re.sub(r"\n{2,}", "\n\n", text)
178
+ return text.strip()
179
+
180
+ def concise_text(plain: str, max_sentences: int = 2) -> str:
181
+ if not plain:
182
+ return ""
183
+ parts = re.split(r'(?<=[\.\?\!])\s+', plain.strip())
184
+ if len(parts) <= max_sentences:
185
+ return " ".join([p.strip() for p in parts]).strip()
186
+ return " ".join(p.strip() for p in parts[:max_sentences]).strip()
187
+
188
+ # -----------------------
189
+ # Routes
190
+ # -----------------------
191
+ @app.get("/", response_class=HTMLResponse)
192
+ async def index(request: Request):
193
+ return templates.TemplateResponse("index.html", {"request": request})
194
+
195
+ @app.post("/chat")
196
+ async def chat_endpoint(payload: dict):
197
+ q = payload.get("query") if isinstance(payload, dict) else None
198
+ if not q:
199
+ return JSONResponse(status_code=400, content={"error": "Missing 'query' in JSON payload."})
200
+ out = hub_handle(q)
201
+ final_md = out.get("final_response", "")
202
+ plain = strip_markdown(final_md)
203
+ concise = concise_text(plain, max_sentences=2)
204
+ function_used = out.get("decision", {}).get("function_to_use", "chat")
205
+ return JSONResponse(content={"mode": "chat", "function_used": function_used, "response": concise})
206
+
207
+ @app.post("/analyze_image")
208
+ async def analyze_image(file: UploadFile = File(...), prompt: str = Form(...)):
209
+ # Expect client to send file + prompt deliberately (not on attach)
210
+ ct = (file.content_type or "").lower()
211
+ if not ct.startswith("image/"):
212
+ return JSONResponse(status_code=400, content={"error": "Uploaded file is not an image."})
213
+ image_bytes = await file.read()
214
+ try:
215
+ image = Image.open(io.BytesIO(image_bytes))
216
+ except Exception as e:
217
+ return JSONResponse(status_code=400, content={"error": f"Could not open image: {str(e)}"})
218
+ try:
219
+ response = client.models.generate_content(model=MODEL, contents=[image, prompt])
220
+ text_md = getattr(response, "text", "")
221
+ except Exception as e:
222
+ return JSONResponse(status_code=500, content={"error": f"GenAI image analysis failed: {str(e)}"})
223
+ plain = strip_markdown(text_md)
224
+ concise = concise_text(plain, max_sentences=2)
225
+ return JSONResponse(content={"mode": "image", "response": concise})
226
+
227
+ @app.post("/summarize_pdf")
228
+ async def summarize_pdf(file: UploadFile = File(...), prompt: str = Form(...)):
229
+ ct = (file.content_type or "").lower()
230
+ if ct != "application/pdf":
231
+ return JSONResponse(status_code=400, content={"error": "Uploaded file is not a PDF."})
232
+ data = await file.read()
233
+ try:
234
+ part = types.Part.from_bytes(data=data, mime_type='application/pdf')
235
+ response = client.models.generate_content(model=MODEL, contents=[part, prompt])
236
+ text_md = getattr(response, "text", "")
237
+ except Exception as e:
238
+ return JSONResponse(status_code=500, content={"error": f"GenAI PDF summarization failed: {str(e)}"})
239
+ plain = strip_markdown(text_md)
240
+ concise = concise_text(plain, max_sentences=2)
241
+ return JSONResponse(content={"mode": "pdf", "response": concise})