"""Print the command line that cwltool builds for a CWL tool and job order.

The script loads the tool document and job-order file named in ``my_args``,
initializes the job order, and iterates over the jobs produced by
``tool.job``. For each job it prints the command line and bindings, then
points every File/Directory entry's ``path``/``dirname`` at the local path
taken from its ``location`` URI, regenerates the arguments from the bindings
and prints the flattened result.
"""
import os.path
import sys
from urllib.parse import urlsplit
from urllib.request import url2pathname

import argcomplete
from cwltool.argparser import arg_parser
from cwltool.context import RuntimeContext
from cwltool.flatten import flatten
from cwltool.load_tool import (
    fetch_document,
    make_tool,
    resolve_and_validate_document,
    resolve_tool_uri,
)
from cwltool.main import init_job_order, load_job_order, setup_loadingContext

# import logging
# _logger = logging.getLogger("cwltool")


def _use_location_as_path(entry):
    """Set ``path`` and ``dirname`` of a File/Directory entry from its location.

    Entries that are not dicts, are not of class ``File``/``Directory``, or
    whose ``location`` is missing or is not a ``file://`` URI are left
    untouched, because no local path can be derived for them.

    Args:
        entry: A value from the job object or a binding's ``datum``.
    """
    if not isinstance(entry, dict):
        return
    if entry.get('class') not in ('File', 'Directory'):
        return

    location = entry.get('location')
    if not isinstance(location, str):
        return

    parts = urlsplit(location)
    if parts.scheme != 'file':
        return

    # url2pathname also decodes percent-escapes (e.g. "%20"), which a plain
    # slice of the "file://" prefix would leave in the path.
    local_path = url2pathname(parts.path)
    entry['path'] = local_path
    entry['dirname'] = os.path.dirname(local_path)


def output_callback(out, process_status: str):
    """Ignore the outputs and status reported by the job iterator."""
    pass


# Tool document and job-order file to load. Pairs tried earlier:
#   ['/home/zcy/yy.cwl', '/home/zcy/custom-types.yml']
#   ['/home/zcy/hello_world.cwl', '--message=zcy']
#   ['/home/zcy/bandage.cwl', '/home/zcy/aa.yml']
#   ['/home/zcy/bandage.cwl', '/home/zcy/output.json']
#   ['/home/zcy/hello_world.cwl', '/home/zcy/hh.yml']
#   ['/home/zcy/stdout.cwl', '/home/zcy/echo-job.yml']
my_args = ['/home/zcy/other/bedtools_bamtobed_1.cwl', '/home/zcy/other/zzz.yml']

parser = arg_parser()
argcomplete.autocomplete(parser)
args = parser.parse_args(my_args)

runtimeContext = RuntimeContext(vars(args))
loadingContext = setup_loadingContext(None, runtimeContext, args)

# Original author's note: resolve_tool_uri(args.workflow,
# resolver=loadingContext.resolver,
# fetcher_constructor=loadingContext.fetcher_constructor) gave
# 'file:///home/zcy/yy.cwl' for both uri and tool_file_uri, so tool_file_uri
# can be "" or assembled by hand, as done here.
# abspath keeps the URI well-formed when the workflow path is relative.
uri = 'file://' + os.path.abspath(args.workflow)
tool_file_uri = uri

job_order_object, input_basedir, jobloader = load_job_order(
    args,
    sys.stdin,
    loadingContext.fetcher_constructor,
    loadingContext.overrides_list,
    tool_file_uri,
)

loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)
loadingContext, uri = resolve_and_validate_document(
    loadingContext,
    workflowobj,
    uri,
    preprocess_only=(args.print_pre or args.pack),
)
tool = make_tool(uri, loadingContext)

# TODO: the original considered passing sys.stdout (cast to IO[str]) here.
stdout = None
input_required = True  # TODO: does this vary with the inputs?

initialized_job_order_object = init_job_order(
    job_order_object,
    args,
    tool,
    jobloader,
    stdout,
    print_input_deps=args.print_input_deps,
    relative_deps=args.relative_deps,
    make_fs_access=runtimeContext.make_fs_access,
    input_basedir=input_basedir,
    secret_store=runtimeContext.secret_store,
    input_required=input_required,
    runtime_context=runtimeContext,
)

jobiter = tool.job(initialized_job_order_object, output_callback, runtimeContext)
for job in jobiter:
    if job is None:
        continue

    print(job.command_line)
    print(job.builder.bindings)

    # Point the job inputs at the local paths named by their locations.
    my_job = job.builder.job
    for value in my_job.values():
        _use_location_as_path(value)

    # Do the same for the data attached to each command-line binding.
    my_bindings = job.builder.bindings
    for binding in my_bindings:
        _use_location_as_path(binding.get('datum'))

    # Rebuild the arguments from the updated bindings.
    command_line = flatten(
        [job.builder.generate_arg(binding) for binding in my_bindings]
    )
    print(command_line)
# Tags: commandline, args, uri, job, cwltool, CWL, home, my, zcy
# From: https://www.cnblogs.com/testzcy/p/18346531