# Source code for ecgprocess.utils.reader_tools

"""
Tools to help read and process ECG files.

Functions and classes are predominantly aimed to map ECG files to native
python objects such as dictionary and systematically process these. Functions
specifically focussed on ECG signals, such as calculating the limb leads,
are collected in `ecgprocess.utils.ecg_tools`.
"""

import re
import pydicom
import warnings
import xmltodict
import numpy as np
from lxml import etree
from pathlib import Path
from typing import (
    Optional, Self, Any, Literal, Type,
)
from ecgprocess.utils.general import(
    parse_number,
)
from ecgprocess.constants import (
    CoreData as Core,
    UtilsReaderNames as RNames,
)
from ecgprocess.errors import (
    is_type,
    MissingTagError,
    Error_MSG,
)
from ecgprocess.constants import UtilsGeneralNames as UGNames
from ecgprocess.errors import XMLValidationError

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# constants
WARN1 = "XML file is valid (ignoring XML elements not in the supplied XSD)."
CTypes = Core.DataTypes
CProc = Core.ProcessingData

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Base class with slots
class BaseReader:
    """
    Base class for ECG readers (e.g. ``ECGDICOMReader``).

    Declares ``__slots__`` for the waveform-array attributes, giving the
    more memory-efficient slotted storage for those fields while subclasses
    can still create dynamic attributes through ``__dict__``.
    """

    __slots__ = (
        CTypes.WaveForms,
        CTypes.MedianBeats,
        CTypes.MetaData,
        CTypes.OtherData,
        CProc.RAW,
    )

    # /////////////////////////////////////////////////////////////////////////
    def __init__(self, waveforms=None, medianbeats=None, metadata=None,
                 otherdata=None, raw=None):
        """Initialise every slot entry (each defaults to `None`)."""
        slot_values = (
            (CTypes.WaveForms, waveforms),
            (CTypes.MedianBeats, medianbeats),
            (CTypes.MetaData, metadata),
            (CTypes.OtherData, otherdata),
            (CProc.RAW, raw),
        )
        for slot_name, value in slot_values:
            setattr(self, slot_name, value)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def validate_xml(xml_path: str | Path,
                 xsd_path: str | Path,
                 strict: bool = True,
                 verbose: bool = True,
                 ) -> etree._ElementTree:
    """
    Validates an XML file against an XSD schema.

    Parameters
    ----------
    xml_path : `str` or `Path`
        Path to the XML file.
    xsd_path : `str` or `Path`
        Path to the XSD file.
    strict : `bool`, default `True`
        If False, ignores elements in the XML that are not in the XSD.
    verbose : `bool`, default `True`
        If True, issue a warning when non-strict validation ignored
        missing-element errors.

    Returns
    -------
    etree._ElementTree
        The parsed XML document.

    Raises
    ------
    XMLValidationError
        Raised if the XSD and XML are incompatible.
    """
    # #### Check input
    is_type(strict, bool)
    is_type(verbose, bool)
    is_type(xml_path, (str, Path))
    is_type(xsd_path, (str, Path))
    # #### Load the XSD schema
    with open(xsd_path, 'rb') as xsd:
        schema = etree.XMLSchema(etree.XML(xsd.read()))
    # #### Parse the XML file
    # NOTE(review): the first attempt uses lxml's default parser, which
    # resolves entities — if the XML can come from untrusted sources,
    # consider `resolve_entities=False` here as well.
    try:
        with open(xml_path, 'rb') as xml:
            xml_doc = etree.parse(xml)
    except etree.XMLSyntaxError:
        # retry with a parser that tolerates very large/deep documents
        parser = etree.XMLParser(resolve_entities=False, huge_tree=True)
        with open(xml_path, 'rb') as xml:
            xml_doc = etree.parse(xml, parser=parser)
    # #### Compare XML against XSD
    is_valid = schema.validate(xml_doc)
    # #### Do we want to raise errors when XML elements are absent in the XSD
    if not is_valid and not strict:
        # strict is False: drop errors relating to XML elements missing from
        # the XSD and only fail if any other type of error remains.
        filtered_errs = [err for err in schema.error_log
                         if UGNames.LXML_ERROR not in err.message]
        if filtered_errs:
            raise XMLValidationError([err.message for err in filtered_errs])
        # valid after filtering: optionally flag that errors were ignored
        if verbose and len(schema.error_log) > 0:
            warnings.warn(WARN1)
    elif not is_valid:
        raise XMLValidationError([err.message for err in schema.error_log])
    # return xml_doc
    return xml_doc
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def xml_to_dict(xml_doc: etree._ElementTree,
                encoding: str = 'utf-8',
                ) -> dict[str, Any]:
    """
    Converts an lxml ElementTree document to a dictionary.

    Parameters
    ----------
    xml_doc : `etree._ElementTree`
        A validated XML document.
    encoding : `str`, default `utf-8`
        Encoding used when serialising the tree.

    Returns
    -------
    dict [`str`, `any`]
        A dictionary representation of the XML data.
    """
    is_type(encoding, str)
    is_type(xml_doc, etree._ElementTree)
    # serialise the tree to a (byte) string first, then let xmltodict
    # perform the actual mapping to nested dictionaries
    serialised = etree.tostring(xml_doc, encoding=encoding)
    return xmltodict.parse(serialised)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def dicom_to_dict(ds: pydicom.dataset.Dataset) -> dict[str, Any]:
    """
    Turn a pydicom Dataset into a dict with keys derived from the Element
    names.

    Parameters
    ----------
    ds : `pydicom.dataset.Dataset`
        The DICOM dataset to convert.

    Returns
    -------
    dict [`str`, `any`]
        A dictionary representation of the dataset.
    """
    result = {}
    for elem in ds:
        # Sequences (VR == "SQ") hold nested datasets: recurse into each
        # item; everything else maps directly to its value.
        if elem.VR == "SQ":
            result[elem.name] = [dicom_to_dict(item) for item in elem]
        else:
            result[elem.name] = elem.value
    # Extract waveform data when the dataset exposes the accessor
    if hasattr(ds, RNames.DICOM_WAVE_ARRAY):
        waveform_data = {}
        # multiplex group 0 = full ECG leads, group 1 = median beats;
        # either may be absent, in which case it is silently skipped
        for key, group_idx in (("ECG_Leads", 0), ("Median_Beats", 1)):
            try:
                waveform_data[key] = ds.waveform_array(group_idx).T.tolist()
            except (IndexError, AttributeError):
                pass
        if waveform_data:
            result["WaveformData"] = waveform_data
    # return
    return result
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def flatten_dict(d: dict[str, Any],
                 parent_prefix: str = '',
                 sep: str = '.',
                 skip_root: bool = True,
                 ) -> dict[str, Any]:
    """
    Recursively flatten a nested dictionary, optionally skipping the root
    element.

    Parameters
    ----------
    d : dict
        The dictionary to flatten.
    parent_prefix : str, default ''
        The base string added as a prefix to all keys during recursion.
        Useful for maintaining context or indicating a higher-level
        structure.
    sep : str, default '.'
        The key separator
    skip_root : bool, default True
        If True, skips the first level (root) key. Only applied when the
        dictionary has exactly one top-level key.

    Returns
    -------
    dict
        A flattened dictionary where nested keys are concatenated into a
        single key.

    Examples
    --------
    >>> nested_dict = {
    ...     'a': {
    ...         'b': 1,
    ...         'c': {
    ...             'd': 2
    ...         }
    ...     }
    ... }
    >>> flatten_dict(nested_dict)
    {'b': 1, 'c.d': 2}
    >>> flatten_dict(nested_dict, skip_root=False)
    {'a.b': 1, 'a.c.d': 2}
    >>> flatten_dict(nested_dict, sep='_', skip_root=False)
    {'a_b': 1, 'a_c_d': 2}
    """
    # check input
    is_type(d, dict)
    is_type(parent_prefix, str)
    is_type(sep, str)
    is_type(skip_root, bool)
    # initiate empty results; later keys overwrite earlier ones, matching
    # plain dict-construction semantics
    res = {}
    # If skipping the root, adjust d to be the first nested dictionary level
    # (d is already known to be a dict from the is_type check above)
    if skip_root and len(d) == 1:
        _, d = next(iter(d.items()))
    # loop over the remaining keys and values
    for k, v in d.items():
        new_key = f"{parent_prefix}{sep}{k}" if parent_prefix else k
        if isinstance(v, dict):
            # nested dict: recurse with the extended prefix
            res.update(flatten_dict(v, new_key, sep=sep, skip_root=False))
        elif isinstance(v, list):
            # lists: flatten dict items recursively, keep the rest verbatim;
            # the element index is appended to keep keys unique.
            # NOTE: isinstance(item, dict) is False for None, so no separate
            # None check is needed.
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    res.update(flatten_dict(item, f"{new_key}_{i}",
                                            sep=sep, skip_root=False))
                else:
                    res[f"{new_key}_{i}"] = item
        else:
            # scalar leaf: simply assign
            res[new_key] = v
    return res
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_ecg_data(data_dict:dict[str, Any],
                 config:dict[str, str],
                 parse_numeric:bool=True,
                 as_array:bool=False,
                 bits:np.dtype | None =None,
                 skip_empty:bool=True,
                 **kwargs:Optional[Any],
                 ) -> tuple[dict[str, Any], list[str]]:
    '''
    Extracts metadata or signal data from a `data_dict` based on a supplied
    `config` dictionary.

    Parameters
    ----------
    data_dict : `dict` [`str`, `any`]
        A dictionary with keys and values matching the config object
    config : `dict` [`str`, `str`]
        a dictionary where the values match some keys in `data_dict` and the
        keys represent the names these will be stored to.
    parse_numeric : `bool`, default `True`
        Will check if numbers are accidentally presented as strings and
        parse these to numbers.
    as_array : `bool`, default `False`
        Whether data should be mapped to `np.array` using a direct map:
        `np.array(., dtype=bits)`.
    bits : `np.dtype`, default `None`
        np.array bits passed to numpy.array dtype.
    skip_empty : `bool`, default `True`
        Whether to skip config values not matching data_dict keys; if
        `False` a `MissingTagError` is raised for the first non-matching,
        non-None config value.
    **kwargs
        keyword arguments to `parse_number`.

    Returns
    -------
    dict
        A dictionary with the extracted signal data as numpy.ndarray.
    list
        A list with `config` values which did not match `data_dict` keys.
    '''
    # #### check input
    is_type(data_dict, dict)
    is_type(config, dict)
    is_type(skip_empty, bool)
    is_type(as_array, bool)
    is_type(parse_numeric, bool)
    # #### initialise object
    results_dict = {}
    results_missing = []
    # #### extract signal data
    # t = target (output) name, s = source key expected in data_dict
    for t, s in config.items():
        if s in data_dict:
            # Check if numerics are accidentally strings.
            if parse_numeric:
                parsed_string = parse_number(data_dict[s], **kwargs)
                results_dict[t] = parsed_string
                # a single-element list is unwrapped to its scalar value
                if isinstance(parsed_string, list):
                    if len(parsed_string) == 1:
                        results_dict[t] = parsed_string[0]
            else:
                results_dict[t] = data_dict[s]
            # Do we want to extract signals and store these in a numpy array..
            if as_array == True:
                # values that numpy cannot convert are kept as-is
                try:
                    results_dict[t] = np.array(results_dict[t], dtype=bits)
                except ValueError:
                    pass
        elif skip_empty == False and not s is None:
            # Should an Error be returned
            raise MissingTagError(s)
        else:
            # assign None and append missing metadata
            results_dict[t] = None
            results_missing.append(s)
    # return
    return results_dict, results_missing
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def subset_dict(data:dict[str, Any],
                pattern:dict[str, str],
                substitute:tuple[str,str]|None=(r'_[0-9]{1,2}\.*', ' '),
                character_trim:int=0,
                verbose:bool=True,
                skip_empty:bool=True,
                ) -> dict[str, Any]:
    """
    This will identify a subset of `data` items based on a `startswith` call
    using the `pattern` key, and based on the pattern values this function
    will identify the single entry in the subset whose values contains a
    unique name which will be added as a prefix to the subset keys.

    Parameters
    ---------
    data : `dict` [`str`, `any`]
        The dictionary to be subsetted and transformed.
    pattern : `dict` [`str`, `str`]
        A dictionary where each key is a prefix to match against the keys of
        `data`, and each value defines the suffix to search within the
        matching keys in `data`. The value found in `data` corresponding to
        this suffix is used as the prefix for the resulting dictionary keys.
        The pattern keys will be matched to the data keys based on a
        `startswith`, while the pattern values will be matched to the data
        keys using: `value in key`.
    substitute : `tuple` [`str`,`str`] or `None`, default `(r"_[0-9]{1,2}\\.*", " ")`
        A tuple containing a regular expression pattern and replacement
        string. This substitution is applied to the remaining portion of the
        `data` key after removing the matching prefix.
    character_trim : int, default `0`
        The number of characters which should be removed from the remainder
        of the `data` key after the matched `pattern` key prefix.
        NOTE(review): the implementation drops characters from the *start*
        of the remainder (`k[len(d_name) + character_trim:]`) — confirm the
        intended direction with callers.
    verbose : `bool`, default `True`
        Whether warnings should be issued.
    skip_empty : `bool`, default `True`
        If True, silently skip a matched subset for which no prefix entry
        (the `pattern` value) was found; if False a `ValueError` is raised.

    Returns
    -------
    dict [`str`, `any`]
        A dictionary with keys grouped and transformed based on the
        `pattern` and `substitute`. The keys are prefixed with values
        derived from `data`.

    Examples
    --------
    >>> data = {
    ...     "Sequence_0.Referenced Waveform Channels_0": "Channel 1",
    ...     "Sequence_0.Referenced Waveform Channels_1": "Channel 2",
    ...     "Sequence_0.Annotation Group Number": 1,
    ...     "Sequence_0.Unformatted Text Value": "Event A",
    ...     "Sequence_8.Measurement Units Code Sequence_0.Code Value": "bpm",
    ...     "Sequence_8.Measurement Units Code Sequence_0.Code Meaning": "Heart Rate",
    ...     "Sequence_15.Measurement Units Code Sequence_0.Code Meaning": "Temperature",
    ...     "Sequence_15.Referenced Waveform Channels_1": "Channel 10",
    ...     "Sequence_15.Numeric Value": 36.7,
    ...     "Other Annotation Sequence_1.Some Value": "Other Data",
    ...     "Other Annotation Sequence_1.Code Meaning": "Other Code Meaning",
    ... }
    >>> pattern = {
    ...     "Sequence_15": "Code Meaning",
    ...     "Sequence_8": "Code Meaning",
    ...     "Sequence_16": "Code Meaning",
    ...     "Sequence_11": "Code Meaning",
    ...     "Other Annotation Sequence_1": "Code Meaning",
    ... }
    >>> subset_dict(data, pattern)
    {'Temperature (Referenced Waveform Channels)': 'Channel 10',
     'Temperature (Numeric Value)': 36.7,
     'Heart Rate (Measurement Units Code SequenceCode Value)': 'bpm',
     'Other Code Meaning (Some Value)': 'Other Data'}
    """
    # #### confirm input
    is_type(data, dict)
    is_type(pattern, dict)
    is_type(substitute, (type(None), tuple))
    if not substitute is None:
        if len(substitute) != 2:
            raise ValueError("`substitute` should be a `Nonetype` or a tuple "
                             "with exactly two entries.")
    # #### empty default
    # regex=True
    grouped = {}
    # track pattern keys that never matched any data key (reported below)
    unmatched_keys = set(pattern.keys())
    # #### loop
    for d_name, g_name in pattern.items():
        # first get all data entries where the key starts with `d_name`
        prefix = None
        sub_dict = {}
        for k, v in data.items():
            mtch = k.startswith(d_name)
            if mtch:
                # remove from unmatched
                unmatched_keys.discard(d_name)
                # Subset the non-matching string
                d_name_len = len(d_name)
                str_remainder = k[d_name_len + character_trim:]
                # Do we want a cleaner string
                str_remainder_clean = str_remainder
                if not substitute is None:
                    m, r = substitute
                    str_remainder_clean = re.sub(m, r, str_remainder).strip()
                # Check g_name is in str_remainder: such an entry supplies
                # the prefix; it must be unique within the subset.
                if g_name in str_remainder:
                    if prefix is not None:
                        raise ValueError(
                            f"For pattern {d_name}, `prefix`: {prefix} is "
                            f"already set, cannot assign new value: {v} "
                            f"please ensure `{g_name}` is unique.")
                    prefix = v
                else:
                    sub_dict[str_remainder_clean] = v
        # next confirm prefix is not None, and add the prefix
        if len(sub_dict) > 0:
            if prefix is None:
                if skip_empty:
                    continue
                else:
                    raise ValueError(f"`{g_name}` was not found in data.")
            grouped.update(
                {f"{prefix} ({key})": val for key, val in sub_dict.items()}
            )
            del sub_dict
    # do we want to return un-matching keys
    if verbose and unmatched_keys:
        warnings.warn("The following keys did not match any pattern: "
                      f"{', '.join(unmatched_keys)}")
    # return
    return grouped