Source code for CPAC.utils.test_init

# CPAC/utils/test_init.py
#
# Contributing authors (please append):
# Daniel Clark
# Jon Clucas
'''
This module contains functions that assist in initializing CPAC
tests resources
'''
from typing import Optional

from nipype.interfaces.utility import IdentityInterface

from CPAC.pipeline.nipype_pipeline_engine import Node
from CPAC.utils.typing import LIST


[docs]def create_dummy_node(name: str, fields: Optional[LIST[str]] = None): """ Create a dummy IdentityInterface Node source for resources upstream in a graph from a section to be tested Parameters ---------- name : str a name for the dummy Node fields : list of str, optional a list of resources to be present in the created Node. If not provided, the only resource will be called 'resource' Returns ------- Node """ if fields is None: fields = ['resource'] return Node(IdentityInterface(fields=fields), name=name)
# Return tests data config file
[docs]def populate_template_config(config_type: str) -> str: ''' Function to read in a template config file from the CPAC_RESOURCE_DIR and populate it with actual filepaths Parameters ---------- config_type : string config file to populate; accepts 'data_config' and 'pipeline_config' Returns ------- config_test : string filepath to the newly written config file for testing ''' # Import packages import os # Init variables resource_dir = return_resource_dir() templates_dir = return_resource_subfolder('templates') yamls = ['data_config', 'pipeline_config'] # Check config type and build path if config_type in yamls: ext = '.yml' out_name = 'configs' else: # Check if it's supported, otherwise raise an Exception err_msg = 'config_type parameter: %s is unsupported' % config_type raise Exception(err_msg) # Get template and output paths template_path = os.path.join(templates_dir, config_type + ext) output_dir = return_resource_subfolder(out_name) output_path = os.path.join(output_dir, config_type + ext) # Open the files tmp_f = open(template_path, 'r') out_f = open(output_path, 'w') # Replace 'RESOURCE_DIR' string with actual directory for line in tmp_f: out_f.write(line.replace('RESOURCE_DIR', resource_dir)) # Close file objects tmp_f.close() out_f.close() # Return filepath return output_path
# Populate all of the template paths
[docs]def populate_all_templates(): ''' Function to populate all of the template files Parameters ---------- None Returns ------- None ''' # Import packages # Init variables outputs = [] config_types = ['data_config', 'pipeline_config', 'centrality_spec', 'map_spec', 'mask_spec', 'roi_spec', 'seed_spec', 'spatial_maps_spec'] # Populate all of the config templates with actual paths for config_type in config_types: output = populate_template_config(config_type) outputs.append(output) # Check that they all returned a value if len(outputs) == len(config_types): print('Successfully populated and saved templates!') else: err_msg = 'Something went wrong during template population' raise Exception(err_msg)
# Get the AWS credentials
[docs]def return_aws_creds(): ''' Function to return the AWS credentials file given by the CPAC_AWS_CREDS environment variable Parameters ---------- None Returns ------- aws_creds : string filepath to the AWS credentials with access key id and secret access key ''' # Import packages import os # Init variables creds_path = os.getenv('CPAC_AWS_CREDS') # Check if set if not creds_path: err_msg = 'CPAC_AWS_CREDS environment variable not set!\n' \ 'Set this to the filepath location of your AWS credentials.' print(err_msg) creds_path = input('Enter path to AWS credentials file: ') else: return creds_path
# Get the default test bucket name
[docs]def default_bucket_name(): ''' Function to return the default S3 bucket name used in test suite Parameters ---------- None Returns ------- bucket_name : string default S3 bucket name for testing ''' # Set default bucket name bucket_name = 'fcp-indi' # Return bucket name return bucket_name
# Grab all nifti files within directory
[docs]def return_all_niis(base_dir): ''' Function to walk through a base directory and all subsequent files and return the filepaths of all nifti files found Parameters ---------- base_dir : string filepath to the base directory to search through Returns ------- nii_list : list a list of filepath strings of the nifti files found in base_dir ''' # Import packages import os # Init variables nii_list = [] # Collect computed outputs for root, dirs, files in os.walk(base_dir): if files: nii_list.extend([os.path.join(root, file) for file in files \ if file.endswith('.nii.gz')]) # Return the list of files return nii_list
# Download the CPAC resource dir from S3
[docs]def download_cpac_resources_from_s3(local_base): ''' Function to download the CPAC testing resources directory from S3 Parameters ---------- local_base : string the local directory to save the 'cpac_resources' contents ''' # Import packages import os from indi_aws import aws_utils, fetch_creds # Init variables bucket_name = default_bucket_name() resource_folder = 'cpac_resources' s3_prefix = os.path.join('data/test_resources', resource_folder) # Get bucket object bucket = fetch_creds.return_bucket(None, bucket_name) # Gather files from bucket for obj in bucket.objects.filter(Prefix=s3_prefix): bkey = obj.key # If the object is just a folder, move on to next object if bkey.endswith('/'): continue # Form local path from key local_path = os.path.join(local_base, bkey.split(resource_folder)[-1].lstrip('/')) # Make download directories local_dir = os.path.dirname(local_path) if not os.path.exists(local_dir): os.makedirs(local_dir) # Download file if it doesn't exist if not os.path.exists(local_path): bucket.download_file(bkey, local_path, Callback=aws_utils.ProgressPercentage(obj)) # Print done print('CPAC resources folder in %s is complete!' % local_base)
# Look for CPAC_RESOURCE_DIR to be in environment
[docs]def return_resource_dir(): ''' Function to return the filepath of the CPAC_RESOURCE_DIR; note the CPAC_RESOURCE_DIR environment variable must be set Parameters ---------- None Returns ------- resource_dir : string the file path on disk where the cpac resources folder is ''' # Import packages import os # Init variables resource_dir = os.getenv('CPAC_RESOURCE_DIR') # Check if set if not resource_dir: # Print notification of cpac resources directory print_msg = 'CPAC_RESOURCE_DIR environment variable not set! Enter '\ 'directory of the cpac_resources folder.\n\n*If the folder '\ 'does not exist, it will be downloaded under the directory '\ 'specified.' print(print_msg) # Get user input resource_dir = input('Enter C-PAC resources directory: ') # Check and download any new or missing resources from S3 copy try: download_cpac_resources_from_s3(resource_dir) except Exception as exc: err_msg = 'There was a problem downloading the cpac_resources '\ 'folder from S3.\nError: %s' % exc raise Exception(err_msg) return resource_dir
# Return any subfolder of the resource directory
[docs]def return_resource_subfolder(subfolder): ''' Funnction to return subfolders of the CPAC_RESOURCE_DIR Parameters ---------- subfolder : string subfolder name to return path of Returns ------- resource_subfolder : string filepath to the resource subfolder ''' # Import packages import os # Init variables resource_dir = return_resource_dir() in_settings = ['configs', 'creds', 'resources', 'subject_lists', 'templates'] # Check if its a sub-subfolder if subfolder in in_settings: resource_subfolder = os.path.join(resource_dir, 'settings', subfolder) else: resource_subfolder = os.path.join(resource_dir, subfolder) # Return subfolder return resource_subfolder
# Return test strategies obj file
[docs]def return_strats_obj(): ''' Function to return the file path of the strategies obj file from the CPAC_RESOURCE_DIR Parameters ---------- None Returns ------- strats_obj : string filepath to the strategies obj file ''' # Import packages import os # Init variables settings_dir = return_resource_subfolder('resources') # Get strategies obj strats_obj = os.path.join(settings_dir, 'strategies_test.obj') # Return filepath return strats_obj
# Return tests subject list
[docs]def return_subject_list(): ''' Function to return the file path of the subject list file from the CPAC_RESOURCE_DIR Parameters ---------- None Returns ------- subject_list : string filepath to the subject list yaml file ''' # Import packages import os # Init variables config_dir = return_resource_subfolder('subject_lists') # Get sublist subject_list = os.path.join(config_dir, 'CPAC_subject_list_test.yml') # Return filepath return subject_list
# Return the test subjects measure directories
[docs]def return_subj_measure_dirs(measure): ''' Function to grab the base directories of the test subject's output files for a given measure or workflow Parameters ---------- measure : string the measure or workflow or derivative of interest to parse for; this must be the folder name where all of the subject's test outputs are located (e.g. 'network_centrality') Returns ------- subj_measure_dirs : list a list of strings of the base directories for each instance of the desired measure folder within the test subjects outputs ''' # Import packages import glob import os # Init variables test_subj = return_test_subj() outputs_dir = return_resource_subfolder('output') # Root directories (cpac_resources/output/reg/subj_sess/scan/measure/..) subj_measure_dirs = \ glob.glob(os.path.join(outputs_dir, '*', '%s*' % test_subj, '*', measure)) # Check to see if the directories exist if len(subj_measure_dirs) == 0: err_msg = 'Unable to find any subject directories for the %s measure.' \ % measure raise Exception(err_msg) # Return base directories for test measures outputs return subj_measure_dirs
# Get subject for individual tests
[docs]def return_test_subj(): ''' Function to return the subject id; note the CPAC_RESOURCE_DIR environment variable must be set Parameters ---------- None Returns ------- resource_dir : string the file path on disk where the cpac resources folder is ''' # Import packages import os # Init variables test_subj = os.getenv('CPAC_TEST_SUBJ') # Get cpac resource directory and get a list of subject folders input_dir = return_resource_subfolder('input') site_dir = os.path.join(input_dir, 'site_1') # Get list of subject directories subs = os.listdir(site_dir) # Check if set and exists if not test_subj: info_msg = 'CPAC_TEST_SUBJ environment variable not set!' print(info_msg) # Get user input test_subj = input('Enter C-PAC benchmark test subject id: ') # Check to make sure their input files exist if test_subj not in subs: err_msg = 'Test subject %s is not in the cpac_resources subject ' \ 'directory %s. Please specify different CPAC_TEST_SUBJ.' \ %(test_subj, site_dir) raise Exception(err_msg) else: return test_subj
# Smooth nifti file
[docs]def smooth_nii_file(self, nii_file, fwhm, mask_file=None): ''' Function to Gaussian smooth nifti files and optionally using a mask on the smoothed data Parameters ---------- nii_file : string filepath to the nifti file to smooth fwhm : float FWHM for Gaussian smoothing kernel, in mm mask_file : string (optional); default=None filepath to the mask file to use Returns ------- smooth_arr : numpy.ndarray smoothed nifti image as a numpy array ''' # Import packages import nibabel as nib import numpy as np import scipy.ndimage # Init variables raw_nii = nib.load(nii_file) raw_arr = raw_nii.get_fdata() # Check parameters if mask_file: mask_arr = nib.load(mask_file).get_fdata() # Check the mask shape matches the raw nifti if mask_arr.shape != raw_arr.shape: err_msg = 'Mask file has different dimensions than nifti.\n' \ 'Check the paths are correct and try again.' raise Exception(err_msg) # Calculate sigma for smoothing mm_res = np.abs(raw_nii.affine[0][0]) sigma = fwhm/2.3548/mm_res # Smooth input smooth_arr = scipy.ndimage.gaussian_filter(raw_arr, sigma, order=0) # And mask if using one (this writes it to a 1d array) if mask_arr: smooth_out = smooth_arr[mask_arr.astype('bool')] smooth_arr = np.zeros(mask_arr.shape, dtype=float) # Get mask coordinates and populate smoothed image coords = np.argwhere(mask_arr) for idx, xyz in enumerate(coords): x, y, z = xyz smooth_arr[x, y, z] = smooth_out[idx] # Return the smoothed array return smooth_arr
# Download test resource from S3 bucket
[docs]def download_resource_from_s3(s3_url_path): ''' ''' # Import packages import os import tempfile import urllib.request, urllib.parse, urllib.error # Init variables temp_dir = tempfile.mkdtemp() url_open = urllib.request.URLopener() base_name = os.path.basename(s3_url_path) dl_path = os.path.join(temp_dir, base_name) # Download file url_open.retrieve(s3_url_path, dl_path) # Return the downloaded path return dl_path
# Setup log file
[docs]def setup_test_logger(logger_name, log_file, level, to_screen=False): ''' Function to initialize and configure a logger that can write to file and (optionally) the screen. Parameters ---------- logger_name : string name of the logger log_file : string file path to the log file on disk level : integer indicates the level at which the logger should log; this is controlled by integers that come with the python logging package. (e.g. logging.INFO=20, logging.DEBUG=10) to_screen : boolean (optional) flag to indicate whether to enable logging to the screen Returns ------- logger : logging.Logger object Python logging.Logger object which is capable of logging run- time information about the program to file and/or screen ''' # Import packages import logging from CPAC.utils.monitoring.custom_logging import getLogger # Init logger, formatter, filehandler, streamhandler logger = getLogger(logger_name) logger.setLevel(level) formatter = logging.Formatter('%(asctime)s : %(message)s') # Write logs to file file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) logger.addHandler(file_handler) # Write to screen, if desired if to_screen: stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) # Return the logger return logger
[docs]def pearson_correlation(nii_1, nii_2): import nibabel as nb import numpy as np data_1 = nb.load(nii_1).get_fdata() data_2 = nb.load(nii_2).get_fdata() R = np.corrcoef(data_1.flatten(), data_2.flatten()) return(R[0,1])
# Calculate concordance correlation coefficient
[docs]def concordance(x, y): ''' Return the concordance correlation coefficient as defined by Lin (1989) Parameters ---------- x : list or array a list of array of length N of numbers y : list or array a list of array of length N of numbers Returns ------- rho_c : numpy.float32 the concordance value as a float ''' # Import packages import numpy as np # Usage errors check x_shape = np.shape(x) y_shape = np.shape(y) if len(x_shape) != 1 or len(y_shape) != 1: err_msg = 'Inputs must be 1D lists or arrays.' raise ValueError(err_msg) elif x_shape != y_shape: err_msg = 'Length of the two inputs must be equal.\n'\ 'Length of x: %d\nLength of y: %d' % (len(x), len(y)) raise ValueError(err_msg) # Init variables x_arr = np.array(x).astype('float64') y_arr = np.array(y).astype('float64') # Get pearson correlation rho = np.corrcoef(x_arr, y_arr)[0][1] # Get stdevs sigma_x = np.std(x_arr) sigma_y = np.std(y_arr) # Get means mu_x = np.mean(x_arr) mu_y = np.mean(y_arr) # Comput condordance rho_c = (2*rho*sigma_x*sigma_y) /\ (sigma_x**2 + sigma_y**2 + (mu_x-mu_y)**2) # Return variables return rho_c