# tonic-discharge-guard / responseparser.py
# complete response parser (commit bf761ef)
import json
from datetime import datetime
from typing import List, Dict, Optional, Union
class PatientDataExtractor:
    """Class to extract all fields from a FHIR Patient resource in a Bundle response.

    A Bundle may contain several Patient resources. One of them is the
    "current" patient (index 0 by default) and every ``get_*`` accessor reads
    from it; use ``set_patient_by_index`` / ``set_patient_by_id`` to switch.

    Accessors return an empty value ("" / None / empty container) instead of
    raising when a field is missing or empty in the resource.
    """

    # Extension URLs looked up on the Patient resource / its first address.
    _URL_BIRTH_SEX = "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex"
    _URL_RACE = "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race"
    _URL_ETHNICITY = "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity"
    _URL_MOTHERS_MAIDEN_NAME = "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName"
    _URL_BIRTH_PLACE = "http://hl7.org/fhir/StructureDefinition/patient-birthPlace"
    _URL_GEOLOCATION = "http://hl7.org/fhir/StructureDefinition/geolocation"
    _URL_DALY = "http://synthetichealth.github.io/synthea/disability-adjusted-life-years"
    _URL_QALY = "http://synthetichealth.github.io/synthea/quality-adjusted-life-years"

    def __init__(self, patient_data: Union[str, bytes, Dict]):
        """Initialize with patient data as a JSON string/bytes or an already-parsed dict.

        Raises:
            ValueError: if the payload is not a FHIR Bundle with an ``entry`` key
                (``json.JSONDecodeError`` from malformed JSON is a ValueError too).
        """
        if isinstance(patient_data, (str, bytes, bytearray)):
            self.data = json.loads(patient_data)
        else:
            self.data = patient_data
        self.patients = self._extract_patients()
        self.current_patient_idx = 0  # Default to first patient

    # Internal helpers
    def _extract_patients(self) -> List[Dict]:
        """Extract all Patient resources from the Bundle.

        Entries without a ``resource`` (e.g. response-only entries) and
        resources of other types are skipped rather than raising.
        """
        data = self.data
        if (
            not isinstance(data, dict)
            or data.get("resourceType") != "Bundle"
            or "entry" not in data
        ):
            raise ValueError("Invalid FHIR Bundle format")
        patients = []
        for entry in data["entry"] or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == "Patient":
                patients.append(resource)
        return patients

    def _get_current_patient(self) -> Dict:
        """Get the currently selected patient resource.

        Raises:
            IndexError: if the Bundle contained no Patient resources.
        """
        if not self.patients:
            raise IndexError("Bundle contains no Patient resources")
        return self.patients[self.current_patient_idx]

    def _name_part(self, use: str, field: str):
        """Return ``field`` of the first name with the given ``use`` that has a non-empty value, else None."""
        for name in self._get_current_patient().get("name", []):
            if name.get("use") == use and name.get(field):
                return name[field]
        return None

    def _extensions(self, url: str) -> List[Dict]:
        """Return every top-level Patient extension whose ``url`` matches."""
        return [
            ext
            for ext in self._get_current_patient().get("extension", [])
            if ext.get("url") == url
        ]

    def _extension_value(self, url: str, value_key: str, default=None):
        """Return ``value_key`` from the first extension matching ``url``, else ``default``."""
        matches = self._extensions(url)
        return matches[0].get(value_key, default) if matches else default

    def _extension_text(self, url: str) -> str:
        """Return the ``valueString`` of the nested ``text`` sub-extension under ``url``, else ""."""
        for ext in self._extensions(url):
            for sub_ext in ext.get("extension", []):
                if sub_ext.get("url") == "text":
                    return sub_ext.get("valueString", "")
        return ""

    def _primary_address(self) -> Dict:
        """Return the first address entry of the current patient, or {} if there is none."""
        addresses = self._get_current_patient().get("address", [])
        return addresses[0] if addresses else {}

    # Patient selection
    def set_patient_by_index(self, index: int) -> bool:
        """Set the current patient by index. Returns True if successful."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def set_patient_by_id(self, patient_id: str) -> bool:
        """Set the current patient by FHIR Patient ID. Returns True if successful."""
        for i, patient in enumerate(self.patients):
            # Patients without an id can never match (and must not raise).
            if "id" in patient and patient["id"] == patient_id:
                self.current_patient_idx = i
                return True
        return False

    # Basic Identification Fields
    def get_id(self) -> str:
        """Extract FHIR Patient ID."""
        return self._get_current_patient().get("id", "")

    def get_resource_type(self) -> str:
        """Extract resource type (should always be 'Patient')."""
        return self._get_current_patient().get("resourceType", "")

    def get_meta_last_updated(self) -> str:
        """Extract last updated timestamp from meta."""
        return self._get_current_patient().get("meta", {}).get("lastUpdated", "")

    def get_meta_profile(self) -> List[str]:
        """Extract profile URIs from meta."""
        return self._get_current_patient().get("meta", {}).get("profile", [])

    def get_text_div(self) -> str:
        """Extract generated text narrative (div content)."""
        return self._get_current_patient().get("text", {}).get("div", "")

    # Name Fields
    def get_first_name(self) -> str:
        """Extract patient's first name (first given name of the official name)."""
        given = self._name_part("official", "given")
        return given[0] if given else ""

    def get_last_name(self) -> str:
        """Extract patient's last name (family name of the official name)."""
        return self._name_part("official", "family") or ""

    def get_middle_initial(self) -> str:
        """Extract patient's middle initial (second given name initial if present)."""
        for name in self._get_current_patient().get("name", []):
            given = name.get("given") or []
            # given[1] may be an empty string; indexing it would raise.
            if name.get("use") == "official" and len(given) > 1 and given[1]:
                return given[1][0]
        return ""

    def get_name_prefix(self) -> str:
        """Extract patient's name prefix (e.g., Mr., Mrs.)."""
        prefix = self._name_part("official", "prefix")
        return prefix[0] if prefix else ""

    def get_maiden_name(self) -> str:
        """Extract patient's maiden name if available."""
        return self._name_part("maiden", "family") or ""

    # Demographic Fields
    def get_dob(self) -> str:
        """Extract patient's date of birth."""
        return self._get_current_patient().get("birthDate", "")

    def get_age(self) -> str:
        """Calculate patient's age in whole years based on birth date.

        Returns "" when the birth date is missing or is not a full
        ``YYYY-MM-DD`` date: FHIR allows partial dates ("1980", "1980-05"),
        from which an exact age cannot be derived.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        try:
            birth_date = datetime.strptime(dob, "%Y-%m-%d")
        except (TypeError, ValueError):
            return ""
        today = datetime.now()
        # Subtract one year if this year's birthday has not happened yet.
        had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
        age = today.year - birth_date.year - (0 if had_birthday else 1)
        return str(age)

    def get_gender(self) -> str:
        """Extract patient's gender (capitalized)."""
        return self._get_current_patient().get("gender", "").capitalize()

    def get_birth_sex(self) -> str:
        """Extract patient's birth sex from extensions."""
        return self._extension_value(self._URL_BIRTH_SEX, "valueCode", "")

    def get_multiple_birth(self) -> Union[bool, None]:
        """Extract multiple birth status.

        ``multipleBirth[x]`` may be recorded as a boolean or as an integer
        birth order; an integer implies the patient is part of a multiple
        birth, so it is reported as True. Returns None when neither is present.
        """
        patient = self._get_current_patient()
        flag = patient.get("multipleBirthBoolean", None)
        if flag is None and patient.get("multipleBirthInteger") is not None:
            return True
        return flag

    # Address Fields (all read the first address entry)
    def get_address_line(self) -> str:
        """Extract patient's street address (first line of the first address)."""
        lines = self._primary_address().get("line") or []
        return lines[0] if lines else ""

    def get_city(self) -> str:
        """Extract patient's city."""
        return self._primary_address().get("city", "")

    def get_state(self) -> str:
        """Extract patient's state."""
        return self._primary_address().get("state", "")

    def get_zip_code(self) -> str:
        """Extract patient's postal code."""
        return self._primary_address().get("postalCode", "")

    def get_country(self) -> str:
        """Extract patient's country."""
        return self._primary_address().get("country", "")

    def get_geolocation(self) -> Dict[str, Optional[float]]:
        """Extract geolocation (latitude and longitude) from address extension.

        The result always has both keys; a coordinate that is not present
        in the extension is None.
        """
        geo = {"latitude": None, "longitude": None}
        for ext in self._primary_address().get("extension", []):
            if ext.get("url") == self._URL_GEOLOCATION:
                for sub_ext in ext.get("extension", []):
                    key = sub_ext.get("url")
                    if key in ("latitude", "longitude"):
                        geo[key] = sub_ext.get("valueDecimal")
                return geo
        return geo

    # Contact Fields
    def get_phone(self) -> str:
        """Extract patient's home phone number ("" if no home phone is listed)."""
        for telecom in self._get_current_patient().get("telecom", []):
            if telecom.get("system") == "phone" and telecom.get("use") == "home":
                return telecom.get("value", "")
        return ""

    # Identifiers
    def get_identifiers(self) -> Dict[str, str]:
        """Extract all identifiers (e.g., SSN, MRN, Driver's License).

        Keys are the identifier's ``type.text`` ("Unknown" when absent). If
        several identifiers share a label, the last one wins.
        """
        id_dict = {}
        for id_entry in self._get_current_patient().get("identifier", []):
            id_type = id_entry.get("type", {}).get("text", "Unknown")
            id_dict[id_type] = id_entry.get("value", "")
        return id_dict

    # Extensions
    def get_race(self) -> str:
        """Extract patient's race from extensions."""
        return self._extension_text(self._URL_RACE)

    def get_ethnicity(self) -> str:
        """Extract patient's ethnicity from extensions."""
        return self._extension_text(self._URL_ETHNICITY)

    def get_mothers_maiden_name(self) -> str:
        """Extract patient's mother's maiden name from extensions."""
        return self._extension_value(self._URL_MOTHERS_MAIDEN_NAME, "valueString", "")

    def get_birth_place(self) -> Dict[str, str]:
        """Extract patient's birth place (city, state, country) from extensions."""
        addr = self._extension_value(self._URL_BIRTH_PLACE, "valueAddress", {})
        return {
            "city": addr.get("city", ""),
            "state": addr.get("state", ""),
            "country": addr.get("country", ""),
        }

    def get_disability_adjusted_life_years(self) -> Optional[float]:
        """Extract disability-adjusted life years from extensions."""
        return self._extension_value(self._URL_DALY, "valueDecimal")

    def get_quality_adjusted_life_years(self) -> Optional[float]:
        """Extract quality-adjusted life years from extensions."""
        return self._extension_value(self._URL_QALY, "valueDecimal")

    # Marital Status
    def get_marital_status(self) -> str:
        """Extract patient's marital status (text, else the first coding's display)."""
        marital = self._get_current_patient().get("maritalStatus", {})
        text = marital.get("text", "")
        if text:
            return text
        # "coding" may be present but empty; treat it like a missing list.
        codings = marital.get("coding") or [{}]
        return codings[0].get("display", "")

    # Communication
    def get_language(self) -> str:
        """Extract patient's preferred language.

        Uses the communication entry flagged ``preferred`` if there is one,
        otherwise the first entry. Returns the language's ``text``, falling
        back to the first coding's ``display``.
        """
        comms = self._get_current_patient().get("communication", [])
        if not comms:
            return ""
        chosen = next((c for c in comms if c.get("preferred") is True), comms[0])
        language = chosen.get("language") or {}
        text = language.get("text", "")
        if text:
            return text
        codings = language.get("coding") or [{}]
        return codings[0].get("display", "")

    # Comprehensive Extraction
    def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "meta_profile": self.get_meta_profile(),
            "text_div": self.get_text_div(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "middle_initial": self.get_middle_initial(),
            "name_prefix": self.get_name_prefix(),
            "maiden_name": self.get_maiden_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "birth_sex": self.get_birth_sex(),
            "multiple_birth": self.get_multiple_birth(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "country": self.get_country(),
            "geolocation": self.get_geolocation(),
            "phone": self.get_phone(),
            "identifiers": self.get_identifiers(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "mothers_maiden_name": self.get_mothers_maiden_name(),
            "birth_place": self.get_birth_place(),
            "disability_adjusted_life_years": self.get_disability_adjusted_life_years(),
            "quality_adjusted_life_years": self.get_quality_adjusted_life_years(),
            "marital_status": self.get_marital_status(),
            "language": self.get_language(),
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility).

        Key order matters: callers may rely on ``list(result.values())``
        lining up with the form's input widgets. Fields that cannot be
        derived from the Patient resource are returned as "".
        """
        return {
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "middle_initial": self.get_middle_initial(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "sex": self.get_gender(),
            "address": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "doctor_first_name": "",
            "doctor_last_name": "",
            "doctor_middle_initial": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "admission_date": "",
            "referral_source": "",
            "admission_method": "",
            "discharge_date": "",
            "discharge_reason": "",
            "date_of_death": "",
            "diagnosis": "",
            "procedures": "",
            "medications": "",
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return a list of dictionaries for all patients (for app.py).

        The current patient selection is restored afterwards, even if
        extraction of one patient raises.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for i, _ in enumerate(self.patients):
                self.current_patient_idx = i
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients

    def get_patient_ids(self) -> List[str]:
        """Return a list of all patient IDs in the Bundle ("" for a patient without an id).

        The list is index-aligned with ``self.patients``.
        """
        return [patient.get("id", "") for patient in self.patients]
# # Example usage with integration into app.py
# def integrate_with_app(patient_data: str):
# """Integrate PatientDataExtractor with the Gradio app."""
# extractor = PatientDataExtractor(patient_data)
# # Function to populate form with selected patient's data
# def populate_form(patient_id: str):
# if extractor.set_patient_by_id(patient_id):
# patient_dict = extractor.get_patient_dict()
# return list(patient_dict.values()) # Return values in order expected by display_form
# return [""] * 29 # Return empty values if patient not found (one per entry in inputs_list)
# # Modify the Gradio app to include patient selection
# with gr.Blocks() as demo:
# gr.Markdown("# Patient Discharge Form with MeldRx Integration")
# with gr.Tab("Authenticate with MeldRx"):
# gr.Markdown("## SMART on FHIR Authentication")
# auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
# gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
# auth_code_input = gr.Textbox(label="Authorization Code")
# auth_submit = gr.Button("Submit Code")
# auth_result = gr.Textbox(label="Authentication Result")
# patient_data_button = gr.Button("Fetch Patient Data")
# patient_data_output = gr.Textbox(label="Patient Data")
# auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)
# patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output)
# with gr.Tab("Discharge Form"):
# gr.Markdown("## Select Patient")
# patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID")
# populate_button = gr.Button("Populate Form with Patient Data")
# gr.Markdown("## Patient Details")
# with gr.Row():
# first_name = gr.Textbox(label="First Name")
# last_name = gr.Textbox(label="Last Name")
# middle_initial = gr.Textbox(label="Middle Initial")
# with gr.Row():
# dob = gr.Textbox(label="Date of Birth")
# age = gr.Textbox(label="Age")
# sex = gr.Textbox(label="Sex")
# address = gr.Textbox(label="Address")
# with gr.Row():
# city = gr.Textbox(label="City")
# state = gr.Textbox(label="State")
# zip_code = gr.Textbox(label="Zip Code")
# gr.Markdown("## Primary Healthcare Professional Details")
# with gr.Row():
# doctor_first_name = gr.Textbox(label="Doctor's First Name")
# doctor_last_name = gr.Textbox(label="Doctor's Last Name")
# doctor_middle_initial = gr.Textbox(label="Middle Initial")
# hospital_name = gr.Textbox(label="Hospital/Clinic Name")
# doctor_address = gr.Textbox(label="Address")
# with gr.Row():
# doctor_city = gr.Textbox(label="City")
# doctor_state = gr.Textbox(label="State")
# doctor_zip = gr.Textbox(label="Zip Code")
# gr.Markdown("## Admission and Discharge Details")
# with gr.Row():
# admission_date = gr.Textbox(label="Date of Admission")
# referral_source = gr.Textbox(label="Source of Referral")
# admission_method = gr.Textbox(label="Method of Admission")
# with gr.Row():
# discharge_date = gr.Textbox(label="Date of Discharge")
# discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
# date_of_death = gr.Textbox(label="Date of Death (if applicable)")
# gr.Markdown("## Diagnosis & Procedures")
# diagnosis = gr.Textbox(label="Diagnosis")
# procedures = gr.Textbox(label="Operation & Procedures")
# gr.Markdown("## Medication Details")
# medications = gr.Textbox(label="Medication on Discharge")
# gr.Markdown("## Prepared By")
# with gr.Row():
# preparer_name = gr.Textbox(label="Name")
# preparer_job_title = gr.Textbox(label="Job Title")
# submit = gr.Button("Generate Form")
# output = gr.Markdown()
# # Inputs list for populate_form and display_form
# inputs_list = [
# first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
# doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
# doctor_city, doctor_state, doctor_zip,
# admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
# diagnosis, procedures, medications, preparer_name, preparer_job_title
# ]
# # Populate form with patient data when button is clicked
# populate_button.click(
# fn=populate_form,
# inputs=patient_dropdown,
# outputs=inputs_list
# )
# # Generate the form output
# submit.click(
# display_form,
# inputs=inputs_list,
# outputs=output
# )
# return demo
# # Assuming patient_data is the JSON string from your example
# # patient_data = <your JSON string here>
# # demo = integrate_with_app(patient_data)
# # demo.launch()