"""Archiver service: pull media from a Telegram channel and push it to a Hugging Face dataset.

The module starts a Pyrogram bot client on a background thread (when
``ARCHIVER_BOT_TOKEN`` is set) and serves a small HTTP API:

* ``GET  /health``            - configuration and workspace status
* ``GET  /download/<name>``   - fetch a file still staged in ``ARCHIVE_DIR``
* ``POST /receive``           - queue an archive job (download, then upload)
* ``POST /exec``              - placeholder, echoes the command back

Progress of each job is POSTed to ``ORG_API_URL`` on a best-effort basis.
"""
import asyncio
import http.server
import json
import os
import shutil
import threading
import time
import traceback
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Optional

from huggingface_hub import HfApi
from pyrogram import Client

HF_TOKEN = (
    os.environ.get("HF_TOKEN_USER", "")
    or os.environ.get("HF_TOKEN", "")
    or os.environ.get("HF_TOKEN_ORG", "")
)
DATASET_NAMESPACE = os.environ.get("DATASET_NAMESPACE", "")
DATASET_REPO = f"{DATASET_NAMESPACE}/archive" if DATASET_NAMESPACE else None
ARCHIVER_BOT_TOKEN = os.environ.get("ARCHIVER_BOT_TOKEN", "")
# NOTE(review): the fallback API id/hash and channel id below are committed in
# source. They are kept so existing deployments keep working, but they should
# be supplied via the environment only (and rotated if they are real secrets).
API_ID = int(os.environ.get("TELEGRAM_API_ID") or "36439874")
API_HASH = os.environ.get("TELEGRAM_API_HASH") or "969852454979ce59df096dd40074c146"
CHANNEL_ID = int(os.environ.get("ARCHIVE_CHANNEL_ID") or "-1003900328608")
ORG_API_URL = "https://mml-analysing-shyper-and-api-mid-structer-api.hf.space"

DATA_DIR = Path("/tmp/archiver_data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
ARCHIVE_DIR = DATA_DIR / "files"
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

# Shared Pyrogram state. `pyro_client` / `pyro_loop` are written by the
# Pyrogram thread and read by worker threads; `_pyro_ready` is set only while
# `pyro_client` refers to a client whose start() succeeded.
pyro_client = None
pyro_loop = None
_pyro_ready = threading.Event()
_pyro_lock = threading.Lock()


def _log(msg):
    """Print *msg* to stdout with a UTC ISO timestamp prefix, flushing immediately."""
    print(f"[{datetime.utcnow().isoformat()}] {msg}", flush=True)


def _report_progress(archive_id: str, status: str, step: str, pct: int, msg: str,
                     extra: Optional[dict] = None):
    """POST a progress update for *archive_id* to the organisation API.

    Best-effort: any failure is logged and swallowed so that progress
    reporting can never abort an archive job.

    Args:
        archive_id: Identifier of the archive job.
        status: Job status string (e.g. "running", "failed").
        step: Current pipeline step name.
        pct: Progress percentage.
        msg: Human-readable message.
        extra: Optional extra keys merged into the payload (may override
            the base keys).
    """
    try:
        usage = shutil.disk_usage("/tmp")  # single call: consistent free/total
        payload = {
            "archive_id": archive_id,
            "status": status,
            "step": step,
            "progress_pct": pct,
            "message": msg,
            "disk_free": usage.free,
            "disk_total": usage.total,
            "archiver_space": DATASET_REPO or "unknown",
        }
        if extra:
            payload.update(extra)
        req = urllib.request.Request(
            f"{ORG_API_URL}/archive/progress/update",
            data=json.dumps(payload).encode(),
            method="POST",
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {HF_TOKEN}",
            },
        )
        # Close the response explicitly; otherwise every report leaks a socket.
        with urllib.request.urlopen(req, timeout=15):
            pass
    except Exception as e:
        _log(f"Progress report failed: {e}")


def _start_pyrogram():
    """Thread target: run the Pyrogram client on its own event loop forever.

    Creates a new event loop, publishes it as `pyro_loop`, and keeps a bot
    client connected, pinging it every 30 seconds and reconnecting when the
    ping fails. Returns immediately when no bot token is configured.
    """
    global pyro_client, pyro_loop
    if not ARCHIVER_BOT_TOKEN:
        _log("No ARCHIVER_BOT_TOKEN set - pyrogram disabled")
        return
    pyro_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(pyro_loop)

    async def _safe_stop(client):
        """Stop *client*, ignoring errors (it may already be stopped or never started)."""
        try:
            await client.stop()
        except Exception:
            pass

    async def _run():
        global pyro_client
        if not API_ID or not API_HASH:
            _log("ERROR: TELEGRAM_API_ID or TELEGRAM_API_HASH not set — Pyrogram cannot start")
            return
        os.makedirs("/tmp/pyro", exist_ok=True)
        while True:
            candidate = None
            try:
                candidate = Client(
                    "archiver_bot",
                    api_id=API_ID,
                    api_hash=API_HASH,
                    bot_token=ARCHIVER_BOT_TOKEN,
                    workdir="/tmp/pyro",
                )
                await candidate.start()
            except Exception as e:
                _log(f"Pyrogram error: {e} — retrying in 30s")
                _pyro_ready.clear()
                # Safely stop the client that failed before the next attempt
                if candidate is not None:
                    await _safe_stop(candidate)
                await asyncio.sleep(30)
                continue

            # Publish the new client only once it is confirmed running, so
            # other threads never observe an unstarted client.
            old_client, pyro_client = pyro_client, candidate
            _log("Pyrogram connected OK")
            _pyro_ready.set()
            # Stop old client only after new one is confirmed running
            if old_client is not None and old_client is not candidate:
                await _safe_stop(old_client)

            while True:
                await asyncio.sleep(30)
                try:
                    await asyncio.wait_for(pyro_client.get_me(), timeout=10)
                except Exception as ping_err:
                    _log(f"Pyrogram ping failed: {ping_err} — reconnecting")
                    # Make waiters block until the replacement is up instead
                    # of using a connection that just failed its health check.
                    _pyro_ready.clear()
                    break

    pyro_loop.run_until_complete(_run())


if ARCHIVER_BOT_TOKEN:
    threading.Thread(target=_start_pyrogram, daemon=True).start()


def _download_from_channel(channel_id: int, message_id: int, dest: str, archive_id: str) -> int:
    """Download media from Telegram channel via Pyrogram bot session.

    Blocks the calling thread while the download coroutine runs on the
    Pyrogram event loop. Progress is reported at most once every 3 seconds.

    Args:
        channel_id: Channel holding the message.
        message_id: Message whose media should be downloaded.
        dest: Local file path to write to.
        archive_id: Job identifier used in progress reports.

    Returns:
        Size in bytes of the downloaded file.

    Raises:
        Exception: if Pyrogram is not ready within 60s, the message has no
            supported media, or any step fails or times out.
    """
    if not _pyro_ready.wait(timeout=60):
        raise Exception("Pyrogram not ready after 60s - check ARCHIVER_BOT_TOKEN and network")
    if pyro_client is None:
        raise Exception("Pyrogram client is None")
    if pyro_loop is None:
        raise Exception("Pyrogram event loop is None")
    start = time.time()
    last_report = [0.0]  # one-element list so the callback can mutate it

    async def _dl():
        _log(f"Getting message {message_id} from channel {channel_id}")
        try:
            msg = await asyncio.wait_for(
                pyro_client.get_messages(int(channel_id), message_ids=int(message_id)),
                timeout=30,
            )
        except asyncio.TimeoutError:
            raise Exception("get_messages timed out after 30s") from None
        except Exception as e:
            raise Exception(f"get_messages failed: {e}") from e
        if not msg or not msg.media:
            raise Exception(f"No media found in message {message_id} (channel {channel_id})")
        media_obj = msg.video or msg.document or msg.animation or msg.video_note
        if not media_obj:
            raise Exception(f"Message {message_id} has no supported media type")
        # `or 0` normalises a None file_size, which would otherwise raise
        # TypeError on the `total > 0` comparison in the progress callback.
        total = getattr(media_obj, "file_size", 0) or 0
        if not total:
            _log("Warning: file_size unknown, proceeding without progress %")

        def prog(current, _total):
            # Throttle: at most one report every 3 seconds.
            if time.time() - last_report[0] < 3:
                return
            last_report[0] = time.time()
            elapsed = time.time() - start
            pct = int(current * 100 / total) if total > 0 else 0
            speed = current / elapsed if elapsed > 0 else 0
            eta = int((total - current) / speed) if speed > 0 and total > current else 0
            # Report from a separate thread so a slow HTTP call cannot stall
            # the download.
            threading.Thread(
                target=_report_progress,
                kwargs={
                    "archive_id": archive_id,
                    "status": "running",
                    "step": "downloading",
                    "pct": pct,
                    "msg": f"تحميل من القناة: {current/1024/1024:.1f}/{total/1024/1024:.1f} MB @ {speed/1024/1024:.1f} MB/s",
                    "extra": {
                        "speed": int(speed),
                        "downloaded_bytes": current,
                        "total_bytes": total,
                        "eta_seconds": eta,
                    },
                },
                daemon=True,
            ).start()

        _log(f"Starting download → {dest}")
        try:
            path = await asyncio.wait_for(
                pyro_client.download_media(msg, file_name=dest, progress=prog),
                timeout=1800,  # 30 min max for large files
            )
        except asyncio.TimeoutError:
            raise Exception("download_media timed out after 30 minutes") from None
        if not path or not os.path.exists(str(path)):
            raise Exception("download_media returned None or file missing")
        size = os.path.getsize(str(path))
        _log(f"Download complete: {size/1024/1024:.1f} MB in {time.time()-start:.0f}s")
        return size

    future = asyncio.run_coroutine_threadsafe(_dl(), pyro_loop)
    try:
        # Slightly longer than the inner 30+1800s timeouts so those fire first.
        return future.result(timeout=1860)
    except Exception:
        future.cancel()
        raise


def _upload_to_dataset(file_path: str, file_name: str, archive_id: str) -> dict:
    """Upload *file_path* to ``media/<file_name>`` in the configured dataset repo.

    If the first attempt fails with what looks like a missing-repository
    error, the dataset is created (private) and the upload retried once.

    Returns:
        ``{"status": "uploaded", "dataset": ..., "path": ...}`` on success,
        otherwise ``{"error": <message>}``. Never raises.
    """
    if not HF_TOKEN:
        return {"error": "HF_TOKEN not configured for this archiver"}
    if not DATASET_REPO:
        return {"error": "DATASET_NAMESPACE env var not set - cannot determine target dataset"}

    repo_path = f"media/{file_name}"
    success = {"status": "uploaded", "dataset": DATASET_REPO, "path": repo_path}
    api = None

    def _push():
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=repo_path,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message=f"archive: {file_name}",
        )

    try:
        _report_progress(archive_id, "running", "uploading", 80,
                         f"رفع الملف إلى {DATASET_REPO}...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        api = HfApi(token=HF_TOKEN)
        _log(f"Uploading {file_name} to {DATASET_REPO}/media/")
        # Upload directly — skip create_repo (dataset already exists, and token may lack create-repo permission)
        _push()
        _log(f"Upload OK: {DATASET_REPO}/media/{file_name}")
        return success
    except Exception as e:
        err = str(e)
        _log(f"Upload failed: {err}")
        repo_missing = (
            "404" in err
            or "Repository Not Found" in err
            or "does not exist" in err.lower()
        )
        # If dataset doesn't exist yet, create it then retry once.
        # `api is not None` guards the case where HfApi() itself failed.
        if api is not None and repo_missing:
            _log(f"Dataset {DATASET_REPO} not found — creating and retrying...")
            try:
                api.create_repo(repo_id=DATASET_REPO, repo_type="dataset",
                                private=True, exist_ok=True)
                _push()
                _log(f"Upload OK (after create): {DATASET_REPO}/media/{file_name}")
                return success
            except Exception as e2:
                return {"error": str(e2)[:300]}
        return {"error": err[:300]}


def _upload_meta_to_dataset(meta_path: str, meta_name: str, archive_id: str) -> dict:
    """Upload the JSON metadata file to dataset as well.

    The file is stored under ``metadata/<meta_name>``. *archive_id* is
    accepted for signature parity with `_upload_to_dataset` but is unused.

    Returns:
        ``{"status": "uploaded"}`` on success, otherwise ``{"error": ...}``.
    """
    if not HF_TOKEN or not DATASET_REPO:
        return {"error": "not configured"}
    try:
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=meta_path,
            path_in_repo=f"metadata/{meta_name}",
            repo_id=DATASET_REPO,
            repo_type="dataset",
        )
        return {"status": "uploaded"}
    except Exception as e:
        return {"error": str(e)[:200]}


class Handler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler exposing the archiver's health, download and receive endpoints.

    NOTE(review): none of the endpoints check credentials; confirm that the
    deployment restricts who can reach this server.
    """

    def do_GET(self):
        """Serve ``/health`` and ``/download/<name>``; everything else is 404."""
        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/health":
            files = list(ARCHIVE_DIR.iterdir())
            self._json(200, {
                "status": "ok",
                "dataset_repo": DATASET_REPO or "NOT CONFIGURED",
                "dataset_namespace": DATASET_NAMESPACE or "NOT CONFIGURED",
                "hf_token_set": bool(HF_TOKEN),
                "pyrogram_ready": _pyro_ready.is_set(),
                "pyrogram_enabled": bool(ARCHIVER_BOT_TOKEN),
                "files_staged": len(files),
                "workspace_free_gb": round(shutil.disk_usage("/tmp").free / (1024**3), 2),
            })
        elif raw_path.startswith("/download/"):
            # Use the query-stripped path and URL-decode it, then keep only the
            # final component so the lookup cannot leave ARCHIVE_DIR.
            requested = urllib.parse.unquote(raw_path.split("/download/", 1)[1])
            fname = Path(requested).name
            fpath = ARCHIVE_DIR / fname
            # is_file() (not exists()) rejects "" and ".." which resolve to
            # directories and would fail in open() after the 200 was sent.
            if fname and fpath.is_file():
                self.send_response(200)
                self.send_header("Content-Type", "application/octet-stream")
                # Re-quote so the header stays ASCII and free of quotes/CRLF.
                self.send_header("Content-Disposition",
                                 f'attachment; filename="{urllib.parse.quote(fname)}"')
                self.send_header("Content-Length", str(fpath.stat().st_size))
                self.end_headers()
                with open(fpath, "rb") as f:
                    shutil.copyfileobj(f, self.wfile)
            else:
                self._json(404, {"error": "File not found"})
        else:
            self._json(404, {"error": "Not found"})

    def do_OPTIONS(self):
        """Answer CORS preflight requests permissively."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.end_headers()

    def do_POST(self):
        """Parse the JSON body and dispatch ``/receive`` and ``/exec``."""
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._json(400, {"error": "Invalid Content-Length header"})
            return
        body = {}
        if length > 0:  # a negative length would make read() block until EOF
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json(400, {"error": "Invalid JSON body"})
                return
        if not isinstance(body, dict):
            self._json(400, {"error": "JSON body must be an object"})
            return

        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/receive":
            # NOTE(review): validation failures are returned with 202 and an
            # "error" key, as before; callers may depend on this status code.
            result = self._receive_async(body)
            self._json(202, result)
        elif raw_path == "/exec":
            cmd = body.get("cmd", "")
            self._json(200, {"ok": True, "cmd": cmd, "result": "exec not implemented"})
        else:
            self._json(404, {"error": f"Unknown path: {raw_path}"})

    def _receive_async(self, body: dict) -> dict:
        """Validate an archive request and start processing it on a background thread.

        Args:
            body: Parsed JSON request. Reads ``file_name``, ``archive_id``,
                ``metadata``, ``channel_id`` and ``message_id``.

        Returns:
            An "accepted" descriptor, or ``{"error": ...}`` when the request
            or the service configuration is unusable.
        """
        file_name = body.get("file_name") or "video.mp4"
        metadata = body.get("metadata", {})
        # archive_id can come at top level OR inside metadata dict
        meta_archive_id = metadata.get("archive_id") if isinstance(metadata, dict) else None
        archive_id = (
            body.get("archive_id")
            or meta_archive_id
            or f"ARC_{int(time.time())}"
        )

        # channel_id MUST be int for Pyrogram
        try:
            channel_id = int(body.get("channel_id", CHANNEL_ID))
        except (ValueError, TypeError):
            channel_id = CHANNEL_ID

        # message_id MUST be int
        try:
            message_id = int(body.get("message_id", 0))
        except (ValueError, TypeError):
            message_id = 0

        if not message_id:
            _log(f"[{archive_id}] ERROR: Missing message_id in receive body")
            return {"error": "Missing or invalid message_id - archiver cannot download without it"}
        if not ARCHIVER_BOT_TOKEN:
            _log(f"[{archive_id}] ERROR: ARCHIVER_BOT_TOKEN not set")
            return {"error": "ARCHIVER_BOT_TOKEN not configured on this archiver space"}
        if not DATASET_REPO:
            _log(f"[{archive_id}] ERROR: DATASET_NAMESPACE not set")
            return {"error": "DATASET_NAMESPACE not configured on this archiver space - cannot save files"}

        # Keep only the final path component; fall back to the default when it
        # would name a directory rather than a file.
        safe_name = Path(str(file_name)).name
        if safe_name in ("", ".", ".."):
            safe_name = "video.mp4"
        dest = str(ARCHIVE_DIR / safe_name)
        _log(f"[{archive_id}] Queued: {safe_name} ch={channel_id} msg={message_id} → {DATASET_REPO}")
        _report_progress(archive_id, "accepted", "queued", 2,
                         f"تم قبول الطلب في {DATASET_NAMESPACE} - بدء التحميل...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        threading.Thread(
            target=self._process_archive,
            args=(safe_name, dest, archive_id, channel_id, message_id, metadata),
            daemon=True,
        ).start()
        return {
            "status": "accepted",
            "archive_id": archive_id,
            "dataset": DATASET_REPO,
            "namespace": DATASET_NAMESPACE,
        }

    def _process_archive(self, safe_name: str, dest: str, archive_id: str,
                         channel_id: int, message_id: int, metadata: dict):
        """Run one archive job: download, write metadata, upload, report, clean up.

        Intended to run on a worker thread. All exceptions are caught, logged
        and reported as a failed job. Local files are removed only after a
        successful media upload.
        """
        start_time = time.time()
        file_size = 0
        try:
            _log(f"[{archive_id}] Processing: {safe_name}")
            _report_progress(archive_id, "running", "downloading", 5,
                             f"بدء تحميل من القناة (ch={channel_id} msg={message_id})...",
                             {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
            file_size = _download_from_channel(channel_id, message_id, dest, archive_id)
            elapsed = time.time() - start_time
            speed = file_size / elapsed if elapsed > 0 else 0
            _report_progress(archive_id, "running", "archiving", 72,
                             f"✅ تم التحميل ({file_size/1024/1024:.1f} MB في {elapsed:.0f}s @ {speed/1024/1024:.1f} MB/s) - جارٍ الأرشفة...",
                             {"speed": int(speed), "downloaded_bytes": file_size,
                              "total_bytes": file_size, "eta_seconds": 0})

            # Save metadata JSON next to the file
            meta_name = f"{safe_name}.meta.json"
            meta_path = str(ARCHIVE_DIR / meta_name)
            with open(meta_path, "w", encoding="utf-8") as f:
                json.dump({
                    "archive_id": archive_id,
                    "file_name": safe_name,
                    "file_size": file_size,
                    "dataset_repo": DATASET_REPO,
                    "dataset_namespace": DATASET_NAMESPACE,
                    "channel_id": channel_id,
                    "message_id": message_id,
                    "received_at": datetime.utcnow().isoformat(),
                    "elapsed_seconds": round(elapsed, 1),
                    "speed_bytes_per_sec": int(speed),
                    "metadata": metadata,
                }, f, ensure_ascii=False, indent=2)

            # Upload video to dataset
            upload_result = _upload_to_dataset(dest, safe_name, archive_id)
            if upload_result.get("status") == "uploaded":
                # Also upload metadata JSON; a failure here does not fail the
                # job but is logged instead of being silently dropped.
                meta_result = _upload_meta_to_dataset(meta_path, meta_name, archive_id)
                if meta_result.get("status") != "uploaded":
                    _log(f"[{archive_id}] Metadata upload failed: {meta_result.get('error', 'unknown error')}")
                _log(f"[{archive_id}] ✅ Archived: {safe_name} ({file_size/1024/1024:.1f} MB) → {DATASET_REPO}")
                _report_progress(archive_id, "completed", "done", 100,
                                 f"✅ تمت الأرشفة في {DATASET_REPO}/media/{safe_name}",
                                 {"speed": int(speed), "elapsed_seconds": int(elapsed),
                                  "dataset": DATASET_REPO, "file_path": f"media/{safe_name}"})
                # Clean up local temp files; remove each independently so one
                # failure does not leave the other behind.
                for leftover in (dest, meta_path):
                    try:
                        os.remove(leftover)
                    except OSError:
                        pass
            else:
                err = upload_result.get("error", "unknown upload error")
                _log(f"[{archive_id}] ❌ Upload failed: {err}")
                _report_progress(archive_id, "failed", "upload_error", 72,
                                 f"❌ فشل الرفع إلى {DATASET_REPO}: {err[:120]}",
                                 {"error": err})
        except Exception as e:
            _log(f"[{archive_id}] ❌ Error processing {safe_name}: {e}")
            traceback.print_exc()
            _report_progress(archive_id, "failed", "error", 0,
                             f"❌ خطأ: {str(e)[:150]}",
                             {"error": str(e)[:300]})

    def _json(self, code, data):
        """Send *data* as a JSON response with status *code* and a permissive CORS header."""
        payload = json.dumps(data, ensure_ascii=False, default=str).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, fmt, *args):
        """Suppress the default per-request access log."""
        pass


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    _log(f"Archiver v4.0 starting on :{port}")
    _log(f"Dataset repo : {DATASET_REPO or '⚠️ NOT CONFIGURED (set DATASET_NAMESPACE)'}")
    _log(f"HF token : {'✅ set' if HF_TOKEN else '❌ NOT SET'}")
    _log(f"Pyrogram : {'starting...' if ARCHIVER_BOT_TOKEN else '❌ ARCHIVER_BOT_TOKEN not set'}")
    _log(f"Channel ID : {CHANNEL_ID}")
    if ARCHIVER_BOT_TOKEN:
        _log("Waiting for Pyrogram to connect (max 60s)...")
        _pyro_ready.wait(timeout=60)
        _log(f"Pyrogram ready: {_pyro_ready.is_set()}")
    # Threaded server: a long /download or a slow progress report inside
    # /receive must not block /health and other requests.
    httpd = http.server.ThreadingHTTPServer(("0.0.0.0", port), Handler)
    _log(f"HTTP server ready on port {port}")
    httpd.serve_forever()