"""
A module containing a collection of function or classes which can be used
as engineering functions in the Tabular module.
The listed programs are meant to provide an idea of potentially relevant
solutions. Users can use these functions out of the box, adapt them, or
simply write their own custom code.
When writing your own engineering solution remember that the first argument
will take the metadata, waveforms, or metadata. The waveforms and metadata
functions should have a kwargs argument which will be used internally by
Tabular to pass meta_dict (the metadata of the file being processed) to the
function environment making it available to alter the signal data.
"""
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# imports
import re
import logging
import numpy as np
from typing import (
Callable, Self, Optional, Any, Literal,
)
from ecgprocess.errors import (
FileValidationError,
InputValidationError,
is_type,
)
from ecgprocess.utils.ecg_tools import(
signal_dicts_to_numpy_array,
signal_calibration,
signal_resolution,
)
from ecgprocess.constants import (
TabularNames as TabNames,
)
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
_log = logging.getLogger(__name__)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def signal_correction(
signals:dict[str, np.ndarray], baseline_name:str='wave_channel_baseline_',
correctionfactor_name:str='wave_channel_correctionfactor_',
**kwargs) -> dict[str, np.ndarray]:
"""
Adjusts the signals by subtracting the channel baseline multiplied
by the channel correction factor. These parameters must be provided in
`kwargs[TabNames.META_DICT]`.
Parameters
----------
signals : `dict` [`str`, `np.ndarray`]
A dictionary mapping channel names (strings) to waveform arrays.
baseline_name : str
The dictionary key name for the channel baseline. Will internally add
a numeric suffix ranging from 0 to 11 (inclusive).
correctionfactor_name : str
The dictionary key name for the channel correctionfactor. Will
internally add a numeric suffix ranging from 0 to 11 (inclusive).
**kwargs
Additional keyword arguments, which must include a dictionary under
the key `TabNames.META_DICT`. This dictionary should contain:
- wave_channel_correctionfactor_i : float
Correction factor for channel i.
- wave_channel_baseline_i : float
Baseline offset for channel i.
Returns
-------
dict [`str`, `np.ndarray`]
The input signals dictionary with corrected signals.
Raises
------
KeyError
If `TabNames.META_DICT` is not found in **kwargs.
"""
# constants - these values should be in meta_dict
corr = correctionfactor_name
base = baseline_name
# TODO
# add a check/raise error for the presence or absence of these keys in
# meta_dict.
# the algorithm
if not TabNames.META_DICT in kwargs:
raise KeyError(f"`{TabNames.META_DICT}` should be included as kwargs")
else:
meta_dict = kwargs[TabNames.META_DICT]
signals = dict(signals)
for i, (k, v) in enumerate(signals.items()):
# skip of None
if v is None:
signals[k] = v
continue
# confirming this is a np.array
is_type(v, np.ndarray)
_log.debug(
'Applying baseline: %s and factor: %s corrections to lead: %s.',
meta_dict[base+str(i)], meta_dict[corr+str(i)], k,
)
signals[k] = signal_calibration(
v, correctionfactor=meta_dict[corr+str(i)],
baseline=meta_dict[base+str(i)],
)
return signals
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def signal_standardise_res(
signals:dict[str, np.ndarray], resolution_name:str = 'wave_channel_sens_',
target_resolution:float = 5., **kwargs,
) -> dict[str, np.ndarray]:
"""
Standardise the resolution signal by adjusting the amplitude scale by the
ratio of the source and target solution.
Parameters
----------
signals : `dict` [`str`, `np.ndarray`]
A dictionary mapping channel names (strings) to waveform arrays.
resolution_name : str
The dictionary key name for the channel sensitivity/resolution. Will
internally add a numeric suffix ranging from 0 to 11 (inclusive).
target_resolution : `float`, default 5
The target resolution.
**kwargs
Additional keyword arguments, which must include a dictionary under
the key `TabNames.META_DICT`. This dictionary should contain:
- wave_channel_sens : float
The wave channel sensitivity/resolution for channel i.
Returns
-------
dict [`str`, `np.ndarray`]
The input signals dictionary with corrected signals.
Raises
------
KeyError
If `TabNames.META_DICT` is not found in **kwargs.
If resolution_name+str(i) is not found in `TabNames.META_DICT`.
Notes
-----
The function will apply a scaling factor of
source_resolution/target_resolution to ensure the returned signal has the
desired target uV.
"""
# constants - these values should be in meta_dict
sens = resolution_name
# the algorithm
if not TabNames.META_DICT in kwargs:
raise KeyError(f"`{TabNames.META_DICT}` should be included as kwargs")
else:
meta_dict = kwargs[TabNames.META_DICT]
signals = dict(signals)
for i, (k, v) in enumerate(signals.items()):
# skip of None
if v is None:
signals[k] = v
continue
# confirming this is a np.array
is_type(v, np.ndarray)
# check key is in dict
key_name = sens+str(i)
if not key_name in meta_dict:
raise KeyError(f"`{key_name}` not found in `{TabNames.META_DICT}`.")
if target_resolution/meta_dict[key_name] != 1.0:
_log.debug(
'Rescaling lead `%s` by factor `%s`.',
k, target_resolution/meta_dict[key_name],
)
signals[k] = signal_resolution(
v, resolution_current=meta_dict[key_name],
resolution_target=target_resolution,
)
return signals
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[docs]
class LeadMapper(object):
"""
Normalise ECG lead ordering when a device writes channels out of sequence.
Some DICOM devices store channels in a non-standard order (e.g. Lead II
data in channel-0 slot). ``LeadMapper`` reads ``signal name X`` entries
from ``meta_dict``, builds an actual-label-to-key mapping, then reassigns
each canonical key in ``signals`` to the correct array.
Parameters
----------
accepted_mappings : `dict` [`str`, `list` [`str`]]
Maps each canonical lead key (e.g. ``'I'``) to a list of device
label strings that are acceptable for that lead
(e.g. ``['I', 'Lead I', 'Lead I (Einthoven)']``).
Attributes
----------
accepted_mappings : `dict` [`str`, `list` [`str`]]
The accepted lead label mappings supplied at initialisation.
Methods
-------
__call__(signals, **kwargs)
Reassign signal arrays to their canonical lead keys.
Raises
------
InputValidationError
If ``accepted_mappings`` is not a ``dict``, or if any value is
not a ``list``.
Notes
-----
The callable works for both ``engineer_wave`` and ``engineer_median``
call sites in ``tabular.py`` since both share the same signature
``(signals, meta_dict=meta_temp)``.
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def __init__(self, accepted_mappings: dict[str, list[str]]) -> None:
"""Initialise LeadMapper."""
is_type(accepted_mappings, dict)
for key, val in accepted_mappings.items():
if not isinstance(val, list):
raise InputValidationError(
f"accepted_mappings values must be lists; "
f"key '{key}' has type {type(val)}"
)
self.accepted_mappings = accepted_mappings
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def __call__(
self,
signals: dict[str, np.ndarray],
**kwargs,
) -> dict[str, np.ndarray]:
"""
Reassign signal arrays to their canonical lead keys.
Parameters
----------
signals : `dict` [`str`, `np.ndarray`]
Dict mapping canonical key names to waveform arrays, as parsed
by the config parser (keys correct, values potentially in wrong
channel order).
**kwargs
Must include ``meta_dict`` (keyed by ``TabNames.META_DICT``),
a dict containing ``'signal name N'`` entries that map channel
indices to device lead labels.
Returns
-------
mapped_signals : `dict` [`str`, `np.ndarray`]
Shallow copy of ``signals`` with arrays reassigned to the correct
canonical keys.
Raises
------
KeyError
If ``TabNames.META_DICT`` is absent from ``kwargs``, or if no
device label matches a required canonical key.
"""
# check input
is_type(signals, dict)
if TabNames.META_DICT not in kwargs:
raise KeyError(
f"`{TabNames.META_DICT}` should be included as kwargs"
)
# map data
meta_dict = kwargs[TabNames.META_DICT]
mapped_signals = dict(signals)
signal_keys = list(mapped_signals.keys())
# Map each canonical slot key to the device label the device wrote
# there; e.g. {'I': 'Lead II'} means channel 0 holds 'Lead II' data.
actual_mappings = {
signal_keys[int(re.sub(r"^signal name\s*", "", k))]: v
for k, v in meta_dict.items()
if k.startswith('signal name')
}
_log.debug('Accepted lead mappings: %s', self.accepted_mappings)
_log.debug('Actual lead mappings: %s', actual_mappings)
# Now apply the mappings
for accepted_key, accepted_vals in self.accepted_mappings.items():
match = False
for actual_key, actual_val in actual_mappings.items():
if actual_val in accepted_vals:
_log.debug(
"Mapping '%s' (%s) <- actual '%s' (%s)",
accepted_key, accepted_vals, actual_key, actual_val,
)
mapped_signals[accepted_key] = signals[actual_key]
match = True
break
if not match:
raise KeyError(
f"No device label found for canonical key '{accepted_key}' "
f"(accepted: {accepted_vals})"
)
return mapped_signals
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def __repr__(self) -> str:
"""Return unambiguous string representation."""
CLASS_NAME = type(self).__name__
return f"{CLASS_NAME}(accepted_mappings={self.accepted_mappings!r})"
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def __str__(self) -> str:
"""Return human-readable string representation."""
CLASS_NAME = type(self).__name__
leads = list(self.accepted_mappings.keys())
return f"{CLASS_NAME} with {len(leads)} canonical leads: {leads}"