lirony's picture
Update
ff5e06a
import json
from typing import Optional
from urllib.parse import quote
import hashlib
import sqlite3
from google.auth import default as get_auth_default
import google.auth.transport.requests as google_auth_requests
from langchain_core.tools import tool
import requests
fhir_resource_types = [
"Encounter",
"Practitioner",
"Condition",
"Observation",
"AllergyIntolerance",
"FamilyMemberHistory",
"MedicationRequest",
"MedicationStatement",
"MedicationAdministration",
"DiagnosticReport",
"Procedure",
"ServiceRequest",
]
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/cloud-healthcare",
]
FHIR_CACHE_DB = "fhir_cache.db"
def _init_fhir_cache():
conn = sqlite3.connect(FHIR_CACHE_DB)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS fhir_cache (
key TEXT PRIMARY KEY,
value TEXT
)
""")
conn.commit()
conn.close()
_init_fhir_cache()
def _get_fhir_resource(resource_path: str, fhir_store_url: str) -> dict:
"""Helper function to make an authenticated GET request to the FHIR store, with pagination and compaction."""
cache_key = hashlib.md5(f"{resource_path}:{fhir_store_url}".encode()).hexdigest()
conn = sqlite3.connect(FHIR_CACHE_DB)
cursor = conn.cursor()
try:
cursor.execute("SELECT value FROM fhir_cache WHERE key = ?", (cache_key,))
result = cursor.fetchone()
if result:
print(f"...[Tool] Cache hit for: {fhir_store_url}/{resource_path}")
return json.loads(result[0])
except Exception as e:
print(f"...[Tool] Cache read error: {str(e)}")
# If cache read fails, we proceed without cache
try:
credentials, _ = get_auth_default(scopes=SCOPES)
request = google_auth_requests.Request()
credentials.refresh(request)
headers = {"Authorization": f"Bearer {credentials.token}"}
all_entries = []
url = f"{fhir_store_url}/{resource_path}"
while url:
print(f"...[Tool] Making request to: {url}")
response = requests.get(url, headers=headers)
response.raise_for_status()
current_page = response.json()
if "entry" in current_page:
all_entries.extend(current_page["entry"])
url = None # Reset url for each iteration
for link in current_page.get("link", []):
if link.get("relation") == "next":
url = link.get("url")
break
# Reconstruct the bundle with all entries
data = {
"resourceType": "Bundle",
"type": "searchset",
"total": len(all_entries),
"entry": all_entries,
}
def clean(obj):
# Remove .resource.meta (timestamps/versions) from all objects
if isinstance(obj, list):
return [clean(i) for i in obj]
if isinstance(obj, dict):
return {k: clean(v) for k, v in obj.items() if k != "meta"}
return (
obj.split("/fhir/")[-1]
if isinstance(obj, str) and "/fhir/" in obj
else obj
)
# [OPTIONAL] Strip technical metadata and shorten URLs
for e in all_entries:
e.pop("fullUrl", None)
e.pop("search", None)
if "resource" in e:
e["resource"] = clean(e["resource"])
try:
cursor.execute(
"INSERT INTO fhir_cache (key, value) VALUES (?, ?)",
(cache_key, json.dumps(data)),
)
conn.commit()
except Exception as e:
print(f"...[Tool] Cache write error: {str(e)}")
return data
except Exception as e:
print(f"...[Tool] Error: {str(e)}")
return {"error": f"An error occurred: {str(e)}"}
finally:
conn.close()
@tool
def get_patient_fhir_resource(
patient_id: str,
fhir_resource: str,
fhir_store_url: str,
filter_code: Optional[str] = None,
) -> str:
"""Gets a list of FHIR resources for a single patient.
patient_id: The ID of the patient. fhir_resource: The FHIR resource type to
retrieve (Observation, Condition, MedicationRequest, etc.) fhir_store_url: The
URL of the FHIR store. filter_code: A comma seperated list of code filter to
apply to the resource (34117-2, 171207006, 82667-7, 8867-4, etc)
"""
resource_path = f"{fhir_resource}?patient=Patient/{patient_id}"
if filter_code:
resource_path += f"&code={quote(filter_code.replace(' ', ''))}"
if "Medication" in fhir_resource:
resource_path += f"&_include={fhir_resource}:medication"
content = _get_fhir_resource(resource_path, fhir_store_url)
# If the initial call with code:text returns no results, try with category:text
if content.get("total", 0) == 0 and filter_code:
print(
"...[Tool] No results found with 'code:text'. Retrying with"
" 'category:text'..."
)
resource_path = f"{fhir_resource}?patient=Patient/{patient_id}&category={quote(filter_code)}"
content = _get_fhir_resource(resource_path, fhir_store_url)
print(
f"...[Tool] Returning {len(content.get('entry', []))} results for"
f" {fhir_resource}"
)
return json.dumps(content)
@tool
def get_patient_data_manifest(patient_id: str, fhir_store_url: str) -> str:
"""Gets a manifest of all available FHIR resources and their codes for a patient by
querying the patient's entire record. Use this tool first to discover what
data is available.
"""
manifest = {}
for resource_type in fhir_resource_types:
resource_path = f"{resource_type}?patient=Patient/{patient_id}"
print(
f"...[Tool] Discovering all available {resource_type} resources for"
f" patient: {patient_id}"
)
resources_json = _get_fhir_resource(resource_path, fhir_store_url)
if isinstance(resources_json, dict) and resources_json.get("total", 0) > 0:
for entry in resources_json.get("entry", []):
resource = entry.get("resource", {})
if resource_type not in manifest:
manifest[resource_type] = []
if "code" in resource and "coding" in resource["code"]:
for code in resource.get("code").get("coding", []):
manifest[resource_type].append(
f'{code.get("display", "")}={code.get("code", "")}'
)
else:
print(
f"...[Tool] No {resource_type} resources found for patient:"
f" {patient_id}"
)
return json.dumps(manifest)