triflix commited on
Commit
76b068b
·
verified ·
1 Parent(s): 7337ff4

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +264 -0
app.py ADDED
@@ -0,0 +1,264 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+ import os
3
+ import io
4
+ import json
5
+ import re
6
+ from datetime import datetime
7
+ import pytz
8
+ from typing import Optional
9
+
10
+ from fastapi import FastAPI, UploadFile, File, Form, Request
11
+ from fastapi.responses import JSONResponse, HTMLResponse
12
+ from fastapi.templating import Jinja2Templates
13
+ from PIL import Image
14
+
15
+ # google-genai
16
+ from google import genai
17
+ from google.genai import types
18
+
19
+ # =====================================================
20
+ # CONFIG
21
+ # =====================================================
22
+ API_KEY = os.environ.get("GENAI_API_KEY", "AIzaSyCjMsYC-mDTwOr1at1-91EkMwI2O6eOvXg")
23
+ MODEL = os.environ.get("GENAI_MODEL", "gemini-2.5-flash")
24
+ client = genai.Client(api_key=API_KEY)
25
+
26
+ # =====================================================
27
+ # MINI-AI TOOL FUNCTIONS (unchanged behavior)
28
+ # =====================================================
29
+ def time_tool(location: str = "UTC") -> dict:
30
+ if location and "india" in location.lower():
31
+ tz = pytz.timezone("Asia/Kolkata")
32
+ else:
33
+ tz = pytz.utc
34
+ now = datetime.now(tz)
35
+ return {
36
+ "date": now.strftime("%Y-%m-%d"),
37
+ "time_24": now.strftime("%H:%M:%S"),
38
+ "time_12": now.strftime("%I:%M:%S %p"),
39
+ "timezone": str(tz)
40
+ }
41
+
42
+ def date_tool(location: str = "UTC") -> dict:
43
+ if location and "india" in location.lower():
44
+ tz = pytz.timezone("Asia/Kolkata")
45
+ else:
46
+ tz = pytz.utc
47
+ now = datetime.now(tz)
48
+ return {"date": now.strftime("%A, %d-%m-%Y"), "timezone": str(tz)}
49
+
50
+ def math_tool(expression: str) -> dict:
51
+ try:
52
+ allowed_names = {}
53
+ value = eval(expression, {"__builtins__": None}, allowed_names)
54
+ return {"expression": expression, "result": str(value)}
55
+ except Exception:
56
+ return {"expression": expression, "error": "Could not evaluate expression."}
57
+
58
+ def weather_tool(location: str) -> dict:
59
+ return {"location": location, "temperature": 25, "unit": "C", "note": "dummy data; integrate a weather API for real results."}
60
+
61
+ # =====================================================
62
+ # LLM wrappers
63
+ # =====================================================
64
+ def generate_text(system_instruction: str, content: str) -> str:
65
+ cfg = types.GenerateContentConfig(system_instruction=system_instruction)
66
+ resp = client.models.generate_content(model=MODEL, config=cfg, contents=content)
67
+ return getattr(resp, "text", "").strip()
68
+
69
+ def grounded_search(query: str) -> str:
70
+ grounding_tool = types.Tool(google_search=types.GoogleSearch())
71
+ cfg = types.GenerateContentConfig(tools=[grounding_tool])
72
+ resp = client.models.generate_content(model=MODEL, config=cfg, contents=query)
73
+ return getattr(resp, "text", "").strip()
74
+
75
+ # =====================================================
76
+ # Router logic (kept same as your code)
77
+ # =====================================================
78
+ import re as _re
79
+ FACTUAL_KEYWORDS = _re.compile(
80
+ r"\b(time|date|today|now|what's the time|what is the time|weather|forecast|temperature|convert|calculate|solve|sum|add|subtract|multiply|divide|what is)\b",
81
+ flags=_re.I
82
+ )
83
+ MATH_PATTERN = _re.compile(r"^[0-9\.\s\+\-\*\/\(\)]+$")
84
+ MATH_KEYWORDS = _re.compile(r"\b(calculate|solve|what is|evaluate|sum|add|subtract|multiply|divide)\b", flags=_re.I)
85
+
86
+ def decide_tool(user_query: str) -> dict:
87
+ q = user_query.strip().lower()
88
+ if _re.search(r"\bhello\b|\bhi\b|\bhey\b|\bgood morning\b|\bgood evening\b", q):
89
+ return {"function_to_use": "chat", "reason": "Greeting detected by rule."}
90
+ if "weather" in q or "forecast" in q or "temperature" in q:
91
+ return {"function_to_use": "weather", "reason": "Weather-related keyword matched."}
92
+ if _re.search(r"\bthermostat\b|\bset thermostat\b|\bset temperature\b", q):
93
+ return {"function_to_use": "thermostat", "reason": "Thermostat control intent matched."}
94
+ if "india" in q and _re.search(r"\btime\b|\bdate\b|\bnow\b|\bcurrent\b", q):
95
+ if "time" in q:
96
+ return {"function_to_use": "time", "reason": "Explicit 'time' + 'India' matched."}
97
+ if "date" in q:
98
+ return {"function_to_use": "date", "reason": "Explicit 'date' + 'India' matched."}
99
+ if MATH_PATTERN.match(user_query) or (_re.search(MATH_KEYWORDS, q) and any(ch.isdigit() for ch in q)):
100
+ return {"function_to_use": "math", "reason": "Math expression or math keywords detected."}
101
+ if _re.search(FACTUAL_KEYWORDS, q):
102
+ if "time" in q and "india" not in q:
103
+ return {"function_to_use": "time", "reason": "Time query detected; using deterministic time tool."}
104
+ return {"function_to_use": "search", "reason": "Factual query matched; using grounded search."}
105
+ system_instruction = """
106
+ You are a strict router assistant. Decide exactly one tool for this query and return only valid JSON with keys:
107
+ {"function_to_use": "<one of: chat, search, time, date, math, weather, thermostat, science>", "reason": "short explanation"}
108
+ Do not return anything else.
109
+ """
110
+ try:
111
+ resp = client.models.generate_content(model=MODEL, config=types.GenerateContentConfig(system_instruction=system_instruction), contents=user_query)
112
+ text = getattr(resp, "text", "").strip()
113
+ parsed = json.loads(text)
114
+ if "function_to_use" in parsed:
115
+ return parsed
116
+ except Exception:
117
+ pass
118
+ return {"function_to_use": "chat", "reason": "Default fallback to chat."}
119
+
120
+ def teacher_polish(user_query: str, tool_name: str, tool_output) -> str:
121
+ system_instruction = (
122
+ "You are ICIS AI teacher. Produce a concise (1-3 sentence) explanation in teacher tone.\n"
123
+ "IF the tool_output contains numeric facts (dates, times, numbers), DO NOT change them; only rephrase and add a short real-life example.\n"
124
+ "If the tool_output is an action confirmation (like thermostat status), confirm the action succinctly.\n"
125
+ "Return only the final user-facing text."
126
+ )
127
+ content = f"User query: {user_query}\nTool used: {tool_name}\nTool output: {json.dumps(tool_output, ensure_ascii=False)}"
128
+ return generate_text(system_instruction=system_instruction, content=content)
129
+
130
+ def hub_handle(user_query: str):
131
+ decision = decide_tool(user_query)
132
+ tool_name = decision.get("function_to_use", "chat")
133
+ tool_output = None
134
+ if tool_name == "time":
135
+ loc = "India" if "india" in user_query.lower() else "UTC"
136
+ tool_output = time_tool(location=loc)
137
+ elif tool_name == "date":
138
+ loc = "India" if "india" in user_query.lower() else "UTC"
139
+ tool_output = date_tool(location=loc)
140
+ elif tool_name == "math":
141
+ expr = _re.sub(r"[^0-9\.\+\-\*\/\(\)\s]", "", user_query).strip() or user_query
142
+ tool_output = math_tool(expr)
143
+ elif tool_name == "weather":
144
+ m = _re.search(r"in ([A-Za-z\s]+)$", user_query, flags=_re.I)
145
+ loc = m.group(1).strip() if m else "London"
146
+ tool_output = weather_tool(loc)
147
+ elif tool_name == "thermostat":
148
+ m = _re.search(r"(\d+)", user_query)
149
+ temp = int(m.group(1)) if m else 20
150
+ tool_output = {"status": "success", "set_to": temp}
151
+ elif tool_name == "search":
152
+ tool_output_text = grounded_search(user_query)
153
+ tool_output = {"search_text": tool_output_text}
154
+ elif tool_name == "science":
155
+ system_inst = "You are an ICIS science teacher; explain succinctly in 2-3 sentences with a simple example."
156
+ expl = generate_text(system_inst, user_query)
157
+ tool_output = {"explanation": expl}
158
+ else:
159
+ system_inst = "You are a friendly ICIS AI teacher, reply casually and briefly."
160
+ reply = generate_text(system_inst, user_query)
161
+ tool_output = {"reply": reply}
162
+ final = teacher_polish(user_query=user_query, tool_name=tool_name, tool_output=tool_output)
163
+ return {
164
+ "user_query": user_query,
165
+ "decision": decision,
166
+ "tool_output": tool_output,
167
+ "final_response": final
168
+ }
169
+
170
+ # =====================================================
171
+ # Helpers: strip markdown -> plain text, concise
172
+ # =====================================================
173
+ def strip_markdown(md: Optional[str]) -> str:
174
+ if not md:
175
+ return ""
176
+ text = str(md)
177
+ # remove code fences
178
+ text = re.sub(r"```.*?```", "", text, flags=re.S)
179
+ # images ![alt](url)
180
+ text = re.sub(r"!\[.*?\]\(.*?\)", "", text)
181
+ # links [text](url) -> text
182
+ text = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", text)
183
+ # inline codes `x`
184
+ text = re.sub(r"`([^`]*)`", r"\1", text)
185
+ # remove remaining markdown symbols like # * > -
186
+ text = re.sub(r"(^|\s)[#>*\-]+\s*", r"\1", text)
187
+ # collapse whitespace
188
+ text = re.sub(r"\s+\n", "\n", text)
189
+ text = re.sub(r"\n{2,}", "\n\n", text)
190
+ text = text.strip()
191
+ return text
192
+
193
+ def concise_text(plain: str, max_sentences: int = 2) -> str:
194
+ if not plain:
195
+ return ""
196
+ # naive sentence split
197
+ parts = re.split(r'(?<=[\.\?\!])\s+', plain.strip())
198
+ if len(parts) <= max_sentences:
199
+ return " ".join([p.strip() for p in parts]).strip()
200
+ return " ".join(p.strip() for p in parts[:max_sentences]).strip()
201
+
202
+ # =====================================================
203
+ # FastAPI app + endpoints
204
+ # =====================================================
205
+ app = FastAPI(title="ICIS Mini-Hub")
206
+
207
+ templates = Jinja2Templates(directory="templates")
208
+
209
+ @app.get("/", response_class=HTMLResponse)
210
+ async def index(request: Request):
211
+ return templates.TemplateResponse("index.html", {"request": request})
212
+
213
+ @app.post("/chat")
214
+ async def chat_endpoint(payload: dict):
215
+ q = payload.get("query") if isinstance(payload, dict) else None
216
+ if not q:
217
+ return JSONResponse(status_code=400, content={"error": "Missing 'query' in JSON payload."})
218
+ out = hub_handle(q)
219
+ # extract final_response and function used
220
+ final_md = out.get("final_response", "")
221
+ plain = strip_markdown(final_md)
222
+ concise = concise_text(plain, max_sentences=2)
223
+ function_used = out.get("decision", {}).get("function_to_use", "chat")
224
+ return JSONResponse(content={
225
+ "function_used": function_used,
226
+ "response": concise
227
+ })
228
+
229
+ @app.post("/analyze_image")
230
+ async def analyze_image(file: UploadFile = File(...), prompt: str = Form(...)):
231
+ # read and ensure it's an image
232
+ content_type = file.content_type or ""
233
+ if not content_type.startswith("image/"):
234
+ return JSONResponse(status_code=400, content={"error": "Uploaded file is not an image."})
235
+ image_bytes = await file.read()
236
+ try:
237
+ image = Image.open(io.BytesIO(image_bytes))
238
+ except Exception:
239
+ return JSONResponse(status_code=400, content={"error": "Could not open image."})
240
+ # call genai with image + prompt
241
+ try:
242
+ response = client.models.generate_content(model=MODEL, contents=[image, prompt])
243
+ text_md = getattr(response, "text", "")
244
+ except Exception as e:
245
+ return JSONResponse(status_code=500, content={"error": f"GenAI image analysis failed: {str(e)}"})
246
+ plain = strip_markdown(text_md)
247
+ concise = concise_text(plain, max_sentences=2)
248
+ return JSONResponse(content={"mode": "image", "response": concise})
249
+
250
+ @app.post("/summarize_pdf")
251
+ async def summarize_pdf(file: UploadFile = File(...), prompt: str = Form(...)):
252
+ ct = file.content_type or ""
253
+ if ct != "application/pdf":
254
+ return JSONResponse(status_code=400, content={"error": "Uploaded file is not a PDF."})
255
+ data = await file.read()
256
+ try:
257
+ part = types.Part.from_bytes(data=data, mime_type='application/pdf')
258
+ response = client.models.generate_content(model=MODEL, contents=[part, prompt])
259
+ text_md = getattr(response, "text", "")
260
+ except Exception as e:
261
+ return JSONResponse(status_code=500, content={"error": f"GenAI PDF summarization failed: {str(e)}"})
262
+ plain = strip_markdown(text_md)
263
+ concise = concise_text(plain, max_sentences=2)
264
+ return JSONResponse(content={"mode": "pdf", "response": concise})