openEHR2FHIRquestionnaire / fill_composition_from_response.py
jbuch's picture
remove template_id from response script and UI
057ba8a
import json
from typing import Dict, List, Any, Optional
from collections import defaultdict
import argparse
from datetime import datetime, timezone
import os
import requests
import locale
def process_questionnaire_bundle(bundle_json: dict, ctx_setting="238", ctx_territory=None) -> List[Dict[str, Any]]:
"""Processes a FHIR Bundle containing multiple QuestionnaireResponses."""
compositions = []
if bundle_json.get("resourceType") == "QuestionnaireResponse":
composition = convert_fhir_to_openehr_flat(bundle_json, ctx_setting=ctx_setting, ctx_territory=ctx_territory)
questionnaire_ref = bundle_json.get("questionnaire", "")
compositions.append({
"questionnaire": questionnaire_ref,
"composition": composition
})
return compositions # early return if single QuestionnaireResponse (in [] anyway)
for entry in bundle_json.get("entry", []):
resource = entry.get("resource", {})
if resource.get("resourceType") == "Practitioner":
practitioner_id = resource.get("id", "")
elif resource.get("resourceType") == "Encounter":
encounter_id = resource.get("id", "") # visit number (?)
elif resource.get("resourceType") == "QuestionnaireResponse":
print(f"Processing QuestionnaireResponse: {resource.get('id', 'unknown')}")
qr = resource
questionnaire_ref = qr.get("questionnaire", "")
composition = convert_fhir_to_openehr_flat(qr, ctx_setting=ctx_setting, ctx_territory=ctx_territory, ctx_author=practitioner_id)
compositions.append({
"questionnaire": questionnaire_ref,
"composition": composition
})
else:
continue
return compositions
def convert_fhir_to_openehr_flat(questionnaire_response: Dict[str, Any], ctx_setting=None, ctx_territory=None, ctx_author=None) -> Dict[str, Any]:
composition = {}
#if not template_id:
# try:
# questionnaire = fetch_questionnaire_from_server(questionnaire_response.get("questionnaire"))
# metadata_questionnaire = extract_metadata_from_questionnaire(questionnaire)
# template_id_composition = metadata_questionnaire.get("template_id", None)
# except Exception as e:
# print(f"Warning: could not fetch/extract template_id: {e}")
# template_id_composition = "openehr.place_holder_template.v1"
#else:
# template_id_composition = template_id
#questionnaire = fetch_questionnaire_from_server(questionnaire_response.get("questionnaire"))
#metadata_questionnaire = extract_metadata_from_questionnaire(questionnaire)
if not ctx_author:
ctx_author = questionnaire_response.get("author", {}).get("display", "Unknown Author")
### NOTE: doesn't work for gradio/huggingface
if not ctx_territory:
ctx_territory = locale.getdefaultlocale() # e.g., ('en_US', 'UTF-8')
if ctx_territory and '_' in ctx_territory[0]:
ctx_territory = ctx_territory[0].split('_')[1] # → "US"
language_response = questionnaire_response.get("language", "en")
ctx_values = {
#"ctx/template_id": metadata_questionnaire["template_id"],
#"ctx/template_id": template_id_composition,
"ctx/territory": ctx_territory,
#"ctx/language": metadata_questionnaire["language"],
"ctx/language": language_response,
"ctx/composer_name": ctx_author, # author
#"ctx/setting":
}
if ctx_setting:
ctx_values["ctx/setting"] = ctx_setting
composition.update(ctx_values)
#group_counters = defaultdict(int) # Keeps count of group-level indices
#group_stack = [] # Stack of current parent groups
def process_items(items: List[Dict[str, Any]], parent_path: str = ""):
grouped_by_link = defaultdict(list)
for item in items:
grouped_by_link[item["linkId"]].append(item)
#print(f"Processing items with parent path: {parent_path}")
#print("Grouped items by linkId:")
#for k, v in grouped_by_link.items():
# print(k, v)
for link_id, group_items in grouped_by_link.items():
for index, item in enumerate(group_items):
# Only append index if repeated group
last_part = link_id.split("/")[-1]
if len(group_items) > 1:
path = f"{parent_path}/{last_part}:{index}" if parent_path else f"{link_id}:{index}"
else:
path = f"{parent_path}/{last_part}" if parent_path else link_id
#if len(group_items) > 1:
# path = f"{link_id}:{index}"
#else:
# path = link_id
process_item(item, path)
def process_item(item: Dict[str, Any], path: str):
# Answers
if "answer" in item:
answers = item["answer"]
for idx, answer in enumerate(answers):
final_path = f"{path}:{idx}" if len(answers) > 1 else path
process_answer(final_path, answer)
# Nested items
if "item" in item:
process_items(item["item"], parent_path=path)
def process_answer(path: str, answer: Dict[str, Any]):
if 'valueQuantity' in answer:
quantity = answer['valueQuantity']
composition[f"{path}|magnitude"] = quantity.get('value')
composition[f"{path}|unit"] = quantity.get('unit')
composition[f"{path}|precision"] = quantity.get('precision', 0)
elif 'valueCoding' in answer:
coding = answer['valueCoding']
composition[f"{path}|value"] = coding.get('display')
composition[f"{path}|code"] = coding.get('code')
composition[f"{path}|terminology"] = coding.get('system', 'local')
elif 'valueString' in answer:
composition[path] = answer['valueString']
elif 'valueBoolean' in answer:
composition[path] = answer['valueBoolean']
elif 'valueInteger' in answer:
composition[path] = answer['valueInteger']
elif 'valueDecimal' in answer:
composition[path] = answer['valueDecimal']
elif 'valueDate' in answer:
composition[path] = answer['valueDate']
elif 'valueDateTime' in answer:
composition[path] = answer['valueDateTime']
elif 'valueTime' in answer:
composition[path] = answer['valueTime']
elif 'valueUri' in answer:
composition[path] = answer['valueUri']
elif 'valueReference' in answer:
reference = answer['valueReference']
composition[path] = reference.get('reference')
# Kick off the process
if "item" in questionnaire_response:
process_items(questionnaire_response["item"])
return composition
def fetch_questionnaire_from_server(canonical_url: str) -> dict:
#base_url, sep, version = canonical_url.partition("|")
try:
response = requests.get(canonical_url)
response.raise_for_status()
questionnaire = response.json()
# Optional: verify version
#if version and questionnaire.get("version") != version:
# raise ValueError(f"Version mismatch: expected {version}, got {questionnaire.get('version')}")
return questionnaire
except requests.RequestException as e:
raise RuntimeError(f"Failed to fetch Questionnaire: {e}")
def extract_metadata_from_questionnaire(questionnaire: dict):
# Extract template ID from identifier
#template_id = None
#for idf in questionnaire.get("identifier", []):
# if idf.get("system") == "http://example.org/openEHR/templates":
# template_id = idf.get("value")
# break
language = questionnaire.get("language", "en")
return {
# "template_id": template_id,
"language": language
}
# Run the example
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Creates an openEHR composition from a questionnaireResponse."
)
parser.add_argument("--input", help="Path to the input questionnaireResponse JSON file")
#parser.add_argument(
# "--template_id",
# required=False,
# help="Template ID for the openEHR composition"
#)
parser.add_argument(
"--care_setting",
required=False,
help="Care setting for the openEHR composition, either 3-digit code or description (example: 228 / primary medical care)"
)
parser.add_argument(
"--territory",
required=False,
help="Territory: 2-character code according to ISO 3166-1 (example: 'US')." \
"If not provided, will default to 'US'" \
)
parser.add_argument(
"--output",
required=False,
help="Base name for the output questionnaireResponse"
)
parser.add_argument(
"--output_folder",
required=False,
default=".",
help="Output folder path"
)
args = parser.parse_args()
if not args.input:
### individual questionnaire responses:
#args.input = "../outputs/questionnaires/testing/20251007_0907-heart_sounds_response.json" # Default input file if not provided
#args.input = "../outputs/questionnaires/testing/20251007_0924-medication_order_response.json"
#args.input = "../outputs/questionnaires/testing/20250725-1032_BloodPressure_Response.json"
### bundles:
#args.input = "../outputs/questionnaires/testing/Bundle-CollectionBundleK6_adapted.json"
args.input = "../outputs/questionnaires/testing/Bundle-CollectionBundleK6_adapted2.json"
# You can choose how to handle the output naming convention. For example:
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
# set (default) output base name:
if args.output:
base_name = args.output
else:
base_name = os.path.splitext(os.path.basename(args.input))[0]
# example command for local testing:
# python fill_composition_from_response.py --input ../outputs/questionnaires/testing/cistec.openehr.blood_pressure.v1.json --languages en --fhir_version R4 --publisher "Command local" --output_folder ../outputs/questionnaires/testing
# Convert to openEHR composition
fhir_response = json.load(open(args.input, 'r', encoding='utf-8'))
#compositions = convert_fhir_to_openehr_flat(fhir_response)
compositions = process_questionnaire_bundle(fhir_response, ctx_setting=args.care_setting, ctx_territory=args.territory)
# Print the result
print("openEHR Composition(s) (FLAT format):")
#print(json.dumps(compositions, indent=2))
for comp in compositions:
print(json.dumps(comp, indent=2))
#print(json.dumps(fhir_response, indent=2))
#out_file = os.path.join(args.output_folder, f"{timestamp}-{base_name}-{lang}.json")