File size: 19,403 Bytes
0535980
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
import os
import shutil
import uuid
import json
import json
import time
from typing import Optional, List, Dict, Any
from datetime import timedelta
import sys
from pathlib import Path
import logging
import asyncio
import polars as pl
import duckdb
from dotenv import load_dotenv
import redis
from minio import Minio
from minio.error import S3Error
from contextlib import asynccontextmanager

# Resolve this file's absolute location once; both roots below derive from it.
_THIS_FILE = Path(__file__).resolve()

# Two directory levels above this file. Its parent is what gets put on
# sys.path so that `excel_module` can be imported as a top-level package.
EXCEL_MODULE_ROOT = _THIS_FILE.parent.parent

# Three directory levels above this file: the backend root holding `.env`.
PROJECT_ROOT = _THIS_FILE.parent.parent.parent

# Load environment variables from <PROJECT_ROOT>/.env
load_dotenv(dotenv_path=PROJECT_ROOT / '.env')

# Prepend (not append) so this checkout wins over any installed copy.
sys.path.insert(0, str(EXCEL_MODULE_ROOT.parent))

# Import configuration
from core.minio.config import (
    get_minio_config, 
    get_redis_config, 
    MINIO_BUCKET_NAME, 
    SESSION_TTL_SECONDS,
    ALLOWED_FILE_EXTENSIONS
)

# ---------------------------------------------------------------------------
# Logging setup
#
# Three sinks are configured:
#   * console (stdout), INFO and above
#   * "excel_service.log" in the current working directory (via basicConfig)
#   * "excel_service_detailed.log" next to this file, attached to the ROOT
#     logger so that records from every module end up in it
# ---------------------------------------------------------------------------

# Force UTF-8 on the standard streams so non-ASCII log text cannot raise
# UnicodeEncodeError. reconfigure() only exists on io.TextIOWrapper, so skip
# streams that were replaced by something else (or are missing entirely).
for _stream in (sys.stdout, sys.stderr):
    _reconfigure = getattr(_stream, "reconfigure", None)
    if callable(_reconfigure):
        _reconfigure(encoding='utf-8', errors='replace')

# Create console handler with UTF-8 support
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)

# Root logger: file in the working directory + console.
# basicConfig is a no-op if the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("excel_service.log", encoding='utf-8'),
        console_handler
    ]
)
logger = logging.getLogger("excel_service")

# Dedicated detailed log file stored alongside this module
excel_log_file = os.path.join(os.path.dirname(__file__), 'excel_service_detailed.log')
excel_file_handler = logging.FileHandler(excel_log_file, encoding='utf-8')
excel_file_handler.setLevel(logging.INFO)
excel_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
excel_file_handler.setFormatter(excel_formatter)

# CAPTURE ALL LOGS FROM ALL MODULES: attach the detailed handler to the root
# logger ONLY. Named loggers propagate their records up to root, so attaching
# the same handler to them as well made every record appear twice in the
# detailed log file.
root_logger = logging.getLogger()
root_logger.addHandler(excel_file_handler)

# Make sure the loggers we care about emit INFO and above; their records reach
# the handlers above through propagation.
# NOTE(review): if any of these modules sets `propagate = False` on its logger,
# it must attach `excel_file_handler` itself to stay in the detailed log.
for logger_name in ['LLMService', 'DataQueryEngine', 'excel_service', 'text2sql_router']:
    child_logger = logging.getLogger(logger_name)
    child_logger.setLevel(logging.INFO)

logger.info("Excel Service logger initialized - detailed logs saved to excel_service_detailed.log")
logger.info("All module logs will now be captured in excel_service_detailed.log")

# Import the Enterprise DataQueryEngine (Phase 1 upgrade)
from excel_module.agents.data_engine import DataQueryEngine

# Shared client handles. Both stay None until initialize_clients() has run.
minio_client = None
redis_client = None

def initialize_clients():
    """
    Build the module-level MinIO and Redis clients and prepare the bucket.

    Steps, in order: create the MinIO client from get_minio_config(), create
    the Redis client from get_redis_config() and ping it, then create
    MINIO_BUCKET_NAME if it does not exist yet.

    Raises:
        Exception: any failure is logged and re-raised unchanged.
    """
    global minio_client, redis_client

    try:
        # --- MinIO ---
        minio_config = get_minio_config()
        minio_client = Minio(**minio_config)
        logger.info(f"MinIO client initialized - endpoint: {minio_config['endpoint']}")

        # --- Redis (ping verifies the server is actually reachable) ---
        redis_config = get_redis_config()
        redis_client = redis.Redis(**redis_config)
        redis_client.ping()
        logger.info(f"Redis client initialized - host: {redis_config['host']}:{redis_config['port']}")

        # --- Bucket bootstrap ---
        bucket_present = minio_client.bucket_exists(MINIO_BUCKET_NAME)
        if bucket_present:
            logger.info(f"MinIO bucket exists: {MINIO_BUCKET_NAME}")
        else:
            minio_client.make_bucket(MINIO_BUCKET_NAME)
            logger.info(f"Created MinIO bucket: {MINIO_BUCKET_NAME}")

    except Exception as e:
        logger.error(f"Failed to initialize clients: {str(e)}")
        raise

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan handler to initialize and clean up resources.

    Startup: calls initialize_clients(); a failure is logged and re-raised so
    the application does not start without its backing clients.

    Shutdown: closes the Redis client on a best-effort basis. The cleanup sits
    in a ``finally`` so it also runs when the server tears the lifespan down
    by throwing an exception (or cancellation) into the generator at the
    ``yield``; without it the cleanup would be skipped in that case.

    Args:
        app: The FastAPI application this lifespan is bound to (not used here).
    """
    # Startup
    try:
        initialize_clients()
        logger.info("Excel Service startup complete - stateless architecture enabled")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise

    try:
        yield
    finally:
        # Shutdown
        if redis_client is not None:
            try:
                # redis-py client supports close() to close connection pools
                redis_client.close()
            except Exception as e:
                # Best effort only: a failed close must not break shutdown
                logger.warning(f"Failed to close Redis client cleanly: {e}")
            else:
                logger.info("Redis client closed")
        # Minio client does not require an explicit close

# The ASGI application object. Startup/shutdown work is delegated to the
# `lifespan` context manager defined above.
app = FastAPI(
    title="Sirus Intelligence Excel API",
    description="API for Excel file-based analysis using natural language",
    lifespan=lifespan,
)

# Add CORS middleware - only the proxy origin is allowed; every HTTP method
# and request header is accepted from that origin, and credentials are allowed.
# NOTE(review): CORS is enforced by browsers only; it does not by itself stop
# non-browser clients from calling this service directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],  # Only the proxy can access this service
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    """Return a static banner message for the service root path."""
    banner = {"message": "Sirus Intelligence Unified API is running - Stateless Architecture"}
    return banner

@app.get("/health")
async def health_check():
    """Health probe for Docker: returns a fixed 'healthy' payload."""
    return dict(status="healthy", service="excel-service")

@app.post("/upload-url")
async def generate_upload_url(
    filename: str = Form(...),
    content_type: Optional[str] = Form(None)
):
    """
    Generate a pre-signed URL for direct file upload to MinIO
    Phase 2: Stateless architecture - Step 1 of 2-step upload process

    Args:
        filename: Client-side file name; only its extension is used, for
            validation and as the suffix of the generated object name.
        content_type: Accepted for API compatibility; currently unused.

    Returns:
        Dict with ``upload_url``, ``object_name``, ``expires_in_minutes`` and
        ``status``.

    Raises:
        HTTPException: 400 when the extension is not in
            ALLOWED_FILE_EXTENSIONS; 500 when the URL cannot be generated.
    """
    expiry_minutes = 15

    # Validate file extension. This is done OUTSIDE the try block below:
    # previously the 400 raised here was caught by the generic handler and
    # reported to the client as a 500.
    file_extension = os.path.splitext(filename)[1].lower()
    if file_extension not in ALLOWED_FILE_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Allowed: {', '.join(ALLOWED_FILE_EXTENSIONS)}"
        )

    # Generate unique object name (the client-supplied name is never used as
    # the storage key)
    object_name = f"{uuid.uuid4()}{file_extension}"

    try:
        # Generate pre-signed URL (valid for 15 minutes)
        presigned_url = minio_client.presigned_put_object(
            bucket_name=MINIO_BUCKET_NAME,
            object_name=object_name,
            expires=timedelta(minutes=expiry_minutes)
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[UPLOAD_URL] Error generating upload URL: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error generating upload URL: {str(e)}")

    logger.info(f"[UPLOAD_URL] Generated pre-signed URL for {filename} -> {object_name}")

    return {
        "upload_url": presigned_url,
        "object_name": object_name,
        "expires_in_minutes": expiry_minutes,
        "status": "success"
    }

@app.post("/sessions")
async def create_session(
    object_name: str = Form(...),
    original_filename: str = Form(...),
    sheet_name: Optional[str] = Form(None)
):
    """
    Register a session for a file that has already been uploaded to MinIO.
    Phase 2: Stateless architecture - Step 2 of 2-step upload process

    The session is a JSON document stored in Redis under
    ``session:<session_id>`` with a TTL of SESSION_TTL_SECONDS.

    Raises:
        HTTPException: 404 if the object cannot be stat'ed in MinIO; 500 for
            any other failure.
    """
    try:
        # The object must exist in storage before a session may point at it
        try:
            minio_client.stat_object(MINIO_BUCKET_NAME, object_name)
        except S3Error:
            raise HTTPException(status_code=404, detail="File not found in storage")

        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        # Session metadata, serialized to JSON for storage
        serialized_session = json.dumps({
            "object_name": object_name,
            "original_filename": original_filename,
            "sheet_name": sheet_name,
            "created_at": time.time(),
            "last_accessed": time.time()
        })

        # SETEX writes the value and its expiry in a single call
        redis_client.setex(session_key, SESSION_TTL_SECONDS, serialized_session)

        logger.info(f"[CREATE_SESSION] Created session {session_id} for {original_filename}")

        response = {
            "session_id": session_id,
            "status": "success",
            "message": f"Session created for {original_filename}",
            "expires_in_seconds": SESSION_TTL_SECONDS
        }
        return response

    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through untouched
        raise
    except Exception as e:
        logger.error(f"[CREATE_SESSION] Error creating session: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error creating session: {str(e)}")

# Helper function to get session from Redis
def get_session_from_redis(session_id: str) -> Dict[str, Any]:
    """
    Retrieve session data from Redis and refresh its expiry.

    On every successful read the ``last_accessed`` field is updated and the
    session is written back with a fresh TTL of SESSION_TTL_SECONDS.

    Args:
        session_id: Identifier returned when the session was created.

    Returns:
        The decoded session dictionary (with the updated ``last_accessed``).

    Raises:
        HTTPException: 404 if the key is missing or expired; 500 if the stored
            value is not valid JSON or Redis access fails.
    """
    session_key = f"session:{session_id}"

    try:
        session_data = redis_client.get(session_key)

        if not session_data:
            raise HTTPException(status_code=404, detail="Session not found or expired")

        # Update last accessed time and extend the TTL
        session = json.loads(session_data)
        session["last_accessed"] = time.time()
        redis_client.setex(session_key, SESSION_TTL_SECONDS, json.dumps(session))

        return session

    except HTTPException:
        # Must come before the generic handler: otherwise the 404 above is
        # swallowed and reported to the client as a 500.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=500, detail="Invalid session data")
    except Exception as e:
        logger.error(f"Error retrieving session {session_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="Error retrieving session")

@app.get("/sessions/{session_id}/info")
async def get_session_info(session_id: str):
    """
    Get information about a session including data profile

    Loads the session's file through a fresh DataQueryEngine, collects the
    engine's file info and data profile, and returns them together with the
    session metadata stored in Redis.

    Raises:
        HTTPException: 404/500 from the session lookup; 400 if the engine
            reports an error while loading the file; 500 for any other failure.
    """
    # Get session from Redis (raises its own HTTPException on failure)
    session_data = get_session_from_redis(session_id)

    try:
        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            # Load the file from MinIO to get metadata
            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=session_data.get("sheet_name")
            )

            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            # Get file info and data profile
            file_info = engine.get_file_info()
            data_profile = engine.get_data_profile()
        finally:
            # Always release the engine, including on error paths (previously
            # it was only closed when everything succeeded)
            engine.close()

        return {
            "session_id": session_id,
            "file_name": session_data["original_filename"],
            "object_name": session_data["object_name"],
            "created_at": session_data["created_at"],
            "last_accessed": session_data["last_accessed"],
            "file_info": file_info,
            "data_profile": data_profile,
            "status": "session_active"
        }

    except HTTPException:
        # Keep deliberate status codes (the 400 above) instead of turning
        # them into a 500 in the generic handler below
        raise
    except Exception as e:
        logger.error(f"[SESSION_INFO] Error getting session info: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error getting session info: {str(e)}")

@app.post("/sessions/{session_id}/change-sheet")
async def change_sheet(session_id: str, sheet_name: str = Form(...)):
    """
    Change the active sheet in an Excel file - Stateless architecture implementation

    The new sheet name is persisted in the Redis session so later requests
    for the same session load that sheet.

    Args:
        session_id: Session whose active sheet should change.
        sheet_name: Name of the sheet to activate; must be one of the sheets
            reported by the engine when the file is loaded.

    Raises:
        HTTPException: 404/500 from the session lookup; 400 if the file is not
            an Excel workbook, cannot be loaded, the sheet does not exist, or
            the engine rejects the change; 500 for any other failure.
    """
    # Get session from Redis (raises its own HTTPException on failure)
    session_data = get_session_from_redis(session_id)

    try:
        # Check if it's an Excel file before doing any expensive work
        file_extension = os.path.splitext(session_data["original_filename"])[1].lower()
        if file_extension not in ['.xlsx', '.xls', '.xlsm']:
            raise HTTPException(status_code=400, detail="This operation is only valid for Excel files")

        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            # Load the file first to get available sheets
            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=None  # Load default to get all sheets
            )

            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            # Get available sheets
            available_sheets = result.get("available_sheets", [])
            if sheet_name not in available_sheets:
                raise HTTPException(status_code=400, detail=f"Sheet '{sheet_name}' not found. Available sheets: {available_sheets}")

            # Change to the requested sheet
            change_result = engine.change_sheet(sheet_name)

            if change_result["status"] == "error":
                raise HTTPException(status_code=400, detail=change_result["message"])

            # Update session data in Redis with new sheet
            session_data["sheet_name"] = sheet_name
            session_data["last_accessed"] = time.time()
            redis_client.setex(
                name=f"session:{session_id}",
                time=SESSION_TTL_SECONDS,
                value=json.dumps(session_data)
            )

            # Get updated file info
            file_info = engine.get_file_info()
            data_profile = engine.get_data_profile()
        finally:
            # Single cleanup point: runs on success, on every 400 above and on
            # unexpected exceptions (which previously leaked the engine)
            engine.close()

        return {
            "session_id": session_id,
            "file_name": session_data["original_filename"],
            "status": "success",
            "message": change_result["message"],
            "active_sheet": sheet_name,
            "available_sheets": available_sheets,
            "file_info": file_info,
            "data_profile": data_profile
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[CHANGE_SHEET] Error changing sheet: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error changing sheet: {str(e)}")

@app.post("/sessions/{session_id}/query")
async def query_data(session_id: str, query: Dict[str, Any]):
    """
    Process a natural language query - Stateless architecture implementation

    Args:
        session_id: Session identifying the uploaded file to query.
        query: JSON body; must contain the query string under ``"text"``.

    Returns:
        Dict with the engine's status, type, data, row/column counts, SQL,
        response and analysis fields plus an ``enterprise_metadata`` section.
        When the result type is ``"strategic"`` the ``queries``, ``metrics``
        and ``dimensions`` fields are included as well.

    Raises:
        HTTPException: 404/500 from the session lookup; 400 if the query text
            is missing or not a string, the file cannot be loaded, or the
            engine reports a query error; 500 for any other failure.
    """
    # Get session from Redis (raises its own HTTPException on failure)
    session_data = get_session_from_redis(session_id)

    if "text" not in query:
        raise HTTPException(status_code=400, detail="Query text is required")

    query_text = query["text"]
    if not isinstance(query_text, str):
        # Reject early: a non-string used to fail later and surface as a 500
        raise HTTPException(status_code=400, detail="Query text must be a string")

    try:
        # Create a stateless data engine instance for this request
        engine = DataQueryEngine(minio_client, redis_client)
        try:
            # Load the file from MinIO
            logger.info(f"[QUERY] Processing stateless query for session {session_id}: '{query_text[:50]}...'")

            result = engine.load_file(
                object_name=session_data["object_name"],
                original_filename=session_data["original_filename"],
                sheet_name=session_data.get("sheet_name")
            )

            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result["message"])

            # Process the query
            query_result = engine.process_query(query_text)

            # Debug: Log the actual result structure (without full data array).
            # `or []` tolerates an explicit None under "data".
            log_result = query_result.copy()
            if "data" in log_result:
                logged_rows = len(query_result.get("data") or [])
                log_result["data"] = f"<{logged_rows} rows of data not logged>"
            logger.info(f"[QUERY] Query result structure: {json.dumps(log_result, default=str, indent=2)}")

            if query_result["status"] == "error":
                logger.error(f"Query processing error: {query_result['message']}")
                raise HTTPException(status_code=400, detail=query_result["message"])
        finally:
            # Single cleanup point: also covers unexpected exceptions, which
            # previously left the engine open
            engine.close()

        # Return response with enterprise enhancements
        response = {
            "status": query_result["status"],
            "type": query_result.get("type", "data"),
            "data": query_result.get("data", []),
            "row_count": query_result.get("row_count", 0),
            "column_count": query_result.get("column_count", 0),
            "sql": query_result.get("sql", None),
            "response": query_result.get("response", None),
            "analysis": query_result.get("analysis", None),
            "enterprise_metadata": {
                "tables_used": query_result.get("tables_used", []),
                "query_complexity": query_result.get("query_complexity", "simple"),
                "stateless_processing": True,
                "architecture_version": "phase_2_stateless"
            }
        }

        # Add strategic analysis fields if present. Compare against the
        # defaulted type from the response: indexing query_result["type"]
        # raised KeyError (-> 500) for results that carry no "type" key.
        if response["type"] == "strategic":
            response.update({
                "queries": query_result.get("queries", []),
                "metrics": query_result.get("metrics", []),
                "dimensions": query_result.get("dimensions", [])
            })

        return response

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[QUERY] Unexpected error in stateless query processing: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail="An unexpected error occurred while processing your query"
        )

@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
    """
    Remove a session's Redis entry - Redis-based stateless version

    The uploaded file itself is not touched; only the session key is deleted.

    Raises:
        HTTPException: 404/500 from the existence check; 500 if the delete
            call fails.
    """
    # Existence check only: the helper raises if the session is gone, and its
    # return value is not needed here.
    get_session_from_redis(session_id)

    try:
        redis_client.delete(f"session:{session_id}")
        logger.info(f"[DELETE_SESSION] Session {session_id} deleted from Redis")
    except Exception as e:
        logger.error(f"[DELETE_SESSION] Error deleting session {session_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="Error deleting session")

    # Note: In the stateless architecture, we don't need to clean up local files
    # as they are stored in MinIO. The file will remain in MinIO for potential
    # future access, and MinIO can handle its own cleanup policies.
    return {"status": "success", "message": "Session deleted successfully"}

def cleanup_session(session_id: str):
    """
    DEPRECATED no-op kept for backward compatibility.

    Sessions expire through their Redis TTL in the stateless architecture, so
    there is nothing to clean up. The call only logs a deprecation warning;
    ``session_id`` is ignored.
    """
    logger.warning("cleanup_session() called but is deprecated in stateless architecture")

# Legacy startup hook, kept only so existing references to the name keep
# working. It is intentionally NOT registered with @app.on_event("startup"):
# the app is created with `lifespan=`, in which case on_event startup handlers
# are never invoked, and on_event itself is a deprecated API.
async def startup_event_old():
    """
    DEPRECATED - Old cleanup logic for stateful architecture
    In the new stateless architecture, Redis handles session cleanup via TTL

    Calling it only writes an informational log line.
    """
    logger.info("Old startup event - Redis TTL handles session cleanup in stateless architecture")

# Script entry point: start a uvicorn server when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    # The app is passed as the import string "excel_service:app", so uvicorn
    # imports a module named `excel_service` itself (presumably this file -
    # confirm the module name matches). Binds every interface on port 5003
    # with auto-reload disabled.
    uvicorn.run("excel_service:app", host="0.0.0.0", port=5003, reload=False)