wuhp commited on
Commit
3180a1f
·
verified ·
1 Parent(s): 6ac401a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +105 -151
app.py CHANGED
@@ -6,200 +6,154 @@ import subprocess
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
9
- from urllib.parse import urlparse
10
- import tempfile
11
- from pyvis.network import Network
12
- from functools import lru_cache
13
- from concurrent.futures import ThreadPoolExecutor
14
-
15
- THEME = "gradio/soft"
16
-
17
- # --- Shared HTTP session for speed & consistent headers ---
18
- session = requests.Session()
19
- session.headers.update({
20
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
21
- })
22
-
23
- @lru_cache(maxsize=64)
24
- def get_cached_item(identifier):
25
- """Cache IA get_item() calls to avoid duplicate downloads."""
26
- return get_item(identifier)
27
 
 
28
  def scan_url_vt(url, api_key):
29
- """Return True if VirusTotal sees no malicious hits."""
30
  headers = {"x-apikey": api_key}
31
- resp = session.post(
32
- "https://www.virustotal.com/api/v3/urls",
33
- headers=headers,
34
- data={"url": url}
35
  )
36
  resp.raise_for_status()
37
  analysis_id = resp.json()["data"]["id"]
38
-
39
- # Poll every second until complete
40
  while True:
41
- time.sleep(1)
42
- st = session.get(
43
- f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
44
- headers=headers
45
  )
46
- st.raise_for_status()
47
- attr = st.json()["data"]["attributes"]
48
  if attr.get("status") == "completed":
49
- return attr.get("stats", {}).get("malicious", 0) == 0
 
50
 
51
- def extract_ffprobe_metadata(path):
52
- """Run ffprobe and parse its JSON output."""
53
  cmd = [
54
  "ffprobe", "-v", "error", "-print_format", "json",
55
- "-show_format", "-show_streams", path
 
56
  ]
57
  out = subprocess.check_output(cmd)
58
  return json.loads(out)
59
 
 
60
  def fetch_page_metadata(url):
61
- """Grab <title>, og: and twitter: meta tags from any page."""
62
  try:
63
- r = session.get(url, timeout=5)
64
- r.raise_for_status()
65
- soup = BeautifulSoup(r.text, "html.parser")
66
- data = {
67
- "url": url,
68
- "title": getattr(soup.title, "string", None)
69
- }
70
  for tag in soup.find_all("meta"):
71
  prop = tag.get("property") or tag.get("name")
72
  if prop and prop.startswith(("og:", "twitter:")):
73
- data[prop] = tag.get("content")
74
- return data
75
  except Exception as e:
76
  return {"url": url, "error": str(e)}
77
 
 
78
  def fetch_clean_videos(keywords, api_key, scan_enabled):
79
- """Search IA, filter by file extension, optional parallel VT scan."""
80
- terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
81
- ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
82
- results = list(search_items(ia_query))[:10] # top 10 to stay speedy
83
-
84
- # collect candidate video URLs
85
- candidates = []
86
- for r in results:
87
- ident = r["identifier"]
88
- item = get_cached_item(ident)
89
  for f in item.files:
90
- name = f.get("name", "")
91
- ext = name.lower().split(".")[-1]
92
- if ext in ("mp4", "mkv", "avi", "mov", "webm", "m4v"):
93
- candidates.append(f"https://archive.org/download/{ident}/{name}")
94
-
95
- # optionally scan in parallel
96
- if scan_enabled and api_key:
97
- with ThreadPoolExecutor(max_workers=5) as exe:
98
- safe_urls = exe.map(lambda u: u if scan_url_vt(u, api_key) else None, candidates)
99
- return [u for u in safe_urls if u]
100
- return candidates
101
-
102
- def get_favicon_url(page_url):
103
- """Assume https://domain/favicon.ico exists."""
104
- dom = urlparse(page_url).netloc
105
- return f"https://{dom}/favicon.ico"
106
-
107
- def build_origin_graph(origins):
108
- """Render a PyVis network; return its full HTML."""
109
- net = Network(height="400px", width="100%", directed=True)
110
- for i, m in enumerate(origins):
111
- net.add_node(
112
- i,
113
- label=urlparse(m["url"]).netloc,
114
- title=json.dumps(m, indent=2),
115
- shape="image",
116
- image=get_favicon_url(m["url"])
117
- )
118
- if i > 0:
119
- net.add_edge(i - 1, i)
120
- tmpfile = tempfile.NamedTemporaryFile(delete=False, suffix=".html").name
121
- net.save_graph(tmpfile)
122
- return open(tmpfile, encoding="utf-8").read()
123
-
124
- with gr.Blocks(theme=THEME) as demo:
125
- gr.Markdown("## 📼 IA Drone-Strike Explorer")
126
-
127
  with gr.Row():
128
- kw = gr.Textbox(label="Search keywords", value="drone strike, military uav")
129
- vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
130
- scan = gr.Checkbox(label="Enable VT scan", value=True)
131
- ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
132
- btn = gr.Button("Search & Scan")
133
-
134
- dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
135
- graph_html = gr.HTML("<p>No origin graph yet.</p>")
136
- video_player = gr.Video()
137
-
138
- with gr.Tabs():
139
- with gr.TabItem("IA Metadata"):
140
- ia_json = gr.JSON()
141
- with gr.TabItem("FFprobe"):
142
- ff_json = gr.JSON()
143
- with gr.TabItem("Origins"):
144
- graph_html
145
- orig_json = gr.JSON()
146
 
147
  def search_and_populate(keywords, api_key, scan_enabled):
148
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
149
  return gr.update(choices=urls, value=urls[0] if urls else None)
150
 
151
- def update_all(url_sel, ff_on, api_key):
152
- if not url_sel:
153
- return None, {}, {}, "<p>No origin graph.</p>", []
 
154
 
155
- parts = url_sel.split("/")
156
- ident, fn = parts[4], parts[-1]
157
- ia_data = {"identifier": ident, "file": {}}
158
-
159
- # — Fix metadata merge
160
- try:
161
- item = get_cached_item(ident)
162
- file_rec = next((f for f in item.files if f["name"] == fn), None)
163
- if file_rec:
164
- ia_data["file"] = {**item.metadata, **file_rec}
165
- else:
166
- ia_data["file"] = {}
167
- except Exception as e:
168
- ia_data["error"] = str(e)
169
-
170
- # — FFprobe
171
- ff_data = {}
 
 
 
 
 
 
172
  if ff_on:
173
  try:
174
- ff_data = extract_ffprobe_metadata(url_sel)
175
  except Exception as e:
176
- ff_data = {"error": str(e)}
177
 
178
- # Origin tracing
179
- desc = (item.metadata.get("description") or "")
180
- found = re.findall(r"https?://[^\s\"'<]+", desc)
181
- if found:
182
- with ThreadPoolExecutor(max_workers=5) as exe:
183
- origins = list(exe.map(fetch_page_metadata, found))
184
- graph = build_origin_graph(origins)
185
- else:
186
- origins, graph = [], "<p>No origins.</p>"
187
 
188
- return url_sel, ia_data, ff_data, graph, origins
189
 
190
- btn.click(
191
  fn=search_and_populate,
192
- inputs=[kw, vt_key, scan],
193
- outputs=[dropdown]
194
  )
195
- dropdown.change(
196
  fn=update_all,
197
- inputs=[dropdown, ff, vt_key],
198
- outputs=[video_player, ia_json, ff_json, graph_html, orig_json]
199
  )
200
 
201
- # <<< enable async queue & spinner >>>
202
- demo.queue()
203
-
204
  if __name__ == "__main__":
205
  demo.launch()
 
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
+ # --- VirusTotal helper functions ---
11
  def scan_url_vt(url, api_key):
 
12
  headers = {"x-apikey": api_key}
13
+ resp = requests.post(
14
+ "https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url}
 
 
15
  )
16
  resp.raise_for_status()
17
  analysis_id = resp.json()["data"]["id"]
 
 
18
  while True:
19
+ time.sleep(5)
20
+ status_resp = requests.get(
21
+ f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
 
22
  )
23
+ status_resp.raise_for_status()
24
+ attr = status_resp.json()["data"]["attributes"]
25
  if attr.get("status") == "completed":
26
+ stats = attr.get("stats", {})
27
+ return stats.get("malicious", 0) == 0
28
 
29
+ # --- FFprobe metadata extraction ---
30
+ def extract_ffprobe_metadata(url_or_path):
31
  cmd = [
32
  "ffprobe", "-v", "error", "-print_format", "json",
33
+ "-show_format", "-show_streams",
34
+ url_or_path
35
  ]
36
  out = subprocess.check_output(cmd)
37
  return json.loads(out)
38
 
39
+ # --- Scrape basic page metadata (title + og: tags) ---
40
  def fetch_page_metadata(url):
 
41
  try:
42
+ resp = requests.get(url, timeout=5)
43
+ resp.raise_for_status()
44
+ html = resp.text
45
+ soup = BeautifulSoup(html, "html.parser")
46
+ meta = {"url": url, "title": soup.title.string if soup.title else None}
47
+ # grab OpenGraph tags
 
48
  for tag in soup.find_all("meta"):
49
  prop = tag.get("property") or tag.get("name")
50
  if prop and prop.startswith(("og:", "twitter:")):
51
+ meta[prop] = tag.get("content")
52
+ return meta
53
  except Exception as e:
54
  return {"url": url, "error": str(e)}
55
 
56
+ # --- Core search & scan logic ---
57
  def fetch_clean_videos(keywords, api_key, scan_enabled):
58
+ query = " OR ".join([f"{kw.strip().replace(' ', '+')}" for kw in keywords.split(",")])
59
+ ia_query = f"mediatype:(movies) AND ({query})"
60
+ results = list(search_items(ia_query))[:50]
61
+
62
+ clean_urls = []
63
+ for res in results:
64
+ identifier = res["identifier"]
65
+ item = get_item(identifier)
 
 
66
  for f in item.files:
67
+ fmt = f.get("format", "").lower()
68
+ if fmt.startswith(("mpeg","mp4","avi","mov","webm","m4v")):
69
+ url = f"https://archive.org/download/{identifier}/{f['name']}"
70
+ if scan_enabled and api_key:
71
+ try:
72
+ is_clean = scan_url_vt(url, api_key)
73
+ except Exception:
74
+ continue
75
+ else:
76
+ is_clean = True
77
+ if is_clean:
78
+ clean_urls.append(url)
79
+ return clean_urls
80
+
81
+ # --- Gradio UI setup ---
82
+ with gr.Blocks() as demo:
83
+ gr.Markdown("# 📼 IA Drone‑Strike Explorer \nEnable VT scan, FFprobe & Origin Tracing")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  with gr.Row():
85
+ kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
86
+ vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
87
+ scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
88
+ ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
89
+ run_btn = gr.Button("Search & Scan")
90
+
91
+ url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True)
92
+ video_player = gr.Video(label="Video Player")
93
+ ia_meta_json = gr.JSON(label="► Raw IA Metadata")
94
+ ffprobe_json = gr.JSON(label="► FFprobe Metadata")
95
+ origins_json = gr.JSON(label="► Source‑Origin Metadata")
 
 
 
 
 
 
 
96
 
97
  def search_and_populate(keywords, api_key, scan_enabled):
98
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
99
  return gr.update(choices=urls, value=urls[0] if urls else None)
100
 
101
+ def update_all(selected_url, ff_on, api_key):
102
+ # no selection guard
103
+ if not selected_url:
104
+ return None, {}, {}, []
105
 
106
+ # 1) IA metadata + file list
107
+ parts = selected_url.split("/")
108
+ identifier = parts[4] if len(parts) > 4 else None
109
+ raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
110
+ if identifier:
111
+ try:
112
+ item = get_item(identifier)
113
+ raw_ia["metadata"] = item.metadata
114
+ raw_ia["files"] = [
115
+ {
116
+ "name": f.get("name"),
117
+ "format": f.get("format"),
118
+ "size": f.get("size"),
119
+ "md5": f.get("md5"),
120
+ **{k: v for k,v in f.items() if k not in ("name","format","size","md5")}
121
+ }
122
+ for f in item.files
123
+ ]
124
+ except Exception:
125
+ raw_ia["error"] = "could not fetch IA metadata"
126
+
127
+ # 2) FFprobe metadata if toggled
128
+ ff_md = {}
129
  if ff_on:
130
  try:
131
+ ff_md = extract_ffprobe_metadata(selected_url)
132
  except Exception as e:
133
+ ff_md = {"error": str(e)}
134
 
135
+ # 3) Origin tracing: scrape each URL in description
136
+ origins = []
137
+ desc = raw_ia["metadata"].get("description", "")
138
+ urls_found = re.findall(r'https?://[^\s"<]+', desc)
139
+ for url in urls_found:
140
+ meta = fetch_page_metadata(url)
141
+ origins.append(meta)
142
+ # stop at first “real” origin (you can remove this break to collect all)
143
+ break
144
 
145
+ return selected_url, raw_ia, ff_md, origins
146
 
147
+ run_btn.click(
148
  fn=search_and_populate,
149
+ inputs=[kw_input, vt_key_input, scan_toggle],
150
+ outputs=[url_dropdown]
151
  )
152
+ url_dropdown.change(
153
  fn=update_all,
154
+ inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
155
+ outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
156
  )
157
 
 
 
 
158
  if __name__ == "__main__":
159
  demo.launch()