# NOTE: three non-code lines ("Spaces:", "Sleeping", "Sleeping") appeared here —
# web-page status text captured during extraction, not part of the program.
# __author__ = 'Gemini AI, based on collaborative design'
# __version__ = '2.1.0 - CRS Aware'
import argparse
import json
import re
import sqlite3
import sys
import traceback
from pathlib import Path

# Geopandas is used for its robust GeoPackage reading capabilities,
# including automatic CRS detection.
import geopandas as gpd
import numpy as np
import pandas as pd
# --- UNIFIED COUNTY CONFIGURATION ---
# This is the central configuration. The keys ('Kitsap', 'Pierce')
# must match the 'choices' in the --county command-line argument.
# These configurations are designed to match the output of their
# respective lattice-builder scripts.
#
# Per-county keys:
#   LATTICE_PATH        - GeoPackage produced by the county's lattice builder.
#   TABLE_NAME          - layer/table inside the GeoPackage holding parcel rows.
#   FTS_TABLE_NAME      - SQLite FTS index over that table.
#                         NOTE(review): the search code assumes the FTS rowid
#                         equals the parcel table's fid — confirm against the
#                         lattice-builder script.
#   ID_COLUMN           - column holding the county's parcel identifier.
#   ID_CHECK_FUNCTION   - predicate: does a raw query look like a parcel ID?
#   RANKED_SEARCH_LIMIT - max rows returned by the BM25-ranked fallback tier.
#   DISPLAY_NAME        - human-readable label used in messages.
COUNTY_CONFIGS = {
    "Kitsap": {
        "LATTICE_PATH": Path('lattice_output/blueprint_lattice.gpkg'),
        "TABLE_NAME": 'parcels',
        "FTS_TABLE_NAME": 'parcels_fts',
        "ID_COLUMN": 'rp_acct_id',
        # Simple regex to check if a query looks like a Kitsap parcel ID
        # (exactly 10 digits after stripping whitespace).
        "ID_CHECK_FUNCTION": lambda q: re.fullmatch(r'\d{10}', q.strip()) is not None,
        "RANKED_SEARCH_LIMIT": 15,
        "DISPLAY_NAME": "Kitsap County (Blueprint)"
    },
    "Pierce": {
        "LATTICE_PATH": Path('lattice_output/pierce_lattice.gpkg'),
        "TABLE_NAME": 'parcels',
        "FTS_TABLE_NAME": 'parcels_fts',
        "ID_COLUMN": 'taxparcelnumber',
        # Simple regex to check if a query looks like a Pierce parcel ID
        # (10 to 12 digits after stripping whitespace).
        "ID_CHECK_FUNCTION": lambda q: re.fullmatch(r'\d{10,12}', q.strip()) is not None,
        "RANKED_SEARCH_LIMIT": 15,
        "DISPLAY_NAME": "Pierce County"
    }
    # To add another county, copy one of the blocks above, modify the details,
    # and ensure its key is added to the 'choices' in the argparse setup.
}
def format_results_as_json(status: str, query: str, results: list, stage: str = None, source_crs_wkt: str = None, error_code: str = None, error_message: str = None, search_trace: list = None):
    """
    Serialize a search outcome (success or failure) to a JSON string.

    Successful responses carry a 'metadata' block with the source data's
    Coordinate Reference System (CRS) as WKT, plus the matched results and
    the search stage that produced them. Failure responses carry an error
    code/message and the accumulated search trace.

    Args:
        status: "SUCCESS" or any other value (treated as failure).
        query: The raw query string as received from the caller.
        results: List of result-record dicts (may contain numpy scalars).
        stage: Name of the search tier that produced the results.
        source_crs_wkt: WKT string of the source CRS, or None if unknown.
        error_code: Machine-readable failure code (failure responses only).
        error_message: Human-readable failure detail (failure responses only).
        search_trace: Per-tier diagnostic entries (failure responses only).

    Returns:
        A pretty-printed JSON string.
    """
    def _jsonify_numpy(value):
        # numpy scalars/arrays are not natively JSON serializable; convert
        # them to their plain-Python equivalents on the fly.
        if isinstance(value, (np.integer, np.floating, np.bool_)):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    payload = {"status": status, "query_received": query}
    if status == "SUCCESS":
        payload["successful_search_stage"] = stage
        # Metadata block carries non-result data such as the CRS.
        payload["metadata"] = {"source_crs_wkt": source_crs_wkt}
        payload["result_count"] = len(results)
        payload["results"] = results
    else:
        payload["error_code"] = error_code
        payload["error_message"] = error_message
        payload["search_trace"] = [] if search_trace is None else search_trace
    return json.dumps(payload, indent=2, default=_jsonify_numpy)
def fetch_data_by_ids(row_ids: list, config: dict) -> tuple[list, str | None]:
    """
    Uses GeoPandas to fetch specific rows by FID from the GeoPackage.

    Crucially, it also inspects the GeoDataFrame to extract the
    Coordinate Reference System (CRS) and returns it alongside the data.
    Geometry is converted to WKT, pandas/numpy nulls are replaced with None,
    and rows are re-ordered to match the order of `row_ids`.

    Args:
        row_ids: FIDs to fetch, in desired output (search-relevance) order.
        config: A county entry from COUNTY_CONFIGS; uses LATTICE_PATH and
            TABLE_NAME.

    Returns:
        A tuple containing: (list_of_result_dicts, source_crs_wkt_string or None)
    """
    if not row_ids:
        return [], None
    # Use a map for efficient sorting later
    id_map = {id_val: i for i, id_val in enumerate(row_ids)}
    # NOTE(review): ids are interpolated into the OGR 'where' clause; they come
    # from SQLite rowid/fid queries upstream — presumably always integers, so
    # this is injection-safe. Confirm callers never pass arbitrary strings.
    id_list_str = ','.join(map(str, row_ids))
    gdf = gpd.read_file(
        config["LATTICE_PATH"],
        layer=config["TABLE_NAME"],
        where=f"fid IN ({id_list_str})"
    )
    if gdf.empty:
        return [], None
    # --- NEW: CRS EXTRACTION ---
    # A valid GeoPackage layer will have CRS info. We extract it here.
    # The frontend app will use this to correctly reproject the geometry for mapping.
    source_crs_wkt = None
    if gdf.crs:
        source_crs_wkt = gdf.crs.to_wkt()
    # ---------------------------
    # Add the original 'fid' if it's not already a column (it's usually the index)
    # NOTE(review): whether geopandas exposes the GPKG fid as a column or as the
    # index varies by geopandas/pyogrio version — verify against the pinned
    # version that the rename from 'index' always yields the real fid values.
    if 'fid' not in gdf.columns:
        gdf.reset_index(inplace=True)
        gdf.rename(columns={'index': 'fid'}, inplace=True)
    # Convert geometry to WKT for JSON serialization and drop the binary geometry object
    gdf['geom_wkt'] = gdf.geometry.to_wkt()
    gdf.drop(columns='geometry', inplace=True)
    # Replace Pandas/Numpy null types with None for clean JSON
    gdf = gdf.replace({pd.NaT: None, np.nan: None})
    # Restore the original search result order (SQL IN() does not preserve it)
    gdf['sort_order'] = gdf['fid'].map(id_map)
    gdf.sort_values('sort_order', inplace=True)
    gdf.drop(columns='sort_order', inplace=True)
    results_list = gdf.to_dict(orient='records')
    return results_list, source_crs_wkt
def execute_tiered_search(address_query: str, config: dict):
    """Executes a multi-tiered search strategy using the provided county configuration.

    Tiers (each attempted only if the previous yielded no rows):
      1. Exact parcel-ID lookup against the main table.
      2. Precise FTS: every query token must match (implicit AND).
      3. Ranked FTS: any token may match (OR), ordered by BM25 relevance.

    Args:
        address_query: Raw user query (address fragment or parcel ID).
        config: A county entry from COUNTY_CONFIGS.

    Returns:
        A JSON string produced by format_results_as_json, either SUCCESS
        with results + CRS metadata, or FAILURE with an error code and the
        accumulated search trace.
    """
    search_trace = []
    clean_query = address_query.strip()
    if not config["LATTICE_PATH"].exists():
        return format_results_as_json("FAILURE", address_query, [], error_code="LATTICE_NOT_FOUND", error_message=f"Lattice file not found for {config['DISPLAY_NAME']} at '{config['LATTICE_PATH']}'.")
    con = None
    try:
        # Connect in read-only mode to prevent accidental writes
        con = sqlite3.connect(f"file:{config['LATTICE_PATH']}?mode=ro", uri=True)
        con.row_factory = sqlite3.Row
        cur = con.cursor()
        # Pre-flight check for the FTS table.
        # FIX: bind the table name as a parameter instead of interpolating it
        # into the SQL string.
        cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (config['FTS_TABLE_NAME'],))
        if not cur.fetchone():
            return format_results_as_json("FAILURE", address_query, [], error_code="FTS_INDEX_MISSING", error_message=f"FTS index table '{config['FTS_TABLE_NAME']}' does not exist in {config['LATTICE_PATH']}. Please rebuild the lattice.")
        row_ids = []
        # --- Tier 1: High-Confidence Parcel ID Search ---
        # Use the function defined in the config to see if the query looks like a parcel ID.
        if config["ID_CHECK_FUNCTION"](clean_query):
            # This is a direct SQL query against the main table, not FTS. It's faster for exact ID matches.
            # (Table/column names come from the trusted COUNTY_CONFIGS, not user input.)
            cur.execute(f"SELECT fid FROM {config['TABLE_NAME']} WHERE {config['ID_COLUMN']} = ?", (clean_query,))
            row_ids = [row['fid'] for row in cur.fetchall()]
            search_trace.append({"stage": "id_search", "result": f"{len(row_ids)}_matches"})
        else:
            search_trace.append({"stage": "id_search", "result": "skipped_not_id_format"})
        # --- FTS-based Search Tiers (only if ID search yields no results) ---
        if not row_ids:
            # Prepare query for FTS by cleaning it and creating search terms
            fts_query = clean_query.replace(',', ' ').replace('.', ' ').upper()
            # FIX: escape embedded double quotes (FTS string syntax doubles
            # them) and quote every token, so characters and bare keywords
            # with FTS5 meaning (OR, NEAR, NOT, '-', 'col:') in user input are
            # matched literally instead of raising an FTS syntax error.
            quoted_parts = ['"{}"'.format(part.replace('"', '""')) for part in fts_query.split()]
            # Tier 2: Precise FTS (All words - implicit AND logic)
            # This looks for records containing all the search words, in any order.
            precise_term = ' '.join(quoted_parts)
            if precise_term:
                cur.execute(f"SELECT rowid FROM {config['FTS_TABLE_NAME']} WHERE {config['FTS_TABLE_NAME']} MATCH ?", (precise_term,))
                row_ids = [row['rowid'] for row in cur.fetchall()]
                search_trace.append({"stage": "precise_fts_and", "term": precise_term, "result": f"{len(row_ids)}_matches"})
            # Tier 3: Ranked FTS (BM25 relevance scoring - OR logic)
            # If the precise search fails, this broader search ranks results by relevance.
            if not row_ids and quoted_parts:
                ranked_term = ' OR '.join(quoted_parts)
                sql = f"""
                    SELECT rowid, bm25({config['FTS_TABLE_NAME']}) as rank
                    FROM {config['FTS_TABLE_NAME']}
                    WHERE {config['FTS_TABLE_NAME']} MATCH ? ORDER BY rank LIMIT ?;
                """
                cur.execute(sql, (ranked_term, config['RANKED_SEARCH_LIMIT']))
                row_ids = [row['rowid'] for row in cur.fetchall()]
                search_trace.append({"stage": "ranked_fts_bm25", "term": ranked_term, "result": f"{len(row_ids)}_matches"})
        # --- Final Data Fetching and Formatting ---
        if row_ids:
            # The fetch function returns both the result rows and the source CRS.
            results, source_crs = fetch_data_by_ids(row_ids, config)
            # Report the last tier that actually produced matches.
            successful_stage = next((s['stage'] for s in reversed(search_trace) if '_matches' in s['result'] and not s['result'].startswith('0')), 'unknown')
            return format_results_as_json("SUCCESS", address_query, results, stage=successful_stage, source_crs_wkt=source_crs)
        else:
            # If all search tiers fail, return a failure message.
            return format_results_as_json("FAILURE", address_query, [], error_code="NO_MATCH_FOUND", error_message="All search strategies were exhausted with no results.", search_trace=search_trace)
    except Exception as e:
        # Surface any unexpected error as a structured FAILURE, including the
        # traceback so callers (e.g. a UI) can display/debug it.
        return format_results_as_json(status="FAILURE", query=address_query, results=[], error_code=type(e).__name__,
                                      error_message=f"{str(e)}\n\nTRACEBACK:\n{traceback.format_exc()}", search_trace=search_trace)
    finally:
        if con:
            con.close()
def main():
    """Main function to handle command-line arguments and run the appropriate search.

    Parses --county (required) and --query; with --query runs one search and
    prints the JSON result, otherwise enters an interactive prompt loop.
    """
    # FIX: __version__ is assigned under the __main__ guard at the bottom of
    # the file, so it can be undefined when this module is imported and main()
    # is called programmatically. Look it up defensively instead of letting a
    # NameError abort before argument parsing even starts.
    version = globals().get('__version__', 'unknown')
    parser = argparse.ArgumentParser(
        description="Anamnesis-GIS Unified Query Engine. Queries a specified county's data lattice.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "--county",
        type=str,
        required=True,
        choices=list(COUNTY_CONFIGS.keys()),
        help="Specify the county to query."
    )
    parser.add_argument(
        "--query",
        type=str,
        help="Direct query string for non-interactive mode. The result is printed to stdout as JSON."
    )
    parser.add_argument('--version', action='version', version=f'%(prog)s {version}')
    args = parser.parse_args()
    # Get the selected county's configuration dictionary
    selected_config = COUNTY_CONFIGS[args.county]
    if args.query:
        # Non-interactive mode (e.g., called by another script like Gradio)
        print(execute_tiered_search(args.query, selected_config))
    else:
        # Interactive mode (for direct command-line testing and use)
        print(f"--- Anamnesis-GIS Interactive Query Engine (v{version}) ---")
        print(f"--- Querying: {selected_config['DISPLAY_NAME']} | Lattice: {selected_config['LATTICE_PATH'].name} ---")
        print("Enter a full or partial address or Parcel ID to search.")
        print("Type 'exit' or 'quit' to end.")
        while True:
            try:
                address_query = input("\nAddress search > ").strip()
                if address_query.lower() in ['exit', 'quit']:
                    print("Exiting query engine. Goodbye!")
                    break
                if not address_query:
                    continue
                print(execute_tiered_search(address_query, selected_config))
            except (EOFError, KeyboardInterrupt):
                # Ctrl-D / Ctrl-C both end the session cleanly.
                print("\nExiting query engine. Goodbye!")
                break
| if __name__ == '__main__': | |
| # A simple check to prevent the script from hanging if called without arguments in a non-interactive shell | |
| if not sys.stdout.isatty() and len(sys.argv) == 1: | |
| print(json.dumps({ | |
| "status": "FAILURE", | |
| "error_code": "INVALID_INVOCATION", | |
| "error_message": "Script called non-interactively without required arguments (--county and --query)." | |
| })) | |
| sys.exit(1) | |
| __version__ = '2.1.0' | |
| main() |