wuhp committed on
Commit
b9f1248
·
verified ·
1 Parent(s): ada90f3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +82 -135
app.py CHANGED
@@ -6,50 +6,45 @@ import subprocess
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
9
- from concurrent.futures import ThreadPoolExecutor
10
- from functools import lru_cache
11
- import networkx as nx
12
- from pyvis.network import Network
13
- from urllib.parse import urlparse
14
 
15
# --- Theme ---
THEME = "gradio/soft"

# --- Shared HTTP session: reuse connections and send a stable User-Agent ---
session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0 (compatible; IA-Video-Meta-Explorer/1.0)"
23
-
24
# --- VirusTotal helper (optional) ---
def scan_url_vt(url, api_key, max_wait=300):
    """Submit *url* to VirusTotal and poll until its analysis completes.

    Returns True when the completed analysis reports zero "malicious"
    verdicts, False otherwise.

    Parameters:
        url: the URL to scan.
        api_key: VirusTotal API key, sent via the x-apikey header.
        max_wait: maximum seconds to keep polling before giving up
            (default 300).  Bug fix: the loop previously had no exit
            path if the analysis never reached "completed".

    Raises:
        requests.HTTPError: on a non-2xx response from the API.
        TimeoutError: if the analysis does not complete within max_wait.
    """
    headers = {"x-apikey": api_key}
    resp = session.post(
        "https://www.virustotal.com/api/v3/urls",
        headers=headers,
        data={"url": url},
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    deadline = time.monotonic() + max_wait
    while True:
        time.sleep(5)
        st = session.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers,
        )
        st.raise_for_status()
        attr = st.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            return attr.get("stats", {}).get("malicious", 0) == 0
        if time.monotonic() >= deadline:
            raise TimeoutError(f"VirusTotal analysis timed out for {url}")
 
37
 
38
# --- FFprobe metadata extraction ---
def extract_ffprobe_metadata(url_or_path):
    """Run ffprobe on a local path or URL and return its JSON report as a dict."""
    probe_args = [
        "ffprobe",
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        url_or_path,
    ]
    raw = subprocess.check_output(probe_args)
    return json.loads(raw)
43
 
44
- # --- Caching page metadata ---
45
- @lru_cache(maxsize=256)
46
  def fetch_page_metadata(url):
47
  try:
48
- resp = session.get(url, timeout=5)
49
  resp.raise_for_status()
50
- soup = BeautifulSoup(resp.text, "html.parser")
51
- title = soup.title.string.strip() if soup.title and soup.title.string else url
52
- meta = {"url": url, "title": title}
 
53
  for tag in soup.find_all("meta"):
54
  prop = tag.get("property") or tag.get("name")
55
  if prop and prop.startswith(("og:", "twitter:")):
@@ -58,103 +53,38 @@ def fetch_page_metadata(url):
58
  except Exception as e:
59
  return {"url": url, "error": str(e)}
60
 
61
# --- Fetch favicon ---
@lru_cache(maxsize=256)
def fetch_favicon(url):
    """Return the /favicon.ico URL for *url*'s host if it is reachable, else None.

    Results are memoized per URL via lru_cache so repeated graph nodes
    for the same page do not re-fetch the icon.
    """
    try:
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"
        ico_url = f"{domain}/favicon.ico"
        resp = session.get(ico_url, timeout=3)
        resp.raise_for_status()
        return ico_url
    # bug fix: bare `except:` also swallowed KeyboardInterrupt/SystemExit
    except Exception:
        return None
73
-
74
# --- Recursive origin tracing ---
def trace_origins(description_html, max_depth=2):
    """Build a directed graph of pages linked from an item's HTML description.

    Seed URLs are collected from <a href> anchors and from bare http(s)
    URLs found in the description text.  Each visited page becomes a node
    annotated with its title and favicon, and every link discovered on a
    page adds an edge  link -> page  (edges point from the apparent origin
    toward the page that references it) and is followed recursively.

    Parameters:
        description_html: HTML description to mine for seed URLs (may be None).
        max_depth: maximum number of link hops to follow from each seed.

    Returns:
        networkx.DiGraph of traced URLs.
    """
    graph = nx.DiGraph()
    # extract seeds from anchor tags
    soup_desc = BeautifulSoup(description_html or "", "html.parser")
    seeds = [a["href"] for a in soup_desc.find_all("a", href=True)
             if a["href"].startswith("http")]
    # include any URLs appearing as plain text
    text = soup_desc.get_text(separator=' ')
    seeds += re.findall(r"https?://[^\s\"']+", text)
    seen = set()

    def recurse(url, depth):
        if depth > max_depth or url in seen:
            return
        seen.add(url)
        info = fetch_page_metadata(url)
        favicon = fetch_favicon(url)
        graph.add_node(url, title=info.get("title"), favicon=favicon)
        next_links = []
        # prefer the canonical og:url when the page declares one
        if info.get("og:url"):
            next_links.append(info["og:url"])
        else:
            try:
                page = session.get(url, timeout=5).text
                soup = BeautifulSoup(page, "html.parser")
                for a in soup.find_all("a", href=True):
                    href = a["href"].strip()
                    if href.startswith("http"):
                        next_links.append(href)
            # bug fix: bare `except:` also caught KeyboardInterrupt/SystemExit
            except Exception:
                pass
        for link in set(next_links):
            graph.add_edge(link, url)
            recurse(link, depth + 1)

    for seed in seeds:
        recurse(seed, 1)
    return graph
110
-
111
# --- Build PyVis graph HTML ---
def build_graph_html(graph):
    """Render a networkx DiGraph as embeddable PyVis HTML, using favicon images for nodes when available."""
    net = Network(height="500px", width="100%", directed=True, notebook=False)
    for node_url, attrs in graph.nodes(data=True):
        label = attrs.get("title") or node_url
        icon = attrs.get("favicon")
        if icon:
            net.add_node(node_url, label=label, title=node_url, shape="image", image=icon)
        else:
            net.add_node(node_url, label=label, title=node_url)
    for source, target in graph.edges():
        net.add_edge(source, target)
    return net.generate_html()
122
-
123
# --- Search IA videos ---
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search the Internet Archive for video files matching *keywords*.

    keywords is a comma-separated list; the terms are OR-combined into an
    IA advanced-search query restricted to mediatype:movies.  For each of
    the first 20 results, every video-format file becomes a direct
    archive.org download URL.  When scan_enabled and api_key are set, each
    URL is vetted with VirusTotal and dropped if flagged or if the scan
    itself errors (best-effort filtering).

    Returns:
        list of clean download URLs.
    """
    terms = [kw.strip() for kw in keywords.split(",")]
    query = " OR ".join(term.replace(" ", "+") for term in terms)
    ia_query = f"mediatype:(movies) AND ({query})"
    items = list(search_items(ia_query))[:20]
    urls = []
    for res in items:
        identifier = res.get("identifier")
        if not identifier:
            continue
        try:
            item = get_item(identifier)
        except Exception:
            continue
        for f in item.files:
            fmt = f.get("format", "").lower()
            if fmt.startswith(("mpeg", "mp4", "avi", "mov", "webm", "m4v")):
                video_url = f"https://archive.org/download/{identifier}/{f.get('name')}"
                if scan_enabled and api_key:
                    try:
                        if not scan_url_vt(video_url, api_key):
                            continue
                    # bug fix: bare `except:` could swallow KeyboardInterrupt
                    except Exception:
                        continue
                urls.append(video_url)
    return urls
150
-
151
- # --- Gradio UI ---
152
- with gr.Blocks(theme=THEME) as demo:
153
- gr.Markdown("# 📼 IA Drone‑Strike Explorer — Enhanced Metadata & Origins")
 
 
 
154
  with gr.Row():
155
  kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
156
  vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
157
- scan_toggle = gr.Checkbox(label="Enable VT scan", value=False)
158
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
159
  run_btn = gr.Button("Search & Scan")
160
 
@@ -162,40 +92,57 @@ with gr.Blocks(theme=THEME) as demo:
162
  video_player = gr.Video(label="Video Player")
163
  ia_meta_json = gr.JSON(label="► Raw IA Metadata")
164
  ffprobe_json = gr.JSON(label="► FFprobe Metadata")
165
- origins_graph = gr.HTML(label="► Source‑Origin Graph")
166
-
167
- executor = ThreadPoolExecutor(max_workers=5)
168
 
169
  def search_and_populate(keywords, api_key, scan_enabled):
170
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
171
  return gr.update(choices=urls, value=urls[0] if urls else None)
172
 
173
  def update_all(selected_url, ff_on, api_key):
 
174
  if not selected_url:
175
- return None, {}, {}, "<p>No data.</p>"
176
- parsed = urlparse(selected_url)
177
- parts = parsed.path.strip("/").split("/")
178
- identifier = parts[1] if len(parts) > 1 else None
179
- try:
180
- item = get_item(identifier)
181
- raw_ia = {
182
- "metadata": item.metadata,
183
- "files": [
184
- {"name": f.get("name"), "format": f.get("format"), "size": f.get("size")} for f in item.files
 
 
 
 
 
 
 
 
 
185
  ]
186
- }
187
- except Exception as e:
188
- raw_ia = {"error": f"Could not fetch IA metadata: {e}"}
 
189
  ff_md = {}
190
  if ff_on:
191
  try:
192
  ff_md = extract_ffprobe_metadata(selected_url)
193
  except Exception as e:
194
  ff_md = {"error": str(e)}
195
- desc_html = raw_ia.get("metadata", {}).get("description", "")
196
- graph = trace_origins(desc_html, max_depth=2)
197
- graph_html = build_graph_html(graph) if graph.nodes else "<p>No origins found.</p>"
198
- return selected_url, raw_ia, ff_md, graph_html
 
 
 
 
 
 
 
 
199
 
200
  run_btn.click(
201
  fn=search_and_populate,
@@ -205,8 +152,8 @@ with gr.Blocks(theme=THEME) as demo:
205
  url_dropdown.change(
206
  fn=update_all,
207
  inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
208
- outputs=[video_player, ia_meta_json, ffprobe_json, origins_graph]
209
  )
210
 
211
  if __name__ == "__main__":
212
- demo.launch(server_port=7860, share=False)
 
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
 
 
 
 
 
9
 
10
# --- VirusTotal helper functions ---
def scan_url_vt(url, api_key, max_wait=300):
    """Submit *url* to VirusTotal and poll the analysis until it completes.

    Returns True when the finished analysis reports zero "malicious"
    verdicts, False otherwise.

    Parameters:
        url: URL to scan.
        api_key: VirusTotal API key (sent as the x-apikey header).
        max_wait: maximum seconds to poll before raising TimeoutError
            (default 300).  Bug fix: the loop previously never
            terminated if the analysis never reached "completed".

    Raises:
        requests.HTTPError: on a non-2xx API response.
        TimeoutError: when max_wait elapses without completion.
    """
    headers = {"x-apikey": api_key}
    resp = requests.post(
        "https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url}
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    deadline = time.monotonic() + max_wait
    while True:
        time.sleep(5)
        status_resp = requests.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
        )
        status_resp.raise_for_status()
        attr = status_resp.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            stats = attr.get("stats", {})
            return stats.get("malicious", 0) == 0
        if time.monotonic() >= deadline:
            raise TimeoutError(f"VirusTotal analysis timed out for {url}")
28
 
29
# --- FFprobe metadata extraction ---
def extract_ffprobe_metadata(url_or_path):
    """Probe a media file or URL with ffprobe and return the parsed JSON metadata."""
    ffprobe_cmd = ["ffprobe", "-v", "error", "-print_format", "json"]
    ffprobe_cmd += ["-show_format", "-show_streams", url_or_path]
    return json.loads(subprocess.check_output(ffprobe_cmd))
38
 
39
+ # --- Scrape basic page metadata (title + og: tags) ---
 
40
  def fetch_page_metadata(url):
41
  try:
42
+ resp = requests.get(url, timeout=5)
43
  resp.raise_for_status()
44
+ html = resp.text
45
+ soup = BeautifulSoup(html, "html.parser")
46
+ meta = {"url": url, "title": soup.title.string if soup.title else None}
47
+ # grab OpenGraph tags
48
  for tag in soup.find_all("meta"):
49
  prop = tag.get("property") or tag.get("name")
50
  if prop and prop.startswith(("og:", "twitter:")):
 
53
  except Exception as e:
54
  return {"url": url, "error": str(e)}
55
 
56
# --- Core search & scan logic ---
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search the Internet Archive for video files matching *keywords*.

    keywords is a comma-separated list whose terms are OR-combined into one
    mediatype:movies advanced-search query.  Up to 50 results are examined;
    each video-format file becomes a direct archive.org download URL.  When
    scan_enabled and api_key are given, each URL is vetted with VirusTotal
    and dropped if flagged (or if the scan itself errors).

    Returns:
        list of clean download URLs.
    """
    query = " OR ".join(f"{kw.strip().replace(' ', '+')}" for kw in keywords.split(","))
    ia_query = f"mediatype:(movies) AND ({query})"
    results = list(search_items(ia_query))[:50]

    clean_urls = []
    for res in results:
        # bug fix: res["identifier"] raised KeyError on results missing the field
        identifier = res.get("identifier")
        if not identifier:
            continue
        try:
            item = get_item(identifier)
        except Exception:
            # bug fix: one unreachable item no longer aborts the whole search
            continue
        for f in item.files:
            fmt = f.get("format", "").lower()
            if fmt.startswith(("mpeg", "mp4", "avi", "mov", "webm", "m4v")):
                url = f"https://archive.org/download/{identifier}/{f['name']}"
                if scan_enabled and api_key:
                    try:
                        is_clean = scan_url_vt(url, api_key)
                    except Exception:
                        continue
                else:
                    is_clean = True
                if is_clean:
                    clean_urls.append(url)
    return clean_urls
80
+
81
+ # --- Gradio UI setup ---
82
+ with gr.Blocks() as demo:
83
+ gr.Markdown("# 📼 IA Drone‑Strike Explorer \nEnable VT scan, FFprobe & Origin Tracing")
84
  with gr.Row():
85
  kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
86
  vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
87
+ scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
88
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
89
  run_btn = gr.Button("Search & Scan")
90
 
 
92
  video_player = gr.Video(label="Video Player")
93
  ia_meta_json = gr.JSON(label="► Raw IA Metadata")
94
  ffprobe_json = gr.JSON(label="► FFprobe Metadata")
95
+ origins_json = gr.JSON(label="► Source‑Origin Metadata")
 
 
96
 
97
  def search_and_populate(keywords, api_key, scan_enabled):
98
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
99
  return gr.update(choices=urls, value=urls[0] if urls else None)
100
 
101
  def update_all(selected_url, ff_on, api_key):
102
+ # no selection guard
103
  if not selected_url:
104
+ return None, {}, {}, []
105
+
106
+ # 1) IA metadata + file list
107
+ parts = selected_url.split("/")
108
+ identifier = parts[4] if len(parts) > 4 else None
109
+ raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
110
+ if identifier:
111
+ try:
112
+ item = get_item(identifier)
113
+ raw_ia["metadata"] = item.metadata
114
+ raw_ia["files"] = [
115
+ {
116
+ "name": f.get("name"),
117
+ "format": f.get("format"),
118
+ "size": f.get("size"),
119
+ "md5": f.get("md5"),
120
+ **{k: v for k,v in f.items() if k not in ("name","format","size","md5")}
121
+ }
122
+ for f in item.files
123
  ]
124
+ except Exception:
125
+ raw_ia["error"] = "could not fetch IA metadata"
126
+
127
+ # 2) FFprobe metadata if toggled
128
  ff_md = {}
129
  if ff_on:
130
  try:
131
  ff_md = extract_ffprobe_metadata(selected_url)
132
  except Exception as e:
133
  ff_md = {"error": str(e)}
134
+
135
+ # 3) Origin tracing: scrape each URL in description
136
+ origins = []
137
+ desc = raw_ia["metadata"].get("description", "")
138
+ urls_found = re.findall(r'https?://[^\s"<]+', desc)
139
+ for url in urls_found:
140
+ meta = fetch_page_metadata(url)
141
+ origins.append(meta)
142
+ # stop at first “real” origin (you can remove this break to collect all)
143
+ break
144
+
145
+ return selected_url, raw_ia, ff_md, origins
146
 
147
  run_btn.click(
148
  fn=search_and_populate,
 
152
  url_dropdown.change(
153
  fn=update_all,
154
  inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
155
+ outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
156
  )
157
 
158
  if __name__ == "__main__":
159
+ demo.launch()