import base64
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

try:
    import lxml.etree as etree
except ImportError:
    # lxml is only needed to parse C-CDA (XML) text; FHIR JSON input works
    # without it. XML string input raises ImportError in __init__ instead.
    etree = None

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class PatientDataExtractor:
    """Class to extract fields from FHIR Patient Bundle (JSON) or C-CDA (XML).

    Scalar getters (``get_id``, ``get_first_name``, ...) read from the
    currently selected patient; use ``set_patient_by_index`` to change it.
    Missing values are reported as empty strings.

    NOTE: for JSON input the list getters (``get_medications``,
    ``get_encounters``, ``get_conditions``, ``get_immunizations``,
    ``get_diagnostic_reports``) scan every entry of the bundle and are not
    filtered by the selected patient.
    """

    def __init__(self, patient_data: str, format_type: Optional[str] = None):
        """Initialize with patient data and optional format type.

        Args:
            patient_data: Raw XML/JSON text. An already-parsed object (a dict
                for JSON, an object exposing ``xpath`` for XML) is used as-is.
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted, the format is detected from ``patient_data``.

        Raises:
            ValueError: If the format is unsupported or cannot be detected,
                or if JSON input is not a FHIR Bundle with an ``entry`` key.
            ImportError: If XML text is supplied but lxml is not installed.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            if isinstance(patient_data, str):
                if etree is None:
                    raise ImportError("lxml is required to parse XML (C-CDA) input")
                # Parse from bytes: lxml refuses str input that carries an
                # XML encoding declaration.
                self.data = etree.fromstring(patient_data.encode('utf-8'))
            else:
                self.data = patient_data
            self.ns = {'hl7': 'urn:hl7-org:v3'}
        elif self.format == "json":
            self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")
        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _detect_format(self, data: Any) -> str:
        """Detect the format of the input data.

        Text is sniffed by its first non-blank character; a dict is treated
        as parsed JSON and any object with an ``xpath`` attribute as parsed
        XML.

        Raises:
            ValueError: If the format cannot be determined.
        """
        if isinstance(data, str):
            stripped = data.strip()
            if stripped.startswith('<'):
                return 'xml'
            if stripped.startswith(('{', '[')):
                return 'json'
        elif isinstance(data, dict):
            return 'json'
        elif hasattr(data, 'xpath'):
            return 'xml'
        raise ValueError("Cannot determine data format")

    def _extract_patients(self) -> List:
        """Extract all patient entries based on format.

        Raises:
            ValueError: If JSON data is not a Bundle dict with an ``entry`` key.
        """
        if self.format == "xml":
            return [self.data]  # C-CDA has one patient per document
        if (
            not isinstance(self.data, dict)
            or self.data.get("resourceType") != "Bundle"
            or "entry" not in self.data
        ):
            raise ValueError("Invalid FHIR Bundle format")
        return self._bundle_resources("Patient")

    def _bundle_resources(self, resource_type: str) -> List[Dict[str, Any]]:
        """Return the bundle's resources of ``resource_type`` in entry order.

        Entries without a dict ``resource`` are skipped instead of raising.
        """
        resources = []
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == resource_type:
                resources.append(resource)
        return resources

    def _xpath_first(self, node, expression: str) -> str:
        """Return the first XPath match on ``node`` or ``""`` if none."""
        matches = node.xpath(expression, namespaces=self.ns)
        return matches[0] if matches else ""

    def _xml_section(self, section_code: str):
        """Return the first ``hl7:section`` whose code equals ``section_code``, or None."""
        sections = self.data.xpath(
            "//hl7:section[hl7:code/@code='%s']" % section_code,
            namespaces=self.ns,
        )
        return sections[0] if sections else None

    @staticmethod
    def _official_name_field(patient: Dict[str, Any], field: str):
        """Return the non-empty ``field`` of the first ``use == "official"`` name, or None."""
        for name in patient.get("name", []):
            if name.get("use") == "official" and name.get(field):
                return name[field]
        return None

    @staticmethod
    def _first_address(patient: Dict[str, Any]) -> Dict[str, Any]:
        """Return the patient's first address dict, or ``{}`` if there is none."""
        addresses = patient.get("address", [])
        return addresses[0] if addresses else {}

    @staticmethod
    def _extension_text(patient: Dict[str, Any], url: str) -> str:
        """Return the ``valueString`` of the ``text`` sub-extension under extension ``url``."""
        for ext in patient.get("extension", []):
            if ext.get("url") != url:
                continue
            for sub_ext in ext.get("extension", []):
                if sub_ext.get("url") == "text":
                    return sub_ext.get("valueString", "")
        return ""

    @staticmethod
    def _first_coding_code(concept: Dict[str, Any]) -> str:
        """Return ``concept["coding"][0]["code"]``, or ``""`` when absent or empty."""
        codings = concept.get("coding") or [{}]
        return codings[0].get("code", "")

    # ------------------------------------------------------------------
    # Patient selection
    # ------------------------------------------------------------------
    def set_patient_by_index(self, index: int) -> bool:
        """Set the current patient by index.

        Returns:
            True if ``index`` was valid and selected, False otherwise (the
            selection is left unchanged).
        """
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Get the currently selected patient resource.

        Raises:
            IndexError: If no patients were extracted.
        """
        return self.patients[self.current_patient_idx]

    # ------------------------------------------------------------------
    # Identity
    # ------------------------------------------------------------------
    def get_id(self) -> str:
        """Return the patient id, or ``""`` if none is found.

        JSON: the top-level ``id``, falling back to the first non-empty
        ``identifier`` value.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:id/@extension"
            )
        # Check top-level 'id' first
        patient_id = patient.get("id", "")
        if patient_id:
            return patient_id
        # Fallback to 'identifier' field
        for identifier in patient.get("identifier", []):
            if identifier.get("value"):
                return identifier["value"]
        return ""

    def get_resource_type(self) -> str:
        """Return ``"ClinicalDocument"`` for XML, else the resource's ``resourceType``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return "ClinicalDocument"
        return patient.get("resourceType", "")

    def get_meta_last_updated(self) -> str:
        """Return the first ``effectiveTime`` value (XML) or ``meta.lastUpdated`` (JSON)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, "//hl7:effectiveTime/@value")
        return patient.get("meta", {}).get("lastUpdated", "")

    # ------------------------------------------------------------------
    # Name Fields
    # ------------------------------------------------------------------
    def get_first_name(self) -> str:
        """Return the first given name (JSON: from the ``official`` name), or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:given/text()",
            )
        given = self._official_name_field(patient, "given")
        return given[0] if given else ""

    def get_last_name(self) -> str:
        """Return the family name (JSON: from the ``official`` name), or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:family/text()",
            )
        return self._official_name_field(patient, "family") or ""

    def get_name_prefix(self) -> str:
        """Return the first name prefix (JSON: from the ``official`` name), or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:prefix/text()",
            )
        prefix = self._official_name_field(patient, "prefix")
        return prefix[0] if prefix else ""

    # ------------------------------------------------------------------
    # Demographic Fields
    # ------------------------------------------------------------------
    def get_dob(self) -> str:
        """Return the raw birth date: ``birthTime/@value`` (XML) or ``birthDate`` (JSON)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:birthTime/@value",
            )
        return patient.get("birthDate", "")

    def get_age(self) -> str:
        """Return the age in whole years as a string, or ``""`` if it cannot be computed.

        Accepts both ``YYYYMMDD...`` values and hyphenated ``YYYY-MM-DD``
        values as returned by ``get_dob``.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        try:
            # Drop hyphens so a hyphenated date reduces to the same YYYYMMDD
            # prefix as the compact form.
            birth_date = datetime.strptime(dob.replace("-", "")[:8], "%Y%m%d")
        except ValueError:
            return ""
        today = datetime.now()
        birthday_pending = (today.month, today.day) < (birth_date.month, birth_date.day)
        age = today.year - birth_date.year - (1 if birthday_pending else 0)
        return str(age)

    def get_gender(self) -> str:
        """Return the gender.

        XML: ``"Male"``/``"Female"`` for codes ``M``/``F``, otherwise ``""``.
        JSON: the ``gender`` value capitalized.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:administrativeGenderCode/@code",
            )
            if code == "M":
                return "Male"
            if code == "F":
                return "Female"
            return ""
        return patient.get("gender", "").capitalize()

    # ------------------------------------------------------------------
    # Address Fields
    # ------------------------------------------------------------------
    def get_address_line(self) -> str:
        """Return the first street line of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:streetAddressLine/text()",
            )
        lines = self._first_address(patient).get("line") or []
        return lines[0] if lines else ""

    def get_city(self) -> str:
        """Return the city of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:city/text()"
            )
        return self._first_address(patient).get("city", "")

    def get_state(self) -> str:
        """Return the state of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:state/text()"
            )
        return self._first_address(patient).get("state", "")

    def get_zip_code(self) -> str:
        """Return the postal code of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:postalCode/text()"
            )
        return self._first_address(patient).get("postalCode", "")

    # ------------------------------------------------------------------
    # Contact Fields
    # ------------------------------------------------------------------
    def get_phone(self) -> str:
        """Return a phone number, or ``""`` if none is found.

        XML: the first ``telecom`` value containing ``tel:`` with that prefix
        removed. JSON: the first telecom with system ``phone`` and use
        ``home``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(
                "//hl7:recordTarget/hl7:patientRole/hl7:telecom/@value",
                namespaces=self.ns,
            )
            # Scan all telecoms: the first one is not necessarily a phone.
            for value in values:
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        for telecom in patient.get("telecom", []):
            if telecom.get("system") == "phone" and telecom.get("use") == "home":
                return telecom.get("value", "")
        return ""

    # ------------------------------------------------------------------
    # Extensions and Additional Fields
    # ------------------------------------------------------------------
    def get_race(self) -> str:
        """Return the race display text, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:raceCode/@displayName",
            )
        return self._extension_text(
            patient, "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race"
        )

    def get_ethnicity(self) -> str:
        """Return the ethnicity display text, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:ethnicGroupCode/@displayName",
            )
        return self._extension_text(
            patient, "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity"
        )

    def get_language(self) -> str:
        """Return the language of the first communication entry, or ``""``.

        JSON: prefers ``language.text``, then the first coding's ``display``,
        then its ``code``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient"
                "/hl7:languageCommunication/hl7:languageCode/@code",
            )
        comms = patient.get("communication", [])
        if comms and "language" in comms[0]:
            lang = comms[0]["language"]
            # Try 'text' first, then fall back to 'coding' if available
            if "text" in lang:
                return lang["text"]
            if "coding" in lang and lang["coding"]:
                first = lang["coding"][0]
                return first.get("display", first.get("code", ""))
        return ""

    # ------------------------------------------------------------------
    # Medications
    # ------------------------------------------------------------------
    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as dicts with ``start``, ``stop``, ``description``, ``code``."""
        if self.format == "xml":
            section = self._xml_section("10160-0")
            if section is None:
                return []
            return [
                {
                    "start": self._xpath_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xpath_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xpath_first(
                        med, ".//hl7:manufacturedMaterial/hl7:code/@displayName"
                    ),
                    "code": self._xpath_first(med, ".//hl7:manufacturedMaterial/hl7:code/@code"),
                }
                for med in section.xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            ]
        result = []
        for med in self._bundle_resources("MedicationRequest"):
            concept = med.get("medicationCodeableConcept", {})
            result.append({
                "start": med.get("authoredOn", ""),
                "stop": med.get("dispenseRequest", {}).get("validityPeriod", {}).get("end", ""),
                "description": concept.get("text", ""),
                "code": self._first_coding_code(concept),
            })
        return result

    # ------------------------------------------------------------------
    # Encounters
    # ------------------------------------------------------------------
    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as dicts with ``start``, ``end``, ``description``, ``code``.

        XML: a single entry built from the first ``documentationOf/serviceEvent``
        with a fixed description of ``"Patient Care"`` and an empty code.
        """
        if self.format == "xml":
            services = self.data.xpath(
                "//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns
            )
            if not services:
                return []
            service = services[0]
            return [{
                "start": self._xpath_first(service, ".//hl7:effectiveTime/hl7:low/@value"),
                "end": self._xpath_first(service, ".//hl7:effectiveTime/hl7:high/@value"),
                "description": "Patient Care",
                "code": "",
            }]
        result = []
        for enc in self._bundle_resources("Encounter"):
            period = enc.get("period", {})
            # 'type' may be missing or an empty list; treat both as "no type".
            first_type = (enc.get("type") or [{}])[0]
            result.append({
                "start": period.get("start", ""),
                "end": period.get("end", ""),
                "description": first_type.get("text", ""),
                "code": self._first_coding_code(first_type),
            })
        return result

    # ------------------------------------------------------------------
    # Conditions/Diagnoses
    # ------------------------------------------------------------------
    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as dicts with ``onset``, ``description``, ``code``."""
        if self.format == "xml":
            section = self._xml_section("11450-4")
            if section is None:
                return []
            observations = section.xpath(
                ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation",
                namespaces=self.ns,
            )
            return [
                {
                    "onset": self._xpath_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xpath_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        result = []
        for cond in self._bundle_resources("Condition"):
            concept = cond.get("code", {})
            result.append({
                "onset": cond.get("onsetDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._first_coding_code(concept),
            })
        return result

    # ------------------------------------------------------------------
    # Immunizations
    # ------------------------------------------------------------------
    def get_immunizations(self) -> List[Dict[str, str]]:
        """Return immunizations as dicts with ``date``, ``description``, ``code``."""
        if self.format == "xml":
            section = self._xml_section("11369-6")
            if section is None:
                return []
            material_code = (
                ".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code/"
            )
            return [
                {
                    "date": self._xpath_first(imm, ".//hl7:effectiveTime/@value"),
                    "description": self._xpath_first(imm, material_code + "@displayName"),
                    "code": self._xpath_first(imm, material_code + "@code"),
                }
                for imm in section.xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            ]
        result = []
        for imm in self._bundle_resources("Immunization"):
            vaccine = imm.get("vaccineCode", {})
            result.append({
                "date": imm.get("occurrenceDateTime", ""),
                "description": vaccine.get("text", ""),
                "code": self._first_coding_code(vaccine),
            })
        return result

    # ------------------------------------------------------------------
    # Diagnostic Reports
    # ------------------------------------------------------------------
    def get_diagnostic_reports(self) -> List[Dict[str, str]]:
        """Return diagnostic reports as dicts with ``start``, ``description``, ``code``.

        JSON reports whose first ``presentedForm`` carries base64 ``data``
        also get a ``content`` key with the UTF-8 decoded text. If that data
        cannot be decoded, a warning is logged and ``content`` is omitted.
        """
        if self.format == "xml":
            section = self._xml_section("30954-2")
            if section is None:
                return []
            return [
                {
                    "start": self._xpath_first(report, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(report, ".//hl7:code/@displayName"),
                    "code": self._xpath_first(report, ".//hl7:code/@code"),
                }
                for report in section.xpath(".//hl7:organizer", namespaces=self.ns)
            ]
        result = []
        for report in self._bundle_resources("DiagnosticReport"):
            concept = report.get("code", {})
            item = {
                "start": report.get("effectiveDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._first_coding_code(concept),
            }
            forms = report.get("presentedForm") or [{}]
            data = forms[0].get("data", "")
            if data:
                try:
                    item["content"] = base64.b64decode(data).decode('utf-8')
                except ValueError:
                    # binascii.Error and UnicodeDecodeError are both
                    # ValueError subclasses; one bad attachment should not
                    # abort the whole extraction.
                    logger.warning(
                        "Could not decode presentedForm data for DiagnosticReport %s",
                        report.get("id", ""),
                    )
            result.append(item)
        return result

    # ------------------------------------------------------------------
    # Comprehensive Extraction
    # ------------------------------------------------------------------
    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "name_prefix": self.get_name_prefix(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "language": self.get_language(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
            "immunizations": self.get_immunizations(),
            "diagnostic_reports": self.get_diagnostic_reports(),
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields.

        Admission/discharge dates come from the last encounter, the diagnosis
        from the last condition, and medications are joined with ``"; "``.
        Fields with no source in the extracted data are empty strings.
        """
        data = self.get_all_patient_data()
        latest_encounter = data["encounters"][-1] if data["encounters"] else {}
        latest_condition = data["conditions"][-1] if data["conditions"] else {}
        medications_str = "; ".join(m["description"] for m in data["medications"])
        return {
            "first_name": data["first_name"],
            "last_name": data["last_name"],
            "middle_initial": "",
            "dob": data["dob"],
            "age": data["age"],
            "sex": data["gender"],
            "address": data["address_line"],
            "city": data["city"],
            "state": data["state"],
            "zip_code": data["zip_code"],
            "doctor_first_name": "",
            "doctor_last_name": "",
            "doctor_middle_initial": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "admission_date": latest_encounter.get("start", ""),
            "referral_source": "",
            "admission_method": "",
            "discharge_date": latest_encounter.get("end", ""),
            "discharge_reason": "",
            "date_of_death": "",
            "diagnosis": latest_condition.get("description", ""),
            "procedures": "",
            "medications": medications_str,
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return a list of dictionaries for all patients.

        The current patient selection is restored afterwards, even if
        extraction raises.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for i in range(len(self.patients)):
                self.set_patient_by_index(i)
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients

    def get_patient_ids(self) -> List[str]:
        """Return a list of all patient IDs, in patient order.

        Each patient is selected in turn so that ``get_id`` reads that
        patient; the original selection is restored afterwards.
        """
        original_idx = self.current_patient_idx
        ids = []
        try:
            for i in range(len(self.patients)):
                self.current_patient_idx = i
                ids.append(self.get_id())
        finally:
            self.current_patient_idx = original_idx
        return ids