Spaces:
Running
Running
| import json | |
| import time | |
| import re | |
| import os | |
| from typing import Optional | |
| from urllib.parse import urlparse | |
| from duckduckgo_search import DDGS | |
| import requests | |
| from bs4 import BeautifulSoup | |
| # with open("../data/uncleaned_companies.json", "r") as f: | |
| # companies = json.load(f).get("companies", []) | |
| EXCLUDE_WORDS = {"inc.", "llc", "ltd", "corp", "corporation", "the"} | |
| def clean_url(url: str) -> str: | |
| """Trim tracking and subpage paths to get the main domain.""" | |
| try: | |
| parsed = urlparse(url) | |
| domain = f"{parsed.scheme}://{parsed.netloc}" | |
| return domain | |
| except: | |
| return url | |
| def extract_website_from_tables(soup: BeautifulSoup, comp_name) -> Optional[str]: | |
| """ | |
| Finds the first website URL in an <a> tag within any table row (<tr>) | |
| in the BeautifulSoup object. | |
| """ | |
| tables = soup.find_all("table") | |
| def is_website_link(href: str) -> bool: | |
| href = href.lower() | |
| return href.startswith(("http://", "https://")) and not any( | |
| href.startswith(p) for p in ["#", "mailto:", "javascript:", "tel:"] | |
| ) | |
| def company_name_exists() -> bool: | |
| temp_parts = comp_name.lower().split() | |
| comp_name_parts = [word for word in temp_parts if word not in EXCLUDE_WORDS] | |
| for w in comp_name_parts: | |
| if w in href.lower(): | |
| return True | |
| return False | |
| for table in tables: | |
| rows = table.find_all("tr") | |
| for row in rows: | |
| anchor_tags = row.find_all("a", href=True) | |
| for a_tag in anchor_tags: | |
| href = a_tag["href"] | |
| if is_website_link(href): | |
| if company_name_exists(): | |
| return href | |
| return None | |
| def find_company_website(company_name, location=None, industry=None): | |
| query = f"{company_name} {location or ''} {industry or ''} official website" | |
| with DDGS() as ddgs: | |
| results = list(ddgs.text(query, max_results=3)) | |
| if not results: | |
| return None | |
| best_match = None | |
| best_score = 0 | |
| for r in results: | |
| url = r.get("href") or r.get("url") | |
| if not url: | |
| continue | |
| cleaned = clean_url(url) | |
| domain = urlparse(cleaned).netloc.lower() | |
| score = 0 | |
| name = company_name.lower().split()[0] | |
| if name in domain: | |
| score += 5 | |
| if any(domain.endswith(tld) for tld in [".com", ".org", ".net", ".co", ".io"]): | |
| score += 2 | |
| if any(domain.startswith(prefix) for prefix in ["support.", "careers.", "ir.", "blog.", "community.", "forum.", "media.", "news.", "docs.", "developer.", "help.", "about.", "ttlc.", "privacy.", "terms.", "legal.", "events.", "partners.", "investors.", "research.", "customers.", "resources.", "contact.", "shop.", "store.", "login.", "app.", "apps.", "download.", "downloads.", "status.", "jobs.", "work.", "team.", "company.", "corporate."]): | |
| score -= 3 | |
| if re.search(r"/(drivers|about|news|products|careers|support)", url, re.IGNORECASE): | |
| score -= 2 | |
| if score > best_score: | |
| best_score = score | |
| best_match = cleaned | |
| return best_match | |
| def find_all_company_websites(companies): | |
| for c in companies: | |
| if not c.get("website_url"): | |
| print(f"Searching for {c['company_name']}...") | |
| temp_url = find_company_website( | |
| c["company_name"], | |
| location=c.get("location"), | |
| industry=c.get("industry_type") | |
| ) | |
| temp_parts = c["company_name"].lower().split() | |
| comp_name_parts = [word for word in temp_parts if word not in EXCLUDE_WORDS] | |
| len_thres = int(len(comp_name_parts)/2) | |
| count = 0 | |
| if temp_url: | |
| for p in comp_name_parts: | |
| if p in temp_url.lower(): | |
| count += 1 | |
| if count >= len_thres: | |
| c["website_url"] = temp_url | |
| print(f"Found website via DDG: {c['website_url']}") | |
| else: | |
| c["website_url"] = None | |
| print(f"No suitable website found via DDG for {c['company_name']}") | |
| time.sleep(2) | |
| return companies | |
| # with open("../data/companies_with_urls.json", "r") as f: | |
| # companies = json.load(f) | |
| def check_percent_with_urls(companies): | |
| percent_with_urls = sum(1 for c in companies if c.get("website_url")) / len(companies) * 100 | |
| return percent_with_urls | |
| def wiki_search_mode(companies, main_data_folder): | |
| percent_with_urls = check_percent_with_urls(companies) | |
| if percent_with_urls < 100: | |
| print("Less than 100% of companies have website URLs. Going to wikisearch mode...") | |
| for c in companies: | |
| if not c.get("website_url"): | |
| print(f"Wikisearching for {c['company_name']}...") | |
| headers = { | |
| "User-Agent": ( | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " | |
| "AppleWebKit/537.36 (KHTML, like Gecko) " | |
| "Chrome/123.0 Safari/537.36" | |
| ) | |
| } | |
| if("(" in c["company_name"]): | |
| mod_comp_name = c["company_name"].split("(")[0].strip() | |
| wiki_url = f"https://en.wikipedia.org/wiki/{mod_comp_name.replace(' ', '_')}" | |
| else: | |
| mod_comp_name = c["company_name"] | |
| # print(mod_comp_name.replace(' ', '_')) | |
| wiki_url = f"https://en.wikipedia.org/wiki/{mod_comp_name.replace(' ', '_')}" | |
| try: | |
| res = requests.get(wiki_url, headers=headers, timeout=10) | |
| res.raise_for_status() | |
| soup = BeautifulSoup(res.text, 'html.parser') | |
| website_url = extract_website_from_tables(soup, mod_comp_name) | |
| if website_url: | |
| c["website_url"] = clean_url(website_url) | |
| print(f"Found website via Wikipedia: {c['website_url']}",flush=True) | |
| else: | |
| print(f"No website found on Wikipedia for {c['company_name']}", flush=True) | |
| except Exception as e: | |
| print(f"Error accessing Wikipedia for {c['company_name']}: {str(e)}") | |
| continue | |
| time.sleep(5) | |
| else: | |
| print("All companies already have website URLs. Skipping wikisearch mode...") | |
| print("Saving results...") | |
| # data_folder = "/tmp/data" | |
| os.makedirs(main_data_folder, exist_ok=True) | |
| file_path = os.path.join(main_data_folder, "all_cleaned_companies.json") | |
| with open(file_path, "w") as f: | |
| json.dump({"companies": companies}, f, indent=2) | |
| print("Enriched company list saved to all_cleaned_companies.json",flush=True) | |
| return {"companies": companies} | |
| # with open("../data/uncleaned_companies.json", "r") as f: | |
| # companies = json.load(f).get("companies", []) | |
| # intermediate_data = find_all_company_websites(companies) | |
| # final_data = wiki_search_mode(intermediate_data) |