Spaces:
Runtime error
Runtime error
File size: 6,994 Bytes
18b952c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | """
Database Module - MongoDB schema and operations for file metadata
"""
import logging
from typing import List, Optional, Dict, Any
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from datetime import datetime
logger = logging.getLogger(__name__)
class Database:
"""MongoDB database manager for file metadata"""
def __init__(self, mongo_uri: str):
self.client: AsyncIOMotorClient = AsyncIOMotorClient(mongo_uri)
self.db: AsyncIOMotorDatabase = self.client.telegram_streamer
self.files_collection = self.db.files
async def initialize(self):
"""Create indexes for optimal query performance"""
try:
await self.files_collection.create_index("unique_id", unique=True)
await self.files_collection.create_index("created_at")
logger.info("Database indexes created successfully")
except Exception as e:
logger.error(f"Error creating indexes: {e}")
async def create_file_metadata(
self,
unique_id: str,
filename: str,
total_size: int,
part_size: int,
mime_type: str = "application/octet-stream"
) -> Dict[str, Any]:
"""
Create initial file metadata entry
Args:
unique_id: Unique identifier for the file
filename: Original filename
total_size: Total file size in bytes
part_size: Size of each part (except last)
mime_type: MIME type of the file
Returns:
Created document
"""
document = {
"unique_id": unique_id,
"filename": filename,
"total_size": total_size,
"part_size": part_size,
"mime_type": mime_type,
"parts": [],
"total_parts": 0,
"upload_status": "in_progress",
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
await self.files_collection.insert_one(document)
logger.info(f"Created metadata for {unique_id}: {filename} ({total_size} bytes)")
return document
async def add_file_part(
self,
unique_id: str,
part_number: int,
file_id: str,
part_size: int
) -> bool:
"""
Add a file part to the metadata
Args:
unique_id: Unique identifier for the file
part_number: Part sequence number (0-indexed)
file_id: Telegram file_id for this part
part_size: Actual size of this part in bytes
Returns:
Success status
"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$push": {
"parts": {
"part_number": part_number,
"file_id": file_id,
"size": part_size
}
},
"$inc": {"total_parts": 1},
"$set": {"updated_at": datetime.utcnow()}
}
)
if result.modified_count > 0:
logger.info(f"Added part {part_number} to {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error adding part {part_number} to {unique_id}: {e}")
return False
async def complete_upload(self, unique_id: str) -> bool:
"""Mark upload as complete"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$set": {
"upload_status": "completed",
"updated_at": datetime.utcnow()
}
}
)
if result.modified_count > 0:
logger.info(f"Upload completed for {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error completing upload for {unique_id}: {e}")
return False
async def mark_upload_failed(self, unique_id: str, error: str) -> bool:
"""Mark upload as failed"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$set": {
"upload_status": "failed",
"error": error,
"updated_at": datetime.utcnow()
}
}
)
if result.modified_count > 0:
logger.info(f"Upload failed for {unique_id}: {error}")
return True
return False
except Exception as e:
logger.error(f"Error marking upload as failed for {unique_id}: {e}")
return False
async def get_file_metadata(self, unique_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve file metadata by unique_id
Args:
unique_id: Unique identifier for the file
Returns:
File metadata document or None if not found
"""
try:
doc = await self.files_collection.find_one({"unique_id": unique_id})
if doc:
# Sort parts by part_number for ordered retrieval
if "parts" in doc and doc["parts"]:
doc["parts"] = sorted(doc["parts"], key=lambda x: x["part_number"])
return doc
except Exception as e:
logger.error(f"Error retrieving metadata for {unique_id}: {e}")
return None
async def delete_file_metadata(self, unique_id: str) -> bool:
"""Delete file metadata"""
try:
result = await self.files_collection.delete_one({"unique_id": unique_id})
if result.deleted_count > 0:
logger.info(f"Deleted metadata for {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error deleting metadata for {unique_id}: {e}")
return False
async def get_all_files(self, limit: int = 100, skip: int = 0) -> List[Dict[str, Any]]:
"""Get list of all files with pagination"""
try:
cursor = self.files_collection.find().sort("created_at", -1).skip(skip).limit(limit)
files = await cursor.to_list(length=limit)
return files
except Exception as e:
logger.error(f"Error retrieving file list: {e}")
return []
async def close(self):
"""Close database connection"""
self.client.close()
logger.info("Database connection closed")
|