anamnesis_GIS_TOOLS / query_lattice_master2.py
threeorfour's picture
Upload 16 files
e1ec339 verified
# __author__ = 'Gemini AI, based on collaborative design'
# __version__ = '2.1.0 - CRS Aware'
import sqlite3
from pathlib import Path
import json
import re
import traceback
import argparse
import sys
# Geopandas is used for its robust GeoPackage reading capabilities,
# including automatic CRS detection.
import geopandas as gpd
import pandas as pd
import numpy as np
# --- UNIFIED COUNTY CONFIGURATION ---
# This is the central configuration. The keys ('Kitsap', 'Pierce')
# must match the 'choices' in the --county command-line argument.
# These configurations are designed to match the output of their
# respective lattice-builder scripts.
COUNTY_CONFIGS = {
"Kitsap": {
"LATTICE_PATH": Path('lattice_output/blueprint_lattice.gpkg'),
"TABLE_NAME": 'parcels',
"FTS_TABLE_NAME": 'parcels_fts',
"ID_COLUMN": 'rp_acct_id',
# Simple regex to check if a query looks like a Kitsap parcel ID.
"ID_CHECK_FUNCTION": lambda q: re.fullmatch(r'\d{10}', q.strip()) is not None,
"RANKED_SEARCH_LIMIT": 15,
"DISPLAY_NAME": "Kitsap County (Blueprint)"
},
"Pierce": {
"LATTICE_PATH": Path('lattice_output/pierce_lattice.gpkg'),
"TABLE_NAME": 'parcels',
"FTS_TABLE_NAME": 'parcels_fts',
"ID_COLUMN": 'taxparcelnumber',
# Simple regex to check if a query looks like a Pierce parcel ID.
"ID_CHECK_FUNCTION": lambda q: re.fullmatch(r'\d{10,12}', q.strip()) is not None,
"RANKED_SEARCH_LIMIT": 15,
"DISPLAY_NAME": "Pierce County"
}
# To add another county, copy one of the blocks above, modify the details,
# and ensure its key is added to the 'choices' in the argparse setup.
}
def format_results_as_json(status: str, query: str, results: list, stage: str = None, source_crs_wkt: str = None, error_code: str = None, error_message: str = None, search_trace: list = None):
"""
Formats the search results or error into a structured JSON object.
Includes a 'metadata' block in successful responses to carry information
like the Coordinate Reference System (CRS) of the source data.
"""
output = {
"status": status,
"query_received": query,
}
if status == "SUCCESS":
output["successful_search_stage"] = stage
# NEW: Add a metadata block for non-result data like CRS.
output["metadata"] = {
"source_crs_wkt": source_crs_wkt
}
output["result_count"] = len(results)
output["results"] = results
else:
output["error_code"] = error_code
output["error_message"] = error_message
output["search_trace"] = search_trace if search_trace is not None else []
# Use a custom handler for numpy types which are not JSON serializable by default
def numpy_default(obj):
if isinstance(obj, (np.integer, np.floating, np.bool_)):
return obj.item()
elif isinstance(obj, np.ndarray):
return obj.tolist()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
return json.dumps(output, indent=2, default=numpy_default)
def fetch_data_by_ids(row_ids: list, config: dict) -> tuple[list, str | None]:
"""
Uses GeoPandas to fetch specific rows by FID from the GeoPackage.
Crucially, it also inspects the GeoDataFrame to extract the
Coordinate Reference System (CRS) and returns it alongside the data.
Returns:
A tuple containing: (list_of_result_dicts, source_crs_wkt_string or None)
"""
if not row_ids:
return [], None
# Use a map for efficient sorting later
id_map = {id_val: i for i, id_val in enumerate(row_ids)}
id_list_str = ','.join(map(str, row_ids))
gdf = gpd.read_file(
config["LATTICE_PATH"],
layer=config["TABLE_NAME"],
where=f"fid IN ({id_list_str})"
)
if gdf.empty:
return [], None
# --- NEW: CRS EXTRACTION ---
# A valid GeoPackage layer will have CRS info. We extract it here.
# The frontend app will use this to correctly reproject the geometry for mapping.
source_crs_wkt = None
if gdf.crs:
source_crs_wkt = gdf.crs.to_wkt()
# ---------------------------
# Add the original 'fid' if it's not already a column (it's usually the index)
if 'fid' not in gdf.columns:
gdf.reset_index(inplace=True)
gdf.rename(columns={'index': 'fid'}, inplace=True)
# Convert geometry to WKT for JSON serialization and drop the binary geometry object
gdf['geom_wkt'] = gdf.geometry.to_wkt()
gdf.drop(columns='geometry', inplace=True)
# Replace Pandas/Numpy null types with None for clean JSON
gdf = gdf.replace({pd.NaT: None, np.nan: None})
# Restore the original search result order
gdf['sort_order'] = gdf['fid'].map(id_map)
gdf.sort_values('sort_order', inplace=True)
gdf.drop(columns='sort_order', inplace=True)
results_list = gdf.to_dict(orient='records')
return results_list, source_crs_wkt
def execute_tiered_search(address_query: str, config: dict):
"""Executes a multi-tiered search strategy using the provided county configuration."""
search_trace = []
clean_query = address_query.strip()
if not config["LATTICE_PATH"].exists():
return format_results_as_json("FAILURE", address_query, [], error_code="LATTICE_NOT_FOUND", error_message=f"Lattice file not found for {config['DISPLAY_NAME']} at '{config['LATTICE_PATH']}'.")
con = None
try:
# Connect in read-only mode to prevent accidental writes
con = sqlite3.connect(f"file:{config['LATTICE_PATH']}?mode=ro", uri=True)
con.row_factory = sqlite3.Row
cur = con.cursor()
# Pre-flight check for the FTS table
cur.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{config['FTS_TABLE_NAME']}';")
if not cur.fetchone():
return format_results_as_json("FAILURE", address_query, [], error_code="FTS_INDEX_MISSING", error_message=f"FTS index table '{config['FTS_TABLE_NAME']}' does not exist in {config['LATTICE_PATH']}. Please rebuild the lattice.")
row_ids = []
# --- Tier 1: High-Confidence Parcel ID Search ---
# Use the function defined in the config to see if the query looks like a parcel ID.
if config["ID_CHECK_FUNCTION"](clean_query):
# This is a direct SQL query against the main table, not FTS. It's faster for exact ID matches.
cur.execute(f"SELECT fid FROM {config['TABLE_NAME']} WHERE {config['ID_COLUMN']} = ?", (clean_query,))
row_ids = [row['fid'] for row in cur.fetchall()]
search_trace.append({"stage": "id_search", "result": f"{len(row_ids)}_matches"})
else:
search_trace.append({"stage": "id_search", "result": "skipped_not_id_format"})
# --- FTS-based Search Tiers (only if ID search yields no results) ---
if not row_ids:
# Prepare query for FTS by cleaning it and creating search terms
fts_query = clean_query.replace(',', ' ').replace('.', ' ').upper()
query_parts = fts_query.split()
# Tier 2: Precise FTS (All words - implicit AND logic)
# This looks for records containing all the search words, in any order.
precise_term = ' '.join(f'"{part}"' for part in query_parts) # Wrap in quotes for phrase-like matching
if precise_term:
cur.execute(f"SELECT rowid FROM {config['FTS_TABLE_NAME']} WHERE {config['FTS_TABLE_NAME']} MATCH ?", (precise_term,))
row_ids = [row['rowid'] for row in cur.fetchall()]
search_trace.append({"stage": "precise_fts_and", "term": precise_term, "result": f"{len(row_ids)}_matches"})
# Tier 3: Ranked FTS (BM25 relevance scoring - OR logic)
# If the precise search fails, this broader search ranks results by relevance.
if not row_ids and query_parts:
ranked_term = ' OR '.join(query_parts)
sql = f"""
SELECT rowid, bm25({config['FTS_TABLE_NAME']}) as rank
FROM {config['FTS_TABLE_NAME']}
WHERE {config['FTS_TABLE_NAME']} MATCH ? ORDER BY rank LIMIT ?;
"""
cur.execute(sql, (ranked_term, config['RANKED_SEARCH_LIMIT']))
row_ids = [row['rowid'] for row in cur.fetchall()]
search_trace.append({"stage": "ranked_fts_bm25", "term": ranked_term, "result": f"{len(row_ids)}_matches"})
# --- Final Data Fetching and Formatting ---
if row_ids:
# The fetch function now returns both results and CRS
results, source_crs = fetch_data_by_ids(row_ids, config)
successful_stage = next((s['stage'] for s in reversed(search_trace) if '_matches' in s['result'] and not s['result'].startswith('0')), 'unknown')
return format_results_as_json("SUCCESS", address_query, results, stage=successful_stage, source_crs_wkt=source_crs)
else:
# If all search tiers fail, return a failure message.
return format_results_as_json("FAILURE", address_query, [], error_code="NO_MATCH_FOUND", error_message="All search strategies were exhausted with no results.", search_trace=search_trace)
except Exception as e:
return format_results_as_json(status="FAILURE", query=address_query, results=[], error_code=type(e).__name__,
error_message=f"{str(e)}\n\nTRACEBACK:\n{traceback.format_exc()}", search_trace=search_trace)
finally:
if con:
con.close()
def main():
"""Main function to handle command-line arguments and run the appropriate search."""
parser = argparse.ArgumentParser(
description="Anamnesis-GIS Unified Query Engine. Queries a specified county's data lattice.",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"--county",
type=str,
required=True,
choices=list(COUNTY_CONFIGS.keys()),
help="Specify the county to query."
)
parser.add_argument(
"--query",
type=str,
help="Direct query string for non-interactive mode. The result is printed to stdout as JSON."
)
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}')
args = parser.parse_args()
# Get the selected county's configuration dictionary
selected_config = COUNTY_CONFIGS[args.county]
if args.query:
# Non-interactive mode (e.g., called by another script like Gradio)
json_output = execute_tiered_search(args.query, selected_config)
print(json_output)
else:
# Interactive mode (for direct command-line testing and use)
print(f"--- Anamnesis-GIS Interactive Query Engine (v{__version__}) ---")
print(f"--- Querying: {selected_config['DISPLAY_NAME']} | Lattice: {selected_config['LATTICE_PATH'].name} ---")
print("Enter a full or partial address or Parcel ID to search.")
print("Type 'exit' or 'quit' to end.")
while True:
try:
address_query = input("\nAddress search > ").strip()
if address_query.lower() in ['exit', 'quit']:
print("Exiting query engine. Goodbye!")
break
if not address_query:
continue
json_output = execute_tiered_search(address_query, selected_config)
print(json_output)
except (EOFError, KeyboardInterrupt):
print("\nExiting query engine. Goodbye!")
break
if __name__ == '__main__':
# A simple check to prevent the script from hanging if called without arguments in a non-interactive shell
if not sys.stdout.isatty() and len(sys.argv) == 1:
print(json.dumps({
"status": "FAILURE",
"error_code": "INVALID_INVOCATION",
"error_message": "Script called non-interactively without required arguments (--county and --query)."
}))
sys.exit(1)
__version__ = '2.1.0'
main()