wuhp commited on
Commit
ffdb027
·
verified ·
1 Parent(s): 44936dc

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +109 -102
app.py CHANGED
@@ -1,15 +1,13 @@
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
- import requests, time, subprocess, json, re, tempfile, os
 
 
 
 
4
  from bs4 import BeautifulSoup
5
- import networkx as nx
6
- from pyvis.network import Network
7
 
8
- # --- SETTINGS ---
9
- NEWS_FILTER = [r"\bcnn\b", r"\bfox\b", r"\bbbc\b", r"\bmsnbc\b", r"\breuters\b"]
10
- THEME = "gradio/soft"
11
-
12
- # --- VirusTotal scan ---
13
  def scan_url_vt(url, api_key):
14
  headers = {"x-apikey": api_key}
15
  resp = requests.post(
@@ -19,133 +17,142 @@ def scan_url_vt(url, api_key):
19
  analysis_id = resp.json()["data"]["id"]
20
  while True:
21
  time.sleep(5)
22
- status = requests.get(
23
  f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
24
  )
25
- attr = status.json()["data"]["attributes"]
 
26
  if attr.get("status") == "completed":
27
- return attr["stats"].get("malicious", 0) == 0
 
28
 
29
- # --- FFprobe metadata ---
30
- def extract_ffprobe_metadata(path):
31
- output = subprocess.check_output([
32
- "ffprobe", "-v", "error", "-print_format", "json", "-show_format", "-show_streams", path
33
- ])
34
- return json.loads(output)
 
 
 
35
 
36
- # --- Fetch page metadata + favicon ---
37
  def fetch_page_metadata(url):
38
  try:
39
- r = requests.get(url, timeout=5)
40
- r.raise_for_status()
41
- soup = BeautifulSoup(r.text, "html.parser")
42
- meta = {"url": url, "title": soup.title.string if soup.title else url}
 
 
43
  for tag in soup.find_all("meta"):
44
- key = tag.get("property") or tag.get("name")
45
- if key and (key.startswith("og:") or key.startswith("twitter:")):
46
- meta[key] = tag.get("content")
47
- icon = soup.find("link", rel=lambda v: v and "icon" in v.lower())
48
- meta["favicon"] = icon.get("href") if icon else ""
49
  return meta
50
  except Exception as e:
51
- return {"url": url, "error": str(e), "favicon": ""}
52
 
53
- # --- IA search + filter ---
54
  def fetch_clean_videos(keywords, api_key, scan_enabled):
55
- query = " OR ".join(kw.strip().replace(" ", "+") for kw in keywords.split(","))
56
- items = list(search_items(f"mediatype:(movies) AND ({query})"))[:50]
57
- results = []
58
- for item_meta in items:
59
- title = item_meta.get("title", "").lower()
60
- if any(re.search(p, title) for p in NEWS_FILTER):
61
- continue
62
- item = get_item(item_meta['identifier'])
63
  for f in item.files:
64
- fmt = f.get('format', '').lower()
65
  if fmt.startswith(("mpeg","mp4","avi","mov","webm","m4v")):
66
- url = f"https://archive.org/download/{item_meta['identifier']}/{f['name']}"
67
  if scan_enabled and api_key:
68
  try:
69
- if not scan_url_vt(url, api_key):
70
- continue
71
- except:
72
  continue
73
- results.append(url)
74
- return results
 
 
 
75
 
76
- # --- Build graph HTML (using write_html) ---
77
- def build_graph_html(chain):
78
- net = Network(height="300px", width="100%", directed=True)
79
- for hop in chain:
80
- url = hop['url']
81
- meta = hop['metadata']
82
- title = meta.get('title', url)
83
- favicon = meta.get('favicon', '')
84
- if favicon:
85
- net.add_node(url, label="", shape='image', image=favicon, title=title)
86
- else:
87
- net.add_node(url, label=title, title=title)
88
- for i in range(len(chain)-1):
89
- net.add_edge(chain[i]['url'], chain[i+1]['url'])
90
- tmp_path = tempfile.mktemp(suffix='.html')
91
- net.write_html(tmp_path, notebook=False, open_browser=False)
92
- with open(tmp_path, 'r', encoding='utf8') as f:
93
- html = f.read()
94
- os.remove(tmp_path)
95
- return html
96
-
97
- # --- Gradio UI ---
98
- with gr.Blocks(theme=THEME) as demo:
99
- gr.Markdown("# 📼 IA Drone‑Strike Explorer")
100
  with gr.Row():
101
- kw_input = gr.Textbox(label="Search keywords (comma-separated)", value="drone strike, military uav")
102
- vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
103
- scan_toggle = gr.Checkbox(label="Enable VirusTotal scan", value=True)
104
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
105
  run_btn = gr.Button("Search & Scan")
106
 
107
  url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True)
108
  video_player = gr.Video(label="Video Player")
109
- ia_meta = gr.JSON(label="► Raw IA Metadata")
110
- ff_meta = gr.JSON(label="► FFprobe Metadata")
111
- graph_panel = gr.HTML(label="► Reupload Chain Graph")
112
- origin_meta = gr.JSON(label="► Origin Node Metadata")
113
 
114
- def search_and_populate(keywords, api_key, scan_on):
115
- urls = fetch_clean_videos(keywords, api_key, scan_on)
116
  return gr.update(choices=urls, value=urls[0] if urls else None)
117
 
118
- def on_url_select(selected_url, ff_on, api_key):
 
119
  if not selected_url:
120
- return None, {}, {}, "", {}
121
- # IA metadata + files
122
- parts = selected_url.split('/')
123
- ident = parts[4]
124
- item = get_item(ident)
125
- ia_data = {'metadata': item.metadata, 'files': item.files}
126
- # FFprobe
127
- ff_data = extract_ffprobe_metadata(selected_url) if ff_on else {}
128
- # origin chain
129
- desc = item.metadata.get('description', '')
130
- urls = re.findall(r'https?://[^\s"<]+', desc)
131
- chain = []
132
- for u in urls:
133
- chain.append({'url': u, 'metadata': fetch_page_metadata(u)})
134
- # append IA as last hop
135
- chain.append({'url': selected_url, 'metadata': {'title': item.metadata.get('title','')}})
136
- graph_html = build_graph_html(chain)
137
- origin_node = chain[0]['metadata'] if chain else {}
138
- return selected_url, ia_data, ff_data, graph_html, origin_node
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
 
140
  run_btn.click(
141
- search_and_populate,
142
- inputs=[kw_input, vt_key, scan_toggle],
143
  outputs=[url_dropdown]
144
  )
145
  url_dropdown.change(
146
- on_url_select,
147
- inputs=[url_dropdown, ffprobe_toggle, vt_key],
148
- outputs=[video_player, ia_meta, ff_meta, graph_panel, origin_meta]
149
  )
150
 
151
  if __name__ == "__main__":
 
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
+ import requests
4
+ import time
5
+ import subprocess
6
+ import json
7
+ import re
8
  from bs4 import BeautifulSoup
 
 
9
 
10
+ # --- VirusTotal helper functions ---
 
 
 
 
11
  def scan_url_vt(url, api_key):
12
  headers = {"x-apikey": api_key}
13
  resp = requests.post(
 
17
  analysis_id = resp.json()["data"]["id"]
18
  while True:
19
  time.sleep(5)
20
+ status_resp = requests.get(
21
  f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
22
  )
23
+ status_resp.raise_for_status()
24
+ attr = status_resp.json()["data"]["attributes"]
25
  if attr.get("status") == "completed":
26
+ stats = attr.get("stats", {})
27
+ return stats.get("malicious", 0) == 0
28
 
29
+ # --- FFprobe metadata extraction ---
30
+ def extract_ffprobe_metadata(url_or_path):
31
+ cmd = [
32
+ "ffprobe", "-v", "error", "-print_format", "json",
33
+ "-show_format", "-show_streams",
34
+ url_or_path
35
+ ]
36
+ out = subprocess.check_output(cmd)
37
+ return json.loads(out)
38
 
39
+ # --- Scrape basic page metadata (title + og: tags) ---
40
  def fetch_page_metadata(url):
41
  try:
42
+ resp = requests.get(url, timeout=5)
43
+ resp.raise_for_status()
44
+ html = resp.text
45
+ soup = BeautifulSoup(html, "html.parser")
46
+ meta = {"url": url, "title": soup.title.string if soup.title else None}
47
+ # grab OpenGraph tags
48
  for tag in soup.find_all("meta"):
49
+ prop = tag.get("property") or tag.get("name")
50
+ if prop and prop.startswith(("og:", "twitter:")):
51
+ meta[prop] = tag.get("content")
 
 
52
  return meta
53
  except Exception as e:
54
+ return {"url": url, "error": str(e)}
55
 
56
+ # --- Core search & scan logic ---
57
  def fetch_clean_videos(keywords, api_key, scan_enabled):
58
+ query = " OR ".join([f"{kw.strip().replace(' ', '+')}" for kw in keywords.split(",")])
59
+ ia_query = f"mediatype:(movies) AND ({query})"
60
+ results = list(search_items(ia_query))[:50]
61
+
62
+ clean_urls = []
63
+ for res in results:
64
+ identifier = res["identifier"]
65
+ item = get_item(identifier)
66
  for f in item.files:
67
+ fmt = f.get("format", "").lower()
68
  if fmt.startswith(("mpeg","mp4","avi","mov","webm","m4v")):
69
+ url = f"https://archive.org/download/{identifier}/{f['name']}"
70
  if scan_enabled and api_key:
71
  try:
72
+ is_clean = scan_url_vt(url, api_key)
73
+ except Exception:
 
74
  continue
75
+ else:
76
+ is_clean = True
77
+ if is_clean:
78
+ clean_urls.append(url)
79
+ return clean_urls
80
 
81
+ # --- Gradio UI setup ---
82
+ with gr.Blocks() as demo:
83
+ gr.Markdown("# 📼 IA Drone‑Strike Explorer \nEnable VT scan, FFprobe & Origin Tracing")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  with gr.Row():
85
+ kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
86
+ vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
87
+ scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
88
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
89
  run_btn = gr.Button("Search & Scan")
90
 
91
  url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True)
92
  video_player = gr.Video(label="Video Player")
93
+ ia_meta_json = gr.JSON(label="► Raw IA Metadata")
94
+ ffprobe_json = gr.JSON(label="► FFprobe Metadata")
95
+ origins_json = gr.JSON(label="► Source‑Origin Metadata")
 
96
 
97
+ def search_and_populate(keywords, api_key, scan_enabled):
98
+ urls = fetch_clean_videos(keywords, api_key, scan_enabled)
99
  return gr.update(choices=urls, value=urls[0] if urls else None)
100
 
101
+ def update_all(selected_url, ff_on, api_key):
102
+ # no selection guard
103
  if not selected_url:
104
+ return None, {}, {}, []
105
+
106
+ # 1) IA metadata + file list
107
+ parts = selected_url.split("/")
108
+ identifier = parts[4] if len(parts) > 4 else None
109
+ raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
110
+ if identifier:
111
+ try:
112
+ item = get_item(identifier)
113
+ raw_ia["metadata"] = item.metadata
114
+ raw_ia["files"] = [
115
+ {
116
+ "name": f.get("name"),
117
+ "format": f.get("format"),
118
+ "size": f.get("size"),
119
+ "md5": f.get("md5"),
120
+ **{k: v for k,v in f.items() if k not in ("name","format","size","md5")}
121
+ }
122
+ for f in item.files
123
+ ]
124
+ except Exception:
125
+ raw_ia["error"] = "could not fetch IA metadata"
126
+
127
+ # 2) FFprobe metadata if toggled
128
+ ff_md = {}
129
+ if ff_on:
130
+ try:
131
+ ff_md = extract_ffprobe_metadata(selected_url)
132
+ except Exception as e:
133
+ ff_md = {"error": str(e)}
134
+
135
+ # 3) Origin tracing: scrape each URL in description
136
+ origins = []
137
+ desc = raw_ia["metadata"].get("description", "")
138
+ urls_found = re.findall(r'https?://[^\s"<]+', desc)
139
+ for url in urls_found:
140
+ meta = fetch_page_metadata(url)
141
+ origins.append(meta)
142
+ # stop at first “real” origin (you can remove this break to collect all)
143
+ break
144
+
145
+ return selected_url, raw_ia, ff_md, origins
146
 
147
  run_btn.click(
148
+ fn=search_and_populate,
149
+ inputs=[kw_input, vt_key_input, scan_toggle],
150
  outputs=[url_dropdown]
151
  )
152
  url_dropdown.change(
153
+ fn=update_all,
154
+ inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
155
+ outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
156
  )
157
 
158
  if __name__ == "__main__":