Data sets

Data set discovery using Prefix Trees

A data set is discovered as any folder within the provided data set root directory that contains at least one subdirectory whose name starts with cleaned_.

Once a data set is discovered, each cleaned_<feature> subdirectory contributes <feature> as a feature name.

We then scan the files within each cleaned_<feature> subdirectory to discover the IDs that data set has for that feature. The IDs need not be the same across features, which is why all of our data getters may also return None.

Automagic ID discovery is done using a prefix tree, which is a data structure that allows for efficient searching of strings based on their prefixes.
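The discovery rule above can be sketched as a small helper (discover_data_sets is a hypothetical name used here for illustration, not part of this module):

```python
from pathlib import Path


def discover_data_sets(root: str) -> dict[str, dict[str, list[str]]]:
    """Return {data_set_name: {feature: [file names]}} for every folder under
    `root` that contains at least one cleaned_<feature> subdirectory."""
    data_sets = {}
    for candidate in Path(root).iterdir():
        if not candidate.is_dir():
            continue
        features = {
            sub.name.removeprefix("cleaned_"): sorted(p.name for p in sub.iterdir())
            for sub in candidate.iterdir()
            if sub.is_dir() and sub.name.startswith("cleaned_")
        }
        if features:  # folders with no cleaned_* subdirectories are not data sets
            data_sets[candidate.name] = features
    return data_sets
```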


source

IdExtractor

 IdExtractor (delimiter:str='', key:str='')

Class extending the prefix tree that incorporates the algorithm for extracting IDs from a list of file names. The algorithm is somewhat oblique, so it’s better to use the extract_ids method rather than trying to use the prefix tree directly at the call site.

The algorithm is based on the assumption that the file names share a common suffix and differ only in their leading IDs. It reverses the file names, inserts them into the tree, and then simplifies and flattens that tree in order to find the IDs as leaves of the simplified tree.

  1. Insert the file name string into the tree, but with each string reversed.
  2. Simplify the tree, combining nodes with only one child.
  3. There may be unexpected suffix matches for these IDs, so we flatten the tree to depth 1, meaning all children of the root are combined to make leaves.
  4. The leaves are the IDs we want to extract. However, we must reverse these leaf keys to get the original IDs, since we reversed the file names in step 1.

TODO:

  • If we want to find IDs for files with differing prefixes instead, we should insert the file names NOT reversed and then NOT reverse in the last step.
  • To handle IDs that appear in the middle of file names, we can use both methods to come up with lists of potential IDs based on prefix and suffix, then figure out the “intersection” of those lists. (Maybe using another prefix tree?)
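The four steps can be sketched without the tree classes at all, using a plain dict-of-dicts trie (extract_ids_sketch is illustrative only; the real IdExtractor operates on a SimplifiablePrefixTree):

```python
def extract_ids_sketch(file_names: list[str]) -> set[str]:
    """Toy version of the suffix-stripping algorithm described above."""
    # 1. Build a trie over the *reversed* names, so a shared suffix
    #    (e.g. "_cleaned_psg.out") becomes a shared prefix.
    root: dict = {}
    for name in file_names:
        node = root
        for ch in reversed(name):
            node = node.setdefault(ch, {})
    # 2./3. Walk the chain of single-child nodes from the root; that chain is
    #    the common suffix ("simplify"), and the subtrees hanging off its end
    #    are the per-file remainders ("flatten" to depth 1).
    node, depth = root, 0
    while len(node) == 1:
        node = next(iter(node.values()))
        depth += 1
    # 4. Strip the common suffix; what remains of each name is its ID.
    return {name[:-depth] if depth else name for name in file_names}
```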

source

SimplifiablePrefixTree

 SimplifiablePrefixTree (delimiter:str='', key:str='')

A standard prefix tree with the ability to “simplify” itself by combining nodes that have only one child. These trees can also “flatten” themselves, converting all nodes at and below a certain depth into leaves attached to their ancestor at that depth.

Type Default Details
delimiter str The delimiter to use when splitting words into characters. If empty, the words are treated as sequences of characters.
key str The key of the current node in its parent’s .children dictionary. If empty, the node is (likely) the root of the tree.

source

DataSetObject

 DataSetObject (name:str, path:pathlib.Path)

Initialize self. See help(type(self)) for accurate signature.


source

psg_to_WLDM

 psg_to_WLDM (psg:polars.dataframe.frame.DataFrame, N4:bool=True)

Map all positive classes as follows:

If N4 is True:
  • 1, 2 => 1 (light sleep)
  • 3, 4 => 2 (deep sleep)
  • 5 => 3 (REM)

If N4 is False:
  • 1, 2 => 1 (light sleep)
  • 3 => 2 (deep sleep)
  • 4 => 3 (REM)

Retain all 0 (wake) and -1 (mask) classes.
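The per-value mapping can be sketched as follows (to_wldm_sketch is illustrative; the library’s to_WLDM is the authoritative version):

```python
def to_wldm_sketch(x: float, N4: bool = True) -> int:
    """Map a single PSG stage to wake/light/deep/REM, retaining 0 and -1."""
    if x <= 0:                 # retain wake (0) and mask (-1)
        return int(x)
    if x in (1, 2):            # N1/N2 -> light sleep
        return 1
    if N4:                     # N3/N4 -> deep sleep, 5 -> REM
        return 2 if x in (3, 4) else 3
    return 2 if x == 3 else 3  # no N4 stage: 3 -> deep, 4 -> REM
```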


source

to_WLDM

 to_WLDM (x:float, N4:bool=True)

Map sleep stages to wake, light, deep, and REM sleep. Retain masked values. If N4 stage is not present, PSG=4 is mapped to REM. Otherwise it is mapped to deep sleep.


source

psg_to_sleep_wake

 psg_to_sleep_wake (psg:polars.dataframe.frame.DataFrame)

  • Map all positive classes to 1 (sleep).
  • Retain all 0 (wake) and -1 (mask) classes.


source

ModelInputSpectrogram

 ModelInputSpectrogram (input_features:Union[List[str],str],
                        input_sampling_hz:int|float,
                        spectrogram_preprocessing_config:Dict={'preprocessing':
                        [{'args': {'window_size': 30, 'fs': 32}, 'type':
                        'median'}, {'args': {'iqr_window': 300,
                        'median_window': 300, 'fs': 32}, 'type':
                        'iqr_normalization_adaptive'}, {'args': {'threshold':
                        20, 'fs': 32}, 'type': 'clip_by_iqr'}, {'args': {'fs':
                        32, 'nfft': 512, 'f_max': 6, 'f_min': 0, 'f_sub': 3,
                        'window': 320, 'noverlap': 256}, 'type': 'cal_psd'}]})

Initialize self. See help(type(self)) for accurate signature.

Type Default Details
input_features Union
input_sampling_hz int | float Sampling rate of the input data (1/s)
spectrogram_preprocessing_config Dict {‘preprocessing’: [{‘args’: {‘window_size’: 30, ‘fs’: 32}, ‘type’: ‘median’}, {‘args’: {‘iqr_window’: 300, ‘median_window’: 300, ‘fs’: 32}, ‘type’: ‘iqr_normalization_adaptive’}, {‘args’: {‘threshold’: 20, ‘fs’: 32}, ‘type’: ‘clip_by_iqr’}, {‘args’: {‘fs’: 32, ‘nfft’: 512, ‘f_max’: 6, ‘f_min’: 0, ‘f_sub’: 3, ‘window’: 320, ‘noverlap’: 256}, ‘type’: ‘cal_psd’}]} Steps in the preprocessing pipeline for getting a spectrogram from acceleration
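The default spectrogram_preprocessing_config from the signature above, rewritten as a readable literal:

```python
# Default preprocessing pipeline: median filter -> adaptive IQR normalization
# -> IQR clipping -> power spectral density, all at fs = 32 Hz.
DEFAULT_SPECTROGRAM_PREPROCESSING = {
    "preprocessing": [
        {"type": "median", "args": {"window_size": 30, "fs": 32}},
        {"type": "iqr_normalization_adaptive",
         "args": {"iqr_window": 300, "median_window": 300, "fs": 32}},
        {"type": "clip_by_iqr", "args": {"threshold": 20, "fs": 32}},
        {"type": "cal_psd",
         "args": {"fs": 32, "nfft": 512, "f_min": 0, "f_max": 6,
                  "f_sub": 3, "window": 320, "noverlap": 256}},
    ]
}
```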

source

ModelInput1D

 ModelInput1D (input_features:Union[List[str],str],
               input_sampling_hz:int|float, input_window_time:int|float)

Initialize self. See help(type(self)) for accurate signature.

Type Details
input_features Union
input_sampling_hz int | float Sampling rate of the input data (1/s)
input_window_time int | float Window size (in seconds) for the input data. Window will be centered around the time point for which the model is making a prediction
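The centered window can be illustrated with a short sketch (the helper name and the edge handling at recording boundaries are assumptions, not the class’s actual behavior):

```python
import numpy as np


def centered_window(signal: np.ndarray, t_idx: int, fs: float, window_time: float) -> np.ndarray:
    """Slice `window_time` seconds of `signal` centered on sample index t_idx."""
    half = int(round(window_time * fs / 2))
    start = max(0, t_idx - half)  # clamp at the start of the recording
    return signal[start : t_idx + half]
```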

source

ModelInput

 ModelInput (input_features:Union[List[str],str],
             input_sampling_hz:int|float)

Initialize self. See help(type(self)) for accurate signature.

Type Details
input_features Union
input_sampling_hz int | float Sampling rate of the input data (1/s)

source

PSGType

 PSGType (value, names=None, module=None, qualname=None, type=None,
          start=1)

An enumeration.


source

ModelOutputType

 ModelOutputType (value, names=None, module=None, qualname=None,
                  type=None, start=1)

An enumeration.


source

fill_gaps_in_accelerometer_data

 fill_gaps_in_accelerometer_data (acc:polars.dataframe.frame.DataFrame,
                                  smooth:bool=False,
                                  final_sampling_rate_hz:int|None=None)

source

apply_gausian_filter

 apply_gausian_filter (df:polars.dataframe.frame.DataFrame,
                       sigma:float=1.0, overwrite:bool=False)

source

mask_psg_from_accel

 mask_psg_from_accel (psg:numpy.ndarray, accel:numpy.ndarray,
                      psg_epoch:int=30, accel_sample_rate:float|None=None,
                      min_epoch_fraction_covered:float=0.5)
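A sketch of the masking rule suggested by these parameters (the real mask_psg_from_accel interface, in particular what accel contains, is an assumption here; this version takes accelerometer timestamps in seconds):

```python
import numpy as np


def mask_psg_sketch(psg: np.ndarray, accel_t: np.ndarray, psg_epoch: int = 30,
                    accel_hz: float = 1.0, min_frac: float = 0.5) -> np.ndarray:
    """Set a PSG epoch to -1 (mask) when accelerometer samples cover less
    than `min_frac` of that epoch."""
    out = psg.copy()
    expected = psg_epoch * accel_hz  # samples a fully covered epoch would have
    for i in range(len(psg)):
        lo, hi = i * psg_epoch, (i + 1) * psg_epoch
        n = np.count_nonzero((accel_t >= lo) & (accel_t < hi))
        if n < min_frac * expected:
            out[i] = -1
    return out
```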

source

get_sample_weights

 get_sample_weights (y:numpy.ndarray)

Calculate sample weights based on the distribution of classes in the data. Doesn’t count masked values (-1) in the class distribution.
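One common weighting scheme matching this description is inverse class frequency; a sketch (the exact normalization used by get_sample_weights is an assumption):

```python
import numpy as np


def sample_weights_sketch(y: np.ndarray) -> np.ndarray:
    """Inverse-class-frequency weights; masked values (-1) get weight 0
    and are excluded from the class distribution."""
    weights = np.zeros(len(y), dtype=float)
    valid = y != -1
    classes, counts = np.unique(y[valid], return_counts=True)
    for cls, frac in zip(classes, counts / counts.sum()):
        weights[y == cls] = 1.0 / frac  # rarer classes get larger weights
    return weights
```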


source

DataProcessor

 DataProcessor (data_set:__main__.DataSetObject,
                model_input:__main__.ModelInput, output_feature:str='psg',
                output_type:__main__.ModelOutputType=<ModelOutputType.WAKE
                _LIGHT_DEEP_REM: 2>,
                psg_type:__main__.PSGType=<PSGType.NO_N4: 1>)

Initialize self. See help(type(self)) for accurate signature.

# not exported
import os

import numpy as np
import polars as pl


def _create_mock_data():
    np.random.seed(42) # by Deep Thought
    mock_data_location = "../mock_data_sets"
    if not os.path.exists(mock_data_location):
        os.makedirs(mock_data_location)
    total_time_hrs = 1.0
    total_subjects = 3
    total_data_sets = 2
    accelerometer_sampling_hz = 1.0 
    activity_dt_seconds = 15.0
    max_activity = 50
    psg_dt_seconds = 30.0
    for data_set in range(total_data_sets):
        data_set_path = f"{mock_data_location}/data_set_{data_set}"
        if not os.path.exists(data_set_path):
            os.makedirs(data_set_path)
        # Accelerometer data
        accelerometer_path = f"{data_set_path}/cleaned_accelerometer"
        if not os.path.exists(accelerometer_path):
            os.makedirs(accelerometer_path)
        accelerometer_time = np.arange(0, total_time_hrs * 3600, 1.0 / accelerometer_sampling_hz)
        for i in range(total_subjects):
            accelerometer_data = np.random.randn(len(accelerometer_time), 3)
            accelerometer = pl.DataFrame({
                'timestamp': accelerometer_time,
                'x': accelerometer_data[:, 0],
                'y': accelerometer_data[:, 1],
                'z': accelerometer_data[:, 2],
            })
            subject_path = f"{accelerometer_path}/id00{i}_cleaned_motion.out"
            accelerometer.write_csv(subject_path, include_header=False, separator=' ')
        # Activity data
        activity_path = f"{data_set_path}/cleaned_activity"
        if not os.path.exists(activity_path):
            os.makedirs(activity_path)
        activity_time = np.arange(0, total_time_hrs * 3600, activity_dt_seconds)
        for i in range(total_subjects):
            activity_data = np.random.randint(0, max_activity, len(activity_time))
            activity = pl.DataFrame({
                'timestamp': activity_time,
                'activity': activity_data,
            })
            subject_path = f"{activity_path}/id00{i}_cleaned_counts.out"
            activity.write_csv(subject_path, include_header=False, separator=' ')
        # Heart rate data
        hr_path = f"{data_set_path}/cleaned_heartrate"
        if not os.path.exists(hr_path):
            os.makedirs(hr_path)
        ## Irregular sampling rate; sorted so timestamps stay monotonically increasing
        hr_time = np.sort(np.random.choice(accelerometer_time, len(activity_time), replace=False))
        for i in range(total_subjects):
            hr_data = np.random.randint(60, 120, len(activity_time))
            hr = pl.DataFrame({
                'timestamp': hr_time,
                'hr': hr_data,
            })
            subject_path = f"{hr_path}/id00{i}_cleaned_hr.out"
            hr.write_csv(subject_path, include_header=False, separator=' ')
        # PSG data
        psg_path = f"{data_set_path}/cleaned_psg"
        if not os.path.exists(psg_path):
            os.makedirs(psg_path)
        psg_time = np.arange(0, total_time_hrs * 3600, psg_dt_seconds)
        for i in range(total_subjects):
            psg_data = np.random.randint(-1, 5, len(psg_time))
            psg = pl.DataFrame({
                'timestamp': psg_time,
                'stage': psg_data,
            })
            subject_path = f"{psg_path}/id00{i}_cleaned_psg.out"
            psg.write_csv(subject_path, include_header=False, separator=' ')