wuhp committed on
Commit
6ac401a
·
verified ·
1 Parent(s): 673e148

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +114 -53
app.py CHANGED
@@ -1,39 +1,72 @@
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
- import requests, time, subprocess, json, re, tempfile
 
 
 
 
4
  from bs4 import BeautifulSoup
5
  from urllib.parse import urlparse
 
6
  from pyvis.network import Network
 
 
7
 
8
  THEME = "gradio/soft"
 
 
9
  session = requests.Session()
10
- session.headers.update({"User-Agent": "Mozilla/5.0"})
 
 
 
 
 
 
 
11
 
12
  def scan_url_vt(url, api_key):
 
13
  headers = {"x-apikey": api_key}
14
- resp = session.post("https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url})
 
 
 
 
15
  resp.raise_for_status()
16
  analysis_id = resp.json()["data"]["id"]
 
 
17
  while True:
18
- time.sleep(5)
19
- st = session.get(f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers)
 
 
 
20
  st.raise_for_status()
21
  attr = st.json()["data"]["attributes"]
22
  if attr.get("status") == "completed":
23
  return attr.get("stats", {}).get("malicious", 0) == 0
24
 
25
- def extract_ffprobe_metadata(url_or_path):
26
- out = subprocess.check_output([
 
27
  "ffprobe", "-v", "error", "-print_format", "json",
28
- "-show_format", "-show_streams", url_or_path
29
- ])
 
30
  return json.loads(out)
31
 
32
  def fetch_page_metadata(url):
 
33
  try:
34
- r = session.get(url, timeout=5); r.raise_for_status()
 
35
  soup = BeautifulSoup(r.text, "html.parser")
36
- data = {"url": url, "title": getattr(soup.title, "string", None)}
 
 
 
37
  for tag in soup.find_all("meta"):
38
  prop = tag.get("property") or tag.get("name")
39
  if prop and prop.startswith(("og:", "twitter:")):
@@ -43,56 +76,63 @@ def fetch_page_metadata(url):
43
  return {"url": url, "error": str(e)}
44
 
45
  def fetch_clean_videos(keywords, api_key, scan_enabled):
 
46
  terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
47
  ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
48
- results = list(search_items(ia_query))[:50]
49
 
50
- clean = []
 
51
  for r in results:
52
  ident = r["identifier"]
53
- item = get_item(ident)
54
  for f in item.files:
55
  name = f.get("name", "")
56
  ext = name.lower().split(".")[-1]
57
  if ext in ("mp4", "mkv", "avi", "mov", "webm", "m4v"):
58
- url = f"https://archive.org/download/{ident}/{name}"
59
- if scan_enabled and api_key:
60
- try:
61
- if not scan_url_vt(url, api_key):
62
- continue
63
- except:
64
- continue
65
- clean.append(url)
66
- return clean
67
 
68
  def get_favicon_url(page_url):
 
69
  dom = urlparse(page_url).netloc
70
  return f"https://{dom}/favicon.ico"
71
 
72
  def build_origin_graph(origins):
 
73
  net = Network(height="400px", width="100%", directed=True)
74
  for i, m in enumerate(origins):
75
- fav = get_favicon_url(m["url"])
76
- label = urlparse(m["url"]).netloc
77
- title = json.dumps(m, indent=2)
78
- net.add_node(i, label=label, title=title, shape="image", image=fav)
 
 
 
79
  if i > 0:
80
  net.add_edge(i - 1, i)
81
- tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".html").name
82
- net.save_graph(tmp)
83
- return open(tmp, encoding="utf-8").read()
84
 
85
  with gr.Blocks(theme=THEME) as demo:
86
- gr.Markdown("## 📼 IA DroneStrike Explorer")
 
87
  with gr.Row():
88
  kw = gr.Textbox(label="Search keywords", value="drone strike, military uav")
89
  vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
90
- scan = gr.Checkbox(label="Enable VT scan", value=True)
91
- ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
92
- btn = gr.Button("Search & Scan")
93
 
94
- dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
95
- graph_html = gr.HTML("<p>No origin graph yet.</p>")
96
  video_player = gr.Video()
97
 
98
  with gr.Tabs():
@@ -111,34 +151,55 @@ with gr.Blocks(theme=THEME) as demo:
111
  def update_all(url_sel, ff_on, api_key):
112
  if not url_sel:
113
  return None, {}, {}, "<p>No origin graph.</p>", []
114
- parts = url_sel.split("/")
115
- ident = parts[4]; fn = parts[-1]
116
- ia_data = {"identifier": ident, "file": {}}
 
 
 
117
  try:
118
- item = get_item(ident)
119
- ia_data["file"] = next(
120
- {**{k: item.metadata.get(k)}, **f}
121
- for f in item.files if f["name"] == fn
122
- )
 
123
  except Exception as e:
124
  ia_data["error"] = str(e)
125
 
 
126
  ff_data = {}
127
  if ff_on:
128
- try: ff_data = extract_ffprobe_metadata(url_sel)
129
- except Exception as e: ff_data = {"error": str(e)}
 
 
130
 
131
- desc = item.metadata.get("description", "") or ""
 
132
  found = re.findall(r"https?://[^\s\"'<]+", desc)
133
- origins = [fetch_page_metadata(u) for u in found]
134
- graph = build_origin_graph(origins) if origins else "<p>No origins.</p>"
 
 
 
 
135
 
136
  return url_sel, ia_data, ff_data, graph, origins
137
 
138
- btn.click(fn=search_and_populate, inputs=[kw, vt_key, scan], outputs=[dropdown])
139
- dropdown.change(fn=update_all,
140
- inputs=[dropdown, ff, vt_key],
141
- outputs=[video_player, ia_json, ff_json, graph_html, orig_json])
 
 
 
 
 
 
 
 
 
142
 
143
  if __name__ == "__main__":
144
  demo.launch()
 
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
+ import requests
4
+ import time
5
+ import subprocess
6
+ import json
7
+ import re
8
  from bs4 import BeautifulSoup
9
  from urllib.parse import urlparse
10
+ import tempfile
11
  from pyvis.network import Network
12
+ from functools import lru_cache
13
+ from concurrent.futures import ThreadPoolExecutor
14
 
15
THEME = "gradio/soft"

# Shared HTTP session: reuses connections across all requests and sends a
# browser-like User-Agent everywhere.
session = requests.Session()
session.headers.update(
    {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
)
22
+
23
@lru_cache(maxsize=64)
def get_cached_item(identifier):
    """Memoized wrapper around internetarchive.get_item().

    Repeated lookups of the same identifier are served from the cache
    instead of re-fetching; at most 64 distinct items are retained.
    """
    return get_item(identifier)
27
 
28
def scan_url_vt(url, api_key, max_wait=300):
    """Submit *url* to VirusTotal and poll until the analysis completes.

    Args:
        url: The URL to scan.
        api_key: VirusTotal API key, sent as the ``x-apikey`` header.
        max_wait: Maximum seconds to poll before giving up (new, defaulted,
            so existing callers are unaffected).

    Returns:
        True if the completed analysis reports zero "malicious" hits.

    Raises:
        requests.HTTPError: On a non-2xx response from either endpoint.
        TimeoutError: If the analysis is still pending after *max_wait* s.
    """
    headers = {"x-apikey": api_key}
    resp = session.post(
        "https://www.virustotal.com/api/v3/urls",
        headers=headers,
        data={"url": url}
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]

    # Poll once per second. The original `while True` could spin forever if
    # VT never marks the analysis "completed"; bound it with a deadline.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        time.sleep(1)
        st = session.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers
        )
        st.raise_for_status()
        attr = st.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            return attr.get("stats", {}).get("malicious", 0) == 0
    raise TimeoutError(
        f"VirusTotal analysis {analysis_id} not completed within {max_wait}s"
    )
50
 
51
def extract_ffprobe_metadata(path):
    """Probe *path* (local file or URL) with ffprobe; return parsed JSON."""
    probe_cmd = [
        "ffprobe", "-v", "error", "-print_format", "json",
        "-show_format", "-show_streams", path
    ]
    # Equivalent to check_output: capture stdout, raise on non-zero exit,
    # leave stderr flowing through to the console.
    completed = subprocess.run(probe_cmd, check=True, stdout=subprocess.PIPE)
    return json.loads(completed.stdout)
59
 
60
  def fetch_page_metadata(url):
61
+ """Grab <title>, og: and twitter: meta tags from any page."""
62
  try:
63
+ r = session.get(url, timeout=5)
64
+ r.raise_for_status()
65
  soup = BeautifulSoup(r.text, "html.parser")
66
+ data = {
67
+ "url": url,
68
+ "title": getattr(soup.title, "string", None)
69
+ }
70
  for tag in soup.find_all("meta"):
71
  prop = tag.get("property") or tag.get("name")
72
  if prop and prop.startswith(("og:", "twitter:")):
 
76
  return {"url": url, "error": str(e)}
77
 
78
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search Internet Archive for video files, optionally VT-scanned.

    Args:
        keywords: Comma-separated search terms.
        api_key: VirusTotal API key (only used when *scan_enabled*).
        scan_enabled: When true and a key is given, drop URLs that VT
            flags as malicious or that fail to scan.

    Returns:
        List of direct archive.org download URLs.
    """
    from itertools import islice  # local: only needed here

    terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
    ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
    # islice stops the (paged) search after 10 hits instead of
    # materializing the full result set first.
    results = list(islice(search_items(ia_query), 10))

    video_exts = ("mp4", "mkv", "avi", "mov", "webm", "m4v")
    candidates = []
    for r in results:
        ident = r["identifier"]
        item = get_cached_item(ident)
        for f in item.files:
            name = f.get("name", "")
            if name.lower().rsplit(".", 1)[-1] in video_exts:
                candidates.append(f"https://archive.org/download/{ident}/{name}")

    if scan_enabled and api_key:
        def _scan_or_none(u):
            # BUG FIX: an exception from scan_url_vt previously propagated
            # out of exe.map and aborted the whole search. Match the old
            # serial behavior: a failed scan just skips that URL.
            try:
                return u if scan_url_vt(u, api_key) else None
            except Exception:
                return None

        with ThreadPoolExecutor(max_workers=5) as exe:
            return [u for u in exe.map(_scan_or_none, candidates) if u]
    return candidates
 
101
 
102
def get_favicon_url(page_url):
    """Guess the favicon location: /favicon.ico at the page's host, over https."""
    host = urlparse(page_url).netloc
    return "https://" + host + "/favicon.ico"
106
 
107
def build_origin_graph(origins):
    """Render a chain of origin pages as a PyVis network; return its HTML.

    Args:
        origins: List of metadata dicts, each with at least a "url" key
            (error dicts from fetch_page_metadata also carry "url").

    Returns:
        The saved graph's full HTML document as a string.
    """
    import os  # local: only needed for temp-file cleanup

    net = Network(height="400px", width="100%", directed=True)
    for i, meta in enumerate(origins):
        net.add_node(
            i,
            label=urlparse(meta["url"]).netloc,
            title=json.dumps(meta, indent=2),
            shape="image",
            image=get_favicon_url(meta["url"])
        )
        if i > 0:
            net.add_edge(i - 1, i)

    # pyvis only writes to disk, so round-trip through a temp file.
    # BUG FIX: the original leaked both the temp file (delete=False, never
    # unlinked) and the open() handle on every call.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".html")
    tmp.close()
    try:
        net.save_graph(tmp.name)
        with open(tmp.name, encoding="utf-8") as fh:
            return fh.read()
    finally:
        os.unlink(tmp.name)
123
 
124
  with gr.Blocks(theme=THEME) as demo:
125
+ gr.Markdown("## 📼 IA Drone-Strike Explorer")
126
+
127
  with gr.Row():
128
  kw = gr.Textbox(label="Search keywords", value="drone strike, military uav")
129
  vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
130
+ scan = gr.Checkbox(label="Enable VT scan", value=True)
131
+ ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
132
+ btn = gr.Button("Search & Scan")
133
 
134
+ dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
135
+ graph_html = gr.HTML("<p>No origin graph yet.</p>")
136
  video_player = gr.Video()
137
 
138
  with gr.Tabs():
 
151
  def update_all(url_sel, ff_on, api_key):
152
  if not url_sel:
153
  return None, {}, {}, "<p>No origin graph.</p>", []
154
+
155
+ parts = url_sel.split("/")
156
+ ident, fn = parts[4], parts[-1]
157
+ ia_data = {"identifier": ident, "file": {}}
158
+
159
+ # — Fix metadata merge
160
  try:
161
+ item = get_cached_item(ident)
162
+ file_rec = next((f for f in item.files if f["name"] == fn), None)
163
+ if file_rec:
164
+ ia_data["file"] = {**item.metadata, **file_rec}
165
+ else:
166
+ ia_data["file"] = {}
167
  except Exception as e:
168
  ia_data["error"] = str(e)
169
 
170
+ # — FFprobe
171
  ff_data = {}
172
  if ff_on:
173
+ try:
174
+ ff_data = extract_ffprobe_metadata(url_sel)
175
+ except Exception as e:
176
+ ff_data = {"error": str(e)}
177
 
178
+ # Origin tracing
179
+ desc = (item.metadata.get("description") or "")
180
  found = re.findall(r"https?://[^\s\"'<]+", desc)
181
+ if found:
182
+ with ThreadPoolExecutor(max_workers=5) as exe:
183
+ origins = list(exe.map(fetch_page_metadata, found))
184
+ graph = build_origin_graph(origins)
185
+ else:
186
+ origins, graph = [], "<p>No origins.</p>"
187
 
188
  return url_sel, ia_data, ff_data, graph, origins
189
 
190
+ btn.click(
191
+ fn=search_and_populate,
192
+ inputs=[kw, vt_key, scan],
193
+ outputs=[dropdown]
194
+ )
195
+ dropdown.change(
196
+ fn=update_all,
197
+ inputs=[dropdown, ff, vt_key],
198
+ outputs=[video_player, ia_json, ff_json, graph_html, orig_json]
199
+ )
200
+
201
+ # <<< enable async queue & spinner >>>
202
+ demo.queue()
203
 
204
# Launch the Gradio app only when run as a script, not on import.
if __name__ == "__main__":
    demo.launch()