File size: 6,306 Bytes
ff5e06a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
import json
from typing import Optional
from urllib.parse import quote
import hashlib
import sqlite3
from google.auth import default as get_auth_default
import google.auth.transport.requests as google_auth_requests
from langchain_core.tools import tool
import requests
# FHIR resource types that get_patient_data_manifest queries, in this order,
# when it builds a patient's manifest.
fhir_resource_types = [
    "Encounter",
    "Practitioner",
    "Condition",
    "Observation",
    "AllergyIntolerance",
    "FamilyMemberHistory",
    "MedicationRequest",
    "MedicationStatement",
    "MedicationAdministration",
    "DiagnosticReport",
    "Procedure",
    "ServiceRequest",
]

# OAuth scopes requested when obtaining Google application-default credentials
# for calls to the FHIR store.
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/cloud-healthcare",
]
# Path of the SQLite file used as a persistent cache for FHIR search results.
FHIR_CACHE_DB = "fhir_cache.db"


def _init_fhir_cache():
    """Create the ``fhir_cache`` table in FHIR_CACHE_DB if it does not exist.

    The table maps a text ``key`` (primary key) to a text ``value``.
    Calling this more than once is harmless because of ``IF NOT EXISTS``.
    """
    conn = sqlite3.connect(FHIR_CACHE_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS fhir_cache (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        """)
        conn.commit()
    finally:
        # Always release the connection, even when table creation fails.
        conn.close()


# Make sure the cache table exists as soon as the module is imported.
_init_fhir_cache()
def _compact_fhir_value(obj):
    """Recursively drop ``meta`` keys and shorten FHIR URLs inside *obj*.

    Lists and dicts are rebuilt; any string containing ``/fhir/`` is reduced
    to the part after its last ``/fhir/``. Other values are returned as-is.
    """
    if isinstance(obj, list):
        return [_compact_fhir_value(item) for item in obj]
    if isinstance(obj, dict):
        return {
            key: _compact_fhir_value(value)
            for key, value in obj.items()
            if key != "meta"
        }
    if isinstance(obj, str) and "/fhir/" in obj:
        return obj.split("/fhir/")[-1]
    return obj


def _get_fhir_resource(resource_path: str, fhir_store_url: str) -> dict:
    """Fetch a FHIR search result with authentication, pagination and caching.

    The result is looked up in the SQLite cache first. On a miss, an
    authenticated GET is issued against ``{fhir_store_url}/{resource_path}``,
    every ``next`` page link is followed, and the collected entries are
    compacted before being cached and returned.

    Args:
        resource_path: Path and query string relative to the FHIR store URL.
        fhir_store_url: Base URL of the FHIR store.

    Returns:
        A dict shaped like a ``searchset`` Bundle with the keys
        ``resourceType``, ``type``, ``total`` (number of collected entries)
        and ``entry``. If authentication or any request fails, a dict with a
        single ``error`` key is returned instead; such errors are not cached.
    """
    # Per-request timeout so an unresponsive server cannot hang the tool.
    request_timeout_seconds = 60

    # MD5 is only used to derive a compact cache key, not for security.
    cache_key = hashlib.md5(f"{resource_path}:{fhir_store_url}".encode()).hexdigest()
    conn = sqlite3.connect(FHIR_CACHE_DB)
    # The outer try/finally guarantees the connection is closed on every
    # exit path, including the early return on a cache hit.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT value FROM fhir_cache WHERE key = ?", (cache_key,))
            result = cursor.fetchone()
            if result:
                print(f"...[Tool] Cache hit for: {fhir_store_url}/{resource_path}")
                return json.loads(result[0])
        except Exception as e:
            # The cache is best-effort: on a read failure fall through and
            # fetch from the FHIR store.
            print(f"...[Tool] Cache read error: {str(e)}")

        try:
            credentials, _ = get_auth_default(scopes=SCOPES)
            credentials.refresh(google_auth_requests.Request())
            headers = {"Authorization": f"Bearer {credentials.token}"}

            all_entries = []
            url = f"{fhir_store_url}/{resource_path}"
            while url:
                print(f"...[Tool] Making request to: {url}")
                response = requests.get(
                    url, headers=headers, timeout=request_timeout_seconds
                )
                response.raise_for_status()
                current_page = response.json()
                all_entries.extend(current_page.get("entry") or [])
                # Continue with the first "next" link, or stop if none exists.
                url = next(
                    (
                        link.get("url")
                        for link in current_page.get("link", [])
                        if link.get("relation") == "next"
                    ),
                    None,
                )

            # Strip technical metadata and shorten URLs to keep results small.
            for entry in all_entries:
                entry.pop("fullUrl", None)
                entry.pop("search", None)
                if "resource" in entry:
                    entry["resource"] = _compact_fhir_value(entry["resource"])

            # Rebuild a single bundle holding the entries of all pages.
            data = {
                "resourceType": "Bundle",
                "type": "searchset",
                "total": len(all_entries),
                "entry": all_entries,
            }

            try:
                # OR REPLACE lets a fresh result overwrite a row that exists
                # but could not be read, instead of failing on the primary key.
                cursor.execute(
                    "INSERT OR REPLACE INTO fhir_cache (key, value) VALUES (?, ?)",
                    (cache_key, json.dumps(data)),
                )
                conn.commit()
            except Exception as e:
                # A failed cache write must not discard a successful fetch.
                print(f"...[Tool] Cache write error: {str(e)}")
            return data
        except Exception as e:
            print(f"...[Tool] Error: {str(e)}")
            return {"error": f"An error occurred: {str(e)}"}
    finally:
        conn.close()
@tool
def get_patient_fhir_resource(
    patient_id: str,
    fhir_resource: str,
    fhir_store_url: str,
    filter_code: Optional[str] = None,
) -> str:
    """Gets a list of FHIR resources for a single patient.

    Args:
        patient_id: The ID of the patient.
        fhir_resource: The FHIR resource type to retrieve (Observation,
            Condition, MedicationRequest, etc.).
        fhir_store_url: The URL of the FHIR store.
        filter_code: A comma separated list of codes to filter the resources
            by (34117-2, 171207006, 82667-7, 8867-4, etc).

    Returns:
        The search result serialized as a JSON string.
    """
    # Escape the ID so unexpected characters cannot alter the query string.
    encoded_patient = quote(patient_id, safe="")
    # Normalise the filter once so both attempts send exactly the same value.
    encoded_codes = quote(filter_code.replace(" ", "")) if filter_code else ""

    def search_path(filter_param: str) -> str:
        # Build the query for this patient, filtering on `filter_param`.
        path = f"{fhir_resource}?patient=Patient/{encoded_patient}"
        if encoded_codes:
            path += f"&{filter_param}={encoded_codes}"
        if "Medication" in fhir_resource:
            # Also request the Medication resources referenced by the results.
            path += f"&_include={fhir_resource}:medication"
        return path

    content = _get_fhir_resource(search_path("code"), fhir_store_url)

    # If filtering by code yields nothing, retry the same filter as a category.
    if content.get("total", 0) == 0 and encoded_codes:
        print(
            "...[Tool] No results found with 'code:text'. Retrying with"
            " 'category:text'..."
        )
        content = _get_fhir_resource(search_path("category"), fhir_store_url)

    print(
        f"...[Tool] Returning {len(content.get('entry', []))} results for"
        f" {fhir_resource}"
    )
    return json.dumps(content)
@tool
def get_patient_data_manifest(patient_id: str, fhir_store_url: str) -> str:
    """Gets a manifest of all available FHIR resources and their codes for a patient by
    querying the patient's entire record. Use this tool first to discover what
    data is available.

    Args:
        patient_id: The ID of the patient.
        fhir_store_url: The URL of the FHIR store.

    Returns:
        A JSON object string mapping each FHIR resource type that has data to
        a list of unique "display=code" strings found on its resources.
    """
    manifest = {}
    # Escape the ID so unexpected characters cannot alter the query string.
    encoded_patient = quote(patient_id, safe="")
    for resource_type in fhir_resource_types:
        resource_path = f"{resource_type}?patient=Patient/{encoded_patient}"
        print(
            f"...[Tool] Discovering all available {resource_type} resources for"
            f" patient: {patient_id}"
        )
        resources_json = _get_fhir_resource(resource_path, fhir_store_url)
        has_results = (
            isinstance(resources_json, dict) and resources_json.get("total", 0) > 0
        )
        if not has_results:
            print(
                f"...[Tool] No {resource_type} resources found for patient:"
                f" {patient_id}"
            )
            continue

        # Dict keys keep first-seen order while removing duplicates, so a
        # code shared by many resources is listed only once.
        unique_codes = {}
        for entry in resources_json.get("entry", []):
            code = entry.get("resource", {}).get("code")
            if not isinstance(code, dict):
                continue
            for coding in code.get("coding") or []:
                label = f'{coding.get("display", "")}={coding.get("code", "")}'
                unique_codes[label] = None
        manifest[resource_type] = list(unique_codes)
    return json.dumps(manifest)
|