import os
import math
import h5py
import pickle
import logging
import datetime
import xlsxwriter
import numpy as np
import scipy.sparse as sp
from .home import CASASHome
from .stat_features import EventHour, EventSeconds, LastSensor, WindowDuration, \
SensorCount, DominantSensor, SensorElapseTime
logger = logging.getLogger(__name__)
[docs]class CASASData(object):
r"""A class to load activity data from CASAS smart home datasets.
The class load raw activity sensor events from CASAS smart home datasets. The class provides methods to
pre-process the data for future learning algorithms for activity recognition. The pre-processed data can
be exported to xlsx files for verification, and hdf5 file for faster read and search when evaluating a
activity recognition algorithm.
Args:
path (:obj:`str`): path to a dataset directory, the dataset event.rst file for dataset in legacy format.
Attributes:
sensor_list (:obj:`dict`): A dictionary containing sensor information.
activity_list (:obj:`dict`): A dictionary containing activity information.
event_list (:obj:`list` of :obj:`dict`): List of data used to store raw events.
x (:obj:`numpy.ndarray`): 2D numpy array that contains calculated feature data.
y (:obj:`numpy.ndarray`): 2D numpy array that contains activity label corresponding to feature array
data_path (:obj:`str`): path to data file.
home (:class:`pyActLearn.CASAS.home.CASASHome`): :class:`CASAS.home.CASASHome` object that stores
the home information associated with the dataset.
is_legacy (:obj:`bool`): Defaults to False. If the dataset loaded is in legacy format or not.
is_stat_feature (:obj:`bool`): Calculate statistical features or use raw data in ``x``
is_labeled (:obj:`bool`): If given dataset is labeled
time_list (:obj:`list` of :class:`datetime.datetime`): Datetime of each entry in ``x``. Used for back
annotation, and splitting dataset by weeks or days.
feature_list (:obj:`dict`): A dictionary of statistical features used in statistical feature calculation
routines (:obj:`dict`): Function routines that needs to run every time when calculating features.
Excluded from pickling.
num_enabled_features (:obj:`int`): Number of enabled features.
num_static_features (:obj:`int`): Number of features related to window
num_per_sensor_features (:obj:`int`): Number of features that needs to be calculated per enabled sensor
events_in_window (:obj:`int`): Number of sensor events (or statistical features of a sliding window)
grouped in a feature vector.
"""
def __init__(self, path):
path = os.path.abspath(os.path.expanduser(path))
if not os.path.exists(path):
logger.error('Cannot find %s' % path)
raise FileNotFoundError('Cannot find %s' % path)
# Initialize Default Values
self.x = None
self.y = None
self.is_labeled = True
self.activity_list = {}
self.sensor_list = {}
self.event_list = []
self.events_in_window = 1
self.time_list = []
# Statistical Features and flag
self.is_stat_feature = False
self.max_window_size = 30
self.feature_list = {}
self.routines = {}
self.num_feature_columns = 0
self.num_static_features = 0
self.num_per_sensor_features = 0
# From which source to construct CASAS data
if os.path.isdir(path):
logger.debug('Load CASAS data from directory %s' % path)
self.home = CASASHome(directory=path)
self.is_legacy = False
self.data_path = path
# Populate sensor list, activity list with data from self.home
for sensor in self.home.get_all_sensors():
self._add_sensor(sensor)
for activity in self.home.get_all_activities():
self._add_activity(activity)
# Load Events
logger.debug('Load CASAS sensor events from %s' % self.data_path)
self._load_events_from_dataset(os.path.join(path, './events.csv'))
else:
filename, file_ext = os.path.splitext(path)
if file_ext == '.pkl':
# A pickle file - unpickle it - but if this is the case, user can directly
# get the class from pickle.load function
logger.debug('Load from pickle file %s' % path)
else:
self.home = None
self.is_legacy = True
self.data_path = ""
[docs] def populate_feature(self, method='raw', normalized=True, per_sensor=True):
"""Populate the feature vector in ``x`` and activities in `y`
Args:
method (:obj:`str`): The method to convert sensor events into feature vector.
Available methods are ``'raw'`` and ``'stat'``.
normalized (:obj:`bool`): Will each feature be normalized between 0 and 1?
per_sensor (:obj:`bool`): For features related with sensor ID, are they
"""
if method == 'raw':
self._calculate_raw_features(normalized, per_sensor)
else:
self._add_feature(EventHour(normalized=normalized))
self._add_feature(EventSeconds(normalized=normalized))
self._add_feature(LastSensor(per_sensor=per_sensor))
self._add_feature(WindowDuration(normalized=normalized))
self._add_feature(SensorCount(normalized=normalized))
self._add_feature(DominantSensor(per_sensor=per_sensor))
self._add_feature(SensorElapseTime(normalized=normalized))
self._calculate_stat_features()
# region PublicActivityRoutines
[docs] def get_activities_by_indices(self, activity_ids):
"""Get a group of activities by their corresponding indices
Args:
activity_ids (:obj:`list` of :obj:`int`): A list of activity indices
Returns:
:obj:`list` of :obj:`str`: A list of activity labels in the same order
"""
return [self.get_activity_by_index(cur_id) for cur_id in activity_ids]
[docs] def get_activity_by_index(self, activity_id):
"""Get Activity name by their index
Args:
activity_id (:obj:`int`): Activity index
Returns:
:obj:`str`: Activity label
"""
for activity_label in self.activity_list.keys():
if activity_id == self.activity_list[activity_label]['index']:
return activity_label
logger.error('Failed to find activity with index %d' % activity_id)
return ""
[docs] def get_activity_index(self, activity_label):
"""Get Index of an activity
Args:
activity_label (:obj:`str`): Activity label
Returns:
:obj:`int`: Activity index (-1 if not found or not enabled)
"""
if activity_label in self.activity_list:
return self.activity_list[activity_label]['index']
else:
return -1
[docs] def get_enabled_activities(self):
"""Get label list of all enabled activities
Returns:
:obj:`list` of :obj:`str`: list of activity labels
"""
enabled_activities_list = []
for activity_label in self.activity_list.keys():
if self.activity_list[activity_label]['enable']:
enabled_activities_list.append(activity_label)
return enabled_activities_list
[docs] def get_activity_color(self, activity_label):
"""Find the color string for the activity.
Args:
activity_label (:obj:`str`): activity label
Returns:
:obj:`str`: RGB color string
"""
if self.is_legacy:
# Pick the color from color list based on the activity index
activity_index = self.get_activity_index(activity_label)
if activity_index >= 0:
return self._COLORS[activity_index % len(self._COLORS)]
else:
return '#C8C8C8' # returns grey
else:
return self.home.get_activity_color(activity_label)
[docs] def enable_activity(self, activity_label):
"""Enable an activity
Args:
activity_label (:obj:`str`): Activity label
Returns:
:obj:`int`: The index of the enabled activity
"""
if activity_label in self.activity_list:
logger.debug('Enable Activity %s' % activity_label)
self.activity_list[activity_label]['enable'] = True
self._assign_activity_indices()
return self.activity_list[activity_label]['index']
else:
logger.error('Activity %s not found' % activity_label)
return -1
[docs] def disable_activity(self, activity_label):
"""Disable an activity
Args:
activity_label (:obj:`str`): Activity label
"""
if activity_label in self.activity_list:
logger.debug('Disable Activity %s' % activity_label)
self.activity_list[activity_label]['enable'] = False
self.activity_list[activity_label]['index'] = -1
self._assign_activity_indices()
else:
logger.error('Activity %s not found' % activity_label)
# endregion
# region PublicSensorRoutines
[docs] def enable_sensor(self, sensor_name):
"""Enable a sensor
Args:
sensor_name (:obj:`str`): Sensor Name
Returns
:obj:`int`: The index of the enabled sensor
"""
if sensor_name in self.sensor_list:
logger.debug('Enable Sensor %s' % sensor_name)
self.sensor_list[sensor_name]['enable'] = True
self._assign_sensor_indices()
return self.sensor_list[sensor_name]['index']
else:
logger.error('Failed to find sensor %s' % sensor_name)
return -1
[docs] def disable_sensor(self, sensor_name):
"""Disable a sensor
Args:
sensor_name (:obj:`str`): Sensor Name
"""
if sensor_name in self.sensor_list:
logger.debug('Disable Sensor %s' % sensor_name)
self.sensor_list[sensor_name]['enable'] = False
self.sensor_list[sensor_name]['index'] = -1
self._assign_sensor_indices()
else:
logger.error('Failed to find sensor %s' % sensor_name)
[docs] def get_sensor_by_index(self, sensor_id):
"""Get the name of sensor by index
Args:
sensor_id (:obj:`int`): Sensor index
Returns:
:obj:`str`: Sensor name
"""
for sensor_name in self.sensor_list.keys():
if self.sensor_list[sensor_name]['index'] == sensor_id:
return sensor_name
logger.error('Failed to find sensor with index %d' % sensor_id)
return ''
[docs] def get_sensor_index(self, sensor_name):
"""Get Sensor Index
Args:
sensor_name (:obj:`str`): Sensor Name
Returns:
:obj:`int`: Sensor index (-1 if not found or not enabled)
"""
if sensor_name in self.sensor_list:
return self.sensor_list[sensor_name]['index']
else:
return -1
[docs] def get_enabled_sensors(self):
"""Get the names of all enabled sensors
Returns:
:obj:`list` of :obj:`str`: List of sensor names
"""
enabled_sensor_array = []
for sensor_label in self.sensor_list.keys():
if self.sensor_list[sensor_label]['enable']:
enabled_sensor_array.append(sensor_label)
return enabled_sensor_array
# endregion
# region PublicFeatureRoutines
[docs] def get_feature_by_index(self, index):
"""Get Feature Name by Index
Args:
index (:obj:`int`): column index of feature
Returns:
:obj:`tuple` of :obj:`str`: (feature name, sensor name) tuple.
If it is not per-sensor feature, the sensor name is None.
"""
max_id = self.num_feature_columns
num_enabled_sensors = len(self.get_enabled_sensors())
if index > max_id:
logger.error('index %d is greater than the number of feature columns %d' %
(index, max_id))
if index >= self.num_static_features:
# It is per_sensor Feature
sensor_id = (index - self.num_static_features) % num_enabled_sensors
feature_id = math.floor((index - self.num_static_features) / num_enabled_sensors)
per_sensor = True
else:
# It is a generic feature
sensor_id = -1
feature_id = index
per_sensor = False
# Find Corresponding feature name and sensor label
feature_name = None
for featureLabel in self.feature_list.keys():
feature = self.feature_list[featureLabel]
if feature.index == feature_id and feature.per_sensor == per_sensor:
feature_name = featureLabel
break
sensor_name = 'Window'
if sensor_id >= 0:
for sensor_label in self.sensor_list.keys():
sensor = self.sensor_list[sensor_label]
if sensor['index'] == sensor_id:
sensor_name = sensor_label
break
return feature_name, sensor_name
[docs] def get_feature_string_by_index(self, index):
"""Get the string describing the feature specified by column index
Args:
index (:obj:`int`): column index of feature
Returns:
:obj:`str`: Feature string
"""
# Check if it is a statistical feature
if self.is_stat_feature:
# It is stat feature
feature_name, sensor_name = self.get_feature_by_index(index)
if feature_name is None or sensor_name is None:
logger.error('Failed to find feature/sensor name for feature %d - got (%s/%s)' %
(index, str(feature_name), str(sensor_name)))
return 'None'
else:
return sensor_name + ": " + feature_name
else:
# It is a windowed event.rst feature
if self.x.shape[1] == 2 * self.events_in_window:
# Sensor ID is presented as integer
entry_num = int(index / 2)
index_in_entry = index % 2
if index_in_entry == 0:
return "-%d Entry: Time" % entry_num
else:
return "-%d Entry: Sensor ID" % entry_num
else:
# Sensor ID is presented as a binary array
num_sensors = len(self.get_enabled_sensors())
entry_num = int(index / (num_sensors + 1))
index_in_entry = int(index % (num_sensors + 1))
if index_in_entry == 0:
return "-%d Entry: Time" % entry_num
else:
return "-%d Entry: %s" % (entry_num, self.get_sensor_by_index(index_in_entry - 1))
# endregion
# region PickleState
def __getstate__(self):
"""Save x as sparse matrix if the density of x is smaller than 0.5
"""
state = self.__dict__.copy()
if self.x is not None:
density_count = np.count_nonzero(self.x)
density = float(density_count) / self.x.size
if density < 0.5:
state['x'] = sp.csr_matrix(state['x'])
return self.__dict__
def __setstate__(self, state):
"""Set state from pickled file
"""
if sp.issparse(state['x']):
state['x'] = state['x'].todense()
self.__dict__.update(state)
# endregion
# region LoadFromFile
def _load_events_from_legacy(self, filename):
"""Load CASAS data from annotated event.rst logs
It loads sensor event.rst logs from legacy event.rst log txt file, and populate :obj:`event_list`.
As legacy file does not come with information regarding the smart home, the procedure also adds
populates :obj:`self.activity_list` and :obj:`self.sensor_list` as well.
.. note::
This is a internal function that is not recommended for user to call directly.
Args:
filename (:obj:`str`): absolute path to file
"""
self.event_list = []
if os.path.isfile(filename):
self.data_path = filename
f = open(filename, 'r')
line_number = 0
for line in f:
line_number += 1
word_list = str(str(line).strip()).split()
if len(word_list) > 3:
# date, time, sensor ID, sensor status, annotated label
date_list = word_list[0].split('-')
time_list = word_list[1].split(':')
sec_list = time_list[2].split('.')
event_time = datetime.datetime(int(date_list[0]),
int(date_list[1]),
int(date_list[2]),
int(time_list[0]),
int(time_list[1]),
int(sec_list[0]),
int(sec_list[1]))
cur_data_dict = {
'datetime': event_time,
'sensor_id': word_list[2],
'sensor_status': word_list[3],
}
self._add_sensor(cur_data_dict['sensor_id'])
self.is_labeled = False
if len(word_list) > 4:
self.is_labeled = True
# Add Corresponding Labels
cur_data_dict['activity'] = word_list[4]
self._add_activity(cur_data_dict['activity'])
self.event_list.append(cur_data_dict)
else:
logger.error('Error parsing %s:%d' % (filename, line_number))
logger.error(' %s' % line)
else:
raise FileNotFoundError('Cannot find file %s' % filename)
def _load_events_from_dataset(self, filename):
"""Load events from CASAS event.rst list in csv format
It loads sensor event.rst logs from legacy event.rst log txt file, and populate :obj:`event_list`.
.. note::
This is a internal function that is not recommended for user to call directly.
Args:
filename (:obj:`str`): path to ``event.rst.csv`` file in the dataset
"""
self.event_list = []
self.is_labeled = False
sensors_notfound_list = {}
sensor_list = self.home.get_all_sensors()
if os.path.isfile(filename):
f = open(filename, 'r')
line_number = 0
for line in f:
line_number += 1
word_list = str(str(line).strip()).split(',')
if len(word_list) < 6:
logger.error('Error parsing %s:%d' % (filename, line_number))
logger.error(' %s' % line)
continue
# date, time, sensor ID, sensor status, annotated label
if '/' in word_list[0]:
time_fmt = "%m/%d/%Y"
else:
time_fmt = "%Y-%m-%d"
time_fmt += " %H:%M:%S"
if word_list[1][-1].upper() == 'M':
time_fmt += " %p"
event_time = datetime.datetime.strptime(word_list[0] + ' ' + word_list[1], time_fmt)
# Remove OFF - no use
if word_list[3] == "OFF":
continue
# Sensor Not Found
if word_list[2] not in sensor_list:
if word_list[2] not in sensors_notfound_list:
sensors_notfound_list[word_list[2]] = 1
logger.warning('Sensor name %s not found in home metadata' % word_list[2])
sensors_notfound_list[word_list[2]] += 1
continue
# Remove Continuous Firing
# if len(self.event_list) > 0 and
# word_list[2] == self.event_list[len(self.event_list) - 1]['sensor_id']:
# continue
cur_data_dict = {
'datetime': event_time,
'sensor_id': word_list[2],
'sensor_status': word_list[3],
'resident_name': word_list[4],
'activity': word_list[5]
}
if len(word_list[5]) > 0:
self.is_labeled = True
if not cur_data_dict['activity'] in self.activity_list:
logger.warning('Activity %s not found in activity list. Added it now.'
% cur_data_dict['activity'])
self._add_activity(cur_data_dict['activity'])
# Add Corresponding Labels
self.event_list.append(cur_data_dict)
else:
logger.error('Cannot find data file %s\n' % filename)
# endregion
# region FeatureCalculation
def _calculate_raw_features(self, normalized=True, per_sensor=True):
"""Populate the feature vector with raw sensor data
Args:
normalized (:obj:`bool`): Will each feature be normalized between 0 and 1?
per_sensor (:obj:`bool`): For features related with sensor ID, are they
"""
num_events = len(self.event_list)
events_in_window = self.events_in_window
self.y = np.zeros((num_events - events_in_window + 1,))
self.time_list = []
if per_sensor:
len_per_event = 1 + len(self.get_enabled_sensors())
else:
len_per_event = 2
num_col = len_per_event * events_in_window
self.x = np.zeros((num_events - events_in_window + 1, num_col))
for i in range(num_events - events_in_window + 1):
self.y[i] = self.get_activity_index(self.event_list[i + events_in_window - 1]['activity'])
for j in range(events_in_window):
# Datetime is represented in seconds
event_time = self.event_list[i + events_in_window - 1 - j]['datetime']
seconds = event_time.timestamp() - \
datetime.datetime.combine(event_time.date(), datetime.time.min).timestamp()
if normalized:
self.x[i, j*len_per_event] = seconds/(24*3600)
else:
self.x[i, j*len_per_event] = seconds
# Sensor id
sensor_index = self.get_sensor_index(self.event_list[i + events_in_window - 1 - j]['sensor_id'])
if per_sensor:
self.x[i, j * len_per_event + sensor_index + 1] = 1
else:
self.x[i, j * len_per_event + 1] = sensor_index
self.time_list.append(self.event_list[i + events_in_window - 1]['datetime'])
return num_events
def _calculate_stat_features(self):
"""Populate the feature vector with statistical features using sliding window
"""
num_feature_columns = self._count_feature_columns()
num_feature_rows = self._count_samples()
self.x = np.zeros((num_feature_rows, num_feature_columns), dtype=np.float)
self.y = np.zeros(num_feature_rows, dtype=np.int)
cur_row_id = self.max_window_size - 1
cur_sample_id = 0
# Execute feature update routine
for (key, routine) in self.routines.items():
if routine.enabled:
routine.clear()
while cur_row_id < len(self.event_list):
cur_sample_id += self._calculate_window_feature(cur_row_id, cur_sample_id)
cur_row_id += 1
# Due to sensor event.rst discontinuity, the sample size will be smaller than the num_feature_rows calculated
self.x = self.x[0:cur_sample_id, :]
self.y = self.y[0:cur_sample_id]
self.is_stat_feature = True
logger.debug('Total amount of feature vectors calculated: %d' % cur_sample_id)
def _count_samples(self):
"""Count the maximum possible samples in data_list
"""
num_events = len(self.event_list)
if num_events < self.max_window_size - 1:
logger.error('data size is %d smaller than window size %d' %
(len(self.event_list), self.max_window_size))
return 0
num_sample = 0
if self.is_labeled:
# If labeled, count enabled activity entry after the first
# max_window_size event.rst
for event in self.event_list:
if num_sample < self.max_window_size + self.events_in_window - 2:
num_sample += 1
else:
""" ToDo: Need to check sensor enable status to make count sample count """
if self.activity_list[event['activity']]['enable']:
num_sample += 1
num_sample -= self.max_window_size + self.events_in_window - 2
else:
# If not labeled, we need to calculate for each window
# and finally find which catalog it belongs to
num_sample = num_events - self.max_window_size - self.events_in_window + 2
return num_sample
def _calculate_window_feature(self, cur_row_id, cur_sample_id):
"""Calculate feature vector for current window specified by cur_row_id
Args:
cur_row_id (:obj:`int`): Row index of current window (last row)
cur_sample_id (:obj:`int`): Row index of current sample in self.x
Returns:
:obj:`int`: number of feature vector added
"""
# Default Window Size to 30
window_size = self.max_window_size
num_enabled_sensors = len(self.get_enabled_sensors())
# Skip current window if labeled activity is ignored
if self.is_labeled:
activity_label = self.event_list[cur_row_id]['activity']
window_size = self.activity_list[activity_label]['window_size']
if not self.activity_list[activity_label]['enable']:
return 0
if cur_row_id > self.max_window_size - 1:
if cur_sample_id == 0:
for i in range(self.num_feature_columns * (self.events_in_window - 1)):
self.x[cur_sample_id][self.num_feature_columns * self.events_in_window - i - 1] = \
self.x[cur_sample_id][self.num_feature_columns * (self.events_in_window - 1) - i - 1]
else:
for i in range(self.num_feature_columns * (self.events_in_window - 1)):
self.x[cur_sample_id][self.num_feature_columns * self.events_in_window - i - 1] = \
self.x[cur_sample_id-1][self.num_feature_columns * (self.events_in_window - 1) - i - 1]
# Execute feature update routine
for (key, routine) in self.routines.items():
if routine.enabled:
routine.update(data_list=self.event_list, cur_index=cur_row_id,
window_size=window_size, sensor_info=self.sensor_list)
# Get Feature Data and Put into arFeature array
for (key, feature) in self.feature_list.items():
if feature.enabled:
# If it is per Sensor index, we need to iterate through all sensors to calculate
if feature.per_sensor:
for sensor_name in self.sensor_list.keys():
if self.sensor_list[sensor_name]['enable']:
column_index = self.num_static_features + \
feature.index * num_enabled_sensors + \
self.sensor_list[sensor_name]['index']
self.x[cur_sample_id][column_index] = \
feature.get_feature_value(data_list=self.event_list,
cur_index=cur_row_id,
window_size=window_size,
sensor_info=self.sensor_list,
sensor_name=sensor_name)
else:
self.x[cur_sample_id][feature.index] = \
feature.get_feature_value(data_list=self.event_list,
cur_index=cur_row_id,
window_size=window_size,
sensor_info=self.sensor_list,
sensor_name=None)
if not feature.is_value_valid:
return 0
if cur_row_id < self.max_window_size + self.events_in_window - 2:
return 0
if self.is_labeled:
self.y[cur_sample_id] = self.activity_list[self.event_list[cur_row_id]['activity']]['index']
self.time_list.append(self.event_list[cur_row_id]['datetime'])
return 1
# endregion
# region ExportToIntermediateFiles
[docs] def export_fuel(self, directory, break_by='week', comments=''):
"""Export feature and label vector into hdf5 file and store the class information in a pickle file
Args:
directory (:obj:`str`): The directory to save hdf5 and complementary dataset information
break_by (:obj:`str`): Select the way to split the data, either by ``'week'`` or ``'day'``
comments (:obj:`str`): Additional comments to add
"""
try:
from fuel.datasets.hdf5 import H5PYDataset
except ImportError:
logger.error('Failed to import H5PYDataset from fuel.')
return
if os.path.exists(directory):
if os.path.isdir(directory):
overwrite = ' '
while overwrite not in ['n', 'N', 'y', 'Y']:
# ask if overwrite
overwrite = input('Directory %s found. Overwrite? [Y/n] ' % directory)
if overwrite == 'n' or overwrite == 'N':
return
elif overwrite == '':
break
else:
logger.error('%s is not a directory. Abort.')
return
else:
os.makedirs(directory)
# Create HDF5 File
f = h5py.File(directory + '/data.hdf5', mode='w')
# Create features and targets array
features = f.create_dataset('features', self.x.shape, dtype='float32')
targets = f.create_dataset('targets', (self.y.shape[0], 1), dtype='uint8')
features[...] = self.x
targets[...] = self.y.reshape((self.y.shape[0], 1))
features.dims[0].label = 'batch'
features.dims[1].label = 'feature'
targets.dims[0].label = 'batch'
targets.dims[1].label = 'index'
# Find Split Locations
if break_by == 'day':
break_list = self._break_by_day()
else:
break_list = self._break_by_week()
# Construct split dict
split_dict = {}
split_set = []
split_timearray = []
num_break_point = len(break_list) - 1
for i in range(num_break_point):
start = break_list[i]
stop = break_list[i + 1]
split_name = break_by + ' ' + str(i)
split_dict[split_name] = {
'features': (start, stop),
'targets': (start, stop)
}
split_set.append(split_name)
split_timearray.append(self.time_list[start:stop])
f.attrs['split'] = H5PYDataset.create_split_array(split_dict=split_dict)
# Save to file
f.flush()
f.close()
# Save Complementary Information
f = open(directory + '/info.pkl', 'wb')
dataset_info = {
'index_to_activity': {i: self.get_activity_by_index(i) for i in range(len(self.get_enabled_activities()))},
'index_to_feature': {i: self.get_feature_string_by_index(i) for i in range(self.x.shape[1])},
'activity_info': self.activity_list,
'sensor_info': self.sensor_list,
'split_sets': split_set,
'split_timearray': split_timearray,
'comments': comments
}
pickle.dump(dataset_info, f, pickle.HIGHEST_PROTOCOL)
f.close()
[docs] def export_hdf5(self, filename, comments='', bg_activity='Other_Activity', driver=None):
"""Export the dataset into a hdf5 dataset file with meta-data logged in attributes.
To load the data, you can use :class:`pyActLearn.CASAS.h5py.CASASH5PY` class.
Args:
filename (:obj:`str`): The directory to save hdf5 and complementary dataset information.
comments (:obj:`str`): Additional comments to add.
bg_activity (:obj:`str`): Background activity label.
driver (:obj:`str`): h5py dataset R/W driver.
"""
# Collect metadata
feature_description = [
self.get_feature_string_by_index(feature_id)
for feature_id in range(self.x.shape[1])
]
target_description = [
self.get_activity_by_index(activity_id)
for activity_id in range(len(self.get_enabled_activities()))
]
target_colors = [
self.get_activity_color(activity_name)
for activity_name in target_description
]
from .h5py import CASASH5PY
casas_hdf5 = CASASH5PY(filename, mode='w', driver=driver)
casas_hdf5.create_features(feature_array=self.x,
feature_description=feature_description)
casas_hdf5.create_targets(target_array=self.y,
target_description=target_description,
target_colors=target_colors)
casas_hdf5.create_time_list(time_array=self.time_list)
casas_hdf5.create_splits(days=self._break_by_day(), weeks=self._break_by_week())
casas_hdf5.create_comments(comments)
casas_hdf5.create_sensors(sensors=[self.get_sensor_by_index(i)
for i in range(len(self.get_enabled_sensors()))])
if bg_activity is not None:
casas_hdf5.set_background_target(bg_activity)
casas_hdf5.flush()
casas_hdf5.close()
[docs] def write_to_xlsx(self, filename, start=0, end=-1):
"""Write to file in xlsx format
Args:
filename (:obj:`str`): xlsx file name.
start (:obj:`int`): start index.
end (:obj:`int`): end index.
"""
workbook = xlsxwriter.Workbook(filename)
# Dump Activities
activity_sheet = workbook.add_worksheet("Activities")
c = 0
for item in self.activity_list[list(self.activity_list.keys())[0]].keys():
activity_sheet.write(0, c, str(item))
c += 1
r = 1
for activity in self.activity_list.keys():
c = 0
for item in self.activity_list[activity].keys():
activity_sheet.write(r, c, str(self.activity_list[activity][item]))
c += 1
r += 1
# Dump Sensors
sensor_sheet = workbook.add_worksheet("Sensors")
c = 0
for item in self.sensor_list[list(self.sensor_list.keys())[0]].keys():
sensor_sheet.write(0, c, str(item))
c += 1
r = 1
for sensor in self.sensor_list.keys():
c = 0
for item in self.sensor_list[sensor].keys():
sensor_sheet.write(r, c, str(self.sensor_list[sensor][item]))
c += 1
r += 1
# Dump Calculated Features
if self.is_stat_feature:
# Feature Description Sheet
feature_sheet = workbook.add_worksheet('Features')
feature_list_title = ['name', 'index', 'enabled', 'per_sensor', 'description', 'routine']
for c in range(0, len(feature_list_title)):
feature_sheet.write(0, c, str(feature_list_title[c]))
r = 1
for feature in self.feature_list:
feature_sheet.write(r, 0, str(self.feature_list[feature].name))
feature_sheet.write(r, 1, str(self.feature_list[feature].index))
feature_sheet.write(r, 2, str(self.feature_list[feature].enabled))
feature_sheet.write(r, 3, str(self.feature_list[feature].per_sensor))
feature_sheet.write(r, 4, str(self.feature_list[feature].description))
if self.feature_list[feature].routine is None:
feature_sheet.write(r, 5, 'None')
else:
feature_sheet.write(r, 5, str(self.feature_list[feature].routine.name))
r += 1
# Dump Events
if len(self.event_list) != 0:
event_sheet = workbook.add_worksheet('Events')
c = 0
for item in self.event_list[0].keys():
event_sheet.write(0, c, str(item))
c += 1
r = 1
for event in self.event_list[0:100]:
c = 0
for item in event.keys():
event_sheet.write(r, c, str(event[item]))
c += 1
r += 1
# Dump Data
if self.x is not None:
data_sheet = workbook.add_worksheet('Data')
# Export self.x feature
if self.is_stat_feature:
data_sheet.write(0, 0, 'activity')
# Calculate enabled sensor size
num_sensors = len(self.get_enabled_sensors())
# Add Feature Title
for feature_name in self.feature_list.keys():
if self.feature_list[feature_name].enabled:
if self.feature_list[feature_name].per_sensor:
# Calculate Start Position
start_col = self.num_static_features + \
self.feature_list[feature_name].index * num_sensors + 1
data_sheet.merge_range(0, start_col, 0, start_col + num_sensors - 1, feature_name)
else:
data_sheet.write(0, self.feature_list[feature_name].index + 1, feature_name)
for c in range(1, self.num_static_features + 1):
data_sheet.write(1, c, 'window')
for f in range(0, self.num_per_sensor_features):
for sensor in self.sensor_list.keys():
start_col = f * num_sensors + self.num_static_features + self.sensor_list[sensor]['index'] + 1
data_sheet.write(1, start_col, sensor)
# Add Data from Data Array
r = 2
(num_samples, num_features) = self.x.shape
if end == -1:
end = num_samples
if start < num_samples and start < end:
for i in range(start, end):
data_sheet.write(r, 0, str(self.y[i]))
c = 1
for item in self.x[i]:
data_sheet.write(r, c, str(item))
c += 1
r += 1
workbook.close()
# endregion
# region Summary
[docs] def summary(self):
"""Print summary of loaded datasets
"""
print('Dataset Path: %s' % self.data_path)
print('Sensors: %d' % len(self.sensor_list))
print('Sensors enabled: %d' % len(self.get_enabled_sensors()))
print('Activities: %d' % len(self.activity_list))
print('Activities enabled: %d' % len(self.get_enabled_activities()))
print('loaded events: %d' % len(self.event_list))
if self.x is not None:
print('feature array: (%d, %d)' % (self.x.shape[0], self.x.shape[1]))
print('activity array: (%d, )' % self.y.shape[0])
# endregion
_COLORS = ('#b20000, #56592d, #acdae6, #cc00be, #591616, #d5d9a3, '
'#007ae6, #4d0047, #a67c7c, #2f3326, #00294d, #b35995, '
'#ff9180, #1c330d, #73b0e6, #f2b6de, #592400, #6b994d, '
'#1d2873, #ff0088, #cc7033, #50e639, #0000ff, #7f0033, '
'#e6c3ac, #00d991, #c8bfff, #592d3e, #8c5e00, #80ffe5, '
'#646080, #d9003a, #332200, #397367, #6930bf, #33000e, '
'#ffbf40, #3dcef2, #1c0d33, #8c8300, #23778c, #ba79f2, '
'#e6f23d, #203940, #302633').split(',')
# region InternalActivityListManagement
def _add_activity(self, label):
"""Add activity to :attr:`activity_list`
Args:
label (:obj:`str`): activity label
Returns:
:obj:`int`: activity index
"""
if label not in self.activity_list:
logger.debug('add activity class %s' % label)
if self.is_legacy:
self.activity_list[label] = {'name': label}
else:
self.activity_list[label] = self.home.get_activity(label)
if self.activity_list[label] is None:
logger.warning('Failed to find information about activity %s' % label)
self.activity_list[label] = {'name': label}
self.activity_list[label]['index'] = -1
self.activity_list[label]['enable'] = True
self.activity_list[label]['window_size'] = 30
self._assign_activity_indices()
return self.activity_list[label]['index']
def _assign_activity_indices(self):
"""Assign index number to each activity enabled
Returns:
:obj:`int`: Number of enabled activities
"""
_enabled_activities_list = []
for label in self.activity_list.keys():
activity = self.activity_list[label]
if activity['enable']:
_enabled_activities_list.append(label)
else:
activity['index'] = -1
_enabled_activities_list.sort()
i = 0
for label in _enabled_activities_list:
self.activity_list[label]['index'] = i
i += 1
num_enabled_activities = len(_enabled_activities_list)
logger.debug('Finished assigning index to activities. %d Activities enabled' % num_enabled_activities)
return num_enabled_activities
# endregion
# region InternalSensorListManagement
def _add_sensor(self, name):
"""Add Sensor to :attr:`sensor_list`
Args:
name (:obj:`str`): sensor name
Returns:
(:obj:`int`): sensor index
"""
if name not in self.sensor_list:
logger.debug('Add sensor %s to sensor list' % name)
if self.is_legacy:
self.sensor_list[name] = {'name': name}
else:
self.sensor_list[name] = self.home.get_sensor(name)
if self.sensor_list[name] is None:
logger.error('Failed to find information about sensor %s' % name)
self.sensor_list[name] = {'name': name}
self.sensor_list[name]['index'] = -1
self.sensor_list[name]['enable'] = True
self.sensor_list[name]['lastFireTime'] = None
self._assign_sensor_indices()
return self.sensor_list[name]['index']
def _assign_sensor_indices(self):
"""Assign index to each enabled sensor
Returns
:obj:`int`: The number of enabled sensor
"""
sensor_id = 0
_enabled_sensor_list = []
for sensor_label in self.sensor_list.keys():
if self.sensor_list[sensor_label]['enable']:
_enabled_sensor_list.append(sensor_label)
else:
self.sensor_list[sensor_label]['index'] = -1
_enabled_sensor_list.sort()
for sensor_label in _enabled_sensor_list:
self.sensor_list[sensor_label]['index'] = sensor_id
sensor_id += 1
return sensor_id
# endregion
# region Stat Feature Routine Update Management
def _add_routine(self, routine):
"""Add routine to feature update routine list
Args:
routine (:class:`pyActLearn.CASAS.stat_features.FeatureRoutineTemplate`): routine to be added
"""
if routine.name in self.routines.keys():
logger.debug('feature routine %s already existed.' % routine.name)
else:
logger.debug('Add feature routine %s: %s' % (routine.name, routine.description))
self.routines[routine.name] = routine
[docs] def disable_routine(self, routine):
""" Disable a routine
Check all enabled feature list and see if the routine is used by other features.
If no feature need the routine, disable it
Args:
routine (:class:`pyActLearn.CASAS.stat_features.FeatureRoutineTemplate`): routine to be disabled
"""
if routine.name in self.routines.keys():
for feature_name in self.feature_list.keys():
if self.feature_list[feature_name].enabled:
if self.feature_list[feature_name].routine == routine:
logger.debug('routine %s is used by feature %s.' % (routine.name, feature_name))
return
logger.debug('routine %s is disabled.' % routine.name)
self.routines[routine.name].enabled = False
else:
logger.error('routine %s not added to routine list' % routine.name)
[docs] def enable_routine(self, routine):
"""Enable a given routine
Args:
routine (:class:`pyActLearn.CASAS.stat_features.FeatureRoutineTemplate`): routine to be disabled
"""
if routine.name in self.routines.keys():
logger.debug('routine %s is enabled.' % routine.name)
routine.enabled = True
else:
logger.error('routine %s not added to routine list' % routine.name)
# endregion
# region Stat Feature Management
def _add_feature(self, feature):
"""Add Feature to feature list
Args:
feature (:class:`pyActlearn.CASAS.stat_features`): FeatureTemplate Object
"""
if feature.name in self.feature_list.keys():
logger.warning('feature: %s already existed. Add Feature Function ignored.' % feature.name)
else:
logger.debug('Add Feature %s: %s' % (feature.name, feature.description))
self.feature_list[feature.name] = feature
if feature.routine is not None:
self._add_routine(feature.routine)
self._assign_feature_indexes()
[docs] def disable_feature(self, feature_name):
"""Disable a feature
Args:
feature_name (:obj:`str`): Feature name.
"""
if feature_name in self.feature_list.keys():
logger.debug('Disable Feature %s: %s' % (feature_name, self.feature_list[feature_name]['description']))
self.feature_list[feature_name].enabled = True
self.feature_list[feature_name].index = -1
self._assign_feature_indexes()
if self.feature_list[feature_name].routine is not None:
self.disable_routine(self.feature_list[feature_name].routine)
else:
logger.error('Feature %s Not Found' % feature_name)
[docs] def enable_feature(self, feature_name):
"""Enable a feature
Args:
feature_name (:obj:`str`): Feature name.
"""
if feature_name in self.feature_list.keys():
logger.debug('Enable Feature %s: %s' % (feature_name, self.feature_list[feature_name]['description']))
self.feature_list[feature_name].enabled = True
self._assign_feature_indexes()
if self.feature_list[feature_name].routine is not None:
self.enable_routine(self.feature_list[feature_name].routine)
else:
logger.error('Feature %s Not Found' % feature_name)
def _assign_feature_indexes(self):
"""Assign index to features
"""
static_id = 0
per_sensor_id = 0
for featureLabel in self.feature_list.keys():
feature = self.feature_list[featureLabel]
if feature.enabled:
if feature.per_sensor:
feature.index = per_sensor_id
per_sensor_id += 1
else:
feature.index = static_id
static_id += 1
else:
feature.index = -1
self.num_static_features = static_id
self.num_per_sensor_features = per_sensor_id
logger.debug('Finished assigning index to features. %d Static Features, %d Per Sensor Features' %
(static_id, per_sensor_id))
def _update_feature_count(self):
"""Update feature count values
"""
self.num_enabled_features = 0
self.num_static_features = 0
self.num_per_sensor_features = 0
for name, feature in self.feature_list.items():
if feature.enabled:
self.num_enabled_features += 1
if feature.per_sensor:
self.num_per_sensor_features += 1
else:
self.num_static_features += 1
def _count_feature_columns(self):
"""Count the size of feature columns
Returns:
:obj:`int`: size of feature columns
"""
self.num_feature_columns = 0
num_enabled_sensors = len(self.get_enabled_sensors())
for feature_name in self.feature_list.keys():
if self.feature_list[feature_name].enabled:
if self.feature_list[feature_name].per_sensor:
self.num_feature_columns += num_enabled_sensors
else:
self.num_feature_columns += 1
return self.num_feature_columns * self.events_in_window
# endregion
# region Segmentation
def _break_by_day(self):
"""Find the split point of the dataset by day
Returns:
:obj:`list` of :obj:`int`: List of indices of the event.rst at the beginning of each day
"""
day_index_list = [0]
start_date = self.time_list[0].date()
for i in range(len(self.time_list)):
cur_date = self.time_list[i].date()
if cur_date > start_date:
day_index_list.append(i)
start_date = cur_date
day_index_list.append(len(self.time_list))
return day_index_list
def _break_by_week(self):
"""Find the split point of the dataset by week
Returns:
:obj:`list` of :obj:`int`: List of indices of the event.rst at the beginning of each week
"""
week_index_list = [0]
start_date = self.time_list[0].date()
for i in range(len(self.time_list)):
cur_date = self.time_list[i].date()
# Monday - then not the same day as start_date
# Else, if more than 7 days apart
if (cur_date.weekday() == 0 and cur_date > start_date) or (cur_date - start_date).days >= 7:
week_index_list.append(i)
start_date = cur_date
week_index_list.append(len(self.time_list))
return week_index_list
# endregion