# archiver / app.py
# Last commit by PipelineFix (0d8aea2):
#   "fix: skip create_repo (causes 403) — upload_file directly; retry with create only on 404"
# File size at that revision: 19.6 kB
import asyncio
import http.server
import json
import os
import shutil
import threading
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

from huggingface_hub import HfApi
from pyrogram import Client
# ---------------------------------------------------------------------------
# Configuration, resolved once from the environment at import time.
# ---------------------------------------------------------------------------


def _env_or(name, fallback):
    """Return environment variable *name*, or *fallback* if it is unset or empty."""
    return os.environ.get(name) or fallback


# The first non-empty token wins: user token, then generic token, then org token.
HF_TOKEN = next(
    (
        token
        for token in (
            os.environ.get("HF_TOKEN_USER", ""),
            os.environ.get("HF_TOKEN", ""),
            os.environ.get("HF_TOKEN_ORG", ""),
        )
        if token
    ),
    "",
)
DATASET_NAMESPACE = os.environ.get("DATASET_NAMESPACE", "")
# Target dataset repo id, or None when no namespace is configured.
DATASET_REPO = None if not DATASET_NAMESPACE else DATASET_NAMESPACE + "/archive"
ARCHIVER_BOT_TOKEN = os.environ.get("ARCHIVER_BOT_TOKEN", "")

# NOTE(review): these fallbacks hard-code Telegram API credentials and a
# channel id in source control; prefer supplying them only via the environment.
API_ID = int(_env_or("TELEGRAM_API_ID", "36439874"))
API_HASH = _env_or("TELEGRAM_API_HASH", "969852454979ce59df096dd40074c146")
CHANNEL_ID = int(_env_or("ARCHIVE_CHANNEL_ID", "-1003900328608"))
ORG_API_URL = "https://mml-analysing-shyper-and-api-mid-structer-api.hf.space"

# Local staging area: downloads are written under ARCHIVE_DIR before upload.
DATA_DIR = Path("/tmp/archiver_data")
ARCHIVE_DIR = DATA_DIR / "files"
for _staging_dir in (DATA_DIR, ARCHIVE_DIR):
    _staging_dir.mkdir(exist_ok=True)
del _staging_dir

# Pyrogram state shared between the background connection thread and the
# request-handling threads.
pyro_client = None
pyro_loop = None
_pyro_ready = threading.Event()  # set by the connection thread once connected
_pyro_lock = threading.Lock()
def _log(msg):
    """Print *msg* to stdout prefixed with a naive-UTC ISO-8601 timestamp.

    Uses ``datetime.now(timezone.utc)`` instead of the deprecated
    ``datetime.utcnow()``; the tzinfo is dropped so the printed format is
    unchanged (no ``+00:00`` suffix).
    """
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    print(f"[{stamp}] {msg}", flush=True)
def _report_progress(archive_id: str, status: str, step: str, pct: int, msg: str, extra: dict = None):
    """Best-effort POST of an archive job's progress to the org API.

    Builds a JSON payload from the job fields, current ``/tmp`` disk usage and
    the target dataset, merges ``extra`` on top when given, and POSTs it to
    ``{ORG_API_URL}/archive/progress/update`` with ``HF_TOKEN`` as a bearer
    token (15 s timeout). Any failure is logged and swallowed so progress
    reporting can never abort the archive job itself.
    """
    try:
        disk = shutil.disk_usage("/tmp")  # one stat call for both numbers
        payload = {
            "archive_id": archive_id,
            "status": status,
            "step": step,
            "progress_pct": pct,
            "message": msg,
            "disk_free": disk.free,
            "disk_total": disk.total,
            "archiver_space": DATASET_REPO or "unknown",
        }
        if extra:
            payload.update(extra)
        req = urllib.request.Request(
            f"{ORG_API_URL}/archive/progress/update",
            data=json.dumps(payload).encode(),
            method="POST",
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {HF_TOKEN}"},
        )
        # Close the response so the connection is released immediately
        # instead of waiting for garbage collection.
        with urllib.request.urlopen(req, timeout=15):
            pass
    except Exception as e:
        _log(f"Progress report failed: {e}")
def _start_pyrogram():
    """Run the Pyrogram bot client forever on a dedicated event loop.

    Intended as a thread target. Creates ``pyro_loop`` (other threads submit
    coroutines to it), connects the bot, sets ``_pyro_ready`` once connected,
    then pings Telegram every 30 s and reconnects when a ping fails. Returns
    immediately when ``ARCHIVER_BOT_TOKEN`` is unset.
    """
    global pyro_client, pyro_loop
    if not ARCHIVER_BOT_TOKEN:
        _log("No ARCHIVER_BOT_TOKEN set - pyrogram disabled")
        return
    pyro_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(pyro_loop)

    async def _stop_quietly(client):
        # Stopping a client that never started (or already died) raises;
        # that is expected during reconnects, so errors are ignored.
        if client is None:
            return
        try:
            await client.stop()
        except Exception:
            pass

    async def _run():
        global pyro_client
        if not API_ID or not API_HASH:
            _log("ERROR: TELEGRAM_API_ID or TELEGRAM_API_HASH not set — Pyrogram cannot start")
            return
        os.makedirs("/tmp/pyro", exist_ok=True)
        while True:
            old_client = pyro_client
            new_client = None
            try:
                new_client = Client(
                    "archiver_bot",
                    api_id=API_ID,
                    api_hash=API_HASH,
                    bot_token=ARCHIVER_BOT_TOKEN,
                    workdir="/tmp/pyro",
                )
                await new_client.start()
                # Publish the client only once it is connected, so downloaders
                # never pick up a half-started instance.
                pyro_client = new_client
                _log("Pyrogram connected OK")
                _pyro_ready.set()
                # Stop old client only after new one is confirmed running
                if old_client is not None and old_client is not new_client:
                    await _stop_quietly(old_client)
                while True:
                    await asyncio.sleep(30)
                    try:
                        await asyncio.wait_for(new_client.get_me(), timeout=10)
                    except Exception as ping_err:
                        _log(f"Pyrogram ping failed: {ping_err} — reconnecting")
                        # The connection is suspect: make new downloads wait
                        # for the replacement client instead of using this one.
                        _pyro_ready.clear()
                        break
            except Exception as e:
                _log(f"Pyrogram error: {e} — retrying in 30s")
                _pyro_ready.clear()
                # Tear down only the client that failed to start. The last
                # published client (if any) stays referenced by pyro_client and
                # is stopped after the next successful start, so it is never
                # leaked.
                await _stop_quietly(new_client)
                await asyncio.sleep(30)

    pyro_loop.run_until_complete(_run())
# Connect to Telegram in the background as soon as the module is imported;
# _download_from_channel waits on _pyro_ready before using the client.
if ARCHIVER_BOT_TOKEN:
    _pyro_thread = threading.Thread(target=_start_pyrogram, daemon=True)
    _pyro_thread.start()
def _download_from_channel(channel_id: int, message_id: int, dest: str, archive_id: str) -> int:
    """Download media from Telegram channel via Pyrogram bot session.

    Blocks the calling thread while the download coroutine runs on
    ``pyro_loop``. Progress is reported via ``_report_progress`` at most once
    every 3 s.

    Args:
        channel_id: Telegram chat id of the source channel.
        message_id: id of the message that carries the media.
        dest: local file path to download to.
        archive_id: job id attached to progress reports.

    Returns:
        Size in bytes of the downloaded file.

    Raises:
        Exception: if Pyrogram is not ready within 60 s, the message has no
            video/document/animation/video_note, ``get_messages`` (30 s) or
            ``download_media`` (30 min) times out or fails, or the file is
            missing afterwards. ``future.result`` may also raise its own
            timeout after 1860 s.
    """
    if not _pyro_ready.wait(timeout=60):
        raise Exception("Pyrogram not ready after 60s - check ARCHIVER_BOT_TOKEN and network")
    # Snapshot the shared client: the reconnect loop may rebind the global
    # mid-download, and every call below must go to the same instance.
    client = pyro_client
    if client is None:
        raise Exception("Pyrogram client is None")
    start = time.time()
    last_report = 0.0  # time of the last progress report (throttle state)

    async def _dl():
        _log(f"Getting message {message_id} from channel {channel_id}")
        try:
            msg = await asyncio.wait_for(
                client.get_messages(int(channel_id), message_ids=int(message_id)),
                timeout=30,
            )
        except asyncio.TimeoutError:
            raise Exception("get_messages timed out after 30s") from None
        except Exception as e:
            raise Exception(f"get_messages failed: {e}") from e
        if not msg or not msg.media:
            raise Exception(f"No media found in message {message_id} (channel {channel_id})")
        media_obj = msg.video or msg.document or msg.animation or msg.video_note
        if not media_obj:
            raise Exception(f"Message {message_id} has no supported media type")
        # file_size may be absent *or* None; coerce to 0 so the arithmetic and
        # comparisons in prog() never operate on None.
        total = getattr(media_obj, "file_size", 0) or 0
        if not total:
            _log("Warning: file_size unknown, proceeding without progress %")

        def prog(current, _total):
            nonlocal last_report
            now = time.time()
            if now - last_report < 3:
                return
            last_report = now
            elapsed = now - start
            pct = int(current * 100 / total) if total > 0 else 0
            speed = current / elapsed if elapsed > 0 else 0
            eta = int((total - current) / speed) if speed > 0 and total > current else 0
            # Report from a separate daemon thread so the blocking HTTP POST
            # does not delay this callback.
            threading.Thread(
                target=_report_progress,
                kwargs={
                    "archive_id": archive_id,
                    "status": "running",
                    "step": "downloading",
                    "pct": pct,
                    "msg": f"تحميل من القناة: {current/1024/1024:.1f}/{total/1024/1024:.1f} MB @ {speed/1024/1024:.1f} MB/s",
                    "extra": {
                        "speed": int(speed),
                        "downloaded_bytes": current,
                        "total_bytes": total,
                        "eta_seconds": eta,
                    },
                },
                daemon=True,
            ).start()

        _log(f"Starting download → {dest}")
        try:
            path = await asyncio.wait_for(
                client.download_media(msg, file_name=dest, progress=prog),
                timeout=1800,  # 30 min max for large files
            )
        except asyncio.TimeoutError:
            raise Exception("download_media timed out after 30 minutes") from None
        if not path or not os.path.exists(str(path)):
            raise Exception("download_media returned None or file missing")
        size = os.path.getsize(str(path))
        _log(f"Download complete: {size/1024/1024:.1f} MB in {time.time()-start:.0f}s")
        return size

    future = asyncio.run_coroutine_threadsafe(_dl(), pyro_loop)
    try:
        return future.result(timeout=1860)
    except Exception:
        # Don't leave the coroutine running on the shared loop once we give up.
        future.cancel()
        raise
def _looks_like_missing_repo(exc: Exception) -> bool:
    """Return True if *exc* from an HfApi call indicates the repo does not exist.

    Checks, in order:
    1. the exception class name;
    2. the Hub's "Repository Not Found" / "does not exist" wording;
    3. the attached HTTP status.

    A bare "404" substring is trusted only when no HTTP status is attached,
    because it can also appear inside a file name or URL of an unrelated
    error (e.g. a 403 while uploading "clip_404.mp4").
    """
    if type(exc).__name__ == "RepositoryNotFoundError":
        return True
    text = str(exc)
    if "Repository Not Found" in text or "does not exist" in text.lower():
        return True
    status = getattr(getattr(exc, "response", None), "status_code", None)
    if status is not None:
        return status == 404
    return "404" in text


def _upload_to_dataset(file_path: str, file_name: str, archive_id: str) -> dict:
    """Upload a staged media file to ``{DATASET_REPO}/media/<file_name>``.

    The dataset is assumed to exist, so ``create_repo`` is skipped up front
    (the token may lack create permission, which surfaces as a 403). Only when
    the upload fails because the repo is missing is ``create_repo`` attempted,
    followed by exactly one retry.

    Returns:
        ``{"status": "uploaded", "dataset": ..., "path": "media/<file_name>"}``
        on success, otherwise ``{"error": <message, max 300 chars>}``. Upload
        errors are returned, not raised.
    """
    if not HF_TOKEN:
        return {"error": "HF_TOKEN not configured for this archiver"}
    if not DATASET_REPO:
        return {"error": "DATASET_NAMESPACE env var not set - cannot determine target dataset"}
    path_in_repo = f"media/{file_name}"
    uploaded = {"status": "uploaded", "dataset": DATASET_REPO, "path": path_in_repo}
    api = None

    def _push():
        # Single definition of the upload call, used for the first attempt and
        # the post-create retry.
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message=f"archive: {file_name}",
        )

    try:
        _report_progress(archive_id, "running", "uploading", 80,
                         f"رفع الملف إلى {DATASET_REPO}...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        api = HfApi(token=HF_TOKEN)
        _log(f"Uploading {file_name} to {DATASET_REPO}/media/")
        # Upload directly — skip create_repo (dataset already exists, and token may lack create-repo permission)
        _push()
        _log(f"Upload OK: {DATASET_REPO}/media/{file_name}")
        return uploaded
    except Exception as e:
        err = str(e)
        _log(f"Upload failed: {err}")
        # api is None only if the client itself could not be built; nothing
        # to retry with in that case.
        if api is None or not _looks_like_missing_repo(e):
            return {"error": err[:300]}

    # The dataset doesn't exist yet: create it, then retry once.
    _log(f"Dataset {DATASET_REPO} not found — creating and retrying...")
    try:
        api.create_repo(repo_id=DATASET_REPO, repo_type="dataset", private=True, exist_ok=True)
        _push()
    except Exception as e2:
        return {"error": str(e2)[:300]}
    _log(f"Upload OK (after create): {DATASET_REPO}/media/{file_name}")
    return uploaded
def _upload_meta_to_dataset(meta_path: str, meta_name: str, archive_id: str) -> dict:
    """Upload the JSON metadata file to ``{DATASET_REPO}/metadata/<meta_name>``.

    Best-effort companion to the media upload. Failures are logged (tagged
    with ``archive_id``) and returned as ``{"error": <max 200 chars>}``, never
    raised; success returns ``{"status": "uploaded"}``.
    """
    if not HF_TOKEN or not DATASET_REPO:
        return {"error": "not configured"}
    try:
        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=meta_path,
            path_in_repo=f"metadata/{meta_name}",
            repo_id=DATASET_REPO,
            repo_type="dataset",
            # Named commit, matching the media upload's "archive: ..." style.
            commit_message=f"archive metadata: {meta_name}",
        )
    except Exception as e:
        err = str(e)
        # The caller ignores this return value, so log here or the failure
        # would be invisible.
        _log(f"[{archive_id}] Metadata upload failed: {err}")
        return {"error": err[:200]}
    return {"status": "uploaded"}
class Handler(http.server.BaseHTTPRequestHandler):
    """HTTP front end of the archiver service.

    Routes:
      GET  /health           -> configuration, Pyrogram and disk status (JSON)
      GET  /download/<name>  -> stream a file still staged in ARCHIVE_DIR
      POST /receive          -> validate and queue an archive job, which then
                                runs on a background thread
      POST /exec             -> placeholder; echoes ``cmd`` without running it
      OPTIONS *              -> permissive CORS preflight
    """

    def do_GET(self):
        """Serve /health and /download/<name>; everything else is 404."""
        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/health":
            files = list(ARCHIVE_DIR.iterdir())
            self._json(200, {
                "status": "ok",
                "dataset_repo": DATASET_REPO or "NOT CONFIGURED",
                "dataset_namespace": DATASET_NAMESPACE or "NOT CONFIGURED",
                "hf_token_set": bool(HF_TOKEN),
                "pyrogram_ready": _pyro_ready.is_set(),
                "pyrogram_enabled": bool(ARCHIVER_BOT_TOKEN),
                "files_staged": len(files),
                "workspace_free_gb": round(shutil.disk_usage("/tmp").free / (1024**3), 2),
            })
        elif raw_path.startswith("/download/"):
            # Use the query-stripped path so "?x=1" is not taken as part of
            # the file name.
            self._send_staged_file(raw_path[len("/download/"):])
        else:
            self._json(404, {"error": "Not found"})

    def _send_staged_file(self, requested: str):
        """Stream ``ARCHIVE_DIR/<basename of requested>`` or answer 404."""
        # Decode %-escapes, then keep only the last path component so the
        # request cannot reach outside ARCHIVE_DIR.
        fname = Path(urllib.parse.unquote(requested)).name
        fpath = ARCHIVE_DIR / fname
        # is_file() rather than exists(): "", "." and ".." resolve to
        # directories, which would otherwise pass the check and then crash in
        # open() after the 200 status line had been sent.
        if not fname or not fpath.is_file():
            self._json(404, {"error": "File not found"})
            return
        # http.server encodes header values as Latin-1, so send an ASCII-only
        # fallback name plus an RFC 5987 UTF-8 name. This also keeps quotes
        # and control characters out of the header.
        fallback = "".join(c if 32 <= ord(c) < 127 and c not in '"\\' else "_" for c in fname)
        disposition = (f'attachment; filename="{fallback}"; '
                       f"filename*=UTF-8''{urllib.parse.quote(fname)}")
        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.send_header("Content-Length", str(fpath.stat().st_size))
        self.send_header("Content-Disposition", disposition)
        self.end_headers()
        with open(fpath, "rb") as f:
            shutil.copyfileobj(f, self.wfile)

    def do_OPTIONS(self):
        """Answer CORS preflight requests permissively."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.end_headers()

    def do_POST(self):
        """Parse the JSON body and dispatch /receive and /exec."""
        try:
            # Clamp: a negative length would make rfile.read() block until EOF.
            length = max(int(self.headers.get("Content-Length", 0)), 0)
        except ValueError:
            self._json(400, {"error": "Invalid Content-Length"})
            return
        body = {}
        if length:
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json(400, {"error": "Invalid JSON body"})
                return
        # Both routes below call dict methods on the body.
        if not isinstance(body, dict):
            self._json(400, {"error": "JSON body must be an object"})
            return
        raw_path = self.path.split("?", 1)[0]
        if raw_path == "/receive":
            result = self._receive_async(body)
            # NOTE(review): validation errors are also answered with 202; kept
            # for compatibility with existing callers, but a 4xx would be clearer.
            self._json(202, result)
        elif raw_path == "/exec":
            # Placeholder: the command is only echoed back, never executed.
            cmd = body.get("cmd", "")
            self._json(200, {"ok": True, "cmd": cmd, "result": "exec not implemented"})
        else:
            self._json(404, {"error": f"Unknown path: {raw_path}"})

    def _receive_async(self, body: dict) -> dict:
        """Validate a /receive body and start the archive job on a background thread.

        Recognised keys: ``file_name``, ``archive_id`` (or
        ``metadata.archive_id``), ``channel_id``, ``message_id`` (required) and
        ``metadata``.

        Returns:
            The response payload: ``{"status": "accepted", ...}`` when the job
            was started, otherwise ``{"error": ...}``.
        """
        # metadata is optional; a null or non-object value is treated as {}.
        raw_metadata = body.get("metadata")
        metadata = raw_metadata if isinstance(raw_metadata, dict) else {}
        file_name = body.get("file_name") or "video.mp4"
        # archive_id can come at top level OR inside metadata dict
        archive_id = (
            body.get("archive_id")
            or metadata.get("archive_id")
            or f"ARC_{int(time.time())}"
        )
        # channel_id MUST be int for Pyrogram
        raw_channel_id = body.get("channel_id", CHANNEL_ID)
        try:
            channel_id = int(raw_channel_id)
        except (ValueError, TypeError):
            channel_id = CHANNEL_ID
        # message_id MUST be int
        raw_msg_id = body.get("message_id", 0)
        try:
            message_id = int(raw_msg_id)
        except (ValueError, TypeError):
            message_id = 0
        if not message_id:
            _log(f"[{archive_id}] ERROR: Missing message_id in receive body")
            return {"error": "Missing or invalid message_id - archiver cannot download without it"}
        if not ARCHIVER_BOT_TOKEN:
            _log(f"[{archive_id}] ERROR: ARCHIVER_BOT_TOKEN not set")
            return {"error": "ARCHIVER_BOT_TOKEN not configured on this archiver space"}
        if not DATASET_REPO:
            _log(f"[{archive_id}] ERROR: DATASET_NAMESPACE not set")
            return {"error": "DATASET_NAMESPACE not configured on this archiver space - cannot save files"}
        # Keep only the basename (no directory traversal). Fall back to the
        # default when nothing usable is left, otherwise dest would be
        # ARCHIVE_DIR itself or its parent.
        safe_name = Path(str(file_name)).name
        if safe_name in ("", ".", ".."):
            safe_name = "video.mp4"
        dest = str(ARCHIVE_DIR / safe_name)
        _log(f"[{archive_id}] Queued: {safe_name} ch={channel_id} msg={message_id} → {DATASET_REPO}")
        _report_progress(archive_id, "accepted", "queued", 2,
                         f"تم قبول الطلب في {DATASET_NAMESPACE} - بدء التحميل...",
                         {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
        threading.Thread(
            target=self._process_archive,
            args=(safe_name, dest, archive_id, channel_id, message_id, metadata),
            daemon=True,
        ).start()
        return {
            "status": "accepted",
            "archive_id": archive_id,
            "dataset": DATASET_REPO,
            "namespace": DATASET_NAMESPACE,
        }

    def _process_archive(self, safe_name: str, dest: str, archive_id: str,
                         channel_id: int, message_id: int, metadata: dict):
        """Background job: download the media, write its metadata, upload both.

        Progress and the final outcome are reported via ``_report_progress``.
        Exceptions are logged and reported as ``failed`` rather than
        propagated. On success the staged media and ``.meta.json`` files are
        deleted. On failure they are left in ARCHIVE_DIR, where ``/download/``
        can still serve them.
        """
        start_time = time.time()
        try:
            _log(f"[{archive_id}] Processing: {safe_name}")
            _report_progress(archive_id, "running", "downloading", 5,
                             f"بدء تحميل من القناة (ch={channel_id} msg={message_id})...",
                             {"speed": 0, "downloaded_bytes": 0, "total_bytes": 0, "eta_seconds": 0})
            file_size = _download_from_channel(channel_id, message_id, dest, archive_id)
            elapsed = time.time() - start_time
            speed = file_size / elapsed if elapsed > 0 else 0
            _report_progress(archive_id, "running", "archiving", 72,
                             f"✅ تم التحميل ({file_size/1024/1024:.1f} MB في {elapsed:.0f}s @ {speed/1024/1024:.1f} MB/s) - جارٍ الأرشفة...",
                             {"speed": int(speed), "downloaded_bytes": file_size, "total_bytes": file_size, "eta_seconds": 0})
            # Save metadata JSON next to the file
            meta_name = f"{safe_name}.meta.json"
            meta_path = str(ARCHIVE_DIR / meta_name)
            with open(meta_path, "w", encoding="utf-8") as f:
                json.dump({
                    "archive_id": archive_id,
                    "file_name": safe_name,
                    "file_size": file_size,
                    "dataset_repo": DATASET_REPO,
                    "dataset_namespace": DATASET_NAMESPACE,
                    "channel_id": channel_id,
                    "message_id": message_id,
                    # Naive UTC timestamp, same format as datetime.utcnow()
                    # (which is deprecated since Python 3.12).
                    "received_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
                    "elapsed_seconds": round(elapsed, 1),
                    "speed_bytes_per_sec": int(speed),
                    "metadata": metadata,
                }, f, ensure_ascii=False, indent=2)
            # Upload video to dataset
            upload_result = _upload_to_dataset(dest, safe_name, archive_id)
            if upload_result.get("status") == "uploaded":
                # Also upload metadata JSON (best effort; result not checked)
                _upload_meta_to_dataset(meta_path, meta_name, archive_id)
                _log(f"[{archive_id}] ✅ Archived: {safe_name} ({file_size/1024/1024:.1f} MB) → {DATASET_REPO}")
                _report_progress(archive_id, "completed", "done", 100,
                                 f"✅ تمت الأرشفة في {DATASET_REPO}/media/{safe_name}",
                                 {"speed": int(speed), "elapsed_seconds": int(elapsed),
                                  "dataset": DATASET_REPO, "file_path": f"media/{safe_name}"})
                # Clean up local temp files independently, so a failure on one
                # does not leave the other behind.
                for leftover in (dest, meta_path):
                    try:
                        os.remove(leftover)
                    except OSError:
                        pass
            else:
                err = upload_result.get("error", "unknown upload error")
                _log(f"[{archive_id}] ❌ Upload failed: {err}")
                _report_progress(archive_id, "failed", "upload_error", 72,
                                 f"❌ فشل الرفع إلى {DATASET_REPO}: {err[:120]}",
                                 {"error": err})
        except Exception as e:
            _log(f"[{archive_id}] ❌ Error processing {safe_name}: {e}")
            import traceback
            traceback.print_exc()
            _report_progress(archive_id, "failed", "error", 0,
                             f"❌ خطأ: {str(e)[:150]}",
                             {"error": str(e)[:300]})

    def _json(self, code, data):
        """Send *data* as a UTF-8 JSON response with status *code* and a CORS header."""
        # Serialise first so Content-Length can be sent. ensure_ascii=False
        # emits raw non-ASCII text, hence the explicit charset.
        payload = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, fmt, *args):
        # Silence BaseHTTPRequestHandler's default per-request stderr logging.
        pass
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    _log(f"Archiver v4.0 starting on :{port}")
    _log(f"Dataset repo : {DATASET_REPO or '⚠️ NOT CONFIGURED (set DATASET_NAMESPACE)'}")
    _log(f"HF token : {'✅ set' if HF_TOKEN else '❌ NOT SET'}")
    _log(f"Pyrogram : {'starting...' if ARCHIVER_BOT_TOKEN else '❌ ARCHIVER_BOT_TOKEN not set'}")
    _log(f"Channel ID : {CHANNEL_ID}")
    if ARCHIVER_BOT_TOKEN:
        # Give the background connection thread up to 60 s before serving;
        # startup continues either way.
        _log("Waiting for Pyrogram to connect (max 60s)...")
        _pyro_ready.wait(timeout=60)
        _log(f"Pyrogram ready: {_pyro_ready.is_set()}")
    # One thread per request: streaming a large /download/ file, or a /receive
    # that blocks on its progress report, must not stall /health for others.
    httpd = http.server.ThreadingHTTPServer(("0.0.0.0", port), Handler)
    _log(f"HTTP server ready on port {port}")
    httpd.serve_forever()