""" Unified Search API LinkedIn-style search across contacts, meetings, organizations, and causes Uses hybrid approach: PostgreSQL (primary, fast) + HuggingFace Search API + DuckDB (fallback) """ from fastapi import APIRouter, Query, HTTPException from fastapi.responses import JSONResponse from typing import Optional, List, Dict, Any from pathlib import Path import pandas as pd import duckdb from loguru import logger import re import os import sys import requests from functools import lru_cache from datetime import datetime, timedelta from api.errors import ErrorDetail, parse_error # Import PostgreSQL search functions (primary) from api.routes import search_postgres # Import HuggingFace Search helpers from api.routes.hf_search import ( search_contacts_hf, search_organizations_hf, search_jurisdictions_hf, is_dataset_indexed ) router = APIRouter(tags=["search"]) # Paths to gold datasets GOLD_DIR = Path("data/gold") # Detect deployment environment IS_HF_SPACES = os.getenv("HF_SPACES") == "1" HF_ORGANIZATION = os.getenv('HF_ORGANIZATION', 'CommunityOne') # Cache for count queries (TTL: 1 hour) _count_cache = {} _count_cache_ttl = {} # In-memory DataFrame cache for HuggingFace datasets (TTL: 5 minutes) # Reduces remote HTTP requests from 2-3s to <10ms per search _dataframe_cache: Dict[str, pd.DataFrame] = {} _dataframe_cache_ttl: Dict[str, datetime] = {} DATAFRAME_CACHE_TTL = timedelta(minutes=5) # Every.org API config (fallback only) EVERYORG_API_KEY = os.getenv('EVERYORG_API_KEY', '') EVERYORG_API_BASE = "https://partners.every.org/v0.2" def load_parquet_cached(url: str) -> pd.DataFrame: """ Load parquet file with in-memory caching to avoid repeated HTTP requests. Cache TTL: 5 minutes (balances speed vs freshness) Reduces search latency from 2-3s to <10ms per query. Args: url: URL to parquet file (local path or HuggingFace URL) Returns: pandas DataFrame """ now = datetime.now() # Check cache if url in _dataframe_cache: cache_time = _dataframe_cache_ttl.get(url) if cache_time and (now - cache_time) < DATAFRAME_CACHE_TTL: logger.debug(f"🚀 Cache hit for {url}") return _dataframe_cache[url] # Cache miss - load from source logger.info(f"📥 Loading parquet from {url}") df = pd.read_parquet(url) # Store in cache _dataframe_cache[url] = df _dataframe_cache_ttl[url] = now logger.debug(f"💾 Cached {len(df)} rows from {url}") return df def get_hf_dataset_url(dataset_name: str) -> str: """ Convert dataset name to HuggingFace parquet URL. HuggingFace Datasets library stores parquet files in the standard format: data/train-00000-of-00001.parquet Examples: states-ma-contacts-local-officials -> https://huggingface.co/datasets/CommunityOne/states-ma-contacts-local-officials/resolve/main/data/train-00000-of-00001.parquet """ return f"https://huggingface.co/datasets/{HF_ORGANIZATION}/{dataset_name}/resolve/main/data/train-00000-of-00001.parquet" def get_data_source(file_path: Path, use_remote: bool = False) -> str: """ Get data source (local path or remote URL) based on environment. Args: file_path: Local file path (e.g., data/gold/states/MA/contacts_local_officials.parquet) use_remote: Force remote URL even in local environment Returns: File path string (local) or HuggingFace URL (remote) """ if not IS_HF_SPACES and not use_remote: return str(file_path) # Convert local path to HuggingFace dataset name # data/gold/states/MA/contacts_local_officials.parquet -> states-ma-contacts-local-officials parts = file_path.parts if 'states' in parts: state_idx = parts.index('states') state = parts[state_idx + 1].lower() filename = parts[-1].replace('.parquet', '').replace('_', '-') dataset_name = f"states-{state}-{filename}" elif 'national' in parts: filename = parts[-1].replace('.parquet', '').replace('_', '-') dataset_name = f"national-{filename}" elif 'reference' in parts: filename = parts[-1].replace('.parquet', '').replace('_', '-') dataset_name = f"reference-{filename}" else: # Fallback to local return str(file_path) return get_hf_dataset_url(dataset_name) @lru_cache(maxsize=5000) def fetch_form990_data(ein: str) -> Optional[Dict[str, Any]]: """ Fetch enrichment data from ProPublica Nonprofit Explorer (FREE!) Uses their API to get website and mission from Form 990 filings """ if not ein: return None try: clean_ein = str(ein).replace('-', '').zfill(9) url = f"https://projects.propublica.org/nonprofits/api/v2/organizations/{clean_ein}.json" response = requests.get(url, timeout=3) if response.status_code == 200: data = response.json() org = data.get('organization', {}) filings = data.get('filings_with_data', []) # Get most recent filing data website = None mission = None if filings: # ProPublica provides website from most recent filing latest = filings[0] # Note: ProPublica API doesn't directly expose website field # but we can use their organization name and data as fallback pass return { 'website': website, # ProPublica doesn't expose this in API 'mission': None, # Would need to parse PDF 'source': 'propublica', 'last_updated': datetime.now().isoformat(), 'tax_year': filings[0].get('tax_prd_yr') if filings else None } except Exception as e: logger.debug(f"ProPublica lookup failed for EIN {ein}: {e}") return None @lru_cache(maxsize=5000) def fetch_everyorg_data(ein: str) -> Optional[Dict[str, Any]]: """Fetch enrichment data from Every.org API (cached) - FALLBACK ONLY""" if not EVERYORG_API_KEY or not ein: return None try: # Format EIN (remove dashes, ensure 9 digits) clean_ein = str(ein).replace('-', '').zfill(9) url = f"{EVERYORG_API_BASE}/nonprofit/{clean_ein}" headers = { "Authorization": f"Bearer {EVERYORG_API_KEY}", "Accept": "application/json" } response = requests.get(url, headers=headers, timeout=3) if response.status_code == 200: data = response.json() if data and 'data' in data and 'nonprofit' in data['data']: nonprofit = data['data']['nonprofit'] tags = data['data'].get('nonprofitTags', []) causes = [tag.get('tagName') for tag in tags if tag.get('tagName')] return { 'mission': nonprofit.get('description') or nonprofit.get('descriptionLong'), 'website': nonprofit.get('websiteUrl'), 'logo_url': nonprofit.get('logoUrl'), 'profile_url': nonprofit.get('profileUrl'), 'causes': causes[:5], # Limit to top 5 causes 'source': 'everyorg', 'last_updated': datetime.now().isoformat() } except Exception as e: logger.debug(f"Every.org lookup failed for EIN {ein}: {e}") return None def get_enrichment_data(ein: str, existing_data: Optional[Dict] = None) -> Dict[str, Any]: """ Get enrichment data with intelligent backfill strategy Priority: 1. Existing form_990_* data (if recent) 2. GivingTuesday 990 XML (future: direct S3 access) 3. ProPublica API (current fallback) 4. Every.org API (last resort) Tracks source and update time for incremental processing """ result = { 'website': None, 'mission': None, 'logo_url': None, 'profile_url': None, 'causes': [], 'data_sources': [] } # Check existing data first (skip if older than 30 days) if existing_data: cutoff_date = datetime.now() - timedelta(days=30) # Check enrichment data (from any source: form_990, bigquery, etc.) if existing_data.get('enrichment_website'): last_updated = existing_data.get('enrichment_last_updated') if not last_updated or (isinstance(last_updated, str) and datetime.fromisoformat(last_updated) > cutoff_date): result['website'] = existing_data['enrichment_website'] result['data_sources'].append('cached') if existing_data.get('enrichment_mission'): result['mission'] = existing_data['enrichment_mission'] if 'cached' not in result['data_sources']: result['data_sources'].append('cached') # Try Every.org for missing fields (keeps logo and causes which 990 doesn't have) if not result['website'] or not result['mission']: everyorg_data = fetch_everyorg_data(ein) if everyorg_data: if not result['website'] and everyorg_data.get('website'): result['website'] = everyorg_data['website'] result['data_sources'].append('everyorg') if not result['mission'] and everyorg_data.get('mission'): result['mission'] = everyorg_data['mission'] result['data_sources'].append('everyorg') # Always get logo and causes from Every.org result['logo_url'] = everyorg_data.get('logo_url') result['profile_url'] = everyorg_data.get('profile_url') result['causes'] = everyorg_data.get('causes', []) if result['logo_url'] or result['causes']: if 'everyorg' not in result['data_sources']: result['data_sources'].append('everyorg') return result class SearchResult: """Unified search result""" def __init__(self, result_type: str, title: str, subtitle: str, description: str, url: str, score: float, metadata: Dict[str, Any]): self.result_type = result_type self.title = title self.subtitle = subtitle self.description = description self.url = url self.score = score self.metadata = metadata def to_dict(self): return { "type": self.result_type, "title": self.title, "subtitle": self.subtitle, "description": self.description, "url": self.url, "score": self.score, "metadata": self.metadata } def convert_pg_result(pg_result: search_postgres.SearchResult) -> 'SearchResult': """Convert PostgreSQL SearchResult dataclass to SearchResult class""" return SearchResult( result_type=pg_result.result_type, title=pg_result.title, subtitle=pg_result.subtitle, description=pg_result.description, url=pg_result.url, score=pg_result.score, metadata=pg_result.metadata ) def calculate_relevance_score(text: str, query: str) -> float: """Calculate relevance score for text matching query""" if not text or not query: return 0.0 text_lower = text.lower() query_lower = query.lower() # Exact match gets highest score if query_lower in text_lower: score = 1.0 # Boost if it's at the start if text_lower.startswith(query_lower): score += 0.5 return min(score, 2.0) # Partial word matches query_words = query_lower.split() text_words = text_lower.split() matches = sum(1 for qw in query_words if any(qw in tw for tw in text_words)) return matches / len(query_words) if query_words else 0.0 def search_contacts_duckdb(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]: """ Search contacts using DuckDB (supports local files or remote HTTP parquet). This is the fallback when HF Search API is unavailable. Supports browse mode when query is empty. """ results = [] # Determine if this is browse mode (no query) or search mode is_browse_mode = not query or query.strip() == '' try: # Initialize DuckDB connection conn = duckdb.connect() # Search 1: State Officials (OpenStates - state legislators, mayors, etc.) if state: officials_file_path = GOLD_DIR / "states" / state / "contacts_officials.parquet" officials_file_paths = [officials_file_path] else: officials_file_paths = list(GOLD_DIR.glob("states/*/contacts_officials.parquet"))[:5] logger.info(f"Searching {len(officials_file_paths)} state official contact files (OpenStates) - browse_mode={is_browse_mode}") for file_path in officials_file_paths: if not file_path.exists(): continue # Get data source (local or remote URL) data_source = get_data_source(file_path, use_remote=IS_HF_SPACES) try: if is_browse_mode: # Browse mode: return all officials, prioritize mayors sql = """ SELECT full_name as name, role_type as title, city_jurisdiction as jurisdiction, state, email, phone, CASE WHEN LOWER(role_type) = 'mayor' THEN 2.0 WHEN LOWER(role_type) LIKE '%council%' THEN 1.8 WHEN LOWER(role_type) LIKE '%commission%' THEN 1.7 ELSE 1.5 END as score FROM read_parquet(?) ORDER BY score DESC, full_name ASC LIMIT ? """ rows = conn.execute(sql, [data_source, limit]).fetchall() else: # Search mode: relevance scoring sql = """ SELECT full_name as name, role_type as title, city_jurisdiction as jurisdiction, state, email, phone, GREATEST( CASE WHEN LOWER(full_name) LIKE LOWER(?) THEN 1.5 WHEN LOWER(full_name) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(role_type) LIKE LOWER(?) THEN 1.5 WHEN LOWER(role_type) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(city_jurisdiction) LIKE LOWER(?) THEN 1.5 WHEN LOWER(city_jurisdiction) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(jurisdiction_name) LIKE LOWER(?) THEN 1.5 WHEN LOWER(jurisdiction_name) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END ) as score FROM read_parquet(?) WHERE LOWER(full_name) LIKE LOWER(?) OR LOWER(role_type) LIKE LOWER(?) OR LOWER(city_jurisdiction) LIKE LOWER(?) OR LOWER(jurisdiction_name) LIKE LOWER(?) ORDER BY score DESC LIMIT ? """ query_pattern = f'%{query}%' query_start = f'{query}%' rows = conn.execute(sql, [ query_start, query_pattern, # name scoring query_start, query_pattern, # role_type scoring query_start, query_pattern, # city_jurisdiction scoring query_start, query_pattern, # jurisdiction_name scoring data_source, # file path or URL query_pattern, query_pattern, query_pattern, query_pattern, # WHERE clause limit ]).fetchall() # Convert to SearchResult objects for row in rows: name, title, jurisdiction, state_code, email, phone, score = row if score > 0.3: # Relevance threshold contact_info = [] if email: contact_info.append(f"📧 {email}") if phone: contact_info.append(f"📞 {phone}") description = f"State official in {jurisdiction}" if jurisdiction else f"State official in {state_code}" if contact_info: description += f" • {' • '.join(contact_info)}" results.append(SearchResult( result_type="contact", title=name if name else "Unknown", subtitle=f"{title.title() if title else 'Official'} - {jurisdiction or state_code}", description=description, url=f"/people/{name.replace(' ', '-') if name else 'unknown'}", score=score, metadata={ "title": title, "jurisdiction": jurisdiction, "state": state_code, "name": name, "email": email, "phone": phone, "contact_type": "state_official", "data_source": "OpenStates" } )) except Exception as e: logger.debug(f"Error searching state officials {file_path}: {e}") # Search 2: Local Officials (from meeting transcripts) if state: local_file_path = GOLD_DIR / "states" / state / "contacts_local_officials.parquet" local_file_paths = [local_file_path] else: local_file_paths = list(GOLD_DIR.glob("states/*/contacts_local_officials.parquet"))[:5] logger.info(f"Searching {len(local_file_paths)} local official contact files (meeting transcripts)") for file_path in local_file_paths: # Get data source (local or remote URL) data_source = get_data_source(file_path, use_remote=IS_HF_SPACES) try: # SQL query with relevance scoring across name, title, jurisdiction sql = """ SELECT name, title, jurisdiction, state, GREATEST( CASE WHEN LOWER(name) LIKE LOWER(?) THEN 1.5 WHEN LOWER(name) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(title) LIKE LOWER(?) THEN 1.5 WHEN LOWER(title) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(jurisdiction) LIKE LOWER(?) THEN 1.5 WHEN LOWER(jurisdiction) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END ) as score FROM read_parquet(?) WHERE LOWER(name) LIKE LOWER(?) OR LOWER(title) LIKE LOWER(?) OR LOWER(jurisdiction) LIKE LOWER(?) ORDER BY score DESC LIMIT ? """ query_pattern = f'%{query}%' query_start = f'{query}%' rows = conn.execute(sql, [ query_start, query_pattern, # name scoring query_start, query_pattern, # title scoring query_start, query_pattern, # jurisdiction scoring data_source, # file path or URL query_pattern, query_pattern, query_pattern, # WHERE clause limit ]).fetchall() # Convert to SearchResult objects for row in rows: name, title, jurisdiction, state_code, score = row if score > 0.3: # Relevance threshold results.append(SearchResult( result_type="contact", title=name if name else "Unknown", subtitle=f"{title} - {jurisdiction}, {state_code}", description=f"Local official in {jurisdiction}", url=f"/people/{name.replace(' ', '-') if name else 'unknown'}", score=score, metadata={ "title": title, "jurisdiction": jurisdiction, "state": state_code, "name": name } )) except Exception as e: logger.debug(f"Error searching {file_path}: {e}") # Search 3: Nonprofit Officers from state directories nonprofit_files = [] # If state specified, search that state's directory if state: state_nonprofit_file = GOLD_DIR / "states" / state / "contacts_nonprofit_officers.parquet" nonprofit_files.append(state_nonprofit_file) else: # Search all state directories for state_dir in (GOLD_DIR / "states").glob("*/"): state_file = state_dir / "contacts_nonprofit_officers.parquet" nonprofit_files.append(state_file) for nonprofit_file in nonprofit_files: # Get data source (local or remote URL) nonprofit_source = get_data_source(nonprofit_file, use_remote=IS_HF_SPACES) try: logger.info(f"Searching nonprofit officers: {nonprofit_source}") officer_sql = """ SELECT name, title, organization_name, city, state, compensation, GREATEST( CASE WHEN LOWER(name) LIKE LOWER(?) THEN 1.5 WHEN LOWER(name) LIKE LOWER(?) THEN 1.0 ELSE 0.0 END, CASE WHEN LOWER(title) LIKE LOWER(?) THEN 1.0 WHEN LOWER(title) LIKE LOWER(?) THEN 0.5 ELSE 0.0 END, CASE WHEN LOWER(organization_name) LIKE LOWER(?) THEN 1.0 WHEN LOWER(organization_name) LIKE LOWER(?) THEN 0.5 ELSE 0.0 END ) AS score FROM read_parquet(?) WHERE (LOWER(name) LIKE LOWER(?) OR LOWER(title) LIKE LOWER(?) OR LOWER(organization_name) LIKE LOWER(?)) """ query_pattern = f'%{query}%' query_start = f'{query}%' params = [ query_start, query_pattern, # name scoring query_start, query_pattern, # title scoring query_start, query_pattern, # organization scoring nonprofit_source, # file path or URL query_pattern, query_pattern, query_pattern # WHERE clause ] if state: officer_sql += " AND LOWER(state) = LOWER(?)" params.append(state) officer_sql += " ORDER BY score DESC, compensation DESC NULLS LAST LIMIT ?" params.append(limit) officer_rows = conn.execute(officer_sql, params).fetchall() for row in officer_rows: name, title, org_name, city, state_code, compensation, score = row if score > 0.3: comp_text = f" (${compensation:,.0f})" if compensation else "" results.append(SearchResult( result_type="contact", title=name if name else "Unknown", subtitle=f"{title} - {org_name}{comp_text}", description=f"Nonprofit officer in {city}, {state_code}", url=f"/nonprofits?name={org_name.replace(' ', '-') if org_name else 'unknown'}", score=score, metadata={ "title": title, "organization": org_name, "city": city, "state": state_code, "compensation": compensation, "contact_type": "nonprofit_officer" } )) logger.info(f"Found {len([r for r in results if r.metadata.get('contact_type') == 'nonprofit_officer'])} nonprofit officer results") except Exception as e: logger.debug(f"Error searching nonprofit officers: {e}") conn.close() # Sort all results by score and limit results.sort(key=lambda x: x.score, reverse=True) logger.info(f"DuckDB search found {len(results)} contacts for query '{query}'") return results[:limit] except Exception as e: logger.error(f"Contact search error: {e}") return results def search_contacts(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]: """ HYBRID SEARCH: Search local officials AND nonprofit officers. Strategy: 1. Try HuggingFace Search API first (fast, server-side indexed) - HF Spaces only 2. Fall back to DuckDB (local files or remote HTTP parquet) Args: query: Search text (name, title, organization, etc.) state: Optional 2-letter state code filter limit: Maximum results to return Returns: List of SearchResult objects sorted by relevance """ logger.info(f"🔎 search_contacts() called - query={query!r}, state={state!r}, limit={limit}, IS_HF_SPACES={IS_HF_SPACES}") # STRATEGY 1: Try HuggingFace Search API (fast text search) if query and IS_HF_SPACES: logger.info(f"🔍 Trying HF Search API for '{query}' (state={state})") try: hf_results = search_contacts_hf(query, state, limit=limit) if hf_results: logger.info(f"✅ HF Search API returned {len(hf_results)} results") # Convert HF results to SearchResult objects results = [] for row in hf_results: source_type = row.get('source', 'contact') name = row.get('name', 'Unknown') title = row.get('title', '') jurisdiction = row.get('jurisdiction', row.get('organization_name', '')) state_code = row.get('state', state or '') results.append(SearchResult( result_type="contact", title=name, subtitle=f"{title} - {jurisdiction}, {state_code}", description=f"{'Local official' if source_type == 'local_officials' else 'Nonprofit officer'} in {jurisdiction}", url=f"/people/{name.replace(' ', '-')}", score=1.0, metadata={ "title": title, "jurisdiction": jurisdiction, "state": state_code, "name": name, "source": source_type } )) return results[:limit] except Exception as e: logger.warning(f"HF Search API failed, falling back to DuckDB: {e}") # STRATEGY 2: Fall back to DuckDB (works with local or remote parquet) logger.info(f"🔍 Using DuckDB {'remote' if IS_HF_SPACES else 'local'} search for '{query}'") return search_contacts_duckdb(query, state, limit) def search_meetings(query: str, state: Optional[str] = None, limit: int = 10) -> List[SearchResult]: """Search meeting transcripts and agendas""" results = [] try: # Search state event/meeting files (try new naming first, fallback to old) if state: meeting_files = list(GOLD_DIR.glob(f"states/{state}/events.parquet")) if not meeting_files: meeting_files = list(GOLD_DIR.glob(f"states/{state}/events_events.parquet")) if not meeting_files: meeting_files = list(GOLD_DIR.glob(f"states/{state}/meetings.parquet")) else: meeting_files = list(GOLD_DIR.glob("states/*/events.parquet")) if not meeting_files: meeting_files = list(GOLD_DIR.glob("states/*/events_events.parquet")) if not meeting_files: meeting_files = list(GOLD_DIR.glob("states/*/meetings.parquet")) for file_path in meeting_files[:5]: # Limit for performance try: df = pd.read_parquet(file_path) state_code = file_path.parent.name # Detect schema - different files have different column names columns = set(df.columns) # Map column names (handle LocalView vs CityScrapers vs other formats) title_col = 'vid_title' if 'vid_title' in columns else ('event_title' if 'event_title' in columns else 'title') body_col = 'caption_text_clean' if 'caption_text_clean' in columns else ('caption_text' if 'caption_text' in columns else ('full_text' if 'full_text' in columns else 'body')) jurisdiction_col = 'place_name' if 'place_name' in columns else ('jurisdiction_name' if 'jurisdiction_name' in columns else 'jurisdiction') date_col = 'meeting_date' if 'meeting_date' in columns else 'date' id_col = 'vid_id' if 'vid_id' in columns else ('meeting_id' if 'meeting_id' in columns else 'id') # Search in title, body, jurisdiction for _, row in df.iterrows(): title = str(row.get(title_col, '')) body = str(row.get(body_col, ''))[:500] # First 500 chars jurisdiction = str(row.get(jurisdiction_col, '')) meeting_date = str(row.get(date_col, '')) meeting_id = str(row.get(id_col, '')) score = max( calculate_relevance_score(title, query), calculate_relevance_score(body, query) * 0.8, # Body matches slightly lower calculate_relevance_score(jurisdiction, query) * 0.6 ) if score > 0.3: # Extract snippet around query snippet = body[:200] + "..." if len(body) > 200 else body results.append(SearchResult( result_type="meeting", title=title, subtitle=f"{jurisdiction}, {state_code} - {meeting_date}", description=snippet, url=f"/documents?meeting_id={meeting_id}", score=score, metadata={ "jurisdiction": jurisdiction, "state": state_code, "date": meeting_date, "meeting_id": meeting_id } )) except Exception as e: logger.debug(f"Error searching {file_path}: {e}") except Exception as e: logger.error(f"Meeting search error: {e}") results.sort(key=lambda x: x.score, reverse=True) return results[:limit] def count_organizations(state: Optional[str] = None, ntee_code: Optional[str] = None, query: Optional[str] = None) -> int: """Count total organizations matching criteria (for pagination) - cached""" # Create cache key cache_key = f"count_{state}_{ntee_code}_{query}" # Check cache (1 hour TTL) now = datetime.now() if cache_key in _count_cache: cached_time = _count_cache_ttl.get(cache_key) if cached_time and (now - cached_time).total_seconds() < 3600: return _count_cache[cache_key] try: # Determine file path if state: file_pattern = f"{GOLD_DIR}/states/{state}/nonprofits_organizations.parquet" else: file_pattern = f"{GOLD_DIR}/national/nonprofits_organizations.parquet" file_path = Path(file_pattern) if not file_path.exists(): return 0 conn = duckdb.connect() # Detect schema columns_query = f"DESCRIBE SELECT * FROM '{file_path}' LIMIT 0" available_columns = set([row[0] for row in conn.execute(columns_query).fetchall()]) name_col = 'organization_name' if 'organization_name' in available_columns else 'name' ntee_col = 'ntee_code' if 'ntee_code' in available_columns else 'ntee_cd' # Build WHERE clause where_clauses = [] params = [] if query and query.strip(): where_clauses.append(f"LOWER({name_col}) LIKE LOWER(?)") params.append(f'%{query}%') if ntee_code and ntee_col in available_columns: where_clauses.append(f"{ntee_col} LIKE ?") params.append(f'{ntee_code}%') where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" # Count query count_sql = f"SELECT COUNT(*) FROM '{data_source}' WHERE {where_sql}" result = conn.execute(count_sql, params).fetchone() conn.close() count = result[0] if result else 0 # Cache the result _count_cache[cache_key] = count _count_cache_ttl[cache_key] = now return count except Exception as e: logger.error(f"Count error: {e}") return 0 def search_organizations(query: str, state: Optional[str] = None, ntee_code: Optional[str] = None, limit: int = 10, offset: int = 0, enrich: bool = False, sort: str = 'relevance', ein: Optional[str] = None) -> List[SearchResult]: """Search nonprofit organizations using DuckDB for fast Parquet queries Args: enrich: If True, fetch additional data from Every.org API (slower) sort: Sort order - 'relevance', 'name-asc', 'name-desc', 'revenue-asc', 'revenue-desc', 'assets-asc', 'assets-desc' ein: If provided, filter to exact EIN match (for direct organization links) """ results = [] try: # Determine file path if state: file_pattern = f"{GOLD_DIR}/states/{state}/nonprofits_organizations.parquet" else: file_pattern = f"{GOLD_DIR}/national/nonprofits_organizations.parquet" # Get data source (local or remote HuggingFace URL) file_path = Path(file_pattern) data_source = get_data_source(file_path, use_remote=IS_HF_SPACES) # Load parquet with caching (speeds up from 2-3s to <10ms) df = load_parquet_cached(data_source) # Initialize DuckDB connection conn = duckdb.connect() # Query the DataFrame directly (DuckDB can query pandas DataFrames) available_columns = set(df.columns) # Detect column name variations (handle different schemas) name_col = 'organization_name' if 'organization_name' in available_columns else 'name' ntee_col = 'ntee_code' if 'ntee_code' in available_columns else 'ntee_cd' revenue_col = 'form_990_total_revenue' if 'form_990_total_revenue' in available_columns else 'revenue_amt' asset_col = 'form_990_total_assets' if 'form_990_total_assets' in available_columns else 'asset_amt' income_col = 'form_990_net_income' if 'form_990_net_income' in available_columns else 'income_amt' # Build WHERE clauses using detected column names where_clauses = [] params = [] # EIN filter (exact match - highest priority for direct organization links) if ein and ein.strip(): where_clauses.append("ein = ?") params.append(ein.strip()) # Search query (case-insensitive LIKE) - only if query provided and no EIN if query and query.strip() and not ein: where_clauses.append(f"LOWER({name_col}) LIKE LOWER(?)") params.append(f'%{query}%') # State filter (if using national file) if state and not file_pattern.startswith(f"{GOLD_DIR}/states/"): where_clauses.append("state = ?") params.append(state) # NTEE code filter if ntee_code and ntee_col in available_columns: where_clauses.append(f"{ntee_col} LIKE ?") params.append(f'{ntee_code}%') # Default to TRUE if no filters (browse all) where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" # Build column list with proper NULL handling for missing columns select_columns = [] # Add core columns (with aliases for consistency) select_columns.append(f'{name_col} as name') select_columns.append('city') select_columns.append('state') select_columns.append(f'{ntee_col} as ntee_cd' if ntee_col in available_columns else 'NULL as ntee_cd') select_columns.append('ein') select_columns.append(f'{revenue_col} as revenue_amt' if revenue_col in available_columns else 'NULL as revenue_amt') select_columns.append(f'{asset_col} as asset_amt' if asset_col in available_columns else 'NULL as asset_amt') select_columns.append(f'{income_col} as income_amt' if income_col in available_columns else 'NULL as income_amt') select_columns.append('tax_period' if 'tax_period' in available_columns else 'NULL as tax_period') # Track enrichment columns (form_990 and bigquery) enrichment_cols = [] enrichment_col_map = {} # Check for website columns (multiple possible names) - ALWAYS add if exists website_col_added = False for col_name in ['bigquery_website', 'form_990_website', 'website', 'everyorg_website']: if col_name in available_columns: select_columns.append(f'{col_name} as enrichment_website') enrichment_cols.append('enrichment_website') enrichment_col_map['enrichment_website'] = col_name website_col_added = True logger.debug(f"Added website column: {col_name}") break # Check for mission columns mission_col_added = False for col_name in ['bigquery_mission', 'form_990_mission', 'mission', 'everyorg_mission']: if col_name in available_columns: select_columns.append(f'{col_name} as enrichment_mission') enrichment_cols.append('enrichment_mission') enrichment_col_map['enrichment_mission'] = col_name mission_col_added = True logger.debug(f"Added mission column: {col_name}") break # Check for logo columns logo_col_added = False for col_name in ['logodev_logo_url', 'everyorg_logo_url', 'logo_url']: if col_name in available_columns: select_columns.append(f'{col_name} as enrichment_logo') enrichment_cols.append('enrichment_logo') enrichment_col_map['enrichment_logo'] = col_name logo_col_added = True logger.debug(f"Added logo column: {col_name}") break # Last updated timestamp for col_name in ['bigquery_updated_date', 'form_990_last_updated', 'everyorg_last_updated']: if col_name in available_columns: select_columns.append(f'{col_name} as enrichment_last_updated') enrichment_cols.append('enrichment_last_updated') enrichment_col_map['enrichment_last_updated'] = col_name logger.debug(f"Added timestamp column: {col_name}") break columns_sql = ', '.join(select_columns) # Log what we're selecting logger.info(f"🔍 Enrichment columns to select: {enrichment_cols}") logger.info(f"📋 Full SQL columns: {columns_sql}") # Build ORDER BY clause based on sort parameter order_by_clauses = [] if sort == 'name-asc': order_by_clauses.append(f"{name_col} ASC") elif sort == 'name-desc': order_by_clauses.append(f"{name_col} DESC") elif sort == 'revenue-desc': order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") elif sort == 'revenue-asc': # Low to high: Show positive values first (smallest to largest), then zeros, then negatives order_by_clauses.append(f""" CASE WHEN TRY_CAST({revenue_col} AS BIGINT) IS NULL THEN 3 WHEN TRY_CAST({revenue_col} AS BIGINT) <= 0 THEN 2 ELSE 1 END ASC, ABS(COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0)) ASC """) elif sort == 'assets-desc': order_by_clauses.append(f"COALESCE(TRY_CAST({asset_col} AS BIGINT), 0) DESC") elif sort == 'assets-asc': # Low to high: Show positive values first (smallest to largest), then zeros, then negatives order_by_clauses.append(f""" CASE WHEN TRY_CAST({asset_col} AS BIGINT) IS NULL THEN 3 WHEN TRY_CAST({asset_col} AS BIGINT) <= 0 THEN 2 ELSE 1 END ASC, ABS(COALESCE(TRY_CAST({asset_col} AS BIGINT), 0)) ASC """) elif query and query.strip(): # Relevance sort (only for search mode) order_by_clauses.append("score DESC") order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") else: # Default browse mode: sort by revenue/assets order_by_clauses.append(f"COALESCE(TRY_CAST({revenue_col} AS BIGINT), 0) DESC") order_by_clauses.append(f"COALESCE(TRY_CAST({asset_col} AS BIGINT), 0) DESC") # Always add name as final sort for consistency if 'name' not in sort: order_by_clauses.append(f"{name_col}") order_by_sql = ', '.join(order_by_clauses) # SQL query with relevance scoring (browse mode if no query) if query and query.strip(): # Search mode: score by text match sql = f""" SELECT {columns_sql}, CASE WHEN LOWER({name_col}) LIKE LOWER(?) THEN 1.5 WHEN LOWER({name_col}) LIKE LOWER(?) THEN 1.0 ELSE 0.5 END as score FROM df WHERE {where_sql} ORDER BY {order_by_sql} LIMIT ? OFFSET ? """ # Execute query with scoring parameters query_params = [f'{query}%', f'%{query}%'] + params + [limit, offset] else: # Browse mode: sort by size/activity sql = f""" SELECT {columns_sql}, 1.0 as score FROM df WHERE {where_sql} ORDER BY {order_by_sql} LIMIT ? OFFSET ? """ # Execute query without scoring parameters query_params = params + [limit, offset] rows = conn.execute(sql, query_params).fetchall() # NTEE code descriptions for better context ntee_descriptions = { 'E': 'Health Services', 'E60': 'Health Support Services', 'E61': 'Blood Supply', 'E62': 'Emergency Medical Services', 'E65': 'Organ & Tissue Banks', 'E70': 'Public Health', 'E80': 'Health Treatment - Primary Care', 'E90': 'Nursing Services', 'E20': 'Hospitals & Primary Medical Care', 'E30': 'Ambulatory & Primary Health Care', 'E32': 'Clinics & Community Health Centers', 'P': 'Human Services', 'B': 'Education', 'X': 'Religion-Related', 'A': 'Arts, Culture & Humanities', } # Convert to SearchResult objects with intelligent enrichment for row in rows: # Unpack base columns (now includes tax_period) org_name, city, state_code, ntee, ein, revenue, assets, income, tax_period = row[:9] # Unpack optional enrichment columns if present existing_data = {} idx = 9 if 'enrichment_website' in enrichment_cols: existing_data['enrichment_website'] = row[idx] # Only log non-null websites to reduce spam if row[idx] and str(row[idx]) != 'nan': logger.debug(f"✅ Website: {row[idx]}") idx += 1 if 'enrichment_mission' in enrichment_cols: existing_data['enrichment_mission'] = row[idx] idx += 1 if 'enrichment_logo' in enrichment_cols: existing_data['enrichment_logo'] = row[idx] idx += 1 if 'enrichment_last_updated' in enrichment_cols: existing_data['enrichment_last_updated'] = row[idx] idx += 1 score = row[-1] # Score is always last # Parse tax year from tax_period (format: YYYYMM) tax_year = None if tax_period and str(tax_period).isdigit() and len(str(tax_period)) >= 4: tax_year = int(str(tax_period)[:4]) # Get enriched data with intelligent backfill (only if requested) enrichment = get_enrichment_data(ein, existing_data) if (ein and enrich) else {} # Build a more informative description ntee_desc = None if ntee: # Try exact match first, then prefix match ntee_desc = ntee_descriptions.get(ntee) if not ntee_desc: # Try first character (major category) ntee_desc = ntee_descriptions.get(ntee[0]) if ntee else None # Use enriched mission as primary description, fallback to NTEE + financial description = enrichment.get('mission') if enrichment.get('mission') else None # Validate mission: if it contains a different org name, it's stale data if description and org_name: # Check if mission mentions a completely different org name # (e.g., "Catalyst Institute" when org name is "CAREQUEST INSTITUTE") mission_lower = description.lower() name_words = set(org_name.lower().split()) # If mission starts with an org name that's not in our actual org name, skip it first_sentence = description.split('.')[0].lower() if ' is a nonprofit' in first_sentence or ' is an nonprofit' in first_sentence: # Extract the subject (organization name before "is a nonprofit") subject = first_sentence.split(' is a')[0].strip() subject_words = set(subject.split()) # If the subject shares NO significant words with our org name, it's stale # (e.g., "catalyst institute" vs "carequest institute") significant_words = subject_words - {'the', 'a', 'an', 'of', 'for', 'and', 'inc', 'llc'} name_significant = name_words - {'the', 'a', 'an', 'of', 'for', 'and', 'inc', 'llc', 'institute'} if significant_words and not (significant_words & name_significant): # Stale data - mission talks about a different org logger.warning(f"Stale mission data for {org_name}: '{subject}' != '{org_name}'") description = None if not description: description_parts = [] if ntee_desc: description_parts.append(ntee_desc) # Convert financial data to numbers (handle None and string types) try: revenue_num = float(revenue) if revenue else 0 assets_num = float(assets) if assets else 0 except (ValueError, TypeError): revenue_num = 0 assets_num = 0 if revenue_num > 0: description_parts.append(f"Revenue: ${revenue_num:,.0f}") elif assets_num > 0: description_parts.append(f"Assets: ${assets_num:,.0f}") if not description_parts: description_parts.append(f"Nonprofit serving {city}") description = " • ".join(description_parts) # Build metadata with enriched fields metadata = { "ein": ein, "city": city, "state": state_code, "ntee_code": ntee, "revenue": revenue, "assets": assets, "income": income, "tax_year": tax_year, "data_sources": [] } # ALWAYS add enrichment from parquet columns (existing_data) - no enrich flag needed if existing_data.get('enrichment_website'): metadata['website'] = existing_data['enrichment_website'] metadata['data_sources'].append('cached') if existing_data.get('enrichment_mission'): metadata['mission'] = existing_data['enrichment_mission'] if 'cached' not in metadata['data_sources']: metadata['data_sources'].append('cached') if existing_data.get('enrichment_logo'): metadata['logo_url'] = existing_data['enrichment_logo'] if 'cached' not in metadata['data_sources']: metadata['data_sources'].append('cached') # Add API enrichment if requested (enrich=true) if enrichment: if enrichment.get('website') and not metadata.get('website'): metadata['website'] = enrichment['website'] if enrichment.get('logo_url'): metadata['logo_url'] = enrichment['logo_url'] if enrichment.get('profile_url'): metadata['profile_url'] = enrichment['profile_url'] if enrichment.get('causes'): metadata['causes'] = enrichment['causes'] # Add API data sources for source in enrichment.get('data_sources', []): if source not in metadata['data_sources']: metadata['data_sources'].append(source) results.append(SearchResult( result_type="organization", title=org_name if org_name else "Unknown", subtitle=f"{city}, {state_code}" + (f" - NTEE: {ntee}" if ntee else ""), description=description, url=f"/search?types=organizations&state={state_code}&ein={ein}", score=score, metadata=metadata )) conn.close() logger.info(f"DuckDB search found {len(results)} organizations for query '{query}'") except Exception as e: logger.error(f"Organization search error: {e}") return results def search_causes(query: str, limit: int = 10) -> List[SearchResult]: """Search causes and NTEE categories - supports browse mode""" results = [] try: # Get data source (local or remote HuggingFace URL) ntee_file = GOLD_DIR / "reference" / "causes_ntee_codes.parquet" data_source = get_data_source(ntee_file, use_remote=IS_HF_SPACES) # Load with caching df = load_parquet_cached(data_source) logger.debug(f"Loaded {len(df)} NTEE codes from cache") for _, row in df.iterrows(): code = str(row.get('ntee_code', '')) description = str(row.get('description', '')) ntee_type = str(row.get('ntee_type', '')) # Browse mode: return all causes # Search mode: filter by relevance if query and query.strip(): score = max( calculate_relevance_score(description, query), calculate_relevance_score(code, query) ) if score <= 0.3: continue # Skip low relevance results else: score = 1.0 # Default score for browse mode results.append(SearchResult( result_type="cause", title=description, subtitle=f"NTEE Code: {code}", description=f"Category type: {ntee_type}", url=f"/nonprofits?ntee_code={code}", score=score, metadata={ "ntee_code": code, "ntee_type": ntee_type } )) logger.info(f"Found {len(results)} cause results for query '{query}'") except Exception as e: logger.error(f"Cause search error: {e}") results.sort(key=lambda x: x.score, reverse=True) return results[:limit] def search_jurisdictions(query: str, state: Optional[str] = None, city: Optional[str] = None, jurisdiction_levels: Optional[List[str]] = None, limit: int = 10, offset: int = 0) -> List[SearchResult]: """Search cities, counties, townships, and school districts using DuckDB""" all_results = [] try: conn = duckdb.connect() # Map frontend level IDs to file keys level_mapping = { 'city': 'city', 'county': 'county', 'town': 'township', 'village': 'township', 'school_district': 'school district', 'special_district': 'school district', # Use school district as proxy 'state': None # States handled separately if needed } # Define jurisdiction files with priority scores jurisdiction_files = { 'county': (f"{GOLD_DIR}/reference/jurisdictions_counties.parquet", 1.3), # Boost counties 'city': (f"{GOLD_DIR}/reference/jurisdictions_cities.parquet", 1.0), 'school district': (f"{GOLD_DIR}/reference/jurisdictions_school_districts.parquet", 1.1), # Boost school districts 'township': (f"{GOLD_DIR}/reference/jurisdictions_townships.parquet", 0.9) } # Filter jurisdiction files based on selected levels if jurisdiction_levels: # Map selected levels to file keys selected_file_keys = set() for level in jurisdiction_levels: file_key = level_mapping.get(level) if file_key: selected_file_keys.add(file_key) # Filter to only selected types if selected_file_keys: jurisdiction_files = { k: v for k, v in jurisdiction_files.items() if k in selected_file_keys } # Fetch enough results from each type to ensure diversity # Even with small limits, we want representation from each type per_type_limit = max(limit, 15) for jtype, (file_path, type_score) in jurisdiction_files.items(): file_path_obj = Path(file_path) if not file_path_obj.exists(): continue try: # Build SQL query - use state column (lowercase) where_clauses = [] params = [] if state: where_clauses.append("state = ?") params.append(state) if city and query: # If city is specified, search for jurisdictions matching the city name where_clauses.append("LOWER(NAME) LIKE LOWER(?)") params.append(f"%{city}%") elif query: # General search across jurisdiction names where_clauses.append("LOWER(NAME) LIKE LOWER(?)") params.append(f"%{query}%") where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" # Calculate name match score if query provided score_expr = f"{type_score}" if query: score_expr = f"""CASE WHEN LOWER(NAME) = LOWER('{query}') THEN {type_score} * 2.0 WHEN LOWER(NAME) LIKE LOWER('{query}%') THEN {type_score} * 1.5 ELSE {type_score} END""" sql = f""" SELECT NAME as name, state, GEOID as geoid, jurisdiction_type, {score_expr} as score FROM read_parquet(?) WHERE {where_clause} ORDER BY score DESC, NAME ASC LIMIT ? """ query_params = [str(file_path_obj)] + params + [per_type_limit] df = conn.execute(sql, query_params).fetchdf() for _, row in df.iterrows(): jurisdiction_label = row['jurisdiction_type'].replace('_', ' ').title() all_results.append(SearchResult( result_type='jurisdiction', title=f"{row['name']}", subtitle=f"{jurisdiction_label}", description=f"{jurisdiction_label} in {row['state']}", url=f"/jurisdictions/{row['geoid']}", score=float(row['score']), metadata={ 'state': row['state'], 'geoid': row['geoid'], 'type': row['jurisdiction_type'] } )) except Exception as e: logger.error(f"Error searching {jtype} jurisdictions: {e}") continue except Exception as e: logger.error(f"Jurisdiction search error: {e}") # Sort all results by score, then apply pagination all_results.sort(key=lambda x: (x.score, x.title), reverse=True) return all_results[offset:offset + limit] @router.get("/api/search") @router.get("/api/search/", include_in_schema=False) async def unified_search( q: Optional[str] = Query(None, description="Search query (optional - browse by filters if omitted)"), types: Optional[str] = Query(None, description="Comma-separated result types: contacts,meetings,organizations,causes,jurisdictions"), state: Optional[str] = Query(None, description="Filter by state (2-letter code)"), city: Optional[str] = Query(None, description="Filter by city name"), jurisdiction_levels: Optional[str] = Query(None, description="Comma-separated jurisdiction levels: city,county,town,village,school_district,special_district,state"), ntee_code: Optional[str] = Query(None, description="Filter organizations by NTEE code"), ein: Optional[str] = Query(None, description="Filter organizations by exact EIN (for direct organization links)"), limit: int = Query(20, ge=1, le=100, description="Maximum results per type"), offset: int = Query(0, ge=0, description="Number of results to skip (for pagination)"), page: int = Query(1, ge=1, description="Page number (alternative to offset)"), enrich: bool = Query(False, description="Enable API enrichment (slower - fetches logos, causes from Every.org)"), sort: str = Query('relevance', description="Sort order: relevance, name-asc, name-desc, revenue-asc, revenue-desc, assets-asc, assets-desc") ): """ Unified search across all data types Search for contacts, meetings, organizations, and causes in one query. **NEW:** Query is now optional - you can browse by state/type without searching! **Pagination:** - Use `offset` to skip results: `offset=20` skips first 20 results - Or use `page` with `limit`: `page=2&limit=20` gets results 21-40 - `offset` takes precedence if both are provided **Examples:** - `/api/search?q=dental` - Search everything for "dental" - `/api/search?types=organizations&state=GA` - Browse all orgs in Georgia - `/api/search?q=budget&types=meetings` - Search only meetings - `/api/search?q=health&state=AL` - Search in Alabama only - `/api/search?q=education&types=organizations,causes` - Search orgs and causes - `/api/search?q=health&state=MA&page=2&limit=20` - Page 2 of MA health results """ # 🔍 DEBUG LOGGING - Log all incoming request parameters logger.info(f"🔍 SEARCH REQUEST: q={q!r}, types={types!r}, state={state!r}, city={city!r}, jurisdiction_levels={jurisdiction_levels!r}, ntee_code={ntee_code!r}, ein={ein!r}, limit={limit}, offset={offset}, page={page}, enrich={enrich}, sort={sort!r}") try: # Calculate offset from page if offset not explicitly provided if offset == 0 and page > 1: offset = (page - 1) * limit # Parse requested types if types: requested_types = [t.strip() for t in types.split(',')] else: requested_types = ['contacts', 'meetings', 'organizations', 'causes', 'jurisdictions'] # Parse jurisdiction levels if provided jurisdiction_levels_list = None if jurisdiction_levels: jurisdiction_levels_list = [level.strip() for level in jurisdiction_levels.split(',')] logger.info(f"📋 Requested types: {requested_types}, calculated offset: {offset}") all_results = [] # Optimize for single-type browse mode (no query) # Let database handle pagination for efficiency use_db_pagination = not q and len(requested_types) == 1 if use_db_pagination: # Single-type browse: pass offset to DB for efficient pagination search_limit = limit search_offset = offset else: # Multi-type or search mode: fetch extra for mixing/sorting search_limit = offset + limit + 100 search_offset = 0 if 'contacts' in requested_types: # Use PostgreSQL for fast indexed search contact_results_pg = await search_postgres.search_contacts_pg(q, state, limit=search_limit) contact_results = [convert_pg_result(r) for r in contact_results_pg] logger.info(f"👤 Contacts search returned {len(contact_results)} results") all_results.extend(contact_results) if 'meetings' in requested_types: # Use PostgreSQL for fast indexed search meeting_results_pg = await search_postgres.search_events_pg(q, state, limit=search_limit) meeting_results = [convert_pg_result(r) for r in meeting_results_pg] logger.info(f"📅 Meetings search returned {len(meeting_results)} results") all_results.extend(meeting_results) if 'organizations' in requested_types: # Use PostgreSQL for fast indexed search org_results_pg = await search_postgres.search_organizations_pg(q, state, ntee_code, ein, limit=search_limit, offset=search_offset, sort=sort) org_results = [convert_pg_result(r) for r in org_results_pg] logger.info(f"🏢 Organizations search returned {len(org_results)} results") all_results.extend(org_results) if 'causes' in requested_types: cause_results = search_causes(q or "", limit=search_limit) logger.info(f"🎯 Causes search returned {len(cause_results)} results") all_results.extend(cause_results) if 'jurisdictions' in requested_types: # Use PostgreSQL for fast indexed search jurisdiction_results_pg = await search_postgres.search_jurisdictions_pg(q, state, city, jurisdiction_levels_list, limit=search_limit, offset=search_offset) jurisdiction_results = [convert_pg_result(r) for r in jurisdiction_results_pg] logger.info(f"🏛️ Jurisdictions search returned {len(jurisdiction_results)} results") all_results.extend(jurisdiction_results) # Sort all results by score all_results.sort(key=lambda x: x.score, reverse=True) logger.info(f"📊 Total combined results: {len(all_results)}, applying pagination (offset={offset}, limit={limit})") # Apply pagination if use_db_pagination: # DB already paginated - use all results paginated_results = all_results else: # Paginate in-memory from combined results paginated_results = all_results[offset:offset + limit] logger.info(f"✂️ Paginated results: {len(paginated_results)} items") # Group by type for response grouped_results = { 'contacts': [r.to_dict() for r in paginated_results if r.result_type == 'contact'], 'meetings': [r.to_dict() for r in paginated_results if r.result_type == 'meeting'], 'organizations': [r.to_dict() for r in paginated_results if r.result_type == 'organization'], 'causes': [r.to_dict() for r in paginated_results if r.result_type == 'cause'], 'jurisdictions': [r.to_dict() for r in paginated_results if r.result_type == 'jurisdiction'], } logger.info(f"📦 Grouped results - contacts:{len(grouped_results['contacts'])}, meetings:{len(grouped_results['meetings'])}, organizations:{len(grouped_results['organizations'])}, causes:{len(grouped_results['causes'])}, jurisdictions:{len(grouped_results['jurisdictions'])}") # Calculate total results per type (from all_results before pagination) type_totals = { 'contacts': len([r for r in all_results if r.result_type == 'contact']), 'meetings': len([r for r in all_results if r.result_type == 'meeting']), 'organizations': len([r for r in all_results if r.result_type == 'organization']), 'causes': len([r for r in all_results if r.result_type == 'cause']), 'jurisdictions': len([r for r in all_results if r.result_type == 'jurisdiction']), } # Calculate total results # For single-type browse mode, get accurate count from database if not q and len(requested_types) == 1: # Browse mode: count total matching records in DB if 'organizations' in requested_types: total_results = count_organizations(state=state, ntee_code=ntee_code, query=q) type_totals['organizations'] = total_results # Use accurate DB count else: # Fallback to fetched results for other types total_results = len(all_results) else: # Search/multi-type mode: use fetched results total_results = len(all_results) total_pages = (total_results + limit - 1) // limit # Ceiling division response_data = { "query": q or "", "total_results": total_results, "type_totals": type_totals, # Add per-type totals "results": grouped_results, "pagination": { "page": page if offset == 0 or offset == (page - 1) * limit else (offset // limit) + 1, "limit": limit, "offset": offset, "total_pages": total_pages, "has_next": offset + limit < total_results, "has_prev": offset > 0 }, "filters": { "state": state, "ntee_code": ntee_code, "types": requested_types, "sort": sort } } logger.info(f"✅ Search complete - returning {total_results} total results, {len(paginated_results)} on this page") return response_data except Exception as e: logger.error(f"❌ Search error: {type(e).__name__}: {e}") logger.exception("Full traceback:") # Parse error into structured response error_detail = parse_error(e, context={ "query": q, "state": state, "types": types, "data_type": "search" }) return JSONResponse( status_code=500, content=error_detail.model_dump() ) @router.get("/api/search/suggest") async def search_suggestions( q: str = Query(..., min_length=1, description="Partial search query"), limit: int = Query(5, ge=1, le=20, description="Maximum suggestions") ): """ Get search suggestions/autocomplete Returns quick suggestions as user types """ try: suggestions = [] # Common search terms common_terms = [ "dental health", "oral health", "affordable housing", "public transit", "school funding", "budget", "water quality", "parks", "zoning", "police", "fire department", "mental health", "food assistance", "senior services", "youth programs", "employment", "job training" ] # Filter suggestions q_lower = q.lower() suggestions = [term for term in common_terms if q_lower in term.lower()] return { "query": q, "suggestions": suggestions[:limit] } except Exception as e: logger.error(f"Suggestion error: {e}") # Parse error into structured response error_detail = parse_error(e, context={ "query": q, "data_type": "suggestions" }) return JSONResponse( status_code=500, content=error_detail.model_dump() )