open-navigator / discovery /url_discovery_agent.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
"""
URL Discovery Agent - Pattern-Based Approach
Discovers official government websites using sustainable, vendor-neutral methods:
✅ Pattern Matching: Generate URLs from jurisdiction names using common patterns
✅ GSA Domain Matching: Direct matching with .gov domain registry
✅ Web Crawling & Verification: Test candidate URLs and discover minutes pages
✅ CMS Detection: Identify government CMS platforms
✅ Confidence Scoring: Rank results by validation signals
This approach is:
✅ Free (no API costs)
✅ Reliable (no API quotas or rate limits)
✅ Reproducible (deterministic patterns)
✅ Sustainable (vendor-neutral, future-proof)
Note: Does NOT use Google Custom Search or Bing APIs - those are deprecated
for production use and not recommended for new systems.
"""
import asyncio
import re
from typing import List, Optional, Set, Dict, Tuple
from dataclasses import dataclass
from datetime import datetime
from urllib.parse import urlparse, urljoin
import httpx
from bs4 import BeautifulSoup
from loguru import logger
from difflib import SequenceMatcher
@dataclass
class JurisdictionURL:
"""Discovered URL for a jurisdiction."""
jurisdiction_id: str
jurisdiction_name: str
state: str
homepage_url: Optional[str] = None
minutes_url: Optional[str] = None
cms_platform: Optional[str] = None
is_gov_domain: bool = False
discovery_method: str = "unknown"
confidence_score: float = 0.0
last_verified: Optional[datetime] = None
class URLDiscoveryAgent:
"""Pattern-based URL discovery agent."""
# CMS platform signatures
CMS_SIGNATURES = {
"granicus": ["granicus.com", "legistar.com"],
"civicclerk": ["civicclerk.com", "civicweb.net"],
"municode": ["municode.com"],
"laserfiche": ["laserfiche.com"],
"primegov": ["primegov.com"],
"govos": ["govos.com"],
"swagit": ["swagit.com"]
}
# Minutes page keywords
MINUTES_KEYWORDS = [
"minutes", "agendas", "meetings", "council", "board",
"government", "sessions", "agenda center"
]
def __init__(self, gsa_domains: Set[str], gsa_domain_data: Optional[List[Dict]] = None):
"""
Initialize discovery agent.
Args:
gsa_domains: Set of .gov domains from GSA registry
gsa_domain_data: Full GSA domain data with org names
"""
self.gsa_domains = gsa_domains
self.gsa_domain_data = gsa_domain_data or []
self.client = httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (compatible; OralHealthPolicyBot/2.0)"
}
)
# Build fast lookup index
self.domain_lookup = self._build_domain_lookup()
def _build_domain_lookup(self) -> Dict[str, Dict]:
"""Build normalized name -> domain lookup."""
lookup = {}
for item in self.gsa_domain_data:
org_name = item.get("Organization", "")
if org_name:
normalized = self._normalize_name(org_name)
lookup[normalized] = item
return lookup
def _normalize_name(self, name: str) -> str:
"""Normalize jurisdiction name for matching."""
name = name.lower()
# Remove common prefixes/suffixes
name = re.sub(r'\b(city of|town of|county of|village of|township of)\b', '', name)
name = re.sub(r'\b(government|county|city|town)\b', '', name)
name = re.sub(r'[^a-z0-9\s]', '', name)
return ' '.join(name.split()).strip()
def _similarity_score(self, str1: str, str2: str) -> float:
"""Calculate string similarity (0-1)."""
return SequenceMatcher(None, str1, str2).ratio()
def _generate_url_patterns(self, jurisdiction_name: str, state: str,
jurisdiction_type: str) -> List[Tuple[str, float]]:
"""
Generate candidate URLs using common government patterns.
Government URLs typically follow predictable patterns based on:
- Jurisdiction type (county, city, school district)
- Naming conventions (.gov, .us, .org)
- Common formats (cityofX, co.X, Xschools)
Args:
jurisdiction_name: Name (e.g., "Sacramento")
state: State abbreviation (e.g., "CA")
jurisdiction_type: Type (county, municipality, school_district, etc.)
Returns:
List of (url, confidence_score) tuples
"""
# Normalize name for URLs
name_clean = self._normalize_name(jurisdiction_name)
name_slug = name_clean.replace(' ', '')
name_dash = name_clean.replace(' ', '-')
state_lower = state.lower()
patterns = []
if jurisdiction_type == "county":
# County URL patterns
patterns.extend([
(f"https://www.co.{name_slug}.{state_lower}.us", 0.9),
(f"https://{name_slug}county.gov", 0.9),
(f"https://www.{name_slug}county.gov", 0.85),
(f"https://{name_slug}.{state_lower}.gov", 0.8),
(f"https://www.{name_slug}county.us", 0.7),
(f"https://co-{name_dash}.{state_lower}.gov", 0.75),
(f"https://{name_slug}county.org", 0.6),
])
elif jurisdiction_type == "municipality":
# City/town URL patterns
patterns.extend([
(f"https://www.{name_slug}.gov", 0.9),
(f"https://{name_slug}.gov", 0.9),
(f"https://www.cityof{name_slug}.gov", 0.85),
(f"https://cityof{name_slug}.gov", 0.85),
(f"https://www.{name_slug}.{state_lower}.gov", 0.8),
(f"https://{name_slug}.{state_lower}.gov", 0.8),
(f"https://www.{name_dash}.gov", 0.75),
(f"https://www.{name_slug}.us", 0.7),
(f"https://{name_slug}.us", 0.7),
(f"https://www.{name_slug}.org", 0.6),
])
elif jurisdiction_type == "school_district":
# School district patterns
patterns.extend([
(f"https://www.{name_slug}schools.org", 0.8),
(f"https://{name_slug}schools.org", 0.8),
(f"https://www.{name_slug}schools.net", 0.75),
(f"https://www.{name_slug}sd.org", 0.75),
(f"https://{name_slug}sd.org", 0.75),
(f"https://www.{name_slug}.k12.{state_lower}.us", 0.85),
(f"https://{name_slug}.k12.{state_lower}.us", 0.85),
(f"https://www.{name_slug}usd.org", 0.7),
])
elif jurisdiction_type == "township":
# Township patterns
patterns.extend([
(f"https://www.{name_slug}township.gov", 0.8),
(f"https://{name_slug}township.gov", 0.8),
(f"https://www.{name_slug}.{state_lower}.gov", 0.75),
(f"https://{name_slug}twp.org", 0.7),
])
else:
# Generic patterns for special districts, etc.
patterns.extend([
(f"https://www.{name_slug}.gov", 0.7),
(f"https://{name_slug}.gov", 0.7),
(f"https://www.{name_slug}.org", 0.5),
(f"https://{name_slug}.org", 0.5),
])
return patterns
def _match_gsa_domain(self, jurisdiction_name: str, state: str) -> Optional[Tuple[str, float]]:
"""
Match jurisdiction to GSA .gov domain registry.
Uses exact and fuzzy matching against the authoritative list.
Args:
jurisdiction_name: Jurisdiction name
state: State name or abbreviation
Returns:
(domain_url, confidence) tuple or None
"""
normalized_name = self._normalize_name(jurisdiction_name)
# Try exact match first
if normalized_name in self.domain_lookup:
domain_info = self.domain_lookup[normalized_name]
domain = domain_info.get("Domain Name", "")
if domain:
return (f"https://{domain}", 1.0)
# Fuzzy matching with state filter
best_match = None
best_score = 0.0
for org_name, domain_info in self.domain_lookup.items():
# Filter by state
domain_state = domain_info.get("State", "")
if domain_state and domain_state.lower() not in [state.lower(), state[:2].lower()]:
continue
# Calculate similarity
score = self._similarity_score(normalized_name, org_name)
if score > best_score and score > 0.75: # High threshold for fuzzy
best_score = score
domain = domain_info.get("Domain Name", "")
if domain:
best_match = (f"https://{domain}", score * 0.95) # Slight penalty for fuzzy
return best_match
async def _verify_url(self, url: str) -> bool:
"""
Verify URL is accessible.
Args:
url: URL to check
Returns:
True if accessible (status < 400)
"""
try:
response = await self.client.head(url, timeout=10.0)
return response.status_code < 400
except:
# Some servers don't support HEAD, try GET
try:
response = await self.client.get(url, timeout=10.0)
return response.status_code < 400
except:
return False
async def crawl_for_minutes(self, homepage_url: str) -> Optional[str]:
"""
Crawl homepage to find meeting minutes/agendas page.
Args:
homepage_url: Homepage URL
Returns:
Minutes page URL or None
"""
try:
response = await self.client.get(homepage_url, timeout=15.0)
if response.status_code >= 400:
return None
soup = BeautifulSoup(response.text, 'html.parser')
# Search for links containing keywords
for link in soup.find_all('a', href=True):
href = link.get('href', '')
text = link.get_text(strip=True).lower()
# Check text and href for keywords
combined = f"{text} {href}".lower()
if any(keyword in combined for keyword in self.MINUTES_KEYWORDS):
# Construct absolute URL
full_url = urljoin(homepage_url, href)
return full_url
return None
except Exception as e:
logger.debug(f"Error crawling {homepage_url}: {e}")
return None
async def detect_cms_platform(self, url: str) -> Optional[str]:
"""
Detect government CMS platform.
Args:
url: URL to check
Returns:
CMS platform name or None
"""
try:
response = await self.client.get(url, timeout=15.0)
if response.status_code >= 400:
return None
html = response.text.lower()
final_url = str(response.url).lower()
# Check URL and HTML for CMS signatures
for cms, signatures in self.CMS_SIGNATURES.items():
if any(sig in final_url or sig in html for sig in signatures):
return cms
return None
except Exception as e:
logger.debug(f"Error detecting CMS for {url}: {e}")
return None
def _calculate_confidence(
self,
base_confidence: float,
is_gov_domain: bool,
has_minutes_url: bool,
has_cms_platform: bool
) -> float:
"""Calculate final confidence score."""
confidence = base_confidence
# Bonuses for positive signals
if is_gov_domain:
confidence = min(confidence + 0.1, 1.0)
if has_minutes_url:
confidence = min(confidence + 0.1, 1.0)
if has_cms_platform:
confidence = min(confidence + 0.05, 1.0)
return confidence
async def _analyze_url(
self,
url: str,
jurisdiction_id: str,
jurisdiction_name: str,
state: str,
discovery_method: str,
base_confidence: float
) -> JurisdictionURL:
"""
Analyze discovered URL for minutes and CMS.
Args:
url: Homepage URL
jurisdiction_id: Jurisdiction ID
jurisdiction_name: Jurisdiction name
state: State
discovery_method: How URL was found
base_confidence: Base confidence score
Returns:
Complete JurisdictionURL object
"""
# Check if .gov domain
domain = urlparse(url).netloc
is_gov_domain = domain in self.gsa_domains or domain.endswith('.gov')
# Find minutes page
minutes_url = await self.crawl_for_minutes(url)
# Detect CMS
cms_platform = await self.detect_cms_platform(url)
# Calculate confidence
confidence = self._calculate_confidence(
base_confidence=base_confidence,
is_gov_domain=is_gov_domain,
has_minutes_url=minutes_url is not None,
has_cms_platform=cms_platform is not None
)
return JurisdictionURL(
jurisdiction_id=jurisdiction_id,
jurisdiction_name=jurisdiction_name,
state=state,
homepage_url=url,
minutes_url=minutes_url,
cms_platform=cms_platform,
is_gov_domain=is_gov_domain,
discovery_method=discovery_method,
confidence_score=confidence,
last_verified=datetime.now()
)
async def discover_jurisdiction(
self,
jurisdiction_id: str,
jurisdiction_name: str,
state: str,
jurisdiction_type: str
) -> JurisdictionURL:
"""
Discover URLs for a jurisdiction using pattern-based approach.
Strategy:
1. Try GSA domain registry matching (highest confidence)
2. Try common URL patterns (good confidence)
3. Verify and analyze discovered URLs
Args:
jurisdiction_id: Unique ID (FIPS code)
jurisdiction_name: Name
state: State name
jurisdiction_type: Type (county, municipality, etc.)
Returns:
JurisdictionURL with discovered info
"""
logger.debug(f"Discovering: {jurisdiction_name}, {state} ({jurisdiction_type})")
# Strategy 1: GSA domain matching (most reliable)
gsa_match = self._match_gsa_domain(jurisdiction_name, state)
if gsa_match:
url, confidence = gsa_match
if await self._verify_url(url):
logger.info(f"✓ GSA match: {jurisdiction_name} -> {url}")
return await self._analyze_url(
url, jurisdiction_id, jurisdiction_name,
state, "gsa_registry", confidence
)
# Strategy 2: URL pattern matching
patterns = self._generate_url_patterns(jurisdiction_name, state, jurisdiction_type)
for url, pattern_confidence in patterns:
if await self._verify_url(url):
logger.info(f"✓ Pattern match: {jurisdiction_name} -> {url}")
return await self._analyze_url(
url, jurisdiction_id, jurisdiction_name,
state, "pattern_match", pattern_confidence
)
# No valid URL found
logger.warning(f"✗ No URL found for {jurisdiction_name}, {state}")
return JurisdictionURL(
jurisdiction_id=jurisdiction_id,
jurisdiction_name=jurisdiction_name,
state=state,
discovery_method="not_found"
)
async def close(self):
"""Close HTTP client."""
await self.client.aclose()