Spaces:
Sleeping
Sleeping
PipelineFix
fix: skip create_repo (causes 403) — upload_file directly; retry with create only on 404
0d8aea2 | import asyncio | |
| import json | |
| import os | |
| import threading | |
| import time | |
| import shutil | |
| import urllib.request | |
| from pathlib import Path | |
| from datetime import datetime | |
| import http.server | |
| from pyrogram import Client | |
| from huggingface_hub import HfApi | |
# --- Configuration (read from the environment at import time) ----------------
# Hugging Face token: the first non-empty of HF_TOKEN_USER, HF_TOKEN, HF_TOKEN_ORG.
HF_TOKEN = next(
    (
        value
        for value in (os.environ.get(name, "") for name in ("HF_TOKEN_USER", "HF_TOKEN", "HF_TOKEN_ORG"))
        if value
    ),
    "",
)
DATASET_NAMESPACE = os.environ.get("DATASET_NAMESPACE", "")
# Target dataset repo id ("<namespace>/archive"), or None when no namespace is set.
DATASET_REPO = None
if DATASET_NAMESPACE:
    DATASET_REPO = f"{DATASET_NAMESPACE}/archive"
ARCHIVER_BOT_TOKEN = os.environ.get("ARCHIVER_BOT_TOKEN", "")
# NOTE(review): API id/hash and channel id fall back to values hard-coded in
# source. The API hash is a credential -- consider requiring TELEGRAM_API_ID /
# TELEGRAM_API_HASH from the environment and rotating the committed value.
API_ID = int(os.environ.get("TELEGRAM_API_ID") or "36439874")
API_HASH = os.environ.get("TELEGRAM_API_HASH") or "969852454979ce59df096dd40074c146"
CHANNEL_ID = int(os.environ.get("ARCHIVE_CHANNEL_ID") or "-1003900328608")
# Base URL of the org API that progress updates are POSTed to.
ORG_API_URL = "https://mml-analysing-shyper-and-api-mid-structer-api.hf.space"

# --- Local staging directories (created at import time) ----------------------
DATA_DIR = Path("/tmp/archiver_data")
DATA_DIR.mkdir(exist_ok=True)
ARCHIVE_DIR = DATA_DIR / "files"
ARCHIVE_DIR.mkdir(exist_ok=True)

# --- Pyrogram state shared between the connection thread and request threads -
pyro_client = pyro_loop = None
_pyro_ready = threading.Event()  # set once Pyrogram has connected; download requests wait on it
_pyro_lock = threading.Lock()    # not referenced by the code visible in this file
def _log(msg):
    """Print *msg* to stdout prefixed with the current UTC time in ISO-8601, then flush.

    The timestamp is naive (no "+00:00" suffix), the same format that
    ``datetime.utcnow().isoformat()`` produced. ``utcnow()`` is deprecated since
    Python 3.12, so the timezone-aware clock is read and its tzinfo dropped.
    """
    from datetime import timezone

    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    print(f"[{stamp}] {msg}", flush=True)
def _report_progress(archive_id: str, status: str, step: str, pct: int, msg: str, extra: dict = None):
    """POST one progress event for *archive_id* to ``{ORG_API_URL}/archive/progress/update``.

    The JSON payload carries the job id, status, step, percentage, message, free
    and total bytes on /tmp, and this archiver's dataset repo. Keys from *extra*
    (if given) are merged in and may override those fields. The request is
    authenticated with ``Bearer {HF_TOKEN}``.

    Best-effort: any failure (disk stats, serialization, network, HTTP error) is
    logged and swallowed, so progress reporting never breaks an archive job.
    """
    try:
        # One snapshot so free/total come from the same measurement.
        disk = shutil.disk_usage("/tmp")
        payload = {
            "archive_id": archive_id,
            "status": status,
            "step": step,
            "progress_pct": pct,
            "message": msg,
            "disk_free": disk.free,
            "disk_total": disk.total,
            "archiver_space": DATASET_REPO or "unknown",
        }
        if extra:
            payload.update(extra)
        req = urllib.request.Request(
            f"{ORG_API_URL}/archive/progress/update",
            data=json.dumps(payload).encode(),
            method="POST",
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {HF_TOKEN}"},
        )
        # Close the response so its socket is released right away; this is
        # called every few seconds while a download is running.
        with urllib.request.urlopen(req, timeout=15):
            pass
    except Exception as e:
        _log(f"Progress report failed: {e}")
def _start_pyrogram():
    """Run the Pyrogram bot client on a dedicated asyncio loop and keep it connected.

    Intended to run in its own daemon thread (started below when
    ARCHIVER_BOT_TOKEN is set). Creates the module-level ``pyro_loop`` and
    repeatedly:
      * starts a new ``Client`` and only then publishes it as ``pyro_client``
        and sets ``_pyro_ready``;
      * stops the previously published client after the new one is running;
      * pings with ``get_me()`` every 30 s (10 s timeout) and reconnects on failure.
    ``_pyro_ready`` is cleared for the whole reconnect window, so waiters never
    observe a client that has not finished ``start()``. Start errors are logged
    and retried after 30 s. Returns early only when the bot token or the
    Telegram API id/hash is missing.
    """
    global pyro_client, pyro_loop
    if not ARCHIVER_BOT_TOKEN:
        _log("No ARCHIVER_BOT_TOKEN set - pyrogram disabled")
        return
    pyro_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(pyro_loop)

    async def _run():
        global pyro_client
        if not API_ID or not API_HASH:
            _log("ERROR: TELEGRAM_API_ID or TELEGRAM_API_HASH not set — Pyrogram cannot start")
            return
        os.makedirs("/tmp/pyro", exist_ok=True)
        while True:
            # Not ready while (re)connecting: downloaders block in
            # _pyro_ready.wait() instead of grabbing a dead or unstarted client.
            _pyro_ready.clear()
            old_client = pyro_client
            new_client = None
            try:
                new_client = Client(
                    "archiver_bot",
                    api_id=API_ID,
                    api_hash=API_HASH,
                    bot_token=ARCHIVER_BOT_TOKEN,
                    workdir="/tmp/pyro",
                )
                await new_client.start()
                # Publish only a started client, then signal readiness.
                pyro_client = new_client
                _log("Pyrogram connected OK")
                _pyro_ready.set()
                # Stop old client only after new one is confirmed running
                # NOTE(review): both clients share the "archiver_bot" session in
                # /tmp/pyro for this overlap window.
                if old_client is not None and old_client is not new_client:
                    try:
                        await old_client.stop()
                    except Exception:
                        pass
                while True:
                    await asyncio.sleep(30)
                    try:
                        await asyncio.wait_for(new_client.get_me(), timeout=10)
                    except Exception as ping_err:
                        _log(f"Pyrogram ping failed: {ping_err} — reconnecting")
                        break
            except Exception as e:
                _log(f"Pyrogram error: {e} — retrying in 30s")
                _pyro_ready.clear()
                # Stop the client whose construction/start just failed, if any.
                if new_client is not None:
                    try:
                        await new_client.stop()
                    except Exception:
                        pass
                await asyncio.sleep(30)

    pyro_loop.run_until_complete(_run())


# Launch the connection manager in the background at import time.
if ARCHIVER_BOT_TOKEN:
    threading.Thread(target=_start_pyrogram, daemon=True).start()
def _download_from_channel(channel_id: int, message_id: int, dest: str, archive_id: str) -> int:
    """Download media from Telegram channel via Pyrogram bot session.

    Blocks the calling thread. It waits up to 60 s for ``_pyro_ready``, runs the
    download coroutine on ``pyro_loop`` and waits for its result. While
    downloading, progress is sent via ``_report_progress`` on short-lived daemon
    threads, at most once every 3 seconds.

    Args:
        channel_id: Telegram chat id of the source channel.
        message_id: id of the message whose video / document / animation /
            video note is downloaded.
        dest: local file path to write to.
        archive_id: job id included in progress reports.

    Returns:
        Size in bytes of the downloaded file.

    Raises:
        Exception: Pyrogram not ready, no supported media in the message, or
            get_messages / download_media failed or timed out (30 s / 30 min).
    """
    if not _pyro_ready.wait(timeout=60):
        raise Exception("Pyrogram not ready after 60s - check ARCHIVER_BOT_TOKEN and network")
    # Snapshot the client once: the reconnect loop may rebind the global while
    # this job runs, and get_messages/download_media must use the same session.
    client = pyro_client
    if client is None:
        raise Exception("Pyrogram client is None")
    start = time.time()
    last_report = [0.0]  # mutable cell so prog() can rate-limit its reports

    async def _dl():
        _log(f"Getting message {message_id} from channel {channel_id}")
        try:
            msg = await asyncio.wait_for(
                client.get_messages(int(channel_id), message_ids=int(message_id)),
                timeout=30,
            )
        except asyncio.TimeoutError:
            raise Exception("get_messages timed out after 30s") from None
        except Exception as e:
            raise Exception(f"get_messages failed: {e}") from e
        if not msg or not msg.media:
            raise Exception(f"No media found in message {message_id} (channel {channel_id})")
        media_obj = msg.video or msg.document or msg.animation or msg.video_note
        if not media_obj:
            raise Exception(f"Message {message_id} has no supported media type")
        # file_size may be absent or None; normalise to 0 so prog() arithmetic is safe.
        total = getattr(media_obj, "file_size", 0) or 0
        if not total:
            _log("Warning: file_size unknown, proceeding without progress %")

        def prog(current, _total):
            # Throttle: at most one report every 3 seconds.
            if time.time() - last_report[0] < 3:
                return
            last_report[0] = time.time()
            elapsed = time.time() - start
            pct = int(current * 100 / total) if total > 0 else 0
            speed = current / elapsed if elapsed > 0 else 0
            eta = int((total - current) / speed) if speed > 0 and total > current else 0
            # Report off the event loop so the blocking HTTP POST cannot stall the download.
            threading.Thread(
                target=_report_progress,
                kwargs={
                    "archive_id": archive_id,
                    "status": "running",
                    "step": "downloading",
                    "pct": pct,
                    "msg": f"تحميل من القناة: {current/1024/1024:.1f}/{total/1024/1024:.1f} MB @ {speed/1024/1024:.1f} MB/s",
                    "extra": {
                        "speed": int(speed),
                        "downloaded_bytes": current,
                        "total_bytes": total,
                        "eta_seconds": eta,
                    },
                },
                daemon=True,
            ).start()

        _log(f"Starting download → {dest}")
        try:
            path = await asyncio.wait_for(
                client.download_media(msg, file_name=dest, progress=prog),
                timeout=1800,  # 30 min max for large files
            )
        except asyncio.TimeoutError:
            raise Exception("download_media timed out after 30 minutes") from None
        if not path or not os.path.exists(str(path)):
            raise Exception("download_media returned None or file missing")
        size = os.path.getsize(str(path))
        _log(f"Download complete: {size/1024/1024:.1f} MB in {time.time()-start:.0f}s")
        return size

    future = asyncio.run_coroutine_threadsafe(_dl(), pyro_loop)
    try:
        # Slightly above the inner 30-min download timeout.
        return future.result(timeout=1860)
    except Exception:
        future.cancel()
        raise
def _upload_to_dataset(file_path: str, file_name: str, archive_id: str) -> dict:
    """Upload a staged media file to ``media/{file_name}`` in the ``DATASET_REPO`` dataset.

    Reports an 80% "uploading" progress event first. The dataset is assumed to
    exist; create_repo is skipped up front because the token may lack
    create-repo permission. If the upload fails with a not-found style error,
    the private dataset is created (exist_ok) and the upload is retried once.

    Returns:
        {"status": "uploaded", "dataset": DATASET_REPO, "path": "media/<file_name>"}
        on success, otherwise {"error": <message truncated to 300 chars>}.
        Never raises.
    """
    if not HF_TOKEN:
        return {"error": "HF_TOKEN not configured for this archiver"}
    if not DATASET_REPO:
        return {"error": "DATASET_NAMESPACE env var not set - cannot determine target dataset"}
    path_in_repo = f"media/{file_name}"

    def _push(api):
        # Single upload call shared by the first attempt and the post-create retry.
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message=f"archive: {file_name}",
        )

    # Bound before the try so the except branch can tell whether a client exists.
    api = None
    try:
        _report_progress(archive_id, "running", "uploading", 80,
                         f"رفع الملف إلى {DATASET_REPO}...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        api = HfApi(token=HF_TOKEN)
        _log(f"Uploading {file_name} to {DATASET_REPO}/media/")
        # Upload directly — skip create_repo (dataset already exists, and token may lack create-repo permission)
        _push(api)
        _log(f"Upload OK: {DATASET_REPO}/media/{file_name}")
        return {"status": "uploaded", "dataset": DATASET_REPO, "path": path_in_repo}
    except Exception as e:
        err = str(e)
        _log(f"Upload failed: {err}")
        repo_missing = "404" in err or "Repository Not Found" in err or "does not exist" in err.lower()
        # api is None only if HfApi() itself failed; there is nothing to retry with then.
        if repo_missing and api is not None:
            _log(f"Dataset {DATASET_REPO} not found — creating and retrying...")
            try:
                api.create_repo(repo_id=DATASET_REPO, repo_type="dataset", private=True, exist_ok=True)
                _push(api)
                _log(f"Upload OK (after create): {DATASET_REPO}/media/{file_name}")
                return {"status": "uploaded", "dataset": DATASET_REPO, "path": path_in_repo}
            except Exception as e2:
                _log(f"Upload retry failed: {e2}")
                return {"error": str(e2)[:300]}
        return {"error": err[:300]}
def _upload_meta_to_dataset(meta_path: str, meta_name: str, archive_id: str) -> dict:
    """Upload the JSON metadata file to ``metadata/{meta_name}`` in the ``DATASET_REPO`` dataset.

    Best-effort and never raises. Returns {"status": "uploaded"} on success,
    {"error": "not configured"} without HF_TOKEN/DATASET_REPO, or
    {"error": <message truncated to 200 chars>} on failure. Failures are also
    logged with *archive_id*, because callers may ignore the return value.
    """
    if not HF_TOKEN or not DATASET_REPO:
        return {"error": "not configured"}
    try:
        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=meta_path,
            path_in_repo=f"metadata/{meta_name}",
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message=f"archive metadata: {meta_name}",
        )
        return {"status": "uploaded"}
    except Exception as e:
        _log(f"[{archive_id}] Metadata upload failed: {e}")
        return {"error": str(e)[:200]}
class Handler(http.server.BaseHTTPRequestHandler):
    """HTTP front end of the archiver.

    Routes:
        GET  /health           JSON status: dataset config, token / Pyrogram state,
                               number of staged files, free space on /tmp.
        GET  /download/<name>  Stream a staged file from ARCHIVE_DIR as an attachment.
        POST /receive          Validate an archive request, answer immediately, and run
                               download -> metadata -> dataset upload in a daemon thread.
        POST /exec             Placeholder; echoes ``cmd`` without executing anything.
        OPTIONS *              Permissive CORS preflight.
    """

    def do_GET(self):
        """Serve /health and /download/<name>; any other path gets a JSON 404."""
        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/health":
            self._json(200, {
                "status": "ok",
                "dataset_repo": DATASET_REPO or "NOT CONFIGURED",
                "dataset_namespace": DATASET_NAMESPACE or "NOT CONFIGURED",
                "hf_token_set": bool(HF_TOKEN),
                "pyrogram_ready": _pyro_ready.is_set(),
                "pyrogram_enabled": bool(ARCHIVER_BOT_TOKEN),
                "files_staged": sum(1 for _ in ARCHIVE_DIR.iterdir()),
                "workspace_free_gb": round(shutil.disk_usage("/tmp").free / (1024**3), 2),
            })
        elif raw_path.startswith("/download/"):
            # Use the query-stripped path so "?x=1" never becomes part of the name.
            self._send_staged_file(raw_path[len("/download/"):])
        else:
            self._json(404, {"error": "Not found"})

    def _send_staged_file(self, requested):
        """Stream ARCHIVE_DIR/<basename of *requested*> as an attachment, or reply 404."""
        from urllib.parse import quote, unquote

        # Percent-decode, then keep only the last path component so requests
        # such as "../x" cannot escape ARCHIVE_DIR.
        fname = Path(unquote(requested)).name
        # "" (e.g. "/download/") or ".." would resolve to a directory, not a file.
        if fname in ("", ".."):
            self._json(404, {"error": "File not found"})
            return
        try:
            f = open(ARCHIVE_DIR / fname, "rb")
        except OSError:
            # Missing, a directory, or unreadable -- nothing has been sent yet.
            self._json(404, {"error": "File not found"})
            return
        with f:
            size = os.fstat(f.fileno()).st_size
            # http.server encodes header values as latin-1, so non-ASCII names
            # (e.g. Arabic) go in the RFC 5987 filename* parameter, with an
            # ASCII-only fallback in filename.
            fallback = "".join(c if " " <= c <= "~" and c not in '"\\' else "_" for c in fname)
            encoded = quote(fname, safe="")
            self.send_response(200)
            self.send_header("Content-Type", "application/octet-stream")
            self.send_header("Content-Length", str(size))
            self.send_header("Content-Disposition", f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}")
            self.end_headers()
            try:
                shutil.copyfileobj(f, self.wfile)
            except (BrokenPipeError, ConnectionResetError):
                pass  # client disconnected mid-transfer; nothing left to do

    def do_OPTIONS(self):
        """Answer CORS preflight requests for any origin."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.end_headers()

    def do_POST(self):
        """Parse an optional JSON object body and dispatch /receive or /exec.

        Replies 400 for a malformed Content-Length, invalid JSON or a non-object
        body. /receive answers 202 when the job is accepted and 400 when it is
        rejected; the JSON body is the dict returned by _receive_async.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._json(400, {"error": "Invalid Content-Length header"})
            return
        body = {}
        if length > 0:
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json(400, {"error": "Invalid JSON body"})
                return
        # Handlers below call body.get(...); reject arrays/strings/numbers up front.
        if not isinstance(body, dict):
            self._json(400, {"error": "JSON body must be an object"})
            return
        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/receive":
            result = self._receive_async(body)
            # A rejected request must not be reported as 202 Accepted.
            self._json(400 if "error" in result else 202, result)
        elif raw_path == "/exec":
            cmd = body.get("cmd", "")
            self._json(200, {"ok": True, "cmd": cmd, "result": "exec not implemented"})
        else:
            self._json(404, {"error": f"Unknown path: {raw_path}"})

    def _receive_async(self, body: dict) -> dict:
        """Validate a /receive request, report it as queued and start _process_archive in a daemon thread.

        Reads file_name, archive_id (top level or inside metadata), metadata,
        channel_id (defaults to CHANNEL_ID) and message_id (required).

        Returns:
            {"status": "accepted", "archive_id", "dataset", "namespace"} when the
            worker was started, otherwise {"error": ...} if message_id is missing
            or invalid, or this space lacks ARCHIVER_BOT_TOKEN / DATASET_NAMESPACE.
        """
        file_name = body.get("file_name") or "video.mp4"
        metadata = body.get("metadata", {})
        # archive_id can come at top level OR inside metadata dict; a null or
        # non-object metadata value is tolerated instead of crashing on .get().
        meta_lookup = metadata if isinstance(metadata, dict) else {}
        archive_id = (
            body.get("archive_id")
            or meta_lookup.get("archive_id")
            or f"ARC_{int(time.time())}"
        )
        # channel_id MUST be int for Pyrogram; fall back to the configured channel.
        try:
            channel_id = int(body.get("channel_id", CHANNEL_ID))
        except (ValueError, TypeError):
            channel_id = CHANNEL_ID
        # message_id MUST be int
        try:
            message_id = int(body.get("message_id", 0))
        except (ValueError, TypeError):
            message_id = 0
        if not message_id:
            _log(f"[{archive_id}] ERROR: Missing message_id in receive body")
            return {"error": "Missing or invalid message_id - archiver cannot download without it"}
        if not ARCHIVER_BOT_TOKEN:
            _log(f"[{archive_id}] ERROR: ARCHIVER_BOT_TOKEN not set")
            return {"error": "ARCHIVER_BOT_TOKEN not configured on this archiver space"}
        if not DATASET_REPO:
            _log(f"[{archive_id}] ERROR: DATASET_NAMESPACE not set")
            return {"error": "DATASET_NAMESPACE not configured on this archiver space - cannot save files"}
        # Keep only the basename; "" or ".." would make dest a directory.
        safe_name = Path(str(file_name)).name
        if safe_name in ("", ".."):
            safe_name = "video.mp4"
        dest = str(ARCHIVE_DIR / safe_name)
        _log(f"[{archive_id}] Queued: {safe_name} ch={channel_id} msg={message_id} → {DATASET_REPO}")
        _report_progress(archive_id, "accepted", "queued", 2,
                         f"تم قبول الطلب في {DATASET_NAMESPACE} - بدء التحميل...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        threading.Thread(
            target=self._process_archive,
            args=(safe_name, dest, archive_id, channel_id, message_id, metadata),
            daemon=True,
        ).start()
        return {
            "status": "accepted",
            "archive_id": archive_id,
            "dataset": DATASET_REPO,
            "namespace": DATASET_NAMESPACE,
        }

    def _process_archive(self, safe_name: str, dest: str, archive_id: str,
                         channel_id: int, message_id: int, metadata: dict):
        """Worker: download the message media, write a .meta.json, upload both, report progress.

        Local files are deleted only after a successful media upload; on failure
        they stay in ARCHIVE_DIR (and remain reachable via /download/). All
        errors are logged and reported as a "failed" progress event; nothing is
        raised.
        """
        from datetime import timezone

        start_time = time.time()
        try:
            _log(f"[{archive_id}] Processing: {safe_name}")
            _report_progress(archive_id, "running", "downloading", 5,
                             f"بدء تحميل من القناة (ch={channel_id} msg={message_id})...",
                             {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
            file_size = _download_from_channel(channel_id, message_id, dest, archive_id)
            elapsed = time.time() - start_time
            speed = file_size / elapsed if elapsed > 0 else 0
            _report_progress(archive_id, "running", "archiving", 72,
                             f"✅ تم التحميل ({file_size/1024/1024:.1f} MB في {elapsed:.0f}s @ {speed/1024/1024:.1f} MB/s) - جارٍ الأرشفة...",
                             {"speed": int(speed), "downloaded_bytes": file_size, "total_bytes": file_size, "eta_seconds": 0})
            # Save metadata JSON next to the file
            meta_name = f"{safe_name}.meta.json"
            meta_path = str(ARCHIVE_DIR / meta_name)
            # Naive UTC ISO timestamp (same format as the deprecated utcnow()).
            received_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            with open(meta_path, "w", encoding="utf-8") as f:
                json.dump({
                    "archive_id": archive_id,
                    "file_name": safe_name,
                    "file_size": file_size,
                    "dataset_repo": DATASET_REPO,
                    "dataset_namespace": DATASET_NAMESPACE,
                    "channel_id": channel_id,
                    "message_id": message_id,
                    "received_at": received_at,
                    "elapsed_seconds": round(elapsed, 1),
                    "speed_bytes_per_sec": int(speed),
                    "metadata": metadata,
                }, f, ensure_ascii=False, indent=2)
            # Upload video to dataset
            upload_result = _upload_to_dataset(dest, safe_name, archive_id)
            if upload_result.get("status") == "uploaded":
                # Also upload metadata JSON (best-effort)
                _upload_meta_to_dataset(meta_path, meta_name, archive_id)
                _log(f"[{archive_id}] ✅ Archived: {safe_name} ({file_size/1024/1024:.1f} MB) → {DATASET_REPO}")
                _report_progress(archive_id, "completed", "done", 100,
                                 f"✅ تمت الأرشفة في {DATASET_REPO}/media/{safe_name}",
                                 {"speed": int(speed), "elapsed_seconds": int(elapsed),
                                  "dataset": DATASET_REPO, "file_path": f"media/{safe_name}"})
                # Clean up local temp files; remove each independently so one
                # failure does not leave the other behind.
                for leftover in (dest, meta_path):
                    try:
                        os.remove(leftover)
                    except OSError:
                        pass
            else:
                err = upload_result.get("error", "unknown upload error")
                _log(f"[{archive_id}] ❌ Upload failed: {err}")
                _report_progress(archive_id, "failed", "upload_error", 72,
                                 f"❌ فشل الرفع إلى {DATASET_REPO}: {err[:120]}",
                                 {"error": err})
        except Exception as e:
            _log(f"[{archive_id}] ❌ Error processing {safe_name}: {e}")
            import traceback
            traceback.print_exc()
            _report_progress(archive_id, "failed", "error", 0,
                             f"❌ خطأ: {str(e)[:150]}",
                             {"error": str(e)[:300]})

    def _json(self, code, data):
        """Send *data* as a UTF-8 JSON response with status *code* and a permissive CORS header."""
        payload = json.dumps(data, ensure_ascii=False, default=str).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, fmt, *args):
        """Silence http.server's default per-request logging to stderr."""
        pass
if __name__ == "__main__":
    # Entry point: log the effective configuration, give Pyrogram up to 60 s to
    # connect, then serve HTTP on $PORT (default 7860) forever.
    port = int(os.environ.get("PORT", 7860))
    _log(f"Archiver v4.0 starting on :{port}")
    _log(f"Dataset repo : {DATASET_REPO or '⚠️ NOT CONFIGURED (set DATASET_NAMESPACE)'}")
    _log(f"HF token : {'✅ set' if HF_TOKEN else '❌ NOT SET'}")
    _log(f"Pyrogram : {'starting...' if ARCHIVER_BOT_TOKEN else '❌ ARCHIVER_BOT_TOKEN not set'}")
    _log(f"Channel ID : {CHANNEL_ID}")
    if ARCHIVER_BOT_TOKEN:
        _log("Waiting for Pyrogram to connect (max 60s)...")
        _pyro_ready.wait(timeout=60)
        _log(f"Pyrogram ready: {_pyro_ready.is_set()}")
    # Thread per request: a long /download/ stream or the synchronous progress
    # POST inside /receive must not block /health or other clients.
    httpd = http.server.ThreadingHTTPServer(("0.0.0.0", port), Handler)
    _log(f"HTTP server ready on port {port}")
    httpd.serve_forever()