Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from internetarchive import search_items, get_item | |
| import requests | |
| import time | |
| import subprocess | |
| import json | |
| import re | |
| from bs4 import BeautifulSoup | |
| from requests.exceptions import ReadTimeout | |
# --- News-station filter ---
# Word-boundary regex fragments for well-known news outlets; any
# Archive.org title matching one of these is excluded from results.
NEWS_FILTER = [
    r"\bcnn\b", r"\bfox\b", r"\bmsnbc\b", r"\bbbc\b", r"\breuters\b",
    r"\bal jazeera\b", r"\bbloomberg\b", r"\bsky news\b", r"\bcnbc\b",
    r"\babc news\b", r"\bnbc\b", r"\bcbs\b", r"\bpbs\b", r"\bnewsweek\b",
    r"\bthe guardian\b", r"\bvice news\b", r"\bpolitico\b", r"\bwashington post\b",
    r"\bnew york times\b", r"\bforbes\b", r"\btime\b", r"\busa today\b"
]
# Single case-insensitive pattern of the form (pat1)|(pat2)|... so one
# search() call tests every outlet at once.
_wrapped = ["(" + pattern + ")" for pattern in NEWS_FILTER]
FILTER_REGEX = re.compile("|".join(_wrapped), re.IGNORECASE)
| # --- VirusTotal helper functions --- | |
def scan_url_vt(url, api_key, max_polls=60, poll_interval=5):
    """Submit *url* to VirusTotal and report whether it scanned clean.

    Parameters:
        url: The URL to scan.
        api_key: VirusTotal API key (sent as the ``x-apikey`` header).
        max_polls: Maximum number of status polls before giving up
            (new, defaults preserve roughly the old behavior but bounded).
        poll_interval: Seconds to sleep between polls.

    Returns:
        True if the completed analysis reports zero "malicious" verdicts.

    Raises:
        requests.HTTPError: On a non-2xx response from VirusTotal.
        TimeoutError: If the analysis does not complete within
            ``max_polls * poll_interval`` seconds.
    """
    headers = {"x-apikey": api_key}
    # timeout= prevents an unbounded hang on a stalled connection.
    resp = requests.post(
        "https://www.virustotal.com/api/v3/urls",
        headers=headers,
        data={"url": url},
        timeout=30,
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    # Poll until the analysis finishes, but never forever: the original
    # `while True` could spin indefinitely if VT never reports "completed".
    for _ in range(max_polls):
        time.sleep(poll_interval)
        status_resp = requests.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers,
            timeout=30,
        )
        status_resp.raise_for_status()
        attr = status_resp.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            stats = attr.get("stats", {})
            return stats.get("malicious", 0) == 0
    raise TimeoutError(f"VirusTotal analysis {analysis_id} did not complete")
| # --- FFprobe metadata extraction --- | |
def extract_ffprobe_metadata(url_or_path):
    """Run ffprobe on a local path or URL and return its JSON metadata.

    Annotates the first video stream with a ``computed_fps`` key derived
    from ``avg_frame_rate`` (a "num/den" fraction) when that rate is
    well-formed and non-zero.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits non-zero.
        json.JSONDecodeError: If ffprobe emits invalid JSON.
    """
    cmd = [
        "ffprobe", "-v", "error", "-print_format", "json",
        "-show_format", "-show_streams",
        url_or_path,
    ]
    out = subprocess.check_output(cmd)
    md = json.loads(out)
    for stream in md.get("streams", []):
        if stream.get("codec_type") != "video":
            continue
        avg_fr = stream.get("avg_frame_rate", "")
        if avg_fr and "/" in avg_fr:
            # Guard the parse: a malformed rate ("0/0/1", "N/A") used to
            # raise ValueError and abort the whole metadata fetch.
            try:
                num, den = (int(part) for part in avg_fr.split("/"))
            except ValueError:
                break
            if den != 0:
                stream["computed_fps"] = round(num / den, 2)
        # Only the first video stream is annotated, matching the
        # original behavior.
        break
    return md
| # --- Scrape basic page metadata --- | |
def fetch_page_metadata(url):
    """Best-effort scrape of a page's title and social meta tags.

    Fetches *url* (5-second timeout) and collects the <title> text plus
    every <meta> tag whose property/name starts with "og:" or "twitter:".
    Always returns a dict containing "url"; on any failure the dict
    carries an "error" message instead of scraped fields.
    """
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        page_title = soup.title.string if soup.title else None
        info = {"url": url, "title": page_title}
        for meta_tag in soup.find_all("meta"):
            key = meta_tag.get("property") or meta_tag.get("name")
            if key and key.startswith(("og:", "twitter:")):
                info[key] = meta_tag.get("content")
        return info
    except Exception as exc:
        # Deliberate catch-all: callers treat metadata as optional and
        # just display whatever dict comes back.
        return {"url": url, "error": str(exc)}
| # --- Core search & scan logic --- | |
# --- Core search & scan logic ---
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search archive.org for video files matching *keywords*.

    Parameters:
        keywords: Comma-separated search terms (spaces within a term are
            joined with "+").
        api_key: VirusTotal API key, used only when *scan_enabled* is true.
        scan_enabled: If true (and an API key is given), drop any URL that
            VirusTotal flags as malicious or fails to scan.

    Returns:
        List of direct https://archive.org/download/... URLs whose titles
        do not match the news-outlet filter.
    """
    VIDEO_EXTS = (".mp4", ".m4v", ".mov", ".avi", ".mpg", ".mpeg", ".mkv", ".webm")

    # Build the IA query, skipping blank terms (e.g. a trailing comma
    # previously produced an empty OR clause).
    terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",") if kw.strip()]
    query = " OR ".join(terms)
    ia_query = f"mediatype:(movies) AND ({query})"

    # Robust search with exponential-backoff retries on read timeouts.
    max_attempts = 3
    results = []
    for attempt in range(max_attempts):
        try:
            results = list(search_items(ia_query))[:50]
            break
        except ReadTimeout:
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
            else:
                results = []

    clean_urls = []
    for res in results:
        # NOTE(review): IA metadata fields can be lists rather than
        # strings; str() keeps the filter from crashing on those —
        # confirm against live search results.
        title = str(res.get("title", "") or "").lower()
        if FILTER_REGEX.search(title):
            continue  # skip known news sources
        identifier = res["identifier"]
        try:
            item = get_item(identifier)
        except Exception:
            continue  # best-effort: an unreachable item is just skipped
        for f in item.files:
            name = f.get("name", "").lower()
            if not name.endswith(VIDEO_EXTS):
                continue
            url = f"https://archive.org/download/{identifier}/{f['name']}"
            if scan_enabled and api_key:
                try:
                    if not scan_url_vt(url, api_key):
                        continue
                except Exception:
                    continue  # treat scan failure as "not clean"
            clean_urls.append(url)
    return clean_urls
| # --- Gradio UI setup --- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# 📼 IA Scrape – Enhanced Archive Video Explorer") | |
| with gr.Row(): | |
| kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav") | |
| vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password") | |
| scan_toggle = gr.Checkbox(label="Enable VT scan", value=True) | |
| ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False) | |
| run_btn = gr.Button("Search & Scan") | |
| url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True) | |
| video_player = gr.Video(label="Video Player") | |
| ia_meta_json = gr.JSON(label="► Raw IA Metadata") | |
| ffprobe_json = gr.JSON(label="► FFprobe Metadata") | |
| origins_json = gr.JSON(label="► Source‑Origin Metadata") | |
| def search_and_populate(keywords, api_key, scan_enabled): | |
| urls = fetch_clean_videos(keywords, api_key, scan_enabled) | |
| return gr.update(choices=urls, value=urls[0] if urls else None) | |
| def update_all(selected_url, ff_on, api_key): | |
| if not selected_url: | |
| return None, {}, {}, [] | |
| # 1) IA metadata + files | |
| parts = selected_url.split("/") | |
| identifier = parts[4] if len(parts) > 4 else None | |
| raw_ia = {"identifier": identifier, "metadata": {}, "files": []} | |
| if identifier: | |
| try: | |
| item = get_item(identifier) | |
| raw_ia["metadata"] = item.metadata | |
| raw_ia["files"] = [ | |
| {"name": f.get("name"), "format": f.get("format"), "size": f.get("size"), "md5": f.get("md5"), | |
| **{k: v for k, v in f.items() if k not in ("name", "format", "size", "md5")}} | |
| for f in item.files | |
| ] | |
| except Exception: | |
| raw_ia["error"] = "could not fetch IA metadata" | |
| # 2) FFprobe metadata if toggled | |
| ff_md = {} | |
| if ff_on: | |
| try: | |
| ff_md = extract_ffprobe_metadata(selected_url) | |
| except Exception as e: | |
| ff_md = {"error": str(e)} | |
| # 3) Source‑origin tracing | |
| origins = [] | |
| source_url = None | |
| meta = raw_ia.get("metadata", {}) | |
| # explicit fields | |
| for key, val in meta.items(): | |
| if key.lower() in ("source", "originalurl"): | |
| source_url = val[0] if isinstance(val, list) else val | |
| break | |
| # fallback identifiers | |
| if not source_url: | |
| for key, val in meta.items(): | |
| if key.lower().startswith("external-identifier"): | |
| ext = val[0] if isinstance(val, list) else val | |
| if "youtube" in ext: | |
| vid = ext.split(":")[-1] | |
| source_url = f"https://www.youtube.com/watch?v={vid}" | |
| elif "vimeo" in ext: | |
| vid = ext.split(":")[-1] | |
| source_url = f"https://vimeo.com/{vid}" | |
| break | |
| # description fallback | |
| if not source_url: | |
| desc = meta.get("description", "") | |
| found = re.findall(r"https?://[^\s\"<]+", desc) | |
| if found: | |
| source_url = found[0] | |
| if source_url: | |
| origins.append(fetch_page_metadata(source_url)) | |
| return selected_url, raw_ia, ff_md, origins | |
| run_btn.click( | |
| fn=search_and_populate, | |
| inputs=[kw_input, vt_key_input, scan_toggle], | |
| outputs=[url_dropdown] | |
| ) | |
| url_dropdown.change( | |
| fn=update_all, | |
| inputs=[url_dropdown, ffprobe_toggle, vt_key_input], | |
| outputs=[video_player, ia_meta_json, ffprobe_json, origins_json] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |