# cx_ai_agent_v1/services/web_scraper.py
"""
Enterprise-grade Web Scraping Service
Extracts company information, contact pages, and decision-maker details
"""
import asyncio
import re
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)


class WebScraperService:
    """Production-ready web scraper for company and contact information"""

    def __init__(self, timeout: int = 10, max_retries: int = 2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.session = requests.Session()
        # Use a browser-like User-Agent to reduce trivial bot blocking
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
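
    # Note: requests is synchronous, so the async methods below hand each HTTP
    # call to the default thread-pool executor via run_in_executor; this keeps
    # the event loop responsive while a request is in flight.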
    async def extract_company_info(self, url: str) -> Dict[str, Any]:
        """
        Extract company information from a website.

        Args:
            url: Company website URL

        Returns:
            Dictionary with keys: name, website, domain, description, contact_page
        """
        try:
            logger.info(f"Extracting company info from: {url}")
            # Fetch the page in a worker thread so the event loop is not blocked
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )
            if response.status_code != 200:
                logger.warning(f"Failed to fetch {url}: Status {response.status_code}")
                return {}

            soup = BeautifulSoup(response.text, 'html.parser')

            company_name = self._extract_company_name(soup, url)
            description = self._extract_description(soup)
            contact_url = self._find_contact_page(soup, url)
            domain = urlparse(url).netloc.removeprefix('www.')

            return {
                'name': company_name,
                'website': url,
                'domain': domain,
                'description': description,
                'contact_page': contact_url
            }
        except Exception as e:
            logger.error(f"Error extracting company info from {url}: {str(e)}")
            return {}
    def _extract_company_name(self, soup: BeautifulSoup, url: str) -> str:
        """Extract the company name from the page"""
        # Prefer the og:site_name meta tag
        og_site_name = soup.find('meta', property='og:site_name')
        if og_site_name and og_site_name.get('content'):
            return og_site_name['content']

        # Fall back to the <title> tag, trimming suffixes like " - Home"
        title = soup.find('title')
        if title:
            clean_title = re.sub(r'\s*[-|]\s*(Home|Homepage|Welcome).*$', '', title.text, flags=re.IGNORECASE)
            return clean_title.strip()

        # Last resort: derive a name from the domain
        domain = urlparse(url).netloc.removeprefix('www.')
        return domain.split('.')[0].title()
    def _extract_description(self, soup: BeautifulSoup) -> str:
        """Extract a short company description"""
        # Prefer the standard meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content']

        # Fall back to og:description
        og_desc = soup.find('meta', property='og:description')
        if og_desc and og_desc.get('content'):
            return og_desc['content']

        # Last resort: first paragraph, truncated to 200 characters
        first_p = soup.find('p')
        if first_p:
            return first_p.text.strip()[:200]
        return ""
    def _find_contact_page(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
        """Find the contact page URL, if any"""
        # Patterns that commonly appear in contact-page links
        contact_patterns = [
            r'contact',
            r'about.*us',
            r'team',
            r'leadership',
            r'get.*in.*touch',
            r'reach.*us'
        ]

        # Search every link's href and anchor text for a pattern match
        for link in soup.find_all('a', href=True):
            href = link['href'].lower()
            link_text = link.text.lower()
            for pattern in contact_patterns:
                if re.search(pattern, href) or re.search(pattern, link_text):
                    # Convert a relative href to an absolute URL
                    return urljoin(base_url, link['href'])

        # Fall back to probing common paths with lightweight HEAD requests
        parsed = urlparse(base_url)
        domain = f"{parsed.scheme}://{parsed.netloc}"
        common_paths = ['/contact', '/contact-us', '/about', '/about-us', '/team']
        for path in common_paths:
            test_url = domain + path
            try:
                response = self.session.head(test_url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    return test_url
            except requests.RequestException:
                continue
        return None
    async def scrape_page(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Generic page scraper that returns full page content.

        Args:
            url: Page URL to scrape

        Returns:
            Dictionary with the page's html, cleaned text, and parsed soup
        """
        try:
            logger.info(f"Scraping page: {url}")
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )
            if response.status_code != 200:
                logger.warning(f"Failed to scrape {url}: Status {response.status_code}")
                return None

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove script and style elements before extracting text
            for script in soup(["script", "style"]):
                script.decompose()

            # Clean up the text: strip each line, break lines on double spaces,
            # and drop empty chunks to collapse runs of whitespace
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)

            return {
                'url': url,
                'html': response.text,
                'text': text,
                'soup': soup
            }
        except Exception as e:
            logger.error(f"Error scraping page {url}: {str(e)}")
            return None
    async def scrape_contact_page(self, url: str) -> Dict[str, List[str]]:
        """
        Scrape contact information from a page.

        Args:
            url: Contact page URL

        Returns:
            Dictionary with deduplicated emails, phones, and names found
        """
        try:
            logger.info(f"Scraping contact page: {url}")
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )
            if response.status_code != 200:
                return {'emails': [], 'phones': [], 'names': []}

            text = response.text
            soup = BeautifulSoup(text, 'html.parser')

            emails = self._extract_emails(text)
            phones = self._extract_phones(text)
            names = self._extract_names(soup)

            return {
                'emails': list(set(emails)),
                'phones': list(set(phones)),
                'names': list(set(names))
            }
        except Exception as e:
            logger.error(f"Error scraping contact page {url}: {str(e)}")
            return {'emails': [], 'phones': [], 'names': []}
    def _extract_emails(self, text: str) -> List[str]:
        """Extract email addresses from text"""
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = re.findall(email_pattern, text)

        # Filter out placeholder domains and tracking-asset false positives
        filtered = []
        ignore_patterns = ['example.com', 'domain.com', 'email.com', 'yourcompany.com', 'image', 'pixel']
        for email in emails:
            if not any(pattern in email.lower() for pattern in ignore_patterns):
                filtered.append(email.lower())
        return filtered
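
    # Illustrative behavior (not from the original source): for the text
    # "Contact sales@acme.com or admin@example.com", _extract_emails returns
    # ["sales@acme.com"]; the example.com address is dropped by the junk filter.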
    def _extract_phones(self, text: str) -> List[str]:
        """Extract phone numbers from text"""
        phone_patterns = [
            r'\+?1?\s*\(?([0-9]{3})\)?[\s.-]?([0-9]{3})[\s.-]?([0-9]{4})',  # US format
            r'\+?([0-9]{1,3})?[\s.-]?\(?([0-9]{2,4})\)?[\s.-]?([0-9]{3,4})[\s.-]?([0-9]{4})'  # International
        ]
        phones = []
        for pattern in phone_patterns:
            matches = re.findall(pattern, text)
            for match in matches:
                # re.findall returns tuples when a pattern has multiple groups;
                # joining the groups keeps the digits only
                if isinstance(match, tuple):
                    phone = ''.join(match)
                else:
                    phone = match
                if len(phone) >= 10:  # Plausible phone number length
                    phones.append(phone)
        return phones[:5]  # Limit to 5
    def _extract_names(self, soup: BeautifulSoup) -> List[str]:
        """Extract person names from the page"""
        names = []

        # 1. "Meet the team" style sections
        team_sections = soup.find_all(['section', 'div'], class_=re.compile(r'team|staff|leadership|people', re.I))
        for section in team_sections:
            # Headings inside team sections are often names
            headings = section.find_all(['h2', 'h3', 'h4', 'p'])
            for heading in headings:
                text = heading.text.strip()
                # Simple heuristic: 2-4 words, each capitalized
                words = text.split()
                if 2 <= len(words) <= 4 and all(w[0].isupper() for w in words if w):
                    names.append(text)

        # 2. "Title: Name" and "Name, Title" patterns in the page text
        titles = {'CEO', 'CTO', 'CFO', 'COO', 'President', 'VP', 'Director', 'Manager', 'Head of'}
        title_patterns = [
            r'(CEO|CTO|CFO|COO|President|VP|Director|Manager|Head of)\s*[:-]\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
            r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*,\s*(CEO|CTO|CFO|COO|President|VP|Director)'
        ]
        page_text = soup.get_text()
        for pattern in title_patterns:
            matches = re.findall(pattern, page_text)
            for match in matches:
                if isinstance(match, tuple):
                    # Whichever captured group is not the job title is the name
                    name = match[1] if match[0] in titles else match[0]
                    names.append(name)
        return names[:10]  # Limit to 10
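
    # Caveat: both heuristics are approximate; capitalized non-name phrases
    # (e.g. product names inside a team section) can slip through, so callers
    # should treat the results as candidates, not confirmed contacts.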
    async def find_linkedin_profiles(self, company_name: str, title: str = "CEO") -> List[Dict[str, str]]:
        """
        Find LinkedIn profiles via Google search.

        Args:
            company_name: Company name
            title: Job title to search for

        Returns:
            List of potential profiles
        """
        # We'll use the web search service for this.
        # Return empty for now; will integrate with WebSearchService.
        return []
    def generate_email_patterns(self, name: str, domain: str) -> List[str]:
        """
        Generate possible email addresses for a person.

        Args:
            name: Person's full name
            domain: Company domain

        Returns:
            List of possible email addresses
        """
        if not name or not domain:
            return []

        # Use the first and last word of the name; middle names are ignored
        parts = name.lower().split()
        if len(parts) < 2:
            return []
        first = parts[0]
        last = parts[-1]

        # Common corporate address patterns
        patterns = [
            f"{first}.{last}@{domain}",
            f"{first}{last}@{domain}",
            f"{first[0]}{last}@{domain}",
            f"{first}_{last}@{domain}",
            f"{last}.{first}@{domain}",
            f"{first}@{domain}",
            f"{last}@{domain}"
        ]
        return patterns
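
    # Illustrative output (hypothetical name and domain): calling
    # generate_email_patterns("Jane Doe", "acme.com") yields candidates such as
    # "jane.doe@acme.com", "janedoe@acme.com", and "jdoe@acme.com"; none of
    # them are verified for deliverability.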
    def validate_email_format(self, email: str) -> bool:
        """Validate email format"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
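

# Minimal usage sketch, not part of the original module: exercises the public
# async API against a placeholder URL. "https://example.com" stands in for a
# real company site and will not yield meaningful results.
if __name__ == "__main__":
    async def _demo() -> None:
        scraper = WebScraperService()
        info = await scraper.extract_company_info("https://example.com")
        print("Company info:", info)
        # Only probe the contact page if one was discovered
        if info.get("contact_page"):
            contacts = await scraper.scrape_contact_page(info["contact_page"])
            print("Contacts:", contacts)

    asyncio.run(_demo())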