driver.py
    import json
    import os
    import re
    import sys
    import time
    
    import variableHandle as vh
    #import authmanager as auth
    import JWTManager as jwt
    import JSONManager as man
    import JSONTest as test
    import ConfigManager as conf
    import subprocess
    import stat
    
    import random
    import string
    
    
    def get_random_string(length):
        # build a random string from upper- and lowercase ASCII letters
        letters = string.ascii_letters
        result_str = ''.join(random.choice(letters) for _ in range(length))
        return result_str
    
    
    argv = sys.argv
    
    name = 'Custom_Driver'
    version = '0.1.1'
    
    account = ""
    user_path = ""
    runner_path = ""
    down_scoping = True
    
    
    # generates the path to the build directory
    def get_build_path():
        CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID = os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID")
        CUSTOM_ENV_CI_PROJECT_PATH_SLUG = os.getenv("CUSTOM_ENV_CI_PROJECT_PATH_SLUG")
        HOME = f"/home/{account}"
        if not down_scoping:
            HOME = os.getenv("HOME")
        build_path = f'{HOME}/{user_path}/builds/{CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID}/{CUSTOM_ENV_CI_PROJECT_PATH_SLUG}'
        return str(build_path)
    
    
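    # generates the path to the cache directory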
    def get_cache_path():
        CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID = os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID")
        CUSTOM_ENV_CI_PROJECT_PATH_SLUG = os.getenv("CUSTOM_ENV_CI_PROJECT_PATH_SLUG")
        HOME = f"/home/{account}"
        if not down_scoping:
            HOME = os.getenv("HOME")
        cache_path = f'{HOME}/{user_path}/cache/{CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID}/{CUSTOM_ENV_CI_PROJECT_PATH_SLUG}'
        return str(cache_path)
    
    
    # returns the path to the cloned repository inside the build directory
    def get_clone_path():
        clone_path = os.getenv('CUSTOM_ENV_CI_PROJECT_DIR')
        return clone_path
    
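    # builds a node identifier from the values of the parallel (PARVAR) variables,
    # or 'Sequential' if none are set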
    def get_node_id_str(var_names):
        if var_names:
            return ''.join(os.getenv(v).replace('/', '') for v in var_names)
        return 'Sequential'
    
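    # strips the CUSTOM_ENV_PARVAR_ prefix and returns the variables as a plain environment dict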
    def get_env_for_single_slurm_job(var_names):
        return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in var_names}
    
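    # reads the stored Slurm job id from the given file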
    def get_jobid_from_file(path):
        with open(path, 'r') as node_index_fp:
            return node_index_fp.readline().strip()
    
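    # entry point for the custom executor: loads the runner config, resolves the
    # downscoped account from the job's JWT, and dispatches on argv[1]
    # (config/prepare/run/cleanup)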
    def handle():
        global user_path
        global runner_path
        global account
        global down_scoping
    
        # read config file
        config = conf.read_config()
        user_path = config["user_path"]
        key_path = config["key_path"]
        runner_path = config["runner_path"]
        map_path = config["map_path"]
        down_scoping = config["down_scoping"]
    
        if down_scoping:
            #test.create_testset()
            token = os.getenv('CUSTOM_ENV_CI_JOB_JWT')
            url = os.getenv('CUSTOM_ENV_CI_SERVER_URL')
    
            # get uid and pid from JWT
            uid, pid = jwt.get_UID_PID(token, f"{url}/-/jwks")
    
            # get account from mapping file
            account = man.get_account(url, pid, uid, key_path, map_path)
    
            if account is None:
                print(f"Error: no mapping available for GitLab project: {os.getenv('CUSTOM_ENV_CI_PROJECT_NAME')} "
                      f"or GitLab user: {os.getenv('CUSTOM_ENV_GITLAB_USER_NAME')}. "
                      "Please register for CI support to access the runner.")
                exit(1)
    
        if len(argv) < 2:
            print("Error: no argument")
            exit(1)
        if argv[1] == 'config':  # Do not print anything except the final JSON config in this stage
            os.system(f'mkdir -p {runner_path}/scripts')
            os.system(f'mkdir -p {runner_path}/errorCodes')
            os.system(f'mkdir -p {runner_path}/jobIds')
            os.system(f'chmod +x {runner_path}/sshRunstep.sh')
            os.system(f'chmod +x {runner_path}/singularityRunstep.sh')
            os.system(f'dos2unix {runner_path}/sshRunstep.sh')
            os.system(f'dos2unix {runner_path}/singularityRunstep.sh')
    
            handle_config(get_build_path(), get_cache_path(), name, version)
        elif argv[1] == 'prepare':
            handle_prepare()
        elif argv[1] == 'run':
            handle_run()
        elif argv[1] == 'cleanup':
            handle_cleanup()
        else:
            print(f'Error: unknown stage "{argv[1]}"')
            exit(1)
    
    
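    # config stage: reports the build/cache directories and driver metadata to GitLab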
    def handle_config(build_dir, cache_dir, driver_name, driver_version):
        config = {
            "builds_dir": build_dir,
            "cache_dir": cache_dir,
            "builds_dir_is_shared": True,
            "hostname": "custom-hostname",
            "driver": {"name": driver_name, "version": driver_version},
            "job_env": {"CUSTOM_ENVIRONMENT": "example"}
        }

        # the config stage must emit this configuration as JSON on stdout
        print(json.dumps(config))
    
    
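    # prepare stage: nothing to provision; just log the executing host, project path and GitLab user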
    def handle_prepare():
        os.system('hostname')
        print(os.getenv('CUSTOM_ENV_CI_PROJECT_PATH'))
        print(os.getenv('CUSTOM_ENV_GITLAB_USER_NAME'))
    
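    # run stage: copies the generated CI script into the runner directory and executes it
    # via srun/salloc (optionally inside a Singularity container), wrapped with sudo su
    # for the mapped account when down_scoping is enabled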
    def handle_run():
        #Setup CI scripts
        script_hash = get_random_string(8)
        os.system(f'cp {argv[2]} {runner_path}/scripts/script{script_hash}')
        os.system(f'chmod +x {runner_path}/scripts/script{script_hash}')
        os.system(f'dos2unix {runner_path}/scripts/script{script_hash}')
    
        mode, container, script = vh.get_CI_mode()
        custom_env = dict()
        command_wrapper_ds = []
        exec_command = []
    
        # Handle different modes
        if mode == 'local':  # Debugging mode; the steps below the exit are intentionally disabled
            print("local mode is only for development.")
            exit(1)
            os.system(f"chmod -R 777 {runner_path}")
            # auth.run_task(USER, f'{runner_path}/scripts/script{script_hash}  {argv[3]}')
            command_wrapper_ds = f"sudo su --shell /bin/bash --login {account} -c".split()
            exec_command = f"{runner_path}/scripts/script{script_hash} {argv[3]}"
            vh.set_slurm_env()
    
        elif argv[3] in ('build_script', 'step_script'):
            Slurm_vars = vh.get_slurm_variables()
            exec_command = []
    
            if mode == 'Batch':  # Handle batch scripts
                # Parse the #SBATCH parameters from the batch script
                batch_parameters = []
                with open(f'{get_clone_path()}/{script}', 'r') as batch_file:
                    for line in batch_file:
                        if line.startswith('#SBATCH'):
                            batch_parameters.append(line.split()[1])
                # Define the batch script run
                exec_command += ['srun'] + batch_parameters + [f'{get_clone_path()}/{script}']
                print('Warning: The contents of the script section in the CI definition '
                      'will be ignored in batch mode. If you want to work with the results, '
                      'please create additional stages and connect them via artifacts.')
            elif mode == 'SingleSlurmJobAcrossStages':
                print(f'END_SINGLE_SLURM_JOB: {os.getenv("CUSTOM_ENV_END_SINGLE_SLURM_JOB")}')
                job_id = None
                id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
                os.makedirs(f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/', exist_ok=True)
                node_index_file = f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/{get_node_id_str(id_vars)}.txt'
    
                if not os.path.isfile(node_index_file):
                    salloc_command = ['salloc', '--no-shell', '--job-name=CI']
                    for x in Slurm_vars:
                        salloc_command += [f'{x[0]}{x[1]}']
                    salloc_out = subprocess.run(salloc_command, text=True, stdout=subprocess.PIPE,
                                                stderr=subprocess.STDOUT).stdout
                    print(salloc_out)
                    job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
                    with open(node_index_file, 'w+') as node_index_fp:
                        node_index_fp.write(job_id + '\n')
                    print(f'Starting new slurm job, could not find {node_index_file}')
                else:
                    job_id = get_jobid_from_file(node_index_file)
                    print(f'Using slurm job {job_id}, found {node_index_file}')
    
                exec_command += ['srun',  f'--jobid={job_id}', '--job-name=CI']
                for x in Slurm_vars:
                    exec_command += [f'{x[0]}{x[1]}']
                exec_command += [f'{runner_path}/scripts/script{script_hash}', 'step_script']
                custom_env = get_env_for_single_slurm_job(id_vars)
                custom_env['SLURM_JOB_ID'] = job_id
            else:
                # Define Slurm parameters
                exec_command += ['srun', '--job-name=CI']
                for x in Slurm_vars:
                    exec_command += [f'{x[0]}{x[1]}']
    
                # Handle Slurm shell and singularity shell environment 
                if mode == "Slurm":
                    exec_command += [f'{runner_path}/scripts/script{script_hash}', 'step_script']
                elif mode == "Singularity":
                    if os.path.exists(container):
                        container = f'{get_clone_path()}/{script}'
                        exec_command += [f'{runner_path}/singularityLocalRunstep.sh',
                                    f'{get_clone_path()}/{container}', script_hash]
                    else:
                        exec_command += [f'{runner_path}/singularityRunstep.sh', container, script_hash]
    
            exec_command = ' '.join(exec_command)
            #print(exec_command)
            command_wrapper_ds = f"sudo su --shell /bin/bash --login {account} -c ".split()
    
        else: #run small scripts on local machine
            command_wrapper_ds = f"sudo su --shell /bin/bash --login {account} -c ".split()
            exec_command = f'{runner_path}/scripts/script{script_hash} {argv[3]}'
    
        command_wrapper_ds.append(f"{exec_command}")
    
        # check for downscoping
        command = command_wrapper_ds
        if not down_scoping:
            command = exec_command.split()
    
        # Run command
        print(command)
        cmd_ret = subprocess.run(command, env={**os.environ, **custom_env})
        return_code = cmd_ret.returncode
        os.remove(f'{runner_path}/scripts/script{script_hash}')
    
        if return_code != 0:
            exit(1)
        else:
            exit(0)
    
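    # cleanup stage: cancels the shared Slurm allocation and removes the stored job id
    # files once the last stage sets END_SINGLE_SLURM_JOB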
    def handle_cleanup():
        if os.getenv('CUSTOM_ENV_END_SINGLE_SLURM_JOB') == '1':
            id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
            node_index_file = f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/{get_node_id_str(id_vars)}.txt'
            subprocess.run(['scancel', f'{get_jobid_from_file(node_index_file)}'])
            try:
                os.remove(node_index_file)
            except FileNotFoundError:
                pass
            # Cleanup the directory with the jobIds of the whole pipeline
            try:
                os.rmdir(f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}')
            except OSError:
                pass
    
    if __name__ == '__main__':
        handle()