copilot-swe-agent[bot]
feat: TMDB access token, robots.txt, refreshThumbnails, bot fixes, FA icons, SEO meta, sidebar credit, Android file picker fix
79bb8f3 unverified | from utils.downloader import ( | |
| download_file, | |
| get_file_info_from_url, | |
| ) | |
| import asyncio | |
| from pathlib import Path | |
| from contextlib import asynccontextmanager | |
| import aiofiles | |
| import aiohttp | |
| from fastapi import FastAPI, HTTPException, Request, File, UploadFile, Form, Response | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from config import ADMIN_PASSWORD, MAX_FILE_SIZE, STORAGE_CHANNEL, TMDB_API_KEY, TMDB_ACCESS_TOKEN | |
| from utils.clients import initialize_clients | |
| from utils.directoryHandler import getRandomID | |
| from utils.extra import auto_ping_website, convert_class_to_dict, reset_cache_dir | |
| from utils.streamer import media_streamer | |
| from utils.uploader import start_file_uploader, THUMBNAIL_DIR | |
| from utils.logger import Logger | |
| import urllib.parse | |
| # Startup Event | |
| async def lifespan(app: FastAPI): | |
| # Reset the cache directory, delete cache files | |
| reset_cache_dir() | |
| # Initialize the clients | |
| clients_initialized = await initialize_clients() | |
| # If clients failed to initialize, initialize DRIVE_DATA in offline mode | |
| if not clients_initialized: | |
| logger.warning("No Telegram clients available - Initializing in OFFLINE MODE") | |
| from utils.directoryHandler import initDriveDataWithoutClients | |
| await initDriveDataWithoutClients() | |
| logger.warning("β Website is running in OFFLINE MODE - Telegram features are disabled") | |
| logger.info("β Website UI is available - File operations requiring Telegram will show errors") | |
| # Start the website auto ping task | |
| asyncio.create_task(auto_ping_website()) | |
| yield | |
| app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan) | |
| logger = Logger(__name__) | |
| async def home_page(): | |
| return FileResponse("website/home.html") | |
| async def home_page(): | |
| return FileResponse("website/VideoPlayer.html") | |
| async def static_files(file_path): | |
| if "apiHandler.js" in file_path: | |
| with open(Path("website/static/js/apiHandler.js")) as f: | |
| content = f.read() | |
| content = content.replace("MAX_FILE_SIZE__SDGJDG", str(MAX_FILE_SIZE)) | |
| return Response(content=content, media_type="application/javascript") | |
| return FileResponse(f"website/static/{file_path}") | |
| async def dl_file(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| from utils.clients import has_clients | |
| if not has_clients(): | |
| raise HTTPException(status_code=503, detail="Telegram clients not available - Service running in offline mode") | |
| path = request.query_params["path"] | |
| file = DRIVE_DATA.get_file(path) | |
| return await media_streamer(STORAGE_CHANNEL, file.file_id, file.name, request) | |
| # Api Routes | |
| async def check_password(request: Request): | |
| data = await request.json() | |
| if data["pass"] == ADMIN_PASSWORD: | |
| return JSONResponse({"status": "ok"}) | |
| return JSONResponse({"status": "Invalid password"}) | |
| async def api_new_folder(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"createNewFolder {data}") | |
| folder_data = DRIVE_DATA.get_directory(data["path"]).contents | |
| for id in folder_data: | |
| f = folder_data[id] | |
| if f.type == "folder": | |
| if f.name == data["name"]: | |
| return JSONResponse( | |
| { | |
| "status": "Folder with the name already exist in current directory" | |
| } | |
| ) | |
| DRIVE_DATA.new_folder(data["path"], data["name"]) | |
| return JSONResponse({"status": "ok"}) | |
| async def api_get_directory(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] == ADMIN_PASSWORD: | |
| is_admin = True | |
| else: | |
| is_admin = False | |
| auth = data.get("auth") | |
| logger.info(f"getFolder {data}") | |
| if data["path"] == "/trash": | |
| data = {"contents": DRIVE_DATA.get_trashed_files_folders()} | |
| folder_data = convert_class_to_dict(data, isObject=False, showtrash=True) | |
| elif "/search_" in data["path"]: | |
| query = urllib.parse.unquote(data["path"].split("_", 1)[1]) | |
| print(query) | |
| data = {"contents": DRIVE_DATA.search_file_folder(query)} | |
| print(data) | |
| folder_data = convert_class_to_dict(data, isObject=False, showtrash=False) | |
| print(folder_data) | |
| elif "/share_" in data["path"]: | |
| path = data["path"].split("_", 1)[1] | |
| folder_data, auth_home_path = DRIVE_DATA.get_directory(path, is_admin, auth) | |
| auth_home_path= auth_home_path.replace("//", "/") if auth_home_path else None | |
| folder_data = convert_class_to_dict(folder_data, isObject=True, showtrash=False) | |
| return JSONResponse( | |
| {"status": "ok", "data": folder_data, "auth_home_path": auth_home_path} | |
| ) | |
| else: | |
| folder_data = DRIVE_DATA.get_directory(data["path"]) | |
| folder_data = convert_class_to_dict(folder_data, isObject=True, showtrash=False) | |
| return JSONResponse({"status": "ok", "data": folder_data, "auth_home_path": None}) | |
| SAVE_PROGRESS = {} | |
| async def upload_file( | |
| file: UploadFile = File(...), | |
| path: str = Form(...), | |
| password: str = Form(...), | |
| id: str = Form(...), | |
| total_size: str = Form(...), | |
| ): | |
| global SAVE_PROGRESS | |
| from utils.clients import has_clients | |
| if password != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| if not has_clients(): | |
| return JSONResponse({"status": "Telegram clients not available - Service running in offline mode"}) | |
| total_size = int(total_size) | |
| SAVE_PROGRESS[id] = ("running", 0, total_size) | |
| ext = file.filename.lower().split(".")[-1] | |
| cache_dir = Path("./cache") | |
| cache_dir.mkdir(parents=True, exist_ok=True) | |
| file_location = cache_dir / f"{id}.{ext}" | |
| file_size = 0 | |
| async with aiofiles.open(file_location, "wb") as buffer: | |
| while chunk := await file.read(1024 * 1024): # Read file in chunks of 1MB | |
| SAVE_PROGRESS[id] = ("running", file_size, total_size) | |
| file_size += len(chunk) | |
| if file_size > MAX_FILE_SIZE: | |
| await buffer.close() | |
| file_location.unlink() # Delete the partially written file | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"File size exceeds {MAX_FILE_SIZE} bytes limit", | |
| ) | |
| await buffer.write(chunk) | |
| SAVE_PROGRESS[id] = ("completed", file_size, file_size) | |
| asyncio.create_task( | |
| start_file_uploader(file_location, id, path, file.filename, file_size) | |
| ) | |
| return JSONResponse({"id": id, "status": "ok"}) | |
| async def get_save_progress(request: Request): | |
| global SAVE_PROGRESS | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"getUploadProgress {data}") | |
| try: | |
| progress = SAVE_PROGRESS[data["id"]] | |
| return JSONResponse({"status": "ok", "data": progress}) | |
| except: | |
| return JSONResponse({"status": "not found"}) | |
| async def get_upload_progress(request: Request): | |
| from utils.uploader import PROGRESS_CACHE | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"getUploadProgress {data}") | |
| try: | |
| progress = PROGRESS_CACHE[data["id"]] | |
| return JSONResponse({"status": "ok", "data": progress}) | |
| except: | |
| return JSONResponse({"status": "not found"}) | |
| async def cancel_upload(request: Request): | |
| from utils.uploader import STOP_TRANSMISSION | |
| from utils.downloader import STOP_DOWNLOAD | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"cancelUpload {data}") | |
| STOP_TRANSMISSION.append(data["id"]) | |
| STOP_DOWNLOAD.append(data["id"]) | |
| return JSONResponse({"status": "ok"}) | |
| async def rename_file_folder(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"renameFileFolder {data}") | |
| DRIVE_DATA.rename_file_folder(data["path"], data["name"]) | |
| return JSONResponse({"status": "ok"}) | |
| async def trash_file_folder(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"trashFileFolder {data}") | |
| DRIVE_DATA.trash_file_folder(data["path"], data["trash"]) | |
| return JSONResponse({"status": "ok"}) | |
| async def delete_file_folder(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"deleteFileFolder {data}") | |
| DRIVE_DATA.delete_file_folder(data["path"]) | |
| return JSONResponse({"status": "ok"}) | |
| async def getFileInfoFromUrl(request: Request): | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"getFileInfoFromUrl {data}") | |
| try: | |
| file_info = await get_file_info_from_url(data["url"]) | |
| return JSONResponse({"status": "ok", "data": file_info}) | |
| except Exception as e: | |
| return JSONResponse({"status": str(e)}) | |
| async def startFileDownloadFromUrl(request: Request): | |
| from utils.clients import has_clients | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| if not has_clients(): | |
| return JSONResponse({"status": "Telegram clients not available - Service running in offline mode"}) | |
| logger.info(f"startFileDownloadFromUrl {data}") | |
| try: | |
| id = getRandomID() | |
| asyncio.create_task( | |
| download_file(data["url"], id, data["path"], data["filename"], data["singleThreaded"]) | |
| ) | |
| return JSONResponse({"status": "ok", "id": id}) | |
| except Exception as e: | |
| return JSONResponse({"status": str(e)}) | |
| async def getFileDownloadProgress(request: Request): | |
| from utils.downloader import DOWNLOAD_PROGRESS | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"getFileDownloadProgress {data}") | |
| try: | |
| progress = DOWNLOAD_PROGRESS[data["id"]] | |
| return JSONResponse({"status": "ok", "data": progress}) | |
| except: | |
| return JSONResponse({"status": "not found"}) | |
| async def getFolderShareAuth(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| data = await request.json() | |
| if data["password"] != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| logger.info(f"getFolderShareAuth {data}") | |
| try: | |
| auth = DRIVE_DATA.get_folder_auth(data["path"]) | |
| return JSONResponse({"status": "ok", "auth": auth}) | |
| except: | |
| return JSONResponse({"status": "not found"}) | |
| # ββ Thumbnail Endpoint ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def get_thumbnail(file_id: int): | |
| """Serve a cached thumbnail by Telegram message ID. | |
| file_id must be a positive integer (Telegram message ID).""" | |
| if file_id <= 0: | |
| raise HTTPException(status_code=400, detail="Invalid file_id") | |
| # Use a safe integer-only filename to prevent path traversal | |
| thumb_path = (THUMBNAIL_DIR / f"{int(file_id)}.jpg").resolve() | |
| # Ensure the resolved path is still inside THUMBNAIL_DIR | |
| if not str(thumb_path).startswith(str(THUMBNAIL_DIR.resolve())): | |
| raise HTTPException(status_code=400, detail="Invalid file_id") | |
| if thumb_path.exists(): | |
| return FileResponse(str(thumb_path), media_type="image/jpeg") | |
| from fastapi.responses import RedirectResponse | |
| return RedirectResponse(url="/static/assets/file-icon.svg", status_code=302) | |
| # ββ TMDB Proxy Endpoints ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| TMDB_BASE = "https://api.themoviedb.org/3" | |
| def _tmdb_configured(): | |
| return bool(TMDB_ACCESS_TOKEN or TMDB_API_KEY) | |
| def _tmdb_request_kwargs(params: dict) -> dict: | |
| """Return aiohttp request kwargs with proper auth headers or query params.""" | |
| if TMDB_ACCESS_TOKEN: | |
| return {"headers": {"Authorization": f"Bearer {TMDB_ACCESS_TOKEN}"}, "params": params} | |
| params["api_key"] = TMDB_API_KEY | |
| return {"params": params} | |
| async def robots_txt(): | |
| content = "User-agent: *\nAllow: /\nDisallow: /api/\nDisallow: /file\nDisallow: /thumbnail\n" | |
| return Response(content=content, media_type="text/plain") | |
| async def tmdb_search(query: str, page: int = 1): | |
| if not _tmdb_configured(): | |
| raise HTTPException(status_code=503, detail="TMDB_API_KEY not configured") | |
| if not query or len(query) > 200: | |
| raise HTTPException(status_code=400, detail="Invalid query") | |
| url = f"{TMDB_BASE}/search/multi" | |
| params = {"query": query[:200], "page": page, "include_adult": "false"} | |
| kwargs = _tmdb_request_kwargs(params) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, **kwargs) as resp: | |
| if resp.status != 200: | |
| raise HTTPException(status_code=resp.status, detail="TMDB request failed") | |
| return JSONResponse(await resp.json()) | |
| async def tmdb_trending(media_type: str = "all", time_window: str = "week"): | |
| if not _tmdb_configured(): | |
| raise HTTPException(status_code=503, detail="TMDB_API_KEY not configured") | |
| url = f"{TMDB_BASE}/trending/{media_type}/{time_window}" | |
| kwargs = _tmdb_request_kwargs({}) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, **kwargs) as resp: | |
| if resp.status != 200: | |
| raise HTTPException(status_code=resp.status, detail="TMDB request failed") | |
| return JSONResponse(await resp.json()) | |
| async def tmdb_movie_details(movie_id: int): | |
| if not _tmdb_configured(): | |
| raise HTTPException(status_code=503, detail="TMDB_API_KEY not configured") | |
| url = f"{TMDB_BASE}/movie/{movie_id}" | |
| kwargs = _tmdb_request_kwargs({"append_to_response": "videos,credits"}) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, **kwargs) as resp: | |
| if resp.status != 200: | |
| raise HTTPException(status_code=resp.status, detail="TMDB request failed") | |
| return JSONResponse(await resp.json()) | |
| async def tmdb_tv_details(tv_id: int): | |
| if not _tmdb_configured(): | |
| raise HTTPException(status_code=503, detail="TMDB_API_KEY not configured") | |
| url = f"{TMDB_BASE}/tv/{tv_id}" | |
| kwargs = _tmdb_request_kwargs({"append_to_response": "videos,credits"}) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, **kwargs) as resp: | |
| if resp.status != 200: | |
| raise HTTPException(status_code=resp.status, detail="TMDB request failed") | |
| return JSONResponse(await resp.json()) | |
| async def refresh_thumbnails(request: Request): | |
| from utils.directoryHandler import DRIVE_DATA | |
| from utils.clients import get_client, has_clients | |
| from utils.uploader import extract_thumbnail, THUMBNAIL_DIR | |
| data = await request.json() | |
| if data.get("password") != ADMIN_PASSWORD: | |
| return JSONResponse({"status": "Invalid password"}) | |
| if not has_clients(): | |
| return JSONResponse({"status": "Telegram clients not available"}) | |
| def collect_video_files(folder): | |
| files = [] | |
| for item in folder.contents.values(): | |
| if item.type == "file": | |
| mime = getattr(item, "mime_type", "") | |
| if mime.startswith("video/"): | |
| thumb_path = THUMBNAIL_DIR / f"{int(item.file_id)}.jpg" | |
| if not thumb_path.exists(): | |
| files.append(item) | |
| elif item.type == "folder": | |
| files.extend(collect_video_files(item)) | |
| return files | |
| root = DRIVE_DATA.get_directory("/") | |
| video_files = collect_video_files(root) | |
| updated = 0 | |
| errors = 0 | |
| client = get_client() | |
| for file in video_files: | |
| try: | |
| msg = await client.get_messages(STORAGE_CHANNEL, file.file_id) | |
| if msg: | |
| success = await extract_thumbnail(client, msg, file.file_id) | |
| if success: | |
| updated += 1 | |
| except Exception as e: | |
| logger.warning(f"refreshThumbnails: failed for file_id {file.file_id}: {e}") | |
| errors += 1 | |
| return JSONResponse({"status": "ok", "updated": updated, "errors": errors, "total": len(video_files)}) | |