Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| from telethon import TelegramClient, errors | |
| # def load_state(): | |
| # if os.path.exists(STATE_FILE): | |
| # try: | |
| # with open(STATE_FILE, 'r') as f: | |
| # return json.load(f) | |
| # except: | |
| # return {"downloaded_ids": []} | |
| # return {"downloaded_ids": []} | |
| # def save_state(state): | |
| # with open(STATE_FILE, 'w') as f: | |
| # json.dump(state, f, indent=2) | |
| # async def download_channel(): | |
| # os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # # Load previous download state | |
| # state = load_state() | |
| # downloaded_ids = set(state["downloaded_ids"]) | |
| # # Initialize client with your session | |
| # client = TelegramClient(SESSION_FILE, API_ID, API_HASH) | |
| import asyncio | |
| import json | |
| import os | |
| from typing import Dict, Any | |
| from telethon import TelegramClient, errors | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Configuration - edit these values, or override the Telegram credentials via
# the environment so they do not have to live in source control.
CHANNEL = "cgsvalka"  # Channel username or ID to download from
SESSION_FILE = "my_session.session"  # Existing Telethon session file
OUTPUT_DIR = "downloads"  # Where downloaded files are saved
# NOTE(review): these credentials are committed in plain text. Prefer setting
# TELEGRAM_API_ID / TELEGRAM_API_HASH and rotate the hash if this file is public.
API_ID = int(os.environ.get("TELEGRAM_API_ID", "28708692"))  # Telegram API ID
API_HASH = os.environ.get("TELEGRAM_API_HASH", "72fa6a22c65d7a58e00f2ccb8d60841d")  # Telegram API hash
MESSAGE_LIMIT = 0  # 0 = scan every message; a positive number caps the scan (handy for testing)
# Local file that records which messages were already downloaded. The same
# name is used for its copy inside the Hugging Face dataset repo.
STATE_FILE = "download_state.json"
# Hugging Face dataset repo that receives the downloaded files and the state file.
# The token is read from the environment (HF_TOKEN); an empty token disables
# all Hugging Face downloads and uploads.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_REPO_ID = "Fred808/TGFiles"
def load_local_state(path: str | None = None) -> Dict[str, Any]:
    """Load the download-progress state from a local JSON file.

    Args:
        path: File to read; defaults to the module-level ``STATE_FILE``.

    Returns:
        The parsed state dict. A missing file, an unreadable or malformed file,
        or a file whose top-level JSON value is not an object all yield a fresh
        ``{"downloaded_files": []}``.
    """
    if path is None:
        path = STATE_FILE
    empty: Dict[str, Any] = {"downloaded_files": []}
    try:
        with open(path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run: no progress recorded yet.
        return empty
    except (OSError, ValueError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError. Report it:
        # silently resetting progress would make every message look new again.
        print(f"Warning: ignoring unreadable state file {path}: {exc}")
        return empty
    if not isinstance(state, dict):
        print(f"Warning: ignoring state file {path}: expected a JSON object")
        return empty
    return state
def save_local_state(state: Dict[str, Any], path: str | None = None) -> None:
    """Persist *state* as pretty-printed UTF-8 JSON, replacing the file atomically.

    The JSON is first written to ``<path>.tmp`` and then moved over *path* with
    ``os.replace``. An interrupted or failed write therefore never leaves a
    truncated state file behind; a truncated file would fail to parse and
    progress would be lost.

    Args:
        state: JSON-serialisable progress state.
        path: Destination file; defaults to the module-level ``STATE_FILE``.

    Raises:
        TypeError: if *state* is not JSON-serialisable (the old file is kept).
        OSError: if the file cannot be written or replaced.
    """
    if path is None:
        path = STATE_FILE
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, path)
    except BaseException:
        # Keep the previous state file intact and drop the partial copy.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def download_state_from_hf(token: str) -> Dict[str, Any]:
    """Fetch the progress state file from the Hugging Face dataset repo.

    Args:
        token: Hugging Face token. An empty token skips the request entirely.

    Returns:
        The remote state dict. ``{"downloaded_files": []}`` is returned instead
        when the token is empty, when the file cannot be fetched or parsed, or
        when its content is not a JSON object. Failures are printed rather than
        swallowed, because an empty state means no message is skipped as
        already downloaded.
    """
    empty: Dict[str, Any] = {"downloaded_files": []}
    if not token:
        return empty
    try:
        # hf_hub_download raises if the file does not exist in the repo yet.
        local_path = hf_hub_download(repo_id=HF_REPO_ID, filename=STATE_FILE, repo_type="dataset", token=token)
        with open(local_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as exc:  # best-effort: missing file, auth, network, bad JSON
        print(f"Could not load remote state from HF repo {HF_REPO_ID}: {exc}")
        return empty
    if not isinstance(state, dict):
        print(f"Ignoring remote state from HF repo {HF_REPO_ID}: expected a JSON object")
        return empty
    return state
def upload_file_to_hf(local_path: str, path_in_repo: str, token: str) -> bool:
    """Push one local file into the Hugging Face dataset repo.

    Returns False immediately when *token* is empty. If the upload raises, the
    error is printed and False is returned; otherwise True.
    """
    if token:
        try:
            HfApi().upload_file(
                path_or_fileobj=local_path,
                path_in_repo=path_in_repo,
                repo_id=HF_REPO_ID,
                repo_type="dataset",
                token=token,
            )
        except Exception as e:
            print(f"Failed to upload {local_path} to HF: {e}")
            return False
        return True
    return False
def upload_state_to_hf(state: Dict[str, Any], token: str) -> bool:
    """Write *state* to the local STATE_FILE, then mirror that file to the HF repo.

    The local write always happens. The return value is the result reported by
    ``upload_file_to_hf`` for the upload (False when *token* is empty).
    """
    save_local_state(state)
    state_path = STATE_FILE
    uploaded = upload_file_to_hf(state_path, state_path, token)
    return uploaded
def _merge_downloaded_entries(*states: Any) -> list[Dict[str, Any]]:
    """Combine the ``downloaded_files`` lists of *states* into one entry per message id.

    Earlier states win when the same message id appears more than once, so
    pass the remote (Hugging Face) state first. Entries that are not dicts or
    have no ``message_id`` are dropped. First-seen order is preserved.
    """
    merged: Dict[Any, Dict[str, Any]] = {}
    for state in states:
        entries = state.get("downloaded_files") if isinstance(state, dict) else None
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            message_id = entry.get("message_id")
            if message_id is not None and message_id not in merged:
                merged[message_id] = {"message_id": message_id, "filename": entry.get("filename")}
    return list(merged.values())


def _rar_output_name(message: Any) -> str | None:
    """Return the local file name for *message*'s RAR attachment, or None if it is not a RAR.

    A message counts as a RAR when its attachment name ends in ``.rar``. If the
    attachment has no name, a MIME type containing "rar" is used instead. The
    sender-controlled name is reduced to its basename so that a name like
    ``../../x.rar`` cannot escape OUTPUT_DIR.
    """
    file = message.file
    if not file:
        return None
    raw_name = getattr(file, "name", "") or ""
    if raw_name:
        is_rar = raw_name.lower().endswith(".rar")
    else:
        mime_type = getattr(file, "mime_type", "") or ""
        is_rar = "rar" in mime_type.lower()
    if not is_rar:
        return None
    safe_name = os.path.basename(raw_name.replace("\\", "/"))
    return f"{message.id}_{safe_name}" if safe_name else f"{message.id}.rar"


async def _download_media_with_retry(
    client: TelegramClient, message: Any, out_path: str, max_attempts: int = 3
) -> Any:
    """Download *message*'s media to *out_path*, retrying after Telegram FloodWait errors.

    Returns whatever ``client.download_media`` returns: the saved path, or None
    when Telethon finds nothing to download. Re-raises the FloodWaitError if
    the final attempt is still rate-limited.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await client.download_media(message, file=out_path)
        except errors.FloodWaitError as fw:
            if attempt == max_attempts:
                raise
            wait = int(fw.seconds) if fw.seconds else 60
            print(f"Hit FloodWait: sleeping {wait}s")
            await asyncio.sleep(wait + 1)
    return None


def _persist_state(state: Dict[str, Any]) -> None:
    """Save *state* locally and, when HF_TOKEN is set, mirror it to the dataset repo."""
    if HF_TOKEN:
        upload_state_to_hf(state, HF_TOKEN)  # also writes the local STATE_FILE
    else:
        save_local_state(state)


async def download_channel() -> int:
    """Download every RAR attachment from CHANNEL into OUTPUT_DIR and mirror it to HF.

    Progress is tracked per Telegram message id in STATE_FILE. The state is
    merged from the Hugging Face copy (when HF_TOKEN is set) and the local
    copy, so reruns skip messages that were already handled. When HF_TOKEN is
    set, a file is only recorded as done once its upload succeeds, so a failed
    upload is retried on the next run instead of being lost.

    Returns:
        0 after the scan finishes or is stopped with KeyboardInterrupt;
        1 if CHANNEL cannot be resolved. If the task is cancelled (how
        ``asyncio.run`` reports Ctrl+C on Python 3.11+), the state is saved and
        the CancelledError is re-raised.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Remote entries take precedence over local ones for the same message id.
    remote_state = download_state_from_hf(HF_TOKEN)
    local_state = load_local_state()
    state: Dict[str, Any] = {"downloaded_files": _merge_downloaded_entries(remote_state, local_state)}
    downloaded_ids = {entry["message_id"] for entry in state["downloaded_files"]}

    client = TelegramClient(SESSION_FILE, API_ID, API_HASH)
    async with client:
        try:
            entity = await client.get_entity(CHANNEL)
        except Exception as e:
            print(f"Failed to resolve channel '{CHANNEL}': {e}")
            return 1

        print(f"Starting download from: {getattr(entity, 'title', CHANNEL)}")

        count = downloaded = skipped = not_rar = 0
        try:
            async for message in client.iter_messages(entity, limit=MESSAGE_LIMIT or None):
                count += 1
                if message.id in downloaded_ids:
                    skipped += 1
                    continue
                if not message.media:
                    continue
                name = _rar_output_name(message)
                if name is None:
                    not_rar += 1
                    continue
                out_path = os.path.join(OUTPUT_DIR, name)
                try:
                    print(f"[{count}] downloading -> {name}")
                    saved_path = await _download_media_with_retry(client, message, out_path)
                    if not saved_path:
                        print(f"Warning: message {message.id} produced no file; not marking it as downloaded")
                        continue
                    downloaded += 1
                    if HF_TOKEN and not upload_file_to_hf(saved_path, f"files/{name}", HF_TOKEN):
                        # Leave it out of the state so the next run retries the upload.
                        print(f"Warning: failed to upload {saved_path} to HF repo {HF_REPO_ID}")
                        continue
                    state["downloaded_files"].append({"message_id": message.id, "filename": name})
                    downloaded_ids.add(message.id)
                    _persist_state(state)
                    # Be polite to the server
                    await asyncio.sleep(0.2)
                except Exception as e:
                    print(f"Error while downloading message {message.id}: {e}")
        except asyncio.CancelledError:
            print("Interrupted by user; saving final state.")
            _persist_state(state)
            raise
        except KeyboardInterrupt:
            print("Interrupted by user; saving final state.")
            _persist_state(state)

    print("\nFinal Statistics:")
    print(f"Messages scanned: {count}")
    print(f"RAR files downloaded: {downloaded}")
    print(f"Already downloaded (skipped): {skipped}")
    print(f"Non-RAR files skipped: {not_rar}")
    print(f"\nDownload state saved to: {STATE_FILE}")
    return 0
if __name__ == "__main__":
    # Propagate download_channel()'s status (1 = channel could not be resolved)
    # as the process exit code instead of discarding it.
    raise SystemExit(asyncio.run(download_channel()))