niharika17032001 committed
Commit 5ca8483 · 1 Parent(s): 0bba599

Create app.py

Files changed (4):
  1. app.py +13 -2
  2. crawler.py +326 -0
  3. main_script.py +95 -0
  4. metadata_extractor.py +110 -0
app.py CHANGED
@@ -6,12 +6,14 @@ from PIL import Image
 from selenium import webdriver
 from selenium.common.exceptions import WebDriverException
 
+import main_script
+
 driver = None
 
 
 def get_chrome_options():
     options = webdriver.ChromeOptions()
-    options.add_argument('--headless')
+    # options.add_argument('--headless')
     options.add_argument('--no-sandbox')
     options.add_argument('--disable-dev-shm-usage')
 
@@ -56,9 +58,18 @@ def take_screenshot(url):
 
     return images
 
+def call_main_script():
+    main_script.main()
+
+def main(url):
+    return call_main_script()
+    # return take_screenshot(url)
+
+
+
 
 iface = gr.Interface(
-    fn=take_screenshot,
+    fn=main,
     inputs=gr.Textbox(label="Website URL", value="https://www.google.com/"),
     outputs=gr.Gallery(label="Screenshots", columns=3, height="auto"),
     title="Website Screenshots",
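Review note on the app.py change: the new main(url) ignores its url argument and returns the result of call_main_script(), and since main_script.main() has no return value, the gr.Gallery output receives None. A minimal sketch of wiring that would surface the crawler's screenshots, assuming main_script.main() were changed to return the images list it builds (that change is not part of this commit):

import main_script

def main(url):
    # url is unused by the crawl pipeline; kept only to match the Gradio Textbox input
    images = main_script.main()  # assumed (not in this commit) to return a list of PIL images
    return images or []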
crawler.py ADDED
@@ -0,0 +1,326 @@
+import requests
+from lxml import html
+from collections import deque
+import json
+import time
+import os
+
+# --- Choose your Selenium setup ---
+# OPTION A: Standard Selenium
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+
+# OPTION B: undetected_chromedriver (Uncomment these if you want to use UC)
+# import undetected_chromedriver as uc
+
+
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
+
+
+from PIL import Image
+from io import BytesIO
+
+
+def set_screenshot(driver, images=[]):
+    png = driver.get_screenshot_as_png()
+    image = Image.open(BytesIO(png))
+    images.append(image)
+    return images
+
+
+def get_chrome_options():
+    options = webdriver.ChromeOptions()
+    # options.add_argument('--headless')
+    options.add_argument('--no-sandbox')
+    options.add_argument('--disable-dev-shm-usage')
+
+    return options
+
+
+def set_driver():
+    options = get_chrome_options()
+
+    try:
+        web_driver = webdriver.Chrome(options=options)
+        web_driver.set_window_size(1080, 720)  # Adjust the window size here
+    except WebDriverException as e:
+        return Image.new('RGB', (1, 1))
+
+    return web_driver
+
+
+
+
+
+
+
+# --- Selenium setup functions (choose one based on your choice above) ---
+
+# OPTION A: Standard Selenium (Use this if you prefer standard selenium)
+# def get_chrome_options():
+#     options = webdriver.ChromeOptions()
+#     options.add_argument("--headless")
+#     options.add_argument("--no-sandbox")
+#     options.add_argument("--disable-gpu")
+#     options.add_argument("--disable-dev-shm-usage")
+#     options.add_argument("--window-size=1920,1080")
+#     options.add_argument(
+#         "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
+#     return options
+
+
+def create_webdriver_instance(browser_type="chrome"):
+    if browser_type.lower() == "chrome":
+        chrome_options = get_chrome_options()
+        try:
+            # Assumes chromedriver is in PATH or specified path (e.g., /usr/bin/chromedriver on GitHub Actions)
+            service = Service(executable_path="/usr/bin/chromedriver")
+            driver = webdriver.Chrome(service=service, options=chrome_options)
+            return driver
+        except WebDriverException as e:
+            print(f"Error initializing ChromeDriver. Error: {e}")
+            return None
+    else:
+        raise ValueError("Unsupported browser type.")
+
+
+# OPTION B: undetected_chromedriver (Uncomment this block and comment OPTION A if you want to use UC)
+# def get_chrome_options():
+#     options = uc.ChromeOptions()
+#     options.add_argument("--headless")
+#     options.add_argument("--no-sandbox")
+#     options.add_argument("--disable-gpu")
+#     options.add_argument("--disable-dev-shm-usage")
+#     options.add_argument("--window-size=1920,1080")
+#     return options
+
+# def create_webdriver_instance(browser_type="chrome"):
+#     if browser_type.lower() == "chrome":
+#         chrome_options = get_chrome_options()
+#         try:
+#             driver = uc.Chrome(options=chrome_options)
+#             return driver
+#         except WebDriverException as e:
+#             print(f"Error initializing undetected_chromedriver. Error: {e}")
+#             return None
+#     else:
+#         raise ValueError("Unsupported browser type.")
+
+
+# --- Resumable Crawling Logic ---
+
+def save_crawl_state(to_visit_deque, visited_set, song_urls_list, state_filename="crawl_state.json",
+                     song_pages_json_file="pagalgana_song_pages.json"):
+    """Saves the current state of the crawler to JSON files."""
+    try:
+        with open(song_pages_json_file, 'w', encoding='utf-8') as f:
+            json.dump(song_urls_list, f, indent=4)
+
+        crawl_state_data = {
+            "to_visit": list(to_visit_deque),
+            "visited_urls": list(visited_set)
+        }
+        with open(state_filename, 'w', encoding='utf-8') as f:
+            json.dump(crawl_state_data, f, indent=4)
+        print(
+            f"--- Crawl state saved. URLs to visit: {len(to_visit_deque)}, Visited: {len(visited_set)}, Song pages found: {len(song_urls_list)} ---")
+    except IOError as e:
+        print(f"Error saving crawl state: {e}")
+    except Exception as e:
+        print(f"An unexpected error occurred while saving state: {e}")
+
+
+def load_crawl_state(state_filename="crawl_state.json", song_pages_json_file="pagalgana_song_pages.json"):
+    """Loads previous crawl state if files exist."""
+    to_visit_deque = deque()
+    visited_set = set()
+    song_urls_list = []
+
+    if os.path.exists(song_pages_json_file):
+        try:
+            with open(song_pages_json_file, 'r', encoding='utf-8') as f:
+                song_urls_list = json.load(f)
+            print(f"Loaded {len(song_urls_list)} song URLs from '{song_pages_json_file}'.")
+        except json.JSONDecodeError:
+            print(f"Warning: '{song_pages_json_file}' corrupted or empty. Starting fresh song list.")
+            song_urls_list = []
+        except Exception as e:
+            print(f"Error loading '{song_pages_json_file}': {e}")
+
+    if os.path.exists(state_filename):
+        try:
+            with open(state_filename, 'r', encoding='utf-8') as f:
+                crawl_state_data = json.load(f)
+            to_visit_deque = deque(crawl_state_data.get("to_visit", []))
+            visited_set = set(crawl_state_data.get("visited_urls", []))
+            print(f"Loaded crawl state: {len(to_visit_deque)} URLs to visit, {len(visited_set)} visited.")
+        except json.JSONDecodeError:
+            print(f"Warning: '{state_filename}' corrupted or empty. Starting fresh state.")
+            to_visit_deque = deque()
+            visited_set = set()
+        except Exception as e:
+            print(f"Error loading '{state_filename}': {e}")
+
+    return to_visit_deque, visited_set, song_urls_list
+
+
+def crawl_pagalgana_site(base_url: str, song_pages_json_file: str, max_crawl_depth: int, state_filename: str,
+                         save_interval: int, images):
+    """
+    Crawls Pagalgana.com to find and save song page URLs.
+    Supports resuming a crawl.
+    """
+    # driver = create_webdriver_instance()
+    driver = set_driver()
+    if not driver:
+        print("Failed to initialize WebDriver. Exiting.")
+        return []  # Return empty list if WebDriver fails
+
+    to_visit, visited_urls, song_page_urls = load_crawl_state(state_filename, song_pages_json_file)
+
+    if not to_visit and not visited_urls:
+        print("No previous crawl state found. Starting fresh.")
+        to_visit.append((base_url, 0))
+    else:
+        print("Resuming crawl from previous state.")
+        if base_url not in visited_urls and (base_url, 0) not in to_visit:
+            to_visit.appendleft((base_url, 0))
+
+    AUDIO_CONTAINER_XPATH = '//*[@id="audio-container"]'
+    LOAD_MORE_BUTTON_XPATH = '//a[@class="button" and contains(@onclick, "loadMoreCategory")]'
+
+    print(f"Starting/Resuming crawl with base: {base_url}, max depth: {max_crawl_depth}")
+    print(
+        f"Initial Queue size: {len(to_visit)}, Initial Visited size: {len(visited_urls)}, Song page URLs: {len(song_page_urls)}")
+
+    processed_count = 0
+    while to_visit:
+        current_url, current_depth = to_visit.popleft()
+
+        if current_url in visited_urls:
+            continue
+
+        if current_depth > max_crawl_depth:
+            print(f"Skipping {current_url} - max depth reached ({max_crawl_depth})")
+            continue
+
+        print(f"\n--- Visiting ({current_depth}): {current_url} ---")
+        visited_urls.add(current_url)
+        processed_count += 1
+
+        try:
+            driver.get(current_url)
+            time.sleep(3)  # Give page more time to load and execute JS
+
+            print(f" Page title: {driver.title}")
+            print(f" Current URL after load: {driver.current_url}")
+            images = set_screenshot(driver=driver, images=images)
+
+            # Optional: print HTML snippet for debugging. Remove for cleaner logs in production.
+            # print(" --- HTML snippet (first 2000 chars) ---")
+            # print(driver.page_source[:2000])
+            # print(" --- End HTML snippet ---")
+
+            # Check for Cloudflare challenge (if using standard Selenium)
+            if "Attention Required" in driver.title or "cloudflare" in driver.page_source.lower():
+                print(
+                    " --> Cloudflare challenge detected! Try switching to undetected_chromedriver or add a longer sleep.")
+                print(" --> Skipping current URL due to Cloudflare challenge.")
+                images = set_screenshot(driver=driver, images=images)
+                continue  # Skip this URL if Cloudflare is blocking it
+
+            # Check if it's a song page
+            audio_container_elements = driver.find_elements(By.XPATH, AUDIO_CONTAINER_XPATH)
+            if audio_container_elements:
+                print(f" --> FOUND AUDIO CONTAINER! This is a song page: {current_url}")
+                if current_url not in song_page_urls:
+                    song_page_urls.append(current_url)
+
+            # Handle "Load More" button if present
+            load_more_found_and_clicked = False
+            while True:
+                try:
+                    load_more_button = WebDriverWait(driver, 15).until(
+                        EC.element_to_be_clickable((By.XPATH, LOAD_MORE_BUTTON_XPATH))
+                    )
+
+                    last_height = driver.execute_script("return document.body.scrollHeight")
+
+                    print(" Clicking 'Load More' button...")
+                    load_more_button.click()
+                    load_more_found_and_clicked = True
+
+                    new_height = last_height
+                    scroll_attempts = 0
+                    while new_height == last_height and scroll_attempts < 7:
+                        time.sleep(2)
+                        new_height = driver.execute_script("return document.body.scrollHeight")
+                        scroll_attempts += 1
+
+                    if new_height == last_height:
+                        print(" No more content loaded after click, or button disappeared.")
+                        break
+
+                except (NoSuchElementException, TimeoutException):
+                    if not load_more_found_and_clicked:
+                        print(" 'Load More' button not found or not clickable.")
+                    else:
+                        print(" 'Load More' button no longer present (all content likely loaded).")
+                    break
+                except Exception as e:
+                    print(f" Error clicking 'Load More': {e}")
+                    break
+
+            # After all content is loaded, parse the HTML
+            tree = html.fromstring(driver.page_source)
+
+            # Extract nested links from the fully loaded page
+            links = tree.xpath('//a/@href')
+            print(f" Found {len(links)} raw links on the page.")
+
+            links_added_to_queue = 0
+            for link in links:
+                absolute_url = requests.compat.urljoin(current_url, link)
+
+                if "pagalgana.com" in absolute_url and "#" not in absolute_url and "?" not in absolute_url:
+                    if not (absolute_url.endswith(
+                            ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js'))):
+                        if absolute_url not in visited_urls and (absolute_url, current_depth + 1) not in to_visit:
+                            if absolute_url not in song_page_urls:  # Don't re-add if already identified as a song page
+                                to_visit.append((absolute_url, current_depth + 1))
+                                links_added_to_queue += 1
+            # print(f" Added {links_added_to_queue} new valid links to the queue from {current_url}.")
+
+        except Exception as e:
+            print(f" An unexpected error occurred for {current_url}: {e}")
+        finally:
+            if processed_count % save_interval == 0:
+                print(f"--- Processed {processed_count} pages. Saving current crawl state... ---")
+                save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
+
+    driver.quit()
+
+    print("\n--- Crawl finished. Performing final save of song page URLs. ---")
+    save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
+    print(f"\nCrawl complete. Total {len(song_page_urls)} song pages found and saved to '{song_pages_json_file}'.")
+    images = set_screenshot(driver=driver, images=images)
+    return song_page_urls, images  # Return the list of discovered song pages
+
+
+# This __name__ block is for testing `crawler.py` independently
+if __name__ == "__main__":
+    # Example usage for standalone testing of the crawler
+    # When run via main_script.py, this block won't execute
+    images = []
+    discovered_urls, images = crawl_pagalgana_site(
+        base_url="https://pagalgana.com/category/bollywood-mp3-songs.html",
+        song_pages_json_file="bollywood_song_pages.json",
+        state_filename="bollywood_crawl_state.json",
+        max_crawl_depth=2,  # Keep low for testing
+        save_interval=5,
+        images=images
+    )
+    print(f"Crawler finished. Discovered {len(discovered_urls)} song URLs.")
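Review note on crawler.py: crawl_pagalgana_site() guards with `if not driver:`, but set_driver() returns a 1x1 PIL Image (a truthy object) when Chrome fails to start, so the guard never fires and the later driver calls would fail; the final set_screenshot() call also runs after driver.quit(). A minimal sketch of a set_driver() variant that keeps the guard meaningful, a suggestion rather than part of the commit, using only imports crawler.py already has:

from selenium import webdriver
from selenium.common.exceptions import WebDriverException

def set_driver():
    # get_chrome_options() is the helper defined earlier in crawler.py
    options = get_chrome_options()
    try:
        web_driver = webdriver.Chrome(options=options)
        web_driver.set_window_size(1080, 720)
        return web_driver
    except WebDriverException as e:
        # Returning None (instead of a placeholder Image) lets `if not driver:` behave as intended
        print(f"Could not start ChromeDriver: {e}")
        return None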
main_script.py ADDED
@@ -0,0 +1,95 @@
+import json
+import os
+import time
+from typing import List, Dict
+
+# Import functions from your separate files
+from crawler import crawl_pagalgana_site, load_crawl_state, save_crawl_state
+from metadata_extractor import extract_song_metadata
+
+
+def main():
+    images = []
+    # --- Configuration ---
+    BASE_URL = "https://pagalgana.com/12-baje-le-chalau-blender-2025-raushan-rohi-bhojp-uuw.html"
+    MAX_CRAWL_DEPTH = 10  # Adjust this for how deep you want to crawl
+    CRAWL_STATE_FILE = "bollywood_crawl_state.json"
+    SONG_PAGES_FILE = "bollywood_song_pages.json"  # Output from crawler
+    METADATA_OUTPUT_FILE = "bollywood_song_metadata.json"  # Final output with detailed metadata
+    CRAWLER_SAVE_INTERVAL = 10  # Save crawler state every X pages
+    METADATA_SAVE_INTERVAL = 50  # Save metadata periodically every X songs extracted
+
+    print("Starting Pagalgana Web Scraper and Metadata Extractor.")
+
+    # --- Phase 1: Crawl the site to find song page URLs ---
+    print("\n## Phase 1: Discovering Song Page URLs ##")
+    # This function will handle loading/saving its own state
+    discovered_song_urls, images = crawl_pagalgana_site(
+        base_url=BASE_URL,
+        song_pages_json_file=SONG_PAGES_FILE,
+        state_filename=CRAWL_STATE_FILE,
+        max_crawl_depth=MAX_CRAWL_DEPTH,
+        save_interval=CRAWLER_SAVE_INTERVAL,
+        images=images
+    )
+    print(f"\nPhase 1 Complete. Found {len(discovered_song_urls)} unique song page URLs.")
+
+    # --- Phase 2: Extract metadata from discovered song URLs ---
+    print("\n## Phase 2: Extracting Metadata from Song Pages ##")
+
+    # Load previously extracted metadata to enable resuming this phase
+    # We use a dummy state_filename for this load to just get the metadata list
+    _, _, _, existing_metadata = load_crawl_state(
+        state_filename="dummy_state_for_metadata_load.json",  # This specific file won't be used by crawler
+        song_pages_json_file=SONG_PAGES_FILE,  # This is loaded by the crawler
+        metadata_json_file=METADATA_OUTPUT_FILE  # This is the file we care about loading here
+    )
+
+    # Create a set of URLs for which we already have metadata
+    processed_metadata_urls = {entry.get("URL") for entry in existing_metadata if
+                               isinstance(entry, dict) and "URL" in entry}
+
+    metadata_extracted_count = 0
+    new_metadata_entries: List[Dict] = []  # To store new entries from this run
+
+    # Iterate through each discovered song URL
+    for url in discovered_song_urls:
+        if url in processed_metadata_urls:
+            print(f" Metadata for {url} already extracted. Skipping.")
+            continue
+
+        metadata = extract_song_metadata(url)
+        new_metadata_entries.append(metadata)
+        metadata_extracted_count += 1
+
+        # Add the URL to our tracking set to avoid duplicates in this run
+        processed_metadata_urls.add(url)
+
+        # Save metadata periodically
+        if metadata_extracted_count % METADATA_SAVE_INTERVAL == 0:
+            # Combine existing and new metadata for periodic save
+            combined_metadata = existing_metadata + new_metadata_entries
+            try:
+                with open(METADATA_OUTPUT_FILE, 'w', encoding='utf-8') as f:
+                    json.dump(combined_metadata, f, indent=4, ensure_ascii=False)
+                print(f" --- Saved {len(combined_metadata)} metadata entries to '{METADATA_OUTPUT_FILE}'. ---")
+            except IOError as e:
+                print(f" Error saving metadata periodically: {e}")
+
+        time.sleep(0.5)  # Be kind to the server, small delay between fetches
+
+    # Final save of all metadata
+    final_metadata = existing_metadata + new_metadata_entries
+    try:
+        with open(METADATA_OUTPUT_FILE, 'w', encoding='utf-8') as f:
+            json.dump(final_metadata, f, indent=4, ensure_ascii=False)
+    except IOError as e:
+        print(f"Error saving final metadata to '{METADATA_OUTPUT_FILE}': {e}")
+
+    print(f"\nPhase 2 Complete. Extracted metadata for {len(new_metadata_entries)} new song pages.")
+    print(f"Total {len(final_metadata)} unique song metadata entries saved to '{METADATA_OUTPUT_FILE}'.")
+    print("\nScraping process finished.")
+
+
+if __name__ == "__main__":
+    main()
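Review note on main_script.py: Phase 2 calls load_crawl_state() with a metadata_json_file keyword and unpacks four values, but the load_crawl_state() added in crawler.py in this same commit accepts only state_filename and song_pages_json_file and returns three values, so this call would raise a TypeError at runtime. A minimal sketch of a dedicated loader for previously extracted metadata; the helper name is illustrative and not part of the commit:

import json
import os
from typing import Dict, List

def load_existing_metadata(metadata_json_file: str) -> List[Dict]:
    """Return previously extracted metadata entries, or [] if the file is missing or unreadable."""
    if not os.path.exists(metadata_json_file):
        return []
    try:
        with open(metadata_json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except (json.JSONDecodeError, IOError):
        return []

# Phase 2 could then read:
#     existing_metadata = load_existing_metadata(METADATA_OUTPUT_FILE)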
metadata_extractor.py ADDED
@@ -0,0 +1,110 @@
+import requests
+from lxml import html
+from bs4 import BeautifulSoup
+import json
+import re
+
+def fetch_html_tree_requests(url: str) -> tuple:
+    """Fetches HTML using requests and returns lxml tree and raw HTML."""
+    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
+    try:
+        response = requests.get(url, headers=headers, timeout=10)
+        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
+        return html.fromstring(response.content), response.text
+    except requests.exceptions.RequestException as e:
+        print(f"Error fetching {url} with requests: {e}")
+        return None, None
+
+def extract_tbody_html(tree: html.HtmlElement, xpath: str = "/html/body/div[3]/table/tbody") -> str:
+    """Extracts the tbody HTML string from an lxml tree."""
+    result = tree.xpath(xpath)
+    if not result:
+        return None
+    return html.tostring(result[0], encoding='unicode')
+
+def extract_thumbnail(tree: html.HtmlElement) -> str:
+    """Extracts the thumbnail URL from JSON-LD script tags."""
+    scripts = tree.xpath("//script[@type='application/ld+json']/text()")
+    for script in scripts:
+        try:
+            json_data = json.loads(script.strip())
+            if isinstance(json_data, dict) and "image" in json_data:
+                return json_data["image"]
+        except json.JSONDecodeError:
+            continue
+    return None
+
+def extract_audio_url(html_text: str) -> str:
+    """Extracts the MP3 audio URL using regex from raw HTML."""
+    match = re.search(r'new Audio\(["\'](https://[^"\']+\.mp3)["\']\)', html_text)
+    return match.group(1) if match else None
+
+def tbody_to_json(html_tbody: str) -> dict:
+    """Parses tbody HTML using BeautifulSoup and converts to a dictionary."""
+    if not html_tbody:
+        return {}
+    soup = BeautifulSoup(html_tbody, "html.parser")
+    data = {}
+
+    for tr in soup.find_all("tr", class_="tr"):
+        tds = tr.find_all("td")
+        if len(tds) < 2:
+            continue
+
+        key = tds[0].get_text(strip=True).rstrip(":")
+        value_cell = tds[1]
+
+        if key == "Rating":
+            stars = value_cell.find_all("span")
+            if stars:
+                stars_str = ''.join(star.get_text(strip=True) for star in stars)
+                data[key] = {
+                    "stars": stars_str,
+                    "out_of": 5,
+                    "value": stars_str.count("★") + 0.5 * stars_str.count("☆")
+                }
+            continue
+
+        value = value_cell.get_text(" ", strip=True)
+        data[key] = value
+
+    return data
+
+def extract_song_metadata(url: str) -> dict:
+    """Fetches a song page and extracts all relevant metadata."""
+    print(f" Attempting to extract metadata from: {url}")
+    tree, html_text = fetch_html_tree_requests(url)
+    if tree is None:
+        return {"URL": url, "error": "Failed to fetch page with requests or network issue."}
+
+    metadata = {"URL": url}
+
+    try:
+        tbody_html = extract_tbody_html(tree)
+        if tbody_html:
+            metadata.update(tbody_to_json(tbody_html))
+        else:
+            metadata["tbody_data_present"] = False
+
+        thumbnail_url = extract_thumbnail(tree)
+        if thumbnail_url:
+            metadata["Thumbnail"] = thumbnail_url
+
+        audio_url = extract_audio_url(html_text)
+        if audio_url:
+            metadata["Play Online"] = audio_url
+        else:
+            metadata["Play Online"] = None
+
+    except Exception as e:
+        metadata["error_extracting_metadata"] = str(e)
+        print(f" Error extracting metadata for {url}: {e}")
+
+    return metadata
+
+# This __name__ block is for testing `metadata_extractor.py` independently
+if __name__ == "__main__":
+    # Example usage for standalone testing
+    test_url = "https://pagalgana.com/0mp-Mechanical-sundariye-2.0-hindiLl.html"
+    metadata = extract_song_metadata(test_url)
+    print(json.dumps(metadata, indent=4, ensure_ascii=False))
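A quick way to sanity-check tbody_to_json() without hitting the network; the sample markup below is illustrative only (the live page's table may differ), but it matches the tr.tr/td/span structure the parser expects:

import json
from metadata_extractor import tbody_to_json

sample_tbody = (
    '<tbody>'
    '<tr class="tr"><td>Song Name:</td><td>Test Song</td></tr>'
    '<tr class="tr"><td>Singer:</td><td>Some Artist</td></tr>'
    '<tr class="tr"><td>Rating:</td><td><span>★</span><span>★</span><span>★</span><span>☆</span></td></tr>'
    '</tbody>'
)

print(json.dumps(tbody_to_json(sample_tbody), indent=4, ensure_ascii=False))
# Expected shape:
# {"Song Name": "Test Song", "Singer": "Some Artist",
#  "Rating": {"stars": "★★★☆", "out_of": 5, "value": 3.5}}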