diff --git "a/src/utils/utils.py" "b/src/utils/utils.py" --- "a/src/utils/utils.py" +++ "b/src/utils/utils.py" @@ -1,7 +1,7 @@ # src/utils/utils.py """ COMPLETE - All scraping tools and utilities for Roger platform -Updated: +Updated: - Fixed Playwright Syntax Error (removed invalid 'request_timeout'). - Added 'Requests-First' strategy for 10x faster scraping. - Added 'Rainfall' PDF detection for district-level rain data. @@ -25,7 +25,11 @@ import random # Optional Playwright import try: - from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError + from playwright.sync_api import ( + sync_playwright, + TimeoutError as PlaywrightTimeoutError, + ) + PLAYWRIGHT_AVAILABLE = True except Exception: PLAYWRIGHT_AVAILABLE = False @@ -33,6 +37,7 @@ except Exception: # Optional PDF Reader import try: from pypdf import PdfReader + PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False @@ -70,6 +75,7 @@ logger.setLevel(logging.INFO) # UTILITIES # ============================================ + def get_today_str() -> str: return datetime.now().strftime("%a %b %d, %Y") @@ -82,13 +88,15 @@ def _get_site_timeout(url: str) -> int: return DEFAULT_TIMEOUT -def _safe_get(url: str, timeout: int = None, headers: Optional[Dict[str,str]] = None) -> Optional[requests.Response]: +def _safe_get( + url: str, timeout: int = None, headers: Optional[Dict[str, str]] = None +) -> Optional[requests.Response]: """HTTP GET with retries, site-specific timeouts, and error handling.""" headers = headers or DEFAULT_HEADERS # Use site-specific timeout if not explicitly provided if timeout is None: timeout = _get_site_timeout(url) - + for attempt in range(MAX_RETRIES): try: resp = requests.get(url, headers=headers, timeout=timeout) @@ -96,11 +104,13 @@ def _safe_get(url: str, timeout: int = None, headers: Optional[Dict[str,str]] = return resp logger.warning(f"[HTTP] {url} returned {resp.status_code}") except requests.exceptions.Timeout: - logger.warning(f"[HTTP] Timeout on {url} (attempt {attempt + 1}/{MAX_RETRIES}, timeout={timeout}s)") + logger.warning( + f"[HTTP] Timeout on {url} (attempt {attempt + 1}/{MAX_RETRIES}, timeout={timeout}s)" + ) except requests.exceptions.RequestException as e: logger.error(f"[HTTP] Error fetching {url}: {e}") if attempt < MAX_RETRIES - 1: - time.sleep(2 ** attempt) + time.sleep(2**attempt) return None @@ -134,7 +144,7 @@ def _extract_text_from_pdf_url(pdf_url: str) -> str: """ Downloads a PDF from a URL and extracts its text content. Returns a summarized string of the content. - + ENHANCED: Validates content-type before parsing to avoid HTML error pages. """ if not PDF_AVAILABLE: @@ -149,28 +159,38 @@ def _extract_text_from_pdf_url(pdf_url: str) -> str: elif "meteo.gov.lk" in pdf_url: headers["Referer"] = "https://meteo.gov.lk/" else: - headers["Referer"] = pdf_url.rsplit('/', 1)[0] - - response = requests.get(pdf_url, headers=headers, timeout=30, allow_redirects=True) + headers["Referer"] = pdf_url.rsplit("/", 1)[0] + + response = requests.get( + pdf_url, headers=headers, timeout=30, allow_redirects=True + ) response.raise_for_status() - + # 2. 
CRITICAL: Validate content-type before parsing - content_type = response.headers.get('Content-Type', '').lower() + content_type = response.headers.get("Content-Type", "").lower() content_bytes = response.content[:20] # First 20 bytes for header check - + # Check if response is actually a PDF - is_pdf_content_type = 'application/pdf' in content_type - is_pdf_header = content_bytes.startswith(b'%PDF') - + is_pdf_content_type = "application/pdf" in content_type + is_pdf_header = content_bytes.startswith(b"%PDF") + if not is_pdf_content_type and not is_pdf_header: # Check if we got HTML instead (common error response) - if content_bytes.startswith(b' str: except Exception as pdf_error: logger.warning(f"[PDF] Failed to parse PDF from {pdf_url}: {pdf_error}") return "[PDF unavailable: Could not parse PDF structure]" - + text_content = [] - + # Extract text from ALL pages (no limit) for i, page in enumerate(reader.pages): try: @@ -190,14 +210,14 @@ def _extract_text_from_pdf_url(pdf_url: str) -> str: except Exception as page_error: logger.debug(f"[PDF] Error extracting page {i}: {page_error}") continue - + if not text_content: return "[PDF extracted but contains no readable text]" - + full_text = "\n".join(text_content) - + # No language filtering - extract ALL text regardless of language - full_text = re.sub(r'\n+', '\n', full_text).strip() + full_text = re.sub(r"\n+", "\n", full_text).strip() return full_text # Return full text without length limit except requests.exceptions.Timeout: @@ -215,12 +235,17 @@ def _extract_text_from_pdf_url(pdf_url: str) -> str: # PLAYWRIGHT SESSION HELPERS # ============================================ + def ensure_playwright(): if not PLAYWRIGHT_AVAILABLE: - raise RuntimeError("Playwright is not installed. Install with `pip install playwright` and run `playwright install`.") + raise RuntimeError( + "Playwright is not installed. Install with `pip install playwright` and run `playwright install`." + ) -def save_playwright_storage_state(site_name: str, storage_state: dict, out_dir: str = ".sessions") -> str: +def save_playwright_storage_state( + site_name: str, storage_state: dict, out_dir: str = ".sessions" +) -> str: os.makedirs(out_dir, exist_ok=True) path = os.path.join(out_dir, f"{site_name}_storage_state.json") with open(path, "w", encoding="utf-8") as f: @@ -228,7 +253,9 @@ def save_playwright_storage_state(site_name: str, storage_state: dict, out_dir: return path -def load_playwright_storage_state_path(site_name: str, out_dir: str = ".sessions") -> Optional[str]: +def load_playwright_storage_state_path( + site_name: str, out_dir: str = ".sessions" +) -> Optional[str]: """ Robustly finds the session file in multiple possible locations. Priority order: @@ -237,21 +264,23 @@ def load_playwright_storage_state_path(site_name: str, out_dir: str = ".sessions 3. 
Root project .sessions/
    """
    filename = f"{site_name}_storage_state.json"
-
+
    # Priority 1: Check src/utils/.sessions/ (most likely location)
    src_utils_path = os.path.join(os.getcwd(), "src", "utils", out_dir, filename)
    if os.path.exists(src_utils_path):
        logger.info(f"[SESSION] ✅ Found session at {src_utils_path}")
        return src_utils_path
-
+
    # Priority 2: Check current working directory .sessions/
    cwd_path = os.path.join(os.getcwd(), out_dir, filename)
    if os.path.exists(cwd_path):
        logger.info(f"[SESSION] ✅ Found session at {cwd_path}")
        return cwd_path
-
+
    # Priority 3: Check project root .sessions/
-    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+    base_dir = os.path.dirname(
+        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+    )
    root_path = os.path.join(base_dir, out_dir, filename)
    if os.path.exists(root_path):
        logger.info(f"[SESSION] ✅ Found session at {root_path}")
@@ -268,9 +297,12 @@ def load_playwright_storage_state_path(site_name: str, out_dir: str = ".sessions
    logger.warning(f" 1. {src_utils_path}")
    logger.warning(f" 2. {cwd_path}")
    logger.warning(f" 3. {root_path}")
-    logger.warning(f"\n💡 Run 'python src/utils/session_manager.py' to create sessions.")
+    logger.warning(
+        f"\n💡 Run 'python src/utils/session_manager.py' to create sessions."
+    )
    return None
+
def create_or_restore_playwright_session(
    site_name: str,
    login_flow: Optional[dict] = None,
@@ -287,7 +319,9 @@ def create_or_restore_playwright_session(
    session_path = os.path.join(storage_dir, f"{site_name}_storage_state.json")
    if not login_flow:
-        raise RuntimeError(f"No existing session for {site_name} and no login_flow provided to create one.")
+        raise RuntimeError(
+            f"No existing session for {site_name} and no login_flow provided to create one."
+ ) logger.info(f"[PLAYWRIGHT] Creating new session for {site_name}...") with sync_playwright() as p: @@ -305,29 +339,38 @@ def create_or_restore_playwright_session( elif st == "click": page.click(sel, timeout=15000) elif st == "wait": - page.wait_for_selector(step.get("selector"), timeout=step.get("timeout", 15000)) + page.wait_for_selector( + step.get("selector"), timeout=step.get("timeout", 15000) + ) elif st == "goto": page.goto(step.get("url"), wait_until=wait_until, timeout=60000) - + storage = context.storage_state() with open(session_path, "w", encoding="utf-8") as f: json.dump(storage, f) logger.info(f"[PLAYWRIGHT] Saved session storage_state to {session_path}") return session_path finally: - try: context.close() - except: pass + try: + context.close() + except: + pass browser.close() -def playwright_fetch_html_using_session(url: str, storage_state_path: Optional[str], headless: bool = True, wait_until: str = "networkidle") -> str: +def playwright_fetch_html_using_session( + url: str, + storage_state_path: Optional[str], + headless: bool = True, + wait_until: str = "networkidle", +) -> str: ensure_playwright() with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context_args = {} if storage_state_path and os.path.exists(storage_state_path): context_args["storage_state"] = storage_state_path - + context = browser.new_context(**context_args) page = context.new_page() try: @@ -339,8 +382,10 @@ def playwright_fetch_html_using_session(url: str, storage_state_path: Optional[s logger.error(f"[PLAYWRIGHT] Timeout fetching {url}: {e}") return "" finally: - try: context.close() - except: pass + try: + context.close() + except: + pass browser.close() @@ -356,120 +401,165 @@ RIVERNET_CACHE_DURATION_MINUTES = 30 # Increased from 15 to reduce load # All rivers monitored by rivernet.lk (expanded list) RIVERNET_LOCATIONS = { # Main rivers - "kelaniya": {"name": "Kelani River", "region": "Western", "url": "https://rivernet.lk/kelaniya"}, - "ratnapura": {"name": "Kalu Ganga", "region": "Sabaragamuwa", "url": "https://rivernet.lk/ratnapura"}, - "gampaha": {"name": "Maha Oya", "region": "Western", "url": "https://rivernet.lk/gampaha"}, - "nilwala": {"name": "Nilwala River", "region": "Southern", "url": "https://rivernet.lk/nilwala"}, - "galoya": {"name": "Gal Oya", "region": "Eastern", "url": "https://rivernet.lk/galoya"}, - "deduruoya": {"name": "Deduru Oya", "region": "North Western", "url": "https://rivernet.lk/deduruoya"}, + "kelaniya": { + "name": "Kelani River", + "region": "Western", + "url": "https://rivernet.lk/kelaniya", + }, + "ratnapura": { + "name": "Kalu Ganga", + "region": "Sabaragamuwa", + "url": "https://rivernet.lk/ratnapura", + }, + "gampaha": { + "name": "Maha Oya", + "region": "Western", + "url": "https://rivernet.lk/gampaha", + }, + "nilwala": { + "name": "Nilwala River", + "region": "Southern", + "url": "https://rivernet.lk/nilwala", + }, + "galoya": { + "name": "Gal Oya", + "region": "Eastern", + "url": "https://rivernet.lk/galoya", + }, + "deduruoya": { + "name": "Deduru Oya", + "region": "North Western", + "url": "https://rivernet.lk/deduruoya", + }, # Batticaloa basins (accessed via query parameter) - "maduru_oya": {"name": "Maduru Oya", "region": "Batticaloa", "url": "https://rivernet.lk/batticaloa?basin=maduru_oya_basin"}, - "andella_oya": {"name": "Andella Oya", "region": "Batticaloa", "url": "https://rivernet.lk/batticaloa?basin=andella_oya_basin"}, - "magalawattuwan_oya": {"name": "Magalawattuwan Oya", "region": "Batticaloa", "url": 
"https://rivernet.lk/batticaloa?basin=magalawattuwan_oya_basin"}, - "mundeni_aru": {"name": "Mundeni Aru", "region": "Batticaloa", "url": "https://rivernet.lk/batticaloa?basin=mundeni_aru_basin"}, + "maduru_oya": { + "name": "Maduru Oya", + "region": "Batticaloa", + "url": "https://rivernet.lk/batticaloa?basin=maduru_oya_basin", + }, + "andella_oya": { + "name": "Andella Oya", + "region": "Batticaloa", + "url": "https://rivernet.lk/batticaloa?basin=andella_oya_basin", + }, + "magalawattuwan_oya": { + "name": "Magalawattuwan Oya", + "region": "Batticaloa", + "url": "https://rivernet.lk/batticaloa?basin=magalawattuwan_oya_basin", + }, + "mundeni_aru": { + "name": "Mundeni Aru", + "region": "Batticaloa", + "url": "https://rivernet.lk/batticaloa?basin=mundeni_aru_basin", + }, } - def scrape_rivernet_impl( locations: Optional[List[str]] = None, use_cache: bool = True, ) -> Dict[str, Any]: """ Scrape river level data from rivernet.lk (Flood Early Warning System) - + IMPORTANT: rivernet.lk is a Flutter SPA, so we need Playwright for scraping. Data is cached for 15 minutes to reduce load on the service. - + Args: locations: List of location keys to scrape (e.g., ["kelaniya", "ratnapura"]) If None, scrapes all major locations use_cache: Whether to use cached data if available - + Returns: Dict with river levels, warnings, and status for each location """ global _rivernet_cache, _rivernet_cache_time - + # Check cache if use_cache and _rivernet_cache_time: cache_age = (datetime.utcnow() - _rivernet_cache_time).total_seconds() / 60 if cache_age < RIVERNET_CACHE_DURATION_MINUTES: logger.info(f"[RIVERNET] Using cached data ({cache_age:.1f} min old)") return _rivernet_cache - + if not PLAYWRIGHT_AVAILABLE: - logger.warning("[RIVERNET] Playwright not available. Cannot scrape rivernet.lk (Flutter SPA)") + logger.warning( + "[RIVERNET] Playwright not available. 
Cannot scrape rivernet.lk (Flutter SPA)" + ) return { "error": "Playwright required for rivernet.lk (Flutter SPA)", "suggestion": "Install playwright: pip install playwright && playwright install chromium", - "fetched_at": datetime.utcnow().isoformat() + "fetched_at": datetime.utcnow().isoformat(), } - + logger.info("[RIVERNET] Starting river level data collection...") - + results = { "rivers": [], "alerts": [], "summary": {}, "fetched_at": datetime.utcnow().isoformat(), - "source": "rivernet.lk" + "source": "rivernet.lk", } - + # Determine which locations to scrape target_locations = locations or list(RIVERNET_LOCATIONS.keys()) - + try: with sync_playwright() as p: browser = p.chromium.launch(headless=True) context = browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", - viewport={"width": 1280, "height": 720} + viewport={"width": 1280, "height": 720}, ) page = context.new_page() page.set_default_timeout(90000) # Increased to 90s for slow Flutter SPA - + # First, visit main page to get overall status try: - page.goto("https://rivernet.lk/", wait_until="networkidle", timeout=90000) # 90s + page.goto( + "https://rivernet.lk/", wait_until="networkidle", timeout=90000 + ) # 90s # Wait for Flutter to load time.sleep(5) # Increased to 5s for Flutter rendering - + # Try to extract any visible data from main page main_html = page.content() main_soup = BeautifulSoup(main_html, "html.parser") - + # NOTE: Disabled loose keyword extraction - was causing false positives # Real flood alerts will be determined from individual river page status # The previous alert_keywords approach matched generic site text like # "warning: javascript required" causing fake alerts - + # If we need main page alerts, look for specific alert banner elements # alert_banners = main_soup.select(".alert-banner, .flood-warning, .critical-notice") # for banner in alert_banners: # results["alerts"].append({...}) - + logger.info("[RIVERNET] Main page loaded successfully") - + except Exception as e: logger.warning(f"[RIVERNET] Error loading main page: {e}") - + # Visit each river location page (all 10 rivers) for loc_key in target_locations[:10]: # All 10 rivers if loc_key not in RIVERNET_LOCATIONS: continue - + loc_info = RIVERNET_LOCATIONS[loc_key] - + try: logger.info(f"[RIVERNET] Checking {loc_info['name']}...") - page.goto(loc_info["url"], wait_until="networkidle", timeout=90000) # 90s timeout + page.goto( + loc_info["url"], wait_until="networkidle", timeout=90000 + ) # 90s timeout time.sleep(5) # Wait for Flutter content to render - + html = page.content() soup = BeautifulSoup(html, "html.parser") page_text = soup.get_text(separator="\n", strip=True) - + # Extract river data from page text river_data = { "location_key": loc_key, @@ -480,9 +570,9 @@ def scrape_rivernet_impl( "water_level": None, "warning_level": None, "last_updated": None, - "raw_text": page_text[:500] if page_text else None + "raw_text": page_text[:500] if page_text else None, } - + # Try to extract water level (expanded patterns for rivernet.lk) level_patterns = [ # Standard formats @@ -496,117 +586,163 @@ def scrape_rivernet_impl( # Warning threshold pattern r"threshold[:\s]*([0-9]+\.?[0-9]*)", ] - + for pattern in level_patterns: match = re.search(pattern, page_text, re.I) if match: try: value = float(match.group(1)) - if 0 < value < 50: # Sanity check (rivers typically 0-50m) + if ( + 0 < value < 50 + ): # Sanity check (rivers typically 0-50m) river_data["water_level"] = { "value": round(value, 2), - "unit": 
match.group(2) if len(match.groups()) > 1 and match.group(2) else "m"
+                                        "unit": (
+                                            match.group(2)
+                                            if len(match.groups()) > 1
+                                            and match.group(2)
+                                            else "m"
+                                        ),
                                    }
                                    logger.info(f" Water level: {value}m")
                                    break
                            except (ValueError, IndexError):
                                continue
-
+
                    # Determine status based on keywords (STRICTER to avoid false positives)
                    text_lower = page_text.lower()
-
+
                    # Default to normal - only escalate if clear flood indicators
                    river_data["status"] = "normal"
-
+
                    # CRITICAL: Only consider keywords in FLOOD CONTEXT
                    # Look for phrases, not just words, to avoid false positives
-
+
                    # DANGER / CRITICAL - Very specific phrases only
                    danger_phrases = [
-                        "major flood", "danger level exceeded", "critical flood",
-                        "red alert", "evacuate immediately", "extreme flood",
-                        "water level exceeds danger", "above danger level"
+                        "major flood",
+                        "danger level exceeded",
+                        "critical flood",
+                        "red alert",
+                        "evacuate immediately",
+                        "extreme flood",
+                        "water level exceeds danger",
+                        "above danger level",
                    ]
                    if any(phrase in text_lower for phrase in danger_phrases):
                        river_data["status"] = "danger"
-
+
                    # WARNING - Specific flood warning phrases
-                    elif any(phrase in text_lower for phrase in [
-                        "minor flood", "warning level exceeded", "flood alert issued",
-                        "amber alert", "approaching warning level",
-                        "water level exceeds warning", "above warning level"
-                    ]):
+                    elif any(
+                        phrase in text_lower
+                        for phrase in [
+                            "minor flood",
+                            "warning level exceeded",
+                            "flood alert issued",
+                            "amber alert",
+                            "approaching warning level",
+                            "water level exceeds warning",
+                            "above warning level",
+                        ]
+                    ):
                        river_data["status"] = "warning"
-
+
                    # RISING - Only if explicitly rising
-                    elif any(phrase in text_lower for phrase in [
-                        "water level rising", "rising trend detected",
-                        "level is rising rapidly", "increasing water level"
-                    ]):
+                    elif any(
+                        phrase in text_lower
+                        for phrase in [
+                            "water level rising",
+                            "rising trend detected",
+                            "level is rising rapidly",
+                            "increasing water level",
+                        ]
+                    ):
                        river_data["status"] = "rising"
-
+
                    # NORMAL indicators (optional, just for logging)
-                    elif any(phrase in text_lower for phrase in [
-                        "normal level", "stable", "safe level", "decreasing", "below warning"
-                    ]):
+                    elif any(
+                        phrase in text_lower
+                        for phrase in [
+                            "normal level",
+                            "stable",
+                            "safe level",
+                            "decreasing",
+                            "below warning",
+                        ]
+                    ):
                        river_data["status"] = "normal"
-
+
                    results["rivers"].append(river_data)
                    logger.info(f" ✓ {loc_info['name']}: {river_data['status']}")
-
+
                except Exception as e:
                    logger.warning(f"[RIVERNET] Error scraping {loc_info['name']}: {e}")
-                    results["rivers"].append({
-                        "location_key": loc_key,
-                        "name": loc_info["name"],
-                        "region": loc_info["region"],
-                        "status": "error",
-                        "error": str(e)
-                    })
-
+                    results["rivers"].append(
+                        {
+                            "location_key": loc_key,
+                            "name": loc_info["name"],
+                            "region": loc_info["region"],
+                            "status": "error",
+                            "error": str(e),
+                        }
+                    )
+
            browser.close()
-
+
    except Exception as e:
        logger.error(f"[RIVERNET] Critical error: {e}")
        results["error"] = str(e)
-
+
    # Generate summary
-    status_counts = {"danger": 0, "warning": 0, "rising": 0, "normal": 0, "unknown": 0, "error": 0}
+    status_counts = {
+        "danger": 0,
+        "warning": 0,
+        "rising": 0,
+        "normal": 0,
+        "unknown": 0,
+        "error": 0,
+    }
    for river in results["rivers"]:
        status = river.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1
-
+
    results["summary"] = {
        "total_monitored": len(results["rivers"]),
        "status_breakdown": status_counts,
        "has_alerts": status_counts["danger"] > 0 or
status_counts["warning"] > 0, - "overall_status": "danger" if status_counts["danger"] > 0 else ( - "warning" if status_counts["warning"] > 0 else ( - "rising" if status_counts["rising"] > 0 else "normal" + "overall_status": ( + "danger" + if status_counts["danger"] > 0 + else ( + "warning" + if status_counts["warning"] > 0 + else ("rising" if status_counts["rising"] > 0 else "normal") ) - ) + ), } - + # Update cache _rivernet_cache = results _rivernet_cache_time = datetime.utcnow() - - logger.info(f"[RIVERNET] Completed: {len(results['rivers'])} rivers, {len(results['alerts'])} alerts") + + logger.info( + f"[RIVERNET] Completed: {len(results['rivers'])} rivers, {len(results['alerts'])} alerts" + ) return results def tool_rivernet_status() -> Dict[str, Any]: """ Get current river levels and flood warnings from rivernet.lk - + Returns real-time river level data for major rivers in Sri Lanka including: - Kelani River (Western Province) - - Kalu Ganga (Sabaragamuwa) + - Kalu Ganga (Sabaragamuwa) - Nilwala (Southern) - Maha Oya (Western) - Gal Oya (Eastern) - Deduru Oya (North Western) - + Data is cached for 15 minutes to reduce load. """ return scrape_rivernet_impl(use_cache=True) @@ -615,24 +751,24 @@ def tool_rivernet_status() -> Dict[str, Any]: def tool_district_weather(district: str = "colombo") -> Dict[str, Any]: """ Get weather forecast for a specific district of Sri Lanka. - + Args: district: District name (e.g., 'colombo', 'kandy', 'galle') - + Returns: District-specific weather forecast with temperature and conditions """ district_lower = district.lower().strip() - + # Use the weather nowcast tool and filter for district weather_data = tool_weather_nowcast(location=district) - + if "error" in weather_data: return weather_data - + # Extract district-specific information from the forecast forecast_text = weather_data.get("forecast", "") - + # Try to find district-specific mention district_info = { "district": district.title(), @@ -640,13 +776,13 @@ def tool_district_weather(district: str = "colombo") -> Dict[str, Any]: "source": weather_data.get("source"), "fetched_at": weather_data.get("fetched_at"), } - + # Look for district in the forecast text district_pattern = rf"(?:{district}|{district.title()})[:\s]*([^\n]+)" match = re.search(district_pattern, forecast_text, re.I) if match: district_info["specific_forecast"] = match.group(0) - + return district_info @@ -663,46 +799,44 @@ FLOODWATCH_CACHE_DURATION_HOURS = 24 def tool_floodwatch_historical() -> Dict[str, Any]: """ Get 30-year historical flood pattern analysis data. - + Provides climate trend data including: - Average annual rainfall (mm) - Maximum daily rainfall records - Heavy rain days (>50mm) count - Extreme rain days (>100mm) count - Decadal comparison (1995-2025) - + Data is cached for 24 hours as it doesn't change frequently. 
- + Returns: Dict with historical flood pattern analysis """ global _floodwatch_historical_cache, _floodwatch_cache_time - + # Check cache (24 hour TTL) if _floodwatch_historical_cache and _floodwatch_cache_time: cache_age = (datetime.utcnow() - _floodwatch_cache_time).total_seconds() / 3600 if cache_age < FLOODWATCH_CACHE_DURATION_HOURS: logger.info("[FLOODWATCH] Returning cached historical data") return _floodwatch_historical_cache - + logger.info("[FLOODWATCH] Fetching historical climate data") - + # Historical data based on Sri Lanka Meteorological Department records # These are realistic values for Sri Lanka's climate historical_data = { "source": "FloodWatch Sri Lanka / Meteorological Department", "period": "1995-2025 (30 Years)", "fetched_at": datetime.utcnow().isoformat(), - # Overall statistics "statistics": { "avg_annual_rainfall_mm": 2930, "max_daily_rainfall_mm": 218, "heavy_rain_days_50mm": 98, "extreme_rain_days_100mm": 15, - "avg_flood_events_per_year": 4.2 + "avg_flood_events_per_year": 4.2, }, - # Decadal comparison "decadal_analysis": [ { @@ -710,83 +844,80 @@ def tool_floodwatch_historical() -> Dict[str, Any]: "avg_rainfall_mm": 2650, "extreme_days": 11, "max_daily_mm": 175, - "major_flood_events": 8 + "major_flood_events": 8, }, { - "period": "2005-2014", + "period": "2005-2014", "avg_rainfall_mm": 2850, "extreme_days": 14, "max_daily_mm": 198, - "major_flood_events": 12 + "major_flood_events": 12, }, { "period": "2015-2025", "avg_rainfall_mm": 3290, "extreme_days": 18, "max_daily_mm": 218, - "major_flood_events": 17 - } + "major_flood_events": 17, + }, ], - # Key climate change findings "key_findings": [ "Maximum daily rainfall intensity has increased by 43%", "Extreme rain days (>100mm) have increased by 64% since 1995", "Major flood events have doubled in the last decade", "Southwest monsoon intensity shows increasing trend", - "Inter-monsoonal rainfall becoming more erratic" + "Inter-monsoonal rainfall becoming more erratic", ], - # High-risk months "high_risk_periods": [ {"months": "May-June", "type": "Southwest Monsoon Onset", "risk": "high"}, {"months": "October-November", "type": "Northeast Monsoon", "risk": "high"}, - {"months": "April-May", "type": "Inter-monsoon (First)", "risk": "medium"} - ] + {"months": "April-May", "type": "Inter-monsoon (First)", "risk": "medium"}, + ], } - + # Cache the data _floodwatch_historical_cache = historical_data _floodwatch_cache_time = datetime.utcnow() - + return historical_data def tool_calculate_national_threat( - river_data: Optional[Dict[str, Any]] = None, - dmc_alerts: Optional[List[str]] = None + river_data: Optional[Dict[str, Any]] = None, dmc_alerts: Optional[List[str]] = None ) -> Dict[str, Any]: """ Calculate national flood threat score (0-100). - + Aggregates data from multiple sources to compute an overall threat level for Sri Lanka. - + Args: river_data: RiverNet data with river statuses dmc_alerts: List of active DMC alerts - + Returns: Dict with threat score, breakdown, and risk districts """ logger.info("[THREAT] Calculating national threat score") - + score = 0 breakdown = { "river_contribution": 0, "alert_contribution": 0, - "seasonal_contribution": 0 + "seasonal_contribution": 0, } critical_districts = [] high_risk_districts = [] medium_risk_districts = [] - + # 1. 
River status contribution (max 50 points) if river_data and river_data.get("rivers"): for river in river_data.get("rivers", []): status = river.get("status", "unknown").lower() region = river.get("region", "") - + if status == "danger": breakdown["river_contribution"] += 15 if region and region not in critical_districts: @@ -799,9 +930,9 @@ def tool_calculate_national_threat( breakdown["river_contribution"] += 3 if region and region not in medium_risk_districts: medium_risk_districts.append(region) - + breakdown["river_contribution"] = min(50, breakdown["river_contribution"]) - + # 2. DMC Alert contribution (max 30 points) if dmc_alerts: for alert in dmc_alerts: @@ -812,25 +943,25 @@ def tool_calculate_national_threat( breakdown["alert_contribution"] += 5 elif any(kw in alert_lower for kw in ["advisory", "caution"]): breakdown["alert_contribution"] += 2 - + breakdown["alert_contribution"] = min(30, breakdown["alert_contribution"]) - + # 3. Seasonal contribution (max 20 points) current_month = datetime.utcnow().month monsoon_months = {5: 15, 6: 18, 10: 15, 11: 18} # High risk months inter_monsoon = {4: 8, 9: 8} # Medium risk - + if current_month in monsoon_months: breakdown["seasonal_contribution"] = monsoon_months[current_month] elif current_month in inter_monsoon: breakdown["seasonal_contribution"] = inter_monsoon[current_month] else: breakdown["seasonal_contribution"] = 3 - + # Calculate total score score = sum(breakdown.values()) score = min(100, max(0, score)) - + # Determine threat level if score >= 70: threat_level = "CRITICAL" @@ -844,7 +975,7 @@ def tool_calculate_national_threat( else: threat_level = "LOW" color = "green" - + return { "national_threat_score": score, "threat_level": threat_level, @@ -856,9 +987,9 @@ def tool_calculate_national_threat( "medium_count": len(medium_risk_districts), "critical_districts": critical_districts, "high_risk_districts": high_risk_districts, - "medium_risk_districts": medium_risk_districts + "medium_risk_districts": medium_risk_districts, }, - "calculated_at": datetime.utcnow().isoformat() + "calculated_at": datetime.utcnow().isoformat(), } @@ -866,21 +997,43 @@ def tool_calculate_national_threat( # METEOROLOGICAL TOOLS (Upgraded) # ============================================ + def tool_dmc_alerts() -> Dict[str, Any]: # ... (Existing DMC alerts code - unchanged) ... 
url = "http://www.meteo.gov.lk/index.php?lang=en" resp = _safe_get(url) if not resp: - return {"source": url, "alerts": ["Failed to fetch alerts from DMC."], "fetched_at": datetime.utcnow().isoformat()} + return { + "source": url, + "alerts": ["Failed to fetch alerts from DMC."], + "fetched_at": datetime.utcnow().isoformat(), + } soup = BeautifulSoup(resp.text, "html.parser") alerts: List[str] = [] - keywords = ["warning", "advisory", "alert", "heavy rain", "strong wind", "thunderstorm", "flood", "landslide", "cyclone", "severe"] + keywords = [ + "warning", + "advisory", + "alert", + "heavy rain", + "strong wind", + "thunderstorm", + "flood", + "landslide", + "cyclone", + "severe", + ] for text in soup.find_all(string=True): if len(text.strip()) > 20 and any(k in text.lower() for k in keywords): - clean = re.sub(r'\s+', ' ', text.strip()) - if clean not in alerts: alerts.append(clean) - if not alerts: alerts = ["No active severe weather alerts detected."] - return {"source": url, "alerts": alerts[:10], "fetched_at": datetime.utcnow().isoformat()} + clean = re.sub(r"\s+", " ", text.strip()) + if clean not in alerts: + alerts.append(clean) + if not alerts: + alerts = ["No active severe weather alerts detected."] + return { + "source": url, + "alerts": alerts[:10], + "fetched_at": datetime.utcnow().isoformat(), + } def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: @@ -893,11 +1046,11 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: """ base_url = "https://meteo.gov.lk/" city_forecast_url = "https://meteo.gov.lk/index.php?option=com_content&view=article&id=102&Itemid=360&lang=en" - + combined_report = [] html_home = "" html_city = "" - + if PLAYWRIGHT_AVAILABLE: try: with sync_playwright() as p: @@ -907,30 +1060,38 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) page = context.new_page() - page.set_default_timeout(60000) # Give it 60 seconds (it's slow) + page.set_default_timeout(60000) # Give it 60 seconds (it's slow) # --- A. Visit Homepage --- try: page.goto(base_url, wait_until="domcontentloaded") # Try to wait for text, but don't crash if it takes too long - try: page.wait_for_selector("div.itemFullText", timeout=15000) - except: pass + try: + page.wait_for_selector("div.itemFullText", timeout=15000) + except: + pass html_home = page.content() except Exception as e: # Even if it times out, grab what we have! - logger.warning(f"[WEATHER] Homepage timeout (capturing partial): {e}") + logger.warning( + f"[WEATHER] Homepage timeout (capturing partial): {e}" + ) html_home = page.content() # --- B. 
Visit City Forecast --- try: page.goto(city_forecast_url, wait_until="domcontentloaded") - try: page.wait_for_selector("table", timeout=15000) - except: pass + try: + page.wait_for_selector("table", timeout=15000) + except: + pass html_city = page.content() except Exception as e: - logger.warning(f"[WEATHER] City Forecast timeout (capturing partial): {e}") + logger.warning( + f"[WEATHER] City Forecast timeout (capturing partial): {e}" + ) html_city = page.content() - + browser.close() except Exception as e: logger.warning(f"[WEATHER] Playwright critical fail: {e}") @@ -939,7 +1100,7 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: if not html_home or len(html_home) < 500: resp = _safe_get(base_url) html_home = resp.text if resp else "" - + if not html_city or len(html_city) < 500: resp = _safe_get(city_forecast_url) html_city = resp.text if resp else "" @@ -950,7 +1111,7 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: # --- PARSE HOMEPAGE --- soup_home = BeautifulSoup(html_home, "html.parser") english_forecast = "" - + header = soup_home.find(string=re.compile(r"WEATHER FORECAST FOR", re.I)) if header: container = header.find_parent("div") or header.find_parent("article") @@ -959,10 +1120,16 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: start = text.upper().find("WEATHER FORECAST FOR") if start != -1: english_forecast = text[start:][:2500] - + if not english_forecast: - main = soup_home.find("div", class_="itemFullText") or soup_home.find("div", itemprop="articleBody") - english_forecast = main.get_text(separator="\n", strip=True)[:2500] if main else "General forecast text not found." + main = soup_home.find("div", class_="itemFullText") or soup_home.find( + "div", itemprop="articleBody" + ) + english_forecast = ( + main.get_text(separator="\n", strip=True)[:2500] + if main + else "General forecast text not found." 
+ ) combined_report.append("--- ISLAND-WIDE GENERAL FORECAST ---") combined_report.append(english_forecast) @@ -974,23 +1141,29 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: if table: combined_report.append("\n--- DISTRICT/CITY FORECASTS ---") rows = table.find_all("tr") - + # Header logic if rows: header_row = rows[0] - headers = [th.get_text(strip=True) for th in header_row.find_all(["th", "td"])] + headers = [ + th.get_text(strip=True) for th in header_row.find_all(["th", "td"]) + ] if not "".join(headers).strip() and len(rows) > 1: - headers = [th.get_text(strip=True) for th in rows[1].find_all(["th", "td"])] - - clean_header = " | ".join(headers[:4]) + headers = [ + th.get_text(strip=True) for th in rows[1].find_all(["th", "td"]) + ] + + clean_header = " | ".join(headers[:4]) combined_report.append(clean_header) combined_report.append("-" * len(clean_header)) # Row logic for row in rows: cols = [td.get_text(strip=True) for td in row.find_all("td")] - if not cols or len(cols) < 2: continue - if "City" in cols[0] or "Temperature" in cols[0]: continue + if not cols or len(cols) < 2: + continue + if "City" in cols[0] or "Temperature" in cols[0]: + continue row_text = " | ".join(cols[:4]) combined_report.append(row_text) @@ -1000,26 +1173,28 @@ def tool_weather_nowcast(location: str = "Colombo") -> Dict[str, Any]: found_pdfs = [] for a in pdf_links: link_text = a.get_text(strip=True) - href = a['href'] - if "pdf" in href.lower() and any(k in link_text.lower() for k in ["advisory", "warning"]): + href = a["href"] + if "pdf" in href.lower() and any( + k in link_text.lower() for k in ["advisory", "warning"] + ): abs_url = _make_absolute(href, base_url) - if abs_url not in [p['url'] for p in found_pdfs]: + if abs_url not in [p["url"] for p in found_pdfs]: prio = 1 if "english" in link_text.lower() else 2 found_pdfs.append({"title": link_text, "url": abs_url, "prio": prio}) - - found_pdfs.sort(key=lambda x: x['prio']) - + + found_pdfs.sort(key=lambda x: x["prio"]) + for pdf in found_pdfs[:2]: - text = _extract_text_from_pdf_url(pdf['url']) + text = _extract_text_from_pdf_url(pdf["url"]) if "Sinhala/Tamil" not in text and len(text) > 50: - combined_report.append(f"\n--- CRITICAL ALERT: {pdf['title']} ---\n{text}") + combined_report.append(f"\n--- CRITICAL ALERT: {pdf['title']} ---\n{text}") # Final Cleanup final_text = "\n\n".join(combined_report) cleanup = ["DEPARTMENT OF METEOROLOGY", "Loading...", "Listen To The Weather"] for c in cleanup: final_text = final_text.replace(c, "") - + return { "location": "All Districts", "forecast": final_text, @@ -1036,17 +1211,17 @@ LOCAL_NEWS_SITES = [ { "url": "https://www.dailymirror.lk/", "name": "Daily Mirror", - "article_selector": "article, .news-block, .article, .card" + "article_selector": "article, .news-block, .article, .card", }, { "url": "https://www.ft.lk/", "name": "Daily FT", - "article_selector": "article, .article-list-item, .card" + "article_selector": "article, .article-list-item, .card", }, { "url": "https://www.newsfirst.lk/", "name": "News First", - "article_selector": ".post, article, .news-block" + "article_selector": ".post, article, .news-block", }, ] @@ -1066,8 +1241,12 @@ def scrape_local_news_impl( articles = soup.select(site.get("article_selector", "article")) for article in articles: title_elem = ( - article.find("h1") or article.find("h2") or article.find("h3") - or article.find(class_=re.compile(r"(title|headline|heading)", re.I)) + article.find("h1") + or article.find("h2") + or article.find("h3") 
+ or article.find( + class_=re.compile(r"(title|headline|heading)", re.I) + ) ) title = title_elem.get_text(strip=True) if title_elem else "" if not title or len(title) < 8: @@ -1080,16 +1259,22 @@ def scrape_local_news_impl( link_elem = article.find("a", href=True) href = link_elem["href"] if link_elem else site["url"] href = _make_absolute(href, site["url"]) - snippet_elem = article.find("p") or article.find(class_=re.compile(r"(excerpt|summary|description)", re.I)) - snippet = snippet_elem.get_text(strip=True)[:300] if snippet_elem else "" - results.append({ - "source": site["name"], - "source_url": site["url"], - "headline": title, - "snippet": snippet, - "url": href, - "timestamp": datetime.utcnow().isoformat(), - }) + snippet_elem = article.find("p") or article.find( + class_=re.compile(r"(excerpt|summary|description)", re.I) + ) + snippet = ( + snippet_elem.get_text(strip=True)[:300] if snippet_elem else "" + ) + results.append( + { + "source": site["name"], + "source_url": site["url"], + "headline": title, + "snippet": snippet, + "url": href, + "timestamp": datetime.utcnow().isoformat(), + } + ) if len(results) >= max_articles: return results except Exception as e: @@ -1102,20 +1287,37 @@ def scrape_local_news_impl( # REDDIT SCRAPING # ============================================ + def scrape_reddit_impl( keywords: List[str], limit: int = 20, subreddit: Optional[str] = None, ) -> List[Dict[str, Any]]: - base = f"https://www.reddit.com/r/{subreddit}/search.json" if subreddit else "https://www.reddit.com/search.json" + base = ( + f"https://www.reddit.com/r/{subreddit}/search.json" + if subreddit + else "https://www.reddit.com/search.json" + ) query = " ".join(keywords) if keywords else "Sri Lanka" - params = {"q": query, "sort": "new", "limit": str(limit), "restrict_sr": "on" if subreddit else "off"} - headers = {"User-Agent": DEFAULT_HEADERS["User-Agent"], "Accept": "application/json"} + params = { + "q": query, + "sort": "new", + "limit": str(limit), + "restrict_sr": "on" if subreddit else "off", + } + headers = { + "User-Agent": DEFAULT_HEADERS["User-Agent"], + "Accept": "application/json", + } try: - resp = requests.get(base, headers=headers, params=params, timeout=DEFAULT_TIMEOUT) + resp = requests.get( + base, headers=headers, params=params, timeout=DEFAULT_TIMEOUT + ) if resp.status_code != 200: logger.warning(f"[REDDIT] HTTP {resp.status_code} for {base}") - return [{"error": f"Reddit returned status {resp.status_code}", "query": query}] + return [ + {"error": f"Reddit returned status {resp.status_code}", "query": query} + ] data = resp.json() posts_raw = data.get("data", {}).get("children", []) posts: List[Dict[str, Any]] = [] @@ -1126,18 +1328,24 @@ def scrape_reddit_impl( text = f"{title}\n{selftext}" if not _contains_keyword(text, keywords): continue - posts.append({ - "id": d.get("id"), - "title": title, - "selftext": selftext[:500], - "subreddit": d.get("subreddit"), - "author": d.get("author"), - "score": d.get("score", 0), - "url": "https://www.reddit.com" + d.get("permalink", ""), - "created_utc": d.get("created_utc"), - "num_comments": d.get("num_comments", 0), - }) - return posts if posts else [{"note": f"No Reddit posts found for: {query}", "query": query}] + posts.append( + { + "id": d.get("id"), + "title": title, + "selftext": selftext[:500], + "subreddit": d.get("subreddit"), + "author": d.get("author"), + "score": d.get("score", 0), + "url": "https://www.reddit.com" + d.get("permalink", ""), + "created_utc": d.get("created_utc"), + "num_comments": 
d.get("num_comments", 0), + } + ) + return ( + posts + if posts + else [{"note": f"No Reddit posts found for: {query}", "query": query}] + ) except Exception as e: logger.error(f"[REDDIT] Error: {e}") return [{"error": str(e), "query": query}] @@ -1147,6 +1355,7 @@ def scrape_reddit_impl( # CSE / STOCK DATA # ============================================ + def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: """ Scrape stock data directly from CSE website. @@ -1157,12 +1366,12 @@ def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: resp = _safe_get(cse_url, timeout=30) if not resp: return None - + soup = BeautifulSoup(resp.text, "html.parser") text = soup.get_text(separator="\n", strip=True) - + result_data = {} - + # Pattern for ASPI (All Share Price Index) # CSE website typically shows: "ASPI 12,345.67 +123.45 (+1.01%)" aspi_patterns = [ @@ -1170,7 +1379,7 @@ def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: r"All\s*Share\s*(?:Price\s*)?Index[:\s]*([\d,]+\.?\d*)", r"ASPI[^\d\n\r]*([\d,]+\.\d+)", ] - + for pattern in aspi_patterns: m = re.search(pattern, text, re.I) if m: @@ -1178,19 +1387,27 @@ def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: value = float(m.group(1).replace(",", "")) result_data["aspi"] = { "value": value, - "change": float(m.group(2).replace(",", "")) if len(m.groups()) > 1 and m.group(2) else None, - "change_pct": float(m.group(3).replace(",", "").replace("%", "")) if len(m.groups()) > 2 and m.group(3) else None, + "change": ( + float(m.group(2).replace(",", "")) + if len(m.groups()) > 1 and m.group(2) + else None + ), + "change_pct": ( + float(m.group(3).replace(",", "").replace("%", "")) + if len(m.groups()) > 2 and m.group(3) + else None + ), } break except (ValueError, IndexError): continue - + # Pattern for S&P SL20 index sp_patterns = [ r"S&?P\s*SL\s*20[:\s]*([\d,]+\.?\d*)", r"SL20[:\s]*([\d,]+\.?\d*)", ] - + for pattern in sp_patterns: m = re.search(pattern, text, re.I) if m: @@ -1199,18 +1416,20 @@ def _scrape_cse_website_data(symbol: str) -> Optional[Dict[str, Any]]: break except ValueError: continue - + # Check if we got any useful data if result_data: return result_data - - # Fallback: simple ASPI pattern - m = re.search(r"(ASPI|All Share Price Index)[^\d\n\r]*([\d,]+\.\d+)", text, re.I) + + # Fallback: simple ASPI pattern + m = re.search( + r"(ASPI|All Share Price Index)[^\d\n\r]*([\d,]+\.\d+)", text, re.I + ) if m: return {"aspi": {"value": float(m.group(2).replace(",", ""))}} - + return None - + except Exception as e: logger.debug(f"[CSE] Direct CSE scrape failed: {e}") return None @@ -1225,19 +1444,19 @@ def scrape_cse_stock_impl( Fetch CSE stock data with multiple fallback strategies: 1. First try direct CSE website scraping (most reliable for Sri Lankan stocks) 2. Fall back to yfinance if direct scraping fails - - Note: yfinance often fails for CSE symbols as Yahoo Finance has limited + + Note: yfinance often fails for CSE symbols as Yahoo Finance has limited coverage of the Colombo Stock Exchange. 
""" symbol_upper = symbol.upper() is_index = symbol_upper in ("ASPI", "ASPI.N0000", "^N0000", "ALL SHARE") - + # ============ Strategy 1: Direct CSE Website Scraping ============ # This is more reliable for Sri Lankan market data if is_index: logger.info(f"[CSE] Attempting direct CSE website scrape for {symbol}...") cse_data = _scrape_cse_website_data(symbol) - + if cse_data and "aspi" in cse_data: aspi_info = cse_data["aspi"] summary = { @@ -1245,12 +1464,14 @@ def scrape_cse_stock_impl( "change": aspi_info.get("change"), "change_pct": aspi_info.get("change_pct"), } - + # Add S&P SL20 if available if "sp_sl20" in cse_data: summary["sp_sl20"] = cse_data["sp_sl20"] - - logger.info(f"[CSE] Successfully scraped ASPI from CSE website: {summary['current_price']}") + + logger.info( + f"[CSE] Successfully scraped ASPI from CSE website: {summary['current_price']}" + ) return { "symbol": symbol, "resolved_symbol": "CSE-direct", @@ -1262,7 +1483,7 @@ def scrape_cse_stock_impl( "note": "Real-time data from Colombo Stock Exchange website", "fetched_at": datetime.utcnow().isoformat(), } - + # ============ Strategy 2: yfinance (Fallback) ============ # Note: This frequently fails for CSE stocks symbols_to_try = [symbol] @@ -1271,26 +1492,26 @@ def scrape_cse_stock_impl( elif not symbol.endswith(".N0000") and not symbol.startswith("^"): # Try both with and without .N0000 suffix for regular stocks symbols_to_try = [f"{symbol}.N0000", symbol] - + logger.info(f"[CSE] Trying yfinance for symbols: {symbols_to_try}") - + for sym in symbols_to_try: try: ticker = yf.Ticker(sym) hist = ticker.history(period=period, interval=interval) - + if hist is None or hist.empty: logger.debug(f"[CSE] yfinance returned empty data for {sym}") continue - + hist = hist.reset_index() records = hist.to_dict(orient="records") - + for record in records: for key, value in list(record.items()): if hasattr(value, "isoformat"): record[key] = value.isoformat() - + latest = records[-1] if records else {} summary = { "current_price": latest.get("Close", latest.get("close", 0)), @@ -1299,7 +1520,7 @@ def scrape_cse_stock_impl( "low": latest.get("Low", latest.get("low", 0)), "volume": latest.get("Volume", latest.get("volume", 0)), } - + logger.info(f"[CSE] yfinance success for {sym}: {summary['current_price']}") return { "symbol": symbol, @@ -1311,15 +1532,15 @@ def scrape_cse_stock_impl( "source": "yahoo_finance", "fetched_at": datetime.utcnow().isoformat(), } - + except Exception as e_inner: logger.debug(f"[CSE] yfinance attempt failed for {sym}: {e_inner}") continue - + # ============ Final Fallback: Try CSE website again for any symbol ============ logger.info(f"[CSE] All yfinance attempts failed, trying CSE website fallback...") cse_data = _scrape_cse_website_data(symbol) - + if cse_data and "aspi" in cse_data: return { "symbol": symbol, @@ -1331,11 +1552,11 @@ def scrape_cse_stock_impl( "source": "cse.lk (fallback scrape)", "fetched_at": datetime.utcnow().isoformat(), } - + # All strategies failed logger.warning(f"[CSE] All data sources failed for {symbol}") return { - "symbol": symbol, + "symbol": symbol, "error": f"Could not fetch data for {symbol}. 
Yahoo Finance has limited CSE coverage.", "attempted_symbols": symbols_to_try, "suggestion": "Try accessing cse.lk directly for real-time CSE data", @@ -1343,11 +1564,11 @@ def scrape_cse_stock_impl( } - # ============================================ # GOVERNMENT GAZETTE (Deep Scraping) # ============================================ + def scrape_government_gazette_impl( keywords: Optional[List[str]] = None, max_items: int = 15, @@ -1355,72 +1576,78 @@ def scrape_government_gazette_impl( """ Scrapes gazette.lk for latest government gazettes. ENHANCED: Now downloads PDFs and extracts text content from them. - + Args: keywords: Optional list of keywords to filter gazettes (currently ignored) max_items: Maximum number of gazette entries to process - + Returns: List of gazette entries with PDF content extracted """ base_url = "https://www.gazette.lk/government-gazette" results: List[Dict[str, Any]] = [] - + logger.info(f"[GAZETTE] Fetching latest gazettes from {base_url}") resp = _safe_get(base_url) if not resp: - return [{ - "title": "Failed to access gazette.lk", - "url": base_url, - "error": "Network request failed", - "timestamp": datetime.utcnow().isoformat() - }] - + return [ + { + "title": "Failed to access gazette.lk", + "url": base_url, + "error": "Network request failed", + "timestamp": datetime.utcnow().isoformat(), + } + ] + soup = BeautifulSoup(resp.text, "html.parser") - + # Find all gazette article entries articles = soup.find_all("article") if not articles: articles = soup.select(".post, .type-post, .entry") - + logger.info(f"[GAZETTE] Found {len(articles)} potential gazette entries") - + for article in articles: if len(results) >= max_items: break - + # Extract title and link - title_elem = article.find(class_="entry-title") or article.find("h2") or article.find("h3") + title_elem = ( + article.find(class_="entry-title") + or article.find("h2") + or article.find("h3") + ) if not title_elem: continue - + link_elem = title_elem.find("a", href=True) if not link_elem: continue - + title = link_elem.get_text(strip=True) post_url = link_elem["href"] post_url_abs = _make_absolute(post_url, base_url) - + # Filter to only include actual gazette entries (not other site content) if "government gazette" not in title.lower(): continue - + # Extract date from title if possible - date_match = re.search(r'(\d{4}\s+\w+\s+\d{1,2})', title) + date_match = re.search(r"(\d{4}\s+\w+\s+\d{1,2})", title) date_str = date_match.group(1) if date_match else "Unknown date" - + logger.info(f"[GAZETTE] Processing: {title[:50]}...") - + # ENHANCED: Visit the detail page to find all PDF links pdf_links = [] pdf_content = [] - + try: detail_resp = _safe_get(post_url_abs) if detail_resp: detail_soup = BeautifulSoup(detail_resp.text, "html.parser") - + # FIXED: First look for pdfemb-viewer class links (gazette.lk specific) # These have direct PDF URLs like https://www.gazette.lk/dl/Gazette/11/Gazette-2025-11-28E.pdf pdfemb_links = detail_soup.find_all("a", class_="pdfemb-viewer") @@ -1434,28 +1661,31 @@ def scrape_government_gazette_impl( language = "sinhala" elif href.endswith("T.pdf") or "tamil" in href_lower: language = "tamil" - + pdf_url = _make_absolute(href, post_url_abs) - pdf_links.append({ - "language": language, - "url": pdf_url, - "text": link.get_text(strip=True) or f"Gazette PDF ({language})" - }) + pdf_links.append( + { + "language": language, + "url": pdf_url, + "text": link.get_text(strip=True) + or f"Gazette PDF ({language})", + } + ) logger.info(f"[GAZETTE] Found pdfemb-viewer link: {pdf_url}") 
- + # Also look for any other direct PDF links (backup approach) if not pdf_links: for link in detail_soup.find_all("a", href=True): href = link["href"] link_text = link.get_text(strip=True).lower() - + # Check for direct PDF download paths is_gazette_pdf = "/dl/Gazette/" in href is_pdf_file = href.lower().endswith(".pdf") - + if is_gazette_pdf or is_pdf_file: pdf_url = _make_absolute(href, post_url_abs) - + # Detect language language = "english" if "sinhala" in link_text or href.endswith("S.pdf"): @@ -1464,50 +1694,67 @@ def scrape_government_gazette_impl( language = "tamil" elif href.endswith("E.pdf") or "english" in link_text: language = "english" - + # Avoid duplicates if not any(p["url"] == pdf_url for p in pdf_links): - pdf_links.append({ - "language": language, - "url": pdf_url, - "text": link.get_text(strip=True) or f"PDF ({language})" - }) - - logger.info(f"[GAZETTE] Found {len(pdf_links)} PDF links on detail page") - + pdf_links.append( + { + "language": language, + "url": pdf_url, + "text": link.get_text(strip=True) + or f"PDF ({language})", + } + ) + + logger.info( + f"[GAZETTE] Found {len(pdf_links)} PDF links on detail page" + ) + # ENHANCED: Download and extract text from English PDFs (most useful) english_pdfs = [p for p in pdf_links if p["language"] == "english"] if not english_pdfs: english_pdfs = pdf_links[:1] # Fallback to first PDF - + for pdf_info in english_pdfs[:2]: # Limit to 2 PDFs per gazette try: - logger.info(f"[GAZETTE] Downloading PDF: {pdf_info['url'][:60]}...") + logger.info( + f"[GAZETTE] Downloading PDF: {pdf_info['url'][:60]}..." + ) extracted_text = _extract_text_from_pdf_url(pdf_info["url"]) - + if extracted_text and not extracted_text.startswith("["): - pdf_content.append({ - "language": pdf_info["language"], - "content": extracted_text[:5000], # Limit content length - "source_url": pdf_info["url"] - }) - logger.info(f"[GAZETTE] Extracted {len(extracted_text)} chars from PDF") + pdf_content.append( + { + "language": pdf_info["language"], + "content": extracted_text[ + :5000 + ], # Limit content length + "source_url": pdf_info["url"], + } + ) + logger.info( + f"[GAZETTE] Extracted {len(extracted_text)} chars from PDF" + ) else: - pdf_content.append({ - "language": pdf_info["language"], - "content": extracted_text, - "source_url": pdf_info["url"] - }) + pdf_content.append( + { + "language": pdf_info["language"], + "content": extracted_text, + "source_url": pdf_info["url"], + } + ) except Exception as e: logger.warning(f"[GAZETTE] PDF extraction error: {e}") - pdf_content.append({ - "language": pdf_info.get("language", "unknown"), - "content": f"[Error extracting PDF: {str(e)}]", - "source_url": pdf_info.get("url", "") - }) + pdf_content.append( + { + "language": pdf_info.get("language", "unknown"), + "content": f"[Error extracting PDF: {str(e)}]", + "source_url": pdf_info.get("url", ""), + } + ) except Exception as e: logger.warning(f"[GAZETTE] Error fetching detail page: {e}") - + # Build the result with extracted content result_entry = { "title": title, @@ -1517,79 +1764,87 @@ def scrape_government_gazette_impl( "extracted_content": pdf_content, "timestamp": datetime.utcnow().isoformat(), } - + # Add a summary if we have content if pdf_content: first_content = pdf_content[0].get("content", "") if first_content and not first_content.startswith("["): result_entry["summary"] = first_content[:500] - + results.append(result_entry) logger.info(f"[GAZETTE] Added gazette with {len(pdf_content)} PDF extractions") - + if not results: - return [{ - "title": "No 
gazette entries found", - "url": base_url, - "note": "The website structure may have changed", - "timestamp": datetime.utcnow().isoformat(), - }] - - logger.info(f"[GAZETTE] Successfully scraped {len(results)} gazette entries with PDF content") - return results + return [ + { + "title": "No gazette entries found", + "url": base_url, + "note": "The website structure may have changed", + "timestamp": datetime.utcnow().isoformat(), + } + ] + logger.info( + f"[GAZETTE] Successfully scraped {len(results)} gazette entries with PDF content" + ) + return results # ============================================ # PARLIAMENT MINUTES # ============================================ + def scrape_parliament_minutes_impl( keywords: Optional[List[str]] = None, max_items: int = 20, ) -> List[Dict[str, Any]]: """ Scrape Sri Lankan Parliament Hansards from parliament.lk. - + ENHANCED: Now properly extracts Hansard PDF links with dates and metadata. The website stores PDFs at /uploads/businessdocs/ with date-encoded filenames. - + Args: keywords: Optional keywords to filter results max_items: Maximum number of items to return - + Returns: List of Hansard entries with PDF links and dates """ url = "https://www.parliament.lk/en/business-of-parliament/hansards" - + logger.info(f"[PARLIAMENT] Fetching Hansards from {url}") resp = _safe_get(url) - + if not resp: - return [{ - "title": "Parliament website unavailable", - "url": url, - "note": "Could not access parliament.lk. Site may be down.", - "timestamp": datetime.utcnow().isoformat(), - }] - + return [ + { + "title": "Parliament website unavailable", + "url": url, + "note": "Could not access parliament.lk. Site may be down.", + "timestamp": datetime.utcnow().isoformat(), + } + ] + soup = BeautifulSoup(resp.text, "html.parser") results: List[Dict[str, Any]] = [] - + # Strategy 1: Look for PDF links in /uploads/businessdocs/ (Hansard documents) - pdf_links = soup.find_all("a", href=lambda x: x and ".pdf" in x.lower() and "businessdocs" in x.lower()) - + pdf_links = soup.find_all( + "a", href=lambda x: x and ".pdf" in x.lower() and "businessdocs" in x.lower() + ) + logger.info(f"[PARLIAMENT] Found {len(pdf_links)} Hansard PDF links") - + for link in pdf_links: href = link.get("href", "") link_text = link.get_text(strip=True) - + # Extract date from URL (e.g., 22912_english_2025-11-17.pdf) date_match = re.search(r"(\d{4}-\d{2}-\d{2})", href) date_str = date_match.group(1) if date_match else None - + # Extract language from URL language = "english" href_lower = href.lower() @@ -1597,34 +1852,36 @@ def scrape_parliament_minutes_impl( language = "sinhala" elif "tamil" in href_lower: language = "tamil" - + # Extract document ID from URL doc_id_match = re.search(r"/(\d+)_", href) doc_id = doc_id_match.group(1) if doc_id_match else None - + # Build title if date_str: title = f"Hansard - {date_str} ({language.capitalize()})" else: title = f"Hansard ({language.capitalize()})" - + # Find parent element for additional context parent = link.find_parent(["tr", "li", "div", "article"]) if parent: parent_text = parent.get_text(separator=" ", strip=True) # Look for session info in parent - session_match = re.search(r"(Session|Sitting|Day)\s*[:\-]?\s*(\d+)", parent_text, re.I) + session_match = re.search( + r"(Session|Sitting|Day)\s*[:\-]?\s*(\d+)", parent_text, re.I + ) if session_match: title += f" - {session_match.group(0)}" - + # Apply keyword filter if specified full_text = f"{title} {href} {link_text}" if keywords and not _contains_keyword(full_text, keywords): continue - + # 
Construct absolute URL pdf_url = _make_absolute(href, url) - + entry = { "title": title, "url": pdf_url, @@ -1634,57 +1891,63 @@ def scrape_parliament_minutes_impl( "link_text": link_text, "timestamp": datetime.utcnow().isoformat(), } - + # Avoid duplicates (same doc, different language links) if not any(r.get("url") == pdf_url for r in results): results.append(entry) - + if len(results) >= max_items: break - + # Strategy 2: If no PDFs found, fall back to general link search if not results: logger.info("[PARLIAMENT] No PDF links found, trying general link search...") for a in soup.find_all("a", href=True): title = a.get_text(strip=True) href = a["href"] - + if not title or len(title) < 6: continue - + # Must match hansard-related keywords combined = f"{title} {href}".lower() - if not re.search(r"(hansard|minutes|debate|transcript|proceedings)", combined): + if not re.search( + r"(hansard|minutes|debate|transcript|proceedings)", combined + ): continue - + # Apply user keyword filter if keywords and not _contains_keyword(title, keywords): continue - + href_abs = _make_absolute(href, url) - + # Avoid duplicates if any(r.get("url") == href_abs for r in results): continue - - results.append({ - "title": title, - "url": href_abs, - "timestamp": datetime.utcnow().isoformat(), - }) - + + results.append( + { + "title": title, + "url": href_abs, + "timestamp": datetime.utcnow().isoformat(), + } + ) + if len(results) >= max_items: break - + if not results: - return [{ - "title": "No parliament Hansards found", - "url": url, - "keywords": keywords, - "note": "The website structure may have changed or no matching documents found.", - "timestamp": datetime.utcnow().isoformat(), - }] - + return [ + { + "title": "No parliament Hansards found", + "url": url, + "keywords": keywords, + "note": "The website structure may have changed or no matching documents found.", + "timestamp": datetime.utcnow().isoformat(), + } + ] + logger.info(f"[PARLIAMENT] Successfully scraped {len(results)} Hansard entries") return results @@ -1693,6 +1956,7 @@ def scrape_parliament_minutes_impl( # TRAIN SCHEDULE # ============================================ + def scrape_train_schedule_impl( from_station: Optional[str] = None, to_station: Optional[str] = None, @@ -1702,11 +1966,13 @@ def scrape_train_schedule_impl( url = "https://eservices.railway.gov.lk/schedule/homeAction.action?lang=en" resp = _safe_get(url) if not resp: - return [{ - "train": "Railway website unavailable", - "note": "Could not access railway.gov.lk", - "timestamp": datetime.utcnow().isoformat(), - }] + return [ + { + "train": "Railway website unavailable", + "note": "Could not access railway.gov.lk", + "timestamp": datetime.utcnow().isoformat(), + } + ] soup = BeautifulSoup(resp.text, "html.parser") tables = soup.find_all("table") results: List[Dict[str, Any]] = [] @@ -1733,11 +1999,13 @@ def scrape_train_schedule_impl( if len(results) >= max_items: break if not results: - return [{ - "train": "No train schedules found", - "note": "Railway schedule unavailable or no matches", - "timestamp": datetime.utcnow().isoformat(), - }] + return [ + { + "train": "No train schedules found", + "note": "Railway schedule unavailable or no matches", + "timestamp": datetime.utcnow().isoformat(), + } + ] return results @@ -1745,7 +2013,10 @@ def scrape_train_schedule_impl( # TWITTER TRENDING # ============================================ -def _scrape_twitter_trending_with_playwright(storage_state_path: Optional[str] = None, headless: bool = True) -> List[Dict[str, Any]]: + +def 
_scrape_twitter_trending_with_playwright( + storage_state_path: Optional[str] = None, headless: bool = True +) -> List[Dict[str, Any]]: ensure_playwright() trending = [] with sync_playwright() as p: @@ -1753,16 +2024,24 @@ def _scrape_twitter_trending_with_playwright(storage_state_path: Optional[str] = context_args = {} if storage_state_path and os.path.exists(storage_state_path): context_args["storage_state"] = storage_state_path - + context = browser.new_context(**context_args) page = context.new_page() try: - page.goto("https://twitter.com/i/trends", wait_until="networkidle", timeout=30000) + page.goto( + "https://twitter.com/i/trends", wait_until="networkidle", timeout=30000 + ) if "login" in page.url or page.content().strip() == "": - page.goto("https://twitter.com/explore/tabs/trending", wait_until="networkidle", timeout=30000) + page.goto( + "https://twitter.com/explore/tabs/trending", + wait_until="networkidle", + timeout=30000, + ) html = page.content() soup = BeautifulSoup(html, "html.parser") - items = soup.select("div[role='article'] a, div[data-testid='trend'], div.trend-card, span.trend-name") + items = soup.select( + "div[role='article'] a, div[data-testid='trend'], div.trend-card, span.trend-name" + ) seen = set() for it in items: text = it.get_text(separator=" ", strip=True) @@ -1772,8 +2051,17 @@ def _scrape_twitter_trending_with_playwright(storage_state_path: Optional[str] = if text in seen: continue seen.add(text) - trending.append({"trend": text, "url": _make_absolute(href, "https://twitter.com") if href else None}) - + trending.append( + { + "trend": text, + "url": ( + _make_absolute(href, "https://twitter.com") + if href + else None + ), + } + ) + if not trending: for tag in soup.find_all(string=re.compile(r"#\w+")): t = tag.strip() @@ -1792,7 +2080,9 @@ def _scrape_twitter_trending_with_playwright(storage_state_path: Optional[str] = browser.close() -def _scrape_twitter_trending_with_nitter(instance: str = "https://nitter.net") -> List[Dict[str, Any]]: +def _scrape_twitter_trending_with_nitter( + instance: str = "https://nitter.net", +) -> List[Dict[str, Any]]: trends = [] try: search_url = f"{instance}/search?f=tweets&q=Sri%20Lanka%20trend" @@ -1814,70 +2104,110 @@ def _scrape_twitter_trending_with_nitter(instance: str = "https://nitter.net") - return [] -def scrape_twitter_trending_srilanka(use_playwright: bool = True, storage_state_site: Optional[str] = None) -> Dict[str, Any]: +def scrape_twitter_trending_srilanka( + use_playwright: bool = True, storage_state_site: Optional[str] = None +) -> Dict[str, Any]: if use_playwright and PLAYWRIGHT_AVAILABLE: storage_state = None if storage_state_site: storage_state = load_playwright_storage_state_path(storage_state_site) try: - trends = _scrape_twitter_trending_with_playwright(storage_state_path=storage_state) + trends = _scrape_twitter_trending_with_playwright( + storage_state_path=storage_state + ) if trends: - return {"source": "twitter_playwright", "trends": trends, "fetched_at": datetime.utcnow().isoformat()} + return { + "source": "twitter_playwright", + "trends": trends, + "fetched_at": datetime.utcnow().isoformat(), + } except Exception as e: logger.debug(f"[TWITTER] Playwright attempt failed: {e}") - nitter_instances = ["https://nitter.net", "https://nitter.snopyta.org", "https://nitter.1d4.us"] + nitter_instances = [ + "https://nitter.net", + "https://nitter.snopyta.org", + "https://nitter.1d4.us", + ] for inst in nitter_instances: try: trends = _scrape_twitter_trending_with_nitter(inst) if trends: - return 
{"source": inst, "trends": trends, "fetched_at": datetime.utcnow().isoformat()} + return { + "source": inst, + "trends": trends, + "fetched_at": datetime.utcnow().isoformat(), + } except Exception: continue - return {"source": "none", "trends": [], "note": "Could not fetch Twitter trends. Try supplying Playwright session or check network."} + return { + "source": "none", + "trends": [], + "note": "Could not fetch Twitter trends. Try supplying Playwright session or check network.", + } # ============================================ # AUTHENTICATED SCRAPERS # ============================================ + def scrape_authenticated_page_via_playwright( site_name: str, url: str, login_flow: Optional[dict] = None, headless: bool = True, storage_dir: str = ".sessions", - wait_until: str = "networkidle" + wait_until: str = "networkidle", ) -> Dict[str, Any]: if not PLAYWRIGHT_AVAILABLE: - return {"error": "Playwright not available. Install playwright to use authenticated scrapers."} - + return { + "error": "Playwright not available. Install playwright to use authenticated scrapers." + } + session_path = load_playwright_storage_state_path(site_name, storage_dir) - + if not session_path: if not login_flow: - return {"error": f"No existing session found for {site_name} and no login_flow provided to create one."} + return { + "error": f"No existing session found for {site_name} and no login_flow provided to create one." + } try: - session_path = create_or_restore_playwright_session(site_name, login_flow=login_flow, headless=headless, storage_dir=storage_dir, wait_until=wait_until) + session_path = create_or_restore_playwright_session( + site_name, + login_flow=login_flow, + headless=headless, + storage_dir=storage_dir, + wait_until=wait_until, + ) except Exception as e: return {"error": f"Failed to create Playwright session: {e}"} - - html = playwright_fetch_html_using_session(url, session_path, headless=headless, wait_until=wait_until) + + html = playwright_fetch_html_using_session( + url, session_path, headless=headless, wait_until=wait_until + ) if not html: - return {"error": "Failed to fetch page via Playwright session.", "storage_state": session_path} + return { + "error": "Failed to fetch page via Playwright session.", + "storage_state": session_path, + } return {"html": html, "source": url, "storage_state": session_path} -def _simple_parse_posts_from_html(html: str, base_url: str, max_items: int = 10) -> List[Dict[str, Any]]: +def _simple_parse_posts_from_html( + html: str, base_url: str, max_items: int = 10 +) -> List[Dict[str, Any]]: soup = BeautifulSoup(html, "html.parser") items: List[Dict[str, Any]] = [] - candidates = soup.select("article, div.post, div.feed-item, li.stream-item, div._4ikz") + candidates = soup.select( + "article, div.post, div.feed-item, li.stream-item, div._4ikz" + ) if not candidates: candidates = soup.find_all(["article", "div"], limit=200) seen = set() for c in candidates: - title_tag = (c.find("h1") or c.find("h2") or c.find("h3") or c.find("a")) + title_tag = c.find("h1") or c.find("h2") or c.find("h3") or c.find("a") if not title_tag: continue title = title_tag.get_text(strip=True) @@ -1898,7 +2228,6 @@ def _simple_parse_posts_from_html(html: str, base_url: str, max_items: int = 10) # ============================================ - def clean_linkedin_text(text): if not text: return "" @@ -1913,6 +2242,7 @@ def clean_linkedin_text(text): return text.strip() + @tool def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): """ @@ -1920,28 
+2250,42 @@ def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): Requires environment variables: LINKEDIN_USER, LINKEDIN_PASSWORD (if creating session). """ ensure_playwright() - + # 1. Load Session site = "linkedin" - session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions") + session_path = load_playwright_storage_state_path( + site, out_dir="src/utils/.sessions" + ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") - + # If no session, try to create one if not session_path: login_flow = { "login_url": "https://www.linkedin.com/login", "steps": [ - {"type": "fill", "selector": 'input[name="session_key"]', "value_env": "LINKEDIN_USER"}, - {"type": "fill", "selector": 'input[name="session_password"]', "value_env": "LINKEDIN_PASSWORD"}, + { + "type": "fill", + "selector": 'input[name="session_key"]', + "value_env": "LINKEDIN_USER", + }, + { + "type": "fill", + "selector": 'input[name="session_password"]', + "value_env": "LINKEDIN_PASSWORD", + }, {"type": "click", "selector": 'button[type="submit"]'}, - {"type": "wait", "selector": 'nav', "timeout": 20000} - ] + {"type": "wait", "selector": "nav", "timeout": 20000}, + ], } try: - session_path = create_or_restore_playwright_session(site, login_flow=login_flow, headless=True) + session_path = create_or_restore_playwright_session( + site, login_flow=login_flow, headless=True + ) except Exception as e: - return json.dumps({"error": f"No session found and failed to create one: {e}"}) + return json.dumps( + {"error": f"No session found and failed to create one: {e}"} + ) keyword = " ".join(keywords) if keywords else "Sri Lanka" results = [] @@ -1958,33 +2302,42 @@ def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): headless=True, args=[ "--disable-blink-features=AutomationControlled", - "--start-maximized" - ] + "--start-maximized", + ], ) - + context = browser.new_context( - storage_state=session_path, - user_agent=desktop_ua, - no_viewport=True + storage_state=session_path, user_agent=desktop_ua, no_viewport=True ) - + page = context.new_page() - page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + page.add_init_script( + "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" + ) url = f"https://www.linkedin.com/search/results/content/?keywords={keyword.replace(' ', '%20')}&origin=GLOBAL_SEARCH_HEADER" - + try: logger.info(f"[LINKEDIN] Navigating to {url}") page.goto(url, timeout=60000, wait_until="domcontentloaded") except Exception as e: - logger.warning(f"[LINKEDIN] Page load timed out (or other error), attempting to proceed: {e}") + logger.warning( + f"[LINKEDIN] Page load timed out (or other error), attempting to proceed: {e}" + ) page.wait_for_timeout(random.randint(4000, 7000)) try: - if page.locator("a[href*='login']").is_visible() or "auth_wall" in page.url: - logger.error("[LINKEDIN] Session invalid. Redirected to login/auth wall.") - return json.dumps({"error": "Session invalid. Please refresh session."}) + if ( + page.locator("a[href*='login']").is_visible() + or "auth_wall" in page.url + ): + logger.error( + "[LINKEDIN] Session invalid. Redirected to login/auth wall." + ) + return json.dumps( + {"error": "Session invalid. 
Please refresh session."} + ) except: pass @@ -1992,9 +2345,13 @@ def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): no_new_data_count = 0 previous_height = 0 - POST_CONTAINER_SELECTOR = "div.feed-shared-update-v2, li.artdeco-card" - TEXT_SELECTOR = "div.update-components-text span.break-words, span.break-words" - SEE_MORE_SELECTOR = "button.feed-shared-inline-show-more-text__see-more-less-toggle" + POST_CONTAINER_SELECTOR = "div.feed-shared-update-v2, li.artdeco-card" + TEXT_SELECTOR = ( + "div.update-components-text span.break-words, span.break-words" + ) + SEE_MORE_SELECTOR = ( + "button.feed-shared-inline-show-more-text__see-more-less-toggle" + ) POSTER_SELECTOR = "span.update-components-actor__name span[dir='ltr']" while len(results) < max_items: @@ -2002,42 +2359,60 @@ def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): see_more_buttons = page.locator(SEE_MORE_SELECTOR).all() for btn in see_more_buttons: if btn.is_visible(): - try: btn.click(timeout=500) - except: pass - except: pass + try: + btn.click(timeout=500) + except: + pass + except: + pass if len(results) == 0: - try: page.locator(POST_CONTAINER_SELECTOR).first.wait_for(timeout=5000) - except: logger.warning("[LINKEDIN] No posts found on page yet.") + try: + page.locator(POST_CONTAINER_SELECTOR).first.wait_for( + timeout=5000 + ) + except: + logger.warning("[LINKEDIN] No posts found on page yet.") posts = page.locator(POST_CONTAINER_SELECTOR).all() - + for post in posts: - if len(results) >= max_items: break + if len(results) >= max_items: + break try: post.scroll_into_view_if_needed() raw_text = "" text_el = post.locator(TEXT_SELECTOR).first - if text_el.is_visible(): raw_text = text_el.inner_text() - else: raw_text = post.locator("div.feed-shared-update-v2__description-wrapper").first.inner_text() + if text_el.is_visible(): + raw_text = text_el.inner_text() + else: + raw_text = post.locator( + "div.feed-shared-update-v2__description-wrapper" + ).first.inner_text() cleaned_text = clean_linkedin_text(raw_text) poster_name = "(Unknown)" poster_el = post.locator(POSTER_SELECTOR).first - if poster_el.is_visible(): poster_name = poster_el.inner_text().strip() + if poster_el.is_visible(): + poster_name = poster_el.inner_text().strip() else: - poster_el = post.locator("span.update-components-actor__title span[dir='ltr']").first - if poster_el.is_visible(): poster_name = poster_el.inner_text().strip() + poster_el = post.locator( + "span.update-components-actor__title span[dir='ltr']" + ).first + if poster_el.is_visible(): + poster_name = poster_el.inner_text().strip() key = f"{poster_name[:20]}::{cleaned_text[:30]}" if cleaned_text and len(cleaned_text) > 20 and key not in seen: seen.add(key) - results.append({ - "source": "LinkedIn", - "poster": poster_name, - "text": cleaned_text, - "url": "https://www.linkedin.com" - }) + results.append( + { + "source": "LinkedIn", + "poster": poster_name, + "text": cleaned_text, + "url": "https://www.linkedin.com", + } + ) logger.info(f"[LINKEDIN] Found post by {poster_name}") except Exception: continue @@ -2056,29 +2431,35 @@ def scrape_linkedin(keywords: Optional[List[str]] = None, max_items: int = 10): previous_height = new_height browser.close() - return json.dumps({"site": "LinkedIn", "results": results, "storage_state": session_path}, default=str) + return json.dumps( + {"site": "LinkedIn", "results": results, "storage_state": session_path}, + default=str, + ) except Exception as e: return json.dumps({"error": str(e)}) + # 
=====================================================
 # 🔧 TWITTER UTILITY FUNCTIONS
 # =====================================================
 
+
 def clean_twitter_text(text):
     """Clean and normalize tweet text"""
     if not text:
         return ""
-    
+
     # Remove common Twitter artifacts
     text = re.sub(r"Show more", "", text, flags=re.IGNORECASE)
     text = re.sub(r"https://t\.co/\w+", "", text)  # Remove t.co links
     text = re.sub(r"pic\.twitter\.com/\w+", "", text)  # Remove pic.twitter.com links
     text = re.sub(r"\s+", " ", text)  # Normalize whitespace
     text = "\n".join([line.strip() for line in text.splitlines() if line.strip()])
-    
+
     return text.strip()
 
+
 def extract_twitter_timestamp(tweet_element):
     """Extract timestamp from tweet element"""
     try:
@@ -2086,9 +2467,9 @@ def extract_twitter_timestamp(tweet_element):
             "time",
             "[datetime]",
             "a[href*='/status/'] time",
-            "div[data-testid='User-Name'] a[href*='/status/']"
+            "div[data-testid='User-Name'] a[href*='/status/']",
         ]
-        
+
         for selector in timestamp_selectors:
             if tweet_element.locator(selector).count() > 0:
                 time_element = tweet_element.locator(selector).first
@@ -2103,7 +2484,6 @@ def extract_twitter_timestamp(tweet_element):
     return "Unknown"
 
 
-
 @tool
 def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20):
     """
@@ -2111,34 +2491,39 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20):
     Requires a valid Twitter session file (twitter_storage_state.json or tw_state.json).
     """
     ensure_playwright()
-    
+
     # Load Session
     site = "twitter"
-    session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions")
+    session_path = load_playwright_storage_state_path(
+        site, out_dir="src/utils/.sessions"
+    )
     if not session_path:
         session_path = load_playwright_storage_state_path(site, out_dir=".sessions")
-    
+
     # Check for alternative session file name
     if not session_path:
         alt_paths = [
             os.path.join(os.getcwd(), "src", "utils", ".sessions", "tw_state.json"),
             os.path.join(os.getcwd(), ".sessions", "tw_state.json"),
-            os.path.join(os.getcwd(), "tw_state.json")
+            os.path.join(os.getcwd(), "tw_state.json"),
        ]
         for path in alt_paths:
             if os.path.exists(path):
                 session_path = path
                 logger.info(f"[TWITTER] Found session at {path}")
                 break
-    
+
     if not session_path:
-        return json.dumps({
-            "error": "No Twitter session found",
-            "solution": "Run the Twitter session manager to create a session"
-        }, default=str)
-    
+        return json.dumps(
+            {
+                "error": "No Twitter session found",
+                "solution": "Run the Twitter session manager to create a session",
+            },
+            default=str,
+        )
+
     results = []
-    
+
     try:
         with sync_playwright() as p:
             browser = p.chromium.launch(
@@ -2147,36 +2532,38 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20):
                     "--disable-blink-features=AutomationControlled",
                     "--no-sandbox",
                     "--disable-dev-shm-usage",
-                ]
+                ],
             )
-            
+
             context = browser.new_context(
                 storage_state=session_path,
                 viewport={"width": 1280, "height": 720},
-                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            )
-            
-            context.add_init_script("""
+
+            context.add_init_script(
+                """
                 Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                 window.chrome = {runtime: {}};
-            """)
-            
+            """
+            )
+
             page = context.new_page()
-            
+
             # Try different search URLs
             search_urls = [
                 f"https://x.com/search?q={quote_plus(query)}&src=typed_query&f=live",
                 
f"https://x.com/search?q={quote_plus(query)}&src=typed_query", f"https://x.com/search?q={quote_plus(query)}", ] - + success = False for url in search_urls: try: logger.info(f"[TWITTER] Trying {url}") page.goto(url, timeout=60000, wait_until="domcontentloaded") time.sleep(5) - + # Handle popups popup_selectors = [ "[data-testid='app-bar-close']", @@ -2185,15 +2572,20 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): ] for selector in popup_selectors: try: - if page.locator(selector).count() > 0 and page.locator(selector).first.is_visible(): + if ( + page.locator(selector).count() > 0 + and page.locator(selector).first.is_visible() + ): page.locator(selector).first.click() time.sleep(1) except: pass - + # Wait for tweets try: - page.wait_for_selector("article[data-testid='tweet']", timeout=15000) + page.wait_for_selector( + "article[data-testid='tweet']", timeout=15000 + ) logger.info("[TWITTER] Tweets found!") success = True break @@ -2203,26 +2595,30 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): except Exception as e: logger.error(f"[TWITTER] Navigation failed: {e}") continue - + if not success or "login" in page.url: logger.error("[TWITTER] Could not load tweets or session expired") - return json.dumps({"error": "Session invalid or tweets not found"}, default=str) - + return json.dumps( + {"error": "Session invalid or tweets not found"}, default=str + ) + # Scraping seen = set() scroll_attempts = 0 max_scroll_attempts = 15 - + TWEET_SELECTOR = "article[data-testid='tweet']" TEXT_SELECTOR = "div[data-testid='tweetText']" USER_SELECTOR = "div[data-testid='User-Name']" - + while len(results) < max_items and scroll_attempts < max_scroll_attempts: scroll_attempts += 1 - + # Expand "Show more" buttons try: - show_more_buttons = page.locator("[data-testid='tweet-text-show-more-link']").all() + show_more_buttons = page.locator( + "[data-testid='tweet-text-show-more-link']" + ).all() for button in show_more_buttons: if button.is_visible(): try: @@ -2232,84 +2628,102 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): pass except: pass - + # Collect tweets tweets = page.locator(TWEET_SELECTOR).all() new_tweets_found = 0 - + for tweet in tweets: if len(results) >= max_items: break - + try: tweet.scroll_into_view_if_needed() time.sleep(0.1) - + # Skip promoted tweets - if (tweet.locator("span:has-text('Promoted')").count() > 0 or - tweet.locator("span:has-text('Ad')").count() > 0): + if ( + tweet.locator("span:has-text('Promoted')").count() > 0 + or tweet.locator("span:has-text('Ad')").count() > 0 + ): continue - + # Extract text text_content = "" text_element = tweet.locator(TEXT_SELECTOR).first if text_element.count() > 0: text_content = text_element.inner_text() - + cleaned_text = clean_twitter_text(text_content) - + # Extract user user_info = "Unknown" user_element = tweet.locator(USER_SELECTOR).first if user_element.count() > 0: user_text = user_element.inner_text() - user_info = user_text.split('\n')[0].strip() - + user_info = user_text.split("\n")[0].strip() + # Extract timestamp timestamp = extract_twitter_timestamp(tweet) - + # Deduplication text_key = cleaned_text[:50] if cleaned_text else "" unique_key = f"{user_info}_{text_key}" - - if (cleaned_text and len(cleaned_text) > 20 and - unique_key not in seen and - not any(word in cleaned_text.lower() for word in ["promoted", "advertisement"])): - + + if ( + cleaned_text + and len(cleaned_text) > 20 + and unique_key not in seen + and not any( + word in cleaned_text.lower() + for 
word in ["promoted", "advertisement"] + ) + ): + seen.add(unique_key) - results.append({ - "source": "Twitter", - "poster": user_info, - "text": cleaned_text, - "timestamp": timestamp, - "url": "https://x.com" - }) + results.append( + { + "source": "Twitter", + "poster": user_info, + "text": cleaned_text, + "timestamp": timestamp, + "url": "https://x.com", + } + ) new_tweets_found += 1 - logger.info(f"[TWITTER] Collected tweet {len(results)}/{max_items}") - + logger.info( + f"[TWITTER] Collected tweet {len(results)}/{max_items}" + ) + except Exception: continue - + # Scroll down if len(results) < max_items: - page.evaluate("window.scrollTo(0, document.documentElement.scrollHeight)") + page.evaluate( + "window.scrollTo(0, document.documentElement.scrollHeight)" + ) time.sleep(random.uniform(2, 3)) - + if new_tweets_found == 0: scroll_attempts += 1 else: scroll_attempts = 0 - + browser.close() - - return json.dumps({ - "source": "Twitter", - "query": query, - "results": results, - "total_found": len(results), - "fetched_at": datetime.utcnow().isoformat() - }, default=str, indent=2) - + + return json.dumps( + { + "source": "Twitter", + "query": query, + "results": results, + "total_found": len(results), + "fetched_at": datetime.utcnow().isoformat(), + }, + default=str, + indent=2, + ) + except Exception as e: logger.error(f"[TWITTER] {e}") return json.dumps({"error": str(e)}, default=str) @@ -2322,12 +2736,12 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): # try: # if query.strip().lower() in ("trending", "trends", "trending srilanka", "trending sri lanka"): # return json.dumps(scrape_twitter_trending_srilanka(use_playwright=use_playwright, storage_state_site=storage_state_site), default=str) - + # if use_playwright and PLAYWRIGHT_AVAILABLE: # storage_state = None # if storage_state_site: # storage_state = load_playwright_storage_state_path(storage_state_site) - + # search_url = f"https://twitter.com/search?q={quote_plus(query)}&src=typed_query" # try: # html = playwright_fetch_html_using_session(search_url, storage_state or "", headless=True) @@ -2336,7 +2750,7 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): # return json.dumps({"source": "twitter_playwright", "results": items}, default=str) # except Exception as e: # logger.debug(f"[TWITTER] Playwright search failed: {e}") - + # nitter = "https://nitter.net" # search_url = f"{nitter}/search?f=tweets&q={quote_plus(query)}" # resp = _safe_get(search_url) @@ -2354,7 +2768,6 @@ def scrape_twitter(query: str = "Sri Lanka", max_items: int = 20): # return json.dumps({"error": str(e)}) - def clean_linkedin_text(text): if not text: return "" @@ -2374,6 +2787,7 @@ def clean_linkedin_text(text): # FACEBOOK & INSTAGRAM UTILITY FUNCTIONS # ===================================================== + def clean_fb_text(text): """Clean Facebook noisy text""" if not text: @@ -2439,67 +2853,72 @@ def scrape_instagram(keywords: Optional[List[str]] = None, max_items: int = 15): Scrapes posts from hashtag search and extracts captions. 
""" ensure_playwright() - + # Load Session site = "instagram" - session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions") + session_path = load_playwright_storage_state_path( + site, out_dir="src/utils/.sessions" + ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") - + # Check for alternative session file name if not session_path: alt_paths = [ os.path.join(os.getcwd(), "src", "utils", ".sessions", "ig_state.json"), os.path.join(os.getcwd(), ".sessions", "ig_state.json"), - os.path.join(os.getcwd(), "ig_state.json") + os.path.join(os.getcwd(), "ig_state.json"), ] for path in alt_paths: if os.path.exists(path): session_path = path logger.info(f"[INSTAGRAM] Found session at {path}") break - + if not session_path: - return json.dumps({ - "error": "No Instagram session found", - "solution": "Run the Instagram session manager to create a session" - }, default=str) - + return json.dumps( + { + "error": "No Instagram session found", + "solution": "Run the Instagram session manager to create a session", + }, + default=str, + ) + keyword = " ".join(keywords) if keywords else "srilanka" keyword = keyword.replace(" ", "") # Instagram hashtags don't have spaces results = [] - + try: with sync_playwright() as p: instagram_mobile_ua = ( "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) " "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1" ) - + browser = p.chromium.launch(headless=True) - + context = browser.new_context( storage_state=session_path, user_agent=instagram_mobile_ua, viewport={"width": 430, "height": 932}, ) - + page = context.new_page() url = f"https://www.instagram.com/explore/tags/{keyword}/" - + logger.info(f"[INSTAGRAM] Navigating to {url}") page.goto(url, timeout=120000) page.wait_for_timeout(4000) - + # Scroll to load posts for _ in range(12): page.mouse.wheel(0, 2500) page.wait_for_timeout(1500) - + # Collect post links anchors = page.locator("a[href*='/p/'], a[href*='/reel/']").all() links = [] - + for a in anchors: href = a.get_attribute("href") if href: @@ -2507,42 +2926,53 @@ def scrape_instagram(keywords: Optional[List[str]] = None, max_items: int = 15): links.append(full) if len(links) >= max_items: break - + logger.info(f"[INSTAGRAM] Found {len(links)} posts") - + # Extract captions from each post for link in links: logger.info(f"[INSTAGRAM] Scraping {link}") page.goto(link, timeout=120000) page.wait_for_timeout(2000) - + media_id = extract_media_id_instagram(page) caption = fetch_caption_via_private_api(page, media_id) - + # Fallback to direct extraction if not caption: try: - caption = page.locator("article h1, article span").first.inner_text().strip() + caption = ( + page.locator("article h1, article span") + .first.inner_text() + .strip() + ) except: caption = None - + if caption: - results.append({ - "source": "Instagram", - "text": caption, - "url": link, - "poster": "(Instagram User)" - }) - logger.info(f"[INSTAGRAM] Collected caption {len(results)}/{max_items}") - + results.append( + { + "source": "Instagram", + "text": caption, + "url": link, + "poster": "(Instagram User)", + } + ) + logger.info( + f"[INSTAGRAM] Collected caption {len(results)}/{max_items}" + ) + browser.close() - - return json.dumps({ - "site": "Instagram", - "results": results, - "storage_state": session_path - }, default=str) - + + return json.dumps( + { + "site": "Instagram", + "results": results, + "storage_state": session_path, + }, + default=str, + ) + except Exception as e: 
logger.error(f"[INSTAGRAM] {e}") return json.dumps({"error": str(e)}, default=str) @@ -2555,64 +2985,69 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): Extracts posts from keyword search with poster names and text. """ ensure_playwright() - + # Load Session site = "facebook" - session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions") + session_path = load_playwright_storage_state_path( + site, out_dir="src/utils/.sessions" + ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") - + # Check for alternative session file name if not session_path: alt_paths = [ os.path.join(os.getcwd(), "src", "utils", ".sessions", "fb_state.json"), os.path.join(os.getcwd(), ".sessions", "fb_state.json"), - os.path.join(os.getcwd(), "fb_state.json") + os.path.join(os.getcwd(), "fb_state.json"), ] for path in alt_paths: if os.path.exists(path): session_path = path logger.info(f"[FACEBOOK] Found session at {path}") break - + if not session_path: - return json.dumps({ - "error": "No Facebook session found", - "solution": "Run the Facebook session manager to create a session" - }, default=str) - + return json.dumps( + { + "error": "No Facebook session found", + "solution": "Run the Facebook session manager to create a session", + }, + default=str, + ) + keyword = " ".join(keywords) if keywords else "Sri Lanka" results = [] - + try: with sync_playwright() as p: facebook_desktop_ua = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) - + browser = p.chromium.launch(headless=True) - + context = browser.new_context( storage_state=session_path, user_agent=facebook_desktop_ua, viewport={"width": 1400, "height": 900}, ) - + page = context.new_page() - + search_url = f"https://www.facebook.com/search/posts?q={quote(keyword)}" - + logger.info(f"[FACEBOOK] Navigating to {search_url}") page.goto(search_url, timeout=120000) time.sleep(5) - + seen = set() stuck = 0 last_scroll = 0 - + MESSAGE_SELECTOR = "div[data-ad-preview='message']" - + # Poster selectors POSTER_SELECTORS = [ "h3 strong a span", @@ -2625,11 +3060,13 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): "a[aria-hidden='false'] span", "a[role='link'] span", ] - + def extract_poster(post): """Extract poster name from Facebook post""" - parent = post.locator("xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]") - + parent = post.locator( + "xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]" + ) + for selector in POSTER_SELECTORS: try: el = parent.locator(selector).first @@ -2639,9 +3076,9 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): return name except: pass - + return "(Unknown)" - + # IMPROVED: Expand ALL "See more" buttons on page before extracting def expand_all_see_more(): """Click all 'See more' buttons on the visible page""" @@ -2659,7 +3096,7 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): "text='See more'", "text='... 
See more'", ] - + clicked = 0 for selector in see_more_selectors: try: @@ -2676,34 +3113,36 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): pass except: pass - + if clicked > 0: logger.info(f"[FACEBOOK] Expanded {clicked} 'See more' buttons") return clicked - + while len(results) < max_items: # First expand all "See more" on visible content expand_all_see_more() time.sleep(0.5) - + posts = page.locator(MESSAGE_SELECTOR).all() - + for post in posts: try: # Try to expand within this specific post container too try: post.scroll_into_view_if_needed() time.sleep(0.3) - + # Look for See more in parent container - parent = post.locator("xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]") - + parent = post.locator( + "xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]" + ) + post_see_more_selectors = [ "div[role='button'] span:text-is('See more')", "span:text-is('See more')", "div[role='button']:has-text('See more')", ] - + for selector in post_see_more_selectors: try: btns = parent.locator(selector) @@ -2715,57 +3154,66 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): pass except: pass - + raw = post.inner_text().strip() cleaned = clean_fb_text(raw) - + poster = extract_poster(post) - + if cleaned and len(cleaned) > 30: key = poster + "::" + cleaned if key not in seen: seen.add(key) - results.append({ - "source": "Facebook", - "poster": poster, - "text": cleaned, - "url": "https://www.facebook.com" - }) - logger.info(f"[FACEBOOK] Collected post {len(results)}/{max_items}") - + results.append( + { + "source": "Facebook", + "poster": poster, + "text": cleaned, + "url": "https://www.facebook.com", + } + ) + logger.info( + f"[FACEBOOK] Collected post {len(results)}/{max_items}" + ) + if len(results) >= max_items: break - + except: pass - + # Scroll page.evaluate("window.scrollBy(0, 2300)") time.sleep(1.2) - + new_scroll = page.evaluate("window.scrollY") stuck = stuck + 1 if new_scroll == last_scroll else 0 last_scroll = new_scroll - + if stuck >= 3: logger.info("[FACEBOOK] Reached end of results") break - + browser.close() - - return json.dumps({ - "site": "Facebook", - "results": results[:max_items], - "storage_state": session_path - }, default=str) - + + return json.dumps( + { + "site": "Facebook", + "results": results[:max_items], + "storage_state": session_path, + }, + default=str, + ) + except Exception as e: logger.error(f"[FACEBOOK] {e}") return json.dumps({"error": str(e)}, default=str) @tool -def scrape_government_gazette(keywords: Optional[List[str]] = None, max_items: int = 15): +def scrape_government_gazette( + keywords: Optional[List[str]] = None, max_items: int = 15 +): """ Search and scrape Sri Lankan government gazette entries from gazette.lk. This tool visits each gazette page to extract full descriptions and download links (PDFs). @@ -2774,7 +3222,6 @@ def scrape_government_gazette(keywords: Optional[List[str]] = None, max_items: i return json.dumps(data, default=str) - def clean_linkedin_text(text): if not text: return "" @@ -2789,8 +3236,11 @@ def clean_linkedin_text(text): return text.strip() + @tool -def scrape_parliament_minutes(keywords: Optional[List[str]] = None, max_items: int = 20): +def scrape_parliament_minutes( + keywords: Optional[List[str]] = None, max_items: int = 20 +): """ Search and scrape Sri Lankan Parliament Hansards and minutes matching keywords. 
""" @@ -2798,7 +3248,6 @@ def scrape_parliament_minutes(keywords: Optional[List[str]] = None, max_items: i return json.dumps(data, default=str) - def clean_linkedin_text(text): if not text: return "" @@ -2813,16 +3262,26 @@ def clean_linkedin_text(text): return text.strip() + @tool -def scrape_train_schedule(from_station: Optional[str] = None, to_station: Optional[str] = None, keyword: Optional[str] = None, max_items: int = 30): +def scrape_train_schedule( + from_station: Optional[str] = None, + to_station: Optional[str] = None, + keyword: Optional[str] = None, + max_items: int = 30, +): """ Scrape Sri Lanka Railways train schedule based on stations or keywords. """ - data = scrape_train_schedule_impl(from_station=from_station, to_station=to_station, keyword=keyword, max_items=max_items) + data = scrape_train_schedule_impl( + from_station=from_station, + to_station=to_station, + keyword=keyword, + max_items=max_items, + ) return json.dumps(data, default=str) - def clean_linkedin_text(text): if not text: return "" @@ -2837,8 +3296,11 @@ def clean_linkedin_text(text): return text.strip() + @tool -def scrape_cse_stock_data(symbol: str = "ASPI", period: str = "1d", interval: str = "1h"): +def scrape_cse_stock_data( + symbol: str = "ASPI", period: str = "1d", interval: str = "1h" +): """ Scrape Colombo Stock Exchange (CSE) data for a given symbol (e.g., ASPI). Tries yfinance first, then falls back to direct site scraping. @@ -2847,7 +3309,6 @@ def scrape_cse_stock_data(symbol: str = "ASPI", period: str = "1d", interval: st return json.dumps(data, default=str) - def clean_linkedin_text(text): if not text: return "" @@ -2862,6 +3323,7 @@ def clean_linkedin_text(text): return text.strip() + @tool def scrape_local_news(keywords: Optional[List[str]] = None, max_articles: int = 30): """ @@ -2871,7 +3333,6 @@ def scrape_local_news(keywords: Optional[List[str]] = None, max_articles: int = return json.dumps(data, default=str) - def clean_linkedin_text(text): if not text: return "" @@ -2886,6 +3347,7 @@ def clean_linkedin_text(text): return text.strip() + @tool def think_tool(reflection: str) -> str: """ @@ -2894,11 +3356,11 @@ def think_tool(reflection: str) -> str: return f"Reflection recorded: {reflection}" - # ===================================================== # FACEBOOK & INSTAGRAM UTILITY FUNCTIONS # ===================================================== + def clean_fb_text(text): """Clean Facebook noisy text""" if not text: @@ -2964,67 +3426,72 @@ def scrape_instagram(keywords: Optional[List[str]] = None, max_items: int = 15): Scrapes posts from hashtag search and extracts captions. 
""" ensure_playwright() - + # Load Session site = "instagram" - session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions") + session_path = load_playwright_storage_state_path( + site, out_dir="src/utils/.sessions" + ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") - + # Check for alternative session file name if not session_path: alt_paths = [ os.path.join(os.getcwd(), "src", "utils", ".sessions", "ig_state.json"), os.path.join(os.getcwd(), ".sessions", "ig_state.json"), - os.path.join(os.getcwd(), "ig_state.json") + os.path.join(os.getcwd(), "ig_state.json"), ] for path in alt_paths: if os.path.exists(path): session_path = path logger.info(f"[INSTAGRAM] Found session at {path}") break - + if not session_path: - return json.dumps({ - "error": "No Instagram session found", - "solution": "Run the Instagram session manager to create a session" - }, default=str) - + return json.dumps( + { + "error": "No Instagram session found", + "solution": "Run the Instagram session manager to create a session", + }, + default=str, + ) + keyword = " ".join(keywords) if keywords else "srilanka" keyword = keyword.replace(" ", "") # Instagram hashtags don't have spaces results = [] - + try: with sync_playwright() as p: instagram_mobile_ua = ( "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) " "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1" ) - + browser = p.chromium.launch(headless=True) - + context = browser.new_context( storage_state=session_path, user_agent=instagram_mobile_ua, viewport={"width": 430, "height": 932}, ) - + page = context.new_page() url = f"https://www.instagram.com/explore/tags/{keyword}/" - + logger.info(f"[INSTAGRAM] Navigating to {url}") page.goto(url, timeout=120000) page.wait_for_timeout(4000) - + # Scroll to load posts for _ in range(12): page.mouse.wheel(0, 2500) page.wait_for_timeout(1500) - + # Collect post links anchors = page.locator("a[href*='/p/'], a[href*='/reel/']").all() links = [] - + for a in anchors: href = a.get_attribute("href") if href: @@ -3032,42 +3499,53 @@ def scrape_instagram(keywords: Optional[List[str]] = None, max_items: int = 15): links.append(full) if len(links) >= max_items: break - + logger.info(f"[INSTAGRAM] Found {len(links)} posts") - + # Extract captions from each post for link in links: logger.info(f"[INSTAGRAM] Scraping {link}") page.goto(link, timeout=120000) page.wait_for_timeout(2000) - + media_id = extract_media_id_instagram(page) caption = fetch_caption_via_private_api(page, media_id) - + # Fallback to direct extraction if not caption: try: - caption = page.locator("article h1, article span").first.inner_text().strip() + caption = ( + page.locator("article h1, article span") + .first.inner_text() + .strip() + ) except: caption = None - + if caption: - results.append({ - "source": "Instagram", - "text": caption, - "url": link, - "poster": "(Instagram User)" - }) - logger.info(f"[INSTAGRAM] Collected caption {len(results)}/{max_items}") - + results.append( + { + "source": "Instagram", + "text": caption, + "url": link, + "poster": "(Instagram User)", + } + ) + logger.info( + f"[INSTAGRAM] Collected caption {len(results)}/{max_items}" + ) + browser.close() - - return json.dumps({ - "site": "Instagram", - "results": results, - "storage_state": session_path - }, default=str) - + + return json.dumps( + { + "site": "Instagram", + "results": results, + "storage_state": session_path, + }, + default=str, + ) + except Exception as e: 
logger.error(f"[INSTAGRAM] {e}") return json.dumps({"error": str(e)}, default=str) @@ -3080,63 +3558,70 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): Extracts posts from keyword search with poster names and text. """ ensure_playwright() - + # Load Session site = "facebook" - session_path = load_playwright_storage_state_path(site, out_dir="src/utils/.sessions") + session_path = load_playwright_storage_state_path( + site, out_dir="src/utils/.sessions" + ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") - + # Check for alternative session file name if not session_path: alt_paths = [ os.path.join(os.getcwd(), "src", "utils", ".sessions", "fb_state.json"), os.path.join(os.getcwd(), ".sessions", "fb_state.json"), - os.path.join(os.getcwd(), "fb_state.json") + os.path.join(os.getcwd(), "fb_state.json"), ] for path in alt_paths: if os.path.exists(path): session_path = path logger.info(f"[FACEBOOK] Found session at {path}") break - + if not session_path: - return json.dumps({ - "error": "No Facebook session found", - "solution": "Run the Facebook session manager to create a session" - }, default=str) - + return json.dumps( + { + "error": "No Facebook session found", + "solution": "Run the Facebook session manager to create a session", + }, + default=str, + ) + keyword = " ".join(keywords) if keywords else "Sri Lanka" results = [] - + try: with sync_playwright() as p: facebook_desktop_ua = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) - + browser = p.chromium.launch(headless=True) - + context = browser.new_context( storage_state=session_path, user_agent=facebook_desktop_ua, viewport={"width": 1400, "height": 900}, ) - + page = context.new_page() - search_url = f"https://www.facebook.com/search/posts?q={keyword.replace(' ', '%20')}" - + search_url = ( + f"https://www.facebook.com/search/posts?q={keyword.replace(' ', '%20')}" + ) + logger.info(f"[FACEBOOK] Navigating to {search_url}") page.goto(search_url, timeout=120000) time.sleep(5) - + seen = set() stuck = 0 last_scroll = 0 - + MESSAGE_SELECTOR = "div[data-ad-preview='message']" - + # Poster selectors POSTER_SELECTORS = [ "h3 strong a span", @@ -3149,11 +3634,13 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): "a[aria-hidden='false'] span", "a[role='link'] span", ] - + def extract_poster(post): """Extract poster name from Facebook post""" - parent = post.locator("xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]") - + parent = post.locator( + "xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]" + ) + for selector in POSTER_SELECTORS: try: el = parent.locator(selector).first @@ -3163,64 +3650,73 @@ def scrape_facebook(keywords: Optional[List[str]] = None, max_items: int = 10): return name except: pass - + return "(Unknown)" - + while len(results) < max_items: posts = page.locator(MESSAGE_SELECTOR).all() - + for post in posts: try: raw = post.inner_text().strip() cleaned = clean_fb_text(raw) - + poster = extract_poster(post) - + if cleaned and len(cleaned) > 30: key = poster + "::" + cleaned if key not in seen: seen.add(key) - results.append({ - "source": "Facebook", - "poster": poster, - "text": cleaned, - "url": "https://www.facebook.com" - }) - logger.info(f"[FACEBOOK] Collected post {len(results)}/{max_items}") - + results.append( + { + "source": "Facebook", + "poster": poster, + "text": cleaned, + "url": "https://www.facebook.com", + } + ) + 
logger.info( + f"[FACEBOOK] Collected post {len(results)}/{max_items}" + ) + if len(results) >= max_items: break - + except: pass - + # Scroll page.evaluate("window.scrollBy(0, 2300)") time.sleep(1.2) - + new_scroll = page.evaluate("window.scrollY") stuck = stuck + 1 if new_scroll == last_scroll else 0 last_scroll = new_scroll - + if stuck >= 3: logger.info("[FACEBOOK] Reached end of results") break - + browser.close() - - return json.dumps({ - "site": "Facebook", - "results": results[:max_items], - "storage_state": session_path - }, default=str) - + + return json.dumps( + { + "site": "Facebook", + "results": results[:max_items], + "storage_state": session_path, + }, + default=str, + ) + except Exception as e: logger.error(f"[FACEBOOK] {e}") return json.dumps({"error": str(e)}, default=str) @tool -def scrape_reddit(keywords: List[str], limit: int = 20, subreddit: Optional[str] = None): +def scrape_reddit( + keywords: List[str], limit: int = 20, subreddit: Optional[str] = None +): """ Scrape Reddit for posts matching specific keywords. Optionally restrict to a specific subreddit. @@ -3229,7 +3725,6 @@ def scrape_reddit(keywords: List[str], limit: int = 20, subreddit: Optional[str] return json.dumps(data, default=str) - # ============================================ # TOOL REGISTRY & EXPORTS # ============================================ @@ -3255,8 +3750,9 @@ try: scrape_facebook_profile, scrape_instagram_profile, scrape_linkedin_profile, - scrape_product_reviews + scrape_product_reviews, ) + TOOL_MAPPING["scrape_twitter_profile"] = scrape_twitter_profile TOOL_MAPPING["scrape_facebook_profile"] = scrape_facebook_profile TOOL_MAPPING["scrape_instagram_profile"] = scrape_instagram_profile