Spaces:
Sleeping
Sleeping
# services/schemas.py
"""
MongoDB Schemas for MasterLLM V3 Architecture

Defines Pydantic models for all MongoDB collections:
- sessions: Session metadata
- messages: Message metadata with S3 references
- pipelines: Pipeline execution metadata
- workflows: Saved workflows
- files: File metadata with S3 references
"""
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field
| # ======================== | |
| # ENUMS | |
| # ======================== | |
class SessionState(str, Enum):
    """Lifecycle states of a chat session (stored in ``sessions.state``)."""
    INITIAL = "initial"                      # Fresh session, no pipeline yet
    PIPELINE_PROPOSED = "pipeline_proposed"  # Pipeline proposed, awaiting confirmation
    # NOTE(review): EXECUTING and IN_PROGRESS look redundant — confirm which
    # one callers actually set before consolidating.
    EXECUTING = "executing"
    IN_PROGRESS = "in_progress"  # Pipeline is running
    COMPLETED = "completed"  # Pipeline completed successfully
    FAILED = "failed"  # Pipeline failed
class PipelineStatus(str, Enum):
    """Pipeline execution states (stored in ``pipelines.status``)."""
    PROPOSED = "proposed"    # Generated but not yet run
    EXECUTING = "executing"  # Currently running
    COMPLETED = "completed"  # Finished successfully
    FAILED = "failed"        # Finished with an error
class ComponentStatus(str, Enum):
    """Per-component execution states within a pipeline run."""
    PENDING = "pending"      # Not started yet
    EXECUTING = "executing"
    RUNNING = "running"  # Alias for executing
    COMPLETED = "completed"
    SUCCESS = "success"  # Alias for completed
    FAILED = "failed"
class MessageRole(str, Enum):
    """Author role of a chat message (mirrors the common LLM chat roles)."""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
| # ======================== | |
| # COMPONENT MODELS | |
| # ======================== | |
class ComponentMetadata(BaseModel):
    """Component execution metadata embedded in a pipeline document.

    Metadata only — the full component output lives in S3 (see
    ``ComponentOutputS3``); ``component_output`` may hold whatever the
    executor stored here.
    """
    component_id: str  # Unique identifier for component (see generate_component_id)
    component_name: str
    order: int  # Execution position within the pipeline (0-based — TODO confirm)
    status: ComponentStatus = ComponentStatus.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    component_output: Optional[Any] = None  # Output from component execution
    hasError: bool = False  # Error flag (camelCase kept for stored-document compatibility)
    error: Optional[Dict[str, Any]] = None  # Error details {error_code, message, details}
    success_message: Optional[str] = None  # Component-wise success message
    metadata: Optional[Dict[str, Any]] = None  # Duration, tokens, etc.
class SessionStats(BaseModel):
    """Running counters for a session (embedded in ``SessionSchema.stats``)."""
    message_count: int = 0        # Total messages exchanged in the session
    pipeline_run_count: int = 0   # Number of pipeline executions started
    total_tokens_used: int = 0    # Cumulative LLM token usage
| # ======================== | |
| # COLLECTION SCHEMAS | |
| # ======================== | |
class SessionSchema(BaseModel):
    """MongoDB schema for the ``sessions`` collection.

    Holds session-level metadata only; message bodies and pipeline
    definitions are referenced by ID and stored elsewhere.
    """
    session_id: str
    user_id: Optional[str] = None  # Optional — sessions may be anonymous
    created_at: datetime
    last_activity: datetime  # Updated on each interaction; indexed for recency queries
    state: SessionState = SessionState.INITIAL
    chat_name: Optional[str] = None  # Auto-generated display name, if any
    chat_name_generated_at: Optional[datetime] = None
    chat_name_model: Optional[str] = None  # Which model generated chat_name
    current_file_id: Optional[str] = None  # Reference to files collection
    proposed_pipeline_id: Optional[str] = None  # Reference to pipelines collection
    stats: SessionStats = Field(default_factory=SessionStats)
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # Serialize datetimes as ISO-8601 with a trailing "Z" (UTC convention)
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class MessageSchema(BaseModel):
    """MongoDB schema for the ``messages`` collection — METADATA ONLY.

    The full message content lives in S3 at ``s3_key``; Mongo keeps just a
    short preview plus references.
    """
    message_id: str
    session_id: str
    role: MessageRole
    timestamp: datetime
    s3_key: str  # Path to full message content in S3
    s3_bucket: str
    content_preview: str  # First 200 chars for quick display
    has_file: bool = False  # True when a file accompanies this message
    file_id: Optional[str] = None  # Reference to files collection
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # Serialize datetimes as ISO-8601 with a trailing "Z" (UTC convention)
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class PipelineSchema(BaseModel):
    """MongoDB schema for the ``pipelines`` collection — METADATA ONLY.

    The full pipeline definition and the complete component outputs live in
    S3 (``pipeline_definition_s3_key`` / ``final_output_s3_key``); Mongo
    keeps per-component metadata and execution status.
    """
    execution_id: str
    session_id: str
    pipeline_name: str
    created_at: datetime
    created_from: str  # "request" or "edit"
    created_by_message: str  # User's message that triggered pipeline
    status: PipelineStatus
    pipeline_definition_s3_key: str  # Full pipeline definition in S3
    components: List[ComponentMetadata]  # Component metadata only
    final_output_s3_key: Optional[str] = None  # Contains ALL component outputs
    output_id: Optional[str] = None  # ID for downloading output (see generate_output_id)
    final_output_presigned_url: Optional[str] = None  # Cached presigned download URL
    final_output_presigned_expires_at: Optional[datetime] = None
    executed_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    executor: Optional[str] = None  # "bedrock", "crewai", "langchain"
    result: Optional[str] = None  # Renamed from result_preview - output if completed
    hasError: bool = False  # Error flag (camelCase kept for stored-document compatibility)
    error: Optional[Dict[str, Any]] = None  # Error details
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # Serialize datetimes as ISO-8601 with a trailing "Z" (UTC convention)
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class WorkflowSchema(BaseModel):
    """MongoDB schema for the ``workflows`` collection — METADATA ONLY.

    A workflow is a pipeline the user explicitly chose to save for reuse;
    the full definition lives in S3 at ``pipeline_definition_s3_key``.
    """
    workflow_id: str
    session_id: str
    saved_at: datetime
    saved_by_user_message: str  # Message when user confirmed save
    pipeline_definition_s3_key: str  # Full workflow/pipeline definition in S3
    pipeline_name: str
    pipeline_preview: str  # "Extract text → Summarize → Translate"
    user_confirmed: bool = True  # User explicitly confirmed save
    tags: List[str] = Field(default_factory=list)
    source_pipeline_id: Optional[str] = None  # Pipeline ID this workflow came from
    pipeline_status: Optional[str] = None  # Status when saved: "proposed", "completed"
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # Serialize datetimes as ISO-8601 with a trailing "Z" (UTC convention)
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class FileSchema(BaseModel):
    """MongoDB schema for the ``files`` collection — METADATA ONLY.

    File bytes live in S3; Mongo keeps location, size/type metadata, and a
    cached presigned URL for downloads.
    """
    file_id: str
    session_id: str
    uploaded_at: datetime
    file_name: str
    file_size: int  # Size in bytes — presumably; confirm against uploader
    mime_type: str
    s3_bucket: str
    s3_key: str
    presigned_url: str  # Cached, 7-day expiry
    presigned_expires_at: datetime  # When presigned_url stops working
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # Serialize datetimes as ISO-8601 with a trailing "Z" (UTC convention)
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
| # ======================== | |
| # S3 CONTENT SCHEMAS | |
| # ======================== | |
class MessageS3Content(BaseModel):
    """Schema for the full message content object stored in S3.

    Complements ``MessageSchema`` (which holds only metadata in Mongo).
    Note: ``role`` and ``timestamp`` are plain strings here, not enums or
    datetimes, because this is a serialized JSON payload.
    """
    message_id: str
    role: str
    content: str  # Full message text (Mongo only stores a 200-char preview)
    timestamp: str  # ISO-8601 string
    file_data: Dict[str, Any] = Field(default_factory=dict)
class ComponentOutputS3(BaseModel):
    """Schema for one component's entry in ``final_output.json`` (S3).

    Mirrors ``ComponentMetadata`` but carries the full output payload.
    """
    component_id: str
    name: str
    order: int  # Execution position within the pipeline
    status: str  # Serialized ComponentStatus value
    component_output: Optional[Any] = None
    hasError: bool = False  # Error flag (camelCase kept for stored-JSON compatibility)
    error: Optional[Dict[str, Any]] = None
    success_message: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None  # NOTE(review): overlaps component_output — confirm which executors fill which
class FinalOutputS3(BaseModel):
    """Schema for ``final_output.json`` in S3 — the complete run record."""
    workflow_name: str
    workflow_status: str  # "completed" or "failed"
    last_node_output: Optional[str] = None  # Output from last successful node ONLY
    components: List[ComponentOutputS3]  # Full per-component outputs
    execution_time: float  # Total run duration (units not shown here — presumably seconds; confirm)
    timestamp: str  # ISO-8601 string
class PipelineDefinitionS3(BaseModel):
    """Schema for a pipeline definition stored in S3.

    The stored JSON uses ``_generator`` / ``_model`` keys. Pydantic treats
    leading-underscore annotations as *private attributes*, so the original
    ``_generator: str`` / ``_model: str`` declarations were silently dropped
    from validation and serialization. They are now regular fields exposed
    as ``generator`` / ``model`` in Python, aliased to the underscored JSON
    keys so the S3 payload format is unchanged.
    """
    pipeline_name: str
    pipeline_steps: List[Dict[str, Any]]  # Component configurations
    target_lang: Optional[str] = None
    reason: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    generator: str = Field(alias="_generator")  # "bedrock" or "gemini"
    model: str = Field(alias="_model")  # Model identifier used to generate the pipeline
    created_at: str  # ISO-8601 string

    class Config:
        # Accept both the field name and the underscored alias on input
        populate_by_name = True
| # ======================== | |
| # UTILITY FUNCTIONS | |
| # ======================== | |
def generate_component_id(pipeline_id: str, index: int) -> str:
    """Build a component ID of the form ``<pipeline_id>_<index>``."""
    return "_".join((pipeline_id, str(index)))
def generate_message_id() -> str:
    """Return a fresh, unique message ID (MongoDB ObjectId hex string)."""
    # Imported lazily so this module loads even where bson is optional.
    from bson import ObjectId

    new_oid = ObjectId()
    return str(new_oid)
def generate_output_id() -> str:
    """Return a unique download-output ID: ``output_`` + 12 random hex chars."""
    suffix = uuid.uuid4().hex[:12]
    return f"output_{suffix}"
def datetime_to_iso(dt: Optional[datetime]) -> Optional[str]:
    """Convert a datetime to an ISO-8601 string with a trailing 'Z'.

    Fixes a defect in the original: for timezone-aware datetimes,
    ``dt.isoformat() + "Z"`` produced invalid strings such as
    ``2024-01-01T00:00:00+00:00Z``. Aware datetimes are now normalized to
    UTC and rendered naive before the 'Z' is appended. Naive datetimes
    serialize exactly as before.

    Args:
        dt: datetime to serialize, or None.

    Returns:
        ISO string ending in 'Z', or None when dt is None.
    """
    if dt is None:
        return None
    if dt.tzinfo is not None:
        # Normalize to UTC, then drop the offset so we can append 'Z'
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.isoformat() + "Z"
def iso_to_datetime(iso_str: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string (optionally 'Z'-suffixed) into a datetime.

    Fixes a defect in the original: ``iso_str.rstrip('Z')`` strips *every*
    trailing 'Z', not just one, which could corrupt inputs. Exactly one
    trailing 'Z' suffix is removed now.

    Args:
        iso_str: ISO string such as "2024-01-02T03:04:05Z", or None/empty.

    Returns:
        Parsed (naive) datetime, or None for falsy input.
    """
    if not iso_str:
        return None
    # Remove a single trailing 'Z' (UTC designator), if present
    if iso_str.endswith('Z'):
        iso_str = iso_str[:-1]
    return datetime.fromisoformat(iso_str)
def schema_to_dict(schema: BaseModel) -> Dict[str, Any]:
    """Serialize a Pydantic model to a plain dict.

    Uses JSON-mode dumping so datetimes and enums become JSON-safe values.
    """
    dumped = schema.model_dump(mode='json')
    return dumped
| # ======================== | |
| # INDEX DEFINITIONS | |
| # ======================== | |
| # Define indexes for MongoDB collections | |
# Index definitions per MongoDB collection. Each entry is a dict understood
# by create_indexes(): "keys" is the pymongo key-spec list, "unique" is an
# optional flag (defaults to False).
COLLECTION_INDEXES = {
    "sessions": [
        {"keys": [("session_id", 1)], "unique": True},  # Primary lookup
        {"keys": [("last_activity", -1)]},              # Recent-sessions queries
        {"keys": [("user_id", 1)]},                     # Per-user listing
    ],
    "messages": [
        {"keys": [("message_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("timestamp", 1)]},  # Chronological retrieval
    ],
    "pipelines": [
        {"keys": [("execution_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("created_at", -1)]},  # Latest-first per session
        {"keys": [("status", 1)]},                          # Status filtering
    ],
    "workflows": [
        {"keys": [("workflow_id", 1)], "unique": True},
        {"keys": [("saved_at", -1)]},   # Recently-saved listing
        {"keys": [("session_id", 1)]},  # Per-session lookup
    ],
    "files": [
        {"keys": [("file_id", 1)], "unique": True},
        {"keys": [("session_id", 1)]},
    ],
}


def create_indexes(db):
    """Create every index declared in COLLECTION_INDEXES.

    Args:
        db: pymongo database instance (indexed by collection name).
    """
    for name, index_specs in COLLECTION_INDEXES.items():
        collection = db[name]
        for spec in index_specs:
            is_unique = spec.get("unique", False)
            collection.create_index(spec["keys"], unique=is_unique)
    print("✅ All MongoDB indexes created successfully")