import asyncio
import os
import time
import requests
import logging
from fake_useragent import UserAgent
try:
from ddgs import DDGS
except ImportError:
from duckduckgo_search import DDGS
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium_stealth import stealth
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
ua = UserAgent()
# Progress tracking
progress_callback = None
def set_progress_callback(callback):
"""Set a callback function to report progress"""
global progress_callback
progress_callback = callback
def report_progress(message, percentage):
"""Report progress if callback is set"""
if progress_callback:
progress_callback(message, percentage)
print(f"[{percentage}%] {message}")
def setup_selenium_driver():
"""Setup a stealth Selenium driver with HuggingFace/Docker compatibility"""
options = Options()
options.add_argument("--headless=new") # New headless mode
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--disable-extensions")
options.add_argument("--disable-infobars")
options.add_argument("--window-size=1920,1080")
options.add_argument(f"user-agent={ua.random}")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
# Check if running in Docker/HuggingFace environment
is_docker = os.path.exists("/.dockerenv") or os.environ.get("HF_SPACE_ID")
driver = None
if is_docker:
logger.info("Running in Docker/HuggingFace environment, using system Chromium")
# Use system Chromium in Docker
chromium_paths = ["/usr/bin/chromium", "/usr/bin/chromium-browser", "/usr/bin/google-chrome"]
chromedriver_paths = ["/usr/bin/chromedriver", "/usr/local/bin/chromedriver"]
for chromium_path in chromium_paths:
if os.path.exists(chromium_path):
options.binary_location = chromium_path
logger.info(f"Using Chromium at: {chromium_path}")
break
try:
# Try with system chromedriver first
for chromedriver_path in chromedriver_paths:
if os.path.exists(chromedriver_path):
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=options)
logger.info(f"Using chromedriver at: {chromedriver_path}")
break
if driver is None:
# Fallback to webdriver_manager
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
except Exception as e:
logger.error(f"Docker Chrome setup failed: {e}")
# Final fallback - try default Chrome
try:
driver = webdriver.Chrome(options=options)
except Exception as e2:
logger.error(f"All Chrome drivers failed: {e2}")
raise
else:
# Local development - use webdriver_manager
try:
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
except Exception as e:
logger.error(f"Failed to initialize Chrome driver with manager: {e}")
driver = webdriver.Chrome(options=options)
# Apply stealth settings
stealth(driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
return driver
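# Quick sanity check (illustrative, not part of the pipeline): selenium-stealth
# should mask the automation flag, so navigator.webdriver reads back as undefined.
#
#     driver = setup_selenium_driver()
#     print(driver.execute_script("return navigator.webdriver"))  # expect None
#     driver.quit()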
async def scrape_url_selenium(url):
"""Scrape a URL using Selenium Stealth for better evasion"""
logger.info(f"Scraping with Selenium: {url}")
try:
def _selenium_task():
driver = setup_selenium_driver()
try:
driver.get(url)
                # Wait for content to render (fixed sleep; see the WebDriverWait
                # sketch after this function for a more robust alternative)
                time.sleep(3)
content = driver.page_source
return content
finally:
driver.quit()
content = await asyncio.to_thread(_selenium_task)
# Parse with BS4 to get clean text
soup = BeautifulSoup(content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text(separator=' ', strip=True)
return text, content
except Exception as e:
logger.error(f"Selenium scraping failed for {url}: {e}")
return "", ""
async def search_web(query, max_results=5):
"""
Search the web using DuckDuckGo (no API key required)
"""
try:
results = []
        # DDGS().text() is a synchronous generator, so wrap it and run it in a
        # worker thread to keep the event loop responsive.
        def run_search():
            with DDGS() as ddgs:
                return list(ddgs.text(query, max_results=max_results))
        search_results = await asyncio.to_thread(run_search)
for res in search_results:
results.append({
"title": res.get('title', ''),
"url": res.get('href', ''),
"content": res.get('body', ''),
"query_type": "web_search"
})
return results
    except Exception as e:
        logger.error(f"Search error for '{query}': {e}")
        return []
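# Shape of each result returned above (DDG's title/href/body keys are
# normalized to title/url/content):
#
#     {"title": "...", "url": "https://...", "content": "...", "query_type": "web_search"}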
async def get_news_from_api(company_name):
"""
Use NewsAPI for reliable news collection
"""
api_key = os.getenv('NEWS_API_KEY')
if not api_key:
return []
try:
        url = "https://newsapi.org/v2/everything"
params = {
'q': f'{company_name} AND (sustainability OR greenwashing OR ESG OR environmental)',
'language': 'en',
'sortBy': 'relevancy',
'pageSize': 15,
'apiKey': api_key
}
        # requests is blocking, so run it in a worker thread to keep the event loop free
        response = await asyncio.to_thread(requests.get, url, params=params, timeout=10)
data = response.json()
if data.get('status') == 'ok':
articles = []
for article in data.get('articles', []):
# Filter out removed content
if article.get('title') == '[Removed]': continue
                # KEYWORD FILTERS (same idea as the web-search path)
                title_lower = (article.get('title') or "").lower()
                # 1. NEGATIVE FILTER: exclude crime/fraud stories
                bad_keywords = ["fraud", "arrest", "scam", "police", "laundering", "jail", "cbi", "ed", "bribe", "punish", "litigation"]
                if any(bad in title_lower for bad in bad_keywords):
                    continue
                # 2. POSITIVE FILTER: the NewsAPI query already requires ESG terms
                #    ("AND (sustainability OR ...)"), so no extra check is applied here
articles.append({
'url': article.get('url', ''),
'title': article.get('title', ''),
'content': (article.get('description') or '') + ' ' + (article.get('content') or ''),
'query_type': 'news_api'
})
            return articles
    except Exception as e:
        logger.error(f"NewsAPI error: {e}")
    # Reached on errors or a non-'ok' API status (e.g. rate limited)
    return []
# Helper for Filtering
def is_valid_result(res):
"""Filter out navigational, login, and irrelevant links"""
url = res.get('url', '').lower()
title = res.get('title', '').lower()
content = res.get('content', '').lower()
# 1. Exclude generic Google/Navigational links
invalid_domains = ['google.com/search', 'google.com/url', 'accounts.google.com', 'support.google.com',
'youtube.com', 'facebook.com', 'twitter.com/login', 'linkedin.com/login']
# 2. Exclude actions
invalid_terms = ['sign in', 'log in', 'forgot password', 'download', 'captcha', 'security check', 'robot', 'access denied']
if any(d in url for d in invalid_domains): return False
if any(t in title for t in invalid_terms): return False
# 3. Minimum content length/quality (for reviews)
# if len(content) < 20: return False # Optional rule
return True
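# Illustrative inputs (not real search results):
#
#     is_valid_result({"url": "https://accounts.google.com/x", "title": "Sign in", "content": ""})
#     # -> False (login domain, navigational title)
#     is_valid_result({"url": "https://example.com/esg", "title": "ESG report 2024", "content": "..."})
#     # -> True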
async def get_company_news(company_name):
"""Get news using NewsAPI and DuckDuckGo Fallback"""
report_progress(f"Starting news collection for {company_name}", 10)
articles = []
    # 1. Try NewsAPI first (overall collection is capped at 20 articles)
report_progress("Checking NewsAPI...", 15)
api_articles = await get_news_from_api(company_name)
articles.extend(api_articles)
# 2. Add Web Search (DuckDuckGo) for deeper coverage
report_progress("Fetching additional news via Web Search...", 25)
queries = [
f'"{company_name}" environmental impact report news',
f'"{company_name}" greenwashing controversy scandal',
f'"{company_name}" sustainability goals criticism',
f'"{company_name}" ESG rating news detected',
f'"{company_name}" climate change commitments review'
]
# ESG/Climate Keywords (Refined to avoid generic matches)
ESG_KEYWORDS = [
"climate", "carbon", "emission", "pollution", "sustainability", "esg",
"renewable", "net zero", "biodiversity", "ecological", "greenhouse", "fossil fuel"
]
# "green" and "environment" removed as they match "green light", "business environment"
# Negative Keywords to exclude financial crime/generic news
NEGATIVE_KEYWORDS = ["fraud", "arrest", "scam", "police", "laundering", "jail", "cbi", "ed", "bribe"]
for query in queries:
if len(articles) >= 20: break
results = await search_web(query, max_results=5)
for res in results:
if not is_valid_result(res): continue
            # Combine title + snippet for keyword checks. search_web stores the
            # DDG "body" field under "content", so read that key here.
            text_to_check = (res.get('title', '') + " " + res.get('content', '')).lower()
            title_lower = res.get('title', '').lower()
            # 1. NEGATIVE FILTER: exclude crime/fraud immediately
            if any(bad in title_lower for bad in NEGATIVE_KEYWORDS):
                continue
            # 2. POSITIVE FILTER: must have ESG context. "environmental" is
            #    checked explicitly since the broader "environment" was dropped
            #    from ESG_KEYWORDS above.
            if "environmental" not in text_to_check and not any(k in text_to_check for k in ESG_KEYWORDS):
                continue  # Skip if no environmental context found
# Simple de-duplication
if not any(a['url'] == res['url'] for a in articles):
articles.append(res)
report_progress(f"News collection complete: {len(articles)} articles", 45)
return articles[:20]
async def get_company_reviews(company_name):
"""Get reviews using Web Search (Glassdoor, Reddit, etc.)"""
report_progress(f"Starting review collection for {company_name}", 50)
reviews = []
# Using site: operators to force specific sources
queries = [
f'site:glassdoor.com "{company_name}" reviews "environment" OR "sustainability"',
f'site:reddit.com "{company_name}" greenwashing OR "toxic"',
f'site:trustpilot.com "{company_name}" environment',
f'"{company_name}" employee reviews sustainability ethics',
f'"{company_name}" environmental controversy reviews', # Broad fallback
f'"{company_name}" corporate responsibility feedback' # Broad fallback
]
total_queries = len(queries)
for idx, query in enumerate(queries):
progress = 50 + (idx / total_queries) * 30
report_progress(f"Searching specific reviews: {query}", int(progress))
results = await search_web(query, max_results=8)
for res in results:
if len(reviews) >= 40: break
if not is_valid_result(res): continue # FILTER HERE
            # RELEVANCE CHECK (strict): the company name must actually appear
            # in the title or snippet.
            c_name_lower = company_name.lower()
            first_word = c_name_lower.split()[0]
            res_content = (res.get('title', '') + " " + res.get('content', '')).lower()
            if c_name_lower not in res_content:
                # Fall back to the first word only when it is distinctive
                # (longer than 3 chars), to avoid generic words like "The".
                if len(first_word) <= 3 or first_word not in res_content:
                    logger.info(f"Skipping unrelated result: {res['title']}")
                    continue
# Determine source type based on URL
source = "web"
if "glassdoor" in res['url']: source = "Glassdoor"
elif "twitter" in res['url'] or "x.com" in res['url']: source = "Twitter"
elif "linkedin" in res['url']: source = "LinkedIn"
elif "reddit" in res['url']: source = "Reddit"
elif "trustpilot" in res['url']: source = "Trustpilot"
# Clean title
title = res['title'].replace(" | Glassdoor", "").replace(" | Reddit", "")
reviews.append({
"url": res['url'],
"title": title,
"content": res['content'], # Use the snippet as the review content
"source_type": source
})
await asyncio.sleep(1)
# If few reviews found, try a broader fallback
if len(reviews) < 3:
report_progress("Few reviews found, trying specific broader query...", 75)
fallback_results = await search_web(f'"{company_name}" reviews environment', max_results=5)
for res in fallback_results:
if is_valid_result(res) and not any(r['url'] == res['url'] for r in reviews):
                # Same relevance check as above: full name, or a distinctive
                # (longer than 3 chars) first word.
                c_name_lower = company_name.lower()
                first_word = c_name_lower.split()[0]
                res_content = (res.get('title', '') + " " + res.get('content', '')).lower()
                if c_name_lower not in res_content:
                    if len(first_word) <= 3 or first_word not in res_content:
                        continue
reviews.append({
"url": res['url'],
"title": res['title'],
"content": res['content'],
"source_type": "Web Search"
})
report_progress(f"Review collection complete: {len(reviews)} reviews", 80)
return reviews
# NO MOCK DATA FALLBACK
return reviews
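# Minimal manual smoke test (a sketch; "Patagonia" is only an illustrative
# company name). Run this module directly to exercise the collection pipeline;
# it hits live search endpoints, so results will vary.
if __name__ == "__main__":
    async def _demo():
        news = await get_company_news("Patagonia")
        reviews = await get_company_reviews("Patagonia")
        print(f"Collected {len(news)} articles and {len(reviews)} reviews")
    asyncio.run(_demo())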