tonic-discharge-guard / responseparser.py
nchandu2023's picture
Update responseparser.py (#5)
5960b3d verified
raw
history blame
22.7 kB
import lxml.etree as etree
from datetime import datetime
from typing import List, Dict, Optional, Union
class PatientDataExtractor:
"""Class to extract all fields from a FHIR Patient resource in a Bundle response (XML format)."""
def __init__(self, patient_data: str):
"""Initialize with patient data in XML string format."""
# Parse XML string or use pre-parsed data
self.data = etree.fromstring(patient_data) if isinstance(patient_data, str) else patient_data
# Define FHIR namespace for XPath queries
self.ns = {'fhir': 'http://hl7.org/fhir'}
self.patients = self._extract_patients()
self.current_patient_idx = 0 # Default to first patient
def _extract_patients(self) -> List[etree._Element]:
"""Extract all patient entries from the Bundle."""
# Use XPath to find all Patient elements in the Bundle
return self.data.xpath("//fhir:entry/fhir:resource/fhir:Patient", namespaces=self.ns)
def set_patient_by_index(self, index: int) -> bool:
"""Set the current patient by index. Returns True if successful."""
if 0 <= index < len(self.patients):
self.current_patient_idx = index
return True
return False
def set_patient_by_id(self, patient_id: str) -> bool:
"""Set the current patient by FHIR Patient ID. Returns True if successful."""
for i, patient in enumerate(self.patients):
if patient.attrib.get("id") == patient_id:
self.current_patient_idx = i
return True
return False
def _get_current_patient(self) -> etree._Element:
"""Get the currently selected patient resource."""
return self.patients[self.current_patient_idx]
# Basic Identification Fields
def get_id(self) -> str:
"""Extract FHIR Patient ID."""
return self._get_current_patient().attrib.get("id", "")
def get_resource_type(self) -> str:
"""Extract resource type (should always be 'Patient')."""
return etree.QName(self._get_current_patient().tag).localname
def get_meta_last_updated(self) -> str:
"""Extract last updated timestamp from meta."""
patient = self._get_current_patient()
last_updated = patient.xpath("fhir:meta/fhir:lastUpdated/@value", namespaces=self.ns)
return last_updated[0] if last_updated else ""
def get_meta_profile(self) -> List[str]:
"""Extract profile URIs from meta."""
patient = self._get_current_patient()
profiles = patient.xpath("fhir:meta/fhir:profile/@value", namespaces=self.ns)
return profiles
def get_text_div(self) -> str:
"""Extract generated text narrative (div content)."""
patient = self._get_current_patient()
div = patient.xpath("fhir:text/fhir:div", namespaces=self.ns)
if div:
return etree.tostring(div[0], encoding="unicode")
return ""
# Name Fields
def get_first_name(self) -> str:
"""Extract patient's first name."""
patient = self._get_current_patient()
official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
if official_names:
given = official_names[0].xpath("fhir:given/@value", namespaces=self.ns)
if given:
return given[0]
return ""
def get_last_name(self) -> str:
"""Extract patient's last name."""
patient = self._get_current_patient()
official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
if official_names:
family = official_names[0].xpath("fhir:family/@value", namespaces=self.ns)
if family:
return family[0]
return ""
def get_middle_initial(self) -> str:
"""Extract patient's middle initial (second given name initial if present)."""
patient = self._get_current_patient()
official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
if official_names:
given = official_names[0].xpath("fhir:given/@value", namespaces=self.ns)
if len(given) > 1:
return given[1][0]
return ""
def get_name_prefix(self) -> str:
"""Extract patient's name prefix (e.g., Mr., Mrs.)."""
patient = self._get_current_patient()
official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
if official_names:
prefix = official_names[0].xpath("fhir:prefix/@value", namespaces=self.ns)
if prefix:
return prefix[0]
return ""
def get_maiden_name(self) -> str:
"""Extract patient's maiden name if available."""
patient = self._get_current_patient()
maiden_names = patient.xpath("fhir:name[fhir:use/@value='maiden']", namespaces=self.ns)
if maiden_names:
family = maiden_names[0].xpath("fhir:family/@value", namespaces=self.ns)
if family:
return family[0]
return ""
# Demographic Fields
def get_dob(self) -> str:
"""Extract patient's date of birth."""
patient = self._get_current_patient()
dob = patient.xpath("fhir:birthDate/@value", namespaces=self.ns)
return dob[0] if dob else ""
def get_age(self) -> str:
"""Calculate patient's age based on birth date."""
dob = self.get_dob()
if not dob:
return ""
birth_date = datetime.strptime(dob, "%Y-%m-%d")
today = datetime.now()
age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
return str(age)
def get_gender(self) -> str:
"""Extract patient's gender."""
patient = self._get_current_patient()
gender = patient.xpath("fhir:gender/@value", namespaces=self.ns)
return gender[0].capitalize() if gender else ""
def get_birth_sex(self) -> str:
"""Extract patient's birth sex from extensions."""
patient = self._get_current_patient()
birth_sex = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex']/fhir:valueCode/@value", namespaces=self.ns)
return birth_sex[0] if birth_sex else ""
def get_multiple_birth(self) -> Union[bool, None]:
"""Extract multiple birth status."""
patient = self._get_current_patient()
multiple_birth = patient.xpath("fhir:multipleBirthBoolean/@value", namespaces=self.ns)
return multiple_birth[0] == "true" if multiple_birth else None
# Address Fields
def get_address_line(self) -> str:
"""Extract patient's street address."""
patient = self._get_current_patient()
line = patient.xpath("fhir:address/fhir:line/@value", namespaces=self.ns)
return line[0] if line else ""
def get_city(self) -> str:
"""Extract patient's city."""
patient = self._get_current_patient()
city = patient.xpath("fhir:address/fhir:city/@value", namespaces=self.ns)
return city[0] if city else ""
def get_state(self) -> str:
"""Extract patient's state."""
patient = self._get_current_patient()
state = patient.xpath("fhir:address/fhir:state/@value", namespaces=self.ns)
return state[0] if state else ""
def get_zip_code(self) -> str:
"""Extract patient's postal code."""
patient = self._get_current_patient()
postal_code = patient.xpath("fhir:address/fhir:postalCode/@value", namespaces=self.ns)
return postal_code[0] if postal_code else ""
def get_country(self) -> str:
"""Extract patient's country."""
patient = self._get_current_patient()
country = patient.xpath("fhir:address/fhir:country/@value", namespaces=self.ns)
return country[0] if country else ""
def get_geolocation(self) -> Dict[str, float]:
"""Extract geolocation (latitude and longitude) from address extension."""
patient = self._get_current_patient()
lat = patient.xpath("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='latitude']/fhir:valueDecimal/@value", namespaces=self.ns)
lon = patient.xpath("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='longitude']/fhir:valueDecimal/@value", namespaces=self.ns)
return {
"latitude": float(lat[0]) if lat else None,
"longitude": float(lon[0]) if lon else None
}
# Contact Fields
def get_phone(self) -> str:
"""Extract patient's phone number."""
patient = self._get_current_patient()
phone = patient.xpath("fhir:telecom[fhir:system/@value='phone' and fhir:use/@value='home']/fhir:value/@value", namespaces=self.ns)
return phone[0] if phone else ""
# Identifiers
def get_identifiers(self) -> Dict[str, str]:
"""Extract all identifiers (e.g., SSN, MRN, Driver's License)."""
patient = self._get_current_patient()
id_dict = {}
identifiers = patient.xpath("fhir:identifier", namespaces=self.ns)
for id_entry in identifiers:
id_type = id_entry.xpath("fhir:type/fhir:text/@value", namespaces=self.ns)
id_value = id_entry.xpath("fhir:value/@value", namespaces=self.ns)
if id_type and id_value:
id_dict[id_type[0]] = id_value[0]
return id_dict
# Extensions
def get_race(self) -> str:
"""Extract patient's race from extensions."""
patient = self._get_current_patient()
race = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-race']/fhir:extension[@url='text']/fhir:valueString/@value", namespaces=self.ns)
return race[0] if race else ""
def get_ethnicity(self) -> str:
"""Extract patient's ethnicity from extensions."""
patient = self._get_current_patient()
ethnicity = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity']/fhir:extension[@url='text']/fhir:valueString/@value", namespaces=self.ns)
return ethnicity[0] if ethnicity else ""
def get_mothers_maiden_name(self) -> str:
"""Extract patient's mother's maiden name from extensions."""
patient = self._get_current_patient()
mothers_maiden = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName']/fhir:valueString/@value", namespaces=self.ns)
return mothers_maiden[0] if mothers_maiden else ""
def get_birth_place(self) -> Dict[str, str]:
"""Extract patient's birth place from extensions."""
patient = self._get_current_patient()
birth_place = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-birthPlace']/fhir:valueAddress", namespaces=self.ns)
if birth_place:
city = birth_place[0].xpath("fhir:city/@value", namespaces=self.ns)
state = birth_place[0].xpath("fhir:state/@value", namespaces=self.ns)
country = birth_place[0].xpath("fhir:country/@value", namespaces=self.ns)
return {
"city": city[0] if city else "",
"state": state[0] if state else "",
"country": country[0] if country else ""
}
return {"city": "", "state": "", "country": ""}
def get_disability_adjusted_life_years(self) -> Optional[float]:
"""Extract disability-adjusted life years from extensions."""
patient = self._get_current_patient()
daly = patient.xpath("fhir:extension[@url='http://synthetichealth.github.io/synthea/disability-adjusted-life-years']/fhir:valueDecimal/@value", namespaces=self.ns)
return float(daly[0]) if daly else None
def get_quality_adjusted_life_years(self) -> Optional[float]:
"""Extract quality-adjusted life years from extensions."""
patient = self._get_current_patient()
qaly = patient.xpath("fhir:extension[@url='http://synthetichealth.github.io/synthea/quality-adjusted-life-years']/fhir:valueDecimal/@value", namespaces=self.ns)
return float(qaly[0]) if qaly else None
# Marital Status
def get_marital_status(self) -> str:
"""Extract patient's marital status."""
patient = self._get_current_patient()
status = patient.xpath("fhir:maritalStatus/fhir:text/@value", namespaces=self.ns)
if status:
return status[0]
coding = patient.xpath("fhir:maritalStatus/fhir:coding/fhir:display/@value", namespaces=self.ns)
return coding[0] if coding else ""
# Communication
def get_language(self) -> str:
"""Extract patient's preferred language."""
patient = self._get_current_patient()
language = patient.xpath("fhir:communication/fhir:language/fhir:text/@value", namespaces=self.ns)
return language[0] if language else ""
# Comprehensive Extraction
def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]:
"""Extract all available data for the current patient."""
return {
"id": self.get_id(),
"resource_type": self.get_resource_type(),
"meta_last_updated": self.get_meta_last_updated(),
"meta_profile": self.get_meta_profile(),
"text_div": self.get_text_div(),
"first_name": self.get_first_name(),
"last_name": self.get_last_name(),
"middle_initial": self.get_middle_initial(),
"name_prefix": self.get_name_prefix(),
"maiden_name": self.get_maiden_name(),
"dob": self.get_dob(),
"age": self.get_age(),
"gender": self.get_gender(),
"birth_sex": self.get_birth_sex(),
"multiple_birth": self.get_multiple_birth(),
"address_line": self.get_address_line(),
"city": self.get_city(),
"state": self.get_state(),
"zip_code": self.get_zip_code(),
"country": self.get_country(),
"geolocation": self.get_geolocation(),
"phone": self.get_phone(),
"identifiers": self.get_identifiers(),
"race": self.get_race(),
"ethnicity": self.get_ethnicity(),
"mothers_maiden_name": self.get_mothers_maiden_name(),
"birth_place": self.get_birth_place(),
"disability_adjusted_life_years": self.get_disability_adjusted_life_years(),
"quality_adjusted_life_years": self.get_quality_adjusted_life_years(),
"marital_status": self.get_marital_status(),
"language": self.get_language()
}
def get_patient_dict(self) -> Dict[str, str]:
"""Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility)."""
patient_data = self.get_all_patient_data()
return {
"first_name": patient_data["first_name"],
"last_name": patient_data["last_name"],
"middle_initial": patient_data["middle_initial"],
"dob": patient_data["dob"],
"age": patient_data["age"],
"sex": patient_data["gender"],
"address": patient_data["address_line"],
"city": patient_data["city"],
"state": patient_data["state"],
"zip_code": patient_data["zip_code"],
"doctor_first_name": "",
"doctor_last_name": "",
"doctor_middle_initial": "",
"hospital_name": "",
"doctor_address": "",
"doctor_city": "",
"doctor_state": "",
"doctor_zip": "",
"admission_date": "",
"referral_source": "",
"admission_method": "",
"discharge_date": "",
"discharge_reason": "",
"date_of_death": "",
"diagnosis": "",
"procedures": "",
"medications": "",
"preparer_name": "",
"preparer_job_title": ""
}
def get_all_patients(self) -> List[Dict[str, str]]:
"""Return a list of dictionaries for all patients (for app.py)."""
original_idx = self.current_patient_idx
all_patients = []
for i in range(len(self.patients)):
self.set_patient_by_index(i)
all_patients.append(self.get_patient_dict())
self.set_patient_by_index(original_idx)
return all_patients
def get_patient_ids(self) -> List[str]:
"""Return a list of all patient IDs in the Bundle."""
return [patient.attrib.get("id", "") for patient in self.patients]
# # Example usage with integration into app.py
# def integrate_with_app(patient_data: str):
# """Integrate PatientDataExtractor with the Gradio app."""
# extractor = PatientDataExtractor(patient_data)
# # Function to populate form with selected patient's data
# def populate_form(patient_id: str):
# if extractor.set_patient_by_id(patient_id):
# patient_dict = extractor.get_patient_dict()
# return list(patient_dict.values()) # Return values in order expected by display_form
# return [""] * 28 # Return empty values if patient not found
# # Modify the Gradio app to include patient selection
# with gr.Blocks() as demo:
# gr.Markdown("# Patient Discharge Form with MeldRx Integration")
# with gr.Tab("Authenticate with MeldRx"):
# gr.Markdown("## SMART on FHIR Authentication")
# auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
# gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
# auth_code_input = gr.Textbox(label="Authorization Code")
# auth_submit = gr.Button("Submit Code")
# auth_result = gr.Textbox(label="Authentication Result")
# patient_data_button = gr.Button("Fetch Patient Data")
# patient_data_output = gr.Textbox(label="Patient Data")
# auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)
# patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output)
# with gr.Tab("Discharge Form"):
# gr.Markdown("## Select Patient")
# patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID")
# populate_button = gr.Button("Populate Form with Patient Data")
# gr.Markdown("## Patient Details")
# with gr.Row():
# first_name = gr.Textbox(label="First Name")
# last_name = gr.Textbox(label="Last Name")
# middle_initial = gr.Textbox(label="Middle Initial")
# with gr.Row():
# dob = gr.Textbox(label="Date of Birth")
# age = gr.Textbox(label="Age")
# sex = gr.Textbox(label="Sex")
# address = gr.Textbox(label="Address")
# with gr.Row():
# city = gr.Textbox(label="City")
# state = gr.Textbox(label="State")
# zip_code = gr.Textbox(label="Zip Code")
# gr.Markdown("## Primary Healthcare Professional Details")
# with gr.Row():
# doctor_first_name = gr.Textbox(label="Doctor's First Name")
# doctor_last_name = gr.Textbox(label="Doctor's Last Name")
# doctor_middle_initial = gr.Textbox(label="Middle Initial")
# hospital_name = gr.Textbox(label="Hospital/Clinic Name")
# doctor_address = gr.Textbox(label="Address")
# with gr.Row():
# doctor_city = gr.Textbox(label="City")
# doctor_state = gr.Textbox(label="State")
# doctor_zip = gr.Textbox(label="Zip Code")
# gr.Markdown("## Admission and Discharge Details")
# with gr.Row():
# admission_date = gr.Textbox(label="Date of Admission")
# referral_source = gr.Textbox(label="Source of Referral")
# admission_method = gr.Textbox(label="Method of Admission")
# with gr.Row():
# discharge_date = gr.Textbox(label="Date of Discharge")
# discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
# date_of_death = gr.Textbox(label="Date of Death (if applicable)")
# gr.Markdown("## Diagnosis & Procedures")
# diagnosis = gr.Textbox(label="Diagnosis")
# procedures = gr.Textbox(label="Operation & Procedures")
# gr.Markdown("## Medication Details")
# medications = gr.Textbox(label="Medication on Discharge")
# gr.Markdown("## Prepared By")
# with gr.Row():
# preparer_name = gr.Textbox(label="Name")
# preparer_job_title = gr.Textbox(label="Job Title")
# submit = gr.Button("Generate Form")
# output = gr.Markdown()
# # Inputs list for populate_form and display_form
# inputs_list = [
# first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
# doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
# doctor_city, doctor_state, doctor_zip,
# admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
# diagnosis, procedures, medications, preparer_name, preparer_job_title
# ]
# # Populate form with patient data when button is clicked
# populate_button.click(
# fn=populate_form,
# inputs=patient_dropdown,
# outputs=inputs_list
# )
# # Generate the form output
# submit.click(
# display_form,
# inputs=inputs_list,
# outputs=output
# )
# return demo
# # Assuming patient_data is the JSON string from your example
# # patient_data = <your JSON string here>
# # demo = integrate_with_app(patient_data)
# # demo.launch()