Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import requests | |
| import logging | |
| from fake_useragent import UserAgent | |
| try: | |
| from ddgs import DDGS | |
| except ImportError: | |
| from duckduckgo_search import DDGS | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium_stealth import stealth | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| from bs4 import BeautifulSoup | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Random User-Agent source; ua.random is used when configuring the Selenium driver
ua = UserAgent()

# Progress tracking: optional module-level callback, registered via set_progress_callback()
progress_callback = None
def set_progress_callback(callback):
    """Register a callable(message, percentage) that receives progress updates."""
    global progress_callback
    progress_callback = callback
def report_progress(message, percentage):
    """Print a progress line and forward it to the registered callback, if any."""
    cb = progress_callback
    if cb:
        cb(message, percentage)
    print(f"[{percentage}%] {message}")
def setup_selenium_driver():
    """Create a headless, stealth-configured Chrome/Chromium WebDriver.

    Detects Docker/HuggingFace containers (via /.dockerenv or the HF_SPACE_ID
    env var) and prefers the system Chromium binary plus a system chromedriver
    there, falling back to webdriver_manager and finally to Selenium's own
    driver discovery. Outside containers, webdriver_manager is tried first.

    Returns:
        A ready-to-use ``webdriver.Chrome`` instance with selenium_stealth
        fingerprint masking applied.

    Raises:
        Re-raises the last driver-initialisation error if every strategy
        fails inside a container.
    """
    opts = Options()
    for flag in (
        "--headless=new",  # new headless mode
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--disable-extensions",
        "--disable-infobars",
        "--window-size=1920,1080",
        f"user-agent={ua.random}",
    ):
        opts.add_argument(flag)
    opts.add_experimental_option("excludeSwitches", ["enable-automation"])
    opts.add_experimental_option('useAutomationExtension', False)

    # Container detection: /.dockerenv exists in Docker; HF_SPACE_ID is set on Spaces.
    in_container = os.path.exists("/.dockerenv") or os.environ.get("HF_SPACE_ID")
    driver = None

    if in_container:
        logger.info("Running in Docker/HuggingFace environment, using system Chromium")
        # Point Chrome at the first system Chromium binary that exists.
        for binary in ("/usr/bin/chromium", "/usr/bin/chromium-browser", "/usr/bin/google-chrome"):
            if os.path.exists(binary):
                opts.binary_location = binary
                logger.info(f"Using Chromium at: {binary}")
                break
        try:
            # Prefer a system-installed chromedriver.
            for drv_path in ("/usr/bin/chromedriver", "/usr/local/bin/chromedriver"):
                if os.path.exists(drv_path):
                    driver = webdriver.Chrome(service=Service(drv_path), options=opts)
                    logger.info(f"Using chromedriver at: {drv_path}")
                    break
            if driver is None:
                # No system chromedriver found -> download one via webdriver_manager.
                driver = webdriver.Chrome(
                    service=Service(ChromeDriverManager().install()), options=opts
                )
        except Exception as e:
            logger.error(f"Docker Chrome setup failed: {e}")
            # Last resort: let Selenium locate a driver on its own.
            try:
                driver = webdriver.Chrome(options=opts)
            except Exception as e2:
                logger.error(f"All Chrome drivers failed: {e2}")
                raise
    else:
        # Local development: webdriver_manager handles the driver binary.
        try:
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=opts)
        except Exception as e:
            logger.error(f"Failed to initialize Chrome driver with manager: {e}")
            driver = webdriver.Chrome(options=opts)

    # Mask common automation fingerprints (languages, vendor, WebGL strings, ...).
    stealth(
        driver,
        languages=["en-US", "en"],
        vendor="Google Inc.",
        platform="Win32",
        webgl_vendor="Intel Inc.",
        renderer="Intel Iris OpenGL Engine",
        fix_hairline=True,
    )
    return driver
async def scrape_url_selenium(url):
    """Fetch *url* with the stealth Selenium driver and return (text, html).

    The blocking Selenium work runs in a worker thread via asyncio.to_thread.
    The page source is parsed with BeautifulSoup, <script>/<style> elements
    are dropped, and the visible text is returned alongside the raw HTML.

    Returns:
        (clean_text, raw_html) on success, ("", "") on any failure
        (the error is logged, never raised).
    """
    logger.info(f"Scraping with Selenium: {url}")
    try:
        def _fetch_page():
            drv = setup_selenium_driver()
            try:
                drv.get(url)
                # Crude settle time for dynamic content (simple sleep for now,
                # could be improved with WebDriverWait).
                import time
                time.sleep(3)
                return drv.page_source
            finally:
                drv.quit()  # always release the browser

        html = await asyncio.to_thread(_fetch_page)

        # Extract readable text, dropping script/style noise.
        soup = BeautifulSoup(html, 'html.parser')
        for tag in soup(["script", "style"]):
            tag.decompose()
        clean_text = soup.get_text(separator=' ', strip=True)
        return clean_text, html
    except Exception as e:
        logger.error(f"Selenium scraping failed for {url}: {e}")
        return "", ""
async def search_web(query, max_results=5):
    """Search the web via DuckDuckGo (no API key required).

    The DDGS client is synchronous, so the query is executed in a worker
    thread. Each hit is normalised into a dict with title/url/content/
    query_type keys.

    Returns:
        A list of result dicts; an empty list on any error (the error is
        printed, never raised).
    """
    try:
        def _ddg_query():
            # DDGS().text() is a synchronous generator; materialise it here.
            with DDGS() as client:
                return list(client.text(query, max_results=max_results))

        # Run the blocking search off the event loop.
        raw_hits = await asyncio.to_thread(_ddg_query)
        return [
            {
                "title": hit.get('title', ''),
                "url": hit.get('href', ''),
                "content": hit.get('body', ''),
                "query_type": "web_search",
            }
            for hit in raw_hits
        ]
    except Exception as e:
        print(f"Search error for '{query}': {e}")
        return []
async def get_news_from_api(company_name):
    """Fetch sustainability/ESG news for *company_name* from NewsAPI.

    Requires the NEWS_API_KEY environment variable; returns [] immediately
    when it is unset. The blocking ``requests`` call runs in a worker thread.
    Articles marked '[Removed]' or whose titles contain crime/fraud keywords
    are filtered out; the positive ESG filter is delegated to the API query's
    "AND (sustainability OR ...)" clause.

    Returns:
        A list of dicts with url/title/content/query_type keys. Always a
        list — previously an unsuccessful API status fell off the end of the
        function and returned None, which crashed callers doing list.extend().
    """
    api_key = os.getenv('NEWS_API_KEY')
    if not api_key:
        return []
    # NEGATIVE FILTER terms (same as the web-search path): we want
    # environmental news, not financial-crime coverage. Hoisted out of the
    # loop so the list is built once.
    bad_keywords = ["fraud", "arrest", "scam", "police", "laundering", "jail",
                    "cbi", "ed", "bribe", "punish", "litigation"]
    try:
        url = "https://newsapi.org/v2/everything"
        params = {
            'q': f'{company_name} AND (sustainability OR greenwashing OR ESG OR environmental)',
            'language': 'en',
            'sortBy': 'relevancy',
            'pageSize': 15,
            'apiKey': api_key
        }
        # requests is blocking, so run it in a thread to keep the loop free.
        response = await asyncio.to_thread(requests.get, url, params=params, timeout=10)
        data = response.json()
        if data.get('status') != 'ok':
            # BUG FIX: this path previously returned None implicitly.
            return []
        articles = []
        for article in data.get('articles', []):
            # NewsAPI marks withdrawn content with a '[Removed]' title.
            if article.get('title') == '[Removed]':
                continue
            title_lower = (article.get('title') or "").lower()
            # Exclude crime/fraud stories by headline.
            if any(bad in title_lower for bad in bad_keywords):
                continue
            articles.append({
                'url': article.get('url', ''),
                'title': article.get('title', ''),
                'content': (article.get('description') or '') + ' ' + (article.get('content') or ''),
                'query_type': 'news_api'
            })
        return articles
    except Exception as e:
        print(f"NewsAPI error: {e}")
        return []
# Helper for Filtering
def is_valid_result(res):
    """Return False for navigational, login, or otherwise junk search hits.

    Checks the result's URL against known navigational/social domains and its
    title against login/captcha phrases. Missing keys are treated as empty
    strings, so a bare dict passes. (Fix: the unused ``content`` local that
    was lowered on every call has been removed; the optional minimum-length
    rule it served remains deliberately unenforced.)
    """
    url = res.get('url', '').lower()
    title = res.get('title', '').lower()
    # 1. Generic Google/navigational links and social-login destinations.
    invalid_domains = ('google.com/search', 'google.com/url', 'accounts.google.com',
                       'support.google.com', 'youtube.com', 'facebook.com',
                       'twitter.com/login', 'linkedin.com/login')
    # 2. Access walls, bot checks, and other non-content actions.
    invalid_terms = ('sign in', 'log in', 'forgot password', 'download',
                     'captcha', 'security check', 'robot', 'access denied')
    if any(domain in url for domain in invalid_domains):
        return False
    if any(term in title for term in invalid_terms):
        return False
    return True
async def get_company_news(company_name):
    """Collect ESG-related news articles for *company_name*.

    Strategy: NewsAPI first, then DuckDuckGo web searches across several
    greenwashing/sustainability query templates. Hits are filtered for
    validity, crime/fraud headlines are dropped, each remaining hit must
    show ESG context, and URLs are de-duplicated.

    Returns:
        At most 20 article dicts.
    """
    report_progress(f"Starting news collection for {company_name}", 10)
    articles = []
    # 1. Try NewsAPI (structured, reliable when NEWS_API_KEY is configured).
    report_progress("Checking NewsAPI...", 15)
    api_articles = await get_news_from_api(company_name)
    articles.extend(api_articles)
    # 2. Add Web Search (DuckDuckGo) for deeper coverage.
    report_progress("Fetching additional news via Web Search...", 25)
    queries = [
        f'"{company_name}" environmental impact report news',
        f'"{company_name}" greenwashing controversy scandal',
        f'"{company_name}" sustainability goals criticism',
        f'"{company_name}" ESG rating news detected',
        f'"{company_name}" climate change commitments review'
    ]
    # ESG/Climate keywords, refined to avoid generic matches:
    # "green" and "environment" removed as they match "green light",
    # "business environment".
    ESG_KEYWORDS = [
        "climate", "carbon", "emission", "pollution", "sustainability", "esg",
        "renewable", "net zero", "biodiversity", "ecological", "greenhouse", "fossil fuel"
    ]
    # Negative keywords to exclude financial crime/generic news.
    NEGATIVE_KEYWORDS = ["fraud", "arrest", "scam", "police", "laundering", "jail", "cbi", "ed", "bribe"]
    for query in queries:
        if len(articles) >= 20:
            break
        results = await search_web(query, max_results=5)
        for res in results:
            if not is_valid_result(res):
                continue
            # BUG FIX: search_web stores the snippet under 'content', not
            # 'body' — the old code concatenated an always-empty string, so
            # the positive filter only ever saw the title.
            text_to_check = (res.get('title', '') + " " + res.get('content', '')).lower()
            title_lower = res.get('title', '').lower()
            # 1. NEGATIVE FILTER: exclude crime/fraud headlines immediately.
            if any(bad in title_lower for bad in NEGATIVE_KEYWORDS):
                continue
            # 2. POSITIVE FILTER: must have ESG context. "environmental" is
            # accepted explicitly (the bare "environment" is too generic).
            if "environmental" not in text_to_check and not any(k in text_to_check for k in ESG_KEYWORDS):
                continue  # Skip if no environmental context found
            # Simple de-duplication by URL.
            if not any(a['url'] == res['url'] for a in articles):
                articles.append(res)
    report_progress(f"News collection complete: {len(articles)} articles", 45)
    return articles[:20]
def _mentions_company(company_name, res):
    """Return True when a search result's title or snippet mentions the company.

    Accepts either the full lowercase company name or its first word
    (e.g. "Google" matches "Google Inc"). An empty name trivially matches.
    The original inline checks duplicated this logic twice, with a dead
    first-word-length branch that could never change the outcome.
    """
    name = company_name.lower()
    text = (res.get('title', '') + " " + res.get('content', '')).lower()
    return name in text or name.split()[0] in text

async def get_company_reviews(company_name):
    """Collect employee/customer review snippets for *company_name*.

    Issues site-scoped DuckDuckGo queries (Glassdoor, Reddit, Trustpilot)
    plus broad fallbacks, drops junk/navigational hits, requires the company
    name to appear in the result, and tags each review with its source
    platform. Caps at 40 reviews; if fewer than 3 are found, runs one extra
    broad fallback query. (Fix: removed unreachable code after the return,
    and the cap now also stops issuing further search queries instead of
    only skipping appends.)

    Returns:
        A list of review dicts (url/title/content/source_type).
    """
    report_progress(f"Starting review collection for {company_name}", 50)
    reviews = []
    # site: operators force results from specific review platforms.
    queries = [
        f'site:glassdoor.com "{company_name}" reviews "environment" OR "sustainability"',
        f'site:reddit.com "{company_name}" greenwashing OR "toxic"',
        f'site:trustpilot.com "{company_name}" environment',
        f'"{company_name}" employee reviews sustainability ethics',
        f'"{company_name}" environmental controversy reviews',  # Broad fallback
        f'"{company_name}" corporate responsibility feedback'  # Broad fallback
    ]
    total_queries = len(queries)
    for idx, query in enumerate(queries):
        if len(reviews) >= 40:
            break  # already at cap — skip the remaining searches
        progress = 50 + (idx / total_queries) * 30
        report_progress(f"Searching specific reviews: {query}", int(progress))
        results = await search_web(query, max_results=8)
        for res in results:
            if len(reviews) >= 40:
                break
            if not is_valid_result(res):
                continue
            # RELEVANCE CHECK: the result must actually mention the company.
            if not _mentions_company(company_name, res):
                print(f"Skipping unrelated result: {res['title']}")
                continue
            # Determine source type based on URL.
            source = "web"
            if "glassdoor" in res['url']:
                source = "Glassdoor"
            elif "twitter" in res['url'] or "x.com" in res['url']:
                source = "Twitter"
            elif "linkedin" in res['url']:
                source = "LinkedIn"
            elif "reddit" in res['url']:
                source = "Reddit"
            elif "trustpilot" in res['url']:
                source = "Trustpilot"
            # Strip platform suffixes from the title.
            title = res['title'].replace(" | Glassdoor", "").replace(" | Reddit", "")
            reviews.append({
                "url": res['url'],
                "title": title,
                "content": res['content'],  # use the snippet as the review content
                "source_type": source
            })
        await asyncio.sleep(1)  # be polite between search batches
    # Broader fallback when the site-scoped queries found almost nothing.
    if len(reviews) < 3:
        report_progress("Few reviews found, trying specific broader query...", 75)
        fallback_results = await search_web(f'"{company_name}" reviews environment', max_results=5)
        for res in fallback_results:
            if not is_valid_result(res):
                continue
            if any(r['url'] == res['url'] for r in reviews):
                continue
            if not _mentions_company(company_name, res):
                continue
            reviews.append({
                "url": res['url'],
                "title": res['title'],
                "content": res['content'],
                "source_type": "Web Search"
            })
    report_progress(f"Review collection complete: {len(reviews)} reviews", 80)
    return reviews