# services/schemas.py
"""
MongoDB Schemas for MasterLLM V3 Architecture

Defines Pydantic models for all MongoDB collections:
- sessions: Session metadata
- messages: Message metadata with S3 references
- pipelines: Pipeline execution metadata
- workflows: Saved workflows
- files: File metadata with S3 references
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from enum import Enum
import uuid


# ========================
# ENUMS
# ========================

class SessionState(str, Enum):
    """Session states"""
    INITIAL = "initial"
    PIPELINE_PROPOSED = "pipeline_proposed"
    EXECUTING = "executing"
    IN_PROGRESS = "in_progress"  # Pipeline is running
    COMPLETED = "completed"      # Pipeline completed successfully
    FAILED = "failed"            # Pipeline failed


class PipelineStatus(str, Enum):
    """Pipeline execution states"""
    PROPOSED = "proposed"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"


class ComponentStatus(str, Enum):
    """Component execution states"""
    PENDING = "pending"
    EXECUTING = "executing"
    RUNNING = "running"      # Alias for executing
    COMPLETED = "completed"
    SUCCESS = "success"      # Alias for completed
    FAILED = "failed"


class MessageRole(str, Enum):
    """Message roles"""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"


# Shared JSON encoder map so every schema's Config serializes datetimes
# identically (ISO-8601 with a trailing "Z"). Previously this lambda was
# duplicated verbatim in five Config classes.
# NOTE(review): `json_encoders` is deprecated (and ignored by default) under
# Pydantic v2, while `schema_to_dict` below uses the v2 `model_dump` API —
# confirm the project's Pydantic version; under v2 these encoders have no
# effect and `mode='json'` serialization is what actually runs.
_DATETIME_JSON_ENCODERS: Dict[Any, Any] = {
    datetime: lambda v: v.isoformat() + "Z" if v else None
}


# ========================
# COMPONENT MODELS
# ========================

class ComponentMetadata(BaseModel):
    """Component execution metadata within a pipeline"""
    component_id: str  # Unique identifier for component
    component_name: str
    order: int
    status: ComponentStatus = ComponentStatus.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    component_output: Optional[Any] = None  # Output from component execution
    hasError: bool = False  # Error flag
    error: Optional[Dict[str, Any]] = None  # Error details {error_code, message, details}
    success_message: Optional[str] = None  # Component-wise success message
    metadata: Optional[Dict[str, Any]] = None  # Duration, tokens, etc.


class SessionStats(BaseModel):
    """Session statistics"""
    message_count: int = 0
    pipeline_run_count: int = 0
    total_tokens_used: int = 0


# ========================
# COLLECTION SCHEMAS
# ========================

class SessionSchema(BaseModel):
    """MongoDB schema for sessions collection"""
    session_id: str
    user_id: Optional[str] = None
    created_at: datetime
    last_activity: datetime
    state: SessionState = SessionState.INITIAL
    chat_name: Optional[str] = None
    chat_name_generated_at: Optional[datetime] = None
    chat_name_model: Optional[str] = None
    current_file_id: Optional[str] = None  # Reference to files collection
    proposed_pipeline_id: Optional[str] = None  # Reference to pipelines collection
    stats: SessionStats = Field(default_factory=SessionStats)
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        json_encoders = _DATETIME_JSON_ENCODERS


class MessageSchema(BaseModel):
    """MongoDB schema for messages collection - METADATA ONLY"""
    message_id: str
    session_id: str
    role: MessageRole
    timestamp: datetime
    s3_key: str  # Path to full message content in S3
    s3_bucket: str
    content_preview: str  # First 200 chars for quick display
    has_file: bool = False
    file_id: Optional[str] = None  # Reference to files collection
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        json_encoders = _DATETIME_JSON_ENCODERS


class PipelineSchema(BaseModel):
    """MongoDB schema for pipelines collection - METADATA ONLY"""
    execution_id: str
    session_id: str
    pipeline_name: str
    created_at: datetime
    created_from: str  # "request" or "edit"
    created_by_message: str  # User's message that triggered pipeline
    status: PipelineStatus
    pipeline_definition_s3_key: str  # Full pipeline definition in S3
    components: List[ComponentMetadata]  # Component metadata only
    final_output_s3_key: Optional[str] = None  # Contains ALL component outputs
    output_id: Optional[str] = None  # ID for downloading output
    final_output_presigned_url: Optional[str] = None
    final_output_presigned_expires_at: Optional[datetime] = None
    executed_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    executor: Optional[str] = None  # "bedrock", "crewai", "langchain"
    result: Optional[str] = None  # Renamed from result_preview - output if completed
    hasError: bool = False  # Error flag
    error: Optional[Dict[str, Any]] = None  # Error details
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        json_encoders = _DATETIME_JSON_ENCODERS


class WorkflowSchema(BaseModel):
    """MongoDB schema for workflows collection - METADATA ONLY"""
    workflow_id: str
    session_id: str
    saved_at: datetime
    saved_by_user_message: str  # Message when user confirmed save
    pipeline_definition_s3_key: str  # Full workflow/pipeline definition in S3
    pipeline_name: str
    pipeline_preview: str  # "Extract text → Summarize → Translate"
    user_confirmed: bool = True  # User explicitly confirmed save
    tags: List[str] = Field(default_factory=list)
    source_pipeline_id: Optional[str] = None  # Pipeline ID this workflow came from
    pipeline_status: Optional[str] = None  # Status when saved: "proposed", "completed"
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        json_encoders = _DATETIME_JSON_ENCODERS


class FileSchema(BaseModel):
    """MongoDB schema for files collection - METADATA ONLY"""
    file_id: str
    session_id: str
    uploaded_at: datetime
    file_name: str
    file_size: int
    mime_type: str
    s3_bucket: str
    s3_key: str
    presigned_url: str  # Cached, 7-day expiry
    presigned_expires_at: datetime
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        json_encoders = _DATETIME_JSON_ENCODERS


# ========================
# S3 CONTENT SCHEMAS
# ========================

class MessageS3Content(BaseModel):
    """Schema for message content stored in S3"""
    message_id: str
    role: str
    content: str
    timestamp: str
    file_data: Dict[str, Any] = Field(default_factory=dict)


class ComponentOutputS3(BaseModel):
    """Schema for individual component output in final_output.json"""
    component_id: str
    name: str
    order: int
    status: str
    component_output: Optional[Any] = None
    hasError: bool = False
    error: Optional[Dict[str, Any]] = None
    success_message: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None


class FinalOutputS3(BaseModel):
    """Schema for final_output.json in S3"""
    workflow_name: str
    workflow_status: str  # "completed" or "failed"
    last_node_output: Optional[str] = None  # Output from last successful node ONLY
    components: List[ComponentOutputS3]
    execution_time: float
    timestamp: str


class PipelineDefinitionS3(BaseModel):
    """Schema for pipeline definition stored in S3"""
    pipeline_name: str
    pipeline_steps: List[Dict[str, Any]]  # Component configurations
    target_lang: Optional[str] = None
    reason: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # NOTE(review): leading-underscore annotations are treated by Pydantic as
    # *private attributes*, not model fields — they will not be validated or
    # serialized. If "_generator"/"_model" must round-trip through S3 JSON,
    # they likely need aliased public fields. Confirm intended behavior.
    _generator: str  # "bedrock" or "gemini"
    _model: str
    created_at: str


# ========================
# UTILITY FUNCTIONS
# ========================

def generate_component_id(pipeline_id: str, index: int) -> str:
    """Generate component ID in format: pipeline_id_index"""
    return f"{pipeline_id}_{index}"


def generate_message_id() -> str:
    """Generate unique message ID using MongoDB ObjectId format"""
    from bson import ObjectId
    return str(ObjectId())


def generate_output_id() -> str:
    """Generate unique output ID for downloads"""
    return f"output_{uuid.uuid4().hex[:12]}"


def datetime_to_iso(dt: Optional[datetime]) -> Optional[str]:
    """Convert datetime to ISO string with a trailing "Z"; None passes through.

    NOTE(review): assumes *dt* is naive UTC — appending "Z" to a tz-aware
    datetime would produce a malformed string like "...+05:00Z". Confirm
    callers only pass naive UTC datetimes.
    """
    return dt.isoformat() + "Z" if dt else None


def iso_to_datetime(iso_str: Optional[str]) -> Optional[datetime]:
    """Convert ISO string to a (naive) datetime; None/empty yields None."""
    if not iso_str:
        return None
    # Drop a single trailing 'Z' (UTC designator) before parsing.
    # Bug fix: the previous rstrip('Z') stripped *all* trailing 'Z'
    # characters, which is not the inverse of datetime_to_iso.
    if iso_str.endswith('Z'):
        iso_str = iso_str[:-1]
    return datetime.fromisoformat(iso_str)


def schema_to_dict(schema: BaseModel) -> Dict[str, Any]:
    """Convert Pydantic schema to dict with datetime serialization"""
    return schema.model_dump(mode='json')


# ========================
# INDEX DEFINITIONS
# ========================

# Index specs per collection: each entry is {"keys": [...], "unique": bool?}
# in the shape accepted by pymongo's Collection.create_index.
COLLECTION_INDEXES = {
    "sessions": [
        {"keys": [("session_id", 1)], "unique": True},
        {"keys": [("last_activity", -1)]},
        {"keys": [("user_id", 1)]},
    ],
    "messages": [
        {"keys": [("message_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("timestamp", 1)]},  # Chronological retrieval
    ],
    "pipelines": [
        {"keys": [("execution_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("created_at", -1)]},
        {"keys": [("status", 1)]},
    ],
    "workflows": [
        {"keys": [("workflow_id", 1)], "unique": True},
        {"keys": [("saved_at", -1)]},
        {"keys": [("session_id", 1)]},
    ],
    "files": [
        {"keys": [("file_id", 1)], "unique": True},
        {"keys": [("session_id", 1)]},
    ],
}


def create_indexes(db):
    """
    Create all required indexes on MongoDB database

    Args:
        db: pymongo database instance
    """
    for collection_name, indexes in COLLECTION_INDEXES.items():
        collection = db[collection_name]
        for index_def in indexes:
            collection.create_index(
                index_def["keys"],
                unique=index_def.get("unique", False)
            )
    print("✅ All MongoDB indexes created successfully")