""" PostgreSQL-based search functions Uses indexed search tables for fast queries (10-100x faster than parquet) """ from typing import Optional, List from loguru import logger import asyncpg import os from datetime import datetime from dataclasses import dataclass # Database configuration # Priority: NEON_DATABASE_URL_DEV (local) > NEON_DATABASE_URL (production) NEON_DATABASE_URL_DEV = os.getenv('NEON_DATABASE_URL_DEV') NEON_DATABASE_URL = os.getenv('NEON_DATABASE_URL') # Use dev database for local development, production database for deployed environments DATABASE_URL = NEON_DATABASE_URL_DEV or NEON_DATABASE_URL # Connection pool (created on first request) _db_pool = None @dataclass class SearchResult: """Search result data class""" result_type: str title: str subtitle: str description: str url: str score: float metadata: dict async def get_db_pool(): """Get or create database connection pool""" global _db_pool if _db_pool is None: if not DATABASE_URL: raise ValueError("DATABASE_URL not configured") db_type = "Development (Local PostgreSQL)" if NEON_DATABASE_URL_DEV else "Production (Neon)" logger.info(f"🗄️ Creating connection pool to {db_type}") _db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=20) return _db_pool async def search_jurisdictions_pg( query: Optional[str] = None, state: Optional[str] = None, city: Optional[str] = None, jurisdiction_levels: Optional[List[str]] = None, limit: int = 10, offset: int = 0 ) -> List[SearchResult]: """ Search jurisdictions using PostgreSQL full-text search Args: query: Search text (jurisdiction name) state: Filter by state code (e.g., 'MA') city: Filter by city name jurisdiction_levels: Filter by types (city, county, town, school_district, etc.) limit: Max results offset: Pagination offset Returns: List of SearchResult objects """ try: pool = await get_db_pool() # Map frontend level IDs to database types level_mapping = { 'city': 'city', 'county': 'county', 'town': 'town', 'village': 'village', 'school_district': 'school_district', 'special_district': 'special_district', 'state': 'state' } # Build SQL query where_clauses = [] params = [] param_idx = 1 has_query = query and query.strip() # Text search filter first (if present) - must be $1 for score calculation score_param_idx = None if has_query: where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})") params.append(query) score_param_idx = param_idx param_idx += 1 # State filter if state: where_clauses.append(f"state = ${param_idx}") params.append(state.upper()) param_idx += 1 # City filter if city: where_clauses.append(f"LOWER(name) LIKE LOWER(${param_idx})") params.append(f"%{city}%") param_idx += 1 # Jurisdiction level filter if jurisdiction_levels: db_types = [level_mapping.get(level) for level in jurisdiction_levels if level_mapping.get(level)] if db_types: placeholders = ','.join([f"${param_idx + i}" for i in range(len(db_types))]) where_clauses.append(f"type IN ({placeholders})") params.extend(db_types) param_idx += len(db_types) # Build final WHERE clause where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" # Select clause and order by if has_query: select_score = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${score_param_idx})) as score" order_by = f"score DESC, name ASC" else: select_score = "1.0 as score" order_by = "name ASC" # Build complete query sql = f""" SELECT name, type, state, county, geoid, population, {select_score} FROM jurisdictions_search WHERE {where_sql} ORDER BY {order_by} LIMIT ${param_idx} OFFSET ${param_idx + 1} """ # Add limit and offset params.append(limit) params.append(offset) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) results = [] for row in rows: jurisdiction_label = row['type'].replace('_', ' ').title() results.append(SearchResult( result_type='jurisdiction', title=row['name'], subtitle=f"{jurisdiction_label}", description=f"{jurisdiction_label} in {row['state']}" + (f" • Pop: {row['population']:,}" if row['population'] else ""), url=f"/jurisdictions/{row['geoid']}" if row['geoid'] else f"/jurisdictions/{row['name']}", score=float(row.get('score', 1.0)) if query else 1.0, metadata={ 'state': row['state'], 'geoid': row['geoid'], 'type': row['type'], 'county': row['county'], 'population': row['population'] } )) logger.info(f"🏛️ PostgreSQL jurisdictions search: {len(results)} results") return results except Exception as e: logger.error(f"PostgreSQL jurisdictions search error: {e}") return [] async def search_contacts_pg( query: Optional[str] = None, state: Optional[str] = None, limit: int = 10 ) -> List[SearchResult]: """ Search contacts (nonprofit officers, local officials) using PostgreSQL Args: query: Search text (name, title, organization) state: Filter by state code limit: Max results Returns: List of SearchResult objects """ try: pool = await get_db_pool() # Build WHERE clauses where_clauses = [] params = [] param_idx = 1 if state: where_clauses.append(f"state = ${param_idx}") params.append(state.upper()) param_idx += 1 # Text search across name and organization if query and query.strip(): where_clauses.append(f"""( to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx}) OR to_tsvector('english', COALESCE(organization_name, '')) @@ plainto_tsquery('english', ${param_idx}) OR LOWER(title) LIKE LOWER(${param_idx + 1}) )""") params.append(query) params.append(f"%{query}%") param_idx += 2 # Rank by relevance order_by = f""" GREATEST( ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 2})), ts_rank(to_tsvector('english', COALESCE(organization_name, '')), plainto_tsquery('english', ${param_idx - 2})) ) DESC, name ASC """ else: order_by = "name ASC" where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" sql = f""" SELECT name, title, organization_name, organization_ein, city, state, role_type, compensation, source FROM contacts_search WHERE {where_sql} ORDER BY {order_by} LIMIT ${param_idx} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) results = [] for row in rows: org_display = row['organization_name'] or 'Unknown Organization' location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '') results.append(SearchResult( result_type='contact', title=row['name'], subtitle=f"{row['title'] or 'Officer'} - {org_display}", description=f"{row['role_type'] or 'Contact'} in {location}", url=f"/people/{row['name'].replace(' ', '-')}", score=1.0, metadata={ 'title': row['title'], 'organization': org_display, 'organization_ein': row['organization_ein'], 'state': row['state'], 'city': row['city'], 'role_type': row['role_type'], 'compensation': row['compensation'], 'source': row['source'] } )) logger.info(f"👤 PostgreSQL contacts search: {len(results)} results") return results except Exception as e: logger.error(f"PostgreSQL contacts search error: {e}") return [] async def search_organizations_pg( query: Optional[str] = None, state: Optional[str] = None, ntee_code: Optional[str] = None, ein: Optional[str] = None, limit: int = 10, offset: int = 0, sort: str = 'relevance' ) -> List[SearchResult]: """ Search nonprofit organizations using PostgreSQL Args: query: Search text (organization name) state: Filter by state code ntee_code: Filter by NTEE code prefix ein: Exact EIN match limit: Max results offset: Pagination offset sort: Sort order (relevance, name-asc, name-desc, revenue-asc, revenue-desc, assets-asc, assets-desc) Returns: List of SearchResult objects """ try: pool = await get_db_pool() # Build WHERE clauses where_clauses = [] params = [] param_idx = 1 # EIN exact match (highest priority) if ein: where_clauses.append(f"ein = ${param_idx}") params.append(ein.strip()) param_idx += 1 # State filter if state: where_clauses.append(f"state = ${param_idx}") params.append(state.upper()) param_idx += 1 # NTEE code filter if ntee_code: where_clauses.append(f"ntee_code LIKE ${param_idx}") params.append(f"{ntee_code}%") param_idx += 1 # Text search (if no EIN specified) if query and query.strip() and not ein: where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})") params.append(query) param_idx += 1 where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" # Determine sort order if sort == 'name-asc': order_by = "name ASC" elif sort == 'name-desc': order_by = "name DESC" elif sort == 'revenue-asc': order_by = "revenue ASC NULLS LAST" elif sort == 'revenue-desc': order_by = "revenue DESC NULLS LAST" elif sort == 'assets-asc': order_by = "assets ASC NULLS LAST" elif sort == 'assets-desc': order_by = "assets DESC NULLS LAST" elif query and query.strip() and not ein: # Relevance ranking for text search order_by = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 1})) DESC, name ASC" else: order_by = "name ASC" sql = f""" SELECT ein, name, city, state, county, ntee_code, ntee_description, revenue, assets, income, tax_period FROM nonprofits_search WHERE {where_sql} ORDER BY {order_by} LIMIT ${param_idx} OFFSET ${param_idx + 1} """ params.append(limit) params.append(offset) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) results = [] for row in rows: location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '') # Format financials financials = [] if row['revenue']: financials.append(f"Revenue: ${row['revenue']:,}") if row['assets']: financials.append(f"Assets: ${row['assets']:,}") description = f"{row['ntee_description'] or 'Nonprofit organization'}" if financials: description += " • " + " • ".join(financials) results.append(SearchResult( result_type='organization', title=row['name'], subtitle=location, description=description, url=f"/organizations/{row['ein']}", score=1.0, metadata={ 'ein': row['ein'], 'state': row['state'], 'city': row['city'], 'county': row['county'], 'ntee_code': row['ntee_code'], 'ntee_description': row['ntee_description'], 'revenue': row['revenue'], 'assets': row['assets'], 'income': row['income'], 'tax_period': row['tax_period'] } )) logger.info(f"🏢 PostgreSQL organizations search: {len(results)} results") return results except Exception as e: logger.error(f"PostgreSQL organizations search error: {e}") return [] async def search_events_pg( query: Optional[str] = None, state: Optional[str] = None, limit: int = 10 ) -> List[SearchResult]: """ Search meetings/events using PostgreSQL Args: query: Search text (title, jurisdiction, description) state: Filter by state code limit: Max results Returns: List of SearchResult objects """ try: pool = await get_db_pool() # Build WHERE clauses where_clauses = [] params = [] param_idx = 1 if state: where_clauses.append(f"state = ${param_idx}") params.append(state.upper()) param_idx += 1 # Text search if query and query.strip(): where_clauses.append(f"""( to_tsvector('english', title) @@ plainto_tsquery('english', ${param_idx}) OR LOWER(jurisdiction_name) LIKE LOWER(${param_idx + 1}) )""") params.append(query) params.append(f"%{query}%") param_idx += 2 order_by = f"ts_rank(to_tsvector('english', title), plainto_tsquery('english', ${param_idx - 2})) DESC, event_date DESC" else: order_by = "event_date DESC" where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" sql = f""" SELECT id, title, description, event_date, jurisdiction_name, jurisdiction_type, state, city, video_url, agenda_url FROM events_search WHERE {where_sql} ORDER BY {order_by} LIMIT ${param_idx} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) results = [] for row in rows: location = f"{row['jurisdiction_name']}, {row['state']}" if row['jurisdiction_name'] and row['state'] else '' date_str = row['event_date'].strftime('%Y-%m-%d') if row['event_date'] else '' description = (row['description'] or '')[:200] if len(description) == 200: description += "..." results.append(SearchResult( result_type='meeting', title=row['title'], subtitle=f"{location} - {date_str}", description=description, url=f"/documents?meeting_id={row['id']}", score=1.0, metadata={ 'jurisdiction': row['jurisdiction_name'], 'jurisdiction_type': row['jurisdiction_type'], 'state': row['state'], 'city': row['city'], 'date': date_str, 'meeting_id': row['id'], 'video_url': row['video_url'], 'agenda_url': row['agenda_url'] } )) logger.info(f"📅 PostgreSQL events search: {len(results)} results") return results except Exception as e: logger.error(f"PostgreSQL events search error: {e}") return []