Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Optional, Any | |
| import asyncio | |
| import secrets | |
| import time | |
| class UploadJob: | |
| upload_id: str | |
| user_id: int | |
| chat_id: int | |
| src_msg_id: int | |
| file_type: str # "video" | "document" | |
| tg_file_id: str | |
| file_name: str | |
| caption: str | |
| privacy: str = "private" | |
| title_mode: str = "caption" # caption | filename | custom | |
| custom_title: str = "" | |
| status_msg_id: Optional[int] = None | |
| created_at: float = field(default_factory=time.time) | |
| class MemoryState: | |
| def __init__(self): | |
| self.uploads: Dict[str, UploadJob] = {} | |
| self.waiting_client_id: Dict[int, bool] = {} | |
| self.waiting_client_secret: Dict[int, str] = {} # user_id -> client_id | |
| self.waiting_custom_title: Dict[int, str] = {} # user_id -> upload_id | |
| self.auto_mode: Dict[int, bool] = {} # user_id -> bool | |
| self.sem = asyncio.Semaphore(1) # overwritten by settings at runtime | |
| def new_upload_id(self) -> str: | |
| return secrets.token_urlsafe(8)[:10] | |
| STATE = MemoryState() |