mrpoddaa's picture
Upload 13 files
18b952c verified
"""
Database Module - MongoDB schema and operations for file metadata
"""
import logging
from typing import List, Optional, Dict, Any
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from datetime import datetime
logger = logging.getLogger(__name__)
class Database:
"""MongoDB database manager for file metadata"""
def __init__(self, mongo_uri: str):
self.client: AsyncIOMotorClient = AsyncIOMotorClient(mongo_uri)
self.db: AsyncIOMotorDatabase = self.client.telegram_streamer
self.files_collection = self.db.files
async def initialize(self):
"""Create indexes for optimal query performance"""
try:
await self.files_collection.create_index("unique_id", unique=True)
await self.files_collection.create_index("created_at")
logger.info("Database indexes created successfully")
except Exception as e:
logger.error(f"Error creating indexes: {e}")
async def create_file_metadata(
self,
unique_id: str,
filename: str,
total_size: int,
part_size: int,
mime_type: str = "application/octet-stream"
) -> Dict[str, Any]:
"""
Create initial file metadata entry
Args:
unique_id: Unique identifier for the file
filename: Original filename
total_size: Total file size in bytes
part_size: Size of each part (except last)
mime_type: MIME type of the file
Returns:
Created document
"""
document = {
"unique_id": unique_id,
"filename": filename,
"total_size": total_size,
"part_size": part_size,
"mime_type": mime_type,
"parts": [],
"total_parts": 0,
"upload_status": "in_progress",
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
await self.files_collection.insert_one(document)
logger.info(f"Created metadata for {unique_id}: {filename} ({total_size} bytes)")
return document
async def add_file_part(
self,
unique_id: str,
part_number: int,
file_id: str,
part_size: int
) -> bool:
"""
Add a file part to the metadata
Args:
unique_id: Unique identifier for the file
part_number: Part sequence number (0-indexed)
file_id: Telegram file_id for this part
part_size: Actual size of this part in bytes
Returns:
Success status
"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$push": {
"parts": {
"part_number": part_number,
"file_id": file_id,
"size": part_size
}
},
"$inc": {"total_parts": 1},
"$set": {"updated_at": datetime.utcnow()}
}
)
if result.modified_count > 0:
logger.info(f"Added part {part_number} to {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error adding part {part_number} to {unique_id}: {e}")
return False
async def complete_upload(self, unique_id: str) -> bool:
"""Mark upload as complete"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$set": {
"upload_status": "completed",
"updated_at": datetime.utcnow()
}
}
)
if result.modified_count > 0:
logger.info(f"Upload completed for {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error completing upload for {unique_id}: {e}")
return False
async def mark_upload_failed(self, unique_id: str, error: str) -> bool:
"""Mark upload as failed"""
try:
result = await self.files_collection.update_one(
{"unique_id": unique_id},
{
"$set": {
"upload_status": "failed",
"error": error,
"updated_at": datetime.utcnow()
}
}
)
if result.modified_count > 0:
logger.info(f"Upload failed for {unique_id}: {error}")
return True
return False
except Exception as e:
logger.error(f"Error marking upload as failed for {unique_id}: {e}")
return False
async def get_file_metadata(self, unique_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve file metadata by unique_id
Args:
unique_id: Unique identifier for the file
Returns:
File metadata document or None if not found
"""
try:
doc = await self.files_collection.find_one({"unique_id": unique_id})
if doc:
# Sort parts by part_number for ordered retrieval
if "parts" in doc and doc["parts"]:
doc["parts"] = sorted(doc["parts"], key=lambda x: x["part_number"])
return doc
except Exception as e:
logger.error(f"Error retrieving metadata for {unique_id}: {e}")
return None
async def delete_file_metadata(self, unique_id: str) -> bool:
"""Delete file metadata"""
try:
result = await self.files_collection.delete_one({"unique_id": unique_id})
if result.deleted_count > 0:
logger.info(f"Deleted metadata for {unique_id}")
return True
return False
except Exception as e:
logger.error(f"Error deleting metadata for {unique_id}: {e}")
return False
async def get_all_files(self, limit: int = 100, skip: int = 0) -> List[Dict[str, Any]]:
"""Get list of all files with pagination"""
try:
cursor = self.files_collection.find().sort("created_at", -1).skip(skip).limit(limit)
files = await cursor.to_list(length=limit)
return files
except Exception as e:
logger.error(f"Error retrieving file list: {e}")
return []
async def close(self):
"""Close database connection"""
self.client.close()
logger.info("Database connection closed")