Select Git revision
driver.py 10.19 KiB
import os
import re
import sys
import time
import variableHandle as vh
#import authmanager as auth
import JWTManager as jwt
import JSONManager as man
import JSONTest as test
import ConfigManager as conf
import subprocess
import stat
import random
import string
def get_random_string(length):
    """Return a random string made of ``length`` ASCII letters.

    Each character is picked independently with ``random.choice`` from
    ``string.ascii_letters``, i.e. both upper- and lower-case letters.
    A ``length`` of zero (or less) yields the empty string.
    """
    alphabet = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
# Command line of this driver invocation; argv[1] selects the stage in handle().
argv = sys.argv
# Driver name and version, passed to handle_config() for the config stage output.
name = 'Custom_Driver'
version = '0.1.1'
# The values below are placeholders that handle() overwrites at start-up.
# account: set from the mapping lookup in handle() when down_scoping is enabled.
account = ""
# user_path / runner_path / down_scoping: read from the configuration in handle().
user_path = ""
runner_path = ""
down_scoping = True
def get_build_path():
    """Return the build directory of the current CI job as a string.

    The path has the form
    ``<home>/<user_path>/builds/<CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID>/<CUSTOM_ENV_CI_PROJECT_PATH_SLUG>``
    where ``<home>`` is ``/home/<account>`` while ``down_scoping`` is enabled
    and the value of ``$HOME`` otherwise. Unset environment variables appear
    as the text ``None`` in the result.
    """
    concurrent_id = os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID")
    path_slug = os.getenv("CUSTOM_ENV_CI_PROJECT_PATH_SLUG")
    home_dir = f"/home/{account}" if down_scoping else os.getenv("HOME")
    return f'{home_dir}/{user_path}/builds/{concurrent_id}/{path_slug}'
def get_cache_path():
    """Return the cache directory of the current CI job as a string.

    The path has the form
    ``<home>/<user_path>/cache/<CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID>/<CUSTOM_ENV_CI_PROJECT_PATH_SLUG>``
    where ``<home>`` is ``/home/<account>`` while ``down_scoping`` is enabled
    and the value of ``$HOME`` otherwise.
    """
    if down_scoping:
        home_dir = f"/home/{account}"
    else:
        home_dir = os.getenv("HOME")
    segments = (
        home_dir,
        user_path,
        'cache',
        os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID"),
        os.getenv("CUSTOM_ENV_CI_PROJECT_PATH_SLUG"),
    )
    # str() keeps unset values rendered as "None", exactly like f-string formatting.
    return '/'.join(str(segment) for segment in segments)
def get_clone_path():
    """Return the path of the cloned repository.

    The value is read from the ``CUSTOM_ENV_CI_PROJECT_DIR`` environment
    variable; ``None`` is returned when the variable is not set.
    """
    return os.getenv('CUSTOM_ENV_CI_PROJECT_DIR')
def get_node_id_str(vars):
    """Build an identifier string from the values of the given environment variables.

    ``vars`` is a sequence of environment variable names. Their values are
    concatenated in the given order with every ``/`` removed. When ``vars``
    is empty or ``None`` the fixed string ``'Sequential'`` is returned.
    """
    if not vars:
        return 'Sequential'
    pieces = []
    for var_name in vars:
        pieces.append(os.getenv(var_name).replace('/', ''))
    return ''.join(pieces)
def get_env_for_single_slurm_job(vars):
    """Map the given environment variables to their values under shortened names.

    For every name in ``vars`` the text ``CUSTOM_ENV_PARVAR_`` is removed from
    the name and the result is used as key; the value is the current value of
    the original environment variable.
    """
    job_env = {}
    for var_name in vars:
        short_name = var_name.replace('CUSTOM_ENV_PARVAR_', '')
        job_env[short_name] = os.getenv(var_name)
    return job_env
def get_jobid_from_file(path):
    """Return the first line of the file at ``path`` without surrounding whitespace.

    The file is expected to hold a job id on its first line; an empty file
    yields the empty string.
    """
    with open(path, 'r') as job_file:
        first_line = job_file.readline()
    return first_line.strip()
def handle():
    """Read the runner configuration and dispatch the stage named in ``argv[1]``.

    The configuration returned by ``conf.read_config()`` fills the module-level
    ``user_path``, ``runner_path`` and ``down_scoping`` values. With
    ``down_scoping`` enabled, the user and project ids are taken from the job
    JWT and mapped to a local ``account``.

    Supported stages are ``config``, ``prepare``, ``run`` and ``cleanup``.

    The process exits with status 1 when no account mapping exists, when no
    stage argument was given, or when the stage is unknown. Error messages go
    to stderr so that stdout of the ``config`` stage only carries the
    configuration output.
    """
    global user_path, runner_path, account, down_scoping

    # read config file
    config = conf.read_config()
    user_path = config["user_path"]
    key_path = config["key_path"]
    runner_path = config["runner_path"]
    map_path = config["map_path"]
    down_scoping = config["down_scoping"]

    if down_scoping:
        token = os.getenv('CUSTOM_ENV_CI_JOB_JWT')
        url = os.getenv('CUSTOM_ENV_CI_SERVER_URL')
        # get uid and pid from JWT
        uid, pid = jwt.get_UID_PID(token, f"{url}/-/jwks")
        # get account from mapping file
        account = man.get_account(url, pid, uid, key_path, map_path)
        if account is None:
            print(f"Error: no mapping for GitLab project: {os.getenv('CUSTOM_ENV_CI_PROJECT_NAME')}, or GitLab user: {os.getenv('CUSTOM_ENV_GITLAB_USER_NAME')} available. Please register CI support for to acces the Runner",
                  file=sys.stderr)
            # Without an account every later path and the `sudo su` call would
            # silently use the literal user name "None", so stop here.
            sys.exit(1)

    if len(argv) < 2:
        print("Error: no argument", file=sys.stderr)
        sys.exit(1)

    stage = argv[1]
    if stage == 'config':  # Do not use print in this step
        os.system(f'mkdir -p {runner_path}/scripts')
        os.system(f'mkdir -p {runner_path}/errorCodes')
        os.system(f'mkdir -p {runner_path}/jobIds')
        os.system(f'chmod +x {runner_path}/sshRunstep.sh')
        os.system(f'chmod +x {runner_path}/singularityRunstep.sh')
        os.system(f'dos2unix {runner_path}/sshRunstep.sh')
        os.system(f'dos2unix {runner_path}/singularityRunstep.sh')
        handle_config(get_build_path(), get_cache_path(), name, version)
    elif stage == 'prepare':
        handle_prepare()
    elif stage == 'run':
        handle_run()
    elif stage == 'cleanup':
        handle_cleanup()
    else:
        # An unknown stage is a failure, not a successful no-op.
        print('Error', file=sys.stderr)
        sys.exit(1)
def handle_config(build_dir, cache_dir, driver_name, driver_version):
    """Print the driver configuration as one JSON object on stdout.

    Args:
        build_dir: value of the ``builds_dir`` entry.
        cache_dir: value of the ``cache_dir`` entry.
        driver_name: value of ``driver.name``.
        driver_version: value of ``driver.version``.

    The remaining entries (``builds_dir_is_shared``, ``hostname`` and
    ``job_env``) are fixed. The object is serialised with ``json.dumps`` so
    that quotes, backslashes or other special characters in the arguments are
    escaped and the output always stays valid JSON.
    """
    import json

    config = {
        "builds_dir": build_dir,
        "cache_dir": cache_dir,
        "builds_dir_is_shared": True,
        "hostname": "custom-hostname",
        "driver": {
            "name": driver_name,
            "version": driver_version,
        },
        "job_env": {
            "CUSTOM_ENVIRONMENT": "example",
        },
    }
    print(json.dumps(config))
def handle_prepare():
    """Print host and job information for the prepare stage.

    Runs the ``hostname`` command and then prints the values of
    ``CUSTOM_ENV_CI_PROJECT_PATH`` and ``CUSTOM_ENV_GITLAB_USER_NAME``
    (``None`` for unset variables), one per line.
    """
    os.system('hostname')
    for env_name in ('CUSTOM_ENV_CI_PROJECT_PATH', 'CUSTOM_ENV_GITLAB_USER_NAME'):
        print(os.getenv(env_name))
def _read_sbatch_parameters(batch_script_path):
    """Return the first option token of every ``#SBATCH`` line of a batch script."""
    parameters = []
    with open(batch_script_path, 'r') as batch_file:
        for line in batch_file:
            if not line.startswith('#SBATCH'):
                continue
            tokens = line.split()
            # A bare "#SBATCH" line carries no option; skip it instead of
            # failing with an IndexError.
            # NOTE(review): only the first token after "#SBATCH" is forwarded,
            # so an option written as "-n 4" loses its value - confirm intent.
            if len(tokens) > 1:
                parameters.append(tokens[1])
    return parameters


def _slurm_option_args(slurm_vars):
    """Turn the pairs from ``vh.get_slurm_variables()`` into command line arguments."""
    # presumably each entry is (option prefix, value); both parts are simply concatenated
    return [f'{x[0]}{x[1]}' for x in slurm_vars]


def _get_or_allocate_slurm_job(node_index_file, slurm_vars):
    """Return the Slurm job id stored in ``node_index_file``, allocating a job if the file is missing."""
    if os.path.isfile(node_index_file):
        job_id = get_jobid_from_file(node_index_file)
        print(f'Using slurm job {job_id}, could find {node_index_file}')
        return job_id

    salloc_command = ['salloc', '--no-shell', '--job-name=CI'] + _slurm_option_args(slurm_vars)
    salloc_out = subprocess.run(salloc_command, text=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT).stdout
    print(salloc_out)
    match = re.search(r'salloc: job (\d+)', salloc_out)
    if match is None:
        # salloc did not report a job id (e.g. the allocation was rejected);
        # its output was printed above, so fail the stage instead of crashing
        # with an AttributeError.
        print('Error: could not determine the slurm job id from the salloc output',
              file=sys.stderr)
        sys.exit(1)
    job_id = match.group(1)
    with open(node_index_file, 'w+') as node_index_fp:
        node_index_fp.write(job_id + '\n')
    print(f'Starting new slurm job, could not find {node_index_file}')
    return job_id


def _build_run_command(script_path, script_hash):
    """Return ``(command, custom_env)`` for the current run sub-stage.

    ``command`` is the argument list for ``subprocess.run`` and ``custom_env``
    holds extra environment variables for it.
    """
    mode, container, script = vh.get_CI_mode()
    custom_env = dict()

    if mode == 'local':  # Debugging mode, disabled outside of development
        print("local mode only for development.")
        sys.exit(1)

    stage = argv[3]
    if stage == 'build_script' or stage == 'step_script':
        slurm_vars = vh.get_slurm_variables()
        if mode == 'Batch':  # Handle Batch scripts
            batch_script = f'{get_clone_path()}/{script}'
            # Parse parameters from the batch script and run it through srun
            exec_parts = ['srun'] + _read_sbatch_parameters(batch_script) + [batch_script]
            print('Warning: The contents of the script section in the CI definition '
                  'will be ignored in the batch mode. If you want to work on the results '
                  'please create additional stages and connect them via artifacts.')
        elif mode == 'SingleSlurmJobAcrossStages':
            print(f'END_SINGLE_SLURM_JOB: {os.getenv("CUSTOM_ENV_END_SINGLE_SLURM_JOB")}')
            id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
            os.makedirs(f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/', exist_ok=True)
            node_index_file = f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/{get_node_id_str(id_vars)}.txt'
            job_id = _get_or_allocate_slurm_job(node_index_file, slurm_vars)
            exec_parts = ['srun', f'--jobid={job_id}', '--job-name=CI']
            exec_parts += _slurm_option_args(slurm_vars)
            exec_parts += [script_path, 'step_script']
            custom_env = get_env_for_single_slurm_job(id_vars)
            custom_env['SLURM_JOB_ID'] = job_id
        else:
            # Define Slurm parameters
            exec_parts = ['srun', '--job-name=CI'] + _slurm_option_args(slurm_vars)
            # Handle Slurm shell and singularity shell environment
            if mode == "Slurm":
                exec_parts += [script_path, 'step_script']
            elif mode == "Singularity":
                if os.path.exists(container):
                    # NOTE(review): the container path is rebuilt from `script`
                    # and then prefixed with the clone path a second time; this
                    # is kept as found - confirm that it is intended.
                    container = f'{get_clone_path()}/{script}'
                    exec_parts += [f'{runner_path}/singularityLocalRunstep.sh',
                                   f'{get_clone_path()}/{container}', script_hash]
                else:
                    exec_parts += [f'{runner_path}/singularityRunstep.sh', container, script_hash]
        exec_command = ' '.join(exec_parts)
    else:  # run small scripts on local machine
        exec_command = f'{script_path} {stage}'

    # check for downscoping
    if down_scoping:
        # The whole command is handed as one string to the shell of the mapped account.
        command = f"sudo su --shell /bin/bash --login {account} -c".split()
        command.append(exec_command)
    else:
        command = exec_command.split()
    return command, custom_env


def handle_run():
    """Execute the script of the current run sub-stage and exit with its status.

    ``argv[2]`` is the script to run and ``argv[3]`` the name of the sub-stage.
    The script is copied to ``<runner_path>/scripts/script<random>`` and made
    executable. For ``build_script`` and ``step_script`` the command is built
    according to the CI mode reported by ``vh.get_CI_mode()``; all other
    sub-stages run the copied script directly.

    The copied script is removed again in every case, also when building or
    running the command fails. The process exits with status 0 when the
    command returned 0 and with status 1 otherwise.
    """
    # Setup CI scripts
    script_hash = get_random_string(8)
    script_path = f'{runner_path}/scripts/script{script_hash}'
    os.system(f'cp {argv[2]} {script_path}')
    os.system(f'chmod +x {script_path}')
    os.system(f'dos2unix {script_path}')

    try:
        command, custom_env = _build_run_command(script_path, script_hash)
        # Run command
        print(command)
        cmd_ret = subprocess.run(command, env={**os.environ, **custom_env})
        return_code = cmd_ret.returncode
    finally:
        # Also reached on sys.exit() and on exceptions, so no script copies pile up.
        try:
            os.remove(script_path)
        except FileNotFoundError:
            pass

    sys.exit(0 if int(return_code) == 0 else 1)
def handle_cleanup():
    """Release the shared Slurm job of a pipeline during the cleanup stage.

    Nothing happens unless ``CUSTOM_ENV_END_SINGLE_SLURM_JOB`` is ``'1'``.
    Otherwise the job id stored in the pipeline's job id file is cancelled
    with ``scancel``, the file is deleted and the pipeline's job id directory
    is removed if it is empty. Missing files or directories are ignored so
    that cleanup never fails because there is nothing left to clean.
    """
    if os.getenv('CUSTOM_ENV_END_SINGLE_SLURM_JOB') != '1':
        return

    id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
    node_index_file = f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}/{get_node_id_str(id_vars)}.txt'

    try:
        job_id = get_jobid_from_file(node_index_file)
    except FileNotFoundError:
        # No job id was recorded (e.g. the allocation never happened):
        # there is nothing to cancel.
        job_id = None
    if job_id:
        subprocess.run(['scancel', f'{job_id}'])

    try:
        os.remove(node_index_file)
    except FileNotFoundError:
        pass

    # Cleanup the directory with the jobIds of the whole pipeline
    try:
        os.rmdir(f'{runner_path}/jobIds/{os.getenv("CUSTOM_ENV_CI_PIPELINE_ID")}')
    except OSError:
        # Covers a missing directory as well as one that still holds the job
        # id files of other jobs of the pipeline.
        pass
# Script entry point: read the configuration and run the stage given in argv[1].
handle()