import h5py
import logging
import dateutil.parser
import numpy as np
from collections import OrderedDict
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class CASASHDF5:
    """Create and retrieve CASAS smart home data stored in an HDF5 file.

    The data saved to or retrieved from the HDF5 file are pre-calculated
    features produced by the :class:`CASASData` class. The file also carries
    meta-data about the dataset: a description of each feature, the target
    classes, and the splits by week and/or by day.

    Attributes:
        _file (:class:`h5py.File`): :class:`h5py.File` object that represents
            the root group.

    Args:
        filename (:obj:`str`): HDF5 file name.
        mode (:obj:`str`): ``'r'`` to load an existing file, ``'w'`` to create
            a new one.
        driver (:obj:`str`): Passed through unchanged to :class:`h5py.File`.

    Raises:
        ValueError: If ``mode`` is neither ``'r'`` nor ``'w'``.
    """

    def __init__(self, filename, mode='r', driver=None):
        # Validate before touching the file system so that an unsupported mode
        # neither creates a file nor leaves an open handle behind.
        if mode not in ('r', 'w'):
            raise ValueError('mode should be \'w\' or \'r\', but got %s.' % mode)
        self._file = h5py.File(filename, mode=mode, driver=driver)
        if mode == 'w':
            self._sources = []
            self._weeks = OrderedDict()
            self._days = OrderedDict()
            self._feature_description = []
            self._target_description = []
            self._target_colors = []
            self._sensors = []
            self._comment = ''
            self._bg_target = ''
        else:
            try:
                self._load_dataset_info()
            except Exception:
                # Do not leak the file handle when the meta-data is unreadable.
                self._file.close()
                raise

    def __enter__(self):
        """Allow ``with CASASHDF5(...) as dataset:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file when leaving a ``with`` block."""
        self.close()

    def fetch_data(self, start_split=None, stop_split=None, pre_load=0):
        """Fetch data between start and stop splits.

        Args:
            start_split (:obj:`str`): Begin of data.
            stop_split (:obj:`str`): End of data.
            pre_load (:obj:`int`): Load extra number of data before start split.

        Returns:
            :obj:`tuple`: ``(time_list, features, targets)`` sliced by the
            requested range. An entry is ``None`` when the corresponding
            source is not present in the dataset.
        """
        start, stop = self._get_split_range(start_split, stop_split, pre_load)
        # Time stamps are stored as ISO strings; parse them back to datetime.
        time_list = None
        if 'time' in self._sources:
            time_list = [dateutil.parser.parse(self._to_text(date_string))
                         for date_string in self._file['time'][start:stop]]
        features = None
        if 'features' in self._sources:
            features = self._file['features'][start:stop]
        targets = None
        if 'targets' in self._sources:
            targets = self._file['targets'][start:stop]
        return time_list, features, targets

    # region Metadata Auxiliary Functions
    def num_sensors(self):
        """Return the number of sensors in the sensor list."""
        return len(self._sensors)

    def get_sensor_by_index(self, i):
        """Get sensor name by index.

        Args:
            i (:obj:`int`): Index to sensor.
        """
        return self._sensors[i]

    def num_features(self):
        """Get number of features in the dataset."""
        return len(self._feature_description)

    def get_feature_description_by_index(self, i):
        """Get the description of feature column :math:`i`.

        Args:
            i (:obj:`int`): Column index.

        Returns:
            :obj:`str`: Corresponding column description.
        """
        return self._feature_description[i]

    def num_targets(self):
        """Total number of target classes.

        Returns:
            :obj:`int`: Total number of target classes.
        """
        return len(self._target_description)

    def get_target_descriptions(self):
        """Get list of target descriptions.

        Returns:
            :obj:`list` of :obj:`str`: List of target class description strings.
        """
        return self._target_description

    def get_target_description_by_index(self, i):
        """Get target description by class index :math:`i`.

        Args:
            i (:obj:`int`): Class index.

        Returns:
            :obj:`str`: Corresponding target class description.
        """
        return self._target_description[i]

    def get_target_colors(self):
        """Get list of target colors.

        Returns:
            :obj:`list` of :obj:`str`: Color strings, one per target class.
        """
        return self._target_colors

    def get_target_color_by_index(self, i):
        """Get the color string of target class :math:`i`.

        Args:
            i (:obj:`int`): Class index.

        Returns:
            :obj:`str`: Corresponding target class color string.
        """
        return self._target_colors[i]

    def is_bg_target(self, i=None, label=None):
        """Check if the target class given by ``i`` or ``label`` is background.

        ``i`` takes precedence over ``label`` when both are given.

        Args:
            i (:obj:`int`): Class index.
            label (:obj:`str`): Class name.

        Returns:
            :obj:`bool`: True if it is considered background. False when
            neither argument is given, or when ``i`` is given but no
            background target is registered among the target descriptions.
        """
        if i is not None:
            # Without a registered background class no index can match; avoid
            # the ValueError that list.index would raise in that case.
            if self._bg_target not in self._target_description:
                return False
            return i == self._target_description.index(self._bg_target)
        if label is not None:
            return label == self._bg_target
        return False

    def get_bg_target(self):
        """Get the description of the target class considered background.

        Returns:
            :obj:`str`: Name of the class which is considered background in
            the dataset. Usually it is 'Other_Activity'.
        """
        return self._bg_target

    def get_bg_target_id(self):
        """Get the id of the target class considered background.

        Returns:
            :obj:`int`: The index of the target class which is considered
            background in the dataset.

        Raises:
            ValueError: If the background target is not among the target
                descriptions.
        """
        return self._target_description.index(self._bg_target)

    def num_between_splits(self, start_split=None, stop_split=None):
        """Get the number of items between splits.

        Args:
            start_split (:obj:`str`): Begin of data.
            stop_split (:obj:`str`): End of data.

        Returns:
            :obj:`int`: The number of items between two splits.
        """
        start, stop = self._get_split_range(start_split, stop_split)
        return stop - start

    def get_weeks_info(self):
        """Get splits by week.

        Returns:
            :obj:`list` of :obj:`tuple`: List of (key, value) tuples, where key
            is the name of the split and value is the number of items in it.
        """
        return [(name, bounds[1] - bounds[0]) for name, bounds in self._weeks.items()]

    def get_days_info(self):
        """Get splits by day.

        Returns:
            :obj:`list` of :obj:`tuple`: List of (key, value) tuples, where key
            is the name of the split and value is the number of items in it.
        """
        return [(name, bounds[1] - bounds[0]) for name, bounds in self._days.items()]
    # endregion

    # region CASASH5PY Dataset Creation
    def create_features(self, feature_array, feature_description):
        """Create the feature dataset.

        Logs an error and does nothing if a feature dataset already exists.

        Args:
            feature_array (:obj:`numpy.ndarray`): Numpy array holding calculated
                feature vectors.
            feature_description (:obj:`list` of :obj:`str`): List of strings
                that describe each column of the feature vectors.
        """
        if 'features' in self._sources:
            logger.error('Feature array already exists in the dataset.')
            return
        # Create feature array (the keyword is ``chunks``; a misspelled keyword
        # makes h5py reject the call).
        dset = self._file.create_dataset('features', data=feature_array,
                                         chunks=True, compression="gzip", compression_opts=9)
        dset.dims[0].label = 'batch'
        dset.dims[1].label = 'feature'
        # Add feature descriptions as attributes
        self._file.attrs['features'] = [description.encode('utf-8')
                                        for description in feature_description]
        # Register the source only once the data has actually been written.
        self._sources.append('features')
        self._feature_description = feature_description

    def create_targets(self, target_array, target_description, target_colors):
        """Create the target dataset.

        Logs an error and does nothing if a target dataset already exists.

        Args:
            target_array (:obj:`numpy.ndarray`): Numpy array holding target labels.
            target_description (:obj:`list` of :obj:`str`): List of strings
                that describe each target class.
            target_colors (:obj:`list` of :obj:`str`): List of color values
                corresponding to each target class.
        """
        if 'targets' in self._sources:
            logger.error('Target array already exists in the dataset.')
            return
        # Targets are stored as a single column, one row per item.
        dset = self._file.create_dataset('targets', data=target_array.reshape((target_array.size, 1)))
        dset.dims[0].label = 'batch'
        dset.dims[1].label = 'target'
        # Add target descriptions as attributes
        self._file.attrs['targets'] = [description.encode('utf-8')
                                       for description in target_description]
        # Add target colors as attributes
        self._file.attrs['target_colors'] = [color_string.encode('utf-8')
                                             for color_string in target_colors]
        # Register the source only once the data has actually been written.
        self._sources.append('targets')
        self._target_description = target_description
        self._target_colors = target_colors

    def create_time_list(self, time_array):
        """Create the time list dataset.

        Logs an error and does nothing if a time list already exists.

        Args:
            time_array (:obj:`list` of :obj:`datetime`): datetime corresponding
                to each feature vector in the feature dataset.
        """
        if 'time' in self._sources:
            logger.error('Time list already exists in the dataset.')
            return
        # Each time stamp is stored as a variable-length ISO-format byte string.
        dt = h5py.special_dtype(vlen=bytes)
        dset = self._file.create_dataset('time', (len(time_array),), dtype=dt)
        for i, timestamp in enumerate(time_array):
            dset[i] = timestamp.isoformat().encode('utf-8')
        # Register the source only once the data has actually been written.
        self._sources.append('time')

    def create_splits(self, days, weeks):
        """Create splits by days and weeks.

        Consecutive entries of each list delimit one split, so a list of
        ``n`` indices yields ``n - 1`` splits named ``day_0`` ... or
        ``week_0`` ... . Logs an error and does nothing if splits already
        exist. The input lists are not modified.

        Args:
            days (:obj:`list` of :obj:`int`): Start index for each day.
            weeks (:obj:`list` of :obj:`int`): Start index for each week.
        """
        if self._days or self._weeks:
            logger.error('Splits already exist.')
            return
        days_array, day_ranges = self._build_split_table('day', days)
        weeks_array, week_ranges = self._build_split_table('week', weeks)
        # Set attributes
        self._file.attrs['days'] = days_array
        self._file.attrs['weeks'] = weeks_array
        # Mirror the stored splits in memory so lookups work before re-loading.
        self._days = day_ranges
        self._weeks = week_ranges

    @staticmethod
    def _build_split_table(prefix, boundaries):
        """Build the stored table and the in-memory map for one kind of split.

        Args:
            prefix (:obj:`str`): Split name prefix, e.g. ``'day'``.
            boundaries (:obj:`list` of :obj:`int`): Start index of every split,
                followed by the stop index of the last one.

        Returns:
            :obj:`tuple`: ``(table, ranges)`` where ``table`` is a structured
            numpy array with fields ``name``, ``start`` and ``stop`` and
            ``ranges`` is an :class:`OrderedDict` mapping each split name to
            ``[start, stop]``.
        """
        names = ['%s_%d' % (prefix, i) for i in range(max(len(boundaries) - 1, 0))]
        # Size the name field for the longest name so none gets truncated.
        width = max([len(name) for name in names] or [1])
        table = np.empty(
            len(names),
            dtype=np.dtype([
                ('name', 'S%d' % width),
                ('start', np.int64),
                ('stop', np.int64)]
            ))
        ranges = OrderedDict()
        for i, name in enumerate(names):
            table[i] = (name.encode('utf-8'), boundaries[i], boundaries[i + 1])
            ranges[name] = [boundaries[i], boundaries[i + 1]]
        return table, ranges

    def create_sensors(self, sensors):
        """Add sensors list to attributes.

        If the sensor IDs in the dataset are not binary coded, there is a need
        to provide the sensor list to go along with the feature vectors.

        Args:
            sensors (:obj:`list` of :obj:`str`): List of sensor names
                corresponding to the ids in the feature array.
        """
        self._file.attrs['sensors'] = [sensor.encode('utf-8') for sensor in sensors]
        # Keep the in-memory list in sync so num_sensors() and
        # get_sensor_by_index() work without re-opening the file.
        self._sensors = list(sensors)

    def set_background_target(self, target_name):
        """Set ``target_name`` as background target.

        Logs an error and does nothing if a background target is already set.

        Args:
            target_name (:obj:`str`): Name of background target.
        """
        if self._bg_target != '':
            logger.error('background target label has been set to %s.', self._bg_target)
            return
        self._bg_target = target_name
        self._file.attrs['bg_target'] = target_name.encode('utf-8')

    def flush(self):
        """Store the list of sources as an attribute and flush the file."""
        self._file.attrs['sources'] = [source.encode('utf-8') for source in self._sources]
        self._file.flush()
    # endregion

    def close(self):
        """Close the underlying file."""
        self._file.close()

    # region InternalSupportRoutines
    @staticmethod
    def _to_text(value):
        """Return ``value`` as :obj:`str`, decoding UTF-8 bytes when needed.

        Attribute values may come back from h5py either as bytes or already as
        str depending on how they were stored and on the h5py version, so both
        are accepted.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return str(value)

    def _get_split_range(self, start_split=None, stop_split=None, pre_load=0):
        """Get the requested splits range.

        Args:
            start_split (:obj:`str`): Begin of data. ``None`` selects the whole
                length of the first source.
            stop_split (:obj:`str`): End of data. When given, its stop index
                replaces the one derived from ``start_split``.
            pre_load (:obj:`int`): Load extra number of data before start split.

        Returns:
            :obj:`tuple` of :obj:`int`: Returns a tuple of the start and stop index.

        Raises:
            ValueError: If a split name is found in neither the week nor the
                day splits.
        """
        # Determine the start index
        if start_split is None:
            start = 0
            # A dataset without any source holds no items.
            stop = self._file[self._sources[0]].shape[0] if self._sources else 0
        elif start_split in self._weeks:
            start, stop = self._weeks[start_split][0], self._weeks[start_split][1]
        elif start_split in self._days:
            start, stop = self._days[start_split][0], self._days[start_split][1]
        else:
            raise ValueError('start_split error: Cannot find %s in splitting array.' % start_split)
        # Determine the stop index
        if stop_split is not None:
            if stop_split in self._weeks:
                stop = self._weeks[stop_split][1]
            elif stop_split in self._days:
                stop = self._days[stop_split][1]
            else:
                raise ValueError('stop_split error: Cannot find %s in splitting array.' % stop_split)
        # Compensate pre-load, never reaching before the first item
        start = max(start - pre_load, 0)
        return start, stop

    def _read_text_list(self, name):
        """Return file attribute ``name`` as a list of str, or ``[]`` if absent."""
        if name not in self._file.attrs:
            return []
        return [self._to_text(item) for item in self._file.attrs[name]]

    def _read_text(self, name):
        """Return scalar file attribute ``name`` as str, or ``''`` if absent."""
        if name not in self._file.attrs:
            return ''
        return self._to_text(self._file.attrs[name])

    def _load_dataset_info(self):
        """Populate attributes of current class based on meta-data from the file."""
        # Check sources set
        self._sources = self._read_text_list('sources')
        # Parse splits (only when both week and day tables are present)
        self._weeks = OrderedDict()
        self._days = OrderedDict()
        if 'weeks' in self._file.attrs and 'days' in self._file.attrs:
            for row in self._file.attrs['weeks']:
                self._weeks[self._to_text(row['name'])] = [row['start'], row['stop']]
            for row in self._file.attrs['days']:
                self._days[self._to_text(row['name'])] = [row['start'], row['stop']]
        # Meta-data about dataset
        self._feature_description = self._read_text_list('features')
        self._target_description = self._read_text_list('targets')
        self._target_colors = self._read_text_list('target_colors')
        self._sensors = self._read_text_list('sensors')
        # Load comments and background task
        self._bg_target = self._read_text('bg_target')
        self._comment = self._read_text('comment')
    # endregion