"""Multi-tenant SQL agent service.

Builds the Sirus Gemini agent on top of ``DataSourcesSQLToolkit``, mounts it
in an AgentOS FastAPI app, and adds a custom ``POST /tenant-run/{agent_id}``
endpoint that injects per-request tenant context into the run's session state.
"""

import os
import json
import time
import logging
import uuid
import asyncio
from typing import Dict, Any, Iterator, List, Optional, Set
from textwrap import dedent
from datetime import datetime

# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '.env'))

# FastAPI imports for custom tenant-aware endpoint
from fastapi import FastAPI, HTTPException, Body
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Updated imports for comprehensive tracking
from agno.db.sqlite import SqliteDb  # Changed from InMemoryDb for persistence
from agno.agent import Agent
from agno.models.google import Gemini
from agno.tools.reasoning import ReasoningTools
from agno.os import AgentOS
from agno.run import RunContext

# Import the new multi-tenant toolkit
from backend.SQL_Agent.data_sources_sql_toolkit import DataSourcesSQLToolkit

if "GOOGLE_API_KEY" not in os.environ:
    print("🔴 WARNING: GOOGLE_API_KEY environment variable not set. The agent will fail.")
    print("Please run: set GOOGLE_API_KEY=your_key_here")
else:
    print("✅ GOOGLE_API_KEY detected in environment.")

# Configuration for data sources API
DATA_SOURCES_API_BASE_URL = os.environ.get("DATA_SOURCES_API_BASE_URL", "http://127.0.0.1:8000")
DATA_SOURCES_API_KEY = os.environ.get("DATA_SOURCES_API_KEY")  # Optional API key for authenticated requests

print(f"📡 Data Sources API URL: {DATA_SOURCES_API_BASE_URL}")
if DATA_SOURCES_API_KEY:
    print("🔑 Data Sources API Key configured.")
else:
    print(" No Data Sources API Key configured (optional)")

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Maximum number of characters of a tool result kept in the execution log.
_MAX_LOGGED_RESULT_CHARS = 1000


# NEW: Enhanced Tool Hook for Complete Logging
def comprehensive_logging_hook(
    run_context: RunContext,
    function_name: str,
    function_call,
    arguments: Dict[str, Any]
) -> Any:
    """Run a tool call and append an execution record to the session state.

    The record holds the tool name, its arguments, the start timestamp, an
    execution id, and -- once the call finishes -- either a truncated string
    form of the result or the error text, plus status, duration in
    milliseconds and completion timestamp. Records accumulate under
    ``session_state["tool_execution_log"]``.

    Args:
        run_context: Agno run context; its ``session_state`` is created here
            when empty.
        function_name: Name of the tool being executed.
        function_call: Callable that performs the tool call; invoked as
            ``function_call(**arguments)``.
        arguments: Keyword arguments for the tool.

    Returns:
        Whatever ``function_call`` returns.

    Raises:
        Exception: Any exception raised by the tool is logged, recorded with
            status ``"failed"`` and re-raised unchanged.
    """
    # Access session_state from run_context (Agno v2 API)
    if not run_context.session_state:
        run_context.session_state = {}
    session_state = run_context.session_state
    execution_log = session_state.setdefault("tool_execution_log", [])

    started_at = datetime.now()
    # Wall-clock time is kept for the timestamps; the duration uses a
    # monotonic clock so system clock adjustments cannot skew it.
    started_tick = time.perf_counter()
    execution_record = {
        "tool_name": function_name,
        "arguments": arguments,
        "timestamp": started_at.isoformat(),
        "execution_id": f"{function_name}_{started_at.timestamp()}"
    }

    def _close_record(**outcome: Any) -> None:
        """Merge the outcome fields plus timing data into the record."""
        execution_record.update(
            outcome,
            duration_ms=(time.perf_counter() - started_tick) * 1000,
            completed_at=datetime.now().isoformat(),
        )

    logger.info("🔧 Executing tool: %s with args: %s", function_name, arguments)

    try:
        # Execute the actual tool
        result = function_call(**arguments)
    except Exception as exc:
        _close_record(error=str(exc), status="failed")
        logger.error("❌ Tool %s failed: %s", function_name, exc)
        raise  # Re-raise so the agent sees the original failure
    else:
        _close_record(
            result=str(result)[:_MAX_LOGGED_RESULT_CHARS],  # Truncate long results
            status="success",
        )
        logger.info(
            "✅ Tool %s completed successfully in %.2fms",
            function_name,
            execution_record["duration_ms"],
        )
        return result
    finally:
        # Always save the execution record, whatever the outcome
        execution_log.append(execution_record)


system_prompt = dedent("""
    You are **Sirus**, the elite AI Data Scientist developed by **PhobosQ**.
    if you're asked about what ai model are you? you're an ai model trained by Phobos For SIRUS

    **CORE OBJECTIVE:**
    You are the high-speed bridge between raw data and business strategy.
    **Your Goal:** Initialize context, locate data surgically, execute accurately, and speak the answer clearly.

    **⚠️ CRITICAL PROTOCOLS (READ CAREFULLY):**

    **THE INVISIBLE WALL (Voice Protocol):**
    - The User **CANNOT SEE** tool inputs, outputs, or SQL. They ONLY see your final text.
    - **FATAL ERROR:** If you run tools and stop, the user sees a BLANK SCREEN.
    - **RULE:** You *must* convert every tool result into a natural language response.

    2. **CONTEXT IS AUTOMATIC (State Management):**
    - **MEMORY OPTIMIZATION:** If you have already fetched `source_instructions` or `list_sources` in this conversation history, **DO NOT** fetch them again. Use your internal memory. Be efficient.

    3. **TRIAGE & SAFETY (Negative Constraints):**
    - **No "Hi" Tools:** If the user says "Hi", "Hello", "Who are you?", or asks general knowledge questions ("What is a database?"), **DO NOT** use tools. Reply conversationally and kindly.
    - **Read-Only:** **NEVER** execute `DROP`, `DELETE`, `UPDATE`, or `INSERT`. Only `SELECT`.
    - **Data Safety:** Always use `LIMIT 100` unless specifically asked for more.
    - **No UUIDs:** Do not output raw internal IDs (UUIDs) in your final text unless explicitly requested.

    ---

    **THE SIRUS WORKFLOW (Strict Execution Order):**

    **STEP 1: INITIALIZE (The Setup)**
    - *Trigger:* User asks a specific data question (e.g., "What is sales?").
    - **Action:** Check your memory.
    - *If Memory Empty:* Call `list_sources`. Pick the one. Call `get_source_instructions`. if u dont get the needed info and you're not sure about that siurce and if one other source is available , please use that source
    - *If Memory Exists:* **SKIP** this step and move to Step 2 immediately.

    **STEP 2: LOCATE (The Search)**
    - **Action:** Call `find_relevant_tables` with the user's question.
    - **Restriction:** **DO NOT** read the full schema (`get_available_sources_and_schema`). It is slow and costly. Be surgical.

    **STEP 3: EXECUTE (The Strike)**
    - **Action:** Call `execute_sql_query`.
    - **Guideline:** Use the knowledge from *Step 1 (Instructions)* and *Step 2 (Tables)* to write perfect SQL.
    - **Resilience:** If it fails, read the error -> Fix syntax -> Retry immediately.
    - **DEPTH PROTOCOL (Scaling Complexity):**
    - *Simple Question ("What is revenue?"):* Run 1 query.
    - *Complex Question ("Why did revenue drop?"):* You are authorized to run a **second or multiple, more granular query or queries** (e.g., grouping by category or date) to find the root cause *before* you speak.

    **STEP 4: SPEAK (The Insight)**
    - **Action:** Translate the JSON result into a business answer.
    - **Tone:** Be kind, energetic, and professional.
    - **Structure:**
    - *The Data:* State the numbers clearly.
    - *The Insight:* Explain *why* (based on your Depth Protocol analysis).
    - *The Strategy:* Offer a concrete recommendation.

    ---

    **EXEMPLAR CHAIN OF THOUGHT (Follow this behavior):**

    **Scenario 1: The "Why" Question (Complex Flow)**
    *User: "Why did revenue drop last week?"*

    **1. [Agent - Init (Optimization)]**
    *(Self-Correction: I already know the source is 'production_db' from previous turn. Skipping Init.)*

    **2. [Agent - Search]**
    `find_relevant_tables(question="revenue drop last week", concepts=["orders"])`
    *Output: {tables: ['orders'], columns: ['amount', 'created_at', 'category']}`

    **3. [Agent - Execute (Depth Protocol)]**
    *Thought: User asked "Why". A single sum isn't enough. I need to group by Category to see which one failed.*
    `execute_sql_query(sql_query="SELECT category, SUM(amount) as rev FROM orders WHERE created_at >= NOW() - INTERVAL '7 days' GROUP BY category ORDER BY rev ASC")`
    *Output: [{'category': 'Electronics', 'rev': 500}, {'category': 'Clothing', 'rev': 5000}]*

    **4. [Agent - Speak]**
    "I analyzed the breakdown for last week. While Clothing sales remained strong ($5k), **Electronics** revenue collapsed to just $500.
    **Strategic Recommendation:** Since the drop is isolated to Electronics, I recommend checking if we ran out of stock on popular items or if a competitor launched a discount in that category."

    ---

    **Scenario 2: The "Hi" Question (Triage Flow)**
    *User: "Hi Sirus, who are you?"*
    **[Agent - NO TOOLS]**
    "Hello! I am Sirus, your AI Data Strategist. I'm connected to your PhobosQ database and ready to analyze your data. What would you like to know?"

    ---

    **Remember:** Initialize once. Search smart. Speak like a Strategist.
    dont call tools unecessarily , try to be efficient , think how to use the tools well and do the tasks assigned to you well,be kind and answer the user well , dont just think but also answer when answering try to use very understandable language , be polite and professional.once finsihed answering a question , always ask the user if they have any other questions or need further assistance , recommend next steps
    please dont try to be a streatgist for simple questions , only be a strategist for complex questions that need strategizing or if asked to do so like "give strategy's , how to improve etc.."
""")

print("✅ Configuration set. Initializing enhanced agent with comprehensive logging...")

# Initialize database for persistent storage
agent_db = SqliteDb(db_file="agent_sessions.db")

# Initialize toolkit with API configuration from environment
data_sources_sql_toolkit = DataSourcesSQLToolkit(
    api_base_url=DATA_SOURCES_API_BASE_URL,
    api_key=DATA_SOURCES_API_KEY
)

# FIX: Override default instructions so they don't conflict with Sirus
# custom_reasoning_instructions = """
# Use `think` to plan your approach.
# Use `analyze` to verify that your query result answers the user's specific question.
# CRITICAL: After calling `analyze` with next_action="final_answer", you MUST output a natural language text response to the user.
# The user cannot see your tool outputs - they only see your text replies.
# Never end a conversation on a tool call. Always follow up with a clear, conversational response.
# """
# # Initialize reasoning tools with simplified instructions
# reasoning_tools = ReasoningTools(
#     instructions=custom_reasoning_instructions,  # <--- OVERRIDE DEFAULTS
#     enable_analyze=False,
#     enable_think=True
# )

# Define agent IDs for AgentOS
DEFAULT_AGENT_OS_ID = os.getenv("SQL_AGENT_OS_ID", "sql-agent-os")
DEFAULT_AGENT_ID = os.getenv("SQL_AGENT_ID", "sirus-sql-agent")

# Create enhanced agent with comprehensive tracking
gemini_sql_agent = Agent(
    model=Gemini(
        id="gemini-flash-latest",
        system_prompt=system_prompt,
        thinking_budget=24000,
        include_thoughts=True,
    ),
    tools=[data_sources_sql_toolkit],
    tool_hooks=[comprehensive_logging_hook],  # Add the logging hook
    telemetry=False,
    # Database and session management
    db=agent_db,
    add_history_to_context=True,
    num_history_runs=3,
    read_chat_history=True,
    # Session state for tracking
    session_state={
        "tool_execution_log": [],
        "user_context": {},
        "analysis_metadata": {}
    },
    # NOTE(review): the /tenant-run endpoint stores `supabase_jwt` in the
    # session state. If this flag renders the whole state into the model
    # context, the token is sent to the LLM provider -- confirm, and consider
    # keeping secrets out of the state that is added to context.
    add_session_state_to_context=True,
    # Response formatting
    markdown=True,
    add_datetime_to_context=True,
    # Error handling
    exponential_backoff=True,
    delay_between_retries=10
)

# Set agent ID for AgentOS
gemini_sql_agent.id = DEFAULT_AGENT_ID

# Set agent reference in toolkit so it can access session_state during tool execution
# This is CRITICAL for session_state injection into tool calls
# NOTE(review): runs for different tenants can execute concurrently; verify the
# toolkit reads the per-run state and not state shared on the agent object.
data_sources_sql_toolkit.set_agent_ref(gemini_sql_agent)
logger.info("Agent reference set in toolkit - session_state injection enabled")


# Define Pydantic model for tenant-aware API requests
class TenantRunRequest(BaseModel):
    """
    Request model for our custom tenant-aware endpoint.
    This ensures all tenant context is provided in a single, secure request.
    Supports multi-source agent auto-detection when available_sources is provided.
    """
    message: str
    supabase_jwt: str  # JWT token for auth
    # NOTE(review): supplied by the caller; this module does not check it
    # against the JWT claims -- confirm it is validated upstream.
    tenant_id: str
    source_name: str  # Default/primary source for query execution
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    available_sources: Optional[List[Any]] = None  # All available sources for agent auto-detection
    stream: bool = False


def _json_dumps(payload: Any) -> str:
    """Encode *payload* as JSON, stringifying values JSON cannot represent."""
    return json.dumps(payload, default=str, ensure_ascii=False)


def _to_jsonable(obj: Any) -> Any:
    """Recursively unwrap models and plain objects into dicts/lists/scalars."""
    if isinstance(obj, dict):
        return {key: _to_jsonable(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_jsonable(item) for item in obj]
    if hasattr(obj, 'model_dump'):
        return _to_jsonable(obj.model_dump())
    if hasattr(obj, '__dict__') and not isinstance(obj, (str, int, float, bool, type(None))):
        return _to_jsonable(obj.__dict__)
    return obj


def _chunk_to_dict(chunk: Any) -> Dict[str, Any]:
    """Convert a non-dict stream chunk into a dict, best effort.

    Tries Pydantic v2 ``model_dump()``, then Pydantic v1 ``dict()``, then the
    instance ``__dict__``; as a last resort wraps ``str(chunk)``.
    """
    for method_name in ("model_dump", "dict"):
        if hasattr(chunk, method_name):
            try:
                return getattr(chunk, method_name)()
            except Exception as exc:
                logger.debug("%s() failed for %s: %s", method_name, type(chunk).__name__, exc)
    if hasattr(chunk, '__dict__'):
        return chunk.__dict__
    logger.warning(f"Could not serialize chunk, converting to string: {type(chunk)}")
    return {"content": str(chunk)}


def _format_sse_event(chunk: Any) -> str:
    """Render one agent stream chunk as a Server-Sent Events message.

    Dict chunks carrying an ``event`` key emit their ``data`` value under that
    event name; other dict chunks are emitted whole. Object chunks are
    converted to a dict first. For ``ReasoningStep`` events whose ``content``
    is a nested object, that object is moved to a JSON string under
    ``reasoning_content``.
    """
    if isinstance(chunk, dict):
        event = chunk.get("event")
        payload = chunk.get("data") if event else chunk
    else:
        raw = _chunk_to_dict(chunk)
        event = raw.get("event")  # read before unwrapping, as the raw value
        payload = _to_jsonable(raw)  # new dict, so the chunk is never mutated
        if event == "ReasoningStep" and isinstance(payload.get("content"), dict):
            # Content is a reasoning object - serialize it as string for frontend
            payload["reasoning_content"] = _json_dumps(payload.pop("content"))

    data_part = f"data: {_json_dumps(payload)}\n\n"
    return f"event: {event}\n{data_part}" if event else data_part


def _stream_agent_events(
    agent: Agent,
    message: str,
    session_id: str,
    session_state: Dict[str, Any],
) -> Iterator[str]:
    """Run the agent in streaming mode and yield SSE-formatted strings.

    This is deliberately a synchronous generator: ``agent.run`` blocks, and
    ``StreamingResponse`` iterates synchronous iterators in a worker thread,
    which keeps the event loop free for other requests.

    A chunk that cannot be serialized yields an error ``data:`` message and
    the stream continues; a failure of the run itself yields a final
    ``event: error`` message.
    """
    try:
        logger.info(f"🎬 Starting streaming for session {session_id}, message: {message[:50]}...")
        # agent.run returns a generator in stream mode
        response_generator = agent.run(
            message,
            stream=True,
            session_id=session_id,
            session_state=session_state,  # per-run tenant context
        )
        chunk_count = 0
        for chunk_count, chunk in enumerate(response_generator, start=1):
            logger.debug("[Chunk %d] Type: %s", chunk_count, type(chunk).__name__)
            try:
                sse_event = _format_sse_event(chunk)
            except Exception as exc:
                logger.error(f"Failed to serialize chunk: {exc}, chunk type: {type(chunk)}", exc_info=True)
                sse_event = f"data: {json.dumps({'error': str(exc), 'content': str(chunk)}, default=str)}\n\n"
            yield sse_event
        logger.info(f"✅ Streaming run completed for session {session_id} - sent {chunk_count} chunks")
    except Exception as exc:
        logger.error(f"❌ Error during stream generation for session {session_id}: {exc}", exc_info=True)
        error_data = {"error": str(exc), "code": "STREAM_ERROR"}
        yield f"event: error\ndata: {json.dumps(error_data)}\n\n"


# Define the tenant-aware endpoint function (will be added to AgentOS app later)
async def run_tenant_agent(
    agent_id: str,
    request: TenantRunRequest
):
    """
    Custom endpoint to run an agent with tenant_id, source_name, and supabase_jwt
    injected directly into the session_state.

    This is the PRIMARY endpoint for multi-tenant agent execution. All tenant
    context arrives in the request body and is passed to the run as a fresh
    session_state dict built for this request only.

    The blocking ``agent.run`` call never executes on the event loop: the
    streaming path is iterated by ``StreamingResponse`` in a worker thread and
    the non-streaming path runs via ``asyncio.to_thread``.

    Args:
        agent_id: The ID of the agent to run (e.g., "sirus-sql-agent")
        request: TenantRunRequest containing all tenant context

    Returns:
        StreamingResponse with ``text/event-stream`` content (if stream=True),
        otherwise a dict with ``session_id``, ``tenant_id`` and ``response``.

    Raises:
        HTTPException: 404 when ``agent_id`` is not the configured agent,
            500 when the non-streaming run fails.
    """
    # Get agent from the global agent we created
    agent = gemini_sql_agent if agent_id == DEFAULT_AGENT_ID else None
    if not agent:
        raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found.")

    # CRITICAL: This is the state that will be loaded *for this run only*.
    # Each request builds its own dict, so nothing is shared between requests.
    initial_state = {
        "supabase_jwt": request.supabase_jwt,  # JWT for backend API auth
        "tenant_id": request.tenant_id,  # Tenant context for toolkit
        "source_name": request.source_name,
        "user_id": request.user_id,
        "available_sources": request.available_sources or [],  # All sources for agent auto-detection
        "tool_execution_log": [],
        "user_context": {},
        "analysis_metadata": {}
    }

    # Generate a session ID if not provided
    session_id = request.session_id or str(uuid.uuid4())

    logger.info(f"🚀 Starting tenant run for tenant_id={request.tenant_id}, source={request.source_name}, session={session_id}")

    if request.stream:
        # Handle streaming response for real-time agent output
        return StreamingResponse(
            _stream_agent_events(agent, request.message, session_id, initial_state),
            media_type="text/event-stream",
        )

    # Handle non-streaming (blocking) response off the event loop
    try:
        response = await asyncio.to_thread(
            agent.run,
            request.message,
            stream=False,
            session_id=session_id,
            session_state=initial_state,
        )
    except Exception as e:
        # logger.exception records the traceback for diagnosis
        logger.exception(f"❌ Error during non-streaming agent run for session {session_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    logger.info(f"✅ Non-streaming run completed for session {session_id}")
    # `response` is whatever agent.run returns; FastAPI serializes it.
    return {
        "session_id": session_id,
        "tenant_id": request.tenant_id,
        "response": response
    }


# Create AgentOS (without fastapi_app - that parameter doesn't exist in current agno version)
agent_os = AgentOS(
    agents=[gemini_sql_agent],
    description="Multi-tenant SQL Agent for querying data sources across tenants."
)

# Get the AgentOS app first, then add our custom route to it
agentOS_app = agent_os.get_app()

# Add our custom /tenant-run endpoint to the AgentOS app
agentOS_app.add_api_route(
    "/tenant-run/{agent_id}",
    run_tenant_agent,
    methods=["POST"],
    name="run_tenant_agent"
)

# Use the combined app
app = agentOS_app

# DEPRECATED FUNCTIONS - Replaced by the /tenant-run API endpoint
# The following functions are kept for backward compatibility and local testing only.
# For production API usage, use the /tenant-run/{agent_id} endpoint instead.

if __name__ == "__main__":
    # Script entry point: print a startup banner with the configured
    # endpoints, then serve `app` with uvicorn.
    import uvicorn

    host = os.getenv("SQL_AGENT_HOST", "0.0.0.0")
    port_setting = os.getenv("SQL_AGENT_PORT", "5559")
    try:
        port = int(port_setting)
    except ValueError:
        # Fail with a readable message instead of a bare traceback.
        raise SystemExit(
            f"SQL_AGENT_PORT must be an integer, got {port_setting!r}"
        ) from None

    banner_rule = "=" * 80
    base_url = f"http://{host}:{port}"

    print("\n" + banner_rule)
    print("🚀 STARTING SQL AGENT OS SERVER (with custom /tenant-run endpoint)")
    print(banner_rule)
    print(f"Host: {host}")
    print(f"Port: {port}")
    print(f"Agent ID: {DEFAULT_AGENT_ID}")
    print(f"AgentOS ID: {DEFAULT_AGENT_OS_ID}")
    print(banner_rule + "\n")
    print("\n🎯 CUSTOM TENANT ENDPOINT:")
    print(f" POST {base_url}/tenant-run/{DEFAULT_AGENT_ID}")
    print("\n📚 STANDARD AGENTOS ENDPOINTS:")
    print(f" GET {base_url}/config")
    print(f" GET {base_url}/agents")
    print(f" POST {base_url}/agents/{DEFAULT_AGENT_ID}/runs")
    print(banner_rule + "\n")

    # Run with proper streaming settings
    uvicorn.run(
        app,
        host=host,
        port=port,
        # Omit the "Server" response header
        server_header=False,
        # Let uvicorn pick the application interface automatically
        interface="auto"
    )