wuhp commited on
Commit
49b172d
·
verified ·
1 Parent(s): 7cee5ed

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +122 -99
app.py CHANGED
@@ -6,153 +6,176 @@ import subprocess
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
 
 
 
 
 
 
 
 
 
 
 
9
 
10
- # --- VirusTotal helper functions ---
11
  def scan_url_vt(url, api_key):
 
12
  headers = {"x-apikey": api_key}
13
- resp = requests.post(
14
- "https://www.virustotal.com/api/v3/urls", headers=headers, data={"url": url}
15
- )
16
  resp.raise_for_status()
17
  analysis_id = resp.json()["data"]["id"]
 
18
  while True:
19
  time.sleep(5)
20
- status_resp = requests.get(
21
- f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers=headers
22
- )
23
- status_resp.raise_for_status()
24
- attr = status_resp.json()["data"]["attributes"]
25
  if attr.get("status") == "completed":
26
- stats = attr.get("stats", {})
27
- return stats.get("malicious", 0) == 0
28
 
29
- # --- FFprobe metadata extraction ---
30
  def extract_ffprobe_metadata(url_or_path):
 
31
  cmd = [
32
  "ffprobe", "-v", "error", "-print_format", "json",
33
- "-show_format", "-show_streams",
34
- url_or_path
35
  ]
36
  out = subprocess.check_output(cmd)
37
  return json.loads(out)
38
 
39
- # --- Scrape basic page metadata (title + og: tags) ---
40
  def fetch_page_metadata(url):
 
41
  try:
42
- resp = requests.get(url, timeout=5)
43
- resp.raise_for_status()
44
- html = resp.text
45
- soup = BeautifulSoup(html, "html.parser")
46
- meta = {"url": url, "title": soup.title.string if soup.title else None}
47
- # grab OpenGraph tags
 
48
  for tag in soup.find_all("meta"):
49
  prop = tag.get("property") or tag.get("name")
50
  if prop and prop.startswith(("og:", "twitter:")):
51
- meta[prop] = tag.get("content")
52
- return meta
53
  except Exception as e:
54
  return {"url": url, "error": str(e)}
55
 
56
- # --- Core search & scan logic ---
57
  def fetch_clean_videos(keywords, api_key, scan_enabled):
58
- query = " OR ".join([f"{kw.strip().replace(' ', '+')}" for kw in keywords.split(",")])
59
- ia_query = f"mediatype:(movies) AND ({query})"
 
60
  results = list(search_items(ia_query))[:50]
61
 
62
- clean_urls = []
63
- for res in results:
64
- identifier = res["identifier"]
65
- item = get_item(identifier)
66
  for f in item.files:
67
  fmt = f.get("format", "").lower()
68
- if fmt.startswith(("mpeg","mp4","avi","mov","webm","m4v")):
69
- url = f"https://archive.org/download/{identifier}/{f['name']}"
70
  if scan_enabled and api_key:
71
  try:
72
- is_clean = scan_url_vt(url, api_key)
73
- except Exception:
 
74
  continue
75
- else:
76
- is_clean = True
77
- if is_clean:
78
- clean_urls.append(url)
79
- return clean_urls
80
-
81
- # --- Gradio UI setup ---
82
- with gr.Blocks() as demo:
83
- gr.Markdown("# 📼 IA Drone‑Strike Explorer \nEnable VT scan, FFprobe & Origin Tracing")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  with gr.Row():
85
- kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
86
- vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
87
- scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
88
- ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
89
- run_btn = gr.Button("Search & Scan")
90
-
91
- url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True)
92
- video_player = gr.Video(label="Video Player")
93
- ia_meta_json = gr.JSON(label=" Raw IA Metadata")
94
- ffprobe_json = gr.JSON(label="► FFprobe Metadata")
95
- origins_json = gr.JSON(label="► Source‑Origin Metadata")
 
 
 
 
 
 
 
 
96
 
97
  def search_and_populate(keywords, api_key, scan_enabled):
98
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
99
  return gr.update(choices=urls, value=urls[0] if urls else None)
100
 
101
- def update_all(selected_url, ff_on, api_key):
102
- # no selection guard
103
- if not selected_url:
104
- return None, {}, {}, []
105
 
106
- # 1) IA metadata + file list
107
- parts = selected_url.split("/")
108
- identifier = parts[4] if len(parts) > 4 else None
109
- raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
110
- if identifier:
111
- try:
112
- item = get_item(identifier)
113
- raw_ia["metadata"] = item.metadata
114
- raw_ia["files"] = [
115
- {
116
- "name": f.get("name"),
117
- "format": f.get("format"),
118
- "size": f.get("size"),
119
- "md5": f.get("md5"),
120
- **{k: v for k,v in f.items() if k not in ("name","format","size","md5")}
121
- }
122
- for f in item.files
123
- ]
124
- except Exception:
125
- raw_ia["error"] = "could not fetch IA metadata"
126
-
127
- # 2) FFprobe metadata if toggled
128
- ff_md = {}
129
  if ff_on:
130
  try:
131
- ff_md = extract_ffprobe_metadata(selected_url)
132
  except Exception as e:
133
- ff_md = {"error": str(e)}
134
 
135
- # 3) Origin tracing: scrape each URL in description
136
- origins = []
137
- desc = raw_ia["metadata"].get("description", "")
138
- urls_found = re.findall(r'https?://[^\s"<]+', desc)
139
- for url in urls_found:
140
- meta = fetch_page_metadata(url)
141
- origins.append(meta)
142
- # stop at first “real” origin (you can remove this break to collect all)
143
- break
144
 
145
- return selected_url, raw_ia, ff_md, origins
 
 
146
 
147
- run_btn.click(
148
  fn=search_and_populate,
149
- inputs=[kw_input, vt_key_input, scan_toggle],
150
- outputs=[url_dropdown]
151
  )
152
- url_dropdown.change(
153
  fn=update_all,
154
- inputs=[url_dropdown, ffprobe_toggle, vt_key_input],
155
- outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
156
  )
157
 
158
  if __name__ == "__main__":
 
6
  import json
7
  import re
8
  from bs4 import BeautifulSoup
9
+ from urllib.parse import urlparse
10
+ import tempfile
11
+ from pyvis.network import Network
12
+
13
+ THEME = "gradio/soft"
14
+
15
+ # --- Shared HTTP session for speed & headers ---
16
+ session = requests.Session()
17
+ session.headers.update({
18
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
19
+ })
20
 
 
21
  def scan_url_vt(url, api_key):
22
+ """Return True if VirusTotal sees no malicious hits."""
23
  headers = {"x-apikey": api_key}
24
+ resp = session.post("https://www.virustotal.com/api/v3/urls",
25
+ headers=headers, data={"url": url})
 
26
  resp.raise_for_status()
27
  analysis_id = resp.json()["data"]["id"]
28
+ # Poll until complete
29
  while True:
30
  time.sleep(5)
31
+ st = session.get(f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
32
+ headers=headers)
33
+ st.raise_for_status()
34
+ attr = st.json()["data"]["attributes"]
 
35
  if attr.get("status") == "completed":
36
+ return attr.get("stats", {}).get("malicious", 0) == 0
 
37
 
 
38
  def extract_ffprobe_metadata(url_or_path):
39
+ """Run ffprobe and parse its JSON output."""
40
  cmd = [
41
  "ffprobe", "-v", "error", "-print_format", "json",
42
+ "-show_format", "-show_streams", url_or_path
 
43
  ]
44
  out = subprocess.check_output(cmd)
45
  return json.loads(out)
46
 
 
47
  def fetch_page_metadata(url):
48
+ """Grab <title>, og: and twitter: meta tags from any page."""
49
  try:
50
+ r = session.get(url, timeout=5)
51
+ r.raise_for_status()
52
+ soup = BeautifulSoup(r.text, "html.parser")
53
+ data = {
54
+ "url": url,
55
+ "title": soup.title.string if soup.title else None
56
+ }
57
  for tag in soup.find_all("meta"):
58
  prop = tag.get("property") or tag.get("name")
59
  if prop and prop.startswith(("og:", "twitter:")):
60
+ data[prop] = tag.get("content")
61
+ return data
62
  except Exception as e:
63
  return {"url": url, "error": str(e)}
64
 
 
65
  def fetch_clean_videos(keywords, api_key, scan_enabled):
66
+ """Search IA, filter for common video formats, optional VT scan."""
67
+ terms = [kw.strip().replace(" ", "+") for kw in keywords.split(",")]
68
+ ia_query = f"mediatype:(movies) AND ({' OR '.join(terms)})"
69
  results = list(search_items(ia_query))[:50]
70
 
71
+ clean = []
72
+ for r in results:
73
+ ident = r["identifier"]
74
+ item = get_item(ident)
75
  for f in item.files:
76
  fmt = f.get("format", "").lower()
77
+ if fmt.startswith(("mp4", "avi", "mov", "webm", "m4v")):
78
+ url = f"https://archive.org/download/{ident}/{f['name']}"
79
  if scan_enabled and api_key:
80
  try:
81
+ if not scan_url_vt(url, api_key):
82
+ continue
83
+ except:
84
  continue
85
+ clean.append(url)
86
+ return clean
87
+
88
+ def get_favicon_url(page_url):
89
+ """Assume https://domain/favicon.ico exists."""
90
+ dom = urlparse(page_url).netloc
91
+ return f"https://{dom}/favicon.ico"
92
+
93
+ def build_origin_graph(origins):
94
+ """Render a PyVis network; return its full HTML."""
95
+ net = Network(height="400px", width="100%", directed=True)
96
+ for i, m in enumerate(origins):
97
+ fav = get_favicon_url(m["url"])
98
+ label = urlparse(m["url"]).netloc
99
+ title = json.dumps(m, indent=2)
100
+ net.add_node(i, label=label, title=title,
101
+ shape="image", image=fav)
102
+ if i > 0:
103
+ net.add_edge(i - 1, i)
104
+ tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".html").name
105
+ net.save_graph(tmp)
106
+ return open(tmp, encoding="utf-8").read()
107
+
108
+ with gr.Blocks(theme=THEME) as demo:
109
+ gr.Markdown("## 📼 IA Drone-Strike Explorer")
110
  with gr.Row():
111
+ kw = gr.Textbox(label="Search keywords",
112
+ value="drone strike, military uav")
113
+ vt_key = gr.Textbox(label="VirusTotal API Key", type="password")
114
+ scan = gr.Checkbox(label="Enable VT scan", value=True)
115
+ ff = gr.Checkbox(label="Enable FFprobe metadata", value=False)
116
+ btn = gr.Button("Search & Scan")
117
+
118
+ dropdown = gr.Dropdown(label="Clean Video URLs", choices=[])
119
+ graph_html = gr.HTML("<p>No origin graph yet.</p>")
120
+ video_player = gr.Video()
121
+
122
+ with gr.Tabs():
123
+ with gr.TabItem("IA Metadata"):
124
+ ia_json = gr.JSON()
125
+ with gr.TabItem("FFprobe"):
126
+ ff_json = gr.JSON()
127
+ with gr.TabItem("Origins"):
128
+ graph_html
129
+ orig_json = gr.JSON()
130
 
131
  def search_and_populate(keywords, api_key, scan_enabled):
132
  urls = fetch_clean_videos(keywords, api_key, scan_enabled)
133
  return gr.update(choices=urls, value=urls[0] if urls else None)
134
 
135
+ def update_all(url_sel, ff_on, api_key):
136
+ if not url_sel:
137
+ return None, {}, {}, "<p>No origin graph.</p>", []
 
138
 
139
+ # --- IA metadata for just this file ---
140
+ parts = url_sel.split("/")
141
+ ident = parts[4]
142
+ fn = parts[-1]
143
+ ia_data = {"identifier": ident, "file": {}}
144
+ try:
145
+ item = get_item(ident)
146
+ ia_data["file"] = next(
147
+ {**{k: item.metadata.get(k)}, **f}
148
+ for f in item.files if f["name"] == fn
149
+ )
150
+ except Exception as e:
151
+ ia_data["error"] = str(e)
152
+
153
+ # --- FFprobe if requested ---
154
+ ff_data = {}
 
 
 
 
 
 
 
155
  if ff_on:
156
  try:
157
+ ff_data = extract_ffprobe_metadata(url_sel)
158
  except Exception as e:
159
+ ff_data = {"error": str(e)}
160
 
161
+ # --- Origin tracing: all links in description ---
162
+ desc = item.metadata.get("description", "") or ""
163
+ found = re.findall(r"https?://[^\s\"'<]+", desc)
164
+ origins = [fetch_page_metadata(u) for u in found]
 
 
 
 
 
165
 
166
+ # --- Build the interactive graph HTML ---
167
+ graph = build_origin_graph(origins) if origins else "<p>No origins.</p>"
168
+ return url_sel, ia_data, ff_data, graph, origins
169
 
170
+ btn.click(
171
  fn=search_and_populate,
172
+ inputs=[kw, vt_key, scan],
173
+ outputs=[dropdown]
174
  )
175
+ dropdown.change(
176
  fn=update_all,
177
+ inputs=[dropdown, ff, vt_key],
178
+ outputs=[video_player, ia_json, ff_json, graph_html, orig_json]
179
  )
180
 
181
  if __name__ == "__main__":