"""FastAPI backend for the RAG CHU medical document analysis service.

Exposes REST endpoints to upload, analyze and chat over medical documents,
a WebSocket channel for real-time progress notifications, and serves the
React frontend build when one is found on disk. All document state is kept
in process memory (RAM) and cleaned up on shutdown.
"""

from fastapi import FastAPI, WebSocket, UploadFile, File, HTTPException, WebSocketDisconnect
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import os
import sys
import json
import uuid
import atexit
import signal
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import asyncio
import logging

from .config import settings
from .rag_service import rag_service
from .vision_processor import IntelligentMedicalProcessor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title=settings.app_title,
    description=settings.app_description,
    version=settings.app_version,
    debug=settings.debug,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory storage
upload_directory = Path(settings.upload_dir)
upload_directory.mkdir(exist_ok=True)

documents_store: Dict[str, Dict[str, Any]] = {}
rag_chains: Dict[str, Any] = {}


# WebSocket Manager
class ConnectionManager:
    """Manages active WebSocket connections for real-time notifications."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept and register a new WebSocket connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove a WebSocket connection from the active list."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_message(self, message: dict):
        """Broadcast a JSON message to every connected client.

        Clients whose send fails are dropped from the active list.
        """
        # Iterate over a snapshot so removal during iteration is safe.
        for connection in self.active_connections[:]:
            try:
                await connection.send_text(json.dumps(message))
            except Exception as e:
                logger.error(f"Erreur envoi WebSocket: {e}")
                # FIX: guard the removal — the connection may already have
                # been removed by disconnect(), in which case the original
                # unguarded remove() raised an uncaught ValueError.
                if connection in self.active_connections:
                    self.active_connections.remove(connection)


manager = ConnectionManager()

# Frontend configuration: probe the usual React build locations.
frontend_build_path = Path(__file__).parent.parent.parent / "frontend" / "build"
possible_paths = [
    frontend_build_path,
    Path("/app/frontend/build"),
    Path(__file__).parent.parent / "frontend" / "build",
]
frontend_build_path = None
for path in possible_paths:
    if path.exists():
        frontend_build_path = path
        break

if frontend_build_path:
    static_path = frontend_build_path / "static"
    if static_path.exists():
        app.mount("/static", StaticFiles(directory=static_path), name="static")


# Pydantic models
class ChatRequest(BaseModel):
    question: str
    # FIX: annotated Optional — the original `str = None` rejects an explicit
    # null in the request body under Pydantic validation.
    document_id: Optional[str] = None


class DocumentResponse(BaseModel):
    document_id: str
    filename: str
    status: str
    total_chunks: int = 0


class ChatResponse(BaseModel):
    response: str
    document_id: str
    sources: List[Dict] = []


# WebSocket routes
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket entry point for real-time notifications.

    The connection is kept open (idle loop); messages are pushed to it by
    the ConnectionManager from the REST handlers.
    """
    await manager.connect(websocket)
    try:
        while True:
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        manager.disconnect(websocket)


# API routes
@app.get("/api/health")
async def health_check():
    """Report the health of the API and its configured components."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "version": settings.app_version,
        "qdrant_status": "in-memory" if rag_service.qdrant_client else "disconnected",
        "anthropic_configured": bool(settings.anthropic_api_key),
        "openai_configured": bool(settings.openai_api_key),
        "storage_mode": "RAM (in-memory)",
    }


@app.post("/api/upload", response_model=DocumentResponse)
async def upload_document(file: UploadFile = File(...)):
    """Upload and validate a medical document (PDF, DOCX, images).

    Raises:
        HTTPException 400: unsupported extension or file too large.
        HTTPException 500: disk write failure.
    """
    file_extension = Path(file.filename).suffix.lower()
    if file_extension not in settings.allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Type de fichier non supporté. Types autorisés: {settings.allowed_extensions}",
        )

    # FIX: file.size is None when the client omits Content-Length; the
    # original unconditional `file.size > ...` comparison raised TypeError.
    if file.size is not None and file.size > settings.max_file_size:
        raise HTTPException(
            status_code=400,
            detail=f"Fichier trop volumineux. Taille max: {settings.max_file_size // (1024*1024)}MB",
        )

    document_id = str(uuid.uuid4())
    file_path = upload_directory / f"{document_id}_{file.filename}"

    try:
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        documents_store[document_id] = {
            "document_id": document_id,
            "filename": file.filename,
            "file_path": str(file_path),
            "status": "uploaded",
            "upload_time": datetime.now().isoformat(),
            "file_size": file.size,
        }

        await manager.send_message({
            "type": "upload_success",
            "message": f"Document uploadé: {file.filename}",
            "document_id": document_id,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        return DocumentResponse(
            document_id=document_id,
            filename=file.filename,
            status="uploaded",
        )
    except Exception as e:
        logger.error(f"Erreur upload: {e}")
        raise HTTPException(status_code=500, detail="Erreur lors de l'upload")


@app.post("/api/analyze/{document_id}", response_model=DocumentResponse)
async def analyze_document(document_id: str):
    """Analyze a document with Claude Vision and build its Qdrant vectorstore.

    Progress is streamed to WebSocket clients; on success the document's RAG
    chain is cached in `rag_chains` and its status becomes "ready".
    """
    if document_id not in documents_store:
        raise HTTPException(status_code=404, detail="Document non trouvé")

    document = documents_store[document_id]
    file_path = document["file_path"]

    if not settings.anthropic_api_key or not settings.openai_api_key:
        raise HTTPException(
            status_code=500,
            detail="Clés API manquantes (Anthropic et OpenAI requises)",
        )

    try:
        await manager.send_message({
            "type": "analysis_start",
            "message": "Analyse visuelle en cours...",
            "document_id": document_id,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        async def progress_callback(message: str, level: str = "info", details: Optional[dict] = None):
            """Forward processing-progress notifications over the WebSocket."""
            await manager.send_message({
                "type": "debug",
                "level": level,
                "message": message,
                "details": details,
                "document_id": document_id,
                "timestamp": datetime.now().strftime("%H:%M:%S"),
            })

        result = await rag_service.process_document(file_path, document_id, progress_callback)

        rag_chain = rag_service.create_rag_chain(result["collection_name"])
        rag_chains[document_id] = rag_chain

        documents_store[document_id].update({
            "status": "ready",
            "collection_name": result["collection_name"],
            "total_chunks": result["total_chunks"],
            "analysis_time": datetime.now().isoformat(),
        })

        await manager.send_message({
            "type": "analysis_complete",
            "message": f"Analyse terminée: {result['total_chunks']} sections analysées",
            "document_id": document_id,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        return DocumentResponse(
            document_id=document_id,
            filename=document["filename"],
            status="ready",
            total_chunks=result["total_chunks"],
        )
    except Exception as e:
        error_msg = f"Erreur lors de l'analyse: {str(e)}"
        logger.error(error_msg)
        await manager.send_message({
            "type": "error",
            "message": f"ERREUR: {error_msg}",
            "document_id": document_id,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })
        documents_store[document_id]["status"] = "error"
        raise HTTPException(status_code=500, detail=error_msg)


@app.post("/api/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Generate a medical answer grounded in the analyzed document's content."""
    if not request.document_id:
        raise HTTPException(status_code=400, detail="document_id requis")
    if request.document_id not in documents_store:
        raise HTTPException(status_code=404, detail="Document non trouvé")
    if request.document_id not in rag_chains:
        raise HTTPException(status_code=400, detail="Document non analysé")

    document = documents_store[request.document_id]
    if document["status"] != "ready":
        raise HTTPException(status_code=400, detail="Document pas prêt pour le chat")

    # Debug: log the incoming question
    logger.info("NOUVELLE QUESTION RAG:")
    logger.info("=" * 100)
    logger.info(f" Question: {request.question}")
    logger.info(f" Document ID: {request.document_id}")
    logger.info(f" Document: {document.get('filename', 'N/A')}")
    logger.info("=" * 100)

    # Mirror the question to the debug console (WebSocket clients)
    await manager.send_message({
        "type": "debug",
        "level": "rag",
        "message": f"Question RAG: {request.question}",
        "details": {
            "document_id": request.document_id,
            "filename": document.get('filename', 'N/A'),
            "query": request.question,
        },
        "timestamp": datetime.now().strftime("%H:%M:%S"),
    })

    try:
        rag_chain = rag_chains[request.document_id]
        logger.info("DÉMARRAGE DU PIPELINE RAG...")

        # Announce pipeline start
        await manager.send_message({
            "type": "debug",
            "level": "rag",
            "message": "Démarrage pipeline RAG - Recherche de chunks similaires...",
            "document_id": request.document_id,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        # Retrieve similar chunks for debugging purposes
        collection_name = document.get("collection_name")
        if collection_name:
            similar_chunks = rag_service.search_similar_documents(
                request.question, collection_name, settings.retrieval_k
            )

            # Push the retrieval summary to the debug console
            await manager.send_message({
                "type": "debug",
                "level": "rag",
                "message": f"Chunks trouvés: {len(similar_chunks)}/{settings.retrieval_k}",
                "details": {
                    "query": request.question,
                    "collection": collection_name,
                    "chunks_found": len(similar_chunks),
                    "chunks_expected": settings.retrieval_k,
                },
                "timestamp": datetime.now().strftime("%H:%M:%S"),
            })

            # Push each retrieved chunk with its similarity score
            for i, chunk in enumerate(similar_chunks):
                await manager.send_message({
                    "type": "debug",
                    "level": "rag",
                    "message": f"Chunk {i+1}: Score {chunk['similarity_score']:.4f}",
                    "details": {
                        "chunk_index": i + 1,
                        "similarity_score": chunk['similarity_score'],
                        "metadata": chunk['metadata'],
                        "content_preview": chunk['content'][:200] + "..." if len(chunk['content']) > 200 else chunk['content'],
                    },
                    "timestamp": datetime.now().strftime("%H:%M:%S"),
                })

        response = rag_chain.invoke(request.question)

        logger.info("RÉPONSE RAG GÉNÉRÉE:")
        logger.info(f" Longueur réponse: {len(response)} caractères")
        logger.info(f" Début réponse: {response[:200]}{'...' if len(response) > 200 else ''}")
        logger.info("=" * 100)

        # Push the generated answer summary to the debug console
        await manager.send_message({
            "type": "debug",
            "level": "success",
            "message": f"Réponse générée: {len(response)} caractères",
            "details": {
                "response_length": len(response),
                "response_preview": response[:200] + "..." if len(response) > 200 else response,
            },
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        return ChatResponse(
            response=response,
            document_id=request.document_id,
            sources=[],
        )
    except Exception as e:
        error_msg = f"Erreur génération réponse: {str(e)}"
        logger.error(error_msg)
        raise HTTPException(status_code=500, detail=error_msg)


@app.post("/api/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
    """Generate a medical answer as a Server-Sent Events stream.

    Tries the chain's native async streaming first; on failure, falls back
    to invoking the chain once and emitting the answer word by word.
    """
    if not request.document_id:
        raise HTTPException(status_code=400, detail="document_id requis")
    if request.document_id not in documents_store:
        raise HTTPException(status_code=404, detail="Document non trouvé")
    if request.document_id not in rag_chains:
        raise HTTPException(status_code=400, detail="Document non analysé")

    document = documents_store[request.document_id]
    if document["status"] != "ready":
        raise HTTPException(status_code=400, detail="Document pas prêt pour le chat")

    # Debug: log the incoming streaming question
    logger.info("NOUVELLE QUESTION RAG STREAMING:")
    logger.info("=" * 100)
    logger.info(f" Question: {request.question}")
    logger.info(f" Document ID: {request.document_id}")
    logger.info(f" Document: {document.get('filename', 'N/A')}")
    logger.info("=" * 100)

    # Mirror the question to the debug console
    await manager.send_message({
        "type": "debug",
        "level": "rag",
        "message": f"Question RAG Streaming: {request.question}",
        "details": {
            "document_id": request.document_id,
            "filename": document.get('filename', 'N/A'),
            "query": request.question,
            "mode": "streaming",
        },
        "timestamp": datetime.now().strftime("%H:%M:%S"),
    })

    # Retrieve and push similar chunks for debugging purposes
    collection_name = document.get("collection_name")
    if collection_name:
        similar_chunks = rag_service.search_similar_documents(
            request.question, collection_name, settings.retrieval_k
        )

        await manager.send_message({
            "type": "debug",
            "level": "rag",
            "message": f"Chunks trouvés (streaming): {len(similar_chunks)}/{settings.retrieval_k}",
            "details": {
                "query": request.question,
                "collection": collection_name,
                "chunks_found": len(similar_chunks),
                "chunks_expected": settings.retrieval_k,
                "mode": "streaming",
            },
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        for i, chunk in enumerate(similar_chunks):
            await manager.send_message({
                "type": "debug",
                "level": "rag",
                "message": f"Chunk {i+1}: Score {chunk['similarity_score']:.4f} (streaming)",
                "details": {
                    "chunk_index": i + 1,
                    "similarity_score": chunk['similarity_score'],
                    "metadata": chunk['metadata'],
                    "content_preview": chunk['content'][:200] + "..." if len(chunk['content']) > 200 else chunk['content'],
                    "mode": "streaming",
                },
                "timestamp": datetime.now().strftime("%H:%M:%S"),
            })

    async def generate_response():
        """Async generator yielding SSE-framed chunks of the answer."""
        try:
            rag_chain = rag_chains[request.document_id]

            # Emit a start event
            yield f"data: {json.dumps({'type': 'start', 'message': 'Génération de la réponse...', 'document_id': request.document_id})}\n\n"

            try:
                # Prefer the chain's native async streaming
                response_chunks = []
                async for chunk in rag_chain.astream(request.question):
                    if chunk:
                        response_chunks.append(str(chunk))
                        chunk_data = {
                            'type': 'chunk',
                            'content': str(chunk),
                            'document_id': request.document_id,
                        }
                        yield f"data: {json.dumps(chunk_data)}\n\n"
                        await asyncio.sleep(0.01)  # small delay for smoothness
            except Exception as streaming_error:
                # Fallback: fetch the full answer, then stream it word by word
                logger.warning(f"Streaming natif échoué, fallback vers simulation: {streaming_error}")
                full_response = rag_chain.invoke(request.question)
                words = full_response.split()
                for i, word in enumerate(words):
                    chunk_data = {
                        'type': 'chunk',
                        'content': word + (" " if i < len(words) - 1 else ""),
                        'document_id': request.document_id,
                    }
                    yield f"data: {json.dumps(chunk_data)}\n\n"
                    await asyncio.sleep(0.05)  # simulate streaming

            # Emit an end event
            yield f"data: {json.dumps({'type': 'end', 'message': 'Réponse terminée', 'document_id': request.document_id})}\n\n"
        except Exception as e:
            error_msg = f"Erreur génération réponse: {str(e)}"
            logger.error(error_msg)
            error_data = {
                'type': 'error',
                'message': error_msg,
                'document_id': request.document_id,
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        generate_response(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "X-Accel-Buffering": "no",  # Nginx: disable response buffering
        },
    )


@app.get("/api/documents")
async def list_documents():
    """Return every uploaded document with its metadata."""
    return {
        "documents": list(documents_store.values()),
        "total": len(documents_store),
    }


@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
    """Return the full details of one document, including collection info."""
    if document_id not in documents_store:
        raise HTTPException(status_code=404, detail="Document non trouvé")

    document = documents_store[document_id]
    if "collection_name" in document:
        collection_info = rag_service.get_collection_info(document["collection_name"])
        document["collection_info"] = collection_info
    return document


@app.delete("/api/documents/{document_id}")
async def delete_document(document_id: str):
    """Permanently delete a document, its file and its vector collection."""
    if document_id not in documents_store:
        raise HTTPException(status_code=404, detail="Document non trouvé")

    document = documents_store[document_id]
    try:
        file_path = Path(document["file_path"])
        if file_path.exists():
            file_path.unlink()
        if "collection_name" in document:
            rag_service.delete_collection(document["collection_name"])
        if document_id in rag_chains:
            del rag_chains[document_id]
        del documents_store[document_id]
        return {"message": "Document supprimé avec succès"}
    except Exception as e:
        logger.error(f"Erreur suppression document {document_id}: {e}")
        raise HTTPException(status_code=500, detail="Erreur lors de la suppression")


@app.post("/api/cleanup")
async def manual_cleanup():
    """Manually purge every document from memory and disk."""
    try:
        docs_count = len(documents_store)
        collections_count = len([doc for doc in documents_store.values() if "collection_name" in doc])

        cleanup_memory()

        await manager.send_message({
            "type": "cleanup",
            "message": f"Nettoyage manuel effectué: {docs_count} documents supprimés",
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

        return {
            "message": "Nettoyage de la mémoire effectué avec succès",
            "documents_cleaned": docs_count,
            "collections_cleaned": collections_count,
        }
    except Exception as e:
        logger.error(f"Erreur lors du nettoyage manuel: {e}")
        raise HTTPException(status_code=500, detail="Erreur lors du nettoyage")


# Frontend routes
@app.get("/")
async def serve_react_app():
    """Serve the React frontend, or an error page when the build is missing."""
    if frontend_build_path:
        index_path = frontend_build_path / "index.html"
        if index_path.exists():
            # FIX: explicit encoding — read_text() without one decodes with the
            # platform default and can corrupt the UTF-8 build artifact.
            return HTMLResponse(content=index_path.read_text(encoding="utf-8"), status_code=200)
    return HTMLResponse("""
    <!DOCTYPE html>
    <html>
        <head>
            <meta charset="utf-8">
            <title>RAG CHU - Erreur Frontend</title>
        </head>
        <body>
            <h1>Frontend React non trouvé</h1>
            <p>Le frontend n'a pas été correctement buildé.</p>
            <a href="/docs">Documentation API</a>
        </body>
    </html>
    """)


@app.get("/{path:path}")
async def serve_spa(path: str):
    """SPA routing: serve index.html for every non-API route."""
    if path.startswith("api/") or path.startswith("docs") or path.startswith("ws"):
        raise HTTPException(status_code=404, detail="Route API non trouvée")
    if frontend_build_path:
        index_path = frontend_build_path / "index.html"
        if index_path.exists():
            # FIX: explicit encoding (see serve_react_app).
            return HTMLResponse(content=index_path.read_text(encoding="utf-8"), status_code=200)
    raise HTTPException(status_code=404, detail="Frontend non disponible")


def cleanup_memory():
    """Purge every document from memory and disk at shutdown."""
    logger.info("Début du nettoyage de la mémoire...")
    try:
        docs_to_clean = list(documents_store.items())
        cleaned_files = 0
        cleaned_collections = 0

        for document_id, document in docs_to_clean:
            try:
                if "file_path" in document:
                    file_path = Path(document["file_path"])
                    if file_path.exists():
                        file_path.unlink()
                        cleaned_files += 1
                if "collection_name" in document:
                    if rag_service.delete_collection(document["collection_name"]):
                        cleaned_collections += 1
                if document_id in rag_chains:
                    del rag_chains[document_id]
            except Exception as e:
                logger.error(f"Erreur nettoyage document {document_id}: {e}")

        documents_store.clear()
        rag_chains.clear()
        logger.info(f"Nettoyage terminé: {cleaned_files} fichiers et {cleaned_collections} collections supprimés")
    except Exception as e:
        logger.error(f"Erreur lors du nettoyage: {e}")


def signal_handler(signum, frame):
    """System signal handler ensuring a clean application shutdown."""
    logger.info(f"Signal reçu ({signum}), arrêt de l'application...")
    cleanup_memory()
    # FIX: sys.exit instead of the site-injected exit() builtin, which is
    # intended for interactive use and may be absent (e.g. under -S).
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
atexit.register(cleanup_memory)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)