""" Database Module - MongoDB schema and operations for file metadata """ import logging from typing import List, Optional, Dict, Any from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from datetime import datetime logger = logging.getLogger(__name__) class Database: """MongoDB database manager for file metadata""" def __init__(self, mongo_uri: str): self.client: AsyncIOMotorClient = AsyncIOMotorClient(mongo_uri) self.db: AsyncIOMotorDatabase = self.client.telegram_streamer self.files_collection = self.db.files async def initialize(self): """Create indexes for optimal query performance""" try: await self.files_collection.create_index("unique_id", unique=True) await self.files_collection.create_index("created_at") logger.info("Database indexes created successfully") except Exception as e: logger.error(f"Error creating indexes: {e}") async def create_file_metadata( self, unique_id: str, filename: str, total_size: int, part_size: int, mime_type: str = "application/octet-stream" ) -> Dict[str, Any]: """ Create initial file metadata entry Args: unique_id: Unique identifier for the file filename: Original filename total_size: Total file size in bytes part_size: Size of each part (except last) mime_type: MIME type of the file Returns: Created document """ document = { "unique_id": unique_id, "filename": filename, "total_size": total_size, "part_size": part_size, "mime_type": mime_type, "parts": [], "total_parts": 0, "upload_status": "in_progress", "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } await self.files_collection.insert_one(document) logger.info(f"Created metadata for {unique_id}: {filename} ({total_size} bytes)") return document async def add_file_part( self, unique_id: str, part_number: int, file_id: str, part_size: int ) -> bool: """ Add a file part to the metadata Args: unique_id: Unique identifier for the file part_number: Part sequence number (0-indexed) file_id: Telegram file_id for this part part_size: Actual size of this part in bytes Returns: Success status """ try: result = await self.files_collection.update_one( {"unique_id": unique_id}, { "$push": { "parts": { "part_number": part_number, "file_id": file_id, "size": part_size } }, "$inc": {"total_parts": 1}, "$set": {"updated_at": datetime.utcnow()} } ) if result.modified_count > 0: logger.info(f"Added part {part_number} to {unique_id}") return True return False except Exception as e: logger.error(f"Error adding part {part_number} to {unique_id}: {e}") return False async def complete_upload(self, unique_id: str) -> bool: """Mark upload as complete""" try: result = await self.files_collection.update_one( {"unique_id": unique_id}, { "$set": { "upload_status": "completed", "updated_at": datetime.utcnow() } } ) if result.modified_count > 0: logger.info(f"Upload completed for {unique_id}") return True return False except Exception as e: logger.error(f"Error completing upload for {unique_id}: {e}") return False async def mark_upload_failed(self, unique_id: str, error: str) -> bool: """Mark upload as failed""" try: result = await self.files_collection.update_one( {"unique_id": unique_id}, { "$set": { "upload_status": "failed", "error": error, "updated_at": datetime.utcnow() } } ) if result.modified_count > 0: logger.info(f"Upload failed for {unique_id}: {error}") return True return False except Exception as e: logger.error(f"Error marking upload as failed for {unique_id}: {e}") return False async def get_file_metadata(self, unique_id: str) -> Optional[Dict[str, Any]]: """ Retrieve file metadata by unique_id Args: unique_id: Unique identifier for the file Returns: File metadata document or None if not found """ try: doc = await self.files_collection.find_one({"unique_id": unique_id}) if doc: # Sort parts by part_number for ordered retrieval if "parts" in doc and doc["parts"]: doc["parts"] = sorted(doc["parts"], key=lambda x: x["part_number"]) return doc except Exception as e: logger.error(f"Error retrieving metadata for {unique_id}: {e}") return None async def delete_file_metadata(self, unique_id: str) -> bool: """Delete file metadata""" try: result = await self.files_collection.delete_one({"unique_id": unique_id}) if result.deleted_count > 0: logger.info(f"Deleted metadata for {unique_id}") return True return False except Exception as e: logger.error(f"Error deleting metadata for {unique_id}: {e}") return False async def get_all_files(self, limit: int = 100, skip: int = 0) -> List[Dict[str, Any]]: """Get list of all files with pagination""" try: cursor = self.files_collection.find().sort("created_at", -1).skip(skip).limit(limit) files = await cursor.to_list(length=limit) return files except Exception as e: logger.error(f"Error retrieving file list: {e}") return [] async def close(self): """Close database connection""" self.client.close() logger.info("Database connection closed")