Source code for psychopy.tools.filetools

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2024 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

"""Functions and classes related to file and directory handling
"""
import os
import shutil
import subprocess
import sys
import atexit
import codecs
import numpy as np
import json
import json_tricks

try:
    import cPickle as pickle
except ImportError:
    import pickle

from psychopy import logging
from psychopy.tools.fileerrortools import handleFileCollision
from pathlib import Path


def _synonymiseExtensions(assets):
    """
    Synonymise filetypes which refer to the same media types.

    Parameters
    ==========
    assets : dict
        Dict of {name: path} pairs

    Returns
    ==========
    dict
        Same dict which was passed in, but any names ending in a recognised extension
        will have variants with the same stem but different (and synonymous) extensions,
        pointing to the same path. For example:
        {"default.png": "default.png"}
        becomes
        {"default.png": "default.png", "default.jpg": "default.png", "default.jpeg": "default.png"}
    """
    # Alias filetypes
    newAssets = {}
    for key, val in assets.items():
        # Skip if no ext
        if "." not in key:
            continue
        # Get stem and ext
        stem, ext = key.split(".")
        # Synonymise image types
        imgTypes = ("png", "jpg", "jpeg")
        if ext in imgTypes:
            for thisExt in imgTypes:
                newAssets[stem + "." + thisExt] = val
        # Synonymise movie types
        movTypes = ("mp4", "mov", "mkv", "avi", "wmv")
        if ext in movTypes:
            for thisExt in movTypes:
                newAssets[stem + "." + thisExt] = val
        # Synonymise audio types
        sndTypes = ("mp3", "wav")
        if ext in sndTypes:
            for thisExt in sndTypes:
                newAssets[stem + "." + thisExt] = val

    return newAssets


# Names accepted by stimulus classes & the filename of the default stimulus to use
defaultStimRoot = Path(__file__).parent.parent / "assets"
# Each row is (asset filename, accepted spellings of its name); the rows are
# flattened into {accepted name: asset filename} and then given synonymous
# extensions.
defaultStim = _synonymiseExtensions({
    alias: target
    for target, aliases in (
        # Image stimuli
        ("default.png", ("default.png",)),
        # Movie stimuli
        ("default.mp4", ("default.mp4",)),
        # Sound stimuli
        ("default.mp3", ("default.mp3",)),
        # Credit card image
        ("creditCard.png", ("creditCard.png", "CreditCard.png", "creditcard.png")),
        # USB image
        ("USB.png", ("USB.png", "usb.png")),
        # USB-C image
        ("USB-C.png", ("USB-C.png", "USB_C.png", "USBC.png",
                       "usb-c.png", "usb_c.png", "usbc.png")),
    )
    for alias in aliases
})


def toFile(filename, data):
    """Save data (of any sort) as a pickle file.

    Simple wrapper of the pickle module in core python.

    Parameters
    ----------
    filename : str or pathlib.Path
        Path of the file to write. An existing file is overwritten.
    data : object
        Any picklable object.
    """
    # Context manager guarantees the handle is closed even if pickling fails
    # part-way through (e.g. `data` contains something unpicklable).
    with open(filename, 'wb') as f:
        pickle.dump(data, f)
def fromFile(filename, encoding='utf-8-sig'):
    """Load data from a psydat, pickle or JSON file.

    Parameters
    ----------
    filename : str or pathlib.Path
        File to load. The loader is chosen from the end of the name:
        ``.psydat``, ``pickle`` or ``.json``.
    encoding : str
        The encoding to use when reading a JSON file. This parameter will be
        ignored for any other file type.

    Returns
    -------
    object
        The object stored in the file.

    Raises
    ------
    ValueError
        If the filename does not end in one of the supported types.
    """
    # NOTE: unpickling executes code embedded in the file, so psydat/pickle
    # files should only be loaded from sources you trust.
    filename = pathToString(filename)

    if filename.endswith('.psydat'):
        with open(filename, 'rb') as f:
            try:
                contents = pickle.load(f)
            except UnicodeDecodeError:
                f.seek(0)  # reset to start of file to try again
                # python 2 data files
                contents = pickle.load(f, encoding='latin1')
            # if loading an experiment file make sure we don't save further
            # copies using __del__
            if hasattr(contents, 'abort'):
                contents.abort()
        return contents

    elif filename.endswith('pickle'):
        with open(filename, 'rb') as f:
            contents = pickle.load(f)
        return contents

    elif filename.endswith('.json'):
        with codecs.open(filename, 'r', encoding=encoding) as f:
            contents = json_tricks.load(f)

        # Restore RNG if we load a TrialHandler2 object.
        # We also need to remove the 'temporary' ._rng_state attribute that
        # was saved with it.
        from psychopy.data import TrialHandler2
        if isinstance(contents, TrialHandler2):
            contents._rng = np.random.default_rng()
            contents._rng.bit_generator.state = contents._rng_state
            del contents._rng_state
            return contents

        # QuestPlus.
        if sys.version_info.major == 3 and sys.version_info.minor >= 6:
            from psychopy.data.staircase import QuestPlusHandler
            if isinstance(contents, QuestPlusHandler):
                # Only import questplus once we know we need it, so that
                # loading any other JSON file does not require the package.
                from questplus import QuestPlus
                # Restore the questplus.QuestPlus object.
                contents._qp = QuestPlus.from_json(contents._qp_json)
                del contents._qp_json
                return contents

        # If we haven't returned anything by now, the loaded object is neither
        # a TrialHandler2 nor a QuestPlus instance. Return it unchanged.
        return contents

    else:
        msg = "Don't know how to handle this file type, aborting."
        raise ValueError(msg)
def mergeFolder(src, dst, pattern=None):
    """Merge a folder into another.

    Existing files in `dst` folder with the same name will be overwritten.
    Non-existent files/folders (including `dst` itself) will be created.

    Parameters
    ----------
    src : str
        Folder whose contents are copied.
    dst : str
        Folder to copy into.
    pattern : optional
        Currently unused; accepted for backwards compatibility and passed on
        unchanged when recursing into subfolders.
    """
    # Make sure the destination exists first, otherwise every top-level
    # file copy would fail.
    os.makedirs(dst, exist_ok=True)

    for name in os.listdir(src):
        srcfname = os.path.join(src, name)
        dstfname = os.path.join(dst, name)
        if os.path.isdir(srcfname):
            # The recursive call creates `dstfname` if needed
            mergeFolder(srcfname, dstfname, pattern)
        else:
            try:
                # copy without metadata:
                shutil.copyfile(srcfname, dstfname)
            except IOError as why:
                # best effort: report the failure and carry on with the rest
                print(why)
def openOutputFile(fileName=None, append=False, fileCollisionMethod='rename',
                   encoding='utf-8-sig'):
    """Open an output file (or standard output) for writing.

    :Parameters:

    fileName : None, 'stdout', or str
        The desired output file name. If `None` or `stdout`, return
        `sys.stdout`. Any other string will be considered a filename.
    append : bool, optional
        If ``True``, append data to an existing file; otherwise, overwrite
        it with new data.
        Defaults to ``False``, i.e. not appending.
    fileCollisionMethod : string, optional
        How to handle filename collisions. Valid values are `'rename'`,
        `'overwrite'`, and `'fail'`.
        This parameter is ignored if ``append`` is set to ``True``.
        Defaults to `rename`.
    encoding : string, optional
        The encoding to use when writing the file. This parameter will be
        ignored if `append` is `False` and `fileName` ends with `.psydat` or
        `.npy` (i.e. if a binary file is to be written). When appending to a
        file that already has content, ``'utf-8-sig'`` is replaced by
        ``'utf-8'`` so that no byte-order mark is written mid-file.
        Defaults to ``'utf-8-sig'``.

    :Returns:

    f : file
        A writable file handle.

    """
    fileName = pathToString(fileName)
    if (fileName is None) or (fileName == 'stdout'):
        return sys.stdout

    if append:
        mode = 'a'
    elif fileName.endswith(('.psydat', '.npy')):
        mode = 'wb'
    else:
        mode = 'w'

    # Rename the output file if a file of that name already exists
    # and it should not be appended.
    if os.path.exists(fileName) and not append:
        fileName = handleFileCollision(
            fileName,
            fileCollisionMethod=fileCollisionMethod)

    if 'b' in mode:
        # Do not use encoding when writing a binary file.
        encoding = None
    elif (append
            and encoding is not None
            and os.path.isfile(fileName)
            and os.path.getsize(fileName) > 0
            and codecs.lookup(encoding).name == 'utf-8-sig'):
        # The codecs stream writer emits a BOM on its first write no matter
        # where in the file that write lands, so appending with 'utf-8-sig'
        # would put a stray BOM in the middle of existing content.
        encoding = 'utf-8'

    if os.path.exists(fileName) and mode in ['w', 'wb']:
        logging.warning('Data file %s will be overwritten!' % fileName)

    # The file will always be opened in binary writing mode,
    # see https://docs.python.org/3/library/codecs.html#codecs.open
    f = codecs.open(fileName, mode=mode, encoding=encoding)
    return f
def genDelimiter(fileName):
    """Pick a column delimiter to suit an output filename.

    :Parameters:

    fileName : string
        The output file name.

    :Returns:

    delim : string
        ``,`` when the filename ends in ``.csv`` (or ``.CSV``); a tabulator
        character for anything else.

    """
    name = pathToString(fileName)
    isCsv = name.endswith(('.csv', '.CSV'))
    return ',' if isCsv else '\t'
def genFilenameFromDelimiter(filename, delim):
    """Append a filename extension matching `delim` if none is present.

    :Parameters:

    filename : str or pathlib.Path
        The output file name.
    delim : str
        The delimiter which will be used in the file.

    :Returns:

    filename : str
        `filename` unchanged if it already ends in a known extension;
        otherwise with ``.csv`` (comma), ``.tsv`` (tab) or ``.txt`` (anything
        else) appended.
    """
    # If no known filename extension was specified, derive a one from the
    # delimiter.
    filename = pathToString(filename)
    knownExts = ('.dlm', '.DLM', '.tsv', '.TSV', '.txt', '.TXT',
                 '.csv', '.CSV', '.psydat', '.npy', '.json')
    if not filename.endswith(knownExts):
        if delim == ',':
            filename += '.csv'
        elif delim == '\t':
            filename += '.tsv'
        else:
            filename += '.txt'

    return filename


def constructLegacyFilename(filename):
    """Return the "legacy" variant of a filename.

    The variant lives in the same folder and has ``_legacy`` inserted between
    the stem and the suffix, e.g. ``data/run.csv`` -> ``data/run_legacy.csv``.

    :Parameters:

    filename : str or pathlib.Path
        The original file name.

    :Returns:

    legacyName : pathlib.Path
        Path of the legacy variant.
    """
    # make path object from filename
    filename = Path(filename)
    # construct legacy variant name
    legacyName = filename.parent / (filename.stem + "_legacy" + filename.suffix)

    return legacyName


class DictStorage(dict):
    """Helper class based on dictionary with storage to json

    The contents are loaded from `filename` on creation (if the file exists)
    and written back when the object is deleted or the interpreter exits.
    """
    def __init__(self, filename, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.filename = filename
        self.load()
        # Only mark the object as saveable once loading has succeeded, so a
        # failed load can never lead to the file being overwritten.
        self._deleted = False
        # Registering the bound method keeps the instance alive until exit,
        # at which point the contents are saved.
        atexit.register(self.__del__)

    def load(self, filename=None):
        """Load all tokens from a given filename
        (defaults to the filename given to the class)

        A missing file is ignored; a file which is not valid JSON is logged
        as an error and otherwise ignored.
        """
        if filename is None:
            filename = self.filename
        if os.path.isfile(filename):
            # save() writes UTF-8, so read it back the same way rather than
            # relying on the platform default encoding
            with open(filename, 'r', encoding='utf-8') as f:
                try:
                    self.update(json.load(f))
                except ValueError:
                    logging.error("Tried to load %s but it wasn't valid "
                                  "JSON format" %filename)

    def save(self, filename=None):
        """Save all tokens from a given filename
        (defaults to the filename given to the class but can be overridden)
        """
        if filename is None:
            filename = self.filename
        # make sure the folder exists; a bare filename has no folder part
        # and refers to the current directory, which needs no creating
        folder = os.path.split(filename)[0]
        if folder and not os.path.isdir(folder):
            os.makedirs(folder, exist_ok=True)
        # save the file as json
        with open(filename, 'wb') as f:
            json_str = json.dumps(self, indent=2, sort_keys=True)
            f.write(bytes(json_str, 'UTF-8'))

    def __del__(self):
        # `_deleted` is missing if __init__ failed part-way; treat that the
        # same as "already saved" rather than raising inside a finaliser.
        if not getattr(self, '_deleted', True):
            self.save()
        self._deleted = True


class KnownProjects(DictStorage):
    """DictStorage of projects which drops entries lacking a local root
    whenever it is saved.
    """
    def save(self, filename=None):
        """Purge unnecessary projects (without a local root) and save"""
        # Collect names first: we can't delete while iterating the dict.
        # A project with no 'localRoot' key counts as having no local root.
        toPurge = [
            projname for projname, proj in self.items()
            if not proj.get('localRoot')
        ]
        for projname in toPurge:
            del self[projname]
        DictStorage.save(self, filename)


def pathToString(filepath):
    """
    Coerces pathlib Path objects to a string (only python version 3.6+)
    any other objects passed to this function will be returned as is.
    This WILL NOT work on Python 3.4 or 3.5, since the __fspath__ dunder
    method did not exist in those versions, however psychopy does not support
    these versions of python anyway.

    :Parameters:

    filepath : str or pathlib.Path
        file system path that needs to be coerced into a string to
        use by Psychopy's internals

    :Returns:

    filepath : str or same as input object
        file system path coerced into a string type
    """
    if hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    return filepath


def openInExplorer(path):
    """
    Open a given directory path in the current operating system's file
    explorer.

    :Parameters:

    path : str or pathlib.Path
        Folder to open.

    :Returns:

    ret : int
        Return code of the file explorer command, or 127 if the command
        could not be launched at all.

    :Raises:

    OSError
        If no file explorer command is known for the current platform.
    """
    # Choose a command according to OS
    if sys.platform == 'win32':
        comm = "explorer"
    elif sys.platform == 'darwin':
        comm = "open"
    elif sys.platform.startswith('linux'):
        comm = "dolphin"
    else:
        raise OSError(
            "Don't know how to open a file explorer on platform '%s'"
            % sys.platform)
    # Use command to open folder. Passing an argument list (no shell) keeps
    # paths containing spaces or shell metacharacters intact and safe.
    try:
        ret = subprocess.call([comm, str(pathToString(path))])
    except OSError as err:
        # Command missing or not executable: report it via the return code,
        # using the shell's conventional "command not found" value.
        logging.error("Could not launch '%s': %s" % (comm, err))
        ret = 127

    return ret

Back to top