# -*- coding: utf-8 -*-
"""
This module allows for the importing of participant data for use in fitting
:Author: Dominic Hunt
"""
import pickle
import scipy.io as io
import numpy as np
import pandas as pd
import os
import re
import collections
import utils
[docs]class LengthError(Exception):
pass
[docs]class IDError(Exception):
pass
[docs]class DimentionError(Exception):
pass
[docs]class FileTypeError(Exception):
pass
[docs]class FileError(Exception):
pass
[docs]class FoldersError(Exception):
pass
[docs]class ProcessingError(Exception):
pass
[docs]class FileFilterError(Exception):
pass
DATA_KEYWORDS = {"filename": "filename",
"ID": "participant_ID",
"folder": "folder"}
[docs]class Data(list):
[docs] @classmethod
def load_data(cls,
file_type='csv',
folders='./',
file_name_filter=None,
terminal_ID=True,
split_by=None,
participantID=None,
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
group_by=None,
extra_processing=None,
data_read_options=None):
"""
Import data from a folder. This is a wrapper function for the other import methods
Parameters
----------
file_type : string, optional
The file type of the data, from ``mat``, ``csv``, ``xlsx`` and ``pkl``. Default is ``csv``
folders : string or list of strings, optional
The folder or folders where the data can be found. Default is the current folder.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
split_by : string or list of strings, optional
If multiple participant datasets are in one file sheet, this specifies the column or columns that can
distinguish and identify the rows for each participant. Default ``None``
participantID : string, optional
The dict key where the participant ID can be found. Default ``None``, which results in the file name being
used.
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or None or one element list with a list, optional
If a string or list of strings these are treated as dict keys where the valid actions for each trial can
be found. If None then all trials will use all available actions. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
group_by : list of strings, optional
A list of parts of filenames that are repeated across participants, identifying all the files that should
be grouped together to form one participants data. The rest of the filename is assumed to identify the
participant. Default is ``None``
extra_processing : callable, optional
A function that modifies the dictionary of data read for each participant in such that it is appropriate
for fitting. Default is ``None``
data_read_options : dict, optional
The keyword arguments for the data importing method chosen
Returns
-------
Data : Data class instance
"""
if isinstance(folders, str):
folder_list = [folders]
elif isinstance(folders, list):
folder_list = folders
else:
raise FoldersError('``folders`` must be a string or a list of strings. Found {}'.format(type(folders)))
dat = None
for folder in folder_list:
if file_type == 'mat':
subdat = cls.from_mat(folder=folder,
file_name_filter=file_name_filter,
terminal_ID=terminal_ID,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options,
group_by=group_by,
extra_processing=extra_processing)
elif file_type == 'csv':
subdat = cls.from_csv(folder=folder,
file_name_filter=file_name_filter,
terminal_ID=terminal_ID,
split_by=split_by,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options,
group_by=group_by,
extra_processing=extra_processing,
csv_read_options=data_read_options)
elif file_type == 'xlsx':
subdat = cls.from_xlsx(folder=folder,
file_name_filter=file_name_filter,
terminal_ID=terminal_ID,
split_by=split_by,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options,
group_by=group_by,
extra_processing=extra_processing,
xlsx_read_options=data_read_options)
elif file_type == 'pkl':
subdat = cls.from_pkl(folder=folder,
file_name_filter=file_name_filter,
terminal_ID=terminal_ID,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options,
group_by=group_by,
extra_processing=extra_processing)
else:
raise FileTypeError('{} is not a supported file type. Please use ``mat``, ``csv``, ``xlsx`` or ``pkl``'.format(file_type))
if dat is None:
dat = subdat
else:
dat.extend(subdat)
return dat
[docs] @classmethod
def from_mat(cls,
folder='./',
file_name_filter=None,
terminal_ID=True,
participantID=None,
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
group_by=None,
extra_processing=None):
"""
Import data from a folder full of .mat files, where each file contains the information of one participant
Parameters
----------
folder : string, optional
The folder where the data can be found. Default is the current folder.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
participantID : string, optional
The dict key where the participant ID can be found. Default ``None``, which results in the file name being
used.
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or None or one element list with a list, optional
If a string or list of strings these are treated as dict keys where the valid actions for each trial can
be found. If None then all trials will use all available actions. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
group_by : list of strings, optional
A list of parts of filenames that are repeated across participants, identifying all the files that should
be grouped together to form one participants data. The rest of the filename is assumed to identify the
participant. Default is ``None``
extra_processing : callable, optional
A function that modifies the dictionary of data read for each participant in such that it is appropriate
for fitting. Default is ``None``
Returns
-------
Data : Data class instance
See Also
--------
scipy.io.loadmat
"""
folder_path = cls.__folder_path_cleaning(folder)
files, file_IDs = cls.__locate_files(folder_path, "mat", file_name_filter=file_name_filter, terminal_ID=terminal_ID)
participant_data = []
for f, i in zip(files, file_IDs):
file_data = {DATA_KEYWORDS['filename']: f,
DATA_KEYWORDS['folder']: folder_path}
if participantID is None:
file_data[DATA_KEYWORDS['ID']] = i
mat = io.loadmat(folder_path + f, struct_as_record=False, squeeze_me=True)
for key, value in mat.items():
if key[0:2] == "__":
continue
elif type(value) == io.matlab.mio5_params.mat_struct:
data_structure = {s: getattr(value, s) for s in value._fieldnames}
file_data.update(data_structure)
else:
file_data[key] = value
participant_data.append(file_data)
if participantID is None:
participantID = DATA_KEYWORDS['ID']
participant_processed_data = cls.__clean_data(participant_data,
extra_processing=extra_processing,
group_by=group_by)
return cls(participant_processed_data,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options)
[docs] @classmethod
def from_csv(cls,
folder='./',
file_name_filter=None,
terminal_ID=True,
split_by=None,
participantID=None,
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
group_by=None,
extra_processing=None,
csv_read_options=None):
"""
Import data from a folder full of .csv files, where each file contains the information of one participant
Parameters
----------
folder : string, optional
The folder where the data can be found. Default is the current folder.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
split_by : string or list of strings, optional
If multiple participants datasets are in one file sheet, this specifies the column or columns that can
distinguish and identify the rows for each participant. Default ``None``
participantID : string, optional
The dict key where the participant ID can be found. Default ``None``, which results in the file name being
used.
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or None or one element list with a list, optional
If a string or list of strings these are treated as dict keys where the valid actions for each trial can
be found. If None then all trials will use all available actions. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
group_by : list of strings, optional
A list of parts of filenames that are repeated across participants, identifying all the files that should
be grouped together to form one participants data. The rest of the filename is assumed to identify the
participant. Default is ``None``
extra_processing : callable, optional
A function that modifies the dictionary of data read for each participant in such that it is appropriate
for fitting. Default is ``None``
csv_read_options : dict, optional
The keyword arguments for pandas.read_csv. Default ``{}``
Returns
-------
Data : Data class instance
See Also
--------
pandas.read_csv
"""
folder_path = cls.__folder_path_cleaning(folder)
files, file_IDs = cls.__locate_files(folder_path, "csv", file_name_filter=file_name_filter,
terminal_ID=terminal_ID)
if split_by is None:
split_by = []
elif isinstance(split_by, str):
split_by = [split_by]
elif isinstance(split_by, (list, np.ndarray)):
for s in split_by:
if not isinstance(s, str):
raise TypeError('A split_by list should only contain strings. Found {}'.format(type(s)))
else:
raise TypeError('split_by should be a string or a list of strings. Found {}'.format(type(split_by)))
if csv_read_options is None:
csv_read_options = {}
participant_data = []
participantID_changed = False
for filename, fileID in zip(files, file_IDs):
dat = pd.read_csv(folder_path + filename, **csv_read_options)
if split_by:
classifier_list = []
for s in split_by:
try:
dat[s].fillna(method='ffill', inplace=True)
except KeyError as err:
raise KeyError('Data split by contains a column that does not exist: ``{}``'.format(s))
if dat[s].dtype in [np.dtype('int64'), np.dtype('float64')]:
sSorted = sorted(list(set(dat[s])))
classifier_list.append(sSorted)
else:
classifier_list.append(cls.__sort_strings(list(set(dat[s])), ''))
participants = utils.listMerge(*classifier_list)
for p in participants:
sub_dat = dat[(dat[split_by] == p).all(axis=1)]
sub_dat_dict = sub_dat.to_dict(orient='list')
sub_dat_dict[DATA_KEYWORDS['filename']] = filename
sub_dat_dict[DATA_KEYWORDS['folder']] = folder_path
if participantID is None or participantID == split_by[0]:
participantID_changed = True
if len(p) > 1:
sub_dat_dict[DATA_KEYWORDS['ID']] = "-".join([str(pi) for pi in p])
else:
sub_dat_dict[DATA_KEYWORDS['ID']] = p[0]
participant_data.append(sub_dat_dict)
else:
dat_dict = dat.to_dict(orient='list')
dat_dict[DATA_KEYWORDS['filename']] = filename
dat_dict[DATA_KEYWORDS['folder']] = folder_path
if participantID is None:
dat_dict[DATA_KEYWORDS['ID']] = fileID
participantID_changed = True
elif participantID in dat_dict and isinstance(dat_dict[participantID], (list, np.ndarray)):
if utils.list_all_equal(dat_dict[participantID]):
dat_dict[DATA_KEYWORDS['ID']] = dat_dict[participantID][0]
participantID_changed = True
else:
raise TypeError("participantID's column, {}, had more than one value".format(participantID))
participant_data.append(dat_dict)
if participantID_changed:
participantID = DATA_KEYWORDS['ID']
participant_processed_data = cls.__clean_data(participant_data,
extra_processing=extra_processing,
group_by=group_by)
return cls(participant_processed_data,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options)
[docs] @classmethod
def from_xlsx(cls,
folder='./',
file_name_filter=None,
terminal_ID=True,
split_by=None,
participantID=None,
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
group_by=None,
extra_processing=None,
xlsx_read_options=None):
"""
Import data from a folder full of .xlsx files, where each file contains the information of one participant
Parameters
----------
folder : string, optional
The folder where the data can be found. Default is the current folder.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
split_by : string or list of strings, optional
If multiple participants datasets are in one file sheet, this specifies the column or columns that can
distinguish and identify the rows for each participant. Default ``None``
participantID : string, optional
The dict key where the participant ID can be found. Default ``None``, which results in the file name being
used.
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or None or one element list with a list, optional
If a string or list of strings these are treated as dict keys where the valid actions for each trial can
be found. If None then all trials will use all available actions. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
group_by : list of strings, optional
A list of parts of filenames that are repeated across participants, identifying all the files that should
be grouped together to form one participants data. The rest of the filename is assumed to identify the
participant. Default is ``None``
extra_processing : callable, optional
A function that modifies the dictionary of data read for each participant in such that it is appropriate
for fitting. Default is ``None``
xlsx_read_options : dict, optional
The keyword arguments for pandas.read_excel
Returns
-------
Data : Data class instance
See Also
--------
pandas.read_excel
"""
folder_path = cls.__folder_path_cleaning(folder)
files, file_IDs = cls.__locate_files(folder_path, "xlsx", file_name_filter=file_name_filter,
terminal_ID=terminal_ID)
if split_by is None:
split_by = []
elif isinstance(split_by, str):
split_by = [split_by]
elif isinstance(split_by, (list, np.ndarray)):
for s in split_by:
if not isinstance(s, str):
raise TypeError('A split_by list should only contain strings. Found {}'.format(type(s)))
else:
raise TypeError('split_by should be a string or a list of strings. Found {}'.format(type(split_by)))
if xlsx_read_options is None:
xlsx_read_options = {}
participant_data = []
participantID_changed = False
for filename, fileID in zip(files, file_IDs):
# In case the file is open, this will in fact be a temporary file and not a valid file.
if filename.startswith('~$'):
continue
dat = pd.read_excel(folder_path + filename, **xlsx_read_options)
if split_by:
classifier_list = []
for s in split_by:
try:
dat[s].fillna(method='ffill', inplace=True)
except KeyError as err:
raise KeyError('Data split by contains a column that does not exist: ``{}``'.format(s))
if dat[s].dtype in [np.dtype('int64'), np.dtype('float64')]:
sSorted = sorted(list(set(dat[s])))
classifier_list.append(sSorted)
else:
classifier_list.append(cls.__sort_strings(list(set(dat[s])), ''))
participants = utils.listMerge(*classifier_list)
for p in participants:
sub_dat = dat[(dat[split_by] == p).all(axis=1)]
sub_dat_dict = sub_dat.to_dict(orient='list')
sub_dat_dict[DATA_KEYWORDS['filename']] = filename
sub_dat_dict[DATA_KEYWORDS['folder']] = folder_path
if participantID is None or participantID == split_by[0]:
participantID_changed = True
if len(p) > 1:
sub_dat_dict[DATA_KEYWORDS['ID']] = "-".join([str(pi) for pi in p])
else:
sub_dat_dict[DATA_KEYWORDS['ID']] = p[0]
participant_data.append(sub_dat_dict)
else:
dat_dict = dat.to_dict(orient='list')
dat_dict[DATA_KEYWORDS['filename']] = filename
dat_dict[DATA_KEYWORDS['folder']] = folder_path
if participantID is None:
dat_dict[DATA_KEYWORDS['ID']] = fileID
participantID_changed = True
elif participantID in dat_dict and isinstance(dat_dict[participantID], (list, np.ndarray)):
if utils.list_all_equal(dat_dict[participantID]):
dat_dict[DATA_KEYWORDS['ID']] = dat_dict[participantID][0]
participantID_changed = True
else:
raise TypeError("participantID's column, {}, had more than one value".format(participantID))
participant_data.append(dat_dict)
if participantID_changed:
participantID = DATA_KEYWORDS['ID']
participant_processed_data = cls.__clean_data(participant_data,
extra_processing=extra_processing,
group_by=group_by)
return cls(participant_processed_data,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options)
[docs] @classmethod
def from_pkl(cls,
folder='./',
file_name_filter=None,
terminal_ID=True,
participantID=None,
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
group_by=None,
extra_processing=None):
"""
Import data from a folder full of .pkl files, where each file contains the information of one participant.
This will principally be used to import data stored by task simulations
Parameters
----------
folder : string, optional
The folder where the data can be found. Default is the current folder.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
participantID : string, optional
The dict key where the participant ID can be found. Default ``None``, which results in the file name being
used.
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or None or one element list with a list, optional
If a string or list of strings these are treated as dict keys where the valid actions for each trial can
be found. If None then all trials will use all available actions. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
group_by : list of strings, optional
A list of parts of filenames that are repeated across participants, identifying all the files that should
be grouped together to form one participants data. The rest of the filename is assumed to identify the
participant. Default is ``None``
extra_processing : callable, optional
A function that modifies the dictionary of data read for each participant in such that it is appropriate
for fitting. Default is ``None``
Returns
-------
Data : Data class instance
"""
folder_path = cls.__folder_path_cleaning(folder)
files, file_IDs = cls.__locate_files(folder_path, "pkl", file_name_filter=file_name_filter,
terminal_ID=terminal_ID)
participant_data = []
for filename, fileID in zip(files, file_IDs):
with open(folder_path + filename, 'rb') as o:
dat = pickle.load(o)
if not isinstance(dat, dict):
raise TypeError("Data coming from ``.pkl`` files expected to be dictionaries. Found {}".format(type(dat)))
dat[DATA_KEYWORDS['filename']] = filename
dat[DATA_KEYWORDS['folder']] = folder_path
file_data = {k: v for k, v in dat.items()}
if participantID is None:
file_data[DATA_KEYWORDS['ID']] = fileID
participant_data.append(file_data)
if participantID is None:
participantID = DATA_KEYWORDS['ID']
participant_processed_data = cls.__clean_data(participant_data,
extra_processing=extra_processing,
group_by=group_by)
return cls(participant_processed_data,
participantID=participantID,
choices=choices,
feedbacks=feedbacks,
stimuli=stimuli,
action_options=action_options)
def __init__(self,
participants,
participantID='ID',
choices='actions',
feedbacks='feedbacks',
stimuli=None,
action_options=None,
process_data_function=None):
"""
Parameters
----------
participants : list of dict
Each dictionary contains the information for one participant
participantID : string, optional
The dict key where the participant ID can be found. Default ``ID``
choices : string, optional
The dict key where the participant choices can be found. Default ``'actions'``
feedbacks : string, optional
The dict key where the feedbacks the participant received can be found. Default ``'feedbacks'``
stimuli : string or list of strings, optional
The dict keys where the stimulus cues for each trial can be found. Default ``'None'``
action_options : string or list of strings or one element list with a list, optional
The dict keys where the valid actions for each trial can be found as a single key or list of keys.
If ``None`` then the action list is considered to stay constant. If the list contains one list then it will
be treated as a list of valid actions for each trialstep. Default ``'None'``
"""
self.process_function = process_data_function
if callable(process_data_function):
participant_data = process_data_function(participants)
elif isinstance(process_data_function, str):
pass
else:
participant_data = participants
if not isinstance(participantID, str):
raise TypeError('participantID should be a string not a {}'.format(type(participantID)))
if not isinstance(choices, str):
raise TypeError('choices should be a string not a {}'.format(type(choices)))
if not isinstance(feedbacks, str):
raise TypeError('feedbacks should be a string not a {}'.format(type(feedbacks)))
if stimuli is None or isinstance(stimuli, str):
combining_stimuli = False
elif isinstance(stimuli, list):
combining_stimuli = True
if not all(isinstance(s, str) for s in stimuli):
raise TypeError('stimuli in the list should be strings: {}'.format(stimuli))
else:
raise TypeError('stimuli should be a string or list of strings not a {}'.format(type(stimuli)))
if action_options is None or isinstance(action_options, str):
combining_action_options = False
elif isinstance(action_options, (list, np.ndarray)):
if all(isinstance(s, str) for s in action_options):
combining_action_options = True
elif len(action_options) == 1:
combining_action_options = False
else:
raise TypeError('The list of action_options should contain strings or one example of trial valid action choices: {}'.format(action_options))
else:
raise TypeError('action_options should be a string, a list of strings or a list containing one example of trial valid action choices, not a {}'.format(type(action_options)))
self.IDs = {}
for loc, p in enumerate(participant_data):
if not isinstance(p, dict):
raise TypeError("participants must be in the form of a dict, not {}".format(type(p)))
keys = list(p.keys())
if participantID not in keys:
raise KeyError("participantID key not found in participant data: `{}`".format(participantID))
elif not isinstance(p[participantID], str):
raise TypeError("participantID value must be a string. Found {}".format(type(p[participantID])))
elif p[participantID] in self.IDs:
raise IDError("participantID must be unique. Found more than one instance of `{}`".format(p[participantID]))
else:
self.participantID = participantID
self.IDs[p[participantID]] = loc
if choices not in keys:
raise KeyError("choices key not found in participant {} data: `{}`".format(p[participantID], choices))
elif not isinstance(p[choices], (list, np.ndarray)):
raise TypeError("choices value must be a list or numpy array. Found {} in {}".format(type(p[choices]), p[participantID]))
else:
self.choices = choices
if feedbacks not in keys:
raise KeyError("feedbacks key not found in participant {} data: `{}`".format(p[participantID], feedbacks))
elif not isinstance(p[feedbacks], (list, np.ndarray)):
raise TypeError("feedbacks value must be a list or numpy array. Found {} in {}".format(type(p[feedbacks]), p[participantID]))
else:
self.feedbacks = feedbacks
if len(p[choices]) != len(p[feedbacks]):
raise LengthError('The number of values for choices and feedbacks must be the same: {} choices and {} feedbacks for participant `{}`'.format(len(p[choices]), len(p[feedbacks]), p[participantID]))
if not combining_stimuli:
if stimuli is None:
self.stimuli = None
elif stimuli not in keys:
raise KeyError("stimuli key not found in participant {} data: `{}`".format(p[participantID], stimuli))
elif not isinstance(p[stimuli], (list, np.ndarray)):
raise TypeError("stimuli value must be a list or numpy array. Found {} in {}".format(type(p[stimuli]), p[participantID]))
else:
self.stimuli = stimuli
else:
if not set(stimuli).issubset(set(keys)):
raise KeyError("stimuli keys not found in participant {} data: `{}`".format(p[participantID], stimuli))
cues_list = [np.array(p[s])[:, np.newaxis] for s in stimuli]
try:
cues_array = np.hstack(cues_list)
except ValueError as error:
if all([True if len(a.shape) == 2 else False for a in cues_list]):
# I did not expect this
raise error
else:
raise DimentionError("If you are using separate keys for each stimulus cue, they must all be 1D lists")
stimuli_combined_name = "cues_combined"
if stimuli_combined_name in keys:
raise KeyError("Unexpected use of key `{}`. Use other name".format(stimuli_combined_name))
p[stimuli_combined_name] = cues_array
self.stimuli = stimuli_combined_name
if stimuli and len(p[choices]) != len(p[self.stimuli]):
raise LengthError('The number of values for choices and stimuli must be the same: {} choices and {} stimuli for participant `{}`'.format(len(p[choices]), len(p[self.stimuli]), p[participantID]))
if not combining_action_options:
if action_options is None:
self.action_options = None
elif isinstance(action_options, (list, np.ndarray)) and len(action_options) == 1:
action_options_constant_name = 'constant_valid_actions'
participant_data[loc][action_options_constant_name] = [action_options[0]] * len(p[choices])
self.action_options = action_options_constant_name
elif action_options not in keys:
raise KeyError("action_options key not found in participant {} data: `{}`".format(p[participantID], action_options))
elif not isinstance(p[action_options], (list, np.ndarray)):
raise TypeError("action_options value must be a list or numpy array. Found {} in {}".format(type(p[action_options]), p[participantID]))
else:
self.action_options = action_options
else:
if not set(action_options).issubset(set(keys)):
raise KeyError("action_options keys not found in participant {} data: {}".format(p[participantID], action_options))
options_list = [np.array(p[a])[:, np.newaxis] for a in action_options]
try:
options_array = np.hstack(options_list)
except ValueError as error:
if all([True if len(a.shape) == 2 else False for a in options_list]):
# I did not expect this
raise error
else:
raise DimentionError(
"If you are using separate keys for each action option, they must all be 1D lists")
action_options_combined_name = "valid_actions_combined"
if action_options_combined_name in keys:
raise KeyError("Unexpected use of key `{}`. Use other name".format(action_options_combined_name))
participant_data[loc][action_options_combined_name] = options_array
self.action_options = action_options_combined_name
if action_options and len(p[choices]) != len(p[self.action_options]) and len(action_options) > 1:
raise LengthError('The number of values for choices and valid actions must be the same: {} choices and {} action_options for participant `{}`'.format(len(p[choices]), len(p[self.action_options]), p[participantID]))
super(Data, self).__init__(participant_data)
[docs] def extend(self, iterable):
"""Combines two Data instances into one
Parameters
----------
iterable : Data instance or list of participant dicts
"""
if isinstance(iterable, Data):
if self.participantID != iterable.participantID:
raise AttributeError('participantID ``{}`` cannot be extended with ``{}``'.format(self.participantID, iterable.participantID))
if self.choices != iterable.choices:
raise AttributeError('choices ``{}`` cannot be extended with ``{}``'.format(self.choices, iterable.choices))
if self.feedbacks != iterable.feedbacks:
raise AttributeError('feedbacks ``{}`` cannot be extended with ``{}``'.format(self.feedbacks, iterable.feedbacks))
if self.stimuli != iterable.stimuli:
raise AttributeError('stimuli ``{}`` cannot be extended with ``{}``'.format(self.stimuli, iterable.stimuli))
if self.action_options != iterable.action_options:
raise AttributeError('action_options ``{}`` cannot be extended with ``{}``'.format(self.action_options, iterable.action_options))
if self.process_function != iterable.process_function:
raise AttributeError('process_function ``{}`` cannot be extended with ``{}``'.format(self.process_function, iterable.process_function))
IDs = self.IDs.copy()
number_IDs = len(IDs)
for i, (id_key, id_val) in enumerate(iterable.IDs.items()):
if id_key in IDs:
raise IDError("participantID must be unique. Found more than one instance of `{}`".format(id_key))
else:
self.IDs[id_key] = number_IDs + id_val
super(Data, self).extend(iterable)
else:
dat = Data(iterable,
participantID=self.participantID,
choices=self.choices,
feedbacks=self.feedbacks,
stimuli=self.stimuli,
action_options=self.action_options,
process_data_function=self.process_function
)
self.extend(dat)
def __add__(self, y):
self.extend(y)
def __eq__(self, other):
if not isinstance(other, Data):
return False
eq_list = []
for item1, item2 in zip(self, other):
if any(item1.keys() != item2.keys()):
eq_list.append(False)
elif any(item1.values() != item2.values()):
eq_list.append(False)
else:
eq_list.append(True)
if len(eq_list) == 0:
return True
else:
return eq_list
def __ne__(self, other):
return not self.__eq__(other)
@staticmethod
def __folder_path_cleaning(folder):
folder_path = os.path.abspath(folder).replace('\\', '/')
if folder_path[-1] != '/':
folder_path += '/'
return folder_path
@classmethod
def __locate_files(cls, folder, file_type, file_name_filter=None, terminal_ID=True):
"""
Produces the list of valid input files
Parameters
----------
folder : string
The folder string should end in a ``/``
file_type : string
The file extension found after the ``.``.
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
terminal_ID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed.
Default ``True``
Returns
-------
dataFiles : list
A sorted list of the the files
fileIDs : list of strings
A list of unique parts of the filenames, in the order of dataFiles
See Also
--------
sortStrings : sorts the files found
"""
files = os.listdir(folder)
data_files = [f for f in files if f.endswith(file_type)]
valid_file_names = cls.__valid_files(data_files, file_name_filter=file_name_filter)
if not valid_file_names:
raise FileError('No data files found')
sorted_files, file_IDs = cls.__sort_strings(valid_file_names,
"." + file_type,
terminalID=terminal_ID,
return_IDs=True)
return sorted_files, file_IDs
@classmethod
def __valid_files(cls, data_files, file_name_filter=None):
"""
Take a list of file names in the folder and a filter function and returns the filtered list
Parameters
----------
data_files : list of strings
The list of file names without paths
file_name_filter : callable, string, list of strings or None, optional
A function to process the file names or a list of possible prefixes as strings or a single string.
Default ``None``, no file names removed
Returns
-------
valid_file_list : list of strings
A subset of the data_files
"""
if file_name_filter is None:
valid_file_list = data_files
elif callable(file_name_filter):
valid_file_list = file_name_filter(data_files)
elif isinstance(file_name_filter, str):
valid_file_list = cls.__file_prefix_filter(data_files, [file_name_filter])
elif isinstance(file_name_filter, (list, np.ndarray)):
valid_file_list = cls.__file_prefix_filter(data_files, file_name_filter)
else:
raise FileFilterError('Unrecognised data file filter {}', file_name_filter)
return valid_file_list
@classmethod
def __sort_strings(cls, unordered_list, suffix, terminalID=True, return_IDs=False):
"""
Takes an unordered list of strings and sorts them if possible and necessary
Parameters
----------
unordered_list : list of strings
A list of valid strings
suffix : string
A known suffix for the string
terminalID : bool, optional
Is there an ID number at the end of the filename? If not then a more general search will be performed. Default ``True``
return_IDs : bool, optional
Specify if the fileIDs should be returned. Default ``False``
Returns
-------
sortedList : list of strings
A sorted list of the the strings
fileIDs : list of strings
A list of unique parts of the filenames, in the order of dataFiles. Only returned if ``return_IDs=True``
See Also
--------
int_core : sorts the strings with the prefix and suffix removed if they are a number
get_unique_prefix : identifies prefixes all strings have
"""
if len(unordered_list) <= 1:
return unordered_list, ["all"]
suffixLen = len(suffix)
if not terminalID:
suffix = cls.__get_unique_suffix(unordered_list, suffixLen)
suffixLen = len(suffix)
prefix = cls.__get_unique_prefix(unordered_list, suffixLen)
sortedList, fileIDs = cls.__int_core(unordered_list, prefix, suffix)
if not sortedList:
sortedList, fileIDs = cls.__str_core(unordered_list, len(prefix), suffixLen)
if return_IDs:
return sortedList, fileIDs
else:
return sortedList
@staticmethod
def __get_unique_suffix(unorderedList, knownSuffixLen):
"""
Parameters
----------
unorderedList : list of strings
A list of strings to be ordered
knownSuffixLen : int
The length of the suffix identified so far
Returns
-------
suffixLen : int
The length of the discovered suffix
"""
for i in range(knownSuffixLen, len(unorderedList[0])): # Starting with the known string-suffix
sec = unorderedList[0][-i:]
if all((sec == d[-i:] for d in unorderedList)):
continue
else:
break
return unorderedList[0][-i + 1:]
@staticmethod
def __get_unique_prefix(unorderedList, suffixLen):
"""
Identifies any initial part of strings that are identical
for all string
Parameters
----------
unorderedList : list of strings
A list of strings to be ordered
suffixLen : int
The length of the identified suffix
Returns
-------
prefix : string
The initial part of the strings that is identical for all strings in
the list
"""
for i in range(1, len(unorderedList[0]) - suffixLen + 2): # Assuming the prefix might be the string-suffix
sec = unorderedList[0][:i]
if all((sec == d[:i] for d in unorderedList)):
continue
else:
break
return unorderedList[0][:i - 1]
@staticmethod
def __str_core(unorderedList, prefixLen, suffixLen):
"""
Takes the *core* part of a string and, assuming it is a string,
sorts them. Returns the list sorted
Parameters
----------
unorderedList : list of strings
The list of strings to be sorted
prefixLen : int
The length of the unchanging start of each filename
suffixLen : int
The length of the unchanging end of each filename
Returns
-------
orderedList : list of strings
The strings now sorted
"""
sortingList = ((f, f[prefixLen:-suffixLen]) for f in unorderedList)
sortedList = sorted(sortingList, key=lambda s: s[1])
orderedList = [s[0] for s in sortedList]
fileIDs = [s[1] for s in sortedList]
return orderedList, fileIDs
@staticmethod
def __int_core(unorderedList, prefix, suffix):
"""Takes the *core* part of a string and, assuming it is an integer, sorts them.
Parameters
----------
unorderedList : list of strings
The list of strings to be sorted
prefix : string
The unchanging part of the start each string
suffix : string
The unchanging known end of each string
Returns
-------
sortedStrings : list of strings
The strings now sorted
"""
try:
if suffix:
testItem = int(unorderedList[0][len(prefix):-len(suffix)])
else:
testItem = int(unorderedList[0][len(prefix):])
except ValueError:
return [], []
if suffix:
core = [(d[len(prefix):-(len(suffix))], i) for i, d in enumerate(unorderedList)]
else:
core = [(d[len(prefix):], i) for i, d in enumerate(unorderedList)]
coreInt = [(int(c), i) for c, i in core]
coreSorted = sorted(coreInt)
coreStr = [(str(c), i) for c, i in coreSorted]
sortedStrings = [''.join([prefix, '0' * (len(core[i][0]) - len(s)), s, suffix]) for s, i in coreStr]
return sortedStrings, [c for c, i in coreStr]
@staticmethod
def __file_prefix_filter(data_files, file_filter):
"""
Takes a list of file names and a list of strings and returns the file names that start with any of the file_name_filter
Parameters
----------
data_files : list of strings
The list of file names without paths
file_filter : list of strings
The list of possible prefixes
Returns
-------
valid_file_list : list of strings
A subset of the data_files
"""
valid_file_list = []
for f in data_files:
for v in file_filter:
if f.startswith(v):
valid_file_list.append(f)
return valid_file_list
@staticmethod
def __clean_data(participant_data, extra_processing=None, group_by=None):
if isinstance(group_by, list):
grouped_data = {}
for dat in participant_data:
filename = dat[DATA_KEYWORDS['filename']]
for group in group_by:
if group in filename:
id_label = filename.replace(group, '')
if id_label not in grouped_data:
grouped_data[id_label] = {}
#grouped_data[id_label].update({'{}_{}'.format(k, group): v for k, v in dat.items()})
grouped_data[id_label][group] = dat
merged_data = []
for id_label, group_data in grouped_data.items():
group_merged_data = {'merge_id': id_label}
keyset = set().union(*[list(v.keys()) for v in grouped_data[id_label].values()])
for key in keyset:
key_values = [group_data[group][key] for group in group_by if group in group_data
and key in group_data[group]]
if utils.list_all_equal(key_values):
group_merged_data[key] = key_values[0]
else:
for group in group_by:
if group in group_data and key in group_data[group]:
group_merged_data['{}_{}'.format(group, key)] = key_values.pop(0)
merged_data.append(group_merged_data)
else:
merged_data = participant_data
if extra_processing:
processed_data = []
for file_data in merged_data:
dat_dict = extra_processing(file_data)
if dat_dict is None:
raise ProcessingError('The extra_processing function must return the data')
else:
processed_data.append(dat_dict)
else:
processed_data = merged_data
return processed_data
# TODO work out how you want to integrate this into getFiles
[docs]def sort_by_last_number(dataFiles):
# sort by the last number on the filename
footSplit = [re.search(r"\.(?:[a-zA-Z]+)$", f).start() for f in dataFiles]
numsplit = [re.search(r"\d+(\.\d+|$)?$", f[:n]).start() for n, f in zip(footSplit, dataFiles)]
# check if number part is a float or an int (assuming the same for all) and use the appropriate conversion
if "." in dataFiles[0][numsplit[0]:footSplit[0]]:
numRepr = float
else:
numRepr = int
fileNameSections = [(f[:n], numRepr(f[n:d]), f[d:]) for n, d, f in zip(numsplit, footSplit, dataFiles)]
# Sort the keys for groupFiles
sortedFileNames = sorted(fileNameSections, key=lambda fileGroup: fileGroup[1])
dataSortedFiles = [head + str(num) + foot for head, num, foot in sortedFileNames]
return dataSortedFiles