Spaces:
Running
Running
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| import os | |
| import shutil | |
| import uuid | |
| import json | |
| import json | |
| import time | |
| from typing import Optional, List, Dict, Any | |
| from datetime import timedelta | |
| import sys | |
| from pathlib import Path | |
| import logging | |
| import asyncio | |
| import polars as pl | |
| import duckdb | |
| from dotenv import load_dotenv | |
| import redis | |
| from minio import Minio | |
| from minio.error import S3Error | |
| from contextlib import asynccontextmanager | |
# Resolve this file's absolute location once; both roots below derive from it.
_SERVICE_FILE = Path(__file__).resolve()

# Three directory levels above this file; the .env file is read from there.
PROJECT_ROOT = _SERVICE_FILE.parent.parent.parent
load_dotenv(dotenv_path=PROJECT_ROOT / '.env')

# Two directory levels above this file. Its parent is put at the front of
# sys.path so that `excel_module` can be imported as a top-level package.
EXCEL_MODULE_ROOT = _SERVICE_FILE.parent.parent
sys.path.insert(0, str(EXCEL_MODULE_ROOT.parent))
| # Import configuration | |
| from core.minio.config import ( | |
| get_minio_config, | |
| get_redis_config, | |
| MINIO_BUCKET_NAME, | |
| SESSION_TTL_SECONDS, | |
| ALLOWED_FILE_EXTENSIONS | |
| ) | |
# ---------------------------------------------------------------------------
# Logging configuration
# ---------------------------------------------------------------------------
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Switch the console streams to UTF-8 with errors='replace' so characters that
# cannot be encoded are substituted instead of raising. A stream that was
# swapped in by a host process may not offer reconfigure(), hence the guard.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding='utf-8', errors='replace')

# Console handler writing to stdout.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(_LOG_FORMAT)
console_handler.setFormatter(console_formatter)

# Root configuration: a log file in the current working directory plus console.
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[
        logging.FileHandler("excel_service.log", encoding='utf-8'),
        console_handler,
    ],
)
logger = logging.getLogger("excel_service")

# Detailed log file stored next to this module.
excel_log_file = os.path.join(os.path.dirname(__file__), 'excel_service_detailed.log')
excel_file_handler = logging.FileHandler(excel_log_file, encoding='utf-8')
excel_file_handler.setLevel(logging.INFO)
excel_formatter = logging.Formatter(_LOG_FORMAT)
excel_file_handler.setFormatter(excel_formatter)

# Attach the detailed handler to the ROOT logger only. Records from named
# loggers propagate up to root, so attaching the same handler to those loggers
# as well makes every record appear twice in the detailed log.
# NOTE(review): this relies on the named loggers keeping the default
# propagate=True; confirm none of the modules below disables propagation.
root_logger = logging.getLogger()
root_logger.addHandler(excel_file_handler)

# Make sure the loggers of interest emit INFO and above.
for logger_name in ['LLMService', 'DataQueryEngine', 'excel_service', 'text2sql_router']:
    child_logger = logging.getLogger(logger_name)
    child_logger.setLevel(logging.INFO)

logger.info("Excel Service logger initialized - detailed logs saved to excel_service_detailed.log")
logger.info("All module logs will now be captured in excel_service_detailed.log")
| # Import the Enterprise DataQueryEngine (Phase 1 upgrade) | |
| from excel_module.agents.data_engine import DataQueryEngine | |
# Module-level client handles. Both start out unset (None) and are rebound
# by initialize_clients().
minio_client = redis_client = None
def initialize_clients():
    """Build the module-level MinIO and Redis clients and ensure the bucket exists.

    Rebinds the globals ``minio_client`` and ``redis_client`` from the
    settings returned by ``get_minio_config()`` / ``get_redis_config()``,
    pings Redis to verify the connection, and creates ``MINIO_BUCKET_NAME``
    when it is missing.

    Raises:
        Exception: any failure is logged and then re-raised unchanged.
    """
    global minio_client, redis_client
    try:
        # MinIO
        minio_settings = get_minio_config()
        minio_client = Minio(**minio_settings)
        logger.info(f"MinIO client initialized - endpoint: {minio_settings['endpoint']}")

        # Redis (ping verifies the connection actually works)
        redis_settings = get_redis_config()
        redis_client = redis.Redis(**redis_settings)
        redis_client.ping()
        logger.info(f"Redis client initialized - host: {redis_settings['host']}:{redis_settings['port']}")

        # Bucket bootstrap
        if minio_client.bucket_exists(MINIO_BUCKET_NAME):
            logger.info(f"MinIO bucket exists: {MINIO_BUCKET_NAME}")
        else:
            minio_client.make_bucket(MINIO_BUCKET_NAME)
            logger.info(f"Created MinIO bucket: {MINIO_BUCKET_NAME}")
    except Exception as exc:
        logger.error(f"Failed to initialize clients: {exc}")
        raise
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up clients on startup, release them on shutdown.

    Decorated with ``asynccontextmanager`` (already imported by this module)
    so that the object handed to ``FastAPI(lifespan=...)`` is an async
    context manager rather than a bare async generator function.

    Startup: calls ``initialize_clients()``; a failure is logged and re-raised
    so the application does not start with missing clients.

    Shutdown: closes the Redis client if one was created. A failure to close
    is logged as a warning and otherwise ignored. The MinIO client is not
    closed explicitly.
    """
    # Startup
    try:
        initialize_clients()
        logger.info("Excel Service startup complete - stateless architecture enabled")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise

    yield

    # Shutdown (best effort)
    if redis_client:
        try:
            redis_client.close()
            logger.info("Redis client closed")
        except Exception as e:
            logger.warning(f"Failed to close Redis client cleanly: {e}")
# FastAPI application object; startup/shutdown work is delegated to `lifespan`.
# NOTE(review): no route decorators are visible on the handlers in this file
# view - confirm how the handlers below are registered on `app`.
app = FastAPI(
    title="Sirus Intelligence Excel API",
    description="API for Excel file-based analysis using natural language",
    lifespan=lifespan,
)

# CORS: only the proxy origin is allowed to call this service from a browser.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def root():
    """Return a static banner confirming that the API is running."""
    banner = "Sirus Intelligence Unified API is running - Stateless Architecture"
    return {"message": banner}
async def health_check():
    """Health check for Docker: return a fixed status/service payload."""
    payload = {"status": "healthy"}
    payload["service"] = "excel-service"
    return payload
async def generate_upload_url(
    filename: str = Form(...),
    content_type: Optional[str] = Form(None)
):
    """Generate a pre-signed URL for uploading a file directly to MinIO.

    Step 1 of the two-step upload flow (step 2 is ``create_session``).

    Args:
        filename: Client-side file name; only its extension is used, to
            validate the file type and to suffix the generated object name.
        content_type: Accepted for interface compatibility; not used here.

    Returns:
        Dict with ``upload_url``, the generated ``object_name``,
        ``expires_in_minutes`` (15) and ``status``.

    Raises:
        HTTPException: 400 when the extension is not in
            ``ALLOWED_FILE_EXTENSIONS``; 500 when URL generation fails.
    """
    # Validate before entering the try block so the 400 reaches the client
    # as-is instead of being caught below and re-reported as a 500.
    file_extension = os.path.splitext(filename)[1].lower()
    if file_extension not in ALLOWED_FILE_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Allowed: {', '.join(ALLOWED_FILE_EXTENSIONS)}"
        )

    try:
        # A random object name avoids collisions between uploads.
        object_name = f"{uuid.uuid4()}{file_extension}"

        # Pre-signed PUT URL, valid for 15 minutes.
        presigned_url = minio_client.presigned_put_object(
            bucket_name=MINIO_BUCKET_NAME,
            object_name=object_name,
            expires=timedelta(minutes=15)
        )
        logger.info(f"[UPLOAD_URL] Generated pre-signed URL for {filename} -> {object_name}")
        return {
            "upload_url": presigned_url,
            "object_name": object_name,
            "expires_in_minutes": 15,
            "status": "success"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[UPLOAD_URL] Error generating upload URL: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error generating upload URL: {str(e)}")
async def create_session(
    object_name: str = Form(...),
    original_filename: str = Form(...),
    sheet_name: Optional[str] = Form(None)
):
    """Register a session for a file that has already been uploaded to MinIO.

    Step 2 of the two-step upload flow. Confirms the object exists in the
    bucket, then stores the session metadata as JSON in Redis under
    ``session:<id>`` with a TTL of ``SESSION_TTL_SECONDS``.

    Returns:
        Dict with the new ``session_id``, ``status``, ``message`` and
        ``expires_in_seconds``.

    Raises:
        HTTPException: 404 when ``stat_object`` raises ``S3Error``;
            500 for any other failure.
    """
    try:
        # The object must already be present in storage.
        try:
            minio_client.stat_object(MINIO_BUCKET_NAME, object_name)
        except S3Error:
            raise HTTPException(status_code=404, detail="File not found in storage")

        session_id = str(uuid.uuid4())
        redis_key = f"session:{session_id}"

        metadata = {
            "object_name": object_name,
            "original_filename": original_filename,
            "sheet_name": sheet_name,
            "created_at": time.time(),
            "last_accessed": time.time()
        }

        # Persist with a TTL so that Redis expires the session by itself.
        redis_client.setex(redis_key, SESSION_TTL_SECONDS, json.dumps(metadata))
        logger.info(f"[CREATE_SESSION] Created session {session_id} for {original_filename}")

        reply = {
            "session_id": session_id,
            "status": "success",
            "message": f"Session created for {original_filename}",
            "expires_in_seconds": SESSION_TTL_SECONDS
        }
        return reply
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"[CREATE_SESSION] Error creating session: {exc}")
        raise HTTPException(status_code=500, detail=f"Error creating session: {exc}")
# Helper function to get session from Redis
def get_session_from_redis(session_id: str) -> Dict[str, Any]:
    """Fetch a session's metadata from Redis and refresh its TTL.

    Reads ``session:<session_id>``, stamps ``last_accessed`` with the current
    time and writes the record back with a fresh ``SESSION_TTL_SECONDS`` TTL.

    Args:
        session_id: Identifier returned by ``create_session``.

    Returns:
        The decoded session dictionary (with the updated ``last_accessed``).

    Raises:
        HTTPException: 404 when the key is missing or expired; 500 when the
            stored value is not valid JSON or Redis access fails.
    """
    try:
        session_key = f"session:{session_id}"
        session_data = redis_client.get(session_key)
        if not session_data:
            raise HTTPException(status_code=404, detail="Session not found or expired")

        # Update last accessed time and extend the TTL
        session = json.loads(session_data)
        session["last_accessed"] = time.time()
        redis_client.setex(session_key, SESSION_TTL_SECONDS, json.dumps(session))
        return session
    except HTTPException:
        # Let the 404 above through; without this clause the generic handler
        # below would report a missing session as a 500.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=500, detail="Invalid session data")
    except Exception as e:
        logger.error(f"Error retrieving session {session_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="Error retrieving session")
async def get_session_info(session_id: str):
    """Return session metadata together with file info and a data profile.

    Looks the session up in Redis, loads the referenced file through a
    per-request ``DataQueryEngine`` and collects ``get_file_info()`` and
    ``get_data_profile()``.

    Args:
        session_id: Identifier of an existing session.

    Returns:
        Dict with the session fields, ``file_info``, ``data_profile`` and
        ``status`` set to ``"session_active"``.

    Raises:
        HTTPException: whatever ``get_session_from_redis`` raises; 400 when
            the engine reports a load error; 500 for unexpected failures.
    """
    # Get session from Redis
    session_data = get_session_from_redis(session_id)

    try:
        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=session_data.get("sheet_name")
            )
            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            file_info = engine.get_file_info()
            data_profile = engine.get_data_profile()
        finally:
            # Always release the engine, including on error paths.
            engine.close()

        return {
            "session_id": session_id,
            "file_name": session_data["original_filename"],
            "object_name": session_data["object_name"],
            "created_at": session_data["created_at"],
            "last_accessed": session_data["last_accessed"],
            "file_info": file_info,
            "data_profile": data_profile,
            "status": "session_active"
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 400 above) instead of
        # letting the generic handler turn it into a 500.
        raise
    except Exception as e:
        logger.error(f"[SESSION_INFO] Error getting session info: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error getting session info: {str(e)}")
async def change_sheet(session_id: str, sheet_name: str = Form(...)):
    """Switch a session's active sheet and persist the choice in Redis.

    Loads the session's file with a per-request ``DataQueryEngine``, checks
    that the file is an Excel workbook and that ``sheet_name`` is among the
    sheets reported by the engine, switches to it, then rewrites the session
    record (new ``sheet_name``, refreshed ``last_accessed`` and TTL).

    Args:
        session_id: Identifier of an existing session.
        sheet_name: Name of the sheet to activate.

    Returns:
        Dict with ``status``, the engine's ``message``, ``active_sheet``,
        ``available_sheets``, ``file_info`` and ``data_profile``.

    Raises:
        HTTPException: whatever ``get_session_from_redis`` raises; 400 for a
            load error, a non-Excel file, an unknown sheet or an engine-side
            change error; 500 for unexpected failures.
    """
    # Get session from Redis
    session_data = get_session_from_redis(session_id)

    try:
        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            # Load with the default sheet first to learn which sheets exist.
            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=None
            )
            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            # Check if it's an Excel file
            file_extension = os.path.splitext(session_data["original_filename"])[1].lower()
            if file_extension not in ['.xlsx', '.xls', '.xlsm']:
                raise HTTPException(status_code=400, detail="This operation is only valid for Excel files")

            # Get available sheets
            available_sheets = result.get("available_sheets", [])
            if sheet_name not in available_sheets:
                raise HTTPException(status_code=400, detail=f"Sheet '{sheet_name}' not found. Available sheets: {available_sheets}")

            # Change to the requested sheet
            change_result = engine.change_sheet(sheet_name)
            if change_result["status"] == "error":
                raise HTTPException(status_code=400, detail=change_result["message"])

            # Update session data in Redis with new sheet
            session_data["sheet_name"] = sheet_name
            session_data["last_accessed"] = time.time()
            redis_client.setex(
                name=f"session:{session_id}",
                time=SESSION_TTL_SECONDS,
                value=json.dumps(session_data)
            )

            # Get updated file info
            file_info = engine.get_file_info()
            data_profile = engine.get_data_profile()
        finally:
            # One cleanup point covers every exit, including unexpected
            # exceptions that previously left the engine open.
            engine.close()

        return {
            "session_id": session_id,
            "file_name": session_data["original_filename"],
            "status": "success",
            "message": change_result["message"],
            "active_sheet": sheet_name,
            "available_sheets": available_sheets,
            "file_info": file_info,
            "data_profile": data_profile
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[CHANGE_SHEET] Error changing sheet: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error changing sheet: {str(e)}")
async def query_data(session_id: str, query: Dict[str, Any]):
    """Run a natural-language query against the file bound to a session.

    Loads the session's file with a per-request ``DataQueryEngine``, passes
    ``query["text"]`` to ``engine.process_query`` and reshapes the result
    into the API response.

    Args:
        session_id: Identifier of an existing session.
        query: Request body; must contain the key ``"text"``.

    Returns:
        Dict with ``status``, ``type``, ``data``, ``row_count``,
        ``column_count``, ``sql``, ``response``, ``analysis`` and
        ``enterprise_metadata``; ``queries``/``metrics``/``dimensions`` are
        added when the result type is ``"strategic"``.

    Raises:
        HTTPException: whatever ``get_session_from_redis`` raises; 400 when
            ``"text"`` is missing or the engine reports a load/query error;
            500 for unexpected failures.
    """
    # Get session from Redis
    session_data = get_session_from_redis(session_id)

    if "text" not in query:
        raise HTTPException(status_code=400, detail="Query text is required")

    try:
        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            # Load the file from MinIO
            logger.info(f"[QUERY] Processing stateless query for session {session_id}: '{query['text'][:50]}...'")
            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=session_data.get("sheet_name")
            )
            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            # Process the query
            query_result = engine.process_query(query["text"])

            # Log the result structure, replacing the data rows with a count.
            log_result = query_result.copy()
            if "data" in log_result:
                log_result["data"] = f"<{len(query_result.get('data', []))} rows of data not logged>"
            logger.info(f"[QUERY] Query result structure: {json.dumps(log_result, default=str, indent=2)}")

            if query_result["status"] == "error":
                logger.error(f"Query processing error: {query_result['message']}")
                raise HTTPException(status_code=400, detail=query_result["message"])
        finally:
            # One cleanup point covers every exit, including unexpected
            # exceptions that previously left the engine open.
            engine.close()

        # Return response with enterprise enhancements
        response = {
            "status": query_result["status"],
            "type": query_result.get("type", "data"),
            "data": query_result.get("data", []),
            "row_count": query_result.get("row_count", 0),
            "column_count": query_result.get("column_count", 0),
            "sql": query_result.get("sql", None),
            "response": query_result.get("response", None),
            "analysis": query_result.get("analysis", None),
            "enterprise_metadata": {
                "tables_used": query_result.get("tables_used", []),
                "query_complexity": query_result.get("query_complexity", "simple"),
                "stateless_processing": True,
                "architecture_version": "phase_2_stateless"
            }
        }

        # Add strategic analysis fields if present. Use .get() here: "type" is
        # treated as optional above, so indexing would raise KeyError (and a
        # 500) for results that omit it.
        if query_result.get("type") == "strategic":
            response.update({
                "queries": query_result.get("queries", []),
                "metrics": query_result.get("metrics", []),
                "dimensions": query_result.get("dimensions", [])
            })

        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[QUERY] Unexpected error in stateless query processing: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail="An unexpected error occurred while processing your query"
        )
async def delete_session(session_id: str):
    """Remove a session record from Redis.

    The session is looked up first, so an unknown or expired id surfaces the
    error raised by ``get_session_from_redis``. Only the Redis key is deleted;
    the uploaded object is left in MinIO.

    Returns:
        ``{"status": "success", "message": "Session deleted successfully"}``.

    Raises:
        HTTPException: whatever the lookup raises; 500 when the delete fails.
    """
    # Existence check only - the returned record itself is not needed.
    get_session_from_redis(session_id)

    try:
        redis_client.delete(f"session:{session_id}")
        logger.info(f"[DELETE_SESSION] Session {session_id} deleted from Redis")
    except Exception as exc:
        logger.error(f"[DELETE_SESSION] Error deleting session {session_id}: {exc}")
        raise HTTPException(status_code=500, detail="Error deleting session")
    else:
        # No local files to clean up: the data lives in MinIO, which is left
        # to apply its own cleanup policies.
        return {"status": "success", "message": "Session deleted successfully"}
def cleanup_session(session_id: str):
    """Deprecated no-op kept for compatibility.

    In the stateless architecture Redis expires sessions through their TTL,
    so the only thing this function does is log a deprecation warning.
    ``session_id`` is ignored.
    """
    logger.warning("cleanup_session() called but is deprecated in stateless architecture")
| # Background task to clean up inactive sessions | |
async def startup_event_old():
    """Deprecated startup hook left over from the stateful architecture.

    Session cleanup is now handled by Redis TTLs; this coroutine only logs an
    informational message.
    """
    logger.info("Old startup event - Redis TTL handles session cleanup in stateless architecture")
if __name__ == "__main__":
    # Script entry point: serve `app` from the `excel_service` module on all
    # interfaces, port 5003, with auto-reload disabled.
    uvicorn.run(
        "excel_service:app",
        host="0.0.0.0",
        port=5003,
        reload=False,
    )