Source code for the ``outputting`` module

# -*- coding: utf-8 -*-
"""
:Author: Dominic Hunt
"""
import logging
import sys
import os
import inspect
import collections

import pickle
import pandas as pd
import datetime as dt
import shutil as shu
import numpy as np

import utils
import start


#%% Folder management
class LoggerWriter(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.

    Based on https://stackoverflow.com/a/51612402

    Parameters
    ----------
    writer : logging function
        The logging call (e.g. ``logger.error``) each complete line is sent to
    """

    def __init__(self, writer):
        self._writer = writer
        self._message = ''

    def write(self, message):
        """Buffer ``message`` and emit one logger call per complete line."""
        self._message += message
        while True:
            line, newline, remainder = self._message.partition('\n')
            if not newline:
                # No complete line buffered yet; wait for more input
                break
            self._writer(line)
            self._message = remainder

    def flush(self):
        """Emit any partial line still sitting in the buffer."""
        if self._message:
            self._writer(self._message)
            self._message = ''
class Saving(object):
    """
    Creates the folder structure for the saved data and creates the log file
    as ``log.txt``.

    Intended to be used as a context manager: ``__enter__`` builds the output
    folder and logging, ``__exit__`` logs any uncaught exception and shuts the
    loggers down.

    Parameters
    ----------
    label : string, optional
        The label for the simulation. Default ``None`` will mean no data is
        saved to files.
    output_path : string, optional
        The path that will be used for the run output. Default ``None``
    config : dict, optional
        The parameters of the running simulation/fitting. This is used to
        create a YAML configuration file. Default ``None``. If given, it
        overrides all the other constructor arguments.
    config_file : string, optional
        The file name and path of a ``.yaml`` configuration file.
        Default ``None``
    pickle_store : bool, optional
        If true the data for each model, task and participant is recorded.
        Default is ``False``
    min_log_level : str, optional
        Defines the level of the log from (``DEBUG``, ``INFO``, ``WARNING``,
        ``ERROR``, ``CRITICAL``). Default ``INFO``
        See https://docs.python.org/3/library/logging.html#levels
    numpy_error_level : {'log', 'raise'}
        Defines the response to numpy errors. Default ``log``.
        See numpy.seterr

    Returns
    -------
    file_name_gen : function
        Returned by ``__enter__``. Creates a new file with the name <handle>
        and the extension <extension>. It takes two string parameters:
        (``handle``, ``extension``) and returns one ``fileName`` string.
        ``None`` when no ``label`` was given.

    See Also
    --------
    folderSetup : creates the folders
    """

    def __init__(self, label=None, output_path=None, config=None, config_file=None, pickle_store=False, min_log_level='INFO', numpy_error_level="log"):

        # A supplied config dict takes precedence over every other argument
        if config is not None:
            label = config['label']
            output_path = config['output_path']
            config_file = config['config_file']
            pickle_store = config['pickle']
            min_log_level = config['min_log_level']
            numpy_error_level = config['numpy_error_level']

        self.date_string = date()
        self.label = label
        self.config = config
        self.config_file = config_file
        self.pickle_store = pickle_store
        self.numpy_error_level = numpy_error_level

        if label:
            self.save_label = label
            if output_path:
                self.base_path = output_path
            elif config_file:
                # Default the output next to the configuration file
                self.base_path = folder_path_cleaning(os.path.dirname(os.path.abspath(config_file)))
            else:
                self.base_path = None
        else:
            # NOTE(review): ``self.base_path`` is never set on this branch;
            # __enter__ only reads it when ``self.label`` is truthy, so this
            # is currently safe — confirm before reusing base_path elsewhere.
            self.save_label = 'Untitled'

        possible_log_levels = {'DEBUG': logging.DEBUG,
                               'INFO': logging.INFO,
                               'WARNING': logging.WARNING,
                               'ERROR': logging.ERROR,
                               'CRITICAL': logging.CRITICAL}
        # Raises KeyError for an unrecognised min_log_level string
        self.log_level = possible_log_levels[min_log_level]

    def __enter__(self):
        """Set up the output folder and logging; return the file name generator."""
        if self.label:
            output_folder = folder_setup(self.save_label, self.date_string, pickle_data=self.pickle_store, base_path=self.base_path)
            file_name_gen = file_name_generator(output_folder)
            log_file = file_name_gen('log', 'txt')
            if self.config_file:
                # Keep a copy of the original configuration with the results
                shu.copy(self.config_file, output_folder)
            if self.config is not None:
                config_file = file_name_gen('config', 'yaml')
                start.write_script(config_file, self.config)
        else:
            # No label: nothing is written to disk, log goes to stdout only
            output_folder = None
            log_file = None
            file_name_gen = None

        self.close_loggers = fancy_logger(log_file=log_file, log_level=self.log_level, numpy_error_level=self.numpy_error_level)

        logger = logging.getLogger('Framework')
        message = 'Beginning task labelled: {}'.format(self.save_label)
        logger.info(message)

        return file_name_gen

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Log any uncaught exception, then shut the logging system down."""
        if exc_type is not None and issubclass(exc_type, Exception):
            logger = logging.getLogger('Fatal')
            logger.error("Logging an uncaught fatal exception", exc_info=(exc_type, exc_value, exc_traceback))
        # Always restore stdout/stderr and close file handlers
        self.close_loggers()
def folder_setup(label, date_string, pickle_data=False, base_path=None):
    """
    Identifies and creates the folder the data will be stored in

    Folder will be created as "./Outputs/<sim_label>_<date>/". If that had
    previously been created then it is created as
    "./Outputs/<sim_label>_<date>_no_<#>/", where "<#>" is the first
    available integer.

    A subfolder is also created with the name ``Pickle`` if pickle is true.

    Parameters
    ----------
    label : str
        The label for the simulation
    date_string : str
        The date identifier
    pickle_data : bool, optional
        If true the data for each model, task and participant is recorded.
        Default is ``False``
    base_path : str, optional
        The path into which the new folder will be placed. Default is
        current working directory

    Returns
    -------
    folder_name : string
        The folder path that has just been created

    See Also
    --------
    newFile : Creates a new file
    saving : Creates the log system
    """
    root = folder_path_cleaning(base_path if base_path else os.getcwd())

    # Find the first output folder name that does not already exist
    folder_name = "{}Outputs/{}_{}".format(root, label, date_string)
    if os.path.exists(folder_name):
        folder_name += '_no_'
        suffix = 1
        while os.path.exists(folder_name + str(suffix)):
            suffix += 1
        folder_name += str(suffix)

    folder_name += "/"

    os.makedirs(folder_name)
    os.makedirs(folder_name + 'data/')
    if pickle_data:
        os.makedirs(folder_name + 'Pickle/')

    return folder_name
#%% File management
def file_name_generator(output_folder=None):
    """
    Keeps track of filenames that have been used and generates the next
    unused one

    Parameters
    ----------
    output_folder : string, optional
        The folder into which the new file will be placed. Default is the
        current working directory

    Returns
    -------
    new_file_name : function
        Creates a new file with the name <handle> and the extension
        <extension>. It takes two string parameters:
        (``handle``, ``extension``) and returns one ``fileName`` string

    Examples
    --------
    >>> file_name_gen = file_name_generator("./")
    >>> file_name_gen("a", "b")
    './a.b'
    >>> file_name_gen("a", "b")
    './a_1.b'
    >>> file_name_gen("", "")
    './'
    >>> file_name_gen = file_name_generator()
    >>> fileName = file_name_gen("", "")
    >>> fileName == os.getcwd()
    False
    """
    folder = output_folder if output_folder else os.getcwd()
    output_path = folder_path_cleaning(folder)

    # Counts how many times each fully-formed file name has been requested
    used_names = collections.defaultdict(int)

    def new_file_name(handle, extension):
        """
        Creates a new unused file name with the <handle> and the extension
        <extension>

        Parameters
        ----------
        handle : string
            The file name
        extension : string
            The extension of the file

        Returns
        -------
        file_name : string
            The file name allowed for the file
        """
        suffix = '' if extension == '' else '.' + extension

        base = output_path + handle
        full_form = base + suffix

        # Repeated requests get a numeric suffix: name.ext, name_1.ext, ...
        count = used_names[full_form]
        used_names[full_form] = count + 1
        if count > 0:
            base += '_' + str(count)

        return base + suffix

    return new_file_name
def folder_path_cleaning(folder):
    """
    Modifies string file names from Windows format to Unix format if
    necessary and makes sure there is a ``/`` at the end.

    Parameters
    ----------
    folder : string
        The folder path

    Returns
    -------
    folder_path : str
        The folder path, using ``/`` separators and ending in ``/``
    """
    folder_path = folder.replace('\\', '/')
    # endswith() also copes with an empty string, where the previous
    # ``folder_path[-1]`` check raised an IndexError
    if not folder_path.endswith('/'):
        folder_path += '/'

    return folder_path
#%% Logging
def fancy_logger(log_file=None, log_level=logging.DEBUG, numpy_error_level='log'):
    """
    Sets up the style of logging for all the simulations

    Redirects ``sys.stderr`` (and numpy error reports) into the logging
    system and optionally mirrors the log to a file.

    Parameters
    ----------
    log_file : string, optional
        Provides the path the log will be written to. Default "./log.txt"
    log_level : {logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL}
        Defines the level of the log. Default logging.DEBUG
        NOTE(review): the original docstring said the default is
        logging.INFO, but the signature default is logging.DEBUG — confirm
        which is intended.
    numpy_error_level : {'log', 'raise'}
        Defines the response to numpy errors. Default ``log``.
        See numpy.seterr

    Returns
    -------
    close_loggers : function
        Closes the logging systems that have been set up

    See Also
    --------
    logging : The Python standard logging library
    numpy.seterr : The function npErrResp is passed to for defining the
        response to numpy errors
    """
    # Remember the original stdout so close_loggers can restore it
    old_stdout = sys.stdout
    logging.basicConfig(stream=sys.stdout,
                        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                        datefmt='%H:%M',
                        level=log_level)
    core_logger = logging.getLogger('')
    if log_file:
        # Mirror everything to the log file with a fuller timestamp
        file_handler = logging.FileHandler(log_file, mode='w')
        file_handler.setLevel(log_level)
        file_format = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s', datefmt='%y-%m-%d %H:%M')
        file_handler.setFormatter(file_format)
        core_logger.addHandler(file_handler)
    else:
        file_handler = None

    # Route warnings.warn() output through the logging system
    logging.captureWarnings(True)

    # Send numpy floating point error reports to their own logger
    np.seterr(all=numpy_error_level)
    old_np_error_call = np.seterrcall(LoggerWriter(logging.getLogger('NPSTDERR').error))
    # Redirect stderr into the logging system as well
    old_stderr = sys.stderr
    sys.stderr = LoggerWriter(logging.getLogger('STDERR').error)

    setup_logger = logging.getLogger('Setup')
    setup_logger.info(date())
    setup_logger.info('Log initialised')
    if log_file:
        setup_logger.info("The log you are reading was written to " + str(log_file))

    def close_loggers():
        """
        To run once everything has been completed.

        Shuts logging down and restores the original stdout, stderr and
        numpy error callback.
        """
        message = "Shutting down program"
        setup_logger.info(message)
        logging.shutdown()
        if file_handler:
            core_logger.removeHandler(file_handler)
        np.seterrcall(old_np_error_call)
        sys.stderr = old_stderr
        sys.stdout = old_stdout
        #for h in core_logger.handlers[:]:
        #    h.close()
        #    core_logger.removeHandler(h)

    return close_loggers
#%% Pickle
def pickle_write(data, handle, file_name_gen):
    """
    Writes the data to a pickle file

    Parameters
    ----------
    data : object
        Data to be written to the file
    handle : string
        The name of the file
    file_name_gen : function
        Creates a new file with the name <handle> and the extension
        <extension>. It takes two string parameters:
        (``handle``, ``extension``) and returns one ``fileName`` string
    """
    target_path = file_name_gen(handle, 'pkl')
    with open(target_path, 'wb') as file_obj:
        pickle.dump(data, file_obj)
def pickleLog(results, file_name_gen, label=""):
    """
    Stores the data in the appropriate pickle file in a Pickle subfolder of
    the outputting folder

    Parameters
    ----------
    results : dict
        The data to be stored
    file_name_gen : function
        Creates a new file with the name <handle> and the extension
        <extension>. It takes two string parameters:
        (``handle``, ``extension``) and returns one ``fileName`` string
    label : string, optional
        A label for the results file
    """
    # Without a file name generator there is nowhere to write to
    if not file_name_gen:
        return

    # TODO: remove the pulling out of ``Name`` from inside this method and make it more explicit higher up
    name = results["Name"]
    if not isinstance(name, str):
        raise TypeError("The ``Name`` in the participant data is of type {} and not str".format(type(name)))

    handle = 'Pickle/{}'.format(name)
    if label:
        handle += label

    pickle_write(results, handle, file_name_gen)
#%% Utils
def flatDictKeySet(store, selectKeys=None):
    """
    Generates a dictionary of keys and identifiers for the new dictionary,
    including only the keys in the keys list. Any keys with lists will be
    split into a set of keys, one for each element in the original key.
    These are named <key><location>

    Parameters
    ----------
    store : list of dicts
        The dictionaries would be expected to have many of the same keys.
        Any dictionary keys containing lists in the input have been split
        into multiple numbered keys
    selectKeys : list of strings, optional
        The keys whose data will be included in the return dictionary.
        Default ``None``, which results in all keys being returned

    Returns
    -------
    keySet : dict with values of dict, list or None
        The dictionary of keys to be extracted

    See Also
    --------
    reframeListDicts, newFlatDict
    """
    keySet = {}

    for entry in store:
        # Only abridge (drop oversized lists) when a key filter was supplied
        abridge = bool(selectKeys)
        if selectKeys:
            candidate_keys = (k for k in entry.keys() if k in selectKeys)
        else:
            candidate_keys = entry.keys()

        for key in candidate_keys:
            if key in keySet:
                continue
            value = entry[key]
            if isinstance(value, (list, np.ndarray)):
                listSet, _ = listKeyGen(value, maxListLen=None, returnList=False, abridge=abridge)
                if listSet is not None:
                    keySet[key] = listSet
            elif isinstance(value, dict):
                subKeySet, _ = dictKeyGen(value, maxListLen=None, returnList=False, abridge=abridge)
                keySet[key] = subKeySet
            else:
                keySet[key] = None

    return keySet
def newFlatDict(store, selectKeys=None, labelPrefix=''):
    """
    Takes a list of dictionaries and returns a dictionary of 1D lists.

    If a dictionary did not have that key or list element, then 'None' is
    put in its place

    Parameters
    ----------
    store : list of dicts
        The dictionaries would be expected to have many of the same keys.
        Any dictionary keys containing lists in the input have been split
        into multiple numbered keys
    selectKeys : list of strings, optional
        The keys whose data will be included in the return dictionary.
        Default ``None``, which results in all keys being returned
    labelPrefix : string
        An identifier to be added to the beginning of each key string.

    Returns
    -------
    newStore : dict
        The new dictionary with the keys from the keySet and the values as
        1D lists with 'None' if the keys, value pair was not found in the
        store.

    Examples
    --------
    >>> store = [{'list': [1, 2, 3, 4, 5, 6]}]
    >>> newFlatDict(store)
    {'list_[0]': [1], 'list_[1]': [2], 'list_[2]': [3], 'list_[3]': [4], 'list_[4]': [5], 'list_[5]': [6]}
    >>> store = [{'string': 'string'}]
    >>> newFlatDict(store)
    {'string': ["'string'"]}
    >>> store = [{'dict': {1: {3: "a"}, 2: "b"}}]
    >>> newFlatDict(store)
    {'dict_1_3': ["'a'"], 'dict_2': ["'b'"]}
    """
    # Work out which columns the flattened result needs
    keySet = flatDictKeySet(store, selectKeys=selectKeys)

    newStore = {}

    if labelPrefix:
        labelPrefix += "_"

    for key, loc in keySet.items():
        newKey = labelPrefix + str(key)
        if isinstance(loc, dict):
            # Nested dict: flatten it recursively with the extended prefix
            subStore = [s[key] for s in store]
            keyStoreSet = newFlatDict(subStore, labelPrefix=newKey)
            newStore.update(keyStoreSet)
        elif isinstance(loc, (list, np.ndarray)):
            # List/array: one column per co-ordinate in ``loc``
            for locCo in loc:
                tempList = []
                for s in store:
                    rawVal = s.get(key, None)
                    if rawVal is None:
                        tempList.append(None)
                    else:
                        tempList.append(listSelection(rawVal, locCo))
                newStore.setdefault(newKey + "_" + str(locCo), tempList)
        else:
            # Scalar value: stored as repr strings, missing keys become repr(None)
            vals = [repr(s.get(key, None)) for s in store]
            newStore.setdefault(newKey, vals)

    return newStore
def newListDict(store, labelPrefix='', maxListLen=0):
    """
    Takes a dictionary of numbers, strings, lists and arrays and returns a
    dictionary of 1D arrays.

    If there is a single value, then a list is created with that value
    repeated

    Parameters
    ----------
    store : dict
        A dictionary of numbers, strings, lists, dictionaries and arrays
    labelPrefix : string
        An identifier to be added to the beginning of each key string.
        Default empty string
    maxListLen : int, optional
        The minimum length all returned lists are padded to. Default ``0``;
        the longest list found in ``store`` is used when larger.

    Returns
    -------
    newStore : dict
        The new dictionary with the keys from the keySet and the values as
        1D lists.

    Examples
    --------
    >>> store = {'list': [1, 2, 3, 4, 5, 6]}
    >>> newListDict(store)
    {'list': [1, 2, 3, 4, 5, 6]}
    >>> store = {'string': 'string'}
    >>> newListDict(store)
    {'string': ['string']}
    >>> store = {'dict': {1: {3: "a"}, 2: "b"}}
    >>> newListDict(store)
    {'dict_1_3': ['a'], 'dict_2': ['b']}
    """
    # keySet describes the columns; maxListLen becomes the longest list found
    keySet, maxListLen = dictKeyGen(store, maxListLen=maxListLen, returnList=True, abridge=False)

    newStore = {}

    if labelPrefix:
        labelPrefix += "_"

    for key, loc in keySet.items():
        newKey = labelPrefix + str(key)
        if isinstance(loc, dict):
            # Nested dict: recurse with the extended prefix and shared pad length
            keyStoreSet = newListDict(store[key], labelPrefix=newKey, maxListLen=maxListLen)
            newStore.update(keyStoreSet)
        elif isinstance(loc, (list, np.ndarray)):
            # Multi-dimensional data: one padded column per co-ordinate
            for locCo in loc:
                vals = list(listSelection(store[key], locCo))
                vals = pad(vals, maxListLen)
                newStore[newKey + "_" + str(locCo)] = vals
        else:
            v = store[key]
            if isinstance(v, (list, np.ndarray)):
                vals = pad(list(v), maxListLen)
            else:
                # We assume the object is a single value or string
                vals = pad([v], maxListLen)
            newStore[newKey] = vals

    return newStore
def pad(values, maxListLen):
    """
    Pads a list with None, in place, until it has ``maxListLen`` elements

    Parameters
    ----------
    values : list
        The list to be extended
    maxListLen : int
        The number of elements the list needs to have

    Returns
    -------
    values : list
        The same list object, extended with ``None`` if it was too short
    """
    # NOTE(review): np.size counts every scalar element, so for a list of
    # lists this differs from len(values) — confirm callers only pass flat
    # lists before changing it.
    current_length = np.size(values)
    shortfall = maxListLen - current_length
    if shortfall > 0:
        values.extend([None] * shortfall)

    return values
def listSelection(data, loc):
    """
    Allows numpy array-like referencing of lists

    Parameters
    ----------
    data : list
        The data to be referenced
    loc : tuple of integers
        The location to be referenced

    Returns
    -------
    selection : list
        The referenced subset, or ``None`` when ``loc`` is empty or indexes
        deeper than the data nests

    Examples
    --------
    >>> listSelection([1, 2, 3], (0,))
    1
    >>> listSelection([[1, 2, 3], [4, 5, 6]], (0,))
    [1, 2, 3]
    >>> listSelection([[1, 2, 3], [4, 5, 6]], (0, 2))
    3
    """
    if len(loc) == 0:
        return None
    elif len(loc) == 1:
        return data[loc[0]]
    else:
        subData = listSelection(data, loc[:-1])
        # Only index deeper if the intermediate result is itself a sequence;
        # reuse subData rather than recomputing the recursion as before
        if len(np.shape(subData)) > 0:
            return subData[loc[-1]]
        else:
            return None
def dictKeyGen(store, maxListLen=None, returnList=False, abridge=False):
    """
    Identifies the columns necessary to convert a dictionary into a table

    Parameters
    ----------
    store : dict
        The dictionary to be broken down into keys
    maxListLen : int or float with no decimal places or None, optional
        The length of the longest expected list. Only useful if returnList
        is ``True``. Default ``None``
    returnList : bool, optional
        Defines if the lists will be broken into 1D lists or values.
        Default ``False``, lists will be broken into values
    abridge : bool, optional
        Defines if the final dataset will be a summary or the whole lot. If
        it is a summary, lists of more than 10 elements are removed.
        Default ``False``, not abridged

    Returns
    -------
    keySet : dict with values of dict, list or None
        The dictionary of keys to be extracted
    maxListLen : int or float with no decimal places or None, optional
        If returnList is ``True`` this should be the length of the longest
        list. If returnList is ``False`` this should return its original
        value

    Examples
    --------
    >>> store = {'string': 'string'}
    >>> dictKeyGen(store)
    ({'string': None}, 1)
    >>> store = {'num': 23.6}
    >>> dictKeyGen(store)
    ({'num': None}, 1)
    >>> store = {'array': np.array([[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]])}
    >>> dictKeyGen(store, returnList=True, abridge=True)
    ({'array': array([[0],
           [1]])}, 6)
    >>> store = {'dict': {1: "a", 2: "b"}}
    >>> dictKeyGen(store, maxListLen=7, returnList=True, abridge=True)
    ({'dict': {1: None, 2: None}}, 7)
    """
    keySet = {}
    for k in store.keys():
        v = store[k]
        if isinstance(v, (list, np.ndarray)):
            # maxListLen is threaded through so the longest list wins
            listSet, maxListLen = listKeyGen(v, maxListLen=maxListLen, returnList=returnList, abridge=abridge)
            if listSet is not None:
                keySet.setdefault(k, listSet)
            else:
                # The list was abridged away; record the key with no columns
                keySet.setdefault(k, None)
        elif isinstance(v, dict):
            # Recurse into nested dictionaries
            dictKeySet, maxListLen = dictKeyGen(v, maxListLen=maxListLen, returnList=returnList, abridge=abridge)
            keySet.setdefault(k, dictKeySet)
        else:
            keySet.setdefault(k, None)

    # A non-empty dict of scalars still needs rows of length 1
    if maxListLen is None and len(keySet) > 0:
        maxListLen = 1

    return keySet, maxListLen
def listKeyGen(data, maxListLen=None, returnList=False, abridge=False):
    """
    Identifies the columns necessary to convert a list into a table

    Parameters
    ----------
    data : numpy.ndarray or list
        The list to be broken down
    maxListLen : int or float with no decimal places or None, optional
        The length of the longest expected list. Only useful if returnList
        is ``True``. Default ``None``
    returnList : bool, optional
        Defines if the lists will be broken into 1D lists or values.
        Default ``False``, lists will be broken into values
    abridge : bool, optional
        Defines if the final dataset will be a summary or the whole lot. If
        it is a summary, lists of more than 10 elements are removed.
        Default ``False``, not abridged

    Returns
    -------
    returnList : None or list of tuples of ints or ints
        The list of co-ordinates for the elements to be extracted from the
        data. If None the list is used as-is.
    maxListLen : int or float with no decimal places or None, optional
        If returnList is ``True`` this should be the length of the longest
        list. If returnList is ``False`` this should return its original
        value

    Examples
    --------
    >>> listKeyGen([[1, 2, 3, 4, 5, 6], [4, 5, 6, 7, 8, 9]], maxListLen=None, returnList=False, abridge=False)
    (array([[0, 0], [1, 0], [0, 1], [1, 1], [0, 2], [1, 2], [0, 3], [1, 3], [0, 4], [1, 4], [0, 5], [1, 5]]), 1)
    >>> listKeyGen([[1, 2, 3, 4, 5, 6], [4, 5, 6, 7, 8, 9]], maxListLen=None, returnList=False, abridge=True)
    (None, None)
    >>> listKeyGen([[1, 2, 3, 4, 5, 6], [4, 5, 6, 7, 8, 9]], maxListLen=None, returnList=True, abridge=True)
    (array([[0],
           [1]]), 6)
    """
    dataShape = np.shape(data)
    # An empty final axis means there is nothing to tabulate
    if dataShape[-1] == 0:
        return None, maxListLen
    dataShapeList = list(dataShape)
    if returnList:
        # The last axis stays as a 1D list, so it sets the row count rather
        # than producing extra columns
        dataShapeFirst = dataShapeList.pop(-1)
        numberColumns = np.prod(dataShapeList)
        if maxListLen is None:
            maxListLen = dataShapeFirst
        elif dataShapeFirst > maxListLen:
            maxListLen = dataShapeFirst
    else:
        numberColumns = np.prod(dataShape)

    # If we are creating an abridged dataset and the length is too long, skip it. It will just clutter up the document
    if abridge and numberColumns > 10:
        return None, maxListLen

    # We need to calculate every combination of co-ordinates in the array
    arrSets = [list(range(0, i)) for i in dataShapeList]
    # Now record each one
    locList = np.array([tuple(loc) for loc in utils.listMergeGen(*arrSets)])
    listItemLen = len(locList[0])
    if listItemLen == 1:
        finalList = locList  # .flatten()
    elif listItemLen == 0:
        return None, maxListLen
    else:
        finalList = locList

    # A scalar row still counts as a row of length 1
    if maxListLen is None and len(finalList) > 0:
        maxListLen = 1

    return finalList, maxListLen
def date():
    """
    Calculate today's date as a string in the form <year>-<month>-<day> and
    returns it

    Returns
    -------
    date_today : str
        The current date in the format <year>-<month>-<day>
    """
    # today() is a class method; the original built a throwaway
    # dt.datetime(2000, 1, 1) instance just to call it
    d = dt.datetime.today()
    date_today = "{}-{}-{}".format(d.year, d.month, d.day)

    return date_today