|
import lxml.etree as etree |
|
from datetime import datetime |
|
from typing import List, Dict, Optional, Union |
|
|
|
class PatientDataExtractor:
    """Extract fields from the FHIR Patient resources of a Bundle (XML format).

    The extractor keeps a list of every ``Patient`` element found under
    ``entry/resource`` and a cursor (``current_patient_idx``) selecting the
    patient that the ``get_*`` methods read from.

    Attributes:
        data: Parsed XML root (or whatever element-like object was passed in).
        ns: Namespace map used for every XPath query (prefix ``fhir``).
        patients: Patient elements found in the Bundle, in document order.
        current_patient_idx: Index into ``patients`` of the selected patient.
    """

    def __init__(self, patient_data: Union[str, bytes, "etree._Element"]):
        """Parse ``patient_data`` and select the first patient.

        Args:
            patient_data: Bundle XML as ``str`` or ``bytes``, or an already
                parsed element, which is used as-is.
        """
        if isinstance(patient_data, str):
            # lxml refuses ``str`` input that carries an XML declaration with
            # an encoding (typical for HTTP response bodies). The text is
            # already decoded, so hand lxml UTF-8 bytes and override whatever
            # encoding the declaration names.
            parser = etree.XMLParser(encoding="utf-8")
            self.data = etree.fromstring(patient_data.encode("utf-8"), parser=parser)
        elif isinstance(patient_data, bytes):
            # Raw bytes: let the parser honour the document's own declaration.
            self.data = etree.fromstring(patient_data)
        else:
            self.data = patient_data

        self.ns = {'fhir': 'http://hl7.org/fhir'}
        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _extract_patients(self) -> "List[etree._Element]":
        """Return every Patient element located under entry/resource."""
        return self.data.xpath("//fhir:entry/fhir:resource/fhir:Patient", namespaces=self.ns)

    def _get_current_patient(self) -> "etree._Element":
        """Return the currently selected Patient element.

        Raises:
            IndexError: If the Bundle contained no Patient resources.
        """
        if not self.patients:
            raise IndexError("no Patient resources found in the Bundle")
        return self.patients[self.current_patient_idx]

    def _values(self, expression: str, node=None) -> list:
        """Evaluate ``expression`` on ``node`` (default: the current patient)."""
        # ``is None`` on purpose: childless lxml elements are falsy.
        target = self._get_current_patient() if node is None else node
        return target.xpath(expression, namespaces=self.ns)

    def _first_value(self, expression: str, node=None) -> str:
        """Return the first XPath match as a string, or "" when nothing matches."""
        matches = self._values(expression, node)
        return matches[0] if matches else ""

    def _first_float(self, expression: str) -> Optional[float]:
        """Return the first XPath match converted to float, or None when absent."""
        matches = self._values(expression)
        return float(matches[0]) if matches else None

    def _name_values(self, use: str, expression: str) -> list:
        """Evaluate ``expression`` on the first ``name`` whose ``use`` equals ``use``."""
        names = self._values("fhir:name[fhir:use/@value='" + use + "']")
        if not names:
            return []
        return self._values(expression, names[0])

    def _resource_id(self, patient) -> str:
        """Return the logical id of ``patient``, or "" when it has none.

        FHIR XML stores the id as a child element (``<id value="..."/>``).
        An ``id`` XML attribute is still honoured as a fallback so input
        that worked with the attribute-based lookup keeps working.
        """
        ids = patient.xpath("fhir:id/@value", namespaces=self.ns)
        if ids:
            return ids[0]
        return patient.attrib.get("id", "")

    @staticmethod
    def _parse_fhir_date(value: str) -> Optional[datetime]:
        """Parse a FHIR ``date`` (YYYY, YYYY-MM or YYYY-MM-DD); None if invalid."""
        for fmt in ("%Y-%m-%d", "%Y-%m", "%Y"):
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
        return None

    # ------------------------------------------------------------------
    # Patient selection
    # ------------------------------------------------------------------

    def set_patient_by_index(self, index: int) -> bool:
        """Select the patient at ``index``. Returns True if the index is valid."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def set_patient_by_id(self, patient_id: str) -> bool:
        """Select the first patient whose FHIR id equals ``patient_id``.

        Returns True if a patient was found; the selection is unchanged otherwise.
        """
        for i, patient in enumerate(self.patients):
            resource_id = self._resource_id(patient)
            # A patient without an id must never match, even for patient_id == "".
            if resource_id and resource_id == patient_id:
                self.current_patient_idx = i
                return True
        return False

    # ------------------------------------------------------------------
    # Resource metadata
    # ------------------------------------------------------------------

    def get_id(self) -> str:
        """Extract the FHIR Patient id ("" when absent)."""
        return self._resource_id(self._get_current_patient())

    def get_resource_type(self) -> str:
        """Extract the resource type (local tag name; 'Patient' for Bundle patients)."""
        return etree.QName(self._get_current_patient().tag).localname

    def get_meta_last_updated(self) -> str:
        """Extract the last-updated timestamp from meta."""
        return self._first_value("fhir:meta/fhir:lastUpdated/@value")

    def get_meta_profile(self) -> List[str]:
        """Extract the profile URIs from meta."""
        return self._values("fhir:meta/fhir:profile/@value")

    def get_text_div(self) -> str:
        """Extract the narrative ``div`` serialized as an XML string ("" when absent)."""
        # The narrative div lives in the XHTML namespace, not the FHIR one, so
        # match on the local name; this also still matches a FHIR-namespaced div.
        div = self._values("fhir:text/*[local-name()='div']")
        if div:
            return etree.tostring(div[0], encoding="unicode")
        return ""

    # ------------------------------------------------------------------
    # Names
    # ------------------------------------------------------------------

    def get_first_name(self) -> str:
        """Extract the first given name of the official name."""
        given = self._name_values("official", "fhir:given/@value")
        return given[0] if given else ""

    def get_last_name(self) -> str:
        """Extract the family name of the official name."""
        family = self._name_values("official", "fhir:family/@value")
        return family[0] if family else ""

    def get_middle_initial(self) -> str:
        """Extract the initial of the second given name of the official name."""
        given = self._name_values("official", "fhir:given/@value")
        if len(given) > 1:
            # Slice instead of index so an empty second given name yields "".
            return given[1][:1]
        return ""

    def get_name_prefix(self) -> str:
        """Extract the prefix (e.g., Mr., Mrs.) of the official name."""
        prefix = self._name_values("official", "fhir:prefix/@value")
        return prefix[0] if prefix else ""

    def get_maiden_name(self) -> str:
        """Extract the family name of the maiden name, if available."""
        family = self._name_values("maiden", "fhir:family/@value")
        return family[0] if family else ""

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    def get_dob(self) -> str:
        """Extract the date of birth exactly as written in the resource."""
        return self._first_value("fhir:birthDate/@value")

    def get_age(self) -> str:
        """Calculate the age in whole years from the birth date.

        Partial FHIR dates are accepted; a missing month or day is treated as
        the first of the period. Returns "" when the birth date is absent or
        cannot be parsed.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        birth_date = self._parse_fhir_date(dob)
        if birth_date is None:
            return ""
        today = datetime.now()
        # Subtract one year when this year's birthday has not happened yet.
        had_no_birthday_yet = (today.month, today.day) < (birth_date.month, birth_date.day)
        age = today.year - birth_date.year - had_no_birthday_yet
        return str(age)

    def get_gender(self) -> str:
        """Extract the administrative gender, capitalized (e.g., 'Female')."""
        gender = self._values("fhir:gender/@value")
        return gender[0].capitalize() if gender else ""

    def get_birth_sex(self) -> str:
        """Extract the birth sex from the US Core birthsex extension."""
        return self._first_value("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex']/fhir:valueCode/@value")

    def get_multiple_birth(self) -> Union[bool, None]:
        """Extract ``multipleBirthBoolean``; None when the element is absent."""
        multiple_birth = self._values("fhir:multipleBirthBoolean/@value")
        return multiple_birth[0] == "true" if multiple_birth else None

    # ------------------------------------------------------------------
    # Address and contact
    # ------------------------------------------------------------------

    def get_address_line(self) -> str:
        """Extract the first street address line."""
        return self._first_value("fhir:address/fhir:line/@value")

    def get_city(self) -> str:
        """Extract the city."""
        return self._first_value("fhir:address/fhir:city/@value")

    def get_state(self) -> str:
        """Extract the state."""
        return self._first_value("fhir:address/fhir:state/@value")

    def get_zip_code(self) -> str:
        """Extract the postal code."""
        return self._first_value("fhir:address/fhir:postalCode/@value")

    def get_country(self) -> str:
        """Extract the country."""
        return self._first_value("fhir:address/fhir:country/@value")

    def get_geolocation(self) -> Dict[str, Optional[float]]:
        """Extract latitude and longitude from the address geolocation extension.

        Each coordinate is None when the extension does not provide it.
        """
        return {
            "latitude": self._first_float("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='latitude']/fhir:valueDecimal/@value"),
            "longitude": self._first_float("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='longitude']/fhir:valueDecimal/@value"),
        }

    def get_phone(self) -> str:
        """Extract the home phone number."""
        return self._first_value("fhir:telecom[fhir:system/@value='phone' and fhir:use/@value='home']/fhir:value/@value")

    # ------------------------------------------------------------------
    # Identifiers and extensions
    # ------------------------------------------------------------------

    def get_identifiers(self) -> Dict[str, str]:
        """Extract identifiers as a mapping of type text to value.

        Identifiers lacking either a type text or a value are skipped; when
        two identifiers share a type text the later one wins.
        """
        id_dict = {}
        for id_entry in self._values("fhir:identifier"):
            id_type = self._values("fhir:type/fhir:text/@value", id_entry)
            id_value = self._values("fhir:value/@value", id_entry)
            if id_type and id_value:
                id_dict[id_type[0]] = id_value[0]
        return id_dict

    def get_race(self) -> str:
        """Extract the race text from the US Core race extension."""
        return self._first_value("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-race']/fhir:extension[@url='text']/fhir:valueString/@value")

    def get_ethnicity(self) -> str:
        """Extract the ethnicity text from the US Core ethnicity extension."""
        return self._first_value("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity']/fhir:extension[@url='text']/fhir:valueString/@value")

    def get_mothers_maiden_name(self) -> str:
        """Extract the mother's maiden name from its extension."""
        return self._first_value("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName']/fhir:valueString/@value")

    def get_birth_place(self) -> Dict[str, str]:
        """Extract city, state and country of the birth place extension.

        Every key is always present; missing parts are "".
        """
        birth_place = self._values("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-birthPlace']/fhir:valueAddress")
        if not birth_place:
            return {"city": "", "state": "", "country": ""}
        address = birth_place[0]
        return {
            "city": self._first_value("fhir:city/@value", address),
            "state": self._first_value("fhir:state/@value", address),
            "country": self._first_value("fhir:country/@value", address),
        }

    def get_disability_adjusted_life_years(self) -> Optional[float]:
        """Extract disability-adjusted life years from its extension (None if absent)."""
        return self._first_float("fhir:extension[@url='http://synthetichealth.github.io/synthea/disability-adjusted-life-years']/fhir:valueDecimal/@value")

    def get_quality_adjusted_life_years(self) -> Optional[float]:
        """Extract quality-adjusted life years from its extension (None if absent)."""
        return self._first_float("fhir:extension[@url='http://synthetichealth.github.io/synthea/quality-adjusted-life-years']/fhir:valueDecimal/@value")

    def get_marital_status(self) -> str:
        """Extract the marital status text, falling back to the coding display."""
        status = self._first_value("fhir:maritalStatus/fhir:text/@value")
        if status:
            return status
        return self._first_value("fhir:maritalStatus/fhir:coding/fhir:display/@value")

    def get_language(self) -> str:
        """Extract the text of the first communication language."""
        return self._first_value("fhir:communication/fhir:language/fhir:text/@value")

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------

    def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]:
        """Extract every available field for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "meta_profile": self.get_meta_profile(),
            "text_div": self.get_text_div(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "middle_initial": self.get_middle_initial(),
            "name_prefix": self.get_name_prefix(),
            "maiden_name": self.get_maiden_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "birth_sex": self.get_birth_sex(),
            "multiple_birth": self.get_multiple_birth(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "country": self.get_country(),
            "geolocation": self.get_geolocation(),
            "phone": self.get_phone(),
            "identifiers": self.get_identifiers(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "mothers_maiden_name": self.get_mothers_maiden_name(),
            "birth_place": self.get_birth_place(),
            "disability_adjusted_life_years": self.get_disability_adjusted_life_years(),
            "quality_adjusted_life_years": self.get_quality_adjusted_life_years(),
            "marital_status": self.get_marital_status(),
            "language": self.get_language(),
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return the current patient mapped to discharge form fields (for app.py compatibility).

        Fields the Patient resource cannot supply (doctor, admission,
        discharge, preparer, ...) are present with an empty string.
        """
        patient_data = self.get_all_patient_data()
        form = {
            "first_name": patient_data["first_name"],
            "last_name": patient_data["last_name"],
            "middle_initial": patient_data["middle_initial"],
            "dob": patient_data["dob"],
            "age": patient_data["age"],
            "sex": patient_data["gender"],
            "address": patient_data["address_line"],
            "city": patient_data["city"],
            "state": patient_data["state"],
            "zip_code": patient_data["zip_code"],
        }
        # Form fields with no source in the Patient resource, in form order.
        blank_fields = (
            "doctor_first_name",
            "doctor_last_name",
            "doctor_middle_initial",
            "hospital_name",
            "doctor_address",
            "doctor_city",
            "doctor_state",
            "doctor_zip",
            "admission_date",
            "referral_source",
            "admission_method",
            "discharge_date",
            "discharge_reason",
            "date_of_death",
            "diagnosis",
            "procedures",
            "medications",
            "preparer_name",
            "preparer_job_title",
        )
        for field_name in blank_fields:
            form[field_name] = ""
        return form

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return the discharge-form dictionary of every patient (for app.py).

        The patient selection in effect before the call is restored
        afterwards, even if extraction of one patient raises.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for i, _ in enumerate(self.patients):
                self.current_patient_idx = i
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients

    def get_patient_ids(self) -> List[str]:
        """Return the FHIR id of every patient in the Bundle ("" where absent)."""
        return [self._resource_id(patient) for patient in self.patients]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|