Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Open Navigator MCP Server | |
| ========================== | |
| Model Context Protocol (MCP) server for Open Navigator data sources. | |
| Provides AI assistants access to: | |
| - 90,000+ U.S. jurisdictions (Census data) | |
| - 1.8M nonprofit organizations (IRS data) | |
| - 4.5M+ legislative documents (Open States) | |
| - Vector search across bills and meetings | |
| - Real-time statistics and aggregates | |
| Usage: | |
| # Run locally | |
| python scripts/mcp/open_navigator_server.py | |
| # Configure in Claude Desktop (~/.config/Claude/claude_desktop_config.json): | |
| { | |
| "mcpServers": { | |
| "open-navigator": { | |
| "command": "python", | |
| "args": ["/path/to/open-navigator/scripts/mcp/open_navigator_server.py"], | |
| "env": { | |
| "QDRANT_HOST": "localhost", | |
| "QDRANT_PORT": "6333", | |
| "DATABASE_URL": "postgresql://postgres:password@localhost:5433/open_navigator" | |
| } | |
| } | |
| } | |
| } | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import asyncio | |
| from typing import Any, Optional | |
| from pathlib import Path | |
| # Add project root to path | |
| project_root = Path(__file__).parent.parent.parent | |
| sys.path.insert(0, str(project_root)) | |
| try: | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent, Resource | |
| except ImportError: | |
| print("β MCP SDK not installed. Install with: pip install mcp anthropic-mcp-sdk") | |
| sys.exit(1) | |
| # Optional imports (graceful degradation) | |
| try: | |
| from qdrant_client import QdrantClient | |
| QDRANT_AVAILABLE = True | |
| except ImportError: | |
| QDRANT_AVAILABLE = False | |
| print("β οΈ Qdrant not available. Vector search tools disabled.") | |
| try: | |
| import psycopg2 | |
| from psycopg2.extras import RealDictCursor | |
| POSTGRES_AVAILABLE = True | |
| except ImportError: | |
| POSTGRES_AVAILABLE = False | |
| print("β οΈ PostgreSQL not available. Database tools disabled.") | |
| try: | |
| from datasets import load_dataset | |
| DATASETS_AVAILABLE = True | |
| except ImportError: | |
| DATASETS_AVAILABLE = False | |
| print("β οΈ HuggingFace datasets not available. Dataset tools disabled.") | |
| # Initialize MCP server | |
| app = Server("open-navigator") | |
| # Initialize clients | |
| qdrant_client = None | |
| pg_conn = None | |
| if QDRANT_AVAILABLE: | |
| try: | |
| qdrant_host = os.getenv("QDRANT_HOST", "localhost") | |
| qdrant_port = int(os.getenv("QDRANT_PORT", "6333")) | |
| qdrant_client = QdrantClient(host=qdrant_host, port=qdrant_port) | |
| print(f"β Connected to Qdrant at {qdrant_host}:{qdrant_port}") | |
| except Exception as e: | |
| print(f"β οΈ Failed to connect to Qdrant: {e}") | |
| QDRANT_AVAILABLE = False | |
| if POSTGRES_AVAILABLE: | |
| try: | |
| db_url = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5433/open_navigator") | |
| pg_conn = psycopg2.connect(db_url, cursor_factory=RealDictCursor) | |
| print(f"β Connected to PostgreSQL") | |
| except Exception as e: | |
| print(f"β οΈ Failed to connect to PostgreSQL: {e}") | |
| POSTGRES_AVAILABLE = False | |
| async def list_resources() -> list[Resource]: | |
| """List available datasets and collections""" | |
| resources = [] | |
| if DATASETS_AVAILABLE: | |
| resources.extend([ | |
| Resource( | |
| uri="hf://census-jurisdictions", | |
| name="U.S. Census Jurisdictions (90,000+)", | |
| description="Cities, counties, and states with geographic data", | |
| mimeType="application/x-parquet" | |
| ), | |
| Resource( | |
| uri="hf://nonprofits", | |
| name="Nonprofit Organizations (1.8M)", | |
| description="IRS-registered nonprofits with Form 990 data", | |
| mimeType="application/x-parquet" | |
| ), | |
| ]) | |
| if QDRANT_AVAILABLE and qdrant_client: | |
| try: | |
| collections = qdrant_client.get_collections() | |
| for coll in collections.collections: | |
| resources.append(Resource( | |
| uri=f"vector://{coll.name}", | |
| name=f"{coll.name.title()} (Vector Search)", | |
| description=f"Semantic search across {coll.points_count:,} documents", | |
| mimeType="application/json" | |
| )) | |
| except Exception as e: | |
| print(f"β οΈ Error listing Qdrant collections: {e}") | |
| return resources | |
| async def list_tools() -> list[Tool]: | |
| """List available tools""" | |
| tools = [] | |
| # HuggingFace dataset tools | |
| if DATASETS_AVAILABLE: | |
| tools.extend([ | |
| Tool( | |
| name="search_jurisdictions", | |
| description="Search 90,000+ U.S. jurisdictions (cities, counties, states) by name, type, or location", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": "Search term (jurisdiction name)" | |
| }, | |
| "state": { | |
| "type": "string", | |
| "description": "Filter by state code (e.g., CA, NY)" | |
| }, | |
| "type": { | |
| "type": "string", | |
| "enum": ["city", "county", "state"], | |
| "description": "Filter by jurisdiction type" | |
| }, | |
| "limit": { | |
| "type": "number", | |
| "default": 10, | |
| "description": "Maximum results to return" | |
| } | |
| }, | |
| "required": ["query"] | |
| } | |
| ), | |
| Tool( | |
| name="get_nonprofits", | |
| description="Get nonprofit organizations in a location with Form 990 data", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "state": { | |
| "type": "string", | |
| "description": "State code (e.g., CA, NY, TX)" | |
| }, | |
| "city": { | |
| "type": "string", | |
| "description": "Filter by city name" | |
| }, | |
| "subsection": { | |
| "type": "string", | |
| "description": "IRS subsection code (e.g., 03 for 501c3)" | |
| }, | |
| "limit": { | |
| "type": "number", | |
| "default": 50, | |
| "description": "Maximum results to return" | |
| } | |
| }, | |
| "required": ["state"] | |
| } | |
| ), | |
| ]) | |
| # Vector search tools | |
| if QDRANT_AVAILABLE and qdrant_client: | |
| tools.extend([ | |
| Tool( | |
| name="vector_search_bills", | |
| description="Semantic search across legislative bills using natural language", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": "Natural language query" | |
| }, | |
| "state": { | |
| "type": "string", | |
| "description": "Filter by state code" | |
| }, | |
| "limit": { | |
| "type": "number", | |
| "default": 10, | |
| "description": "Maximum results to return" | |
| } | |
| }, | |
| "required": ["query"] | |
| } | |
| ), | |
| Tool( | |
| name="vector_search_meetings", | |
| description="Semantic search across meeting transcripts using natural language", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": "Natural language query" | |
| }, | |
| "municipality": { | |
| "type": "string", | |
| "description": "Filter by city/municipality name" | |
| }, | |
| "limit": { | |
| "type": "number", | |
| "default": 10, | |
| "description": "Maximum results to return" | |
| } | |
| }, | |
| "required": ["query"] | |
| } | |
| ), | |
| ]) | |
| # PostgreSQL analytics tools | |
| if POSTGRES_AVAILABLE and pg_conn: | |
| tools.extend([ | |
| Tool( | |
| name="get_bill_stats", | |
| description="Get legislative statistics and aggregates by state/topic", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "state": { | |
| "type": "string", | |
| "description": "State code (e.g., CA, NY)" | |
| }, | |
| "topic": { | |
| "type": "string", | |
| "description": "Bill topic/category" | |
| } | |
| } | |
| } | |
| ), | |
| Tool( | |
| name="search_meetings", | |
| description="Search meeting records by keyword, location, or date", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": "Search keyword" | |
| }, | |
| "state": { | |
| "type": "string", | |
| "description": "Filter by state" | |
| }, | |
| "limit": { | |
| "type": "number", | |
| "default": 20, | |
| "description": "Maximum results to return" | |
| } | |
| } | |
| } | |
| ), | |
| ]) | |
| return tools | |
| async def call_tool(name: str, arguments: dict) -> list[TextContent]: | |
| """Execute a tool""" | |
| # HuggingFace dataset tools | |
| if name == "search_jurisdictions" and DATASETS_AVAILABLE: | |
| try: | |
| ds = load_dataset("getcommunityone/open-navigator-census", split="train") | |
| # Filter by query | |
| query = arguments["query"].lower() | |
| results = ds.filter(lambda x: query in x["name"].lower()) | |
| # Filter by state | |
| if arguments.get("state"): | |
| state = arguments["state"].upper() | |
| results = results.filter(lambda x: x.get("state_code") == state) | |
| # Filter by type | |
| if arguments.get("type"): | |
| jtype = arguments["type"].lower() | |
| results = results.filter(lambda x: jtype in x.get("type", "").lower()) | |
| # Limit results | |
| limit = arguments.get("limit", 10) | |
| results = results.select(range(min(limit, len(results)))) | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(results.to_pandas().to_dict('records'), indent=2, default=str) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| elif name == "get_nonprofits" and DATASETS_AVAILABLE: | |
| try: | |
| state = arguments["state"].lower() | |
| ds = load_dataset(f"getcommunityone/open-navigator-nonprofits-{state}", split="train") | |
| df = ds.to_pandas() | |
| # Filter by city | |
| if arguments.get("city"): | |
| city = arguments["city"].lower() | |
| df = df[df['city'].str.lower().str.contains(city, na=False)] | |
| # Filter by subsection | |
| if arguments.get("subsection"): | |
| df = df[df['subsection'] == arguments["subsection"]] | |
| # Limit results | |
| limit = arguments.get("limit", 50) | |
| results = df.head(limit).to_dict('records') | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(results, indent=2, default=str) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| # Vector search tools | |
| elif name == "vector_search_bills" and QDRANT_AVAILABLE and qdrant_client: | |
| try: | |
| query_filter = None | |
| if arguments.get("state"): | |
| query_filter = { | |
| "must": [{"key": "state", "match": {"value": arguments["state"]}}] | |
| } | |
| results = qdrant_client.search( | |
| collection_name="bills", | |
| query_text=arguments["query"], | |
| limit=arguments.get("limit", 10), | |
| query_filter=query_filter | |
| ) | |
| formatted_results = [{ | |
| "bill_id": r.payload.get("bill_id"), | |
| "title": r.payload.get("title"), | |
| "state": r.payload.get("state"), | |
| "session": r.payload.get("session"), | |
| "score": float(r.score), | |
| "summary": r.payload.get("summary", "")[:200] | |
| } for r in results] | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(formatted_results, indent=2) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| elif name == "vector_search_meetings" and QDRANT_AVAILABLE and qdrant_client: | |
| try: | |
| query_filter = None | |
| if arguments.get("municipality"): | |
| query_filter = { | |
| "must": [{"key": "municipality", "match": {"value": arguments["municipality"]}}] | |
| } | |
| results = qdrant_client.search( | |
| collection_name="meetings", | |
| query_text=arguments["query"], | |
| limit=arguments.get("limit", 10), | |
| query_filter=query_filter | |
| ) | |
| formatted_results = [{ | |
| "meeting_id": r.payload.get("meeting_id"), | |
| "title": r.payload.get("title"), | |
| "municipality": r.payload.get("municipality"), | |
| "date": r.payload.get("date"), | |
| "score": float(r.score), | |
| "excerpt": r.payload.get("text", "")[:200] | |
| } for r in results] | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(formatted_results, indent=2) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| # PostgreSQL tools | |
| elif name == "get_bill_stats" and POSTGRES_AVAILABLE and pg_conn: | |
| try: | |
| cur = pg_conn.cursor() | |
| if arguments.get("state"): | |
| cur.execute(""" | |
| SELECT | |
| state, | |
| topic, | |
| COUNT(*) as total_bills, | |
| SUM(total_bills) as bill_count | |
| FROM bills_map_aggregates | |
| WHERE state = %s | |
| GROUP BY state, topic | |
| ORDER BY bill_count DESC | |
| LIMIT 20 | |
| """, (arguments["state"],)) | |
| else: | |
| cur.execute(""" | |
| SELECT | |
| state, | |
| COUNT(DISTINCT topic) as topics, | |
| SUM(total_bills) as total_bills | |
| FROM bills_map_aggregates | |
| GROUP BY state | |
| ORDER BY total_bills DESC | |
| LIMIT 50 | |
| """) | |
| results = cur.fetchall() | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps([dict(r) for r in results], indent=2, default=str) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| elif name == "search_meetings" and POSTGRES_AVAILABLE and pg_conn: | |
| try: | |
| cur = pg_conn.cursor() | |
| query = arguments.get("query", "") | |
| state = arguments.get("state") | |
| limit = arguments.get("limit", 20) | |
| if state: | |
| cur.execute(""" | |
| SELECT | |
| name, organization_name, state, event_date, | |
| description | |
| FROM meetings | |
| WHERE state = %s | |
| AND ( | |
| name ILIKE %s | |
| OR organization_name ILIKE %s | |
| OR description ILIKE %s | |
| ) | |
| ORDER BY event_date DESC | |
| LIMIT %s | |
| """, (state, f"%{query}%", f"%{query}%", f"%{query}%", limit)) | |
| else: | |
| cur.execute(""" | |
| SELECT | |
| name, organization_name, state, event_date, | |
| description | |
| FROM meetings | |
| WHERE name ILIKE %s | |
| OR organization_name ILIKE %s | |
| OR description ILIKE %s | |
| ORDER BY event_date DESC | |
| LIMIT %s | |
| """, (f"%{query}%", f"%{query}%", f"%{query}%", limit)) | |
| results = cur.fetchall() | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps([dict(r) for r in results], indent=2, default=str) | |
| )] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| else: | |
| return [TextContent( | |
| type="text", | |
| text=f"Unknown tool: {name}" | |
| )] | |
| async def main(): | |
| """Run the MCP server""" | |
| print("π Starting Open Navigator MCP Server...") | |
| print(f" π HuggingFace Datasets: {'β ' if DATASETS_AVAILABLE else 'β'}") | |
| print(f" π Qdrant Vector Search: {'β ' if QDRANT_AVAILABLE and qdrant_client else 'β'}") | |
| print(f" πΎ PostgreSQL Analytics: {'β ' if POSTGRES_AVAILABLE and pg_conn else 'β'}") | |
| print() | |
| print("Ready to serve requests via MCP protocol") | |
| # Run the server | |
| async with app.run_async(): | |
| await asyncio.Event().wait() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |