Source code for ecgprocess.utils.ecg_tools

'''
Collecting established tools for ECG derivation or cleaning.
'''

import numpy as np
from ecgprocess.constants import (
    CoreData as Core,
)
from scipy import signal
from ecgprocess.errors import (
    is_type,
)
from typing import Any
import warnings

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# constants
# Short alias for the lead-name constants held on `CoreData`.
CLeads = Core.Leads

# The twelve lead names used as the default `leads` selection of
# `signal_dicts_to_numpy_array`. Kept as a tuple so the shared default cannot
# be mutated by callers; the order below is the order of the array's lead axis.
_STANDARD_LEADS: tuple[str, ...] = (
    # limb leads
    'I', 'II', 'III',
    # chest leads
    'V1', 'V2', 'V3', 'V4', 'V5', 'V6',
    # augmented limb leads
    'aVF', 'aVL', 'aVR',
)

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def resampling_500hz(signals:dict[str, np.ndarray],
                     duration: int | float | None=None,
                     median:bool=False) -> dict[str, np.ndarray]:
    """
    Re-sample an ECG signal to 500 hz.

    Parameters
    ----------
    signals : `dict` [`str`, `np.ndarray`]
        A dictionary with the lead names as string keys and the signals as a
        1D np.ndarray.
    duration : `int` or `float`, default `NoneType`
        The duration of the ECG in seconds, i.e. the number of samples
        divided by the sampling frequency. For raw waveforms the duration
        determines the number of samples needed to get a 500hz signal:
        ``int(duration * 500)``. Required when `median` is `False`, ignored
        when `median` is `True`.
    median : `bool`, default `False`
        Set to true to resample a median beat ECG to 500hz. The duration of a
        median beat signal is 1.2 seconds, hence the number of samples is
        fixed at: 1.2 times 500 = 600.

    Returns
    -------
    dict
        A new dictionary with the same keys as `signals`, where each signal
        has been passed through `scipy.signal.resample`. The input dictionary
        is not modified.

    Raises
    ------
    ValueError
        If `median` is `False` and `duration` is `NoneType`, or if `duration`
        is too small to result in at least one sample.
    """
    is_type(signals, dict)
    is_type(duration, (type(None), int, float))
    is_type(median, bool)
    # #### get number of samples
    if median:
        # a median beat always covers 1.2 seconds
        num_samples = 600
    else:
        if duration is None:
            raise ValueError('`duration` should not be `NoneType` when `median` '
                             'is `False`.')
        num_samples = int(duration * 500)
        # fail early with a readable message instead of handing a zero or
        # negative sample count to scipy
        if num_samples < 1:
            raise ValueError('`duration` should be large enough to result in '
                             f'at least one sample, got: {duration}.')
    # #### resample
    new_dict = {
        lead: signal.resample(sig, num_samples)
        for lead, sig in signals.items()
    }
    # return
    return new_dict
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_limb_leads(signals:dict[str, np.ndarray],
                   lead_I:str=CLeads.I,
                   lead_II:str=CLeads.II,
                   ) -> dict[str, np.ndarray]:
    """
    Calculate the derived limb leads (III, aVR, aVL, aVF) from leads I and II.

    Parameters
    ----------
    signals : `dict` [`str`, `np.ndarray`]
        A dictionary with the lead names as string keys and the signals as a
        1D np.ndarray.
    lead_I : `str`, default 'I'
        The key name for lead I in `signals`
    lead_II : `str`, default 'II'
        The key name for lead II in `signals`

    Returns
    -------
    dict
        A shallow copy of `signals` including the limb lead signals. The
        derived leads are stored under the `CLeads.III`, `CLeads.aVR`,
        `CLeads.aVL` and `CLeads.aVF` keys. A derived lead which is already
        present in `signals` (and is not `NoneType`) is kept as is.

    Raises
    ------
    KeyError
        If `lead_I` or `lead_II` is absent from `signals` or mapped to
        `NoneType`.

    Notes
    -----
    The following relationships are used::

        III = II - I
        aVR = -(I + II) / 2
        aVL = I - II / 2
        aVF = II - I / 2

    please see this `url <https://ecgwaves.com/topic/ekg-ecg-leads-electrodes-systems-limb-chest-precordial/>`_
    for the relevant explanation about the relationships between leads I and
    II and the limb leads.
    """
    # #### check input and set constants
    is_type(signals, dict)
    is_type(lead_I, str)
    is_type(lead_II, str)
    missing_src = [
        name for name in (lead_I, lead_II) if signals.get(name) is None
    ]
    if missing_src:
        raise KeyError(
            f'Source leads {missing_src} are not available in `signals`.'
        )
    # making sure we do not affect the original parsed signals
    signals = dict(signals)
    # read the source leads through the caller supplied key names, these may
    # differ from the CLeads.I / CLeads.II defaults
    sig_one = signals[lead_I]
    sig_two = signals[lead_II]
    # #### get limb leads
    if signals.get(CLeads.III) is None:
        signals[CLeads.III] = np.subtract(sig_two, sig_one)
    if signals.get(CLeads.aVR) is None:
        signals[CLeads.aVR] = np.add(sig_one, sig_two) * (-0.5)
    if signals.get(CLeads.aVL) is None:
        signals[CLeads.aVL] = np.subtract(sig_one, 0.5 * sig_two)
    if signals.get(CLeads.aVF) is None:
        signals[CLeads.aVF] = np.subtract(sig_two, 0.5 * sig_one)
    # return
    return signals
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def signal_dicts_to_numpy_array(
    signals: list[dict[str, np.ndarray]],
    leads: list[str] | tuple[str, ...] | None = _STANDARD_LEADS,
    padding: bool = True,
) -> np.ndarray:
    """
    Convert a list of ECG signal dictionaries to a 3D NumPy array suitable for
    deep learning.

    Parameters
    ----------
    signals : `list` [`dict` [`str`, `np.ndarray`]]
        List where each dictionary represents an ECG sample with lead names as
        keys and numpy arrays as values.
    leads : `list` [`str`] or `tuple` [`str`, ...] or `None`, default ``_STANDARD_LEADS``
        Lead names to include and their order. Defaults to the standard
        12-lead set defined in ``_STANDARD_LEADS`` (an immutable tuple). Pass
        `None` to collect all unique leads found across samples in sorted
        order.
    padding : `bool`, default `True`
        Whether to pad shorter signals with NaN up to the length of the
        longest signal. If `False`, all signals must have the same length.

    Returns
    -------
    np.ndarray
        3D float32 NumPy array with shape
        `(num_samples, num_leads, signal_length)` containing the ECG data.
        With `padding` set to `True`, a lead which is missing from a sample
        (or is empty) is left as NaN.

    Raises
    ------
    ValueError
        If `signals` is empty.
        If there are no leads to extract.
        If `padding` is `False` and signals have varying lengths; a lead which
        is missing from a sample counts as a signal of length zero.
    TypeError
        If `leads` is not a list or tuple of strings.
        If a requested lead in a sample is not a NumPy array.

    Notes
    -----
    The order of the second array axis matches the order of the supplied
    leads.
    """
    # check input
    is_type(signals, list)
    if not signals:
        raise ValueError("The signals list is empty.")
    # Determine the list of leads
    if leads is None:
        # Collect all unique leads across samples
        leads = sorted({lead for sample in signals for lead in sample})
    else:
        # Ensure leads is a sequence of strings
        if not isinstance(leads, (list, tuple)) or not all(
            isinstance(lead, str) for lead in leads
        ):
            raise TypeError("`leads` must be a list or tuple of strings.")
        leads = list(leads)
    if not leads:
        # without this the length logic below fails with an unrelated message
        raise ValueError("There are no leads to extract from `signals`.")
    # Look up and validate every requested lead once, before any length is
    # measured, so a wrong value type is reported as such.
    placeholder = np.array([])
    per_sample: list[list[np.ndarray]] = []
    for i, sample in enumerate(signals):
        row = []
        for lead in leads:
            lead_data = sample.get(lead, placeholder)
            if not isinstance(lead_data, np.ndarray):
                raise TypeError(
                    f"Lead '{lead}' in sample {i} is not a NumPy array."
                )
            row.append(lead_data)
        per_sample.append(row)
    # set the length of the last axis
    unique_lengths = {len(arr) for row in per_sample for arr in row}
    if padding:
        signal_length = max(unique_lengths)
    elif len(unique_lengths) != 1:
        raise ValueError(
            "All signals must have the same length when padding is False."
        )
    else:
        (signal_length,) = unique_lengths
    # Initialize the array with NaNs
    data_array = np.full(
        (len(signals), len(leads), signal_length), np.nan, dtype=np.float32
    )
    for i, row in enumerate(per_sample):
        for j, lead_data in enumerate(row):
            current_length = len(lead_data)
            if current_length == 0:
                # If lead data is missing or empty, leave it as NaN
                continue
            # No signal can be longer than `signal_length` (it is the
            # maximum, or the single shared length), so writing the leading
            # slice covers both the padded and the exact-length case.
            data_array[i, j, :current_length] = lead_data
    # return
    return data_array
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def signal_calibration(signal:np.ndarray, correctionfactor: float,
                       baseline: float,
                       ) -> np.ndarray:
    """
    Recalibrate an ECG signal: first remove the channel baseline, then scale
    the baseline-corrected signal by the channel correction factor.

    Parameters
    ----------
    signal : `np.ndarray`
        The lead-specific ECG signal.
    correctionfactor : `float`
        The channel correction factor.
    baseline : `float`
        The channel baseline.

    Returns
    -------
    np.ndarray
        The recalibrated signal, ``(signal - baseline) * correctionfactor``.
    """
    # #### check input
    is_type(signal, np.ndarray)
    is_type(correctionfactor, float)
    is_type(baseline, float)
    # #### shift by the baseline, then scale
    shifted = signal - baseline
    return shifted * correctionfactor
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def signal_resolution(signal:np.ndarray, resolution_current: int | float,
                      resolution_target: int | float,
                      ) -> np.ndarray:
    """
    Adjust the amplitude scale of an ECG signal to match a desired resolution.

    Parameters
    ----------
    signal : `np.ndarray`
        The lead-specific ECG signal.
    resolution_current : `int` or `float`
        The current resolution, should not be zero.
    resolution_target : `int` or `float`
        The target resolution.

    Returns
    -------
    np.ndarray
        The rescaled signal:
        ``signal * resolution_target / resolution_current``.

    Raises
    ------
    ValueError
        If `resolution_current` is zero.

    Example
    -------
    >>> import numpy as np
    >>> ecg_signal = np.array([10, 20, 30, 40, 50])
    >>> current_res = 2.0
    >>> new_signal = signal_resolution(
    ...     ecg_signal,
    ...     resolution_current=current_res,
    ...     resolution_target=5,
    ... )
    >>> print(new_signal)
    [ 25.  50.  75. 100. 125.]
    """
    is_type(signal, np.ndarray)
    # whole-number resolutions (e.g. 5) are as valid as 5.0
    is_type(resolution_current, (int, float))
    is_type(resolution_target, (int, float))
    # a zero divisor would silently fill the signal with inf/nan
    if resolution_current == 0:
        raise ValueError('`resolution_current` should not be zero.')
    # algorithm
    new_signal = signal * resolution_target/resolution_current
    # return
    return new_signal