Spaces:
Running
Running
Pulastya B commited on
Commit Β·
1750ce2
1
Parent(s): cac7327
ADD COMPREHENSIVE DEBUG LOGGING - will show exactly where SSE breaks
Browse files- src/api/app.py +4 -0
- src/orchestrator.py +5 -0
- src/progress_manager.py +5 -0
src/api/app.py
CHANGED
|
@@ -192,6 +192,8 @@ async def stream_progress(session_id: str):
|
|
| 192 |
- token_update: Token budget updates
|
| 193 |
- analysis_complete: When the entire workflow finishes
|
| 194 |
"""
|
|
|
|
|
|
|
| 195 |
async def event_generator():
|
| 196 |
try:
|
| 197 |
# Send initial connection event
|
|
@@ -329,6 +331,7 @@ async def run_analysis(
|
|
| 329 |
|
| 330 |
# Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
|
| 331 |
agent.http_session_key = session_key
|
|
|
|
| 332 |
|
| 333 |
try:
|
| 334 |
# Agent's session memory should resolve file_path from context
|
|
@@ -447,6 +450,7 @@ async def run_analysis(
|
|
| 447 |
|
| 448 |
# Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
|
| 449 |
agent.http_session_key = session_key
|
|
|
|
| 450 |
|
| 451 |
# Call existing agent logic
|
| 452 |
logger.info(f"Starting analysis with task: {task_description}")
|
|
|
|
| 192 |
- token_update: Token budget updates
|
| 193 |
- analysis_complete: When the entire workflow finishes
|
| 194 |
"""
|
| 195 |
+
print(f"π SSE ENDPOINT: Client connected for session_id={session_id}")
|
| 196 |
+
|
| 197 |
async def event_generator():
|
| 198 |
try:
|
| 199 |
# Send initial connection event
|
|
|
|
| 331 |
|
| 332 |
# Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
|
| 333 |
agent.http_session_key = session_key
|
| 334 |
+
print(f"π SET agent.http_session_key = {session_key}")
|
| 335 |
|
| 336 |
try:
|
| 337 |
# Agent's session memory should resolve file_path from context
|
|
|
|
| 450 |
|
| 451 |
# Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
|
| 452 |
agent.http_session_key = session_key
|
| 453 |
+
print(f"\ud83d\udd11 SET agent.http_session_key = {session_key} (file upload)")
|
| 454 |
|
| 455 |
# Call existing agent logic
|
| 456 |
logger.info(f"Starting analysis with task: {task_description}")
|
src/orchestrator.py
CHANGED
|
@@ -3039,13 +3039,18 @@ You are a DOER. Complete workflows based on user intent."""
|
|
| 3039 |
if not session_key_for_emit and hasattr(self, 'session') and self.session:
|
| 3040 |
session_key_for_emit = self.session.session_id
|
| 3041 |
|
|
|
|
|
|
|
| 3042 |
if session_key_for_emit:
|
|
|
|
| 3043 |
progress_manager.emit(session_key_for_emit, {
|
| 3044 |
'type': 'tool_executing',
|
| 3045 |
'tool': tool_name,
|
| 3046 |
'message': f"π§ Executing: {tool_name}",
|
| 3047 |
'arguments': tool_args
|
| 3048 |
})
|
|
|
|
|
|
|
| 3049 |
|
| 3050 |
# Execute tool
|
| 3051 |
tool_result = self._execute_tool(tool_name, tool_args)
|
|
|
|
| 3039 |
if not session_key_for_emit and hasattr(self, 'session') and self.session:
|
| 3040 |
session_key_for_emit = self.session.session_id
|
| 3041 |
|
| 3042 |
+
print(f"π DEBUG EMIT: http_session_key={getattr(self, 'http_session_key', 'NOT SET')}, session={hasattr(self, 'session')}, final_key={session_key_for_emit}")
|
| 3043 |
+
|
| 3044 |
if session_key_for_emit:
|
| 3045 |
+
print(f"π EMITTING tool_executing for session: {session_key_for_emit}, tool: {tool_name}")
|
| 3046 |
progress_manager.emit(session_key_for_emit, {
|
| 3047 |
'type': 'tool_executing',
|
| 3048 |
'tool': tool_name,
|
| 3049 |
'message': f"π§ Executing: {tool_name}",
|
| 3050 |
'arguments': tool_args
|
| 3051 |
})
|
| 3052 |
+
else:
|
| 3053 |
+
print(f"β οΈ SKIPPING EMIT: No session key available")
|
| 3054 |
|
| 3055 |
# Execute tool
|
| 3056 |
tool_result = self._execute_tool(tool_name, tool_args)
|
src/progress_manager.py
CHANGED
|
@@ -36,6 +36,8 @@ class ProgressManager:
|
|
| 36 |
session_id: Session identifier
|
| 37 |
event: Event data (must include 'type' and 'message')
|
| 38 |
"""
|
|
|
|
|
|
|
| 39 |
# Add timestamp
|
| 40 |
event['timestamp'] = datetime.now().isoformat()
|
| 41 |
|
|
@@ -46,8 +48,11 @@ class ProgressManager:
|
|
| 46 |
if len(self._history[session_id]) > 500:
|
| 47 |
self._history[session_id] = self._history[session_id][-500:]
|
| 48 |
|
|
|
|
|
|
|
| 49 |
# Send to all active subscribers
|
| 50 |
if session_id in self._queues:
|
|
|
|
| 51 |
dead_queues = []
|
| 52 |
for queue in self._queues[session_id]:
|
| 53 |
try:
|
|
|
|
| 36 |
session_id: Session identifier
|
| 37 |
event: Event data (must include 'type' and 'message')
|
| 38 |
"""
|
| 39 |
+
print(f"π‘ PROGRESS_MANAGER EMIT: session={session_id}, event_type={event.get('type')}, msg={event.get('message', '')[:50]}")
|
| 40 |
+
|
| 41 |
# Add timestamp
|
| 42 |
event['timestamp'] = datetime.now().isoformat()
|
| 43 |
|
|
|
|
| 48 |
if len(self._history[session_id]) > 500:
|
| 49 |
self._history[session_id] = self._history[session_id][-500:]
|
| 50 |
|
| 51 |
+
print(f"π History stored, total events for {session_id}: {len(self._history[session_id])}")
|
| 52 |
+
|
| 53 |
# Send to all active subscribers
|
| 54 |
if session_id in self._queues:
|
| 55 |
+
print(f"β
Found {len(self._queues[session_id])} subscribers for {session_id}")
|
| 56 |
dead_queues = []
|
| 57 |
for queue in self._queues[session_id]:
|
| 58 |
try:
|