Source code for ecgprocess.utils.ecg_tools

'''
Collecting established tools for ECG derivation or cleaning.
'''

import numpy as np
from ecgprocess.constants import (
    CoreData as Core,
)
from scipy import signal
from ecgprocess.errors import (
    is_type,
)
from typing import Any

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# constants
# Short alias for the lead-name container from `ecgprocess.constants`; its
# attributes (e.g. `CLeads.I`, `CLeads.II`, `CLeads.aVR`) are used below as
# dictionary keys for the per-lead signals.
CLeads = Core.Leads

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def resampling_500hz(signals: dict[str, np.ndarray],
                     duration: int | float | None = None,
                     median: bool = False,
                     ) -> dict[str, np.ndarray]:
    """
    Re-sample an ECG signal to 500 hz.

    Parameters
    ----------
    signals : `dict` [`str`, `np.ndarray`]
        A dictionary with the lead names as string keys and the signals as a
        1D np.ndarray.
    duration : `int` or `float`, default `NoneType`
        The duration of the ECG in seconds, i.e. the number of samples
        divided by the sampling frequency. For raw waveforms the duration
        determines the number of samples needed to get a 500hz signal:
        duration times 500 (rounded to the nearest whole sample). Required
        when `median` is `False`; ignored when `median` is `True`.
    median : `bool`, default `False`
        Set to true to resample a median beat ECG to 500hz. The duration of a
        median beat signal is 1.2 seconds, hence the number of samples is
        fixed at: 1.2 times 500 = 600.

    Returns
    -------
    dict
        A new dictionary with the same keys as `signals` and the resampled
        signals as values. The input dictionary is not modified.

    Raises
    ------
    ValueError
        If `median` is `False` and `duration` is `NoneType`.
    """
    is_type(signals, dict)
    is_type(duration, (type(None), int, float))
    is_type(median, bool)
    # #### get the number of samples of the resampled signal
    if median:
        # median beat: 1.2 seconds at 500hz
        num_samples = 600
    else:
        if duration is None:
            raise ValueError('`duration` should not be `NoneType` when `median` '
                             'is `False`.')
        # `scipy.signal.resample` needs an integer number of samples, while
        # `duration * 500` is a float whenever `duration` is a float.
        num_samples = int(round(duration * 500))
    # #### resample
    # return
    return {
        lead: signal.resample(sig, num_samples)
        for lead, sig in signals.items()
    }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_limb_leads(signals: dict[str, np.ndarray],
                   lead_I: str = CLeads.I,
                   lead_II: str = CLeads.II,
                   ) -> dict[str, np.ndarray]:
    """
    Calculate the derived limb leads (III, aVR, aVL, aVF) from leads I and II.

    Parameters
    ----------
    signals : `dict` [`str`, `np.ndarray`]
        A dictionary with the lead names as string keys and the signals as a
        1D np.ndarray.
    lead_I : `str`, default 'I'
        The key name for lead I in `signals`
    lead_II : `str`, default 'II'
        The key name for lead II in `signals`

    Returns
    -------
    dict
        The `signals` dictionary including the limb lead signals. This is the
        same object that was passed in: missing limb leads are added to it
        in place, limb leads that are already present are left untouched.

    Raises
    ------
    KeyError
        If `lead_I` and/or `lead_II` are not keys of `signals`.

    Notes
    -----
    The leads are derived as follows:

    - III = II - I
    - aVR = -(I + II) / 2
    - aVL = I - II / 2
    - aVF = II - I / 2

    please see this `url <https://ecgwaves.com/topic/ekg-ecg-leads-electrodes-systems-limb-chest-precordial/>`_
    for the relevant explanation about the relationships between leads I and II
    and the limb leads.
    """
    # #### check input and set constants
    is_type(signals, dict)
    is_type(lead_I, str)
    is_type(lead_II, str)
    if not {lead_I, lead_II}.issubset(signals):
        raise KeyError('leads I and/or II are not available in `signals`: '
                       f'{signals.keys()}')
    # Use the caller-supplied key names, so non-default names for leads I and
    # II are honoured instead of always reading `CLeads.I` and `CLeads.II`.
    sig_i = signals[lead_I]
    sig_ii = signals[lead_II]
    # #### get limb leads
    if CLeads.III not in signals:
        signals[CLeads.III] = np.subtract(sig_ii, sig_i)
    if CLeads.aVR not in signals:
        signals[CLeads.aVR] = np.add(sig_i, sig_ii) * (-0.5)
    if CLeads.aVL not in signals:
        signals[CLeads.aVL] = np.subtract(sig_i, 0.5 * sig_ii)
    if CLeads.aVF not in signals:
        signals[CLeads.aVF] = np.subtract(sig_ii, 0.5 * sig_i)
    # return
    return signals
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # TODO write pytest
def signal_dicts_to_numpy_array(
    signals: list[dict[str, np.ndarray]],
    leads: list[str] | None = ['I', 'II', 'III', 'V1', 'V2', 'V3', 'V4', 'V5',
                               'V6', 'aVF', 'aVL', 'aVR'],
    padding: bool = True,
) -> np.ndarray:
    """
    Convert a list of ECG signal dictionaries to a 3D NumPy array suitable for
    deep learning.

    Parameters
    ----------
    signals : `list` [`dict` [`str`, `np.ndarray`]]
        List where each dictionary represents an ECG sample with lead names as
        keys and numpy arrays as values.
    leads : `list` [`str`] or `None`
        List of lead names to include and their order. If `None`, all unique
        leads across samples are used in sorted order.
    padding : `bool`, default `True`
        Whether to pad shorter signals with NaN to the length of the longest
        signal. If `False`, all signals must have the same length.

    Returns
    -------
    np.ndarray
        3D NumPy array of dtype `float32` with shape
        `(num_samples, num_leads, signal_length)` containing the ECG data.

    Raises
    ------
    ValueError
        If `signals` is empty.
        If there are no leads to extract (an empty `leads` list, or `leads`
        is `None` and none of the samples contains a lead).
        If `padding` is `False` and the selected leads do not all have the
        same length; a lead missing from a sample counts as length zero.
    TypeError
        If `leads` is not a list of strings, or if a selected lead value is
        not a NumPy array.

    Notes
    -----
    The second axis of the array matches the order of the supplied leads.

    When `padding` is `True`, leads that are missing from a sample (or are
    empty) are left as NaN, and shorter signals are NaN-padded at the end.
    """
    # check input
    is_type(signals, list)
    if not signals:
        raise ValueError("The signals list is empty.")
    # Determine the list of leads
    if leads is None:
        # Collect all unique leads across samples
        unique_leads = set()
        for sample in signals:
            unique_leads.update(sample.keys())
        leads = sorted(unique_leads)
    elif not isinstance(leads, list) or not all(
        isinstance(lead, str) for lead in leads
    ):
        raise TypeError("`leads` must be a list of strings.")
    # NOTE: the default `leads` list is only read, never mutated.
    if not leads:
        # Without this guard the length calculation below fails with an
        # unhelpful `max() arg is an empty sequence`.
        raise ValueError("There are no leads to extract from `signals`.")
    # Validate the lead data before measuring it, so that a wrong type is
    # reported with a clear message rather than by a failing `len()` call.
    missing = np.array([])
    lengths = []
    for i, sample in enumerate(signals):
        for lead in leads:
            lead_data = sample.get(lead, missing)
            if not isinstance(lead_data, np.ndarray):
                raise TypeError(
                    f"Lead '{lead}' in sample {i} is not a NumPy array."
                )
            lengths.append(len(lead_data))
    # do we want to pad the signals if needed.
    if padding:
        # The maximum length among all leads in all samples
        signal_length = max(lengths)
    else:
        # Ensure all signals have the same length
        unique_lengths = set(lengths)
        if len(unique_lengths) != 1:
            raise ValueError(
                "All signals must have the same length when padding is False."
            )
        signal_length = unique_lengths.pop()
    # Initialize the array with NaNs
    data_array = np.full(
        (len(signals), len(leads), signal_length), np.nan, dtype=np.float32
    )
    for i, sample in enumerate(signals):
        for j, lead in enumerate(leads):
            lead_data = sample.get(lead, missing)
            current_length = len(lead_data)
            if current_length == 0:
                # If lead data is missing or empty, leave it as NaN
                continue
            # `signal_length` is the maximum (padding) or the single shared
            # length (no padding), so a signal can never be longer than the
            # array and no truncation is needed; shorter signals keep their
            # trailing NaNs.
            data_array[i, j, :current_length] = lead_data
    # return
    return data_array
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # TODO add pytest
def signal_calibration(
    signal: np.ndarray,
    correctionfactor: float,
    baseline: float,
) -> np.ndarray:
    """
    Recalibrate an ECG signal using its channel baseline and correction
    factor.

    The channel baseline is first subtracted from the signal, after which the
    result is multiplied by the channel correction factor.

    Parameters
    ----------
    signal : `np.ndarray`
        The lead-specific ECG signal.
    correctionfactor : `float`
        The channel correction factor.
    baseline : `float`
        The channel baseline.

    Returns
    -------
    np.ndarray
        The recalibrated signal: `(signal - baseline) * correctionfactor`.
    """
    # NOTE: inside this function `signal` is the array argument, not the
    # `scipy.signal` module imported at the top of the file.
    is_type(signal, np.ndarray)
    is_type(correctionfactor, float)
    is_type(baseline, float)
    # algorithm: remove the baseline offset, then rescale
    baseline_removed = signal - baseline
    # return
    return baseline_removed * correctionfactor
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def signal_resolution(signal: np.ndarray,
                      resolution_current: int | float,
                      resolution_target: int | float,
                      ) -> np.ndarray:
    """
    Adjust the amplitude scale of an ECG signal to match a desired resolution.

    The signal is multiplied by the ratio
    `resolution_target / resolution_current`.

    Parameters
    ----------
    signal : `np.ndarray`
        The lead-specific ECG signal.
    resolution_current : `int` or `float`
        The current resolution. Should not be zero.
    resolution_target : `int` or `float`
        The target resolution.

    Returns
    -------
    np.ndarray
        The rescaled signal.

    Raises
    ------
    ValueError
        If `resolution_current` is zero.

    Example
    -------
    >>> import numpy as np
    >>> ecg_signal = np.array([10, 20, 30, 40, 50])
    >>> new_signal = signal_resolution(
    ...     ecg_signal,
    ...     resolution_current=2.0,
    ...     resolution_target=5,
    ... )
    >>> print(new_signal)
    [ 25.  50.  75. 100. 125.]
    """
    # NOTE: inside this function `signal` is the array argument, not the
    # `scipy.signal` module imported at the top of the file.
    is_type(signal, np.ndarray)
    # whole numbers are valid resolutions too (see the example above)
    is_type(resolution_current, (int, float))
    is_type(resolution_target, (int, float))
    if resolution_current == 0:
        # dividing an array by zero would silently return inf/nan values
        raise ValueError('`resolution_current` should not be zero.')
    # algorithm
    # return
    return signal * resolution_target / resolution_current