import json
import os
import subprocess
from datetime import datetime, timezone

import requests

import gradio as gr
from huggingface_hub import HfApi, login
|
|
| |
| REPO_ID = "Mafia2008/Vod" |
|
|
|
|
| def convert_and_upload_single(api, item, folder_name): |
| """Downloads/converts a single file, uploads it immediately, and cleans up.""" |
| title = item.get("title", "untitled").replace("/", "-").replace("\\", "-").strip() |
| link = item.get("link") |
| duration = item.get("duration", "0") |
| original_date = item.get("date", "") |
| |
| is_video = "VIDEO" in item.get("type", "").upper() or "video" in folder_name.lower() |
| ext = ".mp4" if is_video else ".pdf" |
| |
| local_filename = f"{title}{ext}" |
| local_path = os.path.join("/tmp", local_filename) |
| repo_path = f"{folder_name}/{local_filename}" |
| |
| |
| hf_direct_link = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{repo_path.replace(' ', '%20')}" |
|
|
| try: |
| if is_video: |
| print(f"π¬ Downloading and converting: {title}") |
| |
| command = [ |
| "ffmpeg", "-y", |
| "-user_agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
| "-protocol_whitelist", "file,http,https,tcp,tls,crypto", |
| "-i", link, |
| "-c", "copy", "-bsf:a", "aac_adtstoasc", |
| local_path |
| ] |
| subprocess.run(command, check=True) |
| |
| |
| if os.path.exists(local_path) and os.path.getsize(local_path) < 1024 * 1024: |
| if os.path.exists(local_path): os.remove(local_path) |
| return False, f"β Failed {title}: Output file is too small (likely blocked by server).", None |
|
|
| else: |
| print(f"π Downloading PDF: {title}") |
| with requests.get(link, stream=True) as r: |
| r.raise_for_status() |
| with open(local_path, 'wb') as f: |
| for chunk in r.iter_content(chunk_size=8192): f.write(chunk) |
|
|
| |
| api.upload_file( |
| path_or_fileobj=local_path, |
| path_in_repo=repo_path, |
| repo_id=REPO_ID, |
| repo_type="dataset" |
| ) |
| |
| |
| if os.path.exists(local_path): os.remove(local_path) |
| |
| |
| metadata = { |
| "title": title, |
| "type": "VIDEO" if is_video else "PDF", |
| "duration": duration, |
| "original_date": original_date, |
| "upload_date": datetime.utcnow().isoformat() + "Z", |
| "hf_direct_link": hf_direct_link |
| } |
| return True, f"β
Uploaded: {local_filename}", metadata |
|
|
| except Exception as e: |
| if os.path.exists(local_path): os.remove(local_path) |
| return False, f"β Failed {title}: {str(e)}", None |
|
|
| def start_process(vid_file, not_file): |
| hf_token = os.environ.get("HF_TOKEN") |
| if not hf_token: return "HF_TOKEN missing in Secrets.", "" |
| login(token=hf_token) |
| api = HfApi() |
|
|
| with open(vid_file, 'r') as f: vid_data = extract_json_from_http_text(f.read()) |
| with open(not_file, 'r') as f: not_data = extract_json_from_http_text(f.read()) |
|
|
| logs = [] |
| final_json_data = { |
| "dataset_repo": REPO_ID, |
| "last_updated": datetime.utcnow().isoformat() + "Z", |
| "videos": [], |
| "notes": [] |
| } |
|
|
| |
| if vid_data: |
| logs.append("### π₯ Processing Videos") |
| for item in vid_data.get('data', {}).get('chapters', []): |
| success, msg, meta = convert_and_upload_single(api, item, "English 12th videos") |
| logs.append(msg) |
| if success and meta: |
| final_json_data["videos"].append(meta) |
| |
| |
| if not_data: |
| logs.append("\n### π Processing Notes") |
| for item in not_data.get('data', {}).get('chapters', []): |
| success, msg, meta = convert_and_upload_single(api, item, "English 12th notes") |
| logs.append(msg) |
| if success and meta: |
| final_json_data["notes"].append(meta) |
|
|
| |
| logs.append("\n### π Generating Master JSON File") |
| json_path = "/tmp/index.json" |
| with open(json_path, 'w', encoding='utf-8') as f: |
| json.dump(final_json_data, f, indent=4) |
| |
| try: |
| api.upload_file( |
| path_or_fileobj=json_path, |
| path_in_repo="index.json", |
| repo_id=REPO_ID, |
| repo_type="dataset" |
| ) |
| logs.append("β
Success: Uploaded index.json to dataset root") |
| except Exception as e: |
| logs.append(f"β Failed to upload index.json: {str(e)}") |
| |
| if os.path.exists(json_path): os.remove(json_path) |
|
|
| return "Process Finished", "\n".join(logs) |
|
|
| |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: |
| gr.Markdown(f"# π¬ Fast Pipeline Worker for {REPO_ID}") |
| gr.Markdown("Safely converts and uploads videos one-by-one to prevent storage crashes, then generates a master `index.json`.") |
| |
| with gr.Row(): |
| v_in = gr.File(label="Upload Video TXT") |
| n_in = gr.File(label="Upload Notes TXT") |
| |
| btn = gr.Button("Start Conversion & Upload", variant="primary") |
| out = gr.Textbox(label="Status") |
| log = gr.Markdown(label="Execution Logs") |
| |
| btn.click(start_process, [v_in, n_in], [out, log]) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860) |
|
|