Source code for CPAC.utils.test_resources



def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
    """Set up a basic template Nipype workflow for testing single nodes or
    small sub-workflows.

    Creates a scratch directory named after the test under the current
    working directory, wipes stale outputs, downloads (or locates) the
    input files, and returns a configured workflow plus a DataSink node.

    Parameters
    ----------
    s3_prefix : str
        S3 URI (or local path) prefix joined with each entry of
        ``paths_list`` to locate the test inputs.
    paths_list : iterable of str
        Relative sub-paths of the input files to fetch.
    test_name : str
        Name for the workflow and for the scratch directory created under
        the current working directory.
    workdirs_to_keep : iterable of str, optional
        Substrings of working-directory entries that should be preserved
        between runs (e.g. to reuse cached node results).

    Returns
    -------
    tuple
        ``(wf, ds, local_paths)`` -- the Nipype ``Workflow``, a ``DataSink``
        node wired to the test's output directory, and a dict mapping each
        ``paths_list`` entry to its local file path.
    """
    import os
    import shutil

    from CPAC.pipeline import nipype_pipeline_engine as pe
    from CPAC.utils.datasource import check_for_s3
    from CPAC.utils.interfaces.datasink import DataSink

    test_dir = os.path.join(os.getcwd(), test_name)
    work_dir = os.path.join(test_dir, "workdir")
    out_dir = os.path.join(test_dir, "output")

    # Start each run with a clean output directory. Removal is best-effort:
    # ignore_errors=True replaces the original bare 'except: pass', which
    # also swallowed KeyboardInterrupt/SystemExit.
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir, ignore_errors=True)

    # Wipe stale working directories, except those the caller asked to keep.
    if os.path.exists(work_dir):
        for dirname in os.listdir(work_dir):
            keep = False
            if workdirs_to_keep:
                for keepdir in workdirs_to_keep:
                    print("{0} --- {1}\n".format(dirname, keepdir))
                    if keepdir in dirname:
                        keep = True
            if keep:
                # BUGFIX: the original 'continue' only advanced the inner
                # loop over keepdirs, so matching directories were deleted
                # anyway; the keep decision now guards the removal itself.
                continue
            shutil.rmtree(os.path.join(work_dir, dirname), ignore_errors=True)

    # Resolve each input: check_for_s3 downloads s3:// URIs into test_dir
    # and passes local paths through.
    local_paths = {}
    for subpath in paths_list:
        s3_path = os.path.join(s3_prefix, subpath)
        local_paths[subpath] = check_for_s3(s3_path, dl_dir=test_dir)

    wf = pe.Workflow(name=test_name)
    wf.base_dir = os.path.join(work_dir)
    # Timestamp hashing avoids re-hashing file contents; crash dumps land
    # next to the test's other artifacts for easy inspection.
    wf.config['execution'] = {
        'hash_method': 'timestamp',
        'crashdump_dir': os.path.abspath(test_dir)
    }

    ds = pe.Node(DataSink(), name='sinker_{0}'.format(test_name))
    ds.inputs.base_directory = out_dir
    ds.inputs.parameterization = True

    return (wf, ds, local_paths)