Spaces:
Sleeping
Sleeping
| import uuid | |
| from typing import Dict, Optional | |
| from datetime import datetime, timedelta | |
| from app.services.RAG_service import RAGService | |
| from app.database.database import SessionDatabase | |
| from app.schemas.request_models import DocumentTypeSchema | |
| class Session: | |
| def __init__(self, session_id:str): | |
| self.session_id = session_id | |
| self.created_at = datetime.now() | |
| self.last_activity = datetime.now() | |
| self.rag_service : Optional[RAGService] = None | |
| self.document_uploaded = False | |
| self.vector_store_created = False | |
| self.document_info = {} | |
| self.username = None | |
| def update_activity(self): | |
| self.last_activity = datetime.now() | |
| def is_expired(self, timeout_minutes: int = 60) -> bool: | |
| return datetime.now() - self.last_activity > timedelta(minutes=timeout_minutes) | |
| class SessionManager: | |
| def __init__(self): | |
| self.sessions: Dict[str, Session] = {} | |
| self.db = SessionDatabase() | |
| def create_session(self, username: str = "anonymous") -> str: | |
| session_id = str(uuid.uuid4()) | |
| session = Session(session_id) | |
| session.username = username | |
| self.sessions[session_id] = session | |
| # Save to database | |
| self.db.create_session(session_id, username) | |
| return session_id | |
| def get_session(self, session_id: str) -> Optional[Session]: | |
| # First check in-memory sessions | |
| if session_id in self.sessions: | |
| session = self.sessions[session_id] | |
| if not session.is_expired(): | |
| session.update_activity() | |
| return session | |
| else: | |
| del self.sessions[session_id] | |
| # If not in memory, try to restore from database | |
| db_session = self.db.get_session(session_id) | |
| if db_session and db_session['is_active']: | |
| # Restore session to memory | |
| session = Session(session_id) | |
| session.username = db_session['username'] | |
| session.document_uploaded = db_session['chunks_count'] > 0 | |
| session.vector_store_created = db_session['pinecone_index'] is not None | |
| session.document_info = { | |
| 'filename': db_session['document_name'], | |
| 'type': db_session['document_type'], | |
| 'chunks_count': db_session['chunks_count'] | |
| } | |
| # Initialize RAG service if document exists (same as restore_session) | |
| if session.vector_store_created: | |
| print(f"[SessionManager] Restoring RAG service for session {session_id}") | |
| session.rag_service = RAGService() | |
| # Set the basic attributes | |
| session.rag_service.index = db_session['pinecone_index'] | |
| session.rag_service.namespace = db_session['pinecone_namespace'] | |
| session.rag_service.Document_path = db_session['document_path'] | |
| session.rag_service.url = db_session['document_url'] | |
| # Create a mock splitter with the keywords file path for restored sessions | |
| from app.ingestion.text_splitter import splitting_text | |
| from app.utils.metadata_utils import MetadataService | |
| # Recreate the document type information | |
| metadataservice = MetadataService() | |
| mock_scheme = DocumentTypeSchema(document_types="Insurance") # Default for restored sessions | |
| document_type = metadataservice.Return_document_model(mock_scheme) | |
| session.rag_service.DocumentTypeScheme = mock_scheme | |
| session.rag_service.Document_Type = document_type | |
| # Create splitter instance to maintain the keywords file path | |
| session.rag_service.splitter = splitting_text(documentTypeSchema=document_type, llm=session.rag_service.llm) | |
| # Generate the expected keywords file path based on document name | |
| import os | |
| document_name = db_session['document_name'] or 'unknown' | |
| keywords_filename = document_name.replace(".", "").replace("\\", "").replace("/", "") + ".json" | |
| session.rag_service.splitter.Keywordsfile_path = os.path.join("app/data/", keywords_filename) | |
| print(f"[SessionManager] RAG service restored with index: {session.rag_service.index}") | |
| self.sessions[session_id] = session | |
| return session | |
| return None | |
| def update_session_document(self, session_id: str, document_name: str, | |
| document_type: str, chunks_count: int, | |
| pinecone_index: str = None, pinecone_namespace: str = None, | |
| document_path: str = None, document_url: str = None): | |
| """Update session with document information""" | |
| self.db.update_session( | |
| session_id, | |
| document_name=document_name, | |
| document_type=document_type, | |
| chunks_count=chunks_count, | |
| pinecone_index=pinecone_index, | |
| pinecone_namespace=pinecone_namespace, | |
| document_path=document_path, | |
| document_url=document_url | |
| ) | |
| # Update in-memory session if exists | |
| if session_id in self.sessions: | |
| session = self.sessions[session_id] | |
| session.document_uploaded = True | |
| session.vector_store_created = pinecone_index is not None | |
| session.document_info = { | |
| 'filename': document_name, | |
| 'type': document_type, | |
| 'chunks_count': chunks_count | |
| } | |
| def get_user_sessions(self, username: str): | |
| """Get all sessions for a user""" | |
| return self.db.get_user_sessions(username) | |
| def restore_session(self, session_id: str) -> bool: | |
| """Restore a session from database""" | |
| db_session = self.db.get_session(session_id) | |
| if db_session and db_session['is_active']: | |
| session = Session(session_id) | |
| session.username = db_session['username'] | |
| session.document_uploaded = db_session['chunks_count'] > 0 | |
| session.vector_store_created = db_session['pinecone_index'] is not None | |
| session.document_info = { | |
| 'filename': db_session['document_name'], | |
| 'type': db_session['document_type'], | |
| 'chunks_count': db_session['chunks_count'] | |
| } | |
| # Initialize RAG service if document exists | |
| if session.vector_store_created: | |
| print(f"[SessionManager] Restoring RAG service for session {session_id}") | |
| session.rag_service = RAGService() | |
| # Set the basic attributes | |
| session.rag_service.index = db_session['pinecone_index'] | |
| session.rag_service.namespace = db_session['pinecone_namespace'] | |
| session.rag_service.Document_path = db_session['document_path'] | |
| session.rag_service.url = db_session['document_url'] | |
| # Create a mock splitter with the keywords file path for restored sessions | |
| from app.ingestion.text_splitter import splitting_text | |
| from app.utils.metadata_utils import MetadataService | |
| # Recreate the document type information | |
| metadataservice = MetadataService() | |
| mock_scheme = DocumentTypeSchema(document_types="Insurance") # Default for restored sessions | |
| document_type = metadataservice.Return_document_model(mock_scheme) | |
| session.rag_service.DocumentTypeScheme = mock_scheme | |
| session.rag_service.Document_Type = document_type | |
| # Create splitter instance to maintain the keywords file path | |
| session.rag_service.splitter = splitting_text(documentTypeSchema=document_type, llm=session.rag_service.llm) | |
| # Generate the expected keywords file path based on document name | |
| import os | |
| document_name = db_session['document_name'] or 'unknown' | |
| keywords_filename = document_name.replace(".", "").replace("\\", "").replace("/", "") + ".json" | |
| session.rag_service.splitter.Keywordsfile_path = os.path.join("app/data/", keywords_filename) | |
| print(f"[SessionManager] RAG service restored with index: {session.rag_service.index}") | |
| self.sessions[session_id] = session | |
| return True | |
| return False | |
| def delete_session(self, session_id:str): | |
| if session_id in self.sessions: | |
| del self.sessions[session_id] | |
| self.db.deactivate_session(session_id) | |
| def cleanup_expired_sessions(self): | |
| expired_sessions = [ | |
| sid for sid, session in self.sessions.items() | |
| if session.is_expired() | |
| ] | |
| for sid in expired_sessions: | |
| del self.sessions[sid] | |
| session_manager = SessionManager() | |