from abc import ABC, abstractmethod
import os
import subprocess
import re
import importlib
import sys
from filelock import FileLock
import json
import core.authentication.JSONManager as man
import logging
from core.utility import get_cenv
import core.utility as utility
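# Execution modes selectable via the CI_MODE variable (see get_mode() below):
# VALID_MODES work with and without down-scoping, XLOCAL_MODES only on local
# (non-down-scoped) runners, DOWN_MODES only on down-scoped runners.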
VALID_MODES = ['Default', 'Slurm', 'Singularity', 'Batch', 'SingleSlurmJobAcrossStages', 'Sbatch']
DOWN_MODES = ['Singularity_Batch']
XLOCAL_MODES = ['SSH']
ALL_MODES = VALID_MODES + XLOCAL_MODES + DOWN_MODES
srun_path = "srun" # "/usr/local_host/bin/srun"
sbatch_path = "sbatch" # "/usr/local_host/bin/sbatch"
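# Abstract base class of all execution modes. Concrete modes provide the run
# script, the run properties (command prefix) and the combiner script used to
# execute a CI job, plus optional pre-/post-run hooks and cleanup.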
class ModeBase(ABC):
_run_script = ''
_run_properties = ''
_custom_env = dict()
slurm_jobid_file = None
_combiner_script = ''
run_properties_postfix = None
@staticmethod
def get_jobid_from_file(path):
with open(path, 'r') as node_index_fp:
return node_index_fp.readline().strip()
@property
def run_script(self):
return self._run_script
@property
def combiner_script(self):
return self._combiner_script
@property
def run_properties(self):
return self._run_properties
@abstractmethod
def get_combiner_script(self):
pass
@property
def custom_env(self):
return self._custom_env
def __init__(self, job):
self.job = job
@abstractmethod
def get_run_properties(self):
pass
@abstractmethod
def get_run_script(self):
pass
def has_post_run_script(self):
return False
def get_post_run_script(self):
return None
def custom_run_setup(self, **kwargs):
pass
def get_post_run_properties(self):
return None
def custom_prepare(self):
pass
def get_simple_script_exec(self):
return None
def inbetween_processing(self, main_proc=None):
pass
@staticmethod
def get_env_setup():
return ''
def get_run_properties_postfix(self):
pass
def cleanup_on_failure(self):
if not self.slurm_jobid_file or not os.path.isfile(self.slurm_jobid_file):
logging.debug(f'Skipping cleanup_on_failure due to missing slurm jobid file')
return
command = []
if self.job.down_scoping:
command = f"sudo -u {self.job.account} ".split()
logging.debug(f'cleanup_on_failure command: {command}')
scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel '
f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", text=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
logging.debug(f'cleanup_on_failure result: {scancel_out}')
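    # Common cleanup: scan the error-code JSON files of all concurrent jobs of this
    # pipeline stage; an exit code of -1 skips the cleanup, an exit code of 1 cancels
    # the pipeline's CI_<pipeline_id> Slurm jobs and removes the shared tmp directory.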
@abstractmethod
def cleanup(self):
if self.job.allow_failure:
return
cc_tmp = os.path.split(self.job.concurrent_tmp)[0]
do_cleanup = 0
for cc_id in os.listdir(cc_tmp):
pipeline_path = f'{cc_tmp}/{cc_id}/{self.job.pipeline_id}/stages/{get_cenv("CI_JOB_STAGE")}'
if not os.path.isdir(pipeline_path):
continue
for error_file in os.listdir(pipeline_path):
if not error_file.endswith('.json'):
continue
lock = FileLock(f'{pipeline_path}/{error_file}.lock')
with lock:
with open(f'{pipeline_path}/{error_file}', 'r+') as error_file_fd:
error_file_fd.seek(0)
error_codes = json.load(error_file_fd)
for jobid in error_codes:
if error_codes[jobid] == -1:
# do_cleanup = -1
return
elif error_codes[jobid] == 1:
do_cleanup = 1
if do_cleanup == 1 and self.job.shared_tmp and os.path.isdir(self.job.shared_tmp):
command = []
if self.job.down_scoping:
command = f"sudo -u {self.job.account} ".split()
command.extend(['/opt/slurm/current/bin/scancel', f'--name=CI_{self.job.pipeline_id}'])
logging.debug(f'doing cleanup shared_tmp={self.job.shared_tmp} command={command}')
scancel_out = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
logging.debug(f'doing cleanup res={scancel_out}')
os.system(f'rm -r {self.job.shared_tmp}')
man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
try:
os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}')
except (FileNotFoundError, OSError):
pass
def abort(self, error_str, exit_code=1):
if self.job.error_code_file:
utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code)
ModeBase.cleanup(self)
logging.debug(f'Aborting with error: {error_str}')
exit(exit_code)
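# Slurm mode: CI steps are executed through srun; an internal allocation
# (salloc --no-shell, job name CI_<pipeline_id>) is created on demand and recorded in
# SlurmIDMapping.json so that helper commands can run via `srun --jobid=<id>`.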
class Slurm(ModeBase):
slurm_vars = []
considered_env_prefixes = ['SLURM', 'SRUN', 'SALLOC']
def get_run_properties_postfix(self):
return ""
def set_internal_slurm_job(self):
if self.slurm_simple_job_id:
return
try:
with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file:
mapping = json.loads(file.read())
self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")]
if self.job.down_scoping:
if not subprocess.run([f'{self.job.driver_path}/core/scripts/runHelper.sh',
f'sudo -u {self.job.account} srun --jobid={self.slurm_simple_job_id} /usr/bin/zsh -l -c echo']).returncode:
return
else:
man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
except (IOError, KeyError):
self.slurm_simple_job_id = None
logging.warning(f'Could not read internal Slurm jobid from mapping file')
salloc_command = ['/opt/slurm/current/bin/salloc', '--cpus-per-task=1', '--ntasks=1', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}']
salloc_out = self.job.execute(' '.join(salloc_command), '', f"{self.job.driver_path}/core/scripts/runHelper.sh",
text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
logging.debug(f'custom_prepare salloc_command={salloc_command}')
logging.debug(f'custom_prepare salloc_out={salloc_out}')
try:
self.slurm_simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
logging.info(f'Using internal Slurm jobid {self.slurm_simple_job_id}')
man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
self.slurm_simple_job_id)
        except AttributeError:
            self.abort('Could not allocate a Slurm job for internal usage')
def custom_prepare(self):
self.set_internal_slurm_job()
# install gitlab runner if necessary
self.job.execute(f'/usr/bin/mkdir -p {self.job.user_path}',
"",
f"{self.job.driver_path}/core/scripts/runHelper.sh",
skip_env=True, srun_wrap=True)
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.user_path}/wrapper{self.job.jobid}.sh',
f"{self.job.driver_path}/core/scripts/zshWrapper.sh",
f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
git_runner_command = [f'{self.job.shell_path} -l',
f"{self.job.user_path}/wrapper{self.job.jobid}.sh"]
self.job.execute(' '.join(git_runner_command),
f'{self.job.driver_path}/core/scripts/runnerInstallHelper.sh',
f"{self.job.driver_path}/core/scripts/pipeHelper.sh", srun_wrap=True, install_env=True)
def get_simple_script_exec(self):
return f"{srun_path} --jobid={self.slurm_simple_job_id} {self.job.shell_path} -l " \
f"{self.job.user_path}/wrapper{self.job.jobid}.sh"
@staticmethod
def env_copy():
user = os.getenv("USER")
        env = {k: v for k, v in os.environ.items() if user not in v or k == "PATH"}
export_env = ""
for k, v in env.items():
export_env = f'{export_env}{k}="{v}",'
return export_env[:-1]
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def set_slurm_env(self):
self._run_properties = self.job.get_parameters()
def set_srun_cmd(self):
prop_list = [f'{srun_path}', f'--job-name=CI_{self.job.jobid}']
prop_list.extend(self.slurm_vars)
self._run_properties = ' '.join(prop_list)
def __init__(self, job):
ModeBase.__init__(self, job)
self.slurm_simple_job_id = None
self.set_internal_slurm_job()
def get_run_properties(self):
self.set_srun_cmd()
self.set_slurm_env()
return f'{srun_path} ' + self._run_properties + f' {self.job.shell_path} -l'
def get_run_script(self):
self._run_script = self.job.exec_script
return self._run_script
def cleanup(self):
self.set_internal_slurm_job()
self.job.execute(f'/usr/bin/rm '
f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh",
srun_wrap=True)
ModeBase.cleanup(self)
self.job.execute(f'scancel {self.slurm_simple_job_id}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh")
class Sbatch(Slurm):
def __init__(self, job):
Slurm.__init__(self, job)
self.does_inbetween_processing = True
self.slurm_output_dir = f'{self.job.clone_path}/slurm_output'
self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt'
def get_batch_properties(self, batch_script):
# cmd_out = ""
if self.job.down_scoping:
stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE,
text=True).stdout
cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
else:
with open(batch_script, 'r') as f:
cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')])
return cmd_out
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def get_run_properties(self):
batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
self._run_properties = f'{sbatch_path} --wait {self.get_batch_properties(batch_script)} ' \
f'{self.job.get_parameters()} --output={self.slurm_output_file}'
return self._run_properties
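    # Create the Slurm output directory and file in user space (readable for the
    # runner) before the sbatch submission so that the output can be tailed.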
def custom_run_setup(self, **kwargs):
if kwargs["script_execution"] and kwargs["do_inbetween_processing"]:
logging.debug('Creating so file')
if self.job.down_scoping:
self.job.execute(f'/usr/bin/mkdir -p '
f'{self.slurm_output_dir}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
self.job.execute(f'/usr/bin/touch '
f'{self.slurm_output_file}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
self.job.execute(f'/usr/bin/chmod '
f'o+r {self.slurm_output_file}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.build_path}/chmodPath{self.job.jobid}.sh',
f"{self.job.driver_path}/core/scripts/chmodPath.sh",
f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
self.job.execute(f'/usr/local_rwth/bin/zsh {self.job.build_path}/chmodPath{self.job.jobid}.sh',
f'{self.slurm_output_dir}',
f"{self.job.driver_path}/core/scripts/execHelper.sh", srun_wrap=True)
else:
os.makedirs(self.slurm_output_dir, exist_ok=True)
os.system(f'touch {self.slurm_output_file}')
def inbetween_processing(self, main_proc):
logging.debug(f'Starting inbetween processing')
return subprocess.Popen(f'tail -F {self.slurm_output_file}'.split())
def has_post_run_script(self):
return True
def get_post_run_script(self):
return Slurm.get_run_script(self)
def get_post_run_properties(self):
return Slurm.get_run_properties(self)
def get_run_script(self):
tmp = os.getenv("TMP")
with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
filedata = file.read()
filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}')
with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
file.write(filedata)
return f'{tmp}/wrapper{self.job.jobid}'
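# Default mode: wraps the CI script section itself in a batch wrapper and submits it
# via `sbatch --wait`; no separate post-run script is executed.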
class Default(Sbatch):
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def get_run_properties(self):
parameter_string = self.job.get_parameters()
self._run_properties = f'{sbatch_path} --wait --output={self.slurm_output_file} {parameter_string}'
return self._run_properties
def get_run_script(self):
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.clone_path}/script.',
f"{Slurm.get_run_script(self)}",
f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
tmp = os.getenv("TMP")
with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
filedata = file.read()
filedata = filedata.replace('replaceme', f'{self.job.clone_path}/script.')
with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
file.write(filedata)
return f'{tmp}/wrapper{self.job.jobid}'
def has_post_run_script(self):
return False
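# Batch mode: runs BATCH_SCRIPT directly through srun, reusing the #SBATCH parameters
# parsed from the script; the CI script section runs afterwards as a post-processing
# step (see the warning printed in get_run_properties).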
class Batch(Slurm):
def __init__(self, job):
        Slurm.__init__(self, job)
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/execHelper.sh"
return self._combiner_script
def get_batch_properties(self, batch_script):
# cmd_out = ""
if self.job.down_scoping:
stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE,
text=True).stdout
cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
else:
with open(batch_script, 'r') as f:
cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')])
return cmd_out
def get_run_properties(self):
batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
self._run_properties = ' '.join([f'{srun_path}', self.get_batch_properties(batch_script)])
        print('Warning: The contents of the script section in the CI definition '
              'will be used as a post-processing script in batch mode.')
return self._run_properties + f' {self.job.shell_path}'
def get_run_properties_postfix(self):
return ""
def get_run_script(self):
self._run_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
return self._run_script
def has_post_run_script(self):
return True
def get_post_run_script(self):
return Slurm.get_run_script(self)
def get_post_run_properties(self):
return Slurm.get_run_properties(self)
def cleanup(self):
self.job.execute(f'/usr/bin/rm '
f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
ModeBase.cleanup(self)
self.job.execute(f'scancel {self.slurm_simple_job_id}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh")
class Singularity_Batch(Default):
container = ''
@staticmethod
def escape(s):
        return s.replace('/', '\\/')
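    # In addition to the Sbatch setup, write a NUL-separated environment file
    # (CONTAINER, EXEC_WRAPPER, PARAMS, OUTPUT_FILE) and copy it to the user's
    # slurm_output_dir; get_run_properties() passes it to sbatch via --export-file.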
def custom_run_setup(self, **kwargs):
Sbatch.custom_run_setup(self, **kwargs)
if kwargs["script_execution"] and kwargs["do_inbetween_processing"]:
logging.debug('Creating param file')
# write env_file
with open(f'{self.job.tmp_dir}/env{self.job.jobid}', 'w') as file:
file.write(f"CONTAINER={self.container}\0EXEC_WRAPPER={srun_path}\0PARAMS={self.job.get_parameters()}\0OUTPUT_FILE=--output={self.slurm_output_file}")
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.slurm_output_dir}/batchEnv{self.job.jobid}',
f"{self.job.tmp_dir}/env{self.job.jobid}",
f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def get_run_script(self):
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.clone_path}/script{self.job.jobid}.sh',
f'{Slurm.get_run_script(self)}', f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
# move wrapper to user
tmp = os.getenv("TMP")
with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
filedata = file.read()
filedata = filedata.replace('replaceme', f'{self.job.clone_path}/singularity{self.job.jobid}.sh < {self.job.clone_path}/script{self.job.jobid}.sh')
with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
file.write(filedata)
return f'{tmp}/wrapper{self.job.jobid}'
def get_run_properties(self):
self._run_properties = Default.get_run_properties(self)
# get container for singularity
self.container = get_cenv('CONTAINER')
if self.container is None:
ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
if os.path.exists(self.container):
self.container = f'{self.job.clone_path}/{self.container}'
# Generation of the singularity script within user space
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.clone_path}/singularity{self.job.jobid}.sh',
self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
# add singularity specific parameter to the properties
property_split = self._run_properties.split(" ")
property_split.insert(1, f'--export-file={self.slurm_output_dir}/batchEnv{self.job.jobid}')
self._run_properties = " ".join(property_split)
return self._run_properties
def cleanup(self):
ModeBase.cleanup(self)
self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh")
def get_run_properties_postfix(self):
self.run_properties_postfix = ""
# get container for singularity
self.container = get_cenv('CONTAINER')
if self.container is None:
ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
if os.path.exists(self.container):
self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh '
else:
self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh '
return self.run_properties_postfix
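# Runs the CI script through srun inside the Singularity container given by the
# CONTAINER variable; if CONTAINER is an existing path it is treated as an image file
# in the clone (singularityLocalRunstep.sh), otherwise singularityRunstep.sh is used.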
class Singularity(Slurm):
container = ''
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def __init__(self, job):
Slurm.__init__(self, job)
def get_run_script(self):
self._run_script = Slurm.get_run_script(self)
return self._run_script
def get_run_properties(self):
self._run_properties = Slurm.get_run_properties(self)
# get container for singularity
self.container = get_cenv('CONTAINER')
if self.container is None:
ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
if os.path.exists(self.container):
self.container = f'{self.job.clone_path}/{self.container}'
# Generation of the singularity script within user space
self.job.execute(f'/usr/bin/cp /dev/stdin '
f'{self.job.user_path}/script{self.job.jobid}.sh',
self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
skip_env=True, srun_wrap=True)
# add singularity specific parameter to the properties
property_split = self._run_properties.split(" ")
property_split.insert(1, f'--export=CONTAINER={self.container}')
property_split.append(f"{self.job.user_path}/script{self.job.jobid}.sh")
self._run_properties = " ".join(property_split)
return self._run_properties
def cleanup(self):
self.job.execute(f'/usr/bin/rm '
f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
ModeBase.cleanup(self)
self.job.execute(f'/usr/bin/rm '
f'{self.job.user_path}/script{self.job.jobid}.sh', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh")
def get_run_properties_postfix(self):
self.run_properties_postfix = ""
# get container for singularity
self.container = get_cenv('CONTAINER')
if self.container is None:
ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
if os.path.exists(self.container):
self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh '
else:
self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh '
return self.run_properties_postfix
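# Keeps a single `salloc --no-shell` allocation alive across all stages of a pipeline
# (one per combination of CUSTOM_ENV_PARVAR_* values), storing its jobid under
# shared_tmp and cancelling it once END_SINGLE_SLURM_JOB is set to '1'.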
class SingleSlurmJobAcrossStages(Slurm):
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
return self._combiner_script
def get_run_properties_postfix(self):
return ""
def get_jobid_from_file(self, path):
with open(path, 'r') as node_index_fp:
return node_index_fp.readline().strip()
@staticmethod
def get_node_id_str(variables):
        if variables:
return ''.join([os.getenv(v).replace('/', '') for v in variables])
return 'Sequential'
@staticmethod
def get_env_for_single_slurm_job(variables):
return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables}
def __init__(self, job):
Slurm.__init__(self, job)
self.slurm_job_id = None
self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True)
os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True)
self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
self.tmp_dir = None
def get_run_properties(self):
if not os.path.isfile(self.slurm_jobid_file):
salloc_command = ['/opt/slurm/current/bin/salloc', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] + \
self.job.get_parameters().split(" ")
salloc_command.extend(self.slurm_vars)
salloc_out = self.job.execute(' '.join(salloc_command), '',
f"{self.job.driver_path}/core/scripts/runHelper.sh",
text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
logging.debug(f'run_properties salloc_command={salloc_command}')
logging.debug(f'run_properties salloc_out={salloc_out}')
self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp:
slurm_jobid_fp.write(self.slurm_job_id + '\n')
with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp:
slurm_jobid_fp.write(self.slurm_job_id + '\n')
logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}')
else:
self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}')
tmp_dir_srun_out = self.job.execute(f'{srun_path} --jobid {self.slurm_job_id} /usr/bin/printenv TMP', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=False,
text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.split('\n')
        logging.debug(f'srun tmp_dir output (unfiltered): {tmp_dir_srun_out}')
ignores = ['error', 'slurmstepd']
self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)]
logging.debug(f'srun tmp_dir output: {self.tmp_dir}')
self.tmp_dir = self.tmp_dir[0]
self._run_properties = Slurm.get_run_properties(self).split()
additional_env = []
for k, v in self.get_env_for_single_slurm_job(self.id_vars).items():
additional_env.append(f"{k}={v}")
        # only add --export when PARVAR-derived variables are present
        if additional_env:
self._run_properties.insert(1, f'--export=' + ",".join(additional_env))
self._run_properties.insert(1, f'--jobid={self.slurm_job_id}')
self._run_properties = ' '.join(self._run_properties)
return self._run_properties
def get_run_script(self):
self._run_script = Slurm.get_run_script(self)
return self._run_script
def get_simple_script_exec(self):
if self.job.args[1] == 'run':
return self.get_run_properties()
return Slurm.get_simple_script_exec(self)
def cleanup(self):
if get_cenv('END_SINGLE_SLURM_JOB') == '1':
scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel '
f'{self.get_jobid_from_file(self.slurm_jobid_file)}', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh",
text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
logging.debug(f'cleanup res={scancel_out}')
try:
os.remove(self.slurm_jobid_file)
except FileNotFoundError:
pass
# Cleanup the directory with the jobIds of the whole pipeline
try:
os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds')
except (FileNotFoundError, OSError):
pass
try:
os.rmdir(f'{self.job.shared_tmp}')
except (FileNotFoundError, OSError):
pass
self.job.execute(f'/usr/bin/rm '
f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
ModeBase.cleanup(self)
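# Executes the job on the host given by CI_SSH_HOST via `ssh -T`; only available for
# local (non-down-scoped) runners.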
class SSH(ModeBase):
def __init__(self, job):
ModeBase.__init__(self, job)
self.dest_node = get_cenv('CI_SSH_HOST')
if not self.dest_node:
ModeBase.abort(self, "Using ssh mode but no node specified. Specify: CI_SSH_HOST")
def get_env_setup(self):
return f' {self.job.driver_path}/core/scripts/ssh.env '
def get_run_properties(self):
return f'ssh -T {self.dest_node}'
# TODO: Move this to ModeBase (is same for Slurm, except Batch)
def get_run_script(self):
self._run_script = f'{self.job.exec_script}'
return self._run_script
def get_combiner_script(self):
self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh"
return self._combiner_script
def get_simple_script_exec(self):
return f"ssh -T {self.dest_node}"
def cleanup(self):
ModeBase.cleanup(self)
def get_custom_prepare(self):
pass
@staticmethod
def has_post_run_script():
return False
# Instantiate the execution mode selected via the CI_MODE variable (default: 'Default')
def get_mode(job):
mode = get_cenv('CI_MODE', 'Default').strip()
if mode not in ALL_MODES:
print(f'Error: tried to use unknown ci mode {mode}, valid modes are {ALL_MODES}')
logging.error(f'Error: tried to use unknown ci mode {mode}')
exit(1)
if job.down_scoping and mode in XLOCAL_MODES:
print(f'Error: tried to use local only ci mode {mode}, valid modes for down_scoping are {VALID_MODES + DOWN_MODES}')
logging.error(f'Error: tried to use local only ci mode {mode}')
exit(1)
if not job.down_scoping and mode in DOWN_MODES:
print(f'Error: tried to use downscope only ci mode {mode}, valid modes for local are {VALID_MODES + XLOCAL_MODES}')
logging.error(f'Error: tried to use downscope only ci mode {mode}')
exit(1)
return getattr(importlib.import_module('core.modes'), mode)(job)