Spaces:
Runtime error
Runtime error
| """ | |
| Database Module - MongoDB schema and operations for file metadata | |
| """ | |
| import logging | |
| from typing import List, Optional, Dict, Any | |
| from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
| class Database: | |
| """MongoDB database manager for file metadata""" | |
| def __init__(self, mongo_uri: str): | |
| self.client: AsyncIOMotorClient = AsyncIOMotorClient(mongo_uri) | |
| self.db: AsyncIOMotorDatabase = self.client.telegram_streamer | |
| self.files_collection = self.db.files | |
| async def initialize(self): | |
| """Create indexes for optimal query performance""" | |
| try: | |
| await self.files_collection.create_index("unique_id", unique=True) | |
| await self.files_collection.create_index("created_at") | |
| logger.info("Database indexes created successfully") | |
| except Exception as e: | |
| logger.error(f"Error creating indexes: {e}") | |
| async def create_file_metadata( | |
| self, | |
| unique_id: str, | |
| filename: str, | |
| total_size: int, | |
| part_size: int, | |
| mime_type: str = "application/octet-stream" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Create initial file metadata entry | |
| Args: | |
| unique_id: Unique identifier for the file | |
| filename: Original filename | |
| total_size: Total file size in bytes | |
| part_size: Size of each part (except last) | |
| mime_type: MIME type of the file | |
| Returns: | |
| Created document | |
| """ | |
| document = { | |
| "unique_id": unique_id, | |
| "filename": filename, | |
| "total_size": total_size, | |
| "part_size": part_size, | |
| "mime_type": mime_type, | |
| "parts": [], | |
| "total_parts": 0, | |
| "upload_status": "in_progress", | |
| "created_at": datetime.utcnow(), | |
| "updated_at": datetime.utcnow() | |
| } | |
| await self.files_collection.insert_one(document) | |
| logger.info(f"Created metadata for {unique_id}: {filename} ({total_size} bytes)") | |
| return document | |
| async def add_file_part( | |
| self, | |
| unique_id: str, | |
| part_number: int, | |
| file_id: str, | |
| part_size: int | |
| ) -> bool: | |
| """ | |
| Add a file part to the metadata | |
| Args: | |
| unique_id: Unique identifier for the file | |
| part_number: Part sequence number (0-indexed) | |
| file_id: Telegram file_id for this part | |
| part_size: Actual size of this part in bytes | |
| Returns: | |
| Success status | |
| """ | |
| try: | |
| result = await self.files_collection.update_one( | |
| {"unique_id": unique_id}, | |
| { | |
| "$push": { | |
| "parts": { | |
| "part_number": part_number, | |
| "file_id": file_id, | |
| "size": part_size | |
| } | |
| }, | |
| "$inc": {"total_parts": 1}, | |
| "$set": {"updated_at": datetime.utcnow()} | |
| } | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Added part {part_number} to {unique_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error adding part {part_number} to {unique_id}: {e}") | |
| return False | |
| async def complete_upload(self, unique_id: str) -> bool: | |
| """Mark upload as complete""" | |
| try: | |
| result = await self.files_collection.update_one( | |
| {"unique_id": unique_id}, | |
| { | |
| "$set": { | |
| "upload_status": "completed", | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Upload completed for {unique_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error completing upload for {unique_id}: {e}") | |
| return False | |
| async def mark_upload_failed(self, unique_id: str, error: str) -> bool: | |
| """Mark upload as failed""" | |
| try: | |
| result = await self.files_collection.update_one( | |
| {"unique_id": unique_id}, | |
| { | |
| "$set": { | |
| "upload_status": "failed", | |
| "error": error, | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Upload failed for {unique_id}: {error}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error marking upload as failed for {unique_id}: {e}") | |
| return False | |
| async def get_file_metadata(self, unique_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Retrieve file metadata by unique_id | |
| Args: | |
| unique_id: Unique identifier for the file | |
| Returns: | |
| File metadata document or None if not found | |
| """ | |
| try: | |
| doc = await self.files_collection.find_one({"unique_id": unique_id}) | |
| if doc: | |
| # Sort parts by part_number for ordered retrieval | |
| if "parts" in doc and doc["parts"]: | |
| doc["parts"] = sorted(doc["parts"], key=lambda x: x["part_number"]) | |
| return doc | |
| except Exception as e: | |
| logger.error(f"Error retrieving metadata for {unique_id}: {e}") | |
| return None | |
| async def delete_file_metadata(self, unique_id: str) -> bool: | |
| """Delete file metadata""" | |
| try: | |
| result = await self.files_collection.delete_one({"unique_id": unique_id}) | |
| if result.deleted_count > 0: | |
| logger.info(f"Deleted metadata for {unique_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error deleting metadata for {unique_id}: {e}") | |
| return False | |
| async def get_all_files(self, limit: int = 100, skip: int = 0) -> List[Dict[str, Any]]: | |
| """Get list of all files with pagination""" | |
| try: | |
| cursor = self.files_collection.find().sort("created_at", -1).skip(skip).limit(limit) | |
| files = await cursor.to_list(length=limit) | |
| return files | |
| except Exception as e: | |
| logger.error(f"Error retrieving file list: {e}") | |
| return [] | |
| async def close(self): | |
| """Close database connection""" | |
| self.client.close() | |
| logger.info("Database connection closed") | |