Spaces:
Running
Running
| import os | |
| import json | |
| import time | |
| import logging | |
| import uuid | |
| import asyncio | |
| from typing import Dict, Any, List, Optional, Set | |
| from textwrap import dedent | |
| from datetime import datetime | |
| # Load environment variables from .env file | |
| from dotenv import load_dotenv | |
| load_dotenv(os.path.join(os.path.dirname(__file__), '..', '.env')) | |
| # FastAPI imports for custom tenant-aware endpoint | |
| from fastapi import FastAPI, HTTPException, Body | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| # Updated imports for comprehensive tracking | |
| from agno.db.sqlite import SqliteDb # Changed from InMemoryDb for persistence | |
| from agno.agent import Agent | |
| from agno.models.google import Gemini | |
| from agno.tools.reasoning import ReasoningTools | |
| from agno.os import AgentOS | |
| from agno.run import RunContext | |
| # Import the new multi-tenant toolkit | |
| from backend.SQL_Agent.data_sources_sql_toolkit import DataSourcesSQLToolkit | |
# Startup diagnostics: report whether the Gemini credential is visible to this process.
if "GOOGLE_API_KEY" in os.environ:
    print("β GOOGLE_API_KEY detected in environment.")
else:
    print("π΄ WARNING: GOOGLE_API_KEY environment variable not set. The agent will fail.")
    print("Please run: set GOOGLE_API_KEY=your_key_here")

# Data sources API settings, read from the environment.
# The base URL falls back to a local address; the API key may be absent.
DATA_SOURCES_API_BASE_URL = os.getenv("DATA_SOURCES_API_BASE_URL", "http://127.0.0.1:8000")
DATA_SOURCES_API_KEY = os.getenv("DATA_SOURCES_API_KEY")
print(f"π‘ Data Sources API URL: {DATA_SOURCES_API_BASE_URL}")
print(
    "π Data Sources API Key configured."
    if DATA_SOURCES_API_KEY
    else " No Data Sources API Key configured (optional)"
)

# Root logging configuration (DEBUG level) and the module-level logger used below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
# Tool hook that records every tool call into the run's session state
def comprehensive_logging_hook(
    run_context: RunContext,
    function_name: str,
    function_call,
    arguments: Dict[str, Any]
) -> Any:
    """
    Run a tool and append a record of the call to session_state["tool_execution_log"].

    The record always carries the tool name, the arguments, a start timestamp and
    an execution id. On success it also carries the stringified result (first
    1000 characters), status "success", the duration in milliseconds and the
    completion time. On failure it carries the error text, status "failed", the
    duration and the completion time.

    Args:
        run_context: Run context whose session_state receives the log; an empty
            or missing session_state is replaced with a new dict.
        function_name: Name of the tool being executed.
        function_call: Callable that performs the tool call.
        arguments: Keyword arguments passed to function_call.

    Returns:
        Whatever function_call returns.

    Raises:
        Exception: Any exception raised by the tool is logged, recorded and
            re-raised unchanged.
    """
    if not run_context.session_state:
        run_context.session_state = {}
    state = run_context.session_state
    state.setdefault("tool_execution_log", [])

    started_at = datetime.now()
    record = {
        "tool_name": function_name,
        "arguments": arguments,
        "timestamp": started_at.isoformat(),
        "execution_id": f"{function_name}_{started_at.timestamp()}",
    }
    logger.info(f"π§ Executing tool: {function_name} with args: {arguments}")

    def _timing() -> Dict[str, Any]:
        # Snapshot of elapsed time and completion timestamp, taken when called.
        finished_at = datetime.now()
        return {
            "duration_ms": (finished_at - started_at).total_seconds() * 1000,
            "completed_at": finished_at.isoformat(),
        }

    try:
        result = function_call(**arguments)
        timing = _timing()
        # Long results are truncated so the log entry stays small.
        record.update({"result": str(result)[:1000], "status": "success", **timing})
        logger.info(f"β Tool {function_name} completed successfully in {record['duration_ms']:.2f}ms")
    except Exception as exc:
        timing = _timing()
        record.update({"error": str(exc), "status": "failed", **timing})
        logger.error(f"β Tool {function_name} failed: {str(exc)}")
        raise
    finally:
        # The record is stored whether the tool succeeded or raised.
        state["tool_execution_log"].append(record)
    return result
# System prompt text for the "Sirus" agent; it is passed to the Gemini model
# constructor further down in this file. Everything inside the literal is
# runtime text sent to the LLM, so edits here change agent behavior.
# NOTE(review): the protocol list has no "1." marker before "THE INVISIBLE WALL"
# (numbering starts at 2), the Scenario 1 search example ends with a stray
# backtick, and the closing paragraph contains typos ("siurce", "unecessarily",
# "finsihed", "streatgist"). Left untouched here; confirm before rewording.
system_prompt = dedent("""
You are **Sirus**, the elite AI Data Scientist developed by **PhobosQ**.
if you're asked about what ai model are you? you're an ai model trained by Phobos For SIRUS
**CORE OBJECTIVE:**
You are the high-speed bridge between raw data and business strategy.
**Your Goal:** Initialize context, locate data surgically, execute accurately, and speak the answer clearly.
**β οΈ CRITICAL PROTOCOLS (READ CAREFULLY):**
**THE INVISIBLE WALL (Voice Protocol):**
- The User **CANNOT SEE** tool inputs, outputs, or SQL. They ONLY see your final text.
- **FATAL ERROR:** If you run tools and stop, the user sees a BLANK SCREEN.
- **RULE:** You *must* convert every tool result into a natural language response.
2. **CONTEXT IS AUTOMATIC (State Management):**
- **MEMORY OPTIMIZATION:** If you have already fetched `source_instructions` or `list_sources` in this conversation history, **DO NOT** fetch them again. Use your internal memory. Be efficient.
3. **TRIAGE & SAFETY (Negative Constraints):**
- **No "Hi" Tools:** If the user says "Hi", "Hello", "Who are you?", or asks general knowledge questions ("What is a database?"), **DO NOT** use tools. Reply conversationally and kindly.
- **Read-Only:** **NEVER** execute `DROP`, `DELETE`, `UPDATE`, or `INSERT`. Only `SELECT`.
- **Data Safety:** Always use `LIMIT 100` unless specifically asked for more.
- **No UUIDs:** Do not output raw internal IDs (UUIDs) in your final text unless explicitly requested.
---
**THE SIRUS WORKFLOW (Strict Execution Order):**
**STEP 1: INITIALIZE (The Setup)**
- *Trigger:* User asks a specific data question (e.g., "What is sales?").
- **Action:** Check your memory.
- *If Memory Empty:* Call `list_sources`. Pick the one. Call `get_source_instructions`. if u dont get the needed info and you're not sure about that siurce and if one other source is available , please use that source
- *If Memory Exists:* **SKIP** this step and move to Step 2 immediately.
**STEP 2: LOCATE (The Search)**
- **Action:** Call `find_relevant_tables` with the user's question.
- **Restriction:** **DO NOT** read the full schema (`get_available_sources_and_schema`). It is slow and costly. Be surgical.
**STEP 3: EXECUTE (The Strike)**
- **Action:** Call `execute_sql_query`.
- **Guideline:** Use the knowledge from *Step 1 (Instructions)* and *Step 2 (Tables)* to write perfect SQL.
- **Resilience:** If it fails, read the error -> Fix syntax -> Retry immediately.
- **DEPTH PROTOCOL (Scaling Complexity):**
- *Simple Question ("What is revenue?"):* Run 1 query.
- *Complex Question ("Why did revenue drop?"):* You are authorized to run a **second or multiple, more granular query or queries** (e.g., grouping by category or date) to find the root cause *before* you speak.
**STEP 4: SPEAK (The Insight)**
- **Action:** Translate the JSON result into a business answer.
- **Tone:** Be kind, energetic, and professional.
- **Structure:**
- *The Data:* State the numbers clearly.
- *The Insight:* Explain *why* (based on your Depth Protocol analysis).
- *The Strategy:* Offer a concrete recommendation.
---
**EXEMPLAR CHAIN OF THOUGHT (Follow this behavior):**
**Scenario 1: The "Why" Question (Complex Flow)**
*User: "Why did revenue drop last week?"*
**1. [Agent - Init (Optimization)]**
*(Self-Correction: I already know the source is 'production_db' from previous turn. Skipping Init.)*
**2. [Agent - Search]**
`find_relevant_tables(question="revenue drop last week", concepts=["orders"])`
*Output: {tables: ['orders'], columns: ['amount', 'created_at', 'category']}`
**3. [Agent - Execute (Depth Protocol)]**
*Thought: User asked "Why". A single sum isn't enough. I need to group by Category to see which one failed.*
`execute_sql_query(sql_query="SELECT category, SUM(amount) as rev FROM orders WHERE created_at >= NOW() - INTERVAL '7 days' GROUP BY category ORDER BY rev ASC")`
*Output: [{'category': 'Electronics', 'rev': 500}, {'category': 'Clothing', 'rev': 5000}]*
**4. [Agent - Speak]**
"I analyzed the breakdown for last week. While Clothing sales remained strong ($5k), **Electronics** revenue collapsed to just $500.
**Strategic Recommendation:**
Since the drop is isolated to Electronics, I recommend checking if we ran out of stock on popular items or if a competitor launched a discount in that category."
---
**Scenario 2: The "Hi" Question (Triage Flow)**
*User: "Hi Sirus, who are you?"*
**[Agent - NO TOOLS]**
"Hello! I am Sirus, your AI Data Strategist. I'm connected to your PhobosQ database and ready to analyze your data. What would you like to know?"
---
**Remember:** Initialize once. Search smart. Speak like a Strategist.
dont call tools unecessarily , try to be efficient , think how to use the tools well and do the tasks assigned to you well,be kind and answer the user well , dont just think but also answer
when answering try to use very understandable language , be polite and professional.once finsihed answering a question , always ask the user if they have any other questions or need further assistance , recommend next steps please
dont try to be a streatgist for simple questions , only be a strategist for complex questions that need strategizing or if asked to do so like "give strategy's , how to improve etc.."
""")
# Module-level wiring: session database, SQL toolkit, and the shared Agent
# instance that the endpoints in this file run. Statement order matters here:
# the agent must exist before its id is set and before it is handed to the toolkit.
print("β Configuration set. Initializing enhanced agent with comprehensive logging...")
# Initialize database for persistent storage
agent_db = SqliteDb(db_file="agent_sessions.db")
# Initialize toolkit with API configuration from environment
data_sources_sql_toolkit = DataSourcesSQLToolkit(
    api_base_url=DATA_SOURCES_API_BASE_URL,
    api_key=DATA_SOURCES_API_KEY
)
# FIX: Override default instructions so they don't conflict with Sirus
# (The reasoning-tools setup below is disabled; it is kept only as a reference.)
# custom_reasoning_instructions = """
# Use `think` to plan your approach.
# Use `analyze` to verify that your query result answers the user's specific question.
# CRITICAL: After calling `analyze` with next_action="final_answer", you MUST output a natural language text response to the user.
# The user cannot see your tool outputs - they only see your text replies.
# Never end a conversation on a tool call. Always follow up with a clear, conversational response.
# """
# # Initialize reasoning tools with simplified instructions
# reasoning_tools = ReasoningTools(
# instructions=custom_reasoning_instructions, # <--- OVERRIDE DEFAULTS
# enable_analyze=False,
# enable_think=True
# )
# Define agent IDs for AgentOS (both overridable through the environment)
# NOTE(review): in the visible part of this file DEFAULT_AGENT_OS_ID is only
# printed in the startup banner and is not passed to AgentOS — confirm intent.
DEFAULT_AGENT_OS_ID = os.getenv("SQL_AGENT_OS_ID", "sql-agent-os")
DEFAULT_AGENT_ID = os.getenv("SQL_AGENT_ID", "sirus-sql-agent")
# Create enhanced agent with comprehensive tracking
gemini_sql_agent = Agent(
    model=Gemini(
        id="gemini-flash-latest",
        system_prompt=system_prompt,
        thinking_budget=24000,
        include_thoughts=True,
    ),
    tools=[data_sources_sql_toolkit],
    tool_hooks=[comprehensive_logging_hook],  # Add the logging hook
    telemetry=False,
    # Database and session management
    db=agent_db,
    add_history_to_context=True,
    num_history_runs=3,
    read_chat_history=True,
    # Session state for tracking (default keys; the tenant endpoint passes its
    # own per-request session_state to agent.run)
    session_state={
        "tool_execution_log": [],
        "user_context": {},
        "analysis_metadata": {}
    },
    # NOTE(review): the tenant endpoint puts `supabase_jwt` into session_state.
    # If this flag injects the session state into the model context (as the name
    # suggests), the JWT would be sent to the LLM provider, and with `db` set it
    # may also be persisted with the session — confirm against the Agno docs.
    add_session_state_to_context=True,
    # Response formatting
    markdown=True,
    add_datetime_to_context=True,
    # Error handling
    # NOTE(review): no `retries` argument is passed; confirm these two settings
    # have any effect with the default retry count.
    exponential_backoff=True,
    delay_between_retries=10
)
# Set agent ID for AgentOS
gemini_sql_agent.id = DEFAULT_AGENT_ID
# Set agent reference in toolkit so it can access session_state during tool execution
# This is CRITICAL for session_state injection into tool calls
data_sources_sql_toolkit.set_agent_ref(gemini_sql_agent)
logger.info("Agent reference set in toolkit - session_state injection enabled")
# Request body schema for the tenant-aware run endpoint
class TenantRunRequest(BaseModel):
    """
    Body of a POST to the custom tenant-aware endpoint.

    Bundles the user's message with the tenant context for a single run, so that
    everything the run needs arrives in one request. `available_sources` may be
    supplied to let the agent choose among several sources.
    """
    # --- required ---
    # Text sent to the agent.
    message: str
    # JWT token for auth.
    supabase_jwt: str
    # Tenant identifier; described by the original author as extracted from the
    # JWT claims (this model itself performs no such check).
    tenant_id: str
    # Default/primary source for query execution.
    source_name: str
    # --- optional ---
    # Existing session to continue, if any.
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    # All available sources for agent auto-detection.
    available_sources: Optional[List[Any]] = None
    # Whether a streaming response is requested.
    stream: bool = False
# Tenant-aware endpoint function and its private SSE helpers.
# The endpoint is registered on the AgentOS app elsewhere in this file.


def _to_jsonable(obj: Any) -> Any:
    """Recursively reduce ``obj`` to dicts, lists and leaf values.

    Dicts and lists/tuples are walked element by element. Objects exposing
    ``model_dump()`` are dumped and walked. Any other object that has a
    ``__dict__`` is replaced by a walk over that attribute dict. Everything
    else is returned unchanged, so callers should still encode the result with
    ``json.dumps(..., default=str)`` to cope with remaining leaves.
    """
    if isinstance(obj, dict):
        return {key: _to_jsonable(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_jsonable(item) for item in obj]
    if hasattr(obj, 'model_dump'):
        return _to_jsonable(obj.model_dump())
    if hasattr(obj, '__dict__') and not isinstance(obj, (str, int, float, bool, type(None))):
        return _to_jsonable(obj.__dict__)
    return obj


def _chunk_to_dict(chunk: Any) -> Dict[str, Any]:
    """Convert a non-dict stream chunk into a dict of its fields.

    Tries ``model_dump()``, then ``dict()``, then the instance ``__dict__``;
    when none of these is available the chunk is wrapped as
    ``{"content": str(chunk)}``.
    """
    for dumper_name in ("model_dump", "dict"):
        dumper = getattr(chunk, dumper_name, None)
        if dumper is None:
            continue
        try:
            dumped = dumper()
        except Exception as exc:
            # A failing dumper is not fatal; fall through to the next strategy.
            logger.info(f"{dumper_name}() failed: {exc}")
            continue
        if dumped is not None:
            return dumped
    if hasattr(chunk, '__dict__'):
        return chunk.__dict__
    logger.warning(f"Could not serialize chunk, converting to string: {type(chunk)}")
    return {"content": str(chunk)}


def _format_sse(chunk: Any) -> str:
    """Render one agent stream chunk as a Server-Sent Events frame.

    Dict chunks are sent as ``event: <event>`` + ``data: <chunk["data"]>`` when
    they carry a truthy "event" key, otherwise the whole dict becomes the data
    line. Other chunks are converted with ``_chunk_to_dict``/``_to_jsonable``.
    For "ReasoningStep" events whose content is a dict, the content is moved to
    a JSON string under "reasoning_content".

    Raises:
        Exception: whatever the conversion or ``json.dumps`` raises; the caller
            turns that into an error frame for this chunk only.
    """
    if isinstance(chunk, dict):
        event = chunk.get("event")
        # default=str keeps one non-JSON value from aborting the stream.
        if event:
            body = json.dumps(chunk.get("data"), default=str)
            return f"event: {event}\ndata: {body}\n\n"
        body = json.dumps(chunk, default=str)
        return f"data: {body}\n\n"

    raw = _chunk_to_dict(chunk)
    # The event name is read before conversion, from the raw field value.
    event = raw.get("event")
    payload = _to_jsonable(raw)
    if event == "ReasoningStep" and isinstance(payload.get("content"), dict):
        # Content is a reasoning object - serialize it as string for frontend
        payload["reasoning_content"] = json.dumps(
            payload.pop("content"), default=str, ensure_ascii=False
        )
    body = json.dumps(payload, default=str, ensure_ascii=False)
    if event:
        return f"event: {event}\ndata: {body}\n\n"
    return f"data: {body}\n\n"


async def run_tenant_agent(
    agent_id: str,
    request: TenantRunRequest
):
    """
    Run the SQL agent with tenant context injected into the run's session_state.

    A fresh session_state dict is built for every request from the request body
    (JWT, tenant id, source name, user id, available sources) and passed to
    ``agent.run``; a session id is generated when the caller supplies none.

    Args:
        agent_id: The ID of the agent to run; only DEFAULT_AGENT_ID is served.
        request: TenantRunRequest containing the message and tenant context.

    Returns:
        StreamingResponse of Server-Sent Events when ``request.stream`` is true,
        otherwise a dict with "session_id", "tenant_id" and "response".

    Raises:
        HTTPException: 404 when ``agent_id`` is unknown; 500 when a
            non-streaming run fails (streaming failures are reported in-band as
            an ``error`` event instead).

    NOTE(review): ``tenant_id`` is taken from the request body as-is; nothing in
    this function checks it against the JWT — confirm that is verified upstream.
    NOTE(review): ``agent.run`` is a synchronous call made from this coroutine,
    so it presumably holds the event loop while the model/tools work — confirm
    whether an async run or a worker thread is wanted.
    """
    # Only the module-level agent is available through this endpoint.
    agent = gemini_sql_agent if agent_id == DEFAULT_AGENT_ID else None
    if not agent:
        raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found.")

    # Per-request state: a new dict every call, never shared between requests.
    # NOTE(review): this places the JWT in session_state; see how the Agent is
    # configured (context injection / persistence) before relying on secrecy.
    initial_state = {
        "supabase_jwt": request.supabase_jwt,  # JWT for backend API auth
        "tenant_id": request.tenant_id,  # Tenant context for toolkit
        "source_name": request.source_name,
        "user_id": request.user_id,
        "available_sources": request.available_sources or [],  # All sources for agent auto-detection
        "tool_execution_log": [],
        "user_context": {},
        "analysis_metadata": {}
    }
    # Generate a session ID if not provided
    session_id = request.session_id or str(uuid.uuid4())
    logger.info(f"π Starting tenant run for tenant_id={request.tenant_id}, source={request.source_name}, session={session_id}")

    if request.stream:
        async def stream_generator():
            chunk_count = 0
            try:
                logger.info(f"π¬ Starting streaming for session {session_id}, message: {request.message[:50]}...")
                # agent.run returns a generator in stream mode
                response_generator = agent.run(
                    request.message,
                    stream=True,
                    session_id=session_id,
                    session_state=initial_state
                )
                for chunk in response_generator:
                    chunk_count += 1
                    try:
                        frame = _format_sse(chunk)
                    except Exception as exc:
                        # One bad chunk becomes an error frame; the stream goes on.
                        logger.error(f"Failed to serialize chunk: {exc}, chunk type: {type(chunk)}", exc_info=True)
                        frame = f"data: {json.dumps({'error': str(exc), 'content': str(chunk)}, default=str)}\n\n"
                    else:
                        logger.debug("[Chunk %d] type=%s", chunk_count, type(chunk).__name__)
                    yield frame
                    # Small delay to ensure chunk is flushed before next one
                    await asyncio.sleep(0.001)
                logger.info(f"β Streaming run completed for session {session_id} - sent {chunk_count} chunks")
            except Exception as e:
                logger.error(f"β Error during stream generation for session {session_id}: {e}", exc_info=True)
                error_data = {"error": str(e), "code": "STREAM_ERROR"}
                yield f"event: error\ndata: {json.dumps(error_data)}\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    # Non-streaming (blocking) response
    try:
        response = agent.run(
            request.message,
            stream=False,
            session_id=session_id,
            session_state=initial_state
        )
    except Exception as e:
        logger.error(f"β Error during non-streaming agent run for session {session_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    logger.info(f"β Non-streaming run completed for session {session_id}")
    # The object returned by agent.run is passed through unchanged; encoding it
    # is left to the framework.
    return {
        "session_id": session_id,
        "tenant_id": request.tenant_id,
        "response": response
    }
# Build the AgentOS instance around the single SQL agent
# (no fastapi_app argument is passed - that parameter doesn't exist in current agno version)
agent_os = AgentOS(
    description="Multi-tenant SQL Agent for querying data sources across tenants.",
    agents=[gemini_sql_agent],
)
# `agentOS_app` and `app` are two names for the same application object:
# the app produced by AgentOS, extended below with the custom route.
app = agentOS_app = agent_os.get_app()
# Register the custom POST /tenant-run/{agent_id} endpoint on that app
agentOS_app.add_api_route(
    "/tenant-run/{agent_id}",
    run_tenant_agent,
    name="run_tenant_agent",
    methods=["POST"],
)
# DEPRECATED FUNCTIONS - Replaced by the /tenant-run API endpoint
# Any legacy helper functions in this section are kept for backward compatibility and local testing only.
# For production API usage, use the /tenant-run/{agent_id} endpoint instead.
# Script entry point: print a startup banner, then serve `app` with uvicorn.
if __name__ == "__main__":
    import uvicorn

    # Bind address and port come from the environment, with local defaults.
    host = os.getenv("SQL_AGENT_HOST", "0.0.0.0")
    port = int(os.getenv("SQL_AGENT_PORT", "5559"))

    rule = "=" * 80
    base_url = f"http://{host}:{port}"
    banner = [
        "\n" + rule,
        "π STARTING SQL AGENT OS SERVER (with custom /tenant-run endpoint)",
        rule,
        f"Host: {host}",
        f"Port: {port}",
        f"Agent ID: {DEFAULT_AGENT_ID}",
        f"AgentOS ID: {DEFAULT_AGENT_OS_ID}",
        rule + "\n",
        "\nπ― CUSTOM TENANT ENDPOINT:",
        f" POST {base_url}/tenant-run/{DEFAULT_AGENT_ID}",
        "\nπ STANDARD AGENTOS ENDPOINTS:",
        f" GET {base_url}/config",
        f" GET {base_url}/agents",
        f" POST {base_url}/agents/{DEFAULT_AGENT_ID}/runs",
        rule + "\n",
    ]
    print(*banner, sep="\n")

    # NOTE(review): server_header/interface were added with the stated intent of
    # avoiding response buffering for streaming; confirm against the uvicorn
    # documentation that they have that effect.
    uvicorn.run(
        app,
        host=host,
        port=port,
        server_header=False,
        interface="auto",
    )