Source code for ubcs_auxiliary.os

def find(topdir, name=(), exclude=()):
    """Return the files under 'topdir' whose path matches 'name' but not 'exclude'.

    The directory tree rooted at 'topdir' is walked recursively. Every file is
    tested using its full pathname (directory + "/" + file name), so patterns
    normally start with '*' (for example '*.hdf5' or '*/trash*').

    Patterns are shell-style globs as understood by the standard 'fnmatch'
    module: '*' matches any run of characters (including '/'), '?' matches a
    single character and '[seq]' matches a character set. Every other
    character, including regular-expression metacharacters such as '+' or '(',
    is matched literally. Matching is case-sensitive.

    Parameters
    ----------
    topdir (string)
        directory at which the search starts.
    name (list or string)
        glob pattern(s); a file is kept if its pathname matches at least one
        of them. With the default (no patterns) nothing matches and an empty
        list is returned.
    exclude (list or string)
        glob pattern(s); a file is dropped if its pathname matches any of them.

    Returns
    -------
    file_list (list)
        matching pathnames, in the order in which the walk produced them.

    Examples
    --------
    >>> res = ubcs_auxiliary.os.find('ubcs_auxiliary/', name='*.py', exclude='*/tests/*')
    """
    import fnmatch
    import re

    # Prefer the third-party 'scandir' implementation when it is installed;
    # otherwise fall back to the standard library walk.
    try:
        from scandir import walk
    except ImportError:
        from os import walk

    # A single pattern may be given as a bare string.
    if isinstance(name, str):
        name = [name]
    if isinstance(exclude, str):
        exclude = [exclude]

    # fnmatch.translate escapes everything that is not a glob wildcard, so
    # characters such as '+' or '(' in a pattern are matched literally.
    wanted = [re.compile(fnmatch.translate(pattern)) for pattern in name]
    unwanted = [re.compile(fnmatch.translate(pattern)) for pattern in exclude]

    file_list = []
    for directory, _subdirs, files in walk(topdir):
        for file in files:
            # Patterns are written with '/' separators, so join with '/'.
            pathname = directory + "/" + file
            if not any(regex.match(pathname) for regex in wanted):
                continue
            if any(regex.match(pathname) for regex in unwanted):
                continue
            file_list.append(pathname)
    return file_list
def exclude():
    """Return the glob patterns that a search should leave out.

    A new list is built on every call, so callers may modify the result
    freely. Add terms as required.

    Returns
    -------
    patterns (list)
    """
    # Patterns that begin with '*/'.
    path_patterns = (
        '*/alignment*',
        '*/trash*',
        '*/_Archived*',
        '*/backup*',
        '*/Commissioning*',
        '*/Test*',
        '*/.AppleDouble*',
    )
    # Remaining patterns.
    other_patterns = (
        '*LaserX*',
        '*LaserZ*',
        '*Copy*',
        '*._*',
        '.DS_Store',
    )
    return list(path_patterns + other_patterns)
def image_file_names_from_path(beamtime, path_name):
    """Return the sorted image file names found under 'path_name'.

    The search starts at the data directory reported by
    data_dir_analysis_dir(beamtime) with 'path_name' appended, keeps files
    matching '*.mccd' or '*.rx', and drops anything matching the patterns
    returned by exclude().

    Typically used with 'Reference_*()', which specifies which directories
    contain data for which zinger-free statistics are to be acquired. The
    zinger-free statistics include Imean and Ivar, which are used to
    construct UC_psi.npy.

    Parameters
    ----------
    beamtime
        passed unchanged to data_dir_analysis_dir().
    path_name (string)
        appended directly to the data directory (no separator is inserted).

    Returns
    -------
    image_files
        result of numpy.sort applied to the list of matching path names.
    """
    from db_functions import find, exclude
    from numpy import sort

    # Only the data directory is needed here; the analysis directory is ignored.
    data_dir, _analysis_dir = data_dir_analysis_dir(beamtime)
    search_root = data_dir + path_name
    found = find(search_root, name=['*.mccd', '*.rx'], exclude=exclude())
    return sort(found)
def N_files_in_dir(folder='', match='*'):
    """Count the entries of 'folder' whose name matches the glob 'match'.

    Every name returned by os.listdir() is considered. The listing is not
    recursive.

    Parameters
    ----------
    folder (string)
        directory to inspect. An empty value (the default) means the
        current working directory.
    match (string)
        fnmatch-style pattern applied to each entry name; the default '*'
        matches every entry.

    Returns
    -------
    integer (int)
        number of matching entries.
    """
    import fnmatch
    import os

    # os.listdir('') raises FileNotFoundError, so the default argument would
    # otherwise make a bare N_files_in_dir() call fail every time.
    entries = os.listdir(folder if folder else '.')
    return len(fnmatch.filter(entries, match))
def listdir(root, include=('.hdf5',), exclude=(), sort=''):
    """Return the entries of 'root' filtered by substring terms.

    Each entry of 'root' is joined with 'root' and the resulting path is kept
    when it contains every term in 'include' and none of the terms in
    'exclude'. The test is a plain substring test on the joined path, so a
    term can also match part of 'root' itself. The listing is not recursive.

    Parameters
    ----------
    root (string)
        directory to list.
    include (list or string)
        term(s) that must all occur in the path. A bare string is treated
        as a single term.
    exclude (list or string)
        term(s) that must not occur in the path. A bare string is treated
        as a single term.
    sort (string)
        currently ignored; accepted for backward compatibility. The result
        keeps the order returned by os.listdir().

    Returns
    -------
    file_list (list)
        selected paths, each prefixed with 'root'.

    Examples
    --------
    >>> res = ubcs_auxiliary.os.listdir('/', ['.hdf5'])
    """
    import os

    # Treat a bare string as one term (as find() does) instead of iterating
    # over its individual characters.
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]

    candidates = [os.path.join(root, entry) for entry in os.listdir(root)]
    return [
        candidate
        for candidate in candidates
        if all(term in candidate for term in include)
        and not any(term in candidate for term in exclude)
    ]
def find_recent_filename(root, include, exclude, newest_first=True):
    """Return the newest (or oldest) entry of 'root' that passes the filters.

    Each entry of 'root' (file or folder) is joined with 'root'; the
    resulting path is a candidate when it contains every term in 'include'
    and none of the terms in 'exclude'. Candidates are ranked by their
    modification time (os.path.getmtime). The listing is not recursive.

    Parameters
    ----------
    root (string)
        directory to list.
    include (list or string)
        term(s) that must all occur in the path. A bare string is treated
        as a single term.
    exclude (list or string)
        term(s) that must not occur in the path. A bare string is treated
        as a single term.
    newest_first (boolean)
        True returns the most recently modified candidate, False the least
        recently modified one.

    Returns
    -------
    filename (string)
        selected path as a plain str, or '' when nothing passes the filters.
        If several candidates share the same modification time, one of them
        is returned.
    """
    from os import listdir, path
    from os.path import getmtime

    # Treat a bare string as one term (as find() does) instead of iterating
    # over its individual characters.
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]

    candidates = [path.join(root, entry) for entry in listdir(root)]
    selected = [
        candidate
        for candidate in candidates
        if all(term in candidate for term in include)
        and not any(term in candidate for term in exclude)
    ]
    if not selected:
        return ''

    # max/min with a key returns one of the original str objects, avoiding
    # the numpy round trip and its numpy.str_ result.
    pick = max if newest_first else min
    return pick(selected, key=getmtime)
def get_current_pid_memory_usage(units = 'GB',verbose = False):
    """
    Return the memory footprint of the current process in the requested units.

    The value is the first field of psutil's Process.memory_info() for this
    process (psutil documents that field as the resident set size - confirm
    for your platform), divided by a factor chosen from 'units'.

    Parameters
    ----------
    units (string)
        'GB' divides by 2**30, 'MB' by 2**20, 'KB' by 2**10, 'B' by 1 and
        'b' by 1/8 (i.e. multiplies by 8). Any other value leaves the raw
        number unscaled.
    verbose (boolean)
        when True the result is also printed.

    Returns
    -------
    memory (number)
    """
    import os
    import psutil

    process = psutil.Process(os.getpid())

    # (unit name, divisor) pairs, checked in order.
    divisors = (
        ('GB', 2.**30),
        ('MB', 2.**20),
        ('KB', 2.**10),
        ('B', 2.**0.0),
        ('b', 1.0/8.0),
    )
    for unit_name, divisor in divisors:
        if units == unit_name:
            coeff = divisor
            break
    else:
        # Unrecognised unit: leave the reported value unscaled.
        coeff = 1

    usage = process.memory_info()[0]/coeff
    if verbose:
        print('memory use:', usage)
    return usage
def does_filename_have_counterpart(src_path,dst_root = None, counterpart_extension = ''):
    """
    Check whether a counterpart of 'src_path' exists.

    The counterpart name is the file name of 'src_path' truncated at its
    first '.', followed by 'counterpart_extension'. It is looked up in
    'dst_root', or in the directory of 'src_path' when 'dst_root' is None.

    Parameters
    ----------
    src_path (string)
        path of the source file.
    dst_root (string or None)
        directory in which to look for the counterpart.
    counterpart_extension (string)
        text appended to the base name, e.g. '.log'.

    Returns
    -------
    boolean (boolean)
        True if the counterpart path exists.
    """
    import os

    directory, file_name = os.path.split(src_path)
    search_dir = directory if dst_root is None else dst_root

    # Everything before the first '.' is the base name; the rest is dropped.
    base_name = file_name.partition('.')[0]
    candidate = os.path.join(search_dir, base_name + counterpart_extension)
    return os.path.exists(candidate)
def read_config_file(filename):
    """
    Read a YAML config file.

    Parameters
    ----------
    filename (string)
        path of the YAML file.

    Returns
    -------
    dict (dictionary)
        whatever yaml.safe_load produces for the file content, or an empty
        dictionary when 'filename' is not an existing file.
    boolean (boolean)
        True if 'filename' is an existing file, False otherwise.
    """
    import yaml
    import os

    # Missing file: report it through the flag instead of raising.
    if not os.path.isfile(filename):
        return {}, False

    with open(filename,'r') as stream:
        text = stream.read()
    return yaml.safe_load(text), True