wuhp committed on
Commit
d4356c2
·
verified ·
1 Parent(s): ffdb027

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +106 -70
app.py CHANGED
@@ -6,27 +6,35 @@ import subprocess
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
 
 
 
 
 
 
 
 
 
 
9
 
10
- # --- VirusTotal helper functions ---
11
  def scan_url_vt(url, api_key):
12
  headers = {"x-apikey": api_key}
13
- resp = requests.post(
14
  "https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url}
15
  )
16
  resp.raise_for_status()
17
  analysis_id = resp.json()["data"]["id"]
 
18
  while True:
19
  time.sleep(5)
20
- status_resp = requests.get(
21
- f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
22
- )
23
- status_resp.raise_for_status()
24
- attr = status_resp.json()["data"]["attributes"]
25
  if attr.get("status") == "completed":
26
- stats = attr.get("stats", {})
27
- return stats.get("malicious", 0) == 0
28
 
29
- # --- FFprobe metadata extraction ---
30
  def extract_ffprobe_metadata(url_or_path):
31
  cmd = [
32
  "ffprobe", "-v", "error", "-print_format", "json",
@@ -36,15 +44,15 @@ def extract_ffprobe_metadata(url_or_path):
36
  out = subprocess.check_output(cmd)
37
  return json.loads(out)
38
 
39
- # --- Scrape basic page metadata (title + og: tags) ---
 
40
  def fetch_page_metadata(url):
41
  try:
42
- resp = requests.get(url, timeout=5)
43
  resp.raise_for_status()
44
- html = resp.text
45
- soup = BeautifulSoup(html, "html.parser")
46
  meta = {"url": url, "title": soup.title.string if soup.title else None}
47
- # grab OpenGraph tags
48
  for tag in soup.find_all("meta"):
49
  prop = tag.get("property") or tag.get("name")
50
  if prop and prop.startswith(("og:", "twitter:")):
@@ -53,38 +61,86 @@ def fetch_page_metadata(url):
53
  except Exception as e:
54
  return {"url": url, "error": str(e)}
55
 
56
- # --- Core search & scan logic ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  def fetch_clean_videos(keywords, api_key, scan_enabled):
58
  query = " OR ".join([f"{kw.strip().replace(' ', '+')}" for kw in keywords.split(",")])
59
  ia_query = f"mediatype:(movies) AND ({query})"
60
- results = list(search_items(ia_query))[:50]
61
-
62
  clean_urls = []
63
  for res in results:
64
- identifier = res["identifier"]
65
  item = get_item(identifier)
66
  for f in item.files:
67
- fmt = f.get("format", "").lower()
68
- if fmt.startswith(("mpeg","mp4","avi","mov","webm","m4v")):
69
  url = f"https://archive.org/download/{identifier}/{f['name']}"
70
  if scan_enabled and api_key:
71
  try:
72
- is_clean = scan_url_vt(url, api_key)
73
- except Exception:
 
74
  continue
75
- else:
76
- is_clean = True
77
- if is_clean:
78
- clean_urls.append(url)
79
  return clean_urls
80
 
81
- # --- Gradio UI setup ---
82
  with gr.Blocks() as demo:
83
- gr.Markdown("# 📼 IA Drone‑Strike Explorer \nEnable VT scan, FFprobe & Origin Tracing")
84
  with gr.Row():
85
  kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
86
  vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
87
- scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
88
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
89
  run_btn = gr.Button("Search & Scan")
90
 
@@ -92,57 +148,37 @@ with gr.Blocks() as demo:
92
  video_player = gr.Video(label="Video Player")
93
  ia_meta_json = gr.JSON(label="► Raw IA Metadata")
94
  ffprobe_json = gr.JSON(label="► FFprobe Metadata")
95
- origins_json = gr.JSON(label="► Source‑Origin Metadata")
 
 
96
 
97
  def search_and_populate(keywords, api_key, scan_enabled):
98
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
99
  return gr.update(choices=urls, value=urls[0] if urls else None)
100
 
101
  def update_all(selected_url, ff_on, api_key):
102
- # no selection guard
103
  if not selected_url:
104
- return None, {}, {}, []
105
-
106
- # 1) IA metadata + file list
107
- parts = selected_url.split("/")
108
- identifier = parts[4] if len(parts) > 4 else None
109
- raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
110
- if identifier:
111
- try:
112
- item = get_item(identifier)
113
- raw_ia["metadata"] = item.metadata
114
- raw_ia["files"] = [
115
- {
116
- "name": f.get("name"),
117
- "format": f.get("format"),
118
- "size": f.get("size"),
119
- "md5": f.get("md5"),
120
- **{k: v for k,v in f.items() if k not in ("name","format","size","md5")}
121
- }
122
- for f in item.files
123
- ]
124
- except Exception:
125
- raw_ia["error"] = "could not fetch IA metadata"
126
-
127
- # 2) FFprobe metadata if toggled
128
  ff_md = {}
129
  if ff_on:
130
  try:
131
  ff_md = extract_ffprobe_metadata(selected_url)
132
  except Exception as e:
133
  ff_md = {"error": str(e)}
134
-
135
- # 3) Origin tracing: scrape each URL in description
136
- origins = []
137
- desc = raw_ia["metadata"].get("description", "")
138
- urls_found = re.findall(r'https?://[^\s"<]+', desc)
139
- for url in urls_found:
140
- meta = fetch_page_metadata(url)
141
- origins.append(meta)
142
- # stop at first “real” origin (you can remove this break to collect all)
143
- break
144
-
145
- return selected_url, raw_ia, ff_md, origins
146
 
147
  run_btn.click(
148
  fn=search_and_populate,
@@ -152,8 +188,8 @@ with gr.Blocks() as demo:
152
  url_dropdown.change(
153
  fn=update_all,
154
  inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
155
- outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
156
  )
157
 
158
  if __name__ == "__main__":
159
- demo.launch()
 
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
9
+ from concurrent.futures import ThreadPoolExecutor, as_completed
10
+ from functools import lru_cache
11
+ import networkx as nx
12
+ from pyvis.network import Network
13
+ from urllib.parse import urlparse
14
# --- Shared HTTP session: connection reuse plus a stable User-Agent ---
session = requests.Session()
session.headers.update(
    {"User-Agent": "Mozilla/5.0 (compatible; IA-Video-Meta-Explorer/1.0)"}
)
19
 
20
+ \# --- VirusTotal helper (optional) ---
21
def scan_url_vt(url, api_key, max_wait=300):
    """Submit *url* to VirusTotal and poll until the analysis completes.

    Args:
        url: The URL to scan.
        api_key: VirusTotal API key (sent as the ``x-apikey`` header).
        max_wait: Maximum seconds to poll before giving up (new, defaulted;
            the previous implementation could poll forever).

    Returns:
        True when the completed analysis reports zero "malicious" verdicts.

    Raises:
        requests.HTTPError: If the submit or poll request fails.
        TimeoutError: If the analysis does not complete within *max_wait*.
    """
    headers = {"x-apikey": api_key}
    resp = session.post(
        "https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url}
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    # Poll until the analysis finishes; VT reports status "queued" until then.
    deadline = time.monotonic() + max_wait
    while True:
        time.sleep(5)
        st = session.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers,
        )
        st.raise_for_status()
        attr = st.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            return attr.get("stats", {}).get("malicious", 0) == 0
        if time.monotonic() > deadline:
            # The old loop had no exit condition other than completion.
            raise TimeoutError(f"VirusTotal analysis {analysis_id} did not complete")
 
36
 
37
+ \# --- FFprobe metadata extraction ---
38
  def extract_ffprobe_metadata(url_or_path):
39
  cmd = [
40
  "ffprobe", "-v", "error", "-print_format", "json",
 
44
  out = subprocess.check_output(cmd)
45
  return json.loads(out)
46
 
47
+ \# --- Caching page metadata ---
48
+ @lru_cache(maxsize=256)
49
  def fetch_page_metadata(url):
50
  try:
51
+ resp = session.get(url, timeout=5)
52
  resp.raise_for_status()
53
+ soup = BeautifulSoup(resp.text, "html.parser")
 
54
  meta = {"url": url, "title": soup.title.string if soup.title else None}
55
+ # OpenGraph & twitter
56
  for tag in soup.find_all("meta"):
57
  prop = tag.get("property") or tag.get("name")
58
  if prop and prop.startswith(("og:", "twitter:")):
 
61
  except Exception as e:
62
  return {"url": url, "error": str(e)}
63
 
64
+ \# --- Fetch favicon for clickable graph nodes ---
65
+ @lru_cache(maxsize=256)
66
@lru_cache(maxsize=256)
def fetch_favicon(url):
    """Return the ``/favicon.ico`` URL for *url*'s host, or None on any failure.

    Results are cached (lru_cache) so repeated graph nodes on the same host
    do not re-fetch the icon.
    """
    try:
        # Parse once instead of calling urlparse() twice as before.
        parts = urlparse(url)
        ico_url = f"{parts.scheme}://{parts.netloc}/favicon.ico"
        resp = session.get(ico_url, timeout=3)
        resp.raise_for_status()
        return ico_url
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit propagate.
        return None
75
+
76
+ \# --- Trace origins recursively up to a max depth ---
77
def trace_origins(description, max_depth=2, executor=None):
    """Build a directed graph of candidate source pages for an IA item.

    URLs found in *description* are the seeds; each visited page contributes
    either its ``og:url`` (preferred) or its outbound ``<a href>`` links as
    parent nodes pointing at the page that referenced them, recursing up to
    *max_depth* hops from each seed.

    Args:
        description: Free-text IA item description mined for seed URLs.
        max_depth: Maximum recursion depth from each seed.
        executor: Unused; accepted for call-site compatibility.

    Returns:
        networkx.DiGraph whose nodes carry ``title`` and ``favicon`` attrs;
        an edge (src, dst) means "src appears to be an origin of dst".
    """
    graph = nx.DiGraph()
    visited = set()

    def _recurse(url, depth):
        # BUG FIX: the old guard was `url in graph`, but add_edge(link, url)
        # inserts `link` as a node *before* _recurse(link) runs, so every
        # node past depth 1 looked already-seen and was never fetched.
        # Track visits explicitly instead.
        if depth > max_depth or url in visited:
            return
        visited.add(url)
        info = fetch_page_metadata(url)
        graph.add_node(url, title=info.get("title"), favicon=fetch_favicon(url))
        # Prefer the canonical og:url; otherwise scrape outbound anchors.
        links = []
        if "og:url" in info:
            links.append(info["og:url"])
        else:
            try:
                soup = BeautifulSoup(session.get(url, timeout=5).text, "html.parser")
                links = [
                    a["href"]
                    for a in soup.find_all("a", href=True)
                    if a["href"].startswith("http")
                ]
            except Exception:
                # Narrowed from bare `except:`; unreachable page keeps its
                # node but contributes no edges.
                pass
        for link in set(links):
            graph.add_edge(link, url)
            _recurse(link, depth + 1)

    for seed in re.findall(r'https?://[^\s"<]+', description):
        _recurse(seed, 1)
    return graph
105
+
106
+ \# --- Build PyVis network HTML ---
107
def build_graph_html(graph):
    """Render an origin graph as embeddable PyVis HTML (500px tall, directed)."""
    net = Network(height="500px", width="100%", directed=True)
    # Nodes: favicon image when available, plain ellipse otherwise.
    for node_url, attrs in graph.nodes(data=True):
        icon = attrs.get("favicon")
        net.add_node(
            node_url,
            label=attrs.get("title") or node_url,
            title=node_url,
            shape="image" if icon else "ellipse",
            image=icon,
        )
    for source, target in graph.edges():
        net.add_edge(source, target)
    return net.generate_html()
114
+
115
+ \# --- Fetch IA items (movies) ---
116
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search archive.org movies for *keywords* and return playable file URLs.

    Args:
        keywords: Comma-separated search terms, OR-combined in the IA query.
        api_key: VirusTotal key; only consulted when *scan_enabled* is true.
        scan_enabled: When true (and a key is given), drop any URL that
            VirusTotal flags as malicious or that fails to scan.

    Returns:
        List of direct ``https://archive.org/download/...`` video URLs.
    """
    video_formats = ("mpeg", "mp4", "avi", "mov", "webm", "m4v")
    # The old code wrapped each term in a redundant f-string.
    query = " OR ".join(kw.strip().replace(" ", "+") for kw in keywords.split(","))
    ia_query = f"mediatype:(movies) AND ({query})"
    results = list(search_items(ia_query))[:20]  # cap to keep the UI responsive
    clean_urls = []
    for res in results:
        identifier = res['identifier']
        item = get_item(identifier)
        for f in item.files:
            fmt = f.get('format', '').lower()
            if not fmt.startswith(video_formats):
                continue
            url = f"https://archive.org/download/{identifier}/{f['name']}"
            if scan_enabled and api_key:
                try:
                    if not scan_url_vt(url, api_key):
                        continue  # flagged as malicious
                except Exception:
                    # Narrowed from a bare `except:`; a failed scan means
                    # the URL is skipped, matching prior best-effort intent.
                    continue
            clean_urls.append(url)
    return clean_urls
136
 
137
+ \# --- Gradio UI ---
138
  with gr.Blocks() as demo:
139
+ gr.Markdown("# 📼 IA Drone‑Strike Explorer Enhanced Metadata & Origin Tracing")
140
  with gr.Row():
141
  kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
142
  vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
143
+ scan_toggle = gr.Checkbox(label="Enable VT scan", value=False)
144
  ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
145
  run_btn = gr.Button("Search & Scan")
146
 
 
148
  video_player = gr.Video(label="Video Player")
149
  ia_meta_json = gr.JSON(label="► Raw IA Metadata")
150
  ffprobe_json = gr.JSON(label="► FFprobe Metadata")
151
+ origins_graph = gr.HTML(label="► Source‑Origin Graph")
152
+
153
+ executor = ThreadPoolExecutor(max_workers=10)
154
 
155
  def search_and_populate(keywords, api_key, scan_enabled):
156
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
157
  return gr.update(choices=urls, value=urls[0] if urls else None)
158
 
159
  def update_all(selected_url, ff_on, api_key):
 
160
  if not selected_url:
161
+ return None, {}, {}, ""
162
+ identifier = selected_url.split("/")[4]
163
+ # 1) IA metadata
164
+ raw_ia = {}
165
+ try:
166
+ item = get_item(identifier)
167
+ raw_ia = {"metadata": item.metadata, "files": [dict(name=f.name, format=f.format, size=f.size) for f in item.files]}
168
+ except:
169
+ raw_ia = {"error": "Could not fetch IA metadata"}
170
+ # 2) FFprobe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
171
  ff_md = {}
172
  if ff_on:
173
  try:
174
  ff_md = extract_ffprobe_metadata(selected_url)
175
  except Exception as e:
176
  ff_md = {"error": str(e)}
177
+ # 3) Origins
178
+ desc = raw_ia.get("metadata", {}).get("description", "")
179
+ graph = trace_origins(desc, max_depth=2, executor=executor)
180
+ graph_html = build_graph_html(graph) if graph.nodes else "<p>No origins found.</p>"
181
+ return selected_url, raw_ia, ff_md, graph_html
 
 
 
 
 
 
 
182
 
183
  run_btn.click(
184
  fn=search_and_populate,
 
188
  url_dropdown.change(
189
  fn=update_all,
190
  inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
191
+ outputs=[video_player, ia_meta_json, ffprobe_json, origins_graph]
192
  )
193
 
194
  if __name__ == "__main__":
195
+ demo.launch(server_port=7860, share=False)