|
|
""" |
|
|
AI-Powered Contact Extraction Service |
|
|
Uses LLM to intelligently extract and validate contact information |
|
|
""" |
|
|
import asyncio
import json
import logging
import os
import re
from typing import Any, Dict, List, Optional

import requests
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class AIContactExtractor:
    """Uses AI to extract and validate contact information.

    Wraps the HuggingFace inference API (Llama-3.2-3B-Instruct) to pull
    structured contact data out of webpage text, with pure-heuristic
    fallbacks when the API token is missing, the call fails, or the
    model's output cannot be parsed as JSON.
    """

    def __init__(self):
        # Token is optional: without it _call_llm returns "" and the
        # heuristic fallbacks take over.
        self.hf_token = os.getenv('HF_TOKEN')
        self.api_url = "https://api-inference.huggingface.co/models/meta-llama/Llama-3.2-3B-Instruct"

    async def extract_decision_makers(self, company_info: Dict, page_content: str, titles_to_find: List[str]) -> List[Dict[str, str]]:
        """
        Use AI to extract decision maker information from page content.

        Args:
            company_info: Company information (reads the 'name' key if present)
            page_content: Text content from webpage
            titles_to_find: Job titles to look for

        Returns:
            List of decision makers with name, title, confidence.
            Empty list on any failure.
        """
        try:
            # Truncate so the prompt stays within the model's context window.
            content_preview = page_content[:3000]

            prompt = f"""Extract contact information for decision makers at {company_info.get('name', 'the company')}.

From this webpage content, find people with these titles: {', '.join(titles_to_find)}

Webpage content:
{content_preview}

Extract:
1. Full name
2. Job title
3. Any contact information (email, LinkedIn)

Return as JSON array:
[{{"name": "John Doe", "title": "CEO", "email": "john@company.com", "linkedin": "linkedin.com/in/johndoe", "confidence": 0.9}}]

If no clear matches found, return empty array: []
"""

            response = await self._call_llm(prompt)
            decision_makers = self._parse_llm_response(response)

            logger.info("AI extracted %d decision makers for %s",
                        len(decision_makers), company_info.get('name'))
            return decision_makers

        except Exception as e:
            # Best-effort by design: extraction failures must not crash callers.
            logger.error("Error in AI contact extraction: %s", e)
            return []

    async def validate_company_match(self, search_result_title: str, search_result_snippet: str) -> Dict[str, Any]:
        """
        Use AI to determine if a search result is actually a company website.

        Args:
            search_result_title: Search result title
            search_result_snippet: Search result description

        Returns:
            Dictionary with is_company, company_name, confidence (and a
            'reason' key). Falls back to keyword heuristics when the LLM
            is unavailable or unparseable.
        """
        try:
            prompt = f"""Analyze this search result and determine if it's a real company website (not an article, blog post, or directory listing).

Title: {search_result_title}
Description: {search_result_snippet}

Questions:
1. Is this a company's official website? (yes/no)
2. What is the company name?
3. Confidence level (0.0 to 1.0)

Return as JSON:
{{"is_company": true/false, "company_name": "Company Name", "confidence": 0.0-1.0, "reason": "brief explanation"}}
"""

            response = await self._call_llm(prompt)
            result = self._parse_json_from_text(response)

            if result:
                return result
            # LLM gave nothing usable — fall back to keyword heuristics.
            return self._fallback_company_validation(search_result_title, search_result_snippet)

        except Exception as e:
            logger.error("Error in AI company validation: %s", e)
            return self._fallback_company_validation(search_result_title, search_result_snippet)

    def _fallback_company_validation(self, title: str, snippet: str) -> Dict[str, Any]:
        """Fallback validation without AI.

        Flags the result as a non-company if any indicator keyword appears
        in the title or snippet (substring match, so short indicators like
        'vs' can false-positive on longer words).
        """
        non_company_indicators = [
            'blog', 'article', 'guide', 'how to', 'best', 'top 10',
            'list of', 'review', 'comparison', 'vs', 'alternatives',
            'wikipedia', 'linkedin', 'facebook', 'twitter'
        ]

        title_lower = title.lower()
        snippet_lower = snippet.lower()

        is_company = not any(indicator in title_lower or indicator in snippet_lower
                             for indicator in non_company_indicators)

        # Titles are often "Company Name | tagline" or "Company Name - tagline";
        # keep the part before the first separator.
        company_name = title.split('|')[0].split('-')[0].strip()

        return {
            'is_company': is_company,
            'company_name': company_name,
            'confidence': 0.6 if is_company else 0.3,
            'reason': 'Heuristic validation (AI unavailable)'
        }

    async def infer_contact_details(self, company_domain: str, person_name: str, title: str, known_emails: List[str]) -> Dict[str, str]:
        """
        Use AI and patterns to infer likely contact details.

        Args:
            company_domain: Company domain
            person_name: Person's name
            title: Job title (currently unused by the pattern heuristic,
                kept for interface stability)
            known_emails: List of known email addresses from the company

        Returns:
            Dictionary with inferred email, pattern, confidence, source.
        """
        try:
            email_pattern = self._detect_email_pattern(known_emails)
            inferred_email = self._generate_email(person_name, company_domain, email_pattern)

            return {
                'email': inferred_email,
                'pattern': email_pattern,
                # A detected pattern is a stronger signal than the default guess.
                'confidence': 0.7 if email_pattern != 'unknown' else 0.4,
                'source': 'pattern_based'
            }

        except Exception as e:
            logger.error("Error inferring contact details: %s", e)
            return {
                'email': f"contact@{company_domain}",
                'pattern': 'generic',
                'confidence': 0.3,
                'source': 'fallback'
            }

    def _detect_email_pattern(self, emails: List[str]) -> str:
        """Detect the most common email local-part pattern from a list.

        Returns one of 'first.last', 'first_last', 'firstlast', or
        'unknown' when no emails are given.
        TODO(review): the 'flast' pattern supported by _generate_email is
        never detected here — would need the owners' names to distinguish
        it from 'firstlast'.
        """
        if not emails:
            return 'unknown'

        patterns: Dict[str, int] = {}

        for email in emails:
            local_part = email.split('@')[0]

            # Classify by separator; no separator is treated as 'firstlast'.
            if '.' in local_part:
                pattern = 'first.last'
            elif '_' in local_part:
                pattern = 'first_last'
            else:
                pattern = 'firstlast'

            patterns[pattern] = patterns.get(pattern, 0) + 1

        if patterns:
            # Most frequent pattern wins.
            return max(patterns, key=patterns.get)

        return 'first.last'

    def _generate_email(self, name: str, domain: str, pattern: str) -> str:
        """Generate an email address from a name using the given pattern.

        Single-word names cannot be split into first/last, so they fall
        back to the generic contact@ address. Unknown patterns default
        to 'first.last'.
        """
        parts = name.lower().split()

        if len(parts) < 2:
            return f"contact@{domain}"

        first = parts[0]
        last = parts[-1]  # skip middle names

        if pattern == 'first.last':
            return f"{first}.{last}@{domain}"
        elif pattern == 'first_last':
            return f"{first}_{last}@{domain}"
        elif pattern == 'firstlast':
            return f"{first}{last}@{domain}"
        elif pattern == 'flast':
            return f"{first[0]}{last}@{domain}"
        else:
            return f"{first}.{last}@{domain}"

    async def _call_llm(self, prompt: str, max_tokens: int = 500) -> str:
        """Call the HuggingFace LLM API.

        Returns the generated text, or "" when the token is missing, the
        request fails, or the response has an unexpected shape.
        """
        if not self.hf_token:
            logger.warning("HF_TOKEN not set, AI features limited")
            return ""

        try:
            headers = {"Authorization": f"Bearer {self.hf_token}"}

            payload = {
                "inputs": prompt,
                "parameters": {
                    "max_new_tokens": max_tokens,
                    # Low temperature: favor deterministic extraction output.
                    "temperature": 0.3,
                    "return_full_text": False
                }
            }

            # requests is blocking; run it in the default executor so the
            # event loop is not stalled for up to the 30s timeout.
            # get_running_loop() replaces the deprecated get_event_loop()
            # (we are always inside a coroutine here).
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(self.api_url, headers=headers, json=payload, timeout=30)
            )

            if response.status_code == 200:
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    return result[0].get('generated_text', '')

            logger.warning("LLM API returned status %s", response.status_code)
            return ""

        except Exception as e:
            logger.error("Error calling LLM API: %s", e)
            return ""

    def _parse_llm_response(self, text: str) -> List[Dict[str, str]]:
        """Parse an LLM response into a list of records.

        A bare JSON object is wrapped in a single-element list; anything
        unparseable yields [].
        """
        try:
            result = self._parse_json_from_text(text)

            if isinstance(result, list):
                return result
            elif isinstance(result, dict):
                return [result]

            return []

        except Exception as e:
            logger.error("Error parsing LLM response: %s", e)
            return []

    def _parse_json_from_text(self, text: str) -> Optional[Any]:
        """Extract the first JSON array or object embedded in text.

        Returns the parsed value, or None if nothing parseable is found.
        """
        # Fast path: the whole text is valid JSON.
        try:
            return json.loads(text)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError guards
            # against non-string input. Never a bare except.
            pass

        # Otherwise scan for an embedded JSON array first (the decision-maker
        # prompts ask for arrays), then for a lone object.
        for json_pattern in (r'\[.*\]', r'\{.*\}'):
            match = re.search(json_pattern, text, re.DOTALL)
            if match:
                try:
                    return json.loads(match.group())
                except (ValueError, TypeError):
                    continue

        return None
|
|
|