import json from datetime import datetime from typing import List, Dict, Optional, Union class PatientDataExtractor: """Class to extract all fields from a FHIR Patient resource in a Bundle response.""" def __init__(self, patient_data: str): """Initialize with patient data in JSON string format.""" self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data self.patients = self._extract_patients() self.current_patient_idx = 0 # Default to first patient def _extract_patients(self) -> List[Dict]: """Extract all patient entries from the Bundle.""" if self.data.get("resourceType") != "Bundle" or "entry" not in self.data: raise ValueError("Invalid FHIR Bundle format") return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"] def set_patient_by_index(self, index: int) -> bool: """Set the current patient by index. Returns True if successful.""" if 0 <= index < len(self.patients): self.current_patient_idx = index return True return False def set_patient_by_id(self, patient_id: str) -> bool: """Set the current patient by FHIR Patient ID. Returns True if successful.""" for i, patient in enumerate(self.patients): if patient["id"] == patient_id: self.current_patient_idx = i return True return False def _get_current_patient(self) -> Dict: """Get the currently selected patient resource.""" return self.patients[self.current_patient_idx] # Basic Identification Fields def get_id(self) -> str: """Extract FHIR Patient ID.""" return self._get_current_patient().get("id", "") def get_resource_type(self) -> str: """Extract resource type (should always be 'Patient').""" return self._get_current_patient().get("resourceType", "") def get_meta_last_updated(self) -> str: """Extract last updated timestamp from meta.""" return self._get_current_patient().get("meta", {}).get("lastUpdated", "") def get_meta_profile(self) -> List[str]: """Extract profile URIs from meta.""" return self._get_current_patient().get("meta", {}).get("profile", []) def get_text_div(self) -> str: """Extract generated text narrative (div content).""" return self._get_current_patient().get("text", {}).get("div", "") # Name Fields def get_first_name(self) -> str: """Extract patient's first name.""" patient = self._get_current_patient() names = patient.get("name", []) for name in names: if name.get("use") == "official" and "given" in name: return name["given"][0] return "" def get_last_name(self) -> str: """Extract patient's last name.""" patient = self._get_current_patient() names = patient.get("name", []) for name in names: if name.get("use") == "official" and "family" in name: return name["family"] return "" def get_middle_initial(self) -> str: """Extract patient's middle initial (second given name initial if present).""" patient = self._get_current_patient() names = patient.get("name", []) for name in names: if name.get("use") == "official" and "given" in name and len(name["given"]) > 1: return name["given"][1][0] return "" def get_name_prefix(self) -> str: """Extract patient's name prefix (e.g., Mr., Mrs.).""" patient = self._get_current_patient() names = patient.get("name", []) for name in names: if name.get("use") == "official" and "prefix" in name: return name["prefix"][0] return "" def get_maiden_name(self) -> str: """Extract patient's maiden name if available.""" patient = self._get_current_patient() names = patient.get("name", []) for name in names: if name.get("use") == "maiden" and "family" in name: return name["family"] return "" # Demographic Fields def get_dob(self) -> str: """Extract patient's date of birth.""" return self._get_current_patient().get("birthDate", "") def get_age(self) -> str: """Calculate patient's age based on birth date.""" dob = self.get_dob() if not dob: return "" birth_date = datetime.strptime(dob, "%Y-%m-%d") today = datetime.now() age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day)) return str(age) def get_gender(self) -> str: """Extract patient's gender.""" return self._get_current_patient().get("gender", "").capitalize() def get_birth_sex(self) -> str: """Extract patient's birth sex from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex": return ext.get("valueCode", "") return "" def get_multiple_birth(self) -> Union[bool, None]: """Extract multiple birth status.""" return self._get_current_patient().get("multipleBirthBoolean", None) # Address Fields def get_address_line(self) -> str: """Extract patient's street address.""" patient = self._get_current_patient() addresses = patient.get("address", []) return addresses[0]["line"][0] if addresses and "line" in addresses[0] else "" def get_city(self) -> str: """Extract patient's city.""" patient = self._get_current_patient() addresses = patient.get("address", []) return addresses[0]["city"] if addresses and "city" in addresses[0] else "" def get_state(self) -> str: """Extract patient's state.""" patient = self._get_current_patient() addresses = patient.get("address", []) return addresses[0]["state"] if addresses and "state" in addresses[0] else "" def get_zip_code(self) -> str: """Extract patient's postal code.""" patient = self._get_current_patient() addresses = patient.get("address", []) return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else "" def get_country(self) -> str: """Extract patient's country.""" patient = self._get_current_patient() addresses = patient.get("address", []) return addresses[0]["country"] if addresses and "country" in addresses[0] else "" def get_geolocation(self) -> Dict[str, float]: """Extract geolocation (latitude and longitude) from address extension.""" patient = self._get_current_patient() addresses = patient.get("address", []) if not addresses: return {"latitude": None, "longitude": None} for ext in addresses[0].get("extension", []): if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation": geo = {} for sub_ext in ext.get("extension", []): if sub_ext.get("url") == "latitude": geo["latitude"] = sub_ext.get("valueDecimal") elif sub_ext.get("url") == "longitude": geo["longitude"] = sub_ext.get("valueDecimal") return geo return {"latitude": None, "longitude": None} # Contact Fields def get_phone(self) -> str: """Extract patient's phone number.""" patient = self._get_current_patient() telecoms = patient.get("telecom", []) for telecom in telecoms: if telecom.get("system") == "phone" and telecom.get("use") == "home": return telecom.get("value", "") return "" # Identifiers def get_identifiers(self) -> Dict[str, str]: """Extract all identifiers (e.g., SSN, MRN, Driver's License).""" patient = self._get_current_patient() identifiers = patient.get("identifier", []) id_dict = {} for id_entry in identifiers: id_type = id_entry.get("type", {}).get("text", "Unknown") id_dict[id_type] = id_entry.get("value", "") return id_dict # Extensions def get_race(self) -> str: """Extract patient's race from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race": for sub_ext in ext.get("extension", []): if sub_ext.get("url") == "text": return sub_ext.get("valueString", "") return "" def get_ethnicity(self) -> str: """Extract patient's ethnicity from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity": for sub_ext in ext.get("extension", []): if sub_ext.get("url") == "text": return sub_ext.get("valueString", "") return "" def get_mothers_maiden_name(self) -> str: """Extract patient's mother's maiden name from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName": return ext.get("valueString", "") return "" def get_birth_place(self) -> Dict[str, str]: """Extract patient's birth place from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-birthPlace": addr = ext.get("valueAddress", {}) return { "city": addr.get("city", ""), "state": addr.get("state", ""), "country": addr.get("country", "") } return {"city": "", "state": "", "country": ""} def get_disability_adjusted_life_years(self) -> Optional[float]: """Extract disability-adjusted life years from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://synthetichealth.github.io/synthea/disability-adjusted-life-years": return ext.get("valueDecimal") return None def get_quality_adjusted_life_years(self) -> Optional[float]: """Extract quality-adjusted life years from extensions.""" patient = self._get_current_patient() for ext in patient.get("extension", []): if ext.get("url") == "http://synthetichealth.github.io/synthea/quality-adjusted-life-years": return ext.get("valueDecimal") return None # Marital Status def get_marital_status(self) -> str: """Extract patient's marital status.""" patient = self._get_current_patient() status = patient.get("maritalStatus", {}).get("text", "") return status if status else patient.get("maritalStatus", {}).get("coding", [{}])[0].get("display", "") # Communication def get_language(self) -> str: """Extract patient's preferred language.""" patient = self._get_current_patient() comms = patient.get("communication", []) return comms[0]["language"]["text"] if comms and "language" in comms[0] else "" # Comprehensive Extraction def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]: """Extract all available data for the current patient.""" return { "id": self.get_id(), "resource_type": self.get_resource_type(), "meta_last_updated": self.get_meta_last_updated(), "meta_profile": self.get_meta_profile(), "text_div": self.get_text_div(), "first_name": self.get_first_name(), "last_name": self.get_last_name(), "middle_initial": self.get_middle_initial(), "name_prefix": self.get_name_prefix(), "maiden_name": self.get_maiden_name(), "dob": self.get_dob(), "age": self.get_age(), "gender": self.get_gender(), "birth_sex": self.get_birth_sex(), "multiple_birth": self.get_multiple_birth(), "address_line": self.get_address_line(), "city": self.get_city(), "state": self.get_state(), "zip_code": self.get_zip_code(), "country": self.get_country(), "geolocation": self.get_geolocation(), "phone": self.get_phone(), "identifiers": self.get_identifiers(), "race": self.get_race(), "ethnicity": self.get_ethnicity(), "mothers_maiden_name": self.get_mothers_maiden_name(), "birth_place": self.get_birth_place(), "disability_adjusted_life_years": self.get_disability_adjusted_life_years(), "quality_adjusted_life_years": self.get_quality_adjusted_life_years(), "marital_status": self.get_marital_status(), "language": self.get_language() } def get_patient_dict(self) -> Dict[str, str]: """Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility).""" patient_data = self.get_all_patient_data() return { "first_name": patient_data["first_name"], "last_name": patient_data["last_name"], "middle_initial": patient_data["middle_initial"], "dob": patient_data["dob"], "age": patient_data["age"], "sex": patient_data["gender"], "address": patient_data["address_line"], "city": patient_data["city"], "state": patient_data["state"], "zip_code": patient_data["zip_code"], "doctor_first_name": "", "doctor_last_name": "", "doctor_middle_initial": "", "hospital_name": "", "doctor_address": "", "doctor_city": "", "doctor_state": "", "doctor_zip": "", "admission_date": "", "referral_source": "", "admission_method": "", "discharge_date": "", "discharge_reason": "", "date_of_death": "", "diagnosis": "", "procedures": "", "medications": "", "preparer_name": "", "preparer_job_title": "" } def get_all_patients(self) -> List[Dict[str, str]]: """Return a list of dictionaries for all patients (for app.py).""" original_idx = self.current_patient_idx all_patients = [] for i in range(len(self.patients)): self.set_patient_by_index(i) all_patients.append(self.get_patient_dict()) self.set_patient_by_index(original_idx) return all_patients def get_patient_ids(self) -> List[str]: """Return a list of all patient IDs in the Bundle.""" return [patient["id"] for patient in self.patients] # # Example usage with integration into app.py # def integrate_with_app(patient_data: str): # """Integrate PatientDataExtractor with the Gradio app.""" # extractor = PatientDataExtractor(patient_data) # # Function to populate form with selected patient's data # def populate_form(patient_id: str): # if extractor.set_patient_by_id(patient_id): # patient_dict = extractor.get_patient_dict() # return list(patient_dict.values()) # Return values in order expected by display_form # return [""] * 28 # Return empty values if patient not found # # Modify the Gradio app to include patient selection # with gr.Blocks() as demo: # gr.Markdown("# Patient Discharge Form with MeldRx Integration") # with gr.Tab("Authenticate with MeldRx"): # gr.Markdown("## SMART on FHIR Authentication") # auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False) # gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.") # auth_code_input = gr.Textbox(label="Authorization Code") # auth_submit = gr.Button("Submit Code") # auth_result = gr.Textbox(label="Authentication Result") # patient_data_button = gr.Button("Fetch Patient Data") # patient_data_output = gr.Textbox(label="Patient Data") # auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result) # patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output) # with gr.Tab("Discharge Form"): # gr.Markdown("## Select Patient") # patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID") # populate_button = gr.Button("Populate Form with Patient Data") # gr.Markdown("## Patient Details") # with gr.Row(): # first_name = gr.Textbox(label="First Name") # last_name = gr.Textbox(label="Last Name") # middle_initial = gr.Textbox(label="Middle Initial") # with gr.Row(): # dob = gr.Textbox(label="Date of Birth") # age = gr.Textbox(label="Age") # sex = gr.Textbox(label="Sex") # address = gr.Textbox(label="Address") # with gr.Row(): # city = gr.Textbox(label="City") # state = gr.Textbox(label="State") # zip_code = gr.Textbox(label="Zip Code") # gr.Markdown("## Primary Healthcare Professional Details") # with gr.Row(): # doctor_first_name = gr.Textbox(label="Doctor's First Name") # doctor_last_name = gr.Textbox(label="Doctor's Last Name") # doctor_middle_initial = gr.Textbox(label="Middle Initial") # hospital_name = gr.Textbox(label="Hospital/Clinic Name") # doctor_address = gr.Textbox(label="Address") # with gr.Row(): # doctor_city = gr.Textbox(label="City") # doctor_state = gr.Textbox(label="State") # doctor_zip = gr.Textbox(label="Zip Code") # gr.Markdown("## Admission and Discharge Details") # with gr.Row(): # admission_date = gr.Textbox(label="Date of Admission") # referral_source = gr.Textbox(label="Source of Referral") # admission_method = gr.Textbox(label="Method of Admission") # with gr.Row(): # discharge_date = gr.Textbox(label="Date of Discharge") # discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason") # date_of_death = gr.Textbox(label="Date of Death (if applicable)") # gr.Markdown("## Diagnosis & Procedures") # diagnosis = gr.Textbox(label="Diagnosis") # procedures = gr.Textbox(label="Operation & Procedures") # gr.Markdown("## Medication Details") # medications = gr.Textbox(label="Medication on Discharge") # gr.Markdown("## Prepared By") # with gr.Row(): # preparer_name = gr.Textbox(label="Name") # preparer_job_title = gr.Textbox(label="Job Title") # submit = gr.Button("Generate Form") # output = gr.Markdown() # # Inputs list for populate_form and display_form # inputs_list = [ # first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code, # doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address, # doctor_city, doctor_state, doctor_zip, # admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death, # diagnosis, procedures, medications, preparer_name, preparer_job_title # ] # # Populate form with patient data when button is clicked # populate_button.click( # fn=populate_form, # inputs=patient_dropdown, # outputs=inputs_list # ) # # Generate the form output # submit.click( # display_form, # inputs=inputs_list, # outputs=output # ) # return demo # # Assuming patient_data is the JSON string from your example # # patient_data = # # demo = integrate_with_app(patient_data) # # demo.launch()