ghadgemadhuri92 commited on
Commit
3659da9
·
1 Parent(s): cc2fef3

final updates

Browse files
app/agents/adk_mathminds.py CHANGED
@@ -21,6 +21,7 @@ from app.tools.python_executor import PythonInterpreter
21
  from app.tools.advanced_ocr import AdvancedOCR
22
  from app.tools.vision_analyzer import VisionAnalyzer
23
  from app.core.math_normalizer import MathQueryNormalizer
 
24
 
25
  logger = logging.getLogger(__name__)
26
 
@@ -152,13 +153,29 @@ class MathMindsADKAgent:
152
  formatted += f"Problem: {item.get('problem_text')}\nSolution: {item.get('solution_text')}\n---\n"
153
  return formatted
154
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  # ── Agent & Runner ────────────────────────────────────────────────────
156
  self.agent = Agent(
157
  name="math_minds_core",
158
  model=model_name,
159
  tools=[
160
  web_search, math_solver, execute_python,
161
- find_similar_problems, image_interpreter, statistical_vision
 
162
  ],
163
  instruction=(
164
  "You are MathMinds AI, a precise mathematical analytical assistant. "
@@ -175,7 +192,8 @@ class MathMindsADKAgent:
175
  "In Python, you can use specialized libraries like `numpy`, `scipy`, or `sympy` for numerical and symbolic solutions."
176
  "\n3. INTERPRET LATEX: Tool outputs (especially from SymPy) are often in raw LaTeX. "
177
  "NEVER just display the raw LaTeX to the user. Always explain the steps in clear English. "
178
- "Wrap LaTeX in `$ ... $` for inline or `$$ ... $$` for blocks so the UI renders it properly."
 
179
  "\n\nCRITICAL: Always explain your reasoning before and after using tools. If a tool fails, explain WHY and try a different approach."
180
  )
181
  )
@@ -245,6 +263,8 @@ class MathMindsADKAgent:
245
  logger.error(f"Image decode failed: {e}")
246
 
247
  # ── 4. Run agent (Streaming) ──────────────────────────────────────────
 
 
248
  async for event in self.runner.run_async(
249
  user_id=user_id,
250
  session_id=session_id,
@@ -252,36 +272,41 @@ class MathMindsADKAgent:
252
  run_config=RunConfig(streaming_mode=StreamingMode.SSE)
253
  ):
254
  # ── Determine Event Type ──
255
- # is_final_response() is usually True for the final user-facing text
256
  try:
257
  is_final = event.is_final_response()
258
  except Exception:
259
  is_final = False
260
 
261
- # ── Capture Content (Text) ──
262
  if hasattr(event, "content") and event.content and event.content.parts:
263
- for part in event.content.parts:
264
- if hasattr(part, "text") and part.text:
265
- # Stream ALL text to the main answer window
266
- # This fixes the "empty answer until refresh" issue.
267
- yield {"type": "answer", "content": part.text}
268
-
269
- # Log terminal responses separately if needed for logic
270
- if is_final:
271
- logger.debug(f"Final response chunk received: {part.text[:50]}...")
 
 
 
 
 
 
272
 
273
  # ── Capture Tool Usage (Reasoning) ──
274
  for fc in event.get_function_calls():
275
  yield {
276
- "type": "action",
277
- "content": f"Using tool: {fc.name}"
278
  }
279
 
280
  # ── Capture Tool Response ──
281
  for fr in event.get_function_responses():
282
  yield {
283
- "type": "observation",
284
- "content": f"Obtained result from {fr.name}"
285
  }
286
 
287
  except Exception as e:
 
21
  from app.tools.advanced_ocr import AdvancedOCR
22
  from app.tools.vision_analyzer import VisionAnalyzer
23
  from app.core.math_normalizer import MathQueryNormalizer
24
+ from app.services.automation import automation_service
25
 
26
  logger = logging.getLogger(__name__)
27
 
 
153
  formatted += f"Problem: {item.get('problem_text')}\nSolution: {item.get('solution_text')}\n---\n"
154
  return formatted
155
 
156
+ async def trigger_automation(event_name: str, payload_json: str) -> str:
157
+ """
158
+ Trigger an external automation workflow (n8n).
159
+ Use this for sending alerts, emails, Discord messages, or logging data.
160
+ Args:
161
+ event_name: Description of the event (e.g., 'complex_problem_solved').
162
+ payload_json: A JSON string containing the data to send.
163
+ """
164
+ try:
165
+ payload = json.loads(payload_json)
166
+ result = await automation_service.trigger(event_name, payload)
167
+ return f"Automation triggered: {result.get('status')}"
168
+ except Exception as e:
169
+ return f"Automation failed: {str(e)}"
170
+
171
  # ── Agent & Runner ────────────────────────────────────────────────────
172
  self.agent = Agent(
173
  name="math_minds_core",
174
  model=model_name,
175
  tools=[
176
  web_search, math_solver, execute_python,
177
+ find_similar_problems, image_interpreter, statistical_vision,
178
+ trigger_automation
179
  ],
180
  instruction=(
181
  "You are MathMinds AI, a precise mathematical analytical assistant. "
 
192
  "In Python, you can use specialized libraries like `numpy`, `scipy`, or `sympy` for numerical and symbolic solutions."
193
  "\n3. INTERPRET LATEX: Tool outputs (especially from SymPy) are often in raw LaTeX. "
194
  "NEVER just display the raw LaTeX to the user. Always explain the steps in clear English. "
195
+ "Wrap LaTeX in `$ ... $` for inline or `$$ ... $$` for blocks so the UI renders it properly. "
196
+ "Example: Use '$x^2$' instead of 'x^2'."
197
  "\n\nCRITICAL: Always explain your reasoning before and after using tools. If a tool fails, explain WHY and try a different approach."
198
  )
199
  )
 
263
  logger.error(f"Image decode failed: {e}")
264
 
265
  # ── 4. Run agent (Streaming) ──────────────────────────────────────────
266
+ yielded_text_len = 0
267
+
268
  async for event in self.runner.run_async(
269
  user_id=user_id,
270
  session_id=session_id,
 
272
  run_config=RunConfig(streaming_mode=StreamingMode.SSE)
273
  ):
274
  # ── Determine Event Type ──
 
275
  try:
276
  is_final = event.is_final_response()
277
  except Exception:
278
  is_final = False
279
 
280
+ # ── Capture Content (Text Delta) ──
281
  if hasattr(event, "content") and event.content and event.content.parts:
282
+ # Safer handling: Ensure we only join STRINGS (handle None indices from tool parts)
283
+ full_turn_text = "".join((getattr(part, "text", "") or "") for part in event.content.parts)
284
+
285
+ # Handle buffer reset (happens after tool calls)
286
+ if len(full_turn_text) < yielded_text_len:
287
+ yielded_text_len = 0
288
+
289
+ # Stream delta
290
+ if len(full_turn_text) > yielded_text_len:
291
+ delta = full_turn_text[yielded_text_len:]
292
+ yielded_text_len = len(full_turn_text)
293
+ yield {"type": "answer", "content": delta}
294
+
295
+ if is_final:
296
+ logger.debug(f"Final response chunk received: {delta[:50]}...")
297
 
298
  # ── Capture Tool Usage (Reasoning) ──
299
  for fc in event.get_function_calls():
300
  yield {
301
+ "type": "thought", # Changed from action to thought for UI consistency
302
+ "content": f"⚙️ {fc.name}"
303
  }
304
 
305
  # ── Capture Tool Response ──
306
  for fr in event.get_function_responses():
307
  yield {
308
+ "type": "thought", # Changed from observation to thought for UI consistency
309
+ "content": f"👁️ Result from {fr.name}"
310
  }
311
 
312
  except Exception as e:
app/api/main.py CHANGED
@@ -294,15 +294,15 @@ async def solve_problem(
294
 
295
  async def event_generator():
296
  try:
297
- async for chunk in orchestrator.process_problem(
298
- text=solve_req.effective_text,
299
  image=solve_req.image,
300
- request_id=final_request_id,
301
- model_preference=solve_req.model_preference,
302
  session_id=solve_req.session_id,
303
- user_id=current_user.get("uid")
304
  ):
305
- yield json.dumps(chunk) + "\n"
 
306
  except Exception as e:
307
  logger.error(f"Streaming error: {e}")
308
  yield json.dumps({"type": "error", "content": "Internal processing error"}) + "\n"
@@ -313,7 +313,15 @@ async def solve_problem(
313
  except Exception:
314
  pass
315
 
316
- return StreamingResponse(event_generator(), media_type="application/x-ndjson")
 
 
 
 
 
 
 
 
317
 
318
  # --- Chat History Endpoints ---
319
 
 
294
 
295
  async def event_generator():
296
  try:
297
+ async for event in orchestrator.solve_problem_stream(
298
+ query=solve_req.effective_text,
299
  image=solve_req.image,
300
+ user_id=current_user["uid"],
 
301
  session_id=solve_req.session_id,
302
+ request_id=final_request_id
303
  ):
304
+ # STRICT SSE FORMAT
305
+ yield f"data: {json.dumps(event)}\n\n"
306
  except Exception as e:
307
  logger.error(f"Streaming error: {e}")
308
  yield json.dumps({"type": "error", "content": "Internal processing error"}) + "\n"
 
313
  except Exception:
314
  pass
315
 
316
+ return StreamingResponse(
317
+ event_generator(),
318
+ media_type="text/event-stream",
319
+ headers={
320
+ "Cache-Control": "no-cache",
321
+ "Connection": "keep-alive",
322
+ "X-Accel-Buffering": "no" # Prevent Nginx buffering
323
+ }
324
+ )
325
 
326
  # --- Chat History Endpoints ---
327
 
app/core/orchestrator.py CHANGED
@@ -43,9 +43,9 @@ class Orchestrator:
43
  logger.critical(f"Failed to initialize Orchestrator: {e}")
44
  raise
45
 
46
- async def process_problem(
47
  self,
48
- text: Optional[str] = None,
49
  image: Optional[str] = None,
50
  request_id: Optional[str] = None,
51
  model_preference: str = "fast",
@@ -71,7 +71,7 @@ class Orchestrator:
71
 
72
  try:
73
  # ── 1. Input processing ───────────────────────────────────────────
74
- processed = self.input_processor.process_compound(text_input=text, image_input=image)
75
  if not processed.is_valid:
76
  yield {"type": "error", "content": processed.error_message}
77
  return
@@ -79,12 +79,16 @@ class Orchestrator:
79
  query = processed.cleaned_content
80
  image_data = processed.metadata.get("image_data")
81
 
82
- # Background: Persist user message
83
  if user_id and session_id:
84
- asyncio.create_task(self._persist_message(
85
- user_id=user_id, session_id=session_id, role="user",
86
- content=text or "Uploaded an image", image_data=image_data
87
- ))
 
 
 
 
88
 
89
  # ── 2. Cache lookup ───────────────────────────────────────────────
90
  if settings.ENABLE_CACHE and not image_data:
@@ -93,12 +97,9 @@ class Orchestrator:
93
  if cached:
94
  yield {"type": "thought", "content": "Retrieving answer from memory..."}
95
  yield {"type": "answer", "content": cached.get("answer")}
96
- # Background: Persist assistant response
97
  if user_id and session_id:
98
- asyncio.create_task(self._persist_message(
99
- user_id=user_id, session_id=session_id, role="assistant",
100
- content=cached.get("answer"), metadata=cached.get("metadata")
101
- ))
102
  return
103
  else:
104
  cache_key = None
@@ -116,7 +117,7 @@ class Orchestrator:
116
  "metadata": {"model": "sympy", "tools_used": ["sympy"]}
117
  })
118
 
119
- self._fire_and_forget_log(query, result_schema, user_id, session_id, cache_key)
120
  return
121
 
122
  # ── 4. Agentic Streaming Loop ─────────────────────────────────────
@@ -149,7 +150,8 @@ class Orchestrator:
149
  result_schema["metadata"]["latency_ms"] = int((time.time() - start_time) * 1000)
150
 
151
  if full_answer:
152
- self._fire_and_forget_log(query, result_schema, user_id, session_id, cache_key)
 
153
 
154
  except Exception as e:
155
  logger.error(f"Orchestrator Error: {e}")
@@ -162,18 +164,15 @@ class Orchestrator:
162
  except Exception as e:
163
  logger.error(f"Failed to persist message: {e}")
164
 
165
- def _fire_and_forget_log(self, query, schema, user_id, session_id, cache_key):
166
- """Fire and forget persistence to avoid blocking the stream completion."""
167
- asyncio.create_task(self._persist_log(query, schema, user_id, session_id, cache_key))
168
-
169
- async def _persist_log(self, query, schema, user_id, session_id, cache_key):
170
  """Internal awaitable helper."""
171
  # Map logic_trace to reasoning for frontend consistency
172
  reasoning = "\n".join(schema["metadata"].get("logic_trace", []))
173
 
174
  await self._persist_message(
175
  user_id=user_id, session_id=session_id, role="assistant",
176
- content=schema["answer"], reasoning=reasoning, metadata=schema["metadata"]
 
177
  )
178
  if settings.ENABLE_CACHE and cache_key:
179
  self.cache_manager.set_cached_answer(cache_key, schema)
 
43
  logger.critical(f"Failed to initialize Orchestrator: {e}")
44
  raise
45
 
46
+ async def solve_problem_stream(
47
  self,
48
+ query: Optional[str] = None,
49
  image: Optional[str] = None,
50
  request_id: Optional[str] = None,
51
  model_preference: str = "fast",
 
71
 
72
  try:
73
  # ── 1. Input processing ───────────────────────────────────────────
74
+ processed = self.input_processor.process_compound(text_input=query, image_input=image)
75
  if not processed.is_valid:
76
  yield {"type": "error", "content": processed.error_message}
77
  return
 
79
  query = processed.cleaned_content
80
  image_data = processed.metadata.get("image_data")
81
 
82
+ # 1.5. Persist user message (Safety Check: Don't duplicate)
83
  if user_id and session_id:
84
+ # Check if this exact request already exists in DB to prevent duplicates
85
+ history = self.db_manager.get_chat_history(user_id, session_id) or []
86
+ if not any(m.get("request_id") == request_id for m in history):
87
+ await self._persist_message(
88
+ user_id=user_id, session_id=session_id, role="user",
89
+ content=query or "Uploaded an image", image_data=image_data,
90
+ request_id=request_id
91
+ )
92
 
93
  # ── 2. Cache lookup ───────────────────────────────────────────────
94
  if settings.ENABLE_CACHE and not image_data:
 
97
  if cached:
98
  yield {"type": "thought", "content": "Retrieving answer from memory..."}
99
  yield {"type": "answer", "content": cached.get("answer")}
100
+ # Persist assistant response
101
  if user_id and session_id:
102
+ await self._persist_log(query, {"answer": cached.get("answer"), "metadata": cached.get("metadata")}, user_id, session_id, cache_key)
 
 
 
103
  return
104
  else:
105
  cache_key = None
 
117
  "metadata": {"model": "sympy", "tools_used": ["sympy"]}
118
  })
119
 
120
+ await self._persist_log(query, result_schema, user_id, session_id, cache_key, request_id=request_id)
121
  return
122
 
123
  # ── 4. Agentic Streaming Loop ─────────────────────────────────────
 
150
  result_schema["metadata"]["latency_ms"] = int((time.time() - start_time) * 1000)
151
 
152
  if full_answer:
153
+ # AWAIT the final log instead of fire-and-forget to prevent race conditions with UI reloads.
154
+ await self._persist_log(query, result_schema, user_id, session_id, cache_key, request_id=request_id)
155
 
156
  except Exception as e:
157
  logger.error(f"Orchestrator Error: {e}")
 
164
  except Exception as e:
165
  logger.error(f"Failed to persist message: {e}")
166
 
167
+ async def _persist_log(self, query, schema, user_id, session_id, cache_key, request_id=None):
 
 
 
 
168
  """Internal awaitable helper."""
169
  # Map logic_trace to reasoning for frontend consistency
170
  reasoning = "\n".join(schema["metadata"].get("logic_trace", []))
171
 
172
  await self._persist_message(
173
  user_id=user_id, session_id=session_id, role="assistant",
174
+ content=schema["answer"], reasoning=reasoning, metadata=schema["metadata"],
175
+ request_id=request_id
176
  )
177
  if settings.ENABLE_CACHE and cache_key:
178
  self.cache_manager.set_cached_answer(cache_key, schema)
app/core/settings.py CHANGED
@@ -44,6 +44,7 @@ class Settings(BaseSettings):
44
  SUPABASE_URL: Optional[str] = None
45
  SUPABASE_KEY: Optional[str] = None
46
  WOLFRAM_APP_ID: Optional[str] = None
 
47
 
48
  # Security
49
  JWT_SECRET_KEY: str = "super_secret_key_change_me"
 
44
  SUPABASE_URL: Optional[str] = None
45
  SUPABASE_KEY: Optional[str] = None
46
  WOLFRAM_APP_ID: Optional[str] = None
47
+ N8N_WEBHOOK_URL: Optional[str] = None
48
 
49
  # Security
50
  JWT_SECRET_KEY: str = "super_secret_key_change_me"
app/services/automation.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import httpx
3
+ from typing import Dict, Any, Optional
4
+ from app.core.settings import settings
5
+
6
+ logger = logging.getLogger(__name__)
7
+
8
+ class AutomationService:
9
+ """
10
+ Service for integrating with n8n via webhooks.
11
+ Used for external notifications, data logging, and low-code workflows.
12
+ """
13
+
14
+ def __init__(self, webhook_url: Optional[str] = None):
15
+ self.webhook_url = webhook_url or settings.N8N_WEBHOOK_URL
16
+
17
+ async def trigger(self, event_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
18
+ """
19
+ Triggers an n8n workflow by sending a POST request to a webhook.
20
+ """
21
+ if not self.webhook_url:
22
+ logger.warning("n8n automation triggered but no N8N_WEBHOOK_URL is configured.")
23
+ return {"status": "skipped", "reason": "no_webhook_url"}
24
+
25
+ try:
26
+ # Add metadata to the payload
27
+ data = {
28
+ "event": event_name,
29
+ "timestamp": settings.datetime.now().isoformat() if hasattr(settings, 'datetime') else None,
30
+ "environment": settings.ENV,
31
+ "data": payload
32
+ }
33
+
34
+ async with httpx.AsyncClient() as client:
35
+ response = await client.post(
36
+ self.webhook_url,
37
+ json=data,
38
+ timeout=10.0
39
+ )
40
+
41
+ if response.status_code in (200, 201):
42
+ logger.info(f"n8n automation triggered successfully: {event_name}")
43
+ return {"status": "success", "response": response.json() if response.content else "OK"}
44
+ else:
45
+ logger.error(f"n8n automation failed with status {response.status_code}: {response.text}")
46
+ return {"status": "error", "code": response.status_code, "detail": response.text}
47
+
48
+ except Exception as e:
49
+ logger.error(f"Error triggering n8n automation: {e}")
50
+ return {"status": "error", "detail": str(e)}
51
+
52
+ # Singleton instance
53
+ automation_service = AutomationService()
db_diag.py ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from pymongo import MongoClient
3
+ from dotenv import load_dotenv
4
+
5
+ load_dotenv()
6
+
7
+ mongo_uri = os.getenv("MONGO_URI")
8
+ client = MongoClient(mongo_uri)
9
+ db = client.mathminds_db
10
+ # FIXED COLLECTION NAME
11
+ sessions = db.chat_sessions
12
+
13
+ print("LAST 3 SESSIONS:")
14
+ for s in sessions.find().sort("created_at", -1).limit(3):
15
+ print(f"Session: {s.get('session_id')} | User: {s.get('user_id')}")
16
+ print(f"Title: {s.get('title')}")
17
+ msgs = s.get("messages", [])
18
+ print(f"Messages Count: {len(msgs)}")
19
+ for m in msgs[-10:]:
20
+ print(f" [{m.get('role')}] {m.get('content')[:100]} (RID: {m.get('request_id')})")
21
+ print("-" * 20)
docker-compose.yml CHANGED
@@ -21,7 +21,7 @@ services:
21
  - mathminds_net
22
  restart: unless-stopped
23
  healthcheck:
24
- test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
25
  interval: 30s
26
  timeout: 10s
27
  retries: 3
@@ -47,7 +47,7 @@ services:
47
  restart: unless-stopped
48
 
49
  frontend:
50
- build:
51
  context: .
52
  dockerfile: frontend/Dockerfile
53
  container_name: mathminds_frontend
@@ -64,7 +64,7 @@ services:
64
  - mathminds_net
65
  restart: unless-stopped
66
  healthcheck:
67
- test: ["CMD", "curl", "-f", "http://localhost:8501/_stcore/health"]
68
  interval: 30s
69
  timeout: 10s
70
  retries: 3
@@ -81,7 +81,7 @@ services:
81
  - mathminds_net
82
  restart: unless-stopped
83
  healthcheck:
84
- test: ["CMD", "redis-cli", "ping"]
85
  interval: 10s
86
  timeout: 5s
87
  retries: 5
@@ -97,11 +97,28 @@ services:
97
  - mathminds_net
98
  restart: unless-stopped
99
  healthcheck:
100
- test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
101
  interval: 10s
102
  timeout: 5s
103
  retries: 5
104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  networks:
106
  mathminds_net:
107
  driver: bridge
@@ -109,3 +126,4 @@ networks:
109
  volumes:
110
  redis_data:
111
  mongo_data:
 
 
21
  - mathminds_net
22
  restart: unless-stopped
23
  healthcheck:
24
+ test: [ "CMD", "curl", "-f", "http://localhost:8000/health" ]
25
  interval: 30s
26
  timeout: 10s
27
  retries: 3
 
47
  restart: unless-stopped
48
 
49
  frontend:
50
+ build:
51
  context: .
52
  dockerfile: frontend/Dockerfile
53
  container_name: mathminds_frontend
 
64
  - mathminds_net
65
  restart: unless-stopped
66
  healthcheck:
67
+ test: [ "CMD", "curl", "-f", "http://localhost:8501/_stcore/health" ]
68
  interval: 30s
69
  timeout: 10s
70
  retries: 3
 
81
  - mathminds_net
82
  restart: unless-stopped
83
  healthcheck:
84
+ test: [ "CMD", "redis-cli", "ping" ]
85
  interval: 10s
86
  timeout: 5s
87
  retries: 5
 
97
  - mathminds_net
98
  restart: unless-stopped
99
  healthcheck:
100
+ test: [ "CMD", "mongosh", "--eval", "db.adminCommand('ping')" ]
101
  interval: 10s
102
  timeout: 5s
103
  retries: 5
104
 
105
+ n8n:
106
+ image: n8nio/n8n:latest
107
+ container_name: mathminds_n8n
108
+ ports:
109
+ - "5678:5678"
110
+ environment:
111
+ - N8N_HOST=localhost
112
+ - N8N_PORT=5678
113
+ - N8N_PROTOCOL=http
114
+ - NODE_ENV=production
115
+ - WEBHOOK_URL=http://localhost:5678/
116
+ volumes:
117
+ - n8n_data:/home/node/.n8n
118
+ networks:
119
+ - mathminds_net
120
+ restart: unless-stopped
121
+
122
  networks:
123
  mathminds_net:
124
  driver: bridge
 
126
  volumes:
127
  redis_data:
128
  mongo_data:
129
+ n8n_data:
frontend/app.py CHANGED
@@ -1,5 +1,6 @@
1
  import streamlit as st
2
  import requests
 
3
  import base64
4
  from PIL import Image
5
  import io
@@ -206,7 +207,35 @@ def load_messages(session_id):
206
  headers=headers, timeout=30
207
  )
208
  if response.status_code == 200:
209
- st.session_state.messages = response.json()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
  elif response.status_code == 404:
211
  # Session doesn't belong to this user — clear silently
212
  st.session_state.messages = []
@@ -216,6 +245,7 @@ def load_messages(session_id):
216
  st.session_state.messages = []
217
  st.error(f"Failed to load messages: {response.status_code}")
218
  except Exception as e:
 
219
  st.error(f"Error loading messages: {e}")
220
  st.session_state.messages = []
221
 
@@ -227,9 +257,15 @@ def get_active_session():
227
  return None
228
 
229
 
230
- def add_message(role, content, sent_to_api=False, **kwargs):
231
  """Optimistic UI update only — persistence happens in the backend via /solve."""
232
- msg = {"role": role, "content": content, "timestamp": time.time(), "sent_to_api": sent_to_api}
 
 
 
 
 
 
233
  msg.update(kwargs)
234
  st.session_state.messages.append(msg)
235
 
@@ -476,9 +512,7 @@ def chat_interface():
476
  if badges:
477
  st.markdown(badges, unsafe_allow_html=True)
478
 
479
- if msg.get("reasoning") or msg.get("explanation"):
480
- with st.expander("Show Reasoning Steps"):
481
- st.markdown(msg.get("reasoning") or msg.get("explanation"))
482
 
483
  content = msg["content"]
484
  if isinstance(content, dict) and "final_answer" in content:
@@ -547,78 +581,94 @@ def chat_interface():
547
  and not st.session_state.messages[-1].get("sent_to_api", False)
548
  ):
549
  last = st.session_state.messages[-1]
550
- current_request_id = last.get("request_id") or str(uuid.uuid4())
551
- last["request_id"] = current_request_id
 
 
552
 
553
  with st.chat_message("assistant", avatar="🤖"):
554
- status_msg = st.status("Thinking...", expanded=True)
555
- logic_placeholder = status_msg.empty()
556
  answer_placeholder = st.empty()
557
 
558
  full_answer = ""
559
  logic_trace = []
560
 
561
  try:
562
- last["sent_to_api"] = True
563
  payload = {
564
- "text": last["content"],
565
- "image": last.get("image_data"),
566
  "session_id": st.session_state.active_session_id,
567
- "request_id": current_request_id,
568
  }
569
  headers = get_auth_headers()
570
-
571
- with requests.post(API_URL, json=payload, headers=headers, stream=True, timeout=360) as r:
572
  if r.status_code == 200:
573
- for raw_line in r.iter_lines(decode_unicode=True):
574
- if raw_line:
575
- try:
576
- line = raw_line.strip()
577
- # Handle optional "data: " prefix if SSE is used
578
- if line.startswith("data: "):
579
- line = line[6:].strip()
 
 
 
 
 
 
 
 
580
 
581
- data = json.loads(line)
582
- if data["type"] == "thought":
583
- logic_trace.append(data["content"])
584
- elif data["type"] == "action":
585
- logic_trace.append(f"⚙️ {data['content']}")
586
- elif data["type"] == "observation":
587
- logic_trace.append(f"👁️ {data['content']}")
588
- elif data["type"] == "answer":
589
- full_answer += data["content"]
590
- answer_placeholder.markdown(full_answer)
591
- elif data["type"] == "error":
592
- st.error(data["content"])
593
 
594
- # Update logic trace UI
595
- logic_placeholder.markdown("\n".join(logic_trace))
596
- except Exception:
597
- continue
 
 
 
 
 
 
 
 
 
 
598
 
 
 
 
 
 
 
 
 
 
 
 
 
599
  status_msg.update(label="Solved!", state="complete", expanded=False)
600
 
601
- # Force sync with database to ensure UI has the latest persisted state
602
- if st.session_state.active_session_id:
603
- load_messages(st.session_state.active_session_id)
604
- load_sessions() # Refresh titles
605
- elif r.status_code == 401:
606
- _clear_user_state()
607
- st.session_state.user = None
608
- st.error("Session expired. Please log in again.")
609
  else:
610
- st.error(f"Error: {r.status_code}")
611
-
612
  except Exception as e:
613
- st.error(f"Connection error: {e}")
 
614
  finally:
 
615
  st.session_state.is_processing = False
616
- # Final check for unsent user message cleanup
617
- if st.session_state.messages and st.session_state.messages[-1].get("role") == "user":
618
- st.session_state.messages[-1]["sent_to_api"] = True
619
  st.rerun()
620
 
621
 
 
622
  # ====================================================
623
  # Sidebar
624
  # ====================================================
 
1
  import streamlit as st
2
  import requests
3
+ import json
4
  import base64
5
  from PIL import Image
6
  import io
 
207
  headers=headers, timeout=30
208
  )
209
  if response.status_code == 200:
210
+ server_messages = response.json()
211
+ local_messages = st.session_state.get("messages", [])
212
+
213
+ # ✅ INDESTRUCTIBLE MERGE LOGIC
214
+ # 1. Start with server messages as the definitive baseline.
215
+ merged = []
216
+ server_keys = set()
217
+ for m in server_messages:
218
+ merged.append(m)
219
+ rid = m.get("request_id")
220
+ role = m.get("role")
221
+ if rid and role:
222
+ server_keys.add((role, rid))
223
+
224
+ # 2. Append local messages that have NOT yet reached the server.
225
+ # This protects local "optimistic" messages from vanishing if DB is slow.
226
+ for lm in local_messages:
227
+ rid = lm.get("request_id")
228
+ role = lm.get("role")
229
+ if rid and role:
230
+ if (role, rid) not in server_keys:
231
+ merged.append(lm)
232
+ elif not rid:
233
+ # Fallback for messages without IDs (should be rare)
234
+ content_prefix = str(lm.get("content", ""))[:50]
235
+ if not any(str(sm.get("content", "")).startswith(content_prefix) for sm in server_messages):
236
+ merged.append(lm)
237
+
238
+ st.session_state.messages = merged
239
  elif response.status_code == 404:
240
  # Session doesn't belong to this user — clear silently
241
  st.session_state.messages = []
 
245
  st.session_state.messages = []
246
  st.error(f"Failed to load messages: {response.status_code}")
247
  except Exception as e:
248
+ logger.error(f"Error loading messages: {e}")
249
  st.error(f"Error loading messages: {e}")
250
  st.session_state.messages = []
251
 
 
257
  return None
258
 
259
 
260
+ def add_message(role, content, sent_to_api=False, request_id=None, **kwargs):
261
  """Optimistic UI update only — persistence happens in the backend via /solve."""
262
+ msg = {
263
+ "role": role,
264
+ "content": content,
265
+ "timestamp": time.time(),
266
+ "sent_to_api": sent_to_api,
267
+ "request_id": request_id
268
+ }
269
  msg.update(kwargs)
270
  st.session_state.messages.append(msg)
271
 
 
512
  if badges:
513
  st.markdown(badges, unsafe_allow_html=True)
514
 
515
+ # Reasoning display removed as per user request
 
 
516
 
517
  content = msg["content"]
518
  if isinstance(content, dict) and "final_answer" in content:
 
581
  and not st.session_state.messages[-1].get("sent_to_api", False)
582
  ):
583
  last = st.session_state.messages[-1]
584
+ request_id = last.get("request_id") or str(uuid.uuid4())
585
+ last["request_id"] = request_id
586
+ # ✅ CRITICAL: Mark as sent immediately to prevent re-triggering during streaming
587
+ last["sent_to_api"] = True
588
 
589
  with st.chat_message("assistant", avatar="🤖"):
590
+ status_msg = st.status("Thinking...", expanded=False)
 
591
  answer_placeholder = st.empty()
592
 
593
  full_answer = ""
594
  logic_trace = []
595
 
596
  try:
597
+ # Prepare SSE Session
598
  payload = {
599
+ "text": last["content"],
600
+ "image": last.get("image_data"),
601
  "session_id": st.session_state.active_session_id,
602
+ "request_id": request_id
603
  }
604
  headers = get_auth_headers()
605
+ with requests.post(f"{BACKEND_URL}/solve", json=payload, headers=headers, stream=True, timeout=360) as r:
 
606
  if r.status_code == 200:
607
+ line_buffer = ""
608
+ last_ui_update = time.time()
609
+
610
+ # ZERO-BUFFER BYTE STREAMING
611
+ for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
612
+ if chunk:
613
+ line_buffer += chunk
614
+ while "\n" in line_buffer:
615
+ line, line_buffer = line_buffer.split("\n", 1)
616
+ line = line.strip()
617
+ if not line: continue
618
+
619
+ try:
620
+ if line.startswith("data:"):
621
+ line = line[len("data:"):].strip()
622
 
623
+ data = json.loads(line)
624
+ ev_type = data.get("type", "")
 
 
 
 
 
 
 
 
 
 
625
 
626
+ if ev_type == "answer":
627
+ content = data.get("content", "")
628
+ full_answer += content
629
+ # ✅ RATE-LIMITED UI UPDATE (Smooth @ 20fps)
630
+ if time.time() - last_ui_update > 0.05:
631
+ answer_placeholder.markdown(full_answer + "▌")
632
+ last_ui_update = time.time()
633
+ elif ev_type in ("thought", "action", "observation"):
634
+ content = data.get("content", "")
635
+ if content:
636
+ logic_trace.append(content)
637
+ status_msg.update(label=f"⚙️ {content}", state="running", expanded=False)
638
+ except Exception:
639
+ continue
640
 
641
+ # ✅ FINAL FLUSH
642
+ if line_buffer.strip():
643
+ try:
644
+ line = line_buffer.strip()
645
+ if line.startswith("data:"): line = line[len("data:"):].strip()
646
+ data = json.loads(line)
647
+ if data.get("type") == "answer":
648
+ full_answer += data.get("content", "")
649
+ except Exception: pass
650
+
651
+ # Finalize
652
+ answer_placeholder.markdown(full_answer if full_answer else "No answer received.")
653
  status_msg.update(label="Solved!", state="complete", expanded=False)
654
 
655
+ # Save & FINAL SYNC
656
+ add_message("assistant", full_answer, request_id=request_id)
657
+ time.sleep(0.1)
658
+ load_messages(st.session_state.active_session_id)
659
+ st.rerun()
 
 
 
660
  else:
661
+ st.error(f"Backend Error: {r.status_code}")
 
662
  except Exception as e:
663
+ logger.error(f"Streaming Exception: {e}")
664
+ st.error(f"Connection lost or error: {e}")
665
  finally:
666
+ # ✅ CRITICAL: Always release processing lock
667
  st.session_state.is_processing = False
 
 
 
668
  st.rerun()
669
 
670
 
671
+
672
  # ====================================================
673
  # Sidebar
674
  # ====================================================
test_api.py ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import json
3
+ import time
4
+
5
+ url = "http://localhost:8000/solve"
6
+ payload = {
7
+ "text": "what is 2+2?",
8
+ "session_id": "test_session",
9
+ "request_id": "test_rid_" + str(time.time())
10
+ }
11
+
12
+ print(f"Calling {url}...")
13
+ headers = {"Authorization": "Bearer mock_token_123"}
14
+ try:
15
+ with requests.post(url, json=payload, headers=headers, stream=True, timeout=30) as r:
16
+ print(f"Status: {r.status_code}")
17
+ for chunk in r.iter_content(chunk_size=1, decode_unicode=True):
18
+ if chunk:
19
+ print(chunk, end="", flush=True)
20
+ except Exception as e:
21
+ print(f"\nFAILED: {e}")
test_orch.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import os
3
+ os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
4
+ import json
5
+ import sys
6
+
7
+ # Add current dir to path
8
+ sys.path.append(os.getcwd())
9
+
10
+ from app.core.orchestrator import Orchestrator
11
+
12
+ async def test_stream():
13
+ # Mock dependencies
14
+ orch = Orchestrator()
15
+ print("Orchestrator initialized.")
16
+
17
+ query = "what is 9^3?"
18
+ print(f"Solving: {query}")
19
+
20
+ try:
21
+ async for event in orch.solve_problem_stream(query=query, request_id="test-rid"):
22
+ print(f"EVENT: {event}")
23
+ except Exception as e:
24
+ print(f"FAILED: {e}")
25
+
26
+ if __name__ == "__main__":
27
+ asyncio.run(test_stream())
verify_scraper.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ from app.tools.web_scraper import run_playwright_sync
4
+
5
+ logging.basicConfig(level=logging.INFO)
6
+
7
+ def test_scraper():
8
+ print("Starting WebScraper Verification...")
9
+
10
+ # Test query: something that definitely has tables
11
+ query = "gold rate in bangalore"
12
+ print(f"Query: {query}")
13
+
14
+ result = run_playwright_sync(query, headless=True)
15
+
16
+ if result.get("status") == "success":
17
+ print(f"Success! Targeted URL: {result.get('url')}")
18
+ content = result.get("content", "")
19
+
20
+ # Check for Table Preservation
21
+ has_tables = "[TABLE START]" in content
22
+ print(f"Table Preservation: {'DETECTED' if has_tables else 'NOT FOUND'}")
23
+
24
+ # Check for dynamic search (should not be DuckDuckGo URL if successful)
25
+ is_ddg = "duckduckgo" in result.get("url", "").lower()
26
+ print(f"Dynamic Search: {'WORKING' if not is_ddg else 'FALLBACK TO DDG'}")
27
+
28
+ print(f"Content Preview (first 200 chars):\n{content[:200]}...")
29
+ else:
30
+ print(f"Error: {result.get('error')}")
31
+
32
+ if __name__ == "__main__":
33
+ test_scraper()