|
import json |
|
from datetime import datetime |
|
from typing import List, Dict, Optional, Union |
|
|
|
class PatientDataExtractor: |
|
"""Class to extract all fields from a FHIR Patient resource in a Bundle response.""" |
|
|
|
def __init__(self, patient_data: str): |
|
"""Initialize with patient data in JSON string format.""" |
|
self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data |
|
self.patients = self._extract_patients() |
|
self.current_patient_idx = 0 |
|
|
|
def _extract_patients(self) -> List[Dict]: |
|
"""Extract all patient entries from the Bundle.""" |
|
if self.data.get("resourceType") != "Bundle" or "entry" not in self.data: |
|
raise ValueError("Invalid FHIR Bundle format") |
|
return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"] |
|
|
|
def set_patient_by_index(self, index: int) -> bool: |
|
"""Set the current patient by index. Returns True if successful.""" |
|
if 0 <= index < len(self.patients): |
|
self.current_patient_idx = index |
|
return True |
|
return False |
|
|
|
def set_patient_by_id(self, patient_id: str) -> bool: |
|
"""Set the current patient by FHIR Patient ID. Returns True if successful.""" |
|
for i, patient in enumerate(self.patients): |
|
if patient["id"] == patient_id: |
|
self.current_patient_idx = i |
|
return True |
|
return False |
|
|
|
def _get_current_patient(self) -> Dict: |
|
"""Get the currently selected patient resource.""" |
|
return self.patients[self.current_patient_idx] |
|
|
|
|
|
def get_id(self) -> str: |
|
"""Extract FHIR Patient ID.""" |
|
return self._get_current_patient().get("id", "") |
|
|
|
def get_resource_type(self) -> str: |
|
"""Extract resource type (should always be 'Patient').""" |
|
return self._get_current_patient().get("resourceType", "") |
|
|
|
def get_meta_last_updated(self) -> str: |
|
"""Extract last updated timestamp from meta.""" |
|
return self._get_current_patient().get("meta", {}).get("lastUpdated", "") |
|
|
|
def get_meta_profile(self) -> List[str]: |
|
"""Extract profile URIs from meta.""" |
|
return self._get_current_patient().get("meta", {}).get("profile", []) |
|
|
|
def get_text_div(self) -> str: |
|
"""Extract generated text narrative (div content).""" |
|
return self._get_current_patient().get("text", {}).get("div", "") |
|
|
|
|
|
def get_first_name(self) -> str: |
|
"""Extract patient's first name.""" |
|
patient = self._get_current_patient() |
|
names = patient.get("name", []) |
|
for name in names: |
|
if name.get("use") == "official" and "given" in name: |
|
return name["given"][0] |
|
return "" |
|
|
|
def get_last_name(self) -> str: |
|
"""Extract patient's last name.""" |
|
patient = self._get_current_patient() |
|
names = patient.get("name", []) |
|
for name in names: |
|
if name.get("use") == "official" and "family" in name: |
|
return name["family"] |
|
return "" |
|
|
|
def get_middle_initial(self) -> str: |
|
"""Extract patient's middle initial (second given name initial if present).""" |
|
patient = self._get_current_patient() |
|
names = patient.get("name", []) |
|
for name in names: |
|
if name.get("use") == "official" and "given" in name and len(name["given"]) > 1: |
|
return name["given"][1][0] |
|
return "" |
|
|
|
def get_name_prefix(self) -> str: |
|
"""Extract patient's name prefix (e.g., Mr., Mrs.).""" |
|
patient = self._get_current_patient() |
|
names = patient.get("name", []) |
|
for name in names: |
|
if name.get("use") == "official" and "prefix" in name: |
|
return name["prefix"][0] |
|
return "" |
|
|
|
def get_maiden_name(self) -> str: |
|
"""Extract patient's maiden name if available.""" |
|
patient = self._get_current_patient() |
|
names = patient.get("name", []) |
|
for name in names: |
|
if name.get("use") == "maiden" and "family" in name: |
|
return name["family"] |
|
return "" |
|
|
|
|
|
def get_dob(self) -> str: |
|
"""Extract patient's date of birth.""" |
|
return self._get_current_patient().get("birthDate", "") |
|
|
|
def get_age(self) -> str: |
|
"""Calculate patient's age based on birth date.""" |
|
dob = self.get_dob() |
|
if not dob: |
|
return "" |
|
birth_date = datetime.strptime(dob, "%Y-%m-%d") |
|
today = datetime.now() |
|
age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day)) |
|
return str(age) |
|
|
|
def get_gender(self) -> str: |
|
"""Extract patient's gender.""" |
|
return self._get_current_patient().get("gender", "").capitalize() |
|
|
|
def get_birth_sex(self) -> str: |
|
"""Extract patient's birth sex from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex": |
|
return ext.get("valueCode", "") |
|
return "" |
|
|
|
def get_multiple_birth(self) -> Union[bool, None]: |
|
"""Extract multiple birth status.""" |
|
return self._get_current_patient().get("multipleBirthBoolean", None) |
|
|
|
|
|
def get_address_line(self) -> str: |
|
"""Extract patient's street address.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
return addresses[0]["line"][0] if addresses and "line" in addresses[0] else "" |
|
|
|
def get_city(self) -> str: |
|
"""Extract patient's city.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
return addresses[0]["city"] if addresses and "city" in addresses[0] else "" |
|
|
|
def get_state(self) -> str: |
|
"""Extract patient's state.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
return addresses[0]["state"] if addresses and "state" in addresses[0] else "" |
|
|
|
def get_zip_code(self) -> str: |
|
"""Extract patient's postal code.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else "" |
|
|
|
def get_country(self) -> str: |
|
"""Extract patient's country.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
return addresses[0]["country"] if addresses and "country" in addresses[0] else "" |
|
|
|
def get_geolocation(self) -> Dict[str, float]: |
|
"""Extract geolocation (latitude and longitude) from address extension.""" |
|
patient = self._get_current_patient() |
|
addresses = patient.get("address", []) |
|
if not addresses: |
|
return {"latitude": None, "longitude": None} |
|
for ext in addresses[0].get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation": |
|
geo = {} |
|
for sub_ext in ext.get("extension", []): |
|
if sub_ext.get("url") == "latitude": |
|
geo["latitude"] = sub_ext.get("valueDecimal") |
|
elif sub_ext.get("url") == "longitude": |
|
geo["longitude"] = sub_ext.get("valueDecimal") |
|
return geo |
|
return {"latitude": None, "longitude": None} |
|
|
|
|
|
def get_phone(self) -> str: |
|
"""Extract patient's phone number.""" |
|
patient = self._get_current_patient() |
|
telecoms = patient.get("telecom", []) |
|
for telecom in telecoms: |
|
if telecom.get("system") == "phone" and telecom.get("use") == "home": |
|
return telecom.get("value", "") |
|
return "" |
|
|
|
|
|
def get_identifiers(self) -> Dict[str, str]: |
|
"""Extract all identifiers (e.g., SSN, MRN, Driver's License).""" |
|
patient = self._get_current_patient() |
|
identifiers = patient.get("identifier", []) |
|
id_dict = {} |
|
for id_entry in identifiers: |
|
id_type = id_entry.get("type", {}).get("text", "Unknown") |
|
id_dict[id_type] = id_entry.get("value", "") |
|
return id_dict |
|
|
|
|
|
def get_race(self) -> str: |
|
"""Extract patient's race from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race": |
|
for sub_ext in ext.get("extension", []): |
|
if sub_ext.get("url") == "text": |
|
return sub_ext.get("valueString", "") |
|
return "" |
|
|
|
def get_ethnicity(self) -> str: |
|
"""Extract patient's ethnicity from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity": |
|
for sub_ext in ext.get("extension", []): |
|
if sub_ext.get("url") == "text": |
|
return sub_ext.get("valueString", "") |
|
return "" |
|
|
|
def get_mothers_maiden_name(self) -> str: |
|
"""Extract patient's mother's maiden name from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName": |
|
return ext.get("valueString", "") |
|
return "" |
|
|
|
def get_birth_place(self) -> Dict[str, str]: |
|
"""Extract patient's birth place from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-birthPlace": |
|
addr = ext.get("valueAddress", {}) |
|
return { |
|
"city": addr.get("city", ""), |
|
"state": addr.get("state", ""), |
|
"country": addr.get("country", "") |
|
} |
|
return {"city": "", "state": "", "country": ""} |
|
|
|
def get_disability_adjusted_life_years(self) -> Optional[float]: |
|
"""Extract disability-adjusted life years from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://synthetichealth.github.io/synthea/disability-adjusted-life-years": |
|
return ext.get("valueDecimal") |
|
return None |
|
|
|
def get_quality_adjusted_life_years(self) -> Optional[float]: |
|
"""Extract quality-adjusted life years from extensions.""" |
|
patient = self._get_current_patient() |
|
for ext in patient.get("extension", []): |
|
if ext.get("url") == "http://synthetichealth.github.io/synthea/quality-adjusted-life-years": |
|
return ext.get("valueDecimal") |
|
return None |
|
|
|
|
|
def get_marital_status(self) -> str: |
|
"""Extract patient's marital status.""" |
|
patient = self._get_current_patient() |
|
status = patient.get("maritalStatus", {}).get("text", "") |
|
return status if status else patient.get("maritalStatus", {}).get("coding", [{}])[0].get("display", "") |
|
|
|
|
|
def get_language(self) -> str: |
|
"""Extract patient's preferred language.""" |
|
patient = self._get_current_patient() |
|
comms = patient.get("communication", []) |
|
return comms[0]["language"]["text"] if comms and "language" in comms[0] else "" |
|
|
|
|
|
def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]: |
|
"""Extract all available data for the current patient.""" |
|
return { |
|
"id": self.get_id(), |
|
"resource_type": self.get_resource_type(), |
|
"meta_last_updated": self.get_meta_last_updated(), |
|
"meta_profile": self.get_meta_profile(), |
|
"text_div": self.get_text_div(), |
|
"first_name": self.get_first_name(), |
|
"last_name": self.get_last_name(), |
|
"middle_initial": self.get_middle_initial(), |
|
"name_prefix": self.get_name_prefix(), |
|
"maiden_name": self.get_maiden_name(), |
|
"dob": self.get_dob(), |
|
"age": self.get_age(), |
|
"gender": self.get_gender(), |
|
"birth_sex": self.get_birth_sex(), |
|
"multiple_birth": self.get_multiple_birth(), |
|
"address_line": self.get_address_line(), |
|
"city": self.get_city(), |
|
"state": self.get_state(), |
|
"zip_code": self.get_zip_code(), |
|
"country": self.get_country(), |
|
"geolocation": self.get_geolocation(), |
|
"phone": self.get_phone(), |
|
"identifiers": self.get_identifiers(), |
|
"race": self.get_race(), |
|
"ethnicity": self.get_ethnicity(), |
|
"mothers_maiden_name": self.get_mothers_maiden_name(), |
|
"birth_place": self.get_birth_place(), |
|
"disability_adjusted_life_years": self.get_disability_adjusted_life_years(), |
|
"quality_adjusted_life_years": self.get_quality_adjusted_life_years(), |
|
"marital_status": self.get_marital_status(), |
|
"language": self.get_language() |
|
} |
|
|
|
def get_patient_dict(self) -> Dict[str, str]: |
|
"""Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility).""" |
|
patient_data = self.get_all_patient_data() |
|
return { |
|
"first_name": patient_data["first_name"], |
|
"last_name": patient_data["last_name"], |
|
"middle_initial": patient_data["middle_initial"], |
|
"dob": patient_data["dob"], |
|
"age": patient_data["age"], |
|
"sex": patient_data["gender"], |
|
"address": patient_data["address_line"], |
|
"city": patient_data["city"], |
|
"state": patient_data["state"], |
|
"zip_code": patient_data["zip_code"], |
|
"doctor_first_name": "", |
|
"doctor_last_name": "", |
|
"doctor_middle_initial": "", |
|
"hospital_name": "", |
|
"doctor_address": "", |
|
"doctor_city": "", |
|
"doctor_state": "", |
|
"doctor_zip": "", |
|
"admission_date": "", |
|
"referral_source": "", |
|
"admission_method": "", |
|
"discharge_date": "", |
|
"discharge_reason": "", |
|
"date_of_death": "", |
|
"diagnosis": "", |
|
"procedures": "", |
|
"medications": "", |
|
"preparer_name": "", |
|
"preparer_job_title": "" |
|
} |
|
|
|
def get_all_patients(self) -> List[Dict[str, str]]: |
|
"""Return a list of dictionaries for all patients (for app.py).""" |
|
original_idx = self.current_patient_idx |
|
all_patients = [] |
|
for i in range(len(self.patients)): |
|
self.set_patient_by_index(i) |
|
all_patients.append(self.get_patient_dict()) |
|
self.set_patient_by_index(original_idx) |
|
return all_patients |
|
|
|
def get_patient_ids(self) -> List[str]: |
|
"""Return a list of all patient IDs in the Bundle.""" |
|
return [patient["id"] for patient in self.patients] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|