wuhp committed on
Commit
673e148
·
verified ·
1 Parent(s): 49b172d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +29 -67
app.py CHANGED
@@ -1,59 +1,39 @@
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
- import requests
4
- import time
5
- import subprocess
6
- import json
7
- import re
8
  from bs4 import BeautifulSoup
9
  from urllib.parse import urlparse
10
- import tempfile
11
  from pyvis.network import Network
12
 
13
  THEME = "gradio/soft"
14
-
15
- # --- Shared HTTP session for speed & headers ---
16
  session = requests.Session()
17
- session.headers.update({
18
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
19
- })
20
 
21
  def scan_url_vt(url, api_key):
22
- """Return True if VirusTotal sees no malicious hits."""
23
  headers = {"x-apikey": api_key}
24
- resp = session.post("https://www.virustotal.com/api/v3/urls",
25
- headers=headers, data={"url": url})
26
  resp.raise_for_status()
27
  analysis_id = resp.json()["data"]["id"]
28
- # Poll until complete
29
  while True:
30
  time.sleep(5)
31
- st = session.get(f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
32
- headers=headers)
33
  st.raise_for_status()
34
  attr = st.json()["data"]["attributes"]
35
  if attr.get("status") == "completed":
36
  return attr.get("stats", {}).get("malicious", 0) == 0
37
 
38
  def extract_ffprobe_metadata(url_or_path):
39
- """Run ffprobe and parse its JSON output."""
40
- cmd = [
41
  "ffprobe", "-v", "error", "-print_format", "json",
42
  "-show_format", "-show_streams", url_or_path
43
- ]
44
- out = subprocess.check_output(cmd)
45
  return json.loads(out)
46
 
47
  def fetch_page_metadata(url):
48
- """Grab <title>, og: and twitter: meta tags from any page."""
49
  try:
50
- r = session.get(url, timeout=5)
51
- r.raise_for_status()
52
  soup = BeautifulSoup(r.text, "html.parser")
53
- data = {
54
- "url": url,
55
- "title": soup.title.string if soup.title else None
56
- }
57
  for tag in soup.find_all("meta"):
58
  prop = tag.get("property") or tag.get("name")
59
  if prop and prop.startswith(("og:", "twitter:")):
@@ -63,7 +43,6 @@ def fetch_page_metadata(url):
63
  return {"url": url, "error": str(e)}
64
 
65
  def fetch_clean_videos(keywords, api_key, scan_enabled):
66
- """Search IA, filter for common video formats, optional VT scan."""
67
  terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
68
  ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
69
  results = list(search_items(ia_query))[:50]
@@ -73,9 +52,10 @@ def fetch_clean_videos(keywords, api_key, scan_enabled):
73
  ident = r["identifier"]
74
  item = get_item(ident)
75
  for f in item.files:
76
- fmt = f.get("format", "").lower()
77
- if fmt.startswith(("mp4", "avi", "mov", "webm", "m4v")):
78
- url = f"https://archive.org/download/{ident}/{f['name']}"
 
79
  if scan_enabled and api_key:
80
  try:
81
  if not scan_url_vt(url, api_key):
@@ -86,19 +66,16 @@ def fetch_clean_videos(keywords, api_key, scan_enabled):
86
  return clean
87
 
88
  def get_favicon_url(page_url):
89
- """Assume https://domain/favicon.ico exists."""
90
  dom = urlparse(page_url).netloc
91
  return f"https://{dom}/favicon.ico"
92
 
93
  def build_origin_graph(origins):
94
- """Render a PyVis network; return its full HTML."""
95
  net = Network(height="400px", width="100%", directed=True)
96
  for i, m in enumerate(origins):
97
  fav = get_favicon_url(m["url"])
98
  label = urlparse(m["url"]).netloc
99
  title = json.dumps(m, indent=2)
100
- net.add_node(i, label=label, title=title,
101
- shape="image", image=fav)
102
  if i > 0:
103
  net.add_edge(i - 1, i)
104
  tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".html").name
@@ -106,19 +83,18 @@ def build_origin_graph(origins):
106
  return open(tmp, encoding="utf-8").read()
107
 
108
  with gr.Blocks(theme=THEME) as demo:
109
- gr.Markdown("## 📼 IA Drone-Strike Explorer")
110
  with gr.Row():
111
- kw = gr.Textbox(label="Search keywords",
112
- value="drone strike, military uav")
113
  vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
114
- scan = gr.Checkbox(label="Enable VT scan", value=True)
115
- ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
116
- btn = gr.Button("Search & Scan")
117
 
118
- dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
119
- graph_html = gr.HTML("<p>No origin graph yet.</p>")
120
  video_player = gr.Video()
121
-
122
  with gr.Tabs():
123
  with gr.TabItem("IA Metadata"):
124
  ia_json = gr.JSON()
@@ -135,11 +111,8 @@ with gr.Blocks(theme=THEME) as demo:
135
  def update_all(url_sel, ff_on, api_key):
136
  if not url_sel:
137
  return None, {}, {}, "<p>No origin graph.</p>", []
138
-
139
- # --- IA metadata for just this file ---
140
  parts = url_sel.split("/")
141
- ident = parts[4]
142
- fn = parts[-1]
143
  ia_data = {"identifier": ident, "file": {}}
144
  try:
145
  item = get_item(ident)
@@ -150,33 +123,22 @@ with gr.Blocks(theme=THEME) as demo:
150
  except Exception as e:
151
  ia_data["error"] = str(e)
152
 
153
- # --- FFprobe if requested ---
154
  ff_data = {}
155
  if ff_on:
156
- try:
157
- ff_data = extract_ffprobe_metadata(url_sel)
158
- except Exception as e:
159
- ff_data = {"error": str(e)}
160
 
161
- # --- Origin tracing: all links in description ---
162
  desc = item.metadata.get("description", "") or ""
163
  found = re.findall(r"https?://[^\s\"'<]+", desc)
164
  origins = [fetch_page_metadata(u) for u in found]
 
165
 
166
- # --- Build the interactive graph HTML ---
167
- graph = build_origin_graph(origins) if origins else "<p>No origins.</p>"
168
  return url_sel, ia_data, ff_data, graph, origins
169
 
170
- btn.click(
171
- fn=search_and_populate,
172
- inputs=[kw, vt_key, scan],
173
- outputs=[dropdown]
174
- )
175
- dropdown.change(
176
- fn=update_all,
177
- inputs=[dropdown, ff, vt_key],
178
- outputs=[video_player, ia_json, ff_json, graph_html, orig_json]
179
- )
180
 
181
  if __name__ == "__main__":
182
  demo.launch()
 
1
  import gradio as gr
2
  from internetarchive import search_items, get_item
3
+ import requests, time, subprocess, json, re, tempfile
 
 
 
 
4
  from bs4 import BeautifulSoup
5
  from urllib.parse import urlparse
 
6
  from pyvis.network import Network
7
 
8
  THEME = "gradio/soft"
 
 
9
  session = requests.Session()
10
+ session.headers.update({"User-Agent": "Mozilla/5.0"})
 
 
11
 
12
def scan_url_vt(url, api_key, max_polls=24):
    """Submit *url* to VirusTotal and return True if no engine flags it malicious.

    Submits the URL for analysis, then polls the analysis endpoint every
    5 seconds until VT reports completion or *max_polls* attempts elapse
    (bounded so a stalled analysis cannot hang the app forever).

    Args:
        url: The URL to scan.
        api_key: VirusTotal v3 API key.
        max_polls: Maximum number of 5-second poll attempts (default ~2 min).

    Returns:
        True when the completed analysis shows zero "malicious" verdicts.

    Raises:
        requests.HTTPError: On a non-2xx response from either endpoint.
        TimeoutError: If the analysis never reaches "completed" status.
    """
    headers = {"x-apikey": api_key}
    resp = session.post(
        "https://www.virustotal.com/api/v3/urls",
        headers=headers,
        data={"url": url},
        timeout=30,  # don't hang on a dead connection
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    # Poll until VT marks the analysis complete, up to max_polls attempts.
    for _ in range(max_polls):
        time.sleep(5)
        st = session.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers,
            timeout=30,
        )
        st.raise_for_status()
        attr = st.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            return attr.get("stats", {}).get("malicious", 0) == 0
    raise TimeoutError(f"VirusTotal analysis {analysis_id} did not complete")
24
 
25
def extract_ffprobe_metadata(url_or_path):
    """Probe a media file or URL with ffprobe and return its parsed JSON metadata.

    Args:
        url_or_path: Local path or remote URL that ffprobe can read.

    Returns:
        A dict with ffprobe's "format" and "streams" sections.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits non-zero.
        FileNotFoundError: If the ffprobe binary is not installed.
    """
    # List form (shell=False) keeps crafted paths/URLs from reaching a shell.
    probe_cmd = [
        "ffprobe", "-v", "error", "-print_format", "json",
        "-show_format", "-show_streams", url_or_path,
    ]
    raw = subprocess.check_output(probe_cmd)
    return json.loads(raw)
31
 
32
  def fetch_page_metadata(url):
 
33
  try:
34
+ r = session.get(url, timeout=5); r.raise_for_status()
 
35
  soup = BeautifulSoup(r.text, "html.parser")
36
+ data = {"url": url, "title": getattr(soup.title, "string", None)}
 
 
 
37
  for tag in soup.find_all("meta"):
38
  prop = tag.get("property") or tag.get("name")
39
  if prop and prop.startswith(("og:", "twitter:")):
 
43
  return {"url": url, "error": str(e)}
44
 
45
  def fetch_clean_videos(keywords, api_key, scan_enabled):
 
46
  terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
47
  ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
48
  results = list(search_items(ia_query))[:50]
 
52
  ident = r["identifier"]
53
  item = get_item(ident)
54
  for f in item.files:
55
+ name = f.get("name", "")
56
+ ext = name.lower().split(".")[-1]
57
+ if ext in ("mp4", "mkv", "avi", "mov", "webm", "m4v"):
58
+ url = f"https://archive.org/download/{ident}/{name}"
59
  if scan_enabled and api_key:
60
  try:
61
  if not scan_url_vt(url, api_key):
 
66
  return clean
67
 
68
def get_favicon_url(page_url):
    """Best-effort favicon URL: assume /favicon.ico lives at the page's host."""
    host = urlparse(page_url).netloc
    return "https://" + host + "/favicon.ico"
71
 
72
  def build_origin_graph(origins):
 
73
  net = Network(height="400px", width="100%", directed=True)
74
  for i, m in enumerate(origins):
75
  fav = get_favicon_url(m["url"])
76
  label = urlparse(m["url"]).netloc
77
  title = json.dumps(m, indent=2)
78
+ net.add_node(i, label=label, title=title, shape="image", image=fav)
 
79
  if i > 0:
80
  net.add_edge(i - 1, i)
81
  tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".html").name
 
83
  return open(tmp, encoding="utf-8").read()
84
 
85
  with gr.Blocks(theme=THEME) as demo:
86
+ gr.Markdown("## 📼 IA DroneStrike Explorer")
87
  with gr.Row():
88
+ kw = gr.Textbox(label="Search keywords", value="drone strike, military uav")
 
89
  vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
90
+ scan = gr.Checkbox(label="Enable VT scan", value=True)
91
+ ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
92
+ btn = gr.Button("Search & Scan")
93
 
94
+ dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
95
+ graph_html = gr.HTML("<p>No origin graph yet.</p>")
96
  video_player = gr.Video()
97
+
98
  with gr.Tabs():
99
  with gr.TabItem("IA Metadata"):
100
  ia_json = gr.JSON()
 
111
  def update_all(url_sel, ff_on, api_key):
112
  if not url_sel:
113
  return None, {}, {}, "<p>No origin graph.</p>", []
 
 
114
  parts = url_sel.split("/")
115
+ ident = parts[4]; fn = parts[-1]
 
116
  ia_data = {"identifier": ident, "file": {}}
117
  try:
118
  item = get_item(ident)
 
123
  except Exception as e:
124
  ia_data["error"] = str(e)
125
 
 
126
  ff_data = {}
127
  if ff_on:
128
+ try: ff_data = extract_ffprobe_metadata(url_sel)
129
+ except Exception as e: ff_data = {"error": str(e)}
 
 
130
 
 
131
  desc = item.metadata.get("description", "") or ""
132
  found = re.findall(r"https?://[^\s\"'<]+", desc)
133
  origins = [fetch_page_metadata(u) for u in found]
134
+ graph = build_origin_graph(origins) if origins else "<p>No origins.</p>"
135
 
 
 
136
  return url_sel, ia_data, ff_data, graph, origins
137
 
138
+ btn.click(fn=search_and_populate, inputs=[kw, vt_key, scan], outputs=[dropdown])
139
+ dropdown.change(fn=update_all,
140
+ inputs=[dropdown, ff, vt_key],
141
+ outputs=[video_player, ia_json, ff_json, graph_html, orig_json])
 
 
 
 
 
 
142
 
143
  if __name__ == "__main__":
144
  demo.launch()