Spaces:
Running
Running
File size: 7,180 Bytes
723bbe6 45021e5 723bbe6 8f6827d 723bbe6 8f6827d 723bbe6 45021e5 723bbe6 45021e5 723bbe6 8f6827d 723bbe6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import json
import time
import re
import os
from typing import Optional
from urllib.parse import urlparse
from duckduckgo_search import DDGS
import requests
from bs4 import BeautifulSoup
# with open("../data/uncleaned_companies.json", "r") as f:
# companies = json.load(f).get("companies", [])
# Lower-cased filler tokens that are dropped from a company name before its
# remaining words are matched against candidate URLs.
# NOTE(review): only "inc." carries a trailing period, so tokens such as
# "inc", "llc." or "ltd." are not filtered — confirm this is intended.
EXCLUDE_WORDS = {"inc.", "llc", "ltd", "corp", "corporation", "the"}
def clean_url(url: str) -> str:
    """Reduce a URL to its ``scheme://host`` origin.

    The path, query string (tracking parameters) and fragment are dropped.

    Args:
        url: The URL to trim.

    Returns:
        ``"<scheme>://<netloc>"`` when both parts are present. If the input
        cannot be parsed, or has no scheme or no host (for example
        ``"example.com/page"`` or ``"mailto:someone@example.com"``), the
        input is returned unchanged rather than a meaningless ``"://"``.
    """
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed hosts (e.g. an unbalanced
        # IPv6 bracket); the other two cover non-string input.
        return url
    if not parsed.scheme or not parsed.netloc:
        return url
    return f"{parsed.scheme}://{parsed.netloc}"
def extract_website_from_tables(soup: BeautifulSoup, comp_name) -> Optional[str]:
    """
    Scan every table row of a parsed page for a link to the company's site.

    Anchors are visited in document order (table by table, row by row). The
    first ``href`` that is an absolute http(s) URL and contains at least one
    word of ``comp_name`` (ignoring the words in ``EXCLUDE_WORDS``) is
    returned as-is. Returns ``None`` when no anchor qualifies.
    """
    tables = soup.find_all("table")

    def _is_web_url(link: str) -> bool:
        # Absolute http(s) links only; in-page, mail, script and phone links
        # are rejected.
        lowered = link.lower()
        if not lowered.startswith(("http://", "https://")):
            return False
        return not lowered.startswith(("#", "mailto:", "javascript:", "tel:"))

    def _mentions_company(link: str) -> bool:
        # True when any significant word of the company name is a substring
        # of the link.
        lowered = link.lower()
        keywords = [
            token
            for token in comp_name.lower().split()
            if token not in EXCLUDE_WORDS
        ]
        return any(token in lowered for token in keywords)

    for table in tables:
        for row in table.find_all("tr"):
            for anchor in row.find_all("a", href=True):
                target = anchor["href"]
                if _is_web_url(target) and _mentions_company(target):
                    return target
    return None
def find_company_website(company_name, location=None, industry=None):
    """Search DuckDuckGo for a company's official site and pick the best hit.

    The top three text results are scored heuristically:

    * +5 if the company's first significant name word appears in the host,
    * +2 if the host ends in a common TLD,
    * -3 if the host starts with a secondary-site subdomain (support, blog, ...),
    * -2 if the URL *path* points at a typical sub-page (about, news, ...).

    Args:
        company_name: Company name used for the query and for host matching.
        location: Optional location text appended to the query.
        industry: Optional industry text appended to the query.

    Returns:
        The ``scheme://host`` origin of the highest-scoring result, or
        ``None`` when there are no results or none scores above zero.
    """
    query = f"{company_name} {location or ''} {industry or ''} official website"
    with DDGS() as ddgs:
        results = list(ddgs.text(query, max_results=3))
    if not results:
        return None

    # Scoring inputs that do not change between results.
    common_tlds = (".com", ".org", ".net", ".co", ".io")
    secondary_prefixes = (
        "support.", "careers.", "ir.", "blog.", "community.", "forum.",
        "media.", "news.", "docs.", "developer.", "help.", "about.", "ttlc.",
        "privacy.", "terms.", "legal.", "events.", "partners.", "investors.",
        "research.", "customers.", "resources.", "contact.", "shop.",
        "store.", "login.", "app.", "apps.", "download.", "downloads.",
        "status.", "jobs.", "work.", "team.", "company.", "corporate.",
    )
    subpage_pattern = re.compile(
        r"/(drivers|about|news|products|careers|support)", re.IGNORECASE
    )

    # Match on the first word that is not filler ("the", "corp", ...); fall
    # back to the first word when the whole name is filler. A blank name
    # yields no key instead of raising IndexError.
    words = str(company_name or "").lower().split()
    significant = [w for w in words if w not in EXCLUDE_WORDS] or words
    name_key = significant[0] if significant else None

    best_match = None
    best_score = 0
    for r in results:
        url = r.get("href") or r.get("url")
        if not url:
            continue
        cleaned = clean_url(url)
        try:
            domain = urlparse(cleaned).netloc.lower()
            path = urlparse(url).path
        except ValueError:
            # Malformed result URL: skip it rather than abort the search.
            continue

        score = 0
        if name_key and name_key in domain:
            score += 5
        if domain.endswith(common_tlds):
            score += 2
        if domain.startswith(secondary_prefixes):
            score -= 3
        # Only the path is inspected: matching the full URL would also hit
        # hosts such as "https://news.example.com" or "https://aboutyou.com".
        if subpage_pattern.search(path):
            score -= 2

        if score > best_score:
            best_score = score
            best_match = cleaned
    return best_match
def find_all_company_websites(companies):
    """Fill in ``website_url`` for every company that lacks one.

    For each company dict without a truthy ``website_url``, a DuckDuckGo
    lookup is made via ``find_company_website``. The candidate is accepted
    only if enough significant words of the company name (those not in
    ``EXCLUDE_WORDS``) occur in the URL; otherwise ``website_url`` is set to
    ``None``. Companies that already have a URL are left untouched.

    Args:
        companies: List of dicts with at least ``company_name``; optional
            keys ``website_url``, ``location`` and ``industry_type`` are read.

    Returns:
        The same list object, with its dicts updated in place.
    """
    for c in companies:
        if c.get("website_url"):
            continue

        print(f"Searching for {c['company_name']}...")
        temp_url = find_company_website(
            c["company_name"],
            location=c.get("location"),
            industry=c.get("industry_type"),
        )

        name_parts = [
            word
            for word in c["company_name"].lower().split()
            if word not in EXCLUDE_WORDS
        ]
        # Require at least one matching word whenever the name has any usable
        # words; plain floor(len / 2) is 0 for one-word names and would
        # accept any URL at all.
        min_hits = max(1, len(name_parts) // 2) if name_parts else 0

        hits = 0
        if temp_url:
            lowered_url = temp_url.lower()
            hits = sum(1 for part in name_parts if part in lowered_url)

        if temp_url and hits >= min_hits:
            c["website_url"] = temp_url
            print(f"Found website via DDG: {c['website_url']}")
        else:
            c["website_url"] = None
            print(f"No suitable website found via DDG for {c['company_name']}")

        # Pause between searches to stay under the search rate limit.
        time.sleep(2)
    return companies
# with open("../data/companies_with_urls.json", "r") as f:
# companies = json.load(f)
def check_percent_with_urls(companies):
    """Return the percentage (0-100) of companies with a truthy ``website_url``.

    Args:
        companies: Sequence of company dicts.

    Returns:
        The percentage as a float. An empty sequence yields ``100.0``: no
        company is missing a URL, so there is nothing left to look up (this
        also avoids a ``ZeroDivisionError``).
    """
    if not companies:
        return 100.0
    with_urls = sum(1 for c in companies if c.get("website_url"))
    return with_urls / len(companies) * 100
def wiki_search_mode(companies, main_data_folder):
    """Fill remaining ``website_url`` gaps from Wikipedia, then save the list.

    For every company without a truthy ``website_url``, the English Wikipedia
    article named after the company (text from the first "(" onwards is
    dropped) is fetched and its tables are scanned with
    ``extract_website_from_tables``. A hit is reduced with ``clean_url`` and
    stored on the company dict. Lookup failures are printed and skipped.

    The list is always written to
    ``<main_data_folder>/all_cleaned_companies.json`` as
    ``{"companies": [...]}``, whether or not any lookup ran.

    Args:
        companies: List of company dicts; updated in place.
        main_data_folder: Output directory; created if it does not exist.

    Returns:
        ``{"companies": companies}``.
    """
    # Local import so this function brings its own dependency into scope.
    from urllib.parse import quote

    percent_with_urls = check_percent_with_urls(companies)
    if percent_with_urls < 100:
        print("Less than 100% of companies have website URLs. Going to wikisearch mode...")
        # Browser-like User-Agent, built once and sent with every request.
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/123.0 Safari/537.36"
            )
        }
        for c in companies:
            if c.get("website_url"):
                continue
            print(f"Wikisearching for {c['company_name']}...")

            if "(" in c["company_name"]:
                mod_comp_name = c["company_name"].split("(")[0].strip()
            else:
                mod_comp_name = c["company_name"]
            # Percent-encode the title so characters such as "?", "#" or "%"
            # stay part of the article path instead of being read as a
            # query string or fragment.
            wiki_url = "https://en.wikipedia.org/wiki/" + quote(
                mod_comp_name.replace(" ", "_")
            )

            try:
                res = requests.get(wiki_url, headers=headers, timeout=10)
                res.raise_for_status()
                soup = BeautifulSoup(res.text, 'html.parser')
                website_url = extract_website_from_tables(soup, mod_comp_name)
                if website_url:
                    c["website_url"] = clean_url(website_url)
                    print(f"Found website via Wikipedia: {c['website_url']}", flush=True)
                else:
                    print(f"No website found on Wikipedia for {c['company_name']}", flush=True)
            except Exception as e:
                # Deliberately best-effort: one failed lookup (missing
                # article, timeout, ...) must not abort the whole run.
                print(f"Error accessing Wikipedia for {c['company_name']}: {str(e)}")
                continue
            # Pause between successful requests.
            time.sleep(5)
    else:
        print("All companies already have website URLs. Skipping wikisearch mode...")

    print("Saving results...")
    os.makedirs(main_data_folder, exist_ok=True)
    file_path = os.path.join(main_data_folder, "all_cleaned_companies.json")
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump({"companies": companies}, f, indent=2)
    print("Enriched company list saved to all_cleaned_companies.json", flush=True)
    return {"companies": companies}
# Example usage:
# with open("../data/uncleaned_companies.json", "r") as f:
#     companies = json.load(f).get("companies", [])
# intermediate_data = find_all_company_websites(companies)
# final_data = wiki_search_mode(intermediate_data, main_data_folder)  # e.g. "/tmp/data"