Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| URL Discovery Agent - Pattern-Based Approach | |
| Discovers official government websites using sustainable, vendor-neutral methods: | |
| ✅ Pattern Matching: Generate URLs from jurisdiction names using common patterns | |
| ✅ GSA Domain Matching: Direct matching with .gov domain registry | |
| ✅ Web Crawling & Verification: Test candidate URLs and discover minutes pages | |
| ✅ CMS Detection: Identify government CMS platforms | |
| ✅ Confidence Scoring: Rank results by validation signals | |
| This approach is: | |
| ✅ Free (no API costs) | |
| ✅ Reliable (no API quotas or rate limits) | |
| ✅ Reproducible (deterministic patterns) | |
| ✅ Sustainable (vendor-neutral, future-proof) | |
| Note: Does NOT use Google Custom Search or Bing APIs - those are deprecated | |
| for production use and not recommended for new systems. | |
| """ | |
| import asyncio | |
| import re | |
| from typing import List, Optional, Set, Dict, Tuple | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from urllib.parse import urlparse, urljoin | |
| import httpx | |
| from bs4 import BeautifulSoup | |
| from loguru import logger | |
| from difflib import SequenceMatcher | |
@dataclass
class JurisdictionURL:
    """Discovered URL information for a single jurisdiction.

    The three identifying fields are required; every discovery result field
    has a default so that a "nothing found" record can be created from the
    identifiers alone.
    """

    # --- Identity (required) ---
    jurisdiction_id: str
    jurisdiction_name: str
    state: str

    # --- Discovery results (all optional) ---
    homepage_url: Optional[str] = None      # None when no homepage was found
    minutes_url: Optional[str] = None       # None when no minutes/agenda link was found
    cms_platform: Optional[str] = None      # None when no CMS platform was detected
    is_gov_domain: bool = False
    discovery_method: str = "unknown"
    confidence_score: float = 0.0
    last_verified: Optional[datetime] = None  # None until the URL has been checked
class URLDiscoveryAgent:
    """Pattern-based URL discovery agent.

    Finds a jurisdiction's homepage by (1) matching its name against the
    supplied GSA domain data and (2) probing URLs generated from common
    naming patterns. A verified homepage is then scanned for a
    minutes/agenda link and for known CMS vendor signatures.

    The agent owns an ``httpx.AsyncClient``; release it with ``close()`` or
    use the agent as an async context manager (``async with``).
    """

    # CMS platform signatures: platform name -> substrings searched for in the
    # final URL and in the page HTML.
    CMS_SIGNATURES = {
        "granicus": ["granicus.com", "legistar.com"],
        "civicclerk": ["civicclerk.com", "civicweb.net"],
        "municode": ["municode.com"],
        "laserfiche": ["laserfiche.com"],
        "primegov": ["primegov.com"],
        "govos": ["govos.com"],
        "swagit": ["swagit.com"]
    }

    # Minutes page keywords. List order doubles as priority in
    # crawl_for_minutes(): earlier keywords win over later ones.
    MINUTES_KEYWORDS = [
        "minutes", "agendas", "meetings", "council", "board",
        "government", "sessions", "agenda center"
    ]

    def __init__(self, gsa_domains: Set[str], gsa_domain_data: Optional[List[Dict]] = None):
        """
        Initialize discovery agent.

        Args:
            gsa_domains: Set of .gov domains from GSA registry
            gsa_domain_data: Full GSA domain data with org names. Each item is
                read via the "Organization", "Domain Name" and "State" keys.
        """
        self.gsa_domains = gsa_domains
        self.gsa_domain_data = gsa_domain_data or []
        self.client = httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (compatible; OralHealthPolicyBot/2.0)"
            }
        )
        # Build fast lookup index
        self.domain_lookup = self._build_domain_lookup()

    async def __aenter__(self) -> "URLDiscoveryAgent":
        """Enter ``async with``; returns the agent itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Leave ``async with``; always closes the HTTP client."""
        await self.close()

    def _build_domain_lookup(self) -> Dict[str, Dict]:
        """Build normalized name -> domain record lookup.

        Records without an "Organization" value are skipped. When two
        organizations normalize to the same name, the later record replaces
        the earlier one.
        """
        lookup = {}
        for item in self.gsa_domain_data:
            org_name = item.get("Organization", "")
            if org_name:
                lookup[self._normalize_name(org_name)] = item
        return lookup

    def _normalize_name(self, name: str) -> str:
        """Normalize jurisdiction name for matching.

        Lowercases, removes "city of"/"town of"/... prefixes and the
        standalone words government/county/city/town, drops every character
        that is not a-z, 0-9 or whitespace, and collapses whitespace.

        NOTE: the standalone-word removal applies anywhere in the name, so
        "Kansas City" normalizes to "kansas".
        """
        name = name.lower()
        # Remove common prefixes/suffixes
        name = re.sub(r'\b(city of|town of|county of|village of|township of)\b', '', name)
        name = re.sub(r'\b(government|county|city|town)\b', '', name)
        name = re.sub(r'[^a-z0-9\s]', '', name)
        return ' '.join(name.split()).strip()

    def _similarity_score(self, str1: str, str2: str) -> float:
        """Calculate string similarity (0-1) using difflib's SequenceMatcher ratio."""
        return SequenceMatcher(None, str1, str2).ratio()

    @staticmethod
    def _state_matches(domain_info: Dict, state: str) -> bool:
        """Return True when a registry record is compatible with ``state``.

        A record without a "State" value is compatible with everything.
        Otherwise the record's state must equal ``state`` or its first two
        characters (case-insensitive).

        NOTE(review): the two-character shortcut only helps for full state
        names whose first two letters happen to be the abbreviation.
        """
        domain_state = domain_info.get("State", "")
        if not domain_state:
            return True
        return domain_state.lower() in (state.lower(), state[:2].lower())

    def _generate_url_patterns(self, jurisdiction_name: str, state: str,
                               jurisdiction_type: str) -> List[Tuple[str, float]]:
        """
        Generate candidate URLs using common government patterns.

        Government URLs typically follow predictable patterns based on:
        - Jurisdiction type (county, city, school district)
        - Naming conventions (.gov, .us, .org)
        - Common formats (cityofX, co.X, Xschools)

        Args:
            jurisdiction_name: Name (e.g., "Sacramento")
            state: State abbreviation (e.g., "CA"); it is lowercased and
                inserted into hostnames as-is
            jurisdiction_type: Type (county, municipality, school_district,
                township); any other value gets generic patterns

        Returns:
            List of (url, confidence_score) tuples; empty when the name
            normalizes to an empty string.
        """
        # Normalize name for URLs
        name_clean = self._normalize_name(jurisdiction_name)
        name_slug = name_clean.replace(' ', '')
        name_dash = name_clean.replace(' ', '-')
        state_lower = state.lower()

        # An empty slug would yield malformed hosts such as "https://www..gov".
        if not name_slug:
            return []

        patterns = []
        if jurisdiction_type == "county":
            # County URL patterns
            patterns.extend([
                (f"https://www.co.{name_slug}.{state_lower}.us", 0.9),
                (f"https://{name_slug}county.gov", 0.9),
                (f"https://www.{name_slug}county.gov", 0.85),
                (f"https://{name_slug}.{state_lower}.gov", 0.8),
                (f"https://www.{name_slug}county.us", 0.7),
                (f"https://co-{name_dash}.{state_lower}.gov", 0.75),
                (f"https://{name_slug}county.org", 0.6),
            ])
        elif jurisdiction_type == "municipality":
            # City/town URL patterns
            patterns.extend([
                (f"https://www.{name_slug}.gov", 0.9),
                (f"https://{name_slug}.gov", 0.9),
                (f"https://www.cityof{name_slug}.gov", 0.85),
                (f"https://cityof{name_slug}.gov", 0.85),
                (f"https://www.{name_slug}.{state_lower}.gov", 0.8),
                (f"https://{name_slug}.{state_lower}.gov", 0.8),
                (f"https://www.{name_dash}.gov", 0.75),
                (f"https://www.{name_slug}.us", 0.7),
                (f"https://{name_slug}.us", 0.7),
                (f"https://www.{name_slug}.org", 0.6),
            ])
        elif jurisdiction_type == "school_district":
            # School district patterns
            patterns.extend([
                (f"https://www.{name_slug}schools.org", 0.8),
                (f"https://{name_slug}schools.org", 0.8),
                (f"https://www.{name_slug}schools.net", 0.75),
                (f"https://www.{name_slug}sd.org", 0.75),
                (f"https://{name_slug}sd.org", 0.75),
                (f"https://www.{name_slug}.k12.{state_lower}.us", 0.85),
                (f"https://{name_slug}.k12.{state_lower}.us", 0.85),
                (f"https://www.{name_slug}usd.org", 0.7),
            ])
        elif jurisdiction_type == "township":
            # Township patterns
            patterns.extend([
                (f"https://www.{name_slug}township.gov", 0.8),
                (f"https://{name_slug}township.gov", 0.8),
                (f"https://www.{name_slug}.{state_lower}.gov", 0.75),
                (f"https://{name_slug}twp.org", 0.7),
            ])
        else:
            # Generic patterns for special districts, etc.
            patterns.extend([
                (f"https://www.{name_slug}.gov", 0.7),
                (f"https://{name_slug}.gov", 0.7),
                (f"https://www.{name_slug}.org", 0.5),
                (f"https://{name_slug}.org", 0.5),
            ])
        return patterns

    def _match_gsa_domain(self, jurisdiction_name: str, state: str) -> Optional[Tuple[str, float]]:
        """
        Match jurisdiction to GSA .gov domain registry.

        Tries an exact match on the normalized name first (confidence 1.0),
        then a fuzzy match (similarity > 0.75, confidence = similarity * 0.95).
        Both stages skip records whose state conflicts with ``state`` and
        records that have no domain name.

        Args:
            jurisdiction_name: Jurisdiction name
            state: State name or abbreviation

        Returns:
            (domain_url, confidence) tuple or None
        """
        normalized_name = self._normalize_name(jurisdiction_name)
        if not normalized_name:
            # Nothing meaningful to match on.
            return None

        # Try exact match first. The state check prevents a same-named
        # jurisdiction in another state from matching with full confidence.
        exact = self.domain_lookup.get(normalized_name)
        if exact is not None and self._state_matches(exact, state):
            domain = exact.get("Domain Name", "")
            if domain:
                return (f"https://{domain}", 1.0)

        # Fuzzy matching with state filter
        best_match = None
        best_score = 0.75  # High threshold for fuzzy; candidates must exceed it
        for org_name, domain_info in self.domain_lookup.items():
            domain = domain_info.get("Domain Name", "")
            # Skip unusable records *before* scoring so that they cannot raise
            # the bar for usable ones.
            if not domain or not self._state_matches(domain_info, state):
                continue
            score = self._similarity_score(normalized_name, org_name)
            if score > best_score:
                best_score = score
                best_match = (f"https://{domain}", score * 0.95)  # Slight penalty for fuzzy
        return best_match

    async def _verify_url(self, url: str) -> bool:
        """
        Verify URL is accessible.

        Sends HEAD first; if that raises or returns an error status, retries
        with GET, because some servers reject HEAD while serving GET normally.

        Args:
            url: URL to check

        Returns:
            True if either request returns a status < 400
        """
        for send in (self.client.head, self.client.get):
            try:
                response = await send(url, timeout=10.0)
            except Exception as e:
                # `Exception` (not a bare except) so task cancellation and
                # KeyboardInterrupt still propagate.
                logger.debug(f"Error verifying {url}: {e}")
                continue
            if response.status_code < 400:
                return True
        return False

    async def crawl_for_minutes(self, homepage_url: str) -> Optional[str]:
        """
        Crawl homepage to find meeting minutes/agendas page.

        Every http(s) link whose text or href contains one of
        MINUTES_KEYWORDS is a candidate; the candidate matching the
        earliest keyword in that list wins (first such link on ties).
        Fragment-only links and non-http(s) schemes (mailto:, javascript:,
        tel:, ...) are ignored.

        Args:
            homepage_url: Homepage URL

        Returns:
            Absolute minutes page URL, or None when nothing matches or the
            page cannot be fetched/parsed
        """
        try:
            response = await self.client.get(homepage_url, timeout=15.0)
            if response.status_code >= 400:
                return None

            # Relative links belong to the page actually served, which may
            # differ from homepage_url after redirects.
            base_url = str(response.url)
            soup = BeautifulSoup(response.text, 'html.parser')

            best_url = None
            best_rank = len(self.MINUTES_KEYWORDS)
            for link in soup.find_all('a', href=True):
                href = link.get('href', '').strip()
                if not href or href.startswith('#'):
                    continue
                # Construct absolute URL
                full_url = urljoin(base_url, href)
                if urlparse(full_url).scheme not in ('http', 'https'):
                    continue

                # Check text and href for keywords
                text = link.get_text(strip=True)
                combined = f"{text} {href}".lower()
                # Only keywords that would beat the current best are checked.
                for rank, keyword in enumerate(self.MINUTES_KEYWORDS[:best_rank]):
                    if keyword in combined:
                        best_rank, best_url = rank, full_url
                        break
                if best_rank == 0:
                    break  # Cannot do better than the top-priority keyword
            return best_url
        except Exception as e:
            logger.debug(f"Error crawling {homepage_url}: {e}")
            return None

    async def detect_cms_platform(self, url: str) -> Optional[str]:
        """
        Detect government CMS platform.

        Fetches the page and looks for any CMS_SIGNATURES substring in the
        final (post-redirect) URL or in the lowercased HTML.

        Args:
            url: URL to check

        Returns:
            CMS platform name (first match in CMS_SIGNATURES order) or None
        """
        try:
            response = await self.client.get(url, timeout=15.0)
            if response.status_code >= 400:
                return None
            html = response.text.lower()
            final_url = str(response.url).lower()
            # Check URL and HTML for CMS signatures
            for cms, signatures in self.CMS_SIGNATURES.items():
                if any(sig in final_url or sig in html for sig in signatures):
                    return cms
            return None
        except Exception as e:
            logger.debug(f"Error detecting CMS for {url}: {e}")
            return None

    def _calculate_confidence(
        self,
        base_confidence: float,
        is_gov_domain: bool,
        has_minutes_url: bool,
        has_cms_platform: bool
    ) -> float:
        """Calculate final confidence score.

        Adds +0.1 for a .gov domain, +0.1 for a minutes URL and +0.05 for a
        detected CMS, capping at 1.0 after each bonus.
        """
        confidence = base_confidence
        # Bonuses for positive signals
        if is_gov_domain:
            confidence = min(confidence + 0.1, 1.0)
        if has_minutes_url:
            confidence = min(confidence + 0.1, 1.0)
        if has_cms_platform:
            confidence = min(confidence + 0.05, 1.0)
        return confidence

    async def _analyze_url(
        self,
        url: str,
        jurisdiction_id: str,
        jurisdiction_name: str,
        state: str,
        discovery_method: str,
        base_confidence: float
    ) -> "JurisdictionURL":
        """
        Analyze discovered URL for minutes and CMS.

        Args:
            url: Homepage URL
            jurisdiction_id: Jurisdiction ID
            jurisdiction_name: Jurisdiction name
            state: State
            discovery_method: How URL was found
            base_confidence: Base confidence score

        Returns:
            Complete JurisdictionURL object, stamped with the current time
        """
        # Check if .gov domain. `hostname` is lowercased and carries no port
        # or credentials, unlike `netloc`.
        host = urlparse(url).hostname or ""
        bare_host = host[4:] if host.startswith("www.") else host
        is_gov_domain = (
            host.endswith('.gov')
            or host in self.gsa_domains
            or bare_host in self.gsa_domains
        )

        # Find minutes page and detect CMS. The two lookups are independent
        # and each handles its own errors, so they run concurrently.
        minutes_url, cms_platform = await asyncio.gather(
            self.crawl_for_minutes(url),
            self.detect_cms_platform(url),
        )

        # Calculate confidence
        confidence = self._calculate_confidence(
            base_confidence=base_confidence,
            is_gov_domain=is_gov_domain,
            has_minutes_url=minutes_url is not None,
            has_cms_platform=cms_platform is not None
        )
        return JurisdictionURL(
            jurisdiction_id=jurisdiction_id,
            jurisdiction_name=jurisdiction_name,
            state=state,
            homepage_url=url,
            minutes_url=minutes_url,
            cms_platform=cms_platform,
            is_gov_domain=is_gov_domain,
            discovery_method=discovery_method,
            confidence_score=confidence,
            last_verified=datetime.now()
        )

    async def discover_jurisdiction(
        self,
        jurisdiction_id: str,
        jurisdiction_name: str,
        state: str,
        jurisdiction_type: str
    ) -> "JurisdictionURL":
        """
        Discover URLs for a jurisdiction using pattern-based approach.

        Strategy:
        1. Try GSA domain registry matching (highest confidence)
        2. Try common URL patterns, highest confidence first
        3. Verify and analyze the first URL that responds

        Args:
            jurisdiction_id: Unique ID (FIPS code)
            jurisdiction_name: Name
            state: State. Pattern generation inserts this value into
                hostnames, so a two-letter abbreviation is expected there.
            jurisdiction_type: Type (county, municipality, etc.)

        Returns:
            JurisdictionURL with discovered info; when nothing is reachable,
            a record with discovery_method="not_found" and default fields
        """
        logger.debug(f"Discovering: {jurisdiction_name}, {state} ({jurisdiction_type})")

        # Strategy 1: GSA domain matching (most reliable)
        gsa_match = self._match_gsa_domain(jurisdiction_name, state)
        if gsa_match:
            url, confidence = gsa_match
            if await self._verify_url(url):
                logger.info(f"✓ GSA match: {jurisdiction_name} -> {url}")
                return await self._analyze_url(
                    url, jurisdiction_id, jurisdiction_name,
                    state, "gsa_registry", confidence
                )

        # Strategy 2: URL pattern matching. The first reachable candidate is
        # returned, so probe in descending confidence; the sort is stable, so
        # equally-scored candidates keep their generation order.
        patterns = sorted(
            self._generate_url_patterns(jurisdiction_name, state, jurisdiction_type),
            key=lambda candidate: candidate[1],
            reverse=True,
        )
        for url, pattern_confidence in patterns:
            if await self._verify_url(url):
                logger.info(f"✓ Pattern match: {jurisdiction_name} -> {url}")
                return await self._analyze_url(
                    url, jurisdiction_id, jurisdiction_name,
                    state, "pattern_match", pattern_confidence
                )

        # No valid URL found
        logger.warning(f"✗ No URL found for {jurisdiction_name}, {state}")
        return JurisdictionURL(
            jurisdiction_id=jurisdiction_id,
            jurisdiction_name=jurisdiction_name,
            state=state,
            discovery_method="not_found"
        )

    async def close(self):
        """Close HTTP client."""
        await self.client.aclose()