Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Unified Search API | |
| LinkedIn-style search across contacts, meetings, organizations, and causes | |
| Uses hybrid approach: PostgreSQL (primary, fast) + HuggingFace Search API + DuckDB (fallback) | |
| """ | |
| from fastapi import APIRouter, Query, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from typing import Optional, List, Dict, Any | |
| from pathlib import Path | |
| import pandas as pd | |
| import duckdb | |
| from loguru import logger | |
| import re | |
| import os | |
| import sys | |
| import requests | |
| from functools import lru_cache | |
| from datetime import datetime, timedelta | |
| from api.errors import ErrorDetail, parse_error | |
| # Import PostgreSQL search functions (primary) | |
| from api.routes import search_postgres | |
| # Import HuggingFace Search helpers | |
| from api.routes.hf_search import ( | |
| search_contacts_hf, | |
| search_organizations_hf, | |
| search_jurisdictions_hf, | |
| is_dataset_indexed | |
| ) | |
# Router collecting the endpoints of this module under the "search" tag
router = APIRouter(tags=["search"])

# Base directory of the gold datasets
GOLD_DIR = Path("data/gold")

# Deployment environment: HF_SPACES=1 marks a HuggingFace Spaces deployment
IS_HF_SPACES = os.environ.get("HF_SPACES") == "1"
HF_ORGANIZATION = os.environ.get('HF_ORGANIZATION', 'CommunityOne')

# Count-query cache: value and the time it was stored, keyed by cache key (TTL: 1 hour)
_count_cache: Dict[str, int] = {}
_count_cache_ttl: Dict[str, datetime] = {}

# In-memory DataFrame cache for parquet sources: frame and the time it was loaded,
# keyed by path/URL (TTL: 5 minutes), so repeated searches skip the remote download
_dataframe_cache: Dict[str, pd.DataFrame] = {}
_dataframe_cache_ttl: Dict[str, datetime] = {}
DATAFRAME_CACHE_TTL = timedelta(minutes=5)

# Every.org API settings (fallback enrichment source only)
EVERYORG_API_KEY = os.environ.get('EVERYORG_API_KEY', '')
EVERYORG_API_BASE = "https://partners.every.org/v0.2"
def load_parquet_cached(url: str) -> pd.DataFrame:
    """
    Return the parquet file at ``url`` as a DataFrame, reusing a fresh cached copy.

    A frame stored under the same ``url`` less than ``DATAFRAME_CACHE_TTL`` ago
    is returned directly (the cached object itself, not a copy). Otherwise the
    file is read with ``pd.read_parquet`` and the cache entry is replaced.

    Args:
        url: Parquet location (local path or HuggingFace URL)

    Returns:
        pandas DataFrame
    """
    requested_at = datetime.now()

    # Only consult the timestamp when a frame is actually cached for this URL
    loaded_at = _dataframe_cache_ttl.get(url) if url in _dataframe_cache else None
    if loaded_at and (requested_at - loaded_at) < DATAFRAME_CACHE_TTL:
        logger.debug(f"🚀 Cache hit for {url}")
        return _dataframe_cache[url]

    # Missing or expired entry: read from the source and remember it
    logger.info(f"📥 Loading parquet from {url}")
    frame = pd.read_parquet(url)
    _dataframe_cache[url] = frame
    _dataframe_cache_ttl[url] = requested_at
    logger.debug(f"💾 Cached {len(frame)} rows from {url}")
    return frame
def get_hf_dataset_url(dataset_name: str) -> str:
    """
    Build the HuggingFace parquet download URL for a dataset name.

    The dataset is looked up under ``HF_ORGANIZATION`` and the URL points at the
    single-shard file ``data/train-00000-of-00001.parquet`` on the ``main`` branch.

    Example:
        states-ma-contacts-local-officials ->
        https://huggingface.co/datasets/CommunityOne/states-ma-contacts-local-officials/resolve/main/data/train-00000-of-00001.parquet
    """
    repo_id = f"{HF_ORGANIZATION}/{dataset_name}"
    return f"https://huggingface.co/datasets/{repo_id}/resolve/main/data/train-00000-of-00001.parquet"
def get_data_source(file_path: Path, use_remote: bool = False) -> str:
    """
    Resolve a gold-dataset path to the source string that should be read.

    Outside HuggingFace Spaces (and without ``use_remote``) the local path is
    returned unchanged. Otherwise the path is mapped to a HuggingFace dataset
    name, e.g. data/gold/states/MA/contacts_local_officials.parquet ->
    states-ma-contacts-local-officials, and the dataset URL is returned.
    Paths containing none of ``states``/``national``/``reference`` fall back
    to the local path.

    Args:
        file_path: Local file path of the parquet file
        use_remote: Force the remote URL even in a local environment

    Returns:
        Local path string or HuggingFace URL
    """
    local_source = str(file_path)
    if not (IS_HF_SPACES or use_remote):
        return local_source

    segments = file_path.parts
    # First matching category wins, in this order
    for category in ('states', 'national', 'reference'):
        if category not in segments:
            continue
        slug = segments[-1].replace('.parquet', '').replace('_', '-')
        if category == 'states':
            # The path component after "states" is the state code
            state_code = segments[segments.index('states') + 1].lower()
            prefix = f"states-{state_code}"
        else:
            prefix = category
        return get_hf_dataset_url(f"{prefix}-{slug}")

    # Unknown layout: fall back to the local path
    return local_source
def fetch_form990_data(ein: str) -> Optional[Dict[str, Any]]:
    """
    Look up an organization in the ProPublica Nonprofit Explorer API.

    The API response used here exposes neither a website nor a mission, so
    both fields are always ``None``; the only filing value returned is the tax
    year of the first entry in ``filings_with_data``.

    Args:
        ein: Employer Identification Number; dashes are removed and the value
            is left-padded with zeros to 9 characters

    Returns:
        Dict with ``website``, ``mission``, ``source``, ``last_updated`` and
        ``tax_year``, or ``None`` when ``ein`` is empty, the response status
        is not 200, or the lookup fails for any reason (best effort).
    """
    if not ein:
        return None

    try:
        clean_ein = str(ein).replace('-', '').zfill(9)
        url = f"https://projects.propublica.org/nonprofits/api/v2/organizations/{clean_ein}.json"
        response = requests.get(url, timeout=3)
        if response.status_code != 200:
            return None

        filings = response.json().get('filings_with_data', [])
        return {
            'website': None,  # ProPublica doesn't expose this in API
            'mission': None,  # Would need to parse PDF
            'source': 'propublica',
            'last_updated': datetime.now().isoformat(),
            'tax_year': filings[0].get('tax_prd_yr') if filings else None,
        }
    except Exception as e:
        # Enrichment is optional: log and report "no data" instead of failing the search
        logger.debug(f"ProPublica lookup failed for EIN {ein}: {e}")
        return None
def fetch_everyorg_data(ein: str) -> Optional[Dict[str, Any]]:
    """
    Fetch enrichment data for an EIN from the Every.org API - FALLBACK ONLY.

    Returns ``None`` when no API key is configured, ``ein`` is empty, the
    response status is not 200, the payload lacks ``data.nonprofit``, or the
    request fails for any reason. Otherwise returns a dict with ``mission``,
    ``website``, ``logo_url``, ``profile_url``, ``causes`` (at most 5),
    ``source`` and ``last_updated``.
    """
    if not (EVERYORG_API_KEY and ein):
        return None

    try:
        # Normalize the EIN: no dashes, left-padded to 9 characters
        normalized_ein = str(ein).replace('-', '').zfill(9)
        endpoint = f"{EVERYORG_API_BASE}/nonprofit/{normalized_ein}"
        request_headers = {
            "Authorization": f"Bearer {EVERYORG_API_KEY}",
            "Accept": "application/json"
        }
        response = requests.get(endpoint, headers=request_headers, timeout=3)
        if response.status_code != 200:
            return None

        payload = response.json()
        if not (payload and 'data' in payload and 'nonprofit' in payload['data']):
            return None

        envelope = payload['data']
        profile = envelope['nonprofit']

        # Collect the non-empty tag names
        cause_names = []
        for tag in envelope.get('nonprofitTags', []):
            tag_name = tag.get('tagName')
            if tag_name:
                cause_names.append(tag_name)

        return {
            'mission': profile.get('description') or profile.get('descriptionLong'),
            'website': profile.get('websiteUrl'),
            'logo_url': profile.get('logoUrl'),
            'profile_url': profile.get('profileUrl'),
            'causes': cause_names[:5],  # Limit to top 5 causes
            'source': 'everyorg',
            'last_updated': datetime.now().isoformat()
        }
    except Exception as e:
        logger.debug(f"Every.org lookup failed for EIN {ein}: {e}")
    return None
def _enrichment_is_fresh(last_updated: Any, max_age: timedelta) -> bool:
    """Return True when a cached enrichment timestamp is missing, unreadable, or newer than ``max_age``."""
    if not last_updated:
        # No timestamp recorded: age unknown, keep the cached value
        return True
    if isinstance(last_updated, str):
        try:
            last_updated = datetime.fromisoformat(last_updated)
        except ValueError:
            # Unparseable timestamp: treat like a missing one instead of crashing
            return True
    if not isinstance(last_updated, datetime):
        return False
    # Compare in the timestamp's own timezone so aware and naive values both work
    return last_updated > datetime.now(last_updated.tzinfo) - max_age


def get_enrichment_data(ein: str, existing_data: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Get enrichment data for an organization, backfilling missing fields.

    Sources, in order:
    1. ``existing_data``: ``enrichment_website`` is used unless its
       ``enrichment_last_updated`` timestamp is older than 30 days;
       ``enrichment_mission`` is used whenever present.
    2. Every.org API (``fetch_everyorg_data``): queried only when website or
       mission is still missing; also supplies logo, profile URL and causes.

    Args:
        ein: Employer Identification Number passed to the Every.org lookup
        existing_data: Optional row/dict that may already hold enrichment fields

    Returns:
        Dict with ``website``, ``mission``, ``logo_url``, ``profile_url``,
        ``causes`` and ``data_sources`` (each contributing source listed once).
    """
    result = {
        'website': None,
        'mission': None,
        'logo_url': None,
        'profile_url': None,
        'causes': [],
        'data_sources': []
    }

    def add_source(label: str) -> None:
        # Record each contributing source only once
        if label not in result['data_sources']:
            result['data_sources'].append(label)

    # Cached values first
    if existing_data:
        if existing_data.get('enrichment_website') and _enrichment_is_fresh(
            existing_data.get('enrichment_last_updated'), timedelta(days=30)
        ):
            result['website'] = existing_data['enrichment_website']
            add_source('cached')
        if existing_data.get('enrichment_mission'):
            result['mission'] = existing_data['enrichment_mission']
            add_source('cached')

    # Try Every.org for missing fields (keeps logo and causes which 990 doesn't have)
    if not result['website'] or not result['mission']:
        everyorg_data = fetch_everyorg_data(ein)
        if everyorg_data:
            if not result['website'] and everyorg_data.get('website'):
                result['website'] = everyorg_data['website']
                add_source('everyorg')
            if not result['mission'] and everyorg_data.get('mission'):
                result['mission'] = everyorg_data['mission']
                add_source('everyorg')

            # Always get logo and causes from Every.org
            result['logo_url'] = everyorg_data.get('logo_url')
            result['profile_url'] = everyorg_data.get('profile_url')
            result['causes'] = everyorg_data.get('causes', [])
            if result['logo_url'] or result['causes']:
                add_source('everyorg')

    return result
class SearchResult:
    """Unified search result shared by every search backend in this module."""

    def __init__(self,
                 result_type: str,
                 title: str,
                 subtitle: str,
                 description: str,
                 url: str,
                 score: float,
                 metadata: Dict[str, Any]):
        # Kind of hit, exposed as "type" in to_dict()
        self.result_type = result_type
        # Display strings
        self.title, self.subtitle, self.description = title, subtitle, description
        # Link target and relevance score
        self.url, self.score = url, score
        # Free-form extra fields
        self.metadata = metadata

    def to_dict(self):
        """Return the result as a plain dict; ``result_type`` is exported under the key ``type``."""
        payload = {"type": self.result_type}
        for field in ("title", "subtitle", "description", "url", "score", "metadata"):
            payload[field] = getattr(self, field)
        return payload
def convert_pg_result(pg_result: search_postgres.SearchResult) -> 'SearchResult':
    """Copy the fields of a PostgreSQL search result into this module's SearchResult class."""
    field_names = (
        "result_type",
        "title",
        "subtitle",
        "description",
        "url",
        "score",
        "metadata",
    )
    copied = {field: getattr(pg_result, field) for field in field_names}
    return SearchResult(**copied)
def calculate_relevance_score(text: str, query: str) -> float:
    """
    Score how well ``text`` matches ``query`` (case-insensitive).

    Returns 1.5 when ``text`` starts with the query, 1.0 when the query occurs
    elsewhere in ``text`` as a substring, otherwise the fraction of query words
    that appear inside any word of ``text``. Empty text or query scores 0.0.
    """
    if not (text and query):
        return 0.0

    haystack = text.lower()
    needle = query.lower()

    # Whole-query substring match: prefix matches rank above interior matches
    if needle in haystack:
        return 1.5 if haystack.startswith(needle) else 1.0

    # Otherwise count the query words contained in some word of the text
    terms = needle.split()
    if not terms:
        return 0.0
    words = haystack.split()
    hits = 0
    for term in terms:
        if any(term in word for word in words):
            hits += 1
    return hits / len(terms)
def _duckdb_append_state_officials(conn: Any, results: List[SearchResult], query: str,
                                   state: Optional[str], limit: int, is_browse_mode: bool) -> None:
    """Search the OpenStates ``contacts_officials.parquet`` files and append matches to ``results``."""
    if state:
        file_paths = [GOLD_DIR / "states" / state / "contacts_officials.parquet"]
    else:
        file_paths = list(GOLD_DIR.glob("states/*/contacts_officials.parquet"))[:5]
    logger.info(f"Searching {len(file_paths)} state official contact files (OpenStates) - browse_mode={is_browse_mode}")

    for file_path in file_paths:
        if not file_path.exists():
            continue
        # Get data source (local or remote URL)
        data_source = get_data_source(file_path, use_remote=IS_HF_SPACES)
        try:
            if is_browse_mode:
                # Browse mode: return all officials, prioritize mayors
                sql = """
                    SELECT
                        full_name as name,
                        role_type as title,
                        city_jurisdiction as jurisdiction,
                        state,
                        email,
                        phone,
                        CASE
                            WHEN LOWER(role_type) = 'mayor' THEN 2.0
                            WHEN LOWER(role_type) LIKE '%council%' THEN 1.8
                            WHEN LOWER(role_type) LIKE '%commission%' THEN 1.7
                            ELSE 1.5
                        END as score
                    FROM read_parquet(?)
                    ORDER BY score DESC, full_name ASC
                    LIMIT ?
                """
                rows = conn.execute(sql, [data_source, limit]).fetchall()
            else:
                # Search mode: prefix matches (1.5) outrank substring matches (1.0)
                sql = """
                    SELECT
                        full_name as name,
                        role_type as title,
                        city_jurisdiction as jurisdiction,
                        state,
                        email,
                        phone,
                        GREATEST(
                            CASE
                                WHEN LOWER(full_name) LIKE LOWER(?) THEN 1.5
                                WHEN LOWER(full_name) LIKE LOWER(?) THEN 1.0
                                ELSE 0.0
                            END,
                            CASE
                                WHEN LOWER(role_type) LIKE LOWER(?) THEN 1.5
                                WHEN LOWER(role_type) LIKE LOWER(?) THEN 1.0
                                ELSE 0.0
                            END,
                            CASE
                                WHEN LOWER(city_jurisdiction) LIKE LOWER(?) THEN 1.5
                                WHEN LOWER(city_jurisdiction) LIKE LOWER(?) THEN 1.0
                                ELSE 0.0
                            END,
                            CASE
                                WHEN LOWER(jurisdiction_name) LIKE LOWER(?) THEN 1.5
                                WHEN LOWER(jurisdiction_name) LIKE LOWER(?) THEN 1.0
                                ELSE 0.0
                            END
                        ) as score
                    FROM read_parquet(?)
                    WHERE LOWER(full_name) LIKE LOWER(?)
                       OR LOWER(role_type) LIKE LOWER(?)
                       OR LOWER(city_jurisdiction) LIKE LOWER(?)
                       OR LOWER(jurisdiction_name) LIKE LOWER(?)
                    ORDER BY score DESC
                    LIMIT ?
                """
                query_pattern = f'%{query}%'
                query_start = f'{query}%'
                # Placeholder order: 4 scoring pairs, the source, 4 WHERE patterns, the limit
                params = [query_start, query_pattern] * 4 + [data_source] + [query_pattern] * 4 + [limit]
                rows = conn.execute(sql, params).fetchall()

            # Convert to SearchResult objects
            for name, title, jurisdiction, state_code, email, phone, score in rows:
                if score <= 0.3:  # Relevance threshold
                    continue
                contact_info = []
                if email:
                    contact_info.append(f"📧 {email}")
                if phone:
                    contact_info.append(f"📞 {phone}")
                description = f"State official in {jurisdiction}" if jurisdiction else f"State official in {state_code}"
                if contact_info:
                    description += f" • {' • '.join(contact_info)}"
                results.append(SearchResult(
                    result_type="contact",
                    title=name if name else "Unknown",
                    subtitle=f"{title.title() if title else 'Official'} - {jurisdiction or state_code}",
                    description=description,
                    url=f"/people/{name.replace(' ', '-') if name else 'unknown'}",
                    score=score,
                    metadata={
                        "title": title,
                        "jurisdiction": jurisdiction,
                        "state": state_code,
                        "name": name,
                        "email": email,
                        "phone": phone,
                        "contact_type": "state_official",
                        "data_source": "OpenStates"
                    }
                ))
        except Exception as e:
            # One unreadable file must not abort the rest of the search
            logger.debug(f"Error searching state officials {file_path}: {e}")


def _duckdb_append_local_officials(conn: Any, results: List[SearchResult], query: str,
                                   state: Optional[str], limit: int) -> None:
    """Search the ``contacts_local_officials.parquet`` files and append matches to ``results``."""
    if state:
        file_paths = [GOLD_DIR / "states" / state / "contacts_local_officials.parquet"]
    else:
        file_paths = list(GOLD_DIR.glob("states/*/contacts_local_officials.parquet"))[:5]
    logger.info(f"Searching {len(file_paths)} local official contact files (meeting transcripts)")

    # SQL query with relevance scoring across name, title, jurisdiction
    sql = """
        SELECT
            name,
            title,
            jurisdiction,
            state,
            GREATEST(
                CASE
                    WHEN LOWER(name) LIKE LOWER(?) THEN 1.5
                    WHEN LOWER(name) LIKE LOWER(?) THEN 1.0
                    ELSE 0.0
                END,
                CASE
                    WHEN LOWER(title) LIKE LOWER(?) THEN 1.5
                    WHEN LOWER(title) LIKE LOWER(?) THEN 1.0
                    ELSE 0.0
                END,
                CASE
                    WHEN LOWER(jurisdiction) LIKE LOWER(?) THEN 1.5
                    WHEN LOWER(jurisdiction) LIKE LOWER(?) THEN 1.0
                    ELSE 0.0
                END
            ) as score
        FROM read_parquet(?)
        WHERE LOWER(name) LIKE LOWER(?)
           OR LOWER(title) LIKE LOWER(?)
           OR LOWER(jurisdiction) LIKE LOWER(?)
        ORDER BY score DESC
        LIMIT ?
    """
    query_pattern = f'%{query}%'
    query_start = f'{query}%'

    for file_path in file_paths:
        # Get data source (local or remote URL)
        data_source = get_data_source(file_path, use_remote=IS_HF_SPACES)
        try:
            # Placeholder order: 3 scoring pairs, the source, 3 WHERE patterns, the limit
            params = [query_start, query_pattern] * 3 + [data_source] + [query_pattern] * 3 + [limit]
            rows = conn.execute(sql, params).fetchall()
            for name, title, jurisdiction, state_code, score in rows:
                if score <= 0.3:  # Relevance threshold
                    continue
                results.append(SearchResult(
                    result_type="contact",
                    title=name if name else "Unknown",
                    subtitle=f"{title} - {jurisdiction}, {state_code}",
                    description=f"Local official in {jurisdiction}",
                    url=f"/people/{name.replace(' ', '-') if name else 'unknown'}",
                    score=score,
                    metadata={
                        "title": title,
                        "jurisdiction": jurisdiction,
                        "state": state_code,
                        "name": name
                    }
                ))
        except Exception as e:
            logger.debug(f"Error searching {file_path}: {e}")


def _duckdb_append_nonprofit_officers(conn: Any, results: List[SearchResult], query: str,
                                      state: Optional[str], limit: int) -> None:
    """Search the ``contacts_nonprofit_officers.parquet`` files and append matches to ``results``."""
    if state:
        nonprofit_files = [GOLD_DIR / "states" / state / "contacts_nonprofit_officers.parquet"]
    else:
        # Search all state directories
        nonprofit_files = [
            state_dir / "contacts_nonprofit_officers.parquet"
            for state_dir in (GOLD_DIR / "states").glob("*/")
        ]

    query_pattern = f'%{query}%'
    query_start = f'{query}%'

    for nonprofit_file in nonprofit_files:
        # Get data source (local or remote URL)
        nonprofit_source = get_data_source(nonprofit_file, use_remote=IS_HF_SPACES)
        try:
            logger.info(f"Searching nonprofit officers: {nonprofit_source}")
            # Name matches score higher (1.5/1.0) than title or organization matches (1.0/0.5)
            officer_sql = """
                SELECT
                    name,
                    title,
                    organization_name,
                    city,
                    state,
                    compensation,
                    GREATEST(
                        CASE
                            WHEN LOWER(name) LIKE LOWER(?) THEN 1.5
                            WHEN LOWER(name) LIKE LOWER(?) THEN 1.0
                            ELSE 0.0
                        END,
                        CASE
                            WHEN LOWER(title) LIKE LOWER(?) THEN 1.0
                            WHEN LOWER(title) LIKE LOWER(?) THEN 0.5
                            ELSE 0.0
                        END,
                        CASE
                            WHEN LOWER(organization_name) LIKE LOWER(?) THEN 1.0
                            WHEN LOWER(organization_name) LIKE LOWER(?) THEN 0.5
                            ELSE 0.0
                        END
                    ) AS score
                FROM read_parquet(?)
                WHERE (LOWER(name) LIKE LOWER(?)
                   OR LOWER(title) LIKE LOWER(?)
                   OR LOWER(organization_name) LIKE LOWER(?))
            """
            # Placeholder order: 3 scoring pairs, the source, 3 WHERE patterns
            params = [query_start, query_pattern] * 3 + [nonprofit_source] + [query_pattern] * 3
            if state:
                officer_sql += " AND LOWER(state) = LOWER(?)"
                params.append(state)
            officer_sql += " ORDER BY score DESC, compensation DESC NULLS LAST LIMIT ?"
            params.append(limit)

            officer_rows = conn.execute(officer_sql, params).fetchall()
            for name, title, org_name, city, state_code, compensation, score in officer_rows:
                if score <= 0.3:
                    continue
                comp_text = f" (${compensation:,.0f})" if compensation else ""
                results.append(SearchResult(
                    result_type="contact",
                    title=name if name else "Unknown",
                    subtitle=f"{title} - {org_name}{comp_text}",
                    description=f"Nonprofit officer in {city}, {state_code}",
                    url=f"/nonprofits?name={org_name.replace(' ', '-') if org_name else 'unknown'}",
                    score=score,
                    metadata={
                        "title": title,
                        "organization": org_name,
                        "city": city,
                        "state": state_code,
                        "compensation": compensation,
                        "contact_type": "nonprofit_officer"
                    }
                ))
            logger.info(f"Found {len([r for r in results if r.metadata.get('contact_type') == 'nonprofit_officer'])} nonprofit officer results")
        except Exception as e:
            logger.debug(f"Error searching nonprofit officers: {e}")


def search_contacts_duckdb(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]:
    """
    Search contacts using DuckDB (supports local files or remote HTTP parquet).

    This is the fallback when HF Search API is unavailable. Three sources are
    searched in turn: state officials (OpenStates), local officials (meeting
    transcripts) and nonprofit officers. A failure on one file is logged and
    skipped.

    Browse mode: when ``query`` is ``None``, empty or whitespace-only, state
    officials are listed with role-based scores and the other two sources are
    queried with an empty search text (which matches every non-null value).

    Args:
        query: Search text; may be empty/None for browse mode
        state: Optional state code selecting a single state's directory
        limit: Maximum number of results per file and overall

    Returns:
        Up to ``limit`` SearchResult objects sorted by score (descending).
        Results gathered before an unexpected error are still returned.
    """
    results: List[SearchResult] = []

    # Determine if this is browse mode (no query) or search mode
    is_browse_mode = not query or not query.strip()
    # Never interpolate None/whitespace into the LIKE patterns (would search for "%None%")
    search_text = '' if is_browse_mode else query

    conn = None
    try:
        conn = duckdb.connect()
        _duckdb_append_state_officials(conn, results, search_text, state, limit, is_browse_mode)
        _duckdb_append_local_officials(conn, results, search_text, state, limit)
        _duckdb_append_nonprofit_officers(conn, results, search_text, state, limit)
    except Exception as e:
        logger.error(f"Contact search error: {e}")
    finally:
        # Release the connection on every path, including errors
        if conn is not None:
            conn.close()

    # Sort all results by score and limit (also for partial results after an error)
    results.sort(key=lambda x: x.score, reverse=True)
    logger.info(f"DuckDB search found {len(results)} contacts for query '{query}'")
    return results[:limit]
def search_contacts(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]:
    """
    HYBRID SEARCH: Search local officials AND nonprofit officers.

    Strategy:
    1. Try HuggingFace Search API first - only when a query is given and the
       app runs on HF Spaces
    2. Fall back to DuckDB (local files or remote HTTP parquet) when the HF
       search is skipped, returns nothing, or raises

    Args:
        query: Search text (name, title, organization, etc.)
        state: Optional 2-letter state code filter
        limit: Maximum results to return

    Returns:
        List of SearchResult objects; HF results all carry score 1.0, DuckDB
        results are sorted by relevance
    """
    logger.info(f"🔎 search_contacts() called - query={query!r}, state={state!r}, limit={limit}, IS_HF_SPACES={IS_HF_SPACES}")

    # STRATEGY 1: Try HuggingFace Search API (fast text search)
    if query and IS_HF_SPACES:
        logger.info(f"🔍 Trying HF Search API for '{query}' (state={state})")
        try:
            hf_results = search_contacts_hf(query, state, limit=limit)
            if hf_results:
                logger.info(f"✅ HF Search API returned {len(hf_results)} results")
                results = []
                for row in hf_results:
                    # Use `or` fallbacks: a key that is present with a null value
                    # must be treated the same as a missing key
                    source_type = row.get('source') or 'contact'
                    name = row.get('name') or 'Unknown'
                    title = row.get('title') or ''
                    jurisdiction = row.get('jurisdiction') or row.get('organization_name') or ''
                    state_code = row.get('state') or state or ''
                    results.append(SearchResult(
                        result_type="contact",
                        title=name,
                        subtitle=f"{title} - {jurisdiction}, {state_code}",
                        description=f"{'Local official' if source_type == 'local_officials' else 'Nonprofit officer'} in {jurisdiction}",
                        url=f"/people/{name.replace(' ', '-')}",
                        score=1.0,
                        metadata={
                            "title": title,
                            "jurisdiction": jurisdiction,
                            "state": state_code,
                            "name": name,
                            "source": source_type
                        }
                    ))
                return results[:limit]
        except Exception as e:
            logger.warning(f"HF Search API failed, falling back to DuckDB: {e}")

    # STRATEGY 2: Fall back to DuckDB (works with local or remote parquet)
    logger.info(f"🔍 Using DuckDB {'remote' if IS_HF_SPACES else 'local'} search for '{query}'")
    return search_contacts_duckdb(query, state, limit)
def _first_matching_column(columns: set, candidates: tuple) -> str:
    """Return the first candidate present in ``columns``; default to the last candidate."""
    for candidate in candidates[:-1]:
        if candidate in columns:
            return candidate
    return candidates[-1]


def _meeting_cell_text(value: Any) -> str:
    """Convert a DataFrame cell to text, mapping missing values (None/NaN/NaT) to ''."""
    if value is None:
        return ''
    # is_scalar guard: pd.isna on list/array cells would return an array
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ''
    return str(value)


def search_meetings(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]:
    """
    Search meeting transcripts and agendas.

    Reads up to 5 state event files (``events.parquet``, falling back to
    ``events_events.parquet`` and then ``meetings.parquet``) and scores every
    row on its title, the first 500 characters of its body (weight 0.8) and
    its jurisdiction (weight 0.6). Rows scoring above 0.3 are returned.
    Missing cell values are treated as empty text so they can neither match a
    query nor show up as 'nan'/'None' in the output.

    Args:
        query: Search text; an empty query yields no results
        state: Optional state code restricting the search to one state directory
        limit: Maximum results to return

    Returns:
        Up to ``limit`` SearchResult objects sorted by score (descending)
    """
    results: List[SearchResult] = []

    # Nothing can score above the threshold without a query; skip the file reads
    if not query:
        return results

    try:
        # Search state event/meeting files (try new naming first, fallback to old)
        state_dir = state if state else "*"
        meeting_files = []
        for filename in ("events.parquet", "events_events.parquet", "meetings.parquet"):
            meeting_files = list(GOLD_DIR.glob(f"states/{state_dir}/{filename}"))
            if meeting_files:
                break

        for file_path in meeting_files[:5]:  # Limit for performance
            try:
                df = pd.read_parquet(file_path)
                state_code = file_path.parent.name

                # Map column names (handle LocalView vs CityScrapers vs other formats)
                columns = set(df.columns)
                title_col = _first_matching_column(columns, ('vid_title', 'event_title', 'title'))
                body_col = _first_matching_column(columns, ('caption_text_clean', 'caption_text', 'full_text', 'body'))
                jurisdiction_col = _first_matching_column(columns, ('place_name', 'jurisdiction_name', 'jurisdiction'))
                date_col = _first_matching_column(columns, ('meeting_date', 'date'))
                id_col = _first_matching_column(columns, ('vid_id', 'meeting_id', 'id'))

                # Search in title, body, jurisdiction
                for _, row in df.iterrows():
                    title = _meeting_cell_text(row.get(title_col, ''))
                    body = _meeting_cell_text(row.get(body_col, ''))[:500]  # First 500 chars
                    jurisdiction = _meeting_cell_text(row.get(jurisdiction_col, ''))
                    meeting_date = _meeting_cell_text(row.get(date_col, ''))
                    meeting_id = _meeting_cell_text(row.get(id_col, ''))

                    score = max(
                        calculate_relevance_score(title, query),
                        calculate_relevance_score(body, query) * 0.8,  # Body matches slightly lower
                        calculate_relevance_score(jurisdiction, query) * 0.6
                    )
                    if score <= 0.3:
                        continue

                    # Snippet is the start of the body, truncated to 200 characters
                    snippet = body[:200] + "..." if len(body) > 200 else body
                    results.append(SearchResult(
                        result_type="meeting",
                        title=title,
                        subtitle=f"{jurisdiction}, {state_code} - {meeting_date}",
                        description=snippet,
                        url=f"/documents?meeting_id={meeting_id}",
                        score=score,
                        metadata={
                            "jurisdiction": jurisdiction,
                            "state": state_code,
                            "date": meeting_date,
                            "meeting_id": meeting_id
                        }
                    ))
            except Exception as e:
                # One unreadable file must not abort the rest of the search
                logger.debug(f"Error searching {file_path}: {e}")
    except Exception as e:
        logger.error(f"Meeting search error: {e}")

    results.sort(key=lambda x: x.score, reverse=True)
    return results[:limit]
def count_organizations(state: Optional[str] = None, ntee_code: Optional[str] = None, query: Optional[str] = None) -> int:
    """
    Count total organizations matching criteria (for pagination) - cached.

    Counts rows of the state file (when ``state`` is given) or the national
    ``nonprofits_organizations.parquet`` file under ``GOLD_DIR``. Successful
    counts are cached for 1 hour per (state, ntee_code, query) combination.

    Args:
        state: Optional state code selecting the state-level file
        ntee_code: Optional NTEE code prefix filter (ignored when the file has
            no NTEE column)
        query: Optional case-insensitive substring filter on the organization name

    Returns:
        Number of matching rows; 0 when the file does not exist locally or the
        query fails (the error is logged).
    """
    cache_key = f"count_{state}_{ntee_code}_{query}"

    # Check cache (1 hour TTL)
    now = datetime.now()
    if cache_key in _count_cache:
        cached_time = _count_cache_ttl.get(cache_key)
        if cached_time and (now - cached_time).total_seconds() < 3600:
            return _count_cache[cache_key]

    conn = None
    try:
        # Determine file path
        if state:
            file_pattern = f"{GOLD_DIR}/states/{state}/nonprofits_organizations.parquet"
        else:
            file_pattern = f"{GOLD_DIR}/national/nonprofits_organizations.parquet"
        file_path = Path(file_pattern)
        if not file_path.exists():
            return 0

        conn = duckdb.connect()

        # The path is embedded as a SQL string literal; double any single quotes
        source_literal = str(file_path).replace("'", "''")

        # Detect schema
        columns_query = f"DESCRIBE SELECT * FROM '{source_literal}' LIMIT 0"
        available_columns = {row[0] for row in conn.execute(columns_query).fetchall()}
        name_col = 'organization_name' if 'organization_name' in available_columns else 'name'
        ntee_col = 'ntee_code' if 'ntee_code' in available_columns else 'ntee_cd'

        # Build WHERE clause (column names come from the fixed choices above; values are bound)
        where_clauses = []
        params = []
        if query and query.strip():
            where_clauses.append(f"LOWER({name_col}) LIKE LOWER(?)")
            params.append(f'%{query}%')
        if ntee_code and ntee_col in available_columns:
            where_clauses.append(f"{ntee_col} LIKE ?")
            params.append(f'{ntee_code}%')
        where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"

        # Count against the same file whose schema was inspected above
        count_sql = f"SELECT COUNT(*) FROM '{source_literal}' WHERE {where_sql}"
        result = conn.execute(count_sql, params).fetchone()
        count = result[0] if result else 0

        # Cache the result
        _count_cache[cache_key] = count
        _count_cache_ttl[cache_key] = now
        return count
    except Exception as e:
        logger.error(f"Count error: {e}")
        return 0
    finally:
        # Release the connection on every path, including errors
        if conn is not None:
            conn.close()
| def search_organizations(query: str, state: Optional[str] = None, ntee_code: Optional[str] = None, limit: int = 10, offset: int = 0, enrich: bool = False, sort: str = 'relevance', ein: Optional[str] = None) -> List[SearchResult]: | |
| """Search nonprofit organizations using DuckDB for fast Parquet queries | |
| Args: | |
| enrich: If True, fetch additional data from Every.org API (slower) | |
| sort: Sort order - 'relevance', 'name-asc', 'name-desc', 'revenue-asc', 'revenue-desc', 'assets-asc', 'assets-desc' | |
| ein: If provided, filter to exact EIN match (for direct organization links) | |
| """ | |
| results = [] | |
| try: | |
| # Determine file path | |
| if state: | |
| file_pattern = f"{GOLD_DIR}/states/{state}/nonprofits_organizations.parquet" | |
| else: | |
| file_pattern = f"{GOLD_DIR}/national/nonprofits_organizations.parquet" | |
| # Get data source (local or remote HuggingFace URL) | |
| file_path = Path(file_pattern) | |
| data_source = get_data_source(file_path, use_remote=IS_HF_SPACES) | |
| # Load parquet with caching (speeds up from 2-3s to <10ms) | |
| df = load_parquet_cached(data_source) | |
| # Initialize DuckDB connection | |
| conn = duckdb.connect() | |
| # Query the DataFrame directly (DuckDB can query pandas DataFrames) | |
| available_columns = set(df.columns) | |
| # Detect column name variations (handle different schemas) | |
| name_col = 'organization_name' if 'organization_name' in available_columns else 'name' | |
| ntee_col = 'ntee_code' if 'ntee_code' in available_columns else 'ntee_cd' | |
| revenue_col = 'form_990_total_revenue' if 'form_990_total_revenue' in available_columns else 'revenue_amt' | |
| asset_col = 'form_990_total_assets' if 'form_990_total_assets' in available_columns else 'asset_amt' | |
| income_col = 'form_990_net_income' if 'form_990_net_income' in available_columns else 'income_amt' | |
| # Build WHERE clauses using detected column names | |
| where_clauses = [] | |
| params = [] | |
| # EIN filter (exact match - highest priority for direct organization links) | |
| if ein and ein.strip(): | |
| where_clauses.append("ein = ?") | |
| params.append(ein.strip()) | |
| # Search query (case-insensitive LIKE) - only if query provided and no EIN | |
| if query and query.strip() and not ein: | |
| where_clauses.append(f"LOWER({name_col}) LIKE LOWER(?)") | |
| params.append(f'%{query}%') | |
| # State filter (if using national file) | |
| if state and not file_pattern.startswith(f"{GOLD_DIR}/states/"): | |
| where_clauses.append("state = ?") | |
| params.append(state) | |
| # NTEE code filter | |
| if ntee_code and ntee_col in available_columns: | |
| where_clauses.append(f"{ntee_col} LIKE ?") | |
| params.append(f'{ntee_code}%') | |
| # Default to TRUE if no filters (browse all) | |
| where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" | |
| # Build column list with proper NULL handling for missing columns | |
| select_columns = [] | |
| # Add core columns (with aliases for consistency) | |
| select_columns.append(f'{name_col} as name') | |
| select_columns.append('city') | |
| select_columns.append('state') | |
| select_columns.append(f'{ntee_col} as ntee_cd' if ntee_col in available_columns else 'NULL as ntee_cd') | |
| select_columns.append('ein') | |
| select_columns.append(f'{revenue_col} as revenue_amt' if revenue_col in available_columns else 'NULL as revenue_amt') | |
| select_columns.append(f'{asset_col} as asset_amt' if asset_col in available_columns else 'NULL as asset_amt') | |
| select_columns.append(f'{income_col} as income_amt' if income_col in available_columns else 'NULL as income_amt') | |
| select_columns.append('tax_period' if 'tax_period' in available_columns else 'NULL as tax_period') | |
| # Track enrichment columns (form_990 and bigquery) | |
| enrichment_cols = [] | |
| enrichment_col_map = {} | |
| # Check for website columns (multiple possible names) - ALWAYS add if exists | |
| website_col_added = False | |
| for col_name in ['bigquery_website', 'form_990_website', 'website', 'everyorg_website']: | |
| if col_name in available_columns: | |
| select_columns.append(f'{col_name} as enrichment_website') | |
| enrichment_cols.append('enrichment_website') | |
| enrichment_col_map['enrichment_website'] = col_name | |
| website_col_added = True | |
| logger.debug(f"Added website column: {col_name}") | |
| break | |
| # Check for mission columns | |
| mission_col_added = False | |
| for col_name in ['bigquery_mission', 'form_990_mission', 'mission', 'everyorg_mission']: | |
| if col_name in available_columns: | |
| select_columns.append(f'{col_name} as enrichment_mission') | |
| enrichment_cols.append('enrichment_mission') | |
| enrichment_col_map['enrichment_mission'] = col_name | |
| mission_col_added = True | |
| logger.debug(f"Added mission column: {col_name}") | |
| break | |
| # Check for logo columns | |
| logo_col_added = False | |
| for col_name in ['logodev_logo_url', 'everyorg_logo_url', 'logo_url']: | |
| if col_name in available_columns: | |
| select_columns.append(f'{col_name} as enrichment_logo') | |
| enrichment_cols.append('enrichment_logo') | |
| enrichment_col_map['enrichment_logo'] = col_name | |
| logo_col_added = True | |
| logger.debug(f"Added logo column: {col_name}") | |
| break | |
| # Last updated timestamp | |
| for col_name in ['bigquery_updated_date', 'form_990_last_updated', 'everyorg_last_updated']: | |
| if col_name in available_columns: | |
| select_columns.append(f'{col_name} as enrichment_last_updated') | |
| enrichment_cols.append('enrichment_last_updated') | |
| enrichment_col_map['enrichment_last_updated'] = col_name | |
| logger.debug(f"Added timestamp column: {col_name}") | |
| break | |
| columns_sql = ', '.join(select_columns) | |
| # Log what we're selecting | |
| logger.info(f"🔍 Enrichment columns to select: {enrichment_cols}") | |
| logger.info(f"📋 Full SQL columns: {columns_sql}") | |
| # Build ORDER BY clause based on sort parameter | |
| order_by_clauses = [] | |
| if sort == 'name-asc': | |
| order_by_clauses.append(f"{name_col} ASC") | |
| elif sort == 'name-desc': | |
| order_by_clauses.append(f"{name_col} DESC") | |
| elif sort == 'revenue-desc': | |
| order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") | |
| elif sort == 'revenue-asc': | |
| # Low to high: Show positive values first (smallest to largest), then zeros, then negatives | |
| order_by_clauses.append(f""" | |
| CASE | |
| WHEN TRY_CAST({revenue_col} AS BIGINT) IS NULL THEN 3 | |
| WHEN TRY_CAST({revenue_col} AS BIGINT) <= 0 THEN 2 | |
| ELSE 1 | |
| END ASC, | |
| ABS(COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0)) ASC | |
| """) | |
| elif sort == 'assets-desc': | |
| order_by_clauses.append(f"COALESCE(TRY_CAST({asset_col} AS BIGINT), 0) DESC") | |
| elif sort == 'assets-asc': | |
| # Low to high: Show positive values first (smallest to largest), then zeros, then negatives | |
| order_by_clauses.append(f""" | |
| CASE | |
| WHEN TRY_CAST({asset_col} AS BIGINT) IS NULL THEN 3 | |
| WHEN TRY_CAST({asset_col} AS BIGINT) <= 0 THEN 2 | |
| ELSE 1 | |
| END ASC, | |
| ABS(COALESCE(TRY_CAST({asset_col} AS BIGINT), 0)) ASC | |
| """) | |
| elif query and query.strip(): | |
| # Relevance sort (only for search mode) | |
| order_by_clauses.append("score DESC") | |
| order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") | |
| else: | |
| # Default browse mode: sort by revenue/assets | |
| order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") | |
| order_by_clauses.append(f"COALESCE(TRY_CAST({asset_col} AS BIGINT), 0) DESC") | |
| # Always add name as final sort for consistency | |
| if 'name' not in sort: | |
| order_by_clauses.append(f"{name_col}") | |
| order_by_sql = ', '.join(order_by_clauses) | |
| # SQL query with relevance scoring (browse mode if no query) | |
| if query and query.strip(): | |
| # Search mode: score by text match | |
| sql = f""" | |
| SELECT | |
| {columns_sql}, | |
| CASE | |
| WHEN LOWER({name_col}) LIKE LOWER(?) THEN 1.5 | |
| WHEN LOWER({name_col}) LIKE LOWER(?) THEN 1.0 | |
| ELSE 0.5 | |
| END as score | |
| FROM df | |
| WHERE {where_sql} | |
| ORDER BY {order_by_sql} | |
| LIMIT ? OFFSET ? | |
| """ | |
| # Execute query with scoring parameters | |
| query_params = [f'{query}%', f'%{query}%'] + params + [limit, offset] | |
| else: | |
| # Browse mode: sort by size/activity | |
| sql = f""" | |
| SELECT | |
| {columns_sql}, | |
| 1.0 as score | |
| FROM df | |
| WHERE {where_sql} | |
| ORDER BY {order_by_sql} | |
| LIMIT ? OFFSET ? | |
| """ | |
| # Execute query without scoring parameters | |
| query_params = params + [limit, offset] | |
| rows = conn.execute(sql, query_params).fetchall() | |
| # NTEE code descriptions for better context | |
| ntee_descriptions = { | |
| 'E': 'Health Services', | |
| 'E60': 'Health Support Services', | |
| 'E61': 'Blood Supply', | |
| 'E62': 'Emergency Medical Services', | |
| 'E65': 'Organ & Tissue Banks', | |
| 'E70': 'Public Health', | |
| 'E80': 'Health Treatment - Primary Care', | |
| 'E90': 'Nursing Services', | |
| 'E20': 'Hospitals & Primary Medical Care', | |
| 'E30': 'Ambulatory & Primary Health Care', | |
| 'E32': 'Clinics & Community Health Centers', | |
| 'P': 'Human Services', | |
| 'B': 'Education', | |
| 'X': 'Religion-Related', | |
| 'A': 'Arts, Culture & Humanities', | |
| } | |
| # Convert to SearchResult objects with intelligent enrichment | |
| for row in rows: | |
| # Unpack base columns (now includes tax_period) | |
| org_name, city, state_code, ntee, ein, revenue, assets, income, tax_period = row[:9] | |
| # Unpack optional enrichment columns if present | |
| existing_data = {} | |
| idx = 9 | |
| if 'enrichment_website' in enrichment_cols: | |
| existing_data['enrichment_website'] = row[idx] | |
| # Only log non-null websites to reduce spam | |
| if row[idx] and str(row[idx]) != 'nan': | |
| logger.debug(f"✅ Website: {row[idx]}") | |
| idx += 1 | |
| if 'enrichment_mission' in enrichment_cols: | |
| existing_data['enrichment_mission'] = row[idx] | |
| idx += 1 | |
| if 'enrichment_logo' in enrichment_cols: | |
| existing_data['enrichment_logo'] = row[idx] | |
| idx += 1 | |
| if 'enrichment_last_updated' in enrichment_cols: | |
| existing_data['enrichment_last_updated'] = row[idx] | |
| idx += 1 | |
| score = row[-1] # Score is always last | |
| # Parse tax year from tax_period (format: YYYYMM) | |
| tax_year = None | |
| if tax_period and str(tax_period).isdigit() and len(str(tax_period)) >= 4: | |
| tax_year = int(str(tax_period)[:4]) | |
| # Get enriched data with intelligent backfill (only if requested) | |
| enrichment = get_enrichment_data(ein, existing_data) if (ein and enrich) else {} | |
| # Build a more informative description | |
| ntee_desc = None | |
| if ntee: | |
| # Try exact match first, then prefix match | |
| ntee_desc = ntee_descriptions.get(ntee) | |
| if not ntee_desc: | |
| # Try first character (major category) | |
| ntee_desc = ntee_descriptions.get(ntee[0]) if ntee else None | |
| # Use enriched mission as primary description, fallback to NTEE + financial | |
| description = enrichment.get('mission') if enrichment.get('mission') else None | |
| # Validate mission: if it contains a different org name, it's stale data | |
| if description and org_name: | |
| # Check if mission mentions a completely different org name | |
| # (e.g., "Catalyst Institute" when org name is "CAREQUEST INSTITUTE") | |
| mission_lower = description.lower() | |
| name_words = set(org_name.lower().split()) | |
| # If mission starts with an org name that's not in our actual org name, skip it | |
| first_sentence = description.split('.')[0].lower() | |
| if ' is a nonprofit' in first_sentence or ' is an nonprofit' in first_sentence: | |
| # Extract the subject (organization name before "is a nonprofit") | |
| subject = first_sentence.split(' is a')[0].strip() | |
| subject_words = set(subject.split()) | |
| # If the subject shares NO significant words with our org name, it's stale | |
| # (e.g., "catalyst institute" vs "carequest institute") | |
| significant_words = subject_words - {'the', 'a', 'an', 'of', 'for', 'and', 'inc', 'llc'} | |
| name_significant = name_words - {'the', 'a', 'an', 'of', 'for', 'and', 'inc', 'llc', 'institute'} | |
| if significant_words and not (significant_words & name_significant): | |
| # Stale data - mission talks about a different org | |
| logger.warning(f"Stale mission data for {org_name}: '{subject}' != '{org_name}'") | |
| description = None | |
| if not description: | |
| description_parts = [] | |
| if ntee_desc: | |
| description_parts.append(ntee_desc) | |
| # Convert financial data to numbers (handle None and string types) | |
| try: | |
| revenue_num = float(revenue) if revenue else 0 | |
| assets_num = float(assets) if assets else 0 | |
| except (ValueError, TypeError): | |
| revenue_num = 0 | |
| assets_num = 0 | |
| if revenue_num > 0: | |
| description_parts.append(f"Revenue: ${revenue_num:,.0f}") | |
| elif assets_num > 0: | |
| description_parts.append(f"Assets: ${assets_num:,.0f}") | |
| if not description_parts: | |
| description_parts.append(f"Nonprofit serving {city}") | |
| description = " • ".join(description_parts) | |
| # Build metadata with enriched fields | |
| metadata = { | |
| "ein": ein, | |
| "city": city, | |
| "state": state_code, | |
| "ntee_code": ntee, | |
| "revenue": revenue, | |
| "assets": assets, | |
| "income": income, | |
| "tax_year": tax_year, | |
| "data_sources": [] | |
| } | |
| # ALWAYS add enrichment from parquet columns (existing_data) - no enrich flag needed | |
| if existing_data.get('enrichment_website'): | |
| metadata['website'] = existing_data['enrichment_website'] | |
| metadata['data_sources'].append('cached') | |
| if existing_data.get('enrichment_mission'): | |
| metadata['mission'] = existing_data['enrichment_mission'] | |
| if 'cached' not in metadata['data_sources']: | |
| metadata['data_sources'].append('cached') | |
| if existing_data.get('enrichment_logo'): | |
| metadata['logo_url'] = existing_data['enrichment_logo'] | |
| if 'cached' not in metadata['data_sources']: | |
| metadata['data_sources'].append('cached') | |
| # Add API enrichment if requested (enrich=true) | |
| if enrichment: | |
| if enrichment.get('website') and not metadata.get('website'): | |
| metadata['website'] = enrichment['website'] | |
| if enrichment.get('logo_url'): | |
| metadata['logo_url'] = enrichment['logo_url'] | |
| if enrichment.get('profile_url'): | |
| metadata['profile_url'] = enrichment['profile_url'] | |
| if enrichment.get('causes'): | |
| metadata['causes'] = enrichment['causes'] | |
| # Add API data sources | |
| for source in enrichment.get('data_sources', []): | |
| if source not in metadata['data_sources']: | |
| metadata['data_sources'].append(source) | |
| results.append(SearchResult( | |
| result_type="organization", | |
| title=org_name if org_name else "Unknown", | |
| subtitle=f"{city}, {state_code}" + (f" - NTEE: {ntee}" if ntee else ""), | |
| description=description, | |
| url=f"/search?types=organizations&state={state_code}&ein={ein}", | |
| score=score, | |
| metadata=metadata | |
| )) | |
| conn.close() | |
| logger.info(f"DuckDB search found {len(results)} organizations for query '{query}'") | |
| except Exception as e: | |
| logger.error(f"Organization search error: {e}") | |
| return results | |
def search_causes(query: str, limit: int = 10) -> List[SearchResult]:
    """
    Search causes / NTEE categories from the reference parquet file.

    Browse mode (empty or whitespace-only ``query``) returns every row with a
    fixed score of 1.0. Search mode scores each row by the better of its
    description match and its code match, and drops rows scoring 0.3 or less.

    Args:
        query: Search text; empty or blank means "browse all".
        limit: Maximum number of results returned.

    Returns:
        Up to ``limit`` results, highest score first. Any failure while
        loading or scanning is logged and whatever was collected so far is
        returned.
    """
    matches = []

    try:
        # Resolve the data source (local file or remote HuggingFace URL)
        source = get_data_source(
            GOLD_DIR / "reference" / "causes_ntee_codes.parquet",
            use_remote=IS_HF_SPACES,
        )

        # Cached load avoids re-fetching the parquet on every search
        ntee_df = load_parquet_cached(source)
        logger.debug(f"Loaded {len(ntee_df)} NTEE codes from cache")

        # The query does not change per row, so decide the mode once
        is_search = bool(query and query.strip())

        for _, record in ntee_df.iterrows():
            code = str(record.get('ntee_code', ''))
            description = str(record.get('description', ''))
            ntee_type = str(record.get('ntee_type', ''))

            relevance = 1.0  # Browse-mode default
            if is_search:
                description_score = calculate_relevance_score(description, query)
                code_score = calculate_relevance_score(code, query)
                relevance = max(description_score, code_score)
                if relevance <= 0.3:
                    # Not relevant enough to show
                    continue

            cause_metadata = {
                "ntee_code": code,
                "ntee_type": ntee_type
            }
            matches.append(SearchResult(
                result_type="cause",
                title=description,
                subtitle=f"NTEE Code: {code}",
                description=f"Category type: {ntee_type}",
                url=f"/nonprofits?ntee_code={code}",
                score=relevance,
                metadata=cause_metadata
            ))

        logger.info(f"Found {len(matches)} cause results for query '{query}'")
    except Exception as e:
        logger.error(f"Cause search error: {e}")

    # Best matches first, then trim to the requested size
    ranked = sorted(matches, key=lambda item: item.score, reverse=True)
    return ranked[:limit]
def search_jurisdictions(query: str, state: Optional[str] = None, city: Optional[str] = None, jurisdiction_levels: Optional[List[str]] = None, limit: int = 10, offset: int = 0) -> List[SearchResult]:
    """
    Search cities, counties, townships, and school districts using DuckDB.

    Each jurisdiction type lives in its own parquet file under
    ``GOLD_DIR/reference``. Every selected file that exists on disk is queried
    separately, each row is scored (type boost, multiplied for exact/prefix
    name matches), and the per-type results are merged, sorted and paginated
    in memory.

    Args:
        query: Text matched against the jurisdiction NAME (case-insensitive
            substring). Falsy means browse mode (no name filter).
        state: Optional exact match on the ``state`` column.
        city: When given together with ``query``, the name filter uses the
            city text instead of the query text.
        jurisdiction_levels: Optional frontend level IDs (``city``,
            ``county``, ``town``, ``village``, ``school_district``,
            ``special_district``, ``state``). Levels that map to no file are
            ignored; if none map, all files are searched.
        limit: Page size.
        offset: Number of merged results to skip.

    Returns:
        The requested page of results ordered by score (descending) and then
        title (ascending). Errors are logged; a failing file is skipped and a
        failure outside the per-file loop yields whatever was collected.
    """
    all_results = []
    conn = None

    try:
        conn = duckdb.connect()

        # Map frontend level IDs to file keys
        level_mapping = {
            'city': 'city',
            'county': 'county',
            'town': 'township',
            'village': 'township',
            'school_district': 'school district',
            'special_district': 'school district',  # Use school district as proxy
            'state': None  # States handled separately if needed
        }

        # Define jurisdiction files with priority scores
        jurisdiction_files = {
            'county': (f"{GOLD_DIR}/reference/jurisdictions_counties.parquet", 1.3),  # Boost counties
            'city': (f"{GOLD_DIR}/reference/jurisdictions_cities.parquet", 1.0),
            'school district': (f"{GOLD_DIR}/reference/jurisdictions_school_districts.parquet", 1.1),  # Boost school districts
            'township': (f"{GOLD_DIR}/reference/jurisdictions_townships.parquet", 0.9)
        }

        # Filter jurisdiction files based on selected levels
        if jurisdiction_levels:
            selected_file_keys = set()
            for level in jurisdiction_levels:
                file_key = level_mapping.get(level)
                if file_key:
                    selected_file_keys.add(file_key)

            # Only narrow the search when at least one level mapped to a file
            if selected_file_keys:
                jurisdiction_files = {
                    k: v for k, v in jurisdiction_files.items()
                    if k in selected_file_keys
                }

        # Every type must contribute enough rows to cover the requested page:
        # the merged list is sliced at [offset:offset + limit], so fetching
        # only `limit` rows per type would leave later pages empty.
        # The floor of 15 keeps type diversity for small limits.
        per_type_limit = max(offset + limit, 15)

        for jtype, (file_path, type_score) in jurisdiction_files.items():
            file_path_obj = Path(file_path)

            if not file_path_obj.exists():
                continue

            try:
                # Build WHERE clause - use state column (lowercase)
                where_clauses = []
                where_params = []

                if state:
                    where_clauses.append("state = ?")
                    where_params.append(state)

                if city and query:
                    # If city is specified, search for jurisdictions matching the city name
                    where_clauses.append("LOWER(NAME) LIKE LOWER(?)")
                    where_params.append(f"%{city}%")
                elif query:
                    # General search across jurisdiction names
                    where_clauses.append("LOWER(NAME) LIKE LOWER(?)")
                    where_params.append(f"%{query}%")

                where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

                # Score expression. The user-supplied query is bound as a
                # parameter rather than interpolated, so quotes in the search
                # text cannot break or inject into the SQL. `type_score` is a
                # numeric constant from the table above and is safe to inline.
                score_expr = f"{type_score}"
                score_params = []
                if query:
                    score_expr = f"""CASE
                        WHEN LOWER(NAME) = LOWER(?) THEN {type_score} * 2.0
                        WHEN LOWER(NAME) LIKE LOWER(?) THEN {type_score} * 1.5
                        ELSE {type_score}
                    END"""
                    score_params = [query, f"{query}%"]

                sql = f"""
                    SELECT
                        NAME as name,
                        state,
                        GEOID as geoid,
                        jurisdiction_type,
                        {score_expr} as score
                    FROM read_parquet(?)
                    WHERE {where_clause}
                    ORDER BY score DESC, NAME ASC
                    LIMIT ?
                """

                # Positional parameters follow their order of appearance in
                # the SQL text: score CASE, parquet path, WHERE filters, LIMIT.
                query_params = score_params + [str(file_path_obj)] + where_params + [per_type_limit]
                df = conn.execute(sql, query_params).fetchdf()

                for _, row in df.iterrows():
                    jurisdiction_label = row['jurisdiction_type'].replace('_', ' ').title()

                    all_results.append(SearchResult(
                        result_type='jurisdiction',
                        title=f"{row['name']}",
                        subtitle=f"{jurisdiction_label}",
                        description=f"{jurisdiction_label} in {row['state']}",
                        url=f"/jurisdictions/{row['geoid']}",
                        score=float(row['score']),
                        metadata={
                            'state': row['state'],
                            'geoid': row['geoid'],
                            'type': row['jurisdiction_type']
                        }
                    ))
            except Exception as e:
                # One bad file must not prevent the other types from returning
                logger.error(f"Error searching {jtype} jurisdictions: {e}")
                continue
    except Exception as e:
        logger.error(f"Jurisdiction search error: {e}")
    finally:
        # Always release the DuckDB connection, even on failure
        if conn is not None:
            conn.close()

    # Merge order mirrors the per-file ORDER BY (score DESC, name ASC) so that
    # the rows kept by each per-type LIMIT are the ones that sort first here.
    all_results.sort(key=lambda x: (-x.score, x.title))
    return all_results[offset:offset + limit]
async def unified_search(
    q: Optional[str] = Query(None, description="Search query (optional - browse by filters if omitted)"),
    types: Optional[str] = Query(None, description="Comma-separated result types: contacts,meetings,organizations,causes,jurisdictions"),
    state: Optional[str] = Query(None, description="Filter by state (2-letter code)"),
    city: Optional[str] = Query(None, description="Filter by city name"),
    jurisdiction_levels: Optional[str] = Query(None, description="Comma-separated jurisdiction levels: city,county,town,village,school_district,special_district,state"),
    ntee_code: Optional[str] = Query(None, description="Filter organizations by NTEE code"),
    ein: Optional[str] = Query(None, description="Filter organizations by exact EIN (for direct organization links)"),
    limit: int = Query(20, ge=1, le=100, description="Maximum results per type"),
    offset: int = Query(0, ge=0, description="Number of results to skip (for pagination)"),
    page: int = Query(1, ge=1, description="Page number (alternative to offset)"),
    enrich: bool = Query(False, description="Enable API enrichment (slower - fetches logos, causes from Every.org)"),
    sort: str = Query('relevance', description="Sort order: relevance, name-asc, name-desc, revenue-asc, revenue-desc, assets-asc, assets-desc")
):
    """
    Unified search across all data types

    Search for contacts, meetings, organizations, and causes in one query.

    **NEW:** Query is now optional - you can browse by state/type without searching!

    **Pagination:**
    - Use `offset` to skip results: `offset=20` skips first 20 results
    - Or use `page` with `limit`: `page=2&limit=20` gets results 21-40
    - `offset` takes precedence if both are provided

    **Examples:**
    - `/api/search?q=dental` - Search everything for "dental"
    - `/api/search?types=organizations&state=GA` - Browse all orgs in Georgia
    - `/api/search?q=budget&types=meetings` - Search only meetings
    - `/api/search?q=health&state=AL` - Search in Alabama only
    - `/api/search?q=education&types=organizations,causes` - Search orgs and causes
    - `/api/search?q=health&state=MA&page=2&limit=20` - Page 2 of MA health results

    **Response:** a dict with `query`, `total_results`, `type_totals`,
    `results` (grouped by type), `pagination` and `filters`. On an unexpected
    error a structured 500 JSON response is returned instead.
    """
    # 🔍 DEBUG LOGGING - Log all incoming request parameters
    logger.info(f"🔍 SEARCH REQUEST: q={q!r}, types={types!r}, state={state!r}, city={city!r}, jurisdiction_levels={jurisdiction_levels!r}, ntee_code={ntee_code!r}, ein={ein!r}, limit={limit}, offset={offset}, page={page}, enrich={enrich}, sort={sort!r}")

    try:
        # Calculate offset from page if offset not explicitly provided
        if offset == 0 and page > 1:
            offset = (page - 1) * limit

        # Parse requested types
        if types:
            requested_types = [t.strip() for t in types.split(',')]
        else:
            requested_types = ['contacts', 'meetings', 'organizations', 'causes', 'jurisdictions']

        # Parse jurisdiction levels if provided
        jurisdiction_levels_list = None
        if jurisdiction_levels:
            jurisdiction_levels_list = [level.strip() for level in jurisdiction_levels.split(',')]

        logger.info(f"📋 Requested types: {requested_types}, calculated offset: {offset}")

        all_results = []

        # Optimize for single-type browse mode (no query): let the database
        # handle pagination. This is only valid for searches that are actually
        # handed `offset` below. The contacts, meetings and causes searches
        # take no offset, so treating their output as "already paginated"
        # would return the first page for every page request.
        db_paginated_types = {'organizations', 'jurisdictions'}
        use_db_pagination = (
            not q
            and len(requested_types) == 1
            and requested_types[0] in db_paginated_types
        )

        if use_db_pagination:
            # Single-type browse: pass offset to DB for efficient pagination
            search_limit = limit
            search_offset = offset
        else:
            # Multi-type, search mode, or a type without DB offset support:
            # fetch enough rows to cover the page plus extra for mixing/sorting
            search_limit = offset + limit + 100
            search_offset = 0

        if 'contacts' in requested_types:
            # Use PostgreSQL for fast indexed search
            contact_results_pg = await search_postgres.search_contacts_pg(q, state, limit=search_limit)
            contact_results = [convert_pg_result(r) for r in contact_results_pg]
            logger.info(f"👤 Contacts search returned {len(contact_results)} results")
            all_results.extend(contact_results)

        if 'meetings' in requested_types:
            # Use PostgreSQL for fast indexed search
            meeting_results_pg = await search_postgres.search_events_pg(q, state, limit=search_limit)
            meeting_results = [convert_pg_result(r) for r in meeting_results_pg]
            logger.info(f"📅 Meetings search returned {len(meeting_results)} results")
            all_results.extend(meeting_results)

        if 'organizations' in requested_types:
            # Use PostgreSQL for fast indexed search
            org_results_pg = await search_postgres.search_organizations_pg(q, state, ntee_code, ein, limit=search_limit, offset=search_offset, sort=sort)
            org_results = [convert_pg_result(r) for r in org_results_pg]
            logger.info(f"🏢 Organizations search returned {len(org_results)} results")
            all_results.extend(org_results)

        if 'causes' in requested_types:
            cause_results = search_causes(q or "", limit=search_limit)
            logger.info(f"🎯 Causes search returned {len(cause_results)} results")
            all_results.extend(cause_results)

        if 'jurisdictions' in requested_types:
            # Use PostgreSQL for fast indexed search
            jurisdiction_results_pg = await search_postgres.search_jurisdictions_pg(q, state, city, jurisdiction_levels_list, limit=search_limit, offset=search_offset)
            jurisdiction_results = [convert_pg_result(r) for r in jurisdiction_results_pg]
            logger.info(f"🏛️ Jurisdictions search returned {len(jurisdiction_results)} results")
            all_results.extend(jurisdiction_results)

        # Sort all results by score (stable, so equal scores keep source order)
        all_results.sort(key=lambda x: x.score, reverse=True)

        logger.info(f"📊 Total combined results: {len(all_results)}, applying pagination (offset={offset}, limit={limit})")

        # Apply pagination
        if use_db_pagination:
            # DB already paginated - use all results
            paginated_results = all_results
        else:
            # Paginate in-memory from combined results
            paginated_results = all_results[offset:offset + limit]

        logger.info(f"✂️ Paginated results: {len(paginated_results)} items")

        # Response key for each SearchResult.result_type (insertion order is
        # the order of keys in the response)
        type_keys = {
            'contact': 'contacts',
            'meeting': 'meetings',
            'organization': 'organizations',
            'cause': 'causes',
            'jurisdiction': 'jurisdictions',
        }

        # Group the current page by type for the response
        grouped_results = {key: [] for key in type_keys.values()}
        for r in paginated_results:
            key = type_keys.get(r.result_type)
            if key is not None:
                grouped_results[key].append(r.to_dict())

        logger.info(f"📦 Grouped results - contacts:{len(grouped_results['contacts'])}, meetings:{len(grouped_results['meetings'])}, organizations:{len(grouped_results['organizations'])}, causes:{len(grouped_results['causes'])}, jurisdictions:{len(grouped_results['jurisdictions'])}")

        # Per-type totals, counted over everything fetched (before pagination)
        type_totals = {key: 0 for key in type_keys.values()}
        for r in all_results:
            key = type_keys.get(r.result_type)
            if key is not None:
                type_totals[key] += 1

        # Calculate total results
        if use_db_pagination and 'organizations' in requested_types:
            # Browse mode: count total matching records in DB
            total_results = count_organizations(state=state, ntee_code=ntee_code, query=q)
            type_totals['organizations'] = total_results  # Use accurate DB count
        else:
            # Search/multi-type mode, or a type without a count query: use
            # the number of fetched results.
            # NOTE(review): for DB-paginated jurisdictions this is only the
            # size of the current page, so total_pages/has_next are
            # understated - a DB-side count would be needed to fix that.
            total_results = len(all_results)

        total_pages = (total_results + limit - 1) // limit  # Ceiling division

        response_data = {
            "query": q or "",
            "total_results": total_results,
            "type_totals": type_totals,  # Add per-type totals
            "results": grouped_results,
            "pagination": {
                # Trust `page` when it agrees with the offset; otherwise
                # derive the page number from the explicit offset
                "page": page if offset == 0 or offset == (page - 1) * limit else (offset // limit) + 1,
                "limit": limit,
                "offset": offset,
                "total_pages": total_pages,
                "has_next": offset + limit < total_results,
                "has_prev": offset > 0
            },
            "filters": {
                "state": state,
                "ntee_code": ntee_code,
                "types": requested_types,
                "sort": sort
            }
        }

        logger.info(f"✅ Search complete - returning {total_results} total results, {len(paginated_results)} on this page")
        return response_data

    except Exception as e:
        logger.error(f"❌ Search error: {type(e).__name__}: {e}")
        logger.exception("Full traceback:")

        # Parse error into structured response
        error_detail = parse_error(e, context={
            "query": q,
            "state": state,
            "types": types,
            "data_type": "search"
        })

        return JSONResponse(
            status_code=500,
            content=error_detail.model_dump()
        )
async def search_suggestions(
    q: str = Query(..., min_length=1, description="Partial search query"),
    limit: int = Query(5, ge=1, le=20, description="Maximum suggestions")
):
    """
    Get search suggestions/autocomplete

    Returns the built-in common search terms that contain the typed text
    (case-insensitive), capped at `limit` entries and kept in their listed
    order. On an unexpected error a structured 500 JSON response is returned.
    """
    try:
        # Fixed vocabulary of frequently used search phrases
        known_phrases = (
            "dental health", "oral health", "affordable housing", "public transit",
            "school funding", "budget", "water quality", "parks", "zoning",
            "police", "fire department", "mental health", "food assistance",
            "senior services", "youth programs", "employment", "job training",
        )

        # Case-insensitive substring match against each phrase
        needle = q.lower()
        matching = []
        for phrase in known_phrases:
            if needle in phrase.lower():
                matching.append(phrase)

        payload = {
            "query": q,
            "suggestions": matching[:limit]
        }
        return payload

    except Exception as e:
        logger.error(f"Suggestion error: {e}")

        # Parse error into structured response
        failure_context = {
            "query": q,
            "data_type": "suggestions"
        }
        error_detail = parse_error(e, context=failure_context)

        return JSONResponse(
            status_code=500,
            content=error_detail.model_dump()
        )