Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| PostgreSQL-based search functions | |
| Uses indexed search tables for fast queries (10-100x faster than parquet) | |
| """ | |
| from typing import Optional, List | |
| from loguru import logger | |
| import asyncpg | |
| import os | |
| from datetime import datetime | |
| from dataclasses import dataclass | |
| # Database configuration | |
| # Priority: NEON_DATABASE_URL_DEV (local) > NEON_DATABASE_URL (production) | |
| NEON_DATABASE_URL_DEV = os.getenv('NEON_DATABASE_URL_DEV') | |
| NEON_DATABASE_URL = os.getenv('NEON_DATABASE_URL') | |
| # Use dev database for local development, production database for deployed environments | |
| DATABASE_URL = NEON_DATABASE_URL_DEV or NEON_DATABASE_URL | |
| # Connection pool (created on first request) | |
| _db_pool = None | |
| class SearchResult: | |
| """Search result data class""" | |
| result_type: str | |
| title: str | |
| subtitle: str | |
| description: str | |
| url: str | |
| score: float | |
| metadata: dict | |
| async def get_db_pool(): | |
| """Get or create database connection pool""" | |
| global _db_pool | |
| if _db_pool is None: | |
| if not DATABASE_URL: | |
| raise ValueError("DATABASE_URL not configured") | |
| db_type = "Development (Local PostgreSQL)" if NEON_DATABASE_URL_DEV else "Production (Neon)" | |
| logger.info(f"🗄️ Creating connection pool to {db_type}") | |
| _db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=20) | |
| return _db_pool | |
| async def search_jurisdictions_pg( | |
| query: Optional[str] = None, | |
| state: Optional[str] = None, | |
| city: Optional[str] = None, | |
| jurisdiction_levels: Optional[List[str]] = None, | |
| limit: int = 10, | |
| offset: int = 0 | |
| ) -> List[SearchResult]: | |
| """ | |
| Search jurisdictions using PostgreSQL full-text search | |
| Args: | |
| query: Search text (jurisdiction name) | |
| state: Filter by state code (e.g., 'MA') | |
| city: Filter by city name | |
| jurisdiction_levels: Filter by types (city, county, town, school_district, etc.) | |
| limit: Max results | |
| offset: Pagination offset | |
| Returns: | |
| List of SearchResult objects | |
| """ | |
| try: | |
| pool = await get_db_pool() | |
| # Map frontend level IDs to database types | |
| level_mapping = { | |
| 'city': 'city', | |
| 'county': 'county', | |
| 'town': 'town', | |
| 'village': 'village', | |
| 'school_district': 'school_district', | |
| 'special_district': 'special_district', | |
| 'state': 'state' | |
| } | |
| # Build SQL query | |
| where_clauses = [] | |
| params = [] | |
| param_idx = 1 | |
| has_query = query and query.strip() | |
| # Text search filter first (if present) - must be $1 for score calculation | |
| score_param_idx = None | |
| if has_query: | |
| where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})") | |
| params.append(query) | |
| score_param_idx = param_idx | |
| param_idx += 1 | |
| # State filter | |
| if state: | |
| where_clauses.append(f"state = ${param_idx}") | |
| params.append(state.upper()) | |
| param_idx += 1 | |
| # City filter | |
| if city: | |
| where_clauses.append(f"LOWER(name) LIKE LOWER(${param_idx})") | |
| params.append(f"%{city}%") | |
| param_idx += 1 | |
| # Jurisdiction level filter | |
| if jurisdiction_levels: | |
| db_types = [level_mapping.get(level) for level in jurisdiction_levels if level_mapping.get(level)] | |
| if db_types: | |
| placeholders = ','.join([f"${param_idx + i}" for i in range(len(db_types))]) | |
| where_clauses.append(f"type IN ({placeholders})") | |
| params.extend(db_types) | |
| param_idx += len(db_types) | |
| # Build final WHERE clause | |
| where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" | |
| # Select clause and order by | |
| if has_query: | |
| select_score = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${score_param_idx})) as score" | |
| order_by = f"score DESC, name ASC" | |
| else: | |
| select_score = "1.0 as score" | |
| order_by = "name ASC" | |
| # Build complete query | |
| sql = f""" | |
| SELECT | |
| name, | |
| type, | |
| state, | |
| county, | |
| geoid, | |
| population, | |
| {select_score} | |
| FROM jurisdictions_search | |
| WHERE {where_sql} | |
| ORDER BY {order_by} | |
| LIMIT ${param_idx} | |
| OFFSET ${param_idx + 1} | |
| """ | |
| # Add limit and offset | |
| params.append(limit) | |
| params.append(offset) | |
| async with pool.acquire() as conn: | |
| rows = await conn.fetch(sql, *params) | |
| results = [] | |
| for row in rows: | |
| jurisdiction_label = row['type'].replace('_', ' ').title() | |
| results.append(SearchResult( | |
| result_type='jurisdiction', | |
| title=row['name'], | |
| subtitle=f"{jurisdiction_label}", | |
| description=f"{jurisdiction_label} in {row['state']}" + (f" • Pop: {row['population']:,}" if row['population'] else ""), | |
| url=f"/jurisdictions/{row['geoid']}" if row['geoid'] else f"/jurisdictions/{row['name']}", | |
| score=float(row.get('score', 1.0)) if query else 1.0, | |
| metadata={ | |
| 'state': row['state'], | |
| 'geoid': row['geoid'], | |
| 'type': row['type'], | |
| 'county': row['county'], | |
| 'population': row['population'] | |
| } | |
| )) | |
| logger.info(f"🏛️ PostgreSQL jurisdictions search: {len(results)} results") | |
| return results | |
| except Exception as e: | |
| logger.error(f"PostgreSQL jurisdictions search error: {e}") | |
| return [] | |
| async def search_contacts_pg( | |
| query: Optional[str] = None, | |
| state: Optional[str] = None, | |
| limit: int = 10 | |
| ) -> List[SearchResult]: | |
| """ | |
| Search contacts (nonprofit officers, local officials) using PostgreSQL | |
| Args: | |
| query: Search text (name, title, organization) | |
| state: Filter by state code | |
| limit: Max results | |
| Returns: | |
| List of SearchResult objects | |
| """ | |
| try: | |
| pool = await get_db_pool() | |
| # Build WHERE clauses | |
| where_clauses = [] | |
| params = [] | |
| param_idx = 1 | |
| if state: | |
| where_clauses.append(f"state = ${param_idx}") | |
| params.append(state.upper()) | |
| param_idx += 1 | |
| # Text search across name and organization | |
| if query and query.strip(): | |
| where_clauses.append(f"""( | |
| to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx}) | |
| OR to_tsvector('english', COALESCE(organization_name, '')) @@ plainto_tsquery('english', ${param_idx}) | |
| OR LOWER(title) LIKE LOWER(${param_idx + 1}) | |
| )""") | |
| params.append(query) | |
| params.append(f"%{query}%") | |
| param_idx += 2 | |
| # Rank by relevance | |
| order_by = f""" | |
| GREATEST( | |
| ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 2})), | |
| ts_rank(to_tsvector('english', COALESCE(organization_name, '')), plainto_tsquery('english', ${param_idx - 2})) | |
| ) DESC, name ASC | |
| """ | |
| else: | |
| order_by = "name ASC" | |
| where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" | |
| sql = f""" | |
| SELECT | |
| name, | |
| title, | |
| organization_name, | |
| organization_ein, | |
| city, | |
| state, | |
| role_type, | |
| compensation, | |
| source | |
| FROM contacts_search | |
| WHERE {where_sql} | |
| ORDER BY {order_by} | |
| LIMIT ${param_idx} | |
| """ | |
| params.append(limit) | |
| async with pool.acquire() as conn: | |
| rows = await conn.fetch(sql, *params) | |
| results = [] | |
| for row in rows: | |
| org_display = row['organization_name'] or 'Unknown Organization' | |
| location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '') | |
| results.append(SearchResult( | |
| result_type='contact', | |
| title=row['name'], | |
| subtitle=f"{row['title'] or 'Officer'} - {org_display}", | |
| description=f"{row['role_type'] or 'Contact'} in {location}", | |
| url=f"/people/{row['name'].replace(' ', '-')}", | |
| score=1.0, | |
| metadata={ | |
| 'title': row['title'], | |
| 'organization': org_display, | |
| 'organization_ein': row['organization_ein'], | |
| 'state': row['state'], | |
| 'city': row['city'], | |
| 'role_type': row['role_type'], | |
| 'compensation': row['compensation'], | |
| 'source': row['source'] | |
| } | |
| )) | |
| logger.info(f"👤 PostgreSQL contacts search: {len(results)} results") | |
| return results | |
| except Exception as e: | |
| logger.error(f"PostgreSQL contacts search error: {e}") | |
| return [] | |
| async def search_organizations_pg( | |
| query: Optional[str] = None, | |
| state: Optional[str] = None, | |
| ntee_code: Optional[str] = None, | |
| ein: Optional[str] = None, | |
| limit: int = 10, | |
| offset: int = 0, | |
| sort: str = 'relevance' | |
| ) -> List[SearchResult]: | |
| """ | |
| Search nonprofit organizations using PostgreSQL | |
| Args: | |
| query: Search text (organization name) | |
| state: Filter by state code | |
| ntee_code: Filter by NTEE code prefix | |
| ein: Exact EIN match | |
| limit: Max results | |
| offset: Pagination offset | |
| sort: Sort order (relevance, name-asc, name-desc, revenue-asc, revenue-desc, assets-asc, assets-desc) | |
| Returns: | |
| List of SearchResult objects | |
| """ | |
| try: | |
| pool = await get_db_pool() | |
| # Build WHERE clauses | |
| where_clauses = [] | |
| params = [] | |
| param_idx = 1 | |
| # EIN exact match (highest priority) | |
| if ein: | |
| where_clauses.append(f"ein = ${param_idx}") | |
| params.append(ein.strip()) | |
| param_idx += 1 | |
| # State filter | |
| if state: | |
| where_clauses.append(f"state = ${param_idx}") | |
| params.append(state.upper()) | |
| param_idx += 1 | |
| # NTEE code filter | |
| if ntee_code: | |
| where_clauses.append(f"ntee_code LIKE ${param_idx}") | |
| params.append(f"{ntee_code}%") | |
| param_idx += 1 | |
| # Text search (if no EIN specified) | |
| if query and query.strip() and not ein: | |
| where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})") | |
| params.append(query) | |
| param_idx += 1 | |
| where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" | |
| # Determine sort order | |
| if sort == 'name-asc': | |
| order_by = "name ASC" | |
| elif sort == 'name-desc': | |
| order_by = "name DESC" | |
| elif sort == 'revenue-asc': | |
| order_by = "revenue ASC NULLS LAST" | |
| elif sort == 'revenue-desc': | |
| order_by = "revenue DESC NULLS LAST" | |
| elif sort == 'assets-asc': | |
| order_by = "assets ASC NULLS LAST" | |
| elif sort == 'assets-desc': | |
| order_by = "assets DESC NULLS LAST" | |
| elif query and query.strip() and not ein: | |
| # Relevance ranking for text search | |
| order_by = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 1})) DESC, name ASC" | |
| else: | |
| order_by = "name ASC" | |
| sql = f""" | |
| SELECT | |
| ein, | |
| name, | |
| city, | |
| state, | |
| county, | |
| ntee_code, | |
| ntee_description, | |
| revenue, | |
| assets, | |
| income, | |
| tax_period | |
| FROM nonprofits_search | |
| WHERE {where_sql} | |
| ORDER BY {order_by} | |
| LIMIT ${param_idx} | |
| OFFSET ${param_idx + 1} | |
| """ | |
| params.append(limit) | |
| params.append(offset) | |
| async with pool.acquire() as conn: | |
| rows = await conn.fetch(sql, *params) | |
| results = [] | |
| for row in rows: | |
| location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '') | |
| # Format financials | |
| financials = [] | |
| if row['revenue']: | |
| financials.append(f"Revenue: ${row['revenue']:,}") | |
| if row['assets']: | |
| financials.append(f"Assets: ${row['assets']:,}") | |
| description = f"{row['ntee_description'] or 'Nonprofit organization'}" | |
| if financials: | |
| description += " • " + " • ".join(financials) | |
| results.append(SearchResult( | |
| result_type='organization', | |
| title=row['name'], | |
| subtitle=location, | |
| description=description, | |
| url=f"/organizations/{row['ein']}", | |
| score=1.0, | |
| metadata={ | |
| 'ein': row['ein'], | |
| 'state': row['state'], | |
| 'city': row['city'], | |
| 'county': row['county'], | |
| 'ntee_code': row['ntee_code'], | |
| 'ntee_description': row['ntee_description'], | |
| 'revenue': row['revenue'], | |
| 'assets': row['assets'], | |
| 'income': row['income'], | |
| 'tax_period': row['tax_period'] | |
| } | |
| )) | |
| logger.info(f"🏢 PostgreSQL organizations search: {len(results)} results") | |
| return results | |
| except Exception as e: | |
| logger.error(f"PostgreSQL organizations search error: {e}") | |
| return [] | |
| async def search_events_pg( | |
| query: Optional[str] = None, | |
| state: Optional[str] = None, | |
| limit: int = 10 | |
| ) -> List[SearchResult]: | |
| """ | |
| Search meetings/events using PostgreSQL | |
| Args: | |
| query: Search text (title, jurisdiction, description) | |
| state: Filter by state code | |
| limit: Max results | |
| Returns: | |
| List of SearchResult objects | |
| """ | |
| try: | |
| pool = await get_db_pool() | |
| # Build WHERE clauses | |
| where_clauses = [] | |
| params = [] | |
| param_idx = 1 | |
| if state: | |
| where_clauses.append(f"state = ${param_idx}") | |
| params.append(state.upper()) | |
| param_idx += 1 | |
| # Text search | |
| if query and query.strip(): | |
| where_clauses.append(f"""( | |
| to_tsvector('english', title) @@ plainto_tsquery('english', ${param_idx}) | |
| OR LOWER(jurisdiction_name) LIKE LOWER(${param_idx + 1}) | |
| )""") | |
| params.append(query) | |
| params.append(f"%{query}%") | |
| param_idx += 2 | |
| order_by = f"ts_rank(to_tsvector('english', title), plainto_tsquery('english', ${param_idx - 2})) DESC, event_date DESC" | |
| else: | |
| order_by = "event_date DESC" | |
| where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE" | |
| sql = f""" | |
| SELECT | |
| id, | |
| title, | |
| description, | |
| event_date, | |
| jurisdiction_name, | |
| jurisdiction_type, | |
| state, | |
| city, | |
| video_url, | |
| agenda_url | |
| FROM events_search | |
| WHERE {where_sql} | |
| ORDER BY {order_by} | |
| LIMIT ${param_idx} | |
| """ | |
| params.append(limit) | |
| async with pool.acquire() as conn: | |
| rows = await conn.fetch(sql, *params) | |
| results = [] | |
| for row in rows: | |
| location = f"{row['jurisdiction_name']}, {row['state']}" if row['jurisdiction_name'] and row['state'] else '' | |
| date_str = row['event_date'].strftime('%Y-%m-%d') if row['event_date'] else '' | |
| description = (row['description'] or '')[:200] | |
| if len(description) == 200: | |
| description += "..." | |
| results.append(SearchResult( | |
| result_type='meeting', | |
| title=row['title'], | |
| subtitle=f"{location} - {date_str}", | |
| description=description, | |
| url=f"/documents?meeting_id={row['id']}", | |
| score=1.0, | |
| metadata={ | |
| 'jurisdiction': row['jurisdiction_name'], | |
| 'jurisdiction_type': row['jurisdiction_type'], | |
| 'state': row['state'], | |
| 'city': row['city'], | |
| 'date': date_str, | |
| 'meeting_id': row['id'], | |
| 'video_url': row['video_url'], | |
| 'agenda_url': row['agenda_url'] | |
| } | |
| )) | |
| logger.info(f"📅 PostgreSQL events search: {len(results)} results") | |
| return results | |
| except Exception as e: | |
| logger.error(f"PostgreSQL events search error: {e}") | |
| return [] | |