# ClariDoc / app/core/session_manager.py
# Kshitijk20's picture
# code push
# e5b884f
import uuid
from typing import Dict, Optional
from datetime import datetime, timedelta
from app.services.RAG_service import RAGService
from app.database.database import SessionDatabase
from app.schemas.request_models import DocumentTypeSchema
class Session:
    """In-memory state for one user session.

    Holds the session identity, its creation/last-activity timestamps and
    the flags and metadata describing the document attached to it.
    """

    def __init__(self, session_id: str):
        """Create a fresh session with no document and no RAG service.

        Args:
            session_id: Identifier under which the session is stored.
        """
        # Read the clock once so a new session starts with
        # created_at == last_activity instead of differing by microseconds.
        now = datetime.now()
        self.session_id = session_id
        self.created_at = now
        self.last_activity = now
        self.rag_service: Optional[RAGService] = None  # attached once a document is processed/restored
        self.document_uploaded = False
        self.vector_store_created = False
        self.document_info = {}  # filled with 'filename' / 'type' / 'chunks_count' by SessionManager
        self.username = None

    def update_activity(self):
        """Mark the session as used right now."""
        self.last_activity = datetime.now()

    def is_expired(self, timeout_minutes: int = 60) -> bool:
        """Return True when the session has been idle for longer than the timeout.

        Args:
            timeout_minutes: Allowed idle time in minutes (default 60).

        Returns:
            True if strictly more than ``timeout_minutes`` have elapsed since
            ``last_activity``, otherwise False.
        """
        idle_for = datetime.now() - self.last_activity
        return idle_for > timedelta(minutes=timeout_minutes)
class SessionManager:
    """Keeps live ``Session`` objects in memory, backed by ``SessionDatabase``.

    ``self.sessions`` is an in-memory map of session id -> ``Session``.
    ``self.db`` is the persistent store used to create, update, look up and
    deactivate sessions, and to rebuild a ``Session`` that is no longer in
    memory.
    """

    def __init__(self):
        """Start with no in-memory sessions and open the session database."""
        self.sessions: Dict[str, "Session"] = {}
        self.db = SessionDatabase()

    def create_session(self, username: str = "anonymous") -> str:
        """Create a session, register it in memory and in the database.

        Args:
            username: Owner of the session (default ``"anonymous"``).

        Returns:
            The newly generated session id (a UUID4 string).
        """
        session_id = str(uuid.uuid4())
        session = Session(session_id)
        session.username = username
        self.sessions[session_id] = session
        # Save to database
        self.db.create_session(session_id, username)
        return session_id

    def get_session(self, session_id: str) -> Optional["Session"]:
        """Return the session for ``session_id``, restoring it if necessary.

        A non-expired in-memory session is touched and returned directly.
        An expired in-memory session is dropped and, like an unknown id, looked
        up in the database; a row that is still active there is rebuilt as a
        new ``Session`` (with fresh timestamps) and cached.

        Returns:
            The session, or None if it is neither in memory nor active in the
            database.
        """
        session = self.sessions.get(session_id)
        if session is not None:
            if not session.is_expired():
                session.update_activity()
                return session
            del self.sessions[session_id]

        return self._restore_from_db(session_id)

    def update_session_document(self, session_id: str, document_name: str,
                                document_type: str, chunks_count: int,
                                pinecone_index: Optional[str] = None,
                                pinecone_namespace: Optional[str] = None,
                                document_path: Optional[str] = None,
                                document_url: Optional[str] = None):
        """Update session with document information.

        The database row is always updated; the in-memory session is updated
        only if it is currently cached.
        """
        self.db.update_session(
            session_id,
            document_name=document_name,
            document_type=document_type,
            chunks_count=chunks_count,
            pinecone_index=pinecone_index,
            pinecone_namespace=pinecone_namespace,
            document_path=document_path,
            document_url=document_url
        )

        # Update in-memory session if exists
        session = self.sessions.get(session_id)
        if session is not None:
            session.document_uploaded = True
            session.vector_store_created = pinecone_index is not None
            session.document_info = {
                'filename': document_name,
                'type': document_type,
                'chunks_count': chunks_count
            }

    def get_user_sessions(self, username: str):
        """Get all sessions for a user (delegates to the database)."""
        return self.db.get_user_sessions(username)

    def restore_session(self, session_id: str) -> bool:
        """Restore a session from database.

        Always rebuilds from the database row and replaces any in-memory
        session stored under the same id.

        Returns:
            True if an active row was found and the session was cached,
            otherwise False.
        """
        return self._restore_from_db(session_id) is not None

    def delete_session(self, session_id: str):
        """Drop the session from memory (if present) and deactivate it in the database."""
        self.sessions.pop(session_id, None)
        self.db.deactivate_session(session_id)

    def cleanup_expired_sessions(self):
        """Remove every expired session from memory; the database is not touched."""
        # Collect ids first: the dict must not change size while it is iterated.
        expired_sessions = [
            sid for sid, session in self.sessions.items()
            if session.is_expired()
        ]
        for sid in expired_sessions:
            del self.sessions[sid]

    def _restore_from_db(self, session_id: str) -> Optional["Session"]:
        """Rebuild a ``Session`` from its database row and cache it in memory.

        Returns:
            The restored session, or None when there is no row for
            ``session_id`` or the row is not marked active.
        """
        db_session = self.db.get_session(session_id)
        if not db_session or not db_session['is_active']:
            return None

        chunks_count = db_session['chunks_count']

        session = Session(session_id)
        session.username = db_session['username']
        # A row without a document may hold None here; treat that as zero chunks
        # instead of letting ``None > 0`` raise TypeError.
        session.document_uploaded = (chunks_count or 0) > 0
        session.vector_store_created = db_session['pinecone_index'] is not None
        session.document_info = {
            'filename': db_session['document_name'],
            'type': db_session['document_type'],
            'chunks_count': chunks_count
        }

        # Initialize RAG service if document exists
        if session.vector_store_created:
            session.rag_service = self._build_rag_service(session_id, db_session)

        self.sessions[session_id] = session
        return session

    def _build_rag_service(self, session_id: str, db_session):
        """Create a ``RAGService`` wired to the index/namespace stored in ``db_session``.

        ``db_session`` is the mapping-like row returned by ``self.db.get_session``.
        """
        # Kept function-local as in the original code (presumably to avoid an
        # import cycle or import-time cost - confirm before hoisting).
        import os
        from app.ingestion.text_splitter import splitting_text
        from app.utils.metadata_utils import MetadataService

        print(f"[SessionManager] Restoring RAG service for session {session_id}")
        rag_service = RAGService()

        # Set the basic attributes
        rag_service.index = db_session['pinecone_index']
        rag_service.namespace = db_session['pinecone_namespace']
        rag_service.Document_path = db_session['document_path']
        rag_service.url = db_session['document_url']

        # Recreate the document type information.
        # NOTE(review): the stored document_type is not used here; every restored
        # session gets the "Insurance" schema - confirm this is intended.
        metadata_service = MetadataService()
        mock_scheme = DocumentTypeSchema(document_types="Insurance")  # Default for restored sessions
        document_type = metadata_service.Return_document_model(mock_scheme)
        rag_service.DocumentTypeScheme = mock_scheme
        rag_service.Document_Type = document_type

        # Create splitter instance to maintain the keywords file path
        rag_service.splitter = splitting_text(documentTypeSchema=document_type, llm=rag_service.llm)

        # Generate the expected keywords file path based on document name:
        # dots and path separators are stripped before ".json" is appended.
        document_name = db_session['document_name'] or 'unknown'
        keywords_filename = document_name.replace(".", "").replace("\\", "").replace("/", "") + ".json"
        rag_service.splitter.Keywordsfile_path = os.path.join("app/data/", keywords_filename)

        print(f"[SessionManager] RAG service restored with index: {rag_service.index}")
        return rag_service
# Module-level instance created at import time; constructing it also constructs
# the SessionDatabase held in `db`. Presumably shared by the rest of the app as
# a singleton - verify against the importing modules.
session_manager = SessionManager()