open-navigator / api /routes /search_postgres.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
"""
PostgreSQL-based search functions
Uses indexed search tables for fast queries (10-100x faster than parquet)
"""
from typing import Optional, List
from loguru import logger
import asyncpg
import os
from datetime import datetime
from dataclasses import dataclass
# Database configuration
# Priority: NEON_DATABASE_URL_DEV (local) > NEON_DATABASE_URL (production)
NEON_DATABASE_URL_DEV = os.getenv('NEON_DATABASE_URL_DEV')
NEON_DATABASE_URL = os.getenv('NEON_DATABASE_URL')
# Use dev database for local development, production database for deployed environments
DATABASE_URL = NEON_DATABASE_URL_DEV or NEON_DATABASE_URL
# Connection pool (created on first request)
_db_pool = None
@dataclass
class SearchResult:
"""Search result data class"""
result_type: str
title: str
subtitle: str
description: str
url: str
score: float
metadata: dict
async def get_db_pool():
"""Get or create database connection pool"""
global _db_pool
if _db_pool is None:
if not DATABASE_URL:
raise ValueError("DATABASE_URL not configured")
db_type = "Development (Local PostgreSQL)" if NEON_DATABASE_URL_DEV else "Production (Neon)"
logger.info(f"🗄️ Creating connection pool to {db_type}")
_db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=20)
return _db_pool
async def search_jurisdictions_pg(
query: Optional[str] = None,
state: Optional[str] = None,
city: Optional[str] = None,
jurisdiction_levels: Optional[List[str]] = None,
limit: int = 10,
offset: int = 0
) -> List[SearchResult]:
"""
Search jurisdictions using PostgreSQL full-text search
Args:
query: Search text (jurisdiction name)
state: Filter by state code (e.g., 'MA')
city: Filter by city name
jurisdiction_levels: Filter by types (city, county, town, school_district, etc.)
limit: Max results
offset: Pagination offset
Returns:
List of SearchResult objects
"""
try:
pool = await get_db_pool()
# Map frontend level IDs to database types
level_mapping = {
'city': 'city',
'county': 'county',
'town': 'town',
'village': 'village',
'school_district': 'school_district',
'special_district': 'special_district',
'state': 'state'
}
# Build SQL query
where_clauses = []
params = []
param_idx = 1
has_query = query and query.strip()
# Text search filter first (if present) - must be $1 for score calculation
score_param_idx = None
if has_query:
where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})")
params.append(query)
score_param_idx = param_idx
param_idx += 1
# State filter
if state:
where_clauses.append(f"state = ${param_idx}")
params.append(state.upper())
param_idx += 1
# City filter
if city:
where_clauses.append(f"LOWER(name) LIKE LOWER(${param_idx})")
params.append(f"%{city}%")
param_idx += 1
# Jurisdiction level filter
if jurisdiction_levels:
db_types = [level_mapping.get(level) for level in jurisdiction_levels if level_mapping.get(level)]
if db_types:
placeholders = ','.join([f"${param_idx + i}" for i in range(len(db_types))])
where_clauses.append(f"type IN ({placeholders})")
params.extend(db_types)
param_idx += len(db_types)
# Build final WHERE clause
where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"
# Select clause and order by
if has_query:
select_score = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${score_param_idx})) as score"
order_by = f"score DESC, name ASC"
else:
select_score = "1.0 as score"
order_by = "name ASC"
# Build complete query
sql = f"""
SELECT
name,
type,
state,
county,
geoid,
population,
{select_score}
FROM jurisdictions_search
WHERE {where_sql}
ORDER BY {order_by}
LIMIT ${param_idx}
OFFSET ${param_idx + 1}
"""
# Add limit and offset
params.append(limit)
params.append(offset)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
results = []
for row in rows:
jurisdiction_label = row['type'].replace('_', ' ').title()
results.append(SearchResult(
result_type='jurisdiction',
title=row['name'],
subtitle=f"{jurisdiction_label}",
description=f"{jurisdiction_label} in {row['state']}" + (f" • Pop: {row['population']:,}" if row['population'] else ""),
url=f"/jurisdictions/{row['geoid']}" if row['geoid'] else f"/jurisdictions/{row['name']}",
score=float(row.get('score', 1.0)) if query else 1.0,
metadata={
'state': row['state'],
'geoid': row['geoid'],
'type': row['type'],
'county': row['county'],
'population': row['population']
}
))
logger.info(f"🏛️ PostgreSQL jurisdictions search: {len(results)} results")
return results
except Exception as e:
logger.error(f"PostgreSQL jurisdictions search error: {e}")
return []
async def search_contacts_pg(
query: Optional[str] = None,
state: Optional[str] = None,
limit: int = 10
) -> List[SearchResult]:
"""
Search contacts (nonprofit officers, local officials) using PostgreSQL
Args:
query: Search text (name, title, organization)
state: Filter by state code
limit: Max results
Returns:
List of SearchResult objects
"""
try:
pool = await get_db_pool()
# Build WHERE clauses
where_clauses = []
params = []
param_idx = 1
if state:
where_clauses.append(f"state = ${param_idx}")
params.append(state.upper())
param_idx += 1
# Text search across name and organization
if query and query.strip():
where_clauses.append(f"""(
to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})
OR to_tsvector('english', COALESCE(organization_name, '')) @@ plainto_tsquery('english', ${param_idx})
OR LOWER(title) LIKE LOWER(${param_idx + 1})
)""")
params.append(query)
params.append(f"%{query}%")
param_idx += 2
# Rank by relevance
order_by = f"""
GREATEST(
ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 2})),
ts_rank(to_tsvector('english', COALESCE(organization_name, '')), plainto_tsquery('english', ${param_idx - 2}))
) DESC, name ASC
"""
else:
order_by = "name ASC"
where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"
sql = f"""
SELECT
name,
title,
organization_name,
organization_ein,
city,
state,
role_type,
compensation,
source
FROM contacts_search
WHERE {where_sql}
ORDER BY {order_by}
LIMIT ${param_idx}
"""
params.append(limit)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
results = []
for row in rows:
org_display = row['organization_name'] or 'Unknown Organization'
location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '')
results.append(SearchResult(
result_type='contact',
title=row['name'],
subtitle=f"{row['title'] or 'Officer'} - {org_display}",
description=f"{row['role_type'] or 'Contact'} in {location}",
url=f"/people/{row['name'].replace(' ', '-')}",
score=1.0,
metadata={
'title': row['title'],
'organization': org_display,
'organization_ein': row['organization_ein'],
'state': row['state'],
'city': row['city'],
'role_type': row['role_type'],
'compensation': row['compensation'],
'source': row['source']
}
))
logger.info(f"👤 PostgreSQL contacts search: {len(results)} results")
return results
except Exception as e:
logger.error(f"PostgreSQL contacts search error: {e}")
return []
async def search_organizations_pg(
query: Optional[str] = None,
state: Optional[str] = None,
ntee_code: Optional[str] = None,
ein: Optional[str] = None,
limit: int = 10,
offset: int = 0,
sort: str = 'relevance'
) -> List[SearchResult]:
"""
Search nonprofit organizations using PostgreSQL
Args:
query: Search text (organization name)
state: Filter by state code
ntee_code: Filter by NTEE code prefix
ein: Exact EIN match
limit: Max results
offset: Pagination offset
sort: Sort order (relevance, name-asc, name-desc, revenue-asc, revenue-desc, assets-asc, assets-desc)
Returns:
List of SearchResult objects
"""
try:
pool = await get_db_pool()
# Build WHERE clauses
where_clauses = []
params = []
param_idx = 1
# EIN exact match (highest priority)
if ein:
where_clauses.append(f"ein = ${param_idx}")
params.append(ein.strip())
param_idx += 1
# State filter
if state:
where_clauses.append(f"state = ${param_idx}")
params.append(state.upper())
param_idx += 1
# NTEE code filter
if ntee_code:
where_clauses.append(f"ntee_code LIKE ${param_idx}")
params.append(f"{ntee_code}%")
param_idx += 1
# Text search (if no EIN specified)
if query and query.strip() and not ein:
where_clauses.append(f"to_tsvector('english', name) @@ plainto_tsquery('english', ${param_idx})")
params.append(query)
param_idx += 1
where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"
# Determine sort order
if sort == 'name-asc':
order_by = "name ASC"
elif sort == 'name-desc':
order_by = "name DESC"
elif sort == 'revenue-asc':
order_by = "revenue ASC NULLS LAST"
elif sort == 'revenue-desc':
order_by = "revenue DESC NULLS LAST"
elif sort == 'assets-asc':
order_by = "assets ASC NULLS LAST"
elif sort == 'assets-desc':
order_by = "assets DESC NULLS LAST"
elif query and query.strip() and not ein:
# Relevance ranking for text search
order_by = f"ts_rank(to_tsvector('english', name), plainto_tsquery('english', ${param_idx - 1})) DESC, name ASC"
else:
order_by = "name ASC"
sql = f"""
SELECT
ein,
name,
city,
state,
county,
ntee_code,
ntee_description,
revenue,
assets,
income,
tax_period
FROM nonprofits_search
WHERE {where_sql}
ORDER BY {order_by}
LIMIT ${param_idx}
OFFSET ${param_idx + 1}
"""
params.append(limit)
params.append(offset)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
results = []
for row in rows:
location = f"{row['city']}, {row['state']}" if row['city'] and row['state'] else (row['state'] or '')
# Format financials
financials = []
if row['revenue']:
financials.append(f"Revenue: ${row['revenue']:,}")
if row['assets']:
financials.append(f"Assets: ${row['assets']:,}")
description = f"{row['ntee_description'] or 'Nonprofit organization'}"
if financials:
description += " • " + " • ".join(financials)
results.append(SearchResult(
result_type='organization',
title=row['name'],
subtitle=location,
description=description,
url=f"/organizations/{row['ein']}",
score=1.0,
metadata={
'ein': row['ein'],
'state': row['state'],
'city': row['city'],
'county': row['county'],
'ntee_code': row['ntee_code'],
'ntee_description': row['ntee_description'],
'revenue': row['revenue'],
'assets': row['assets'],
'income': row['income'],
'tax_period': row['tax_period']
}
))
logger.info(f"🏢 PostgreSQL organizations search: {len(results)} results")
return results
except Exception as e:
logger.error(f"PostgreSQL organizations search error: {e}")
return []
async def search_events_pg(
query: Optional[str] = None,
state: Optional[str] = None,
limit: int = 10
) -> List[SearchResult]:
"""
Search meetings/events using PostgreSQL
Args:
query: Search text (title, jurisdiction, description)
state: Filter by state code
limit: Max results
Returns:
List of SearchResult objects
"""
try:
pool = await get_db_pool()
# Build WHERE clauses
where_clauses = []
params = []
param_idx = 1
if state:
where_clauses.append(f"state = ${param_idx}")
params.append(state.upper())
param_idx += 1
# Text search
if query and query.strip():
where_clauses.append(f"""(
to_tsvector('english', title) @@ plainto_tsquery('english', ${param_idx})
OR LOWER(jurisdiction_name) LIKE LOWER(${param_idx + 1})
)""")
params.append(query)
params.append(f"%{query}%")
param_idx += 2
order_by = f"ts_rank(to_tsvector('english', title), plainto_tsquery('english', ${param_idx - 2})) DESC, event_date DESC"
else:
order_by = "event_date DESC"
where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"
sql = f"""
SELECT
id,
title,
description,
event_date,
jurisdiction_name,
jurisdiction_type,
state,
city,
video_url,
agenda_url
FROM events_search
WHERE {where_sql}
ORDER BY {order_by}
LIMIT ${param_idx}
"""
params.append(limit)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
results = []
for row in rows:
location = f"{row['jurisdiction_name']}, {row['state']}" if row['jurisdiction_name'] and row['state'] else ''
date_str = row['event_date'].strftime('%Y-%m-%d') if row['event_date'] else ''
description = (row['description'] or '')[:200]
if len(description) == 200:
description += "..."
results.append(SearchResult(
result_type='meeting',
title=row['title'],
subtitle=f"{location} - {date_str}",
description=description,
url=f"/documents?meeting_id={row['id']}",
score=1.0,
metadata={
'jurisdiction': row['jurisdiction_name'],
'jurisdiction_type': row['jurisdiction_type'],
'state': row['state'],
'city': row['city'],
'date': date_str,
'meeting_id': row['id'],
'video_url': row['video_url'],
'agenda_url': row['agenda_url']
}
))
logger.info(f"📅 PostgreSQL events search: {len(results)} results")
return results
except Exception as e:
logger.error(f"PostgreSQL events search error: {e}")
return []