__init__.py
    from abc import ABC, abstractmethod
    import os
    import subprocess
    import re
    import importlib
    import sys
    from filelock import FileLock
    import json
    import core.authentication.JSONManager as man
    import logging
    
    from core.utility import get_cenv
    import core.utility as utility
    
    VALID_MODES = ['Default', 'Slurm', 'Singularity', 'Batch', 'SingleSlurmJobAcrossStages', 'Sbatch']
    DOWN_MODES = ['Singularity_Batch']
    XLOCAL_MODES = ['SSH']
    ALL_MODES = VALID_MODES + XLOCAL_MODES + DOWN_MODES
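    # Mode availability (enforced in get_mode below): VALID_MODES work with and
    # without down-scoping, XLOCAL_MODES only without down-scoping, and
    # DOWN_MODES only with it.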
    
    srun_path = "srun"  # "/usr/local_host/bin/srun"
    sbatch_path = "sbatch"  # "/usr/local_host/bin/sbatch"
    
    
    class ModeBase(ABC):
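        """Abstract base class for all CI execution modes.

        A mode wraps the job object and provides the run script, the run
        properties (the command prefix used to launch it), the combiner
        script, and the cleanup logic shared by the concrete modes below.
        """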
        _run_script = ''
        _run_properties = ''
        _custom_env = dict()
        slurm_jobid_file = None
        _combiner_script = ''
        run_properties_postfix = None
    
        @staticmethod
        def get_jobid_from_file(path):
            with open(path, 'r') as node_index_fp:
                return node_index_fp.readline().strip()
    
        @property
        def run_script(self):
            return self._run_script
    
        @property
        def combiner_script(self):
            return self._combiner_script
    
        @property
        def run_properties(self):
            return self._run_properties
    
        @abstractmethod
        def get_combiner_script(self):
            pass
    
        @property
        def custom_env(self):
            return self._custom_env
    
        def __init__(self, job):
            self.job = job
    
        @abstractmethod
        def get_run_properties(self):
            pass
    
        @abstractmethod
        def get_run_script(self):
            pass
    
        def has_post_run_script(self):
            return False
    
        def get_post_run_script(self):
            return None
    
        def custom_run_setup(self, **kwargs):
            pass
    
        def get_post_run_properties(self):
            return None
    
        def custom_prepare(self):
            pass
    
        def get_simple_script_exec(self):
            return None
    
        def inbetween_processing(self, main_proc=None):
            pass
    
        @staticmethod
        def get_env_setup():
            return ''
    
        def get_run_properties_postfix(self):
            pass
    
        def cleanup_on_failure(self):
            if not self.slurm_jobid_file or not os.path.isfile(self.slurm_jobid_file):
                logging.debug(f'Skipping cleanup_on_failure due to missing slurm jobid file')
                return
            command = []
            if self.job.down_scoping:
                command = f"sudo -u {self.job.account} ".split()
            logging.debug(f'cleanup_on_failure command: {command}')
            scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel '
                                           f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}', '',
                                           f"{self.job.driver_path}/core/scripts/runHelper.sh", text=True,
                                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            logging.debug(f'cleanup_on_failure result: {scancel_out}')
    
        @abstractmethod
        def cleanup(self):
            if self.job.allow_failure:
                return
            cc_tmp = os.path.split(self.job.concurrent_tmp)[0]
            do_cleanup = 0
            for cc_id in os.listdir(cc_tmp):
                pipeline_path = f'{cc_tmp}/{cc_id}/{self.job.pipeline_id}/stages/{get_cenv("CI_JOB_STAGE")}'
                if not os.path.isdir(pipeline_path):
                    continue
                for error_file in os.listdir(pipeline_path):
                    if not error_file.endswith('.json'):
                        continue
    
                    lock = FileLock(f'{pipeline_path}/{error_file}.lock')
                    with lock:
                        with open(f'{pipeline_path}/{error_file}', 'r+') as error_file_fd:
                            error_file_fd.seek(0)
                            error_codes = json.load(error_file_fd)
                            for jobid in error_codes:
                                if error_codes[jobid] == -1:
                                    # do_cleanup = -1
                                    return
                                elif error_codes[jobid] == 1:
                                    do_cleanup = 1
    
            if do_cleanup == 1 and self.job.shared_tmp and os.path.isdir(self.job.shared_tmp):
                command = []
                if self.job.down_scoping:
                    command = f"sudo -u {self.job.account} ".split()
                command.extend(['/opt/slurm/current/bin/scancel', f'--name=CI_{self.job.pipeline_id}'])
                logging.debug(f'doing cleanup shared_tmp={self.job.shared_tmp} command={command}')
                scancel_out = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
                logging.debug(f'doing cleanup res={scancel_out}')
                os.system(f'rm -r {self.job.shared_tmp}')
            man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
            try:
                os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}')
            except (FileNotFoundError, OSError):
                pass
    
        def abort(self, error_str, exit_code=1):
            if self.job.error_code_file:
                utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code)
            ModeBase.cleanup(self)
            logging.debug(f'Aborting with error: {error_str}')
            exit(exit_code)
    
    
    class Slurm(ModeBase):
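        """Execute the CI script section through srun.

        Keeps a small --no-shell helper allocation per pipeline (tracked in
        SlurmIDMapping.json) for simple script execution and installs a zsh
        wrapper into the user's space during preparation.
        """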
        slurm_vars = []
        considered_env_prefixes = ['SLURM', 'SRUN', 'SALLOC']
    
        def get_run_properties_postfix(self):
            return ""
    
        def set_internal_slurm_job(self):
            if self.slurm_simple_job_id:
                return
    
            try:
                with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file:
                    mapping = json.loads(file.read())
                    self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")]
                    if self.job.down_scoping:
                        if not subprocess.run([f'{self.job.driver_path}/core/scripts/runHelper.sh',
                                                f'sudo -u {self.job.account} srun --jobid={self.slurm_simple_job_id} /usr/bin/zsh -l -c echo']).returncode:
                            return
                        else:
                            man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
            except (IOError, KeyError):
                self.slurm_simple_job_id = None
                logging.warning(f'Could not read internal Slurm jobid from mapping file')
    
            salloc_command = ['/opt/slurm/current/bin/salloc', '--cpus-per-task=1', '--ntasks=1', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}']
            salloc_out = self.job.execute(' '.join(salloc_command), '', f"{self.job.driver_path}/core/scripts/runHelper.sh",
                                          text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
            logging.debug(f'custom_prepare salloc_command={salloc_command}')
            logging.debug(f'custom_prepare salloc_out={salloc_out}')
            try:
                self.slurm_simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
                logging.info(f'Using internal Slurm jobid {self.slurm_simple_job_id}')
                man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
                                   self.slurm_simple_job_id)
            except AttributeError:
                self.abort(f'Could not allocate a Slurm job for internal usage')
            
    
    
        def custom_prepare(self):
            self.set_internal_slurm_job()
            # install gitlab runner if necessary
            self.job.execute(f'/usr/bin/mkdir -p {self.job.user_path}',
                             "",
                             f"{self.job.driver_path}/core/scripts/runHelper.sh",
                             skip_env=True, srun_wrap=True)
            self.job.execute(f'/usr/bin/cp /dev/stdin '
                             f'{self.job.user_path}/wrapper{self.job.jobid}.sh',
                             f"{self.job.driver_path}/core/scripts/zshWrapper.sh",
                             f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                             skip_env=True, srun_wrap=True)
            git_runner_command = [f'{self.job.shell_path} -l',
                                  f"{self.job.user_path}/wrapper{self.job.jobid}.sh"]
            self.job.execute(' '.join(git_runner_command),
                             f'{self.job.driver_path}/core/scripts/runnerInstallHelper.sh',
                             f"{self.job.driver_path}/core/scripts/pipeHelper.sh", srun_wrap=True, install_env=True)
    
        def get_simple_script_exec(self):
            return f"{srun_path} --jobid={self.slurm_simple_job_id} {self.job.shell_path} -l " \
                   f"{self.job.user_path}/wrapper{self.job.jobid}.sh"
    
        @staticmethod
        def env_copy():
            user = os.getenv("USER")
            env = {k: v for k, v in os.environ.items() if user not in v or k == "PATH"}
            export_env = ""
            for k, v in env.items():
                export_env = f'{export_env}{k}="{v}",'
            return export_env[:-1]
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
        def set_slurm_env(self):
            self._run_properties = self.job.get_parameters()
    
        def set_srun_cmd(self):
            prop_list = [f'{srun_path}', f'--job-name=CI_{self.job.jobid}']
            prop_list.extend(self.slurm_vars)
            self._run_properties = ' '.join(prop_list)
    
        def __init__(self, job):
            ModeBase.__init__(self, job)
            self.slurm_simple_job_id = None
            self.set_internal_slurm_job()
    
        def get_run_properties(self):
            self.set_srun_cmd()
            self.set_slurm_env()
            return f'{srun_path} ' + self._run_properties + f' {self.job.shell_path} -l'
    
        def get_run_script(self):
            self._run_script = self.job.exec_script
            return self._run_script
    
        def cleanup(self):
            self.set_internal_slurm_job()
            self.job.execute(f'/usr/bin/rm '
                             f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh",
                             srun_wrap=True)
            ModeBase.cleanup(self)
            self.job.execute(f'scancel {self.slurm_simple_job_id}', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh")
    
    
    class Sbatch(Slurm):
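        """Submit the batch script given by BATCH_SCRIPT via `sbatch --wait`.

        #SBATCH directives from the script are passed on the command line,
        the Slurm output file is streamed with `tail -F` while the job runs,
        and the CI script section is executed afterwards as a post-run step.
        """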
        def __init__(self, job):
            Slurm.__init__(self, job)
            self.does_inbetween_processing = True
            self.slurm_output_dir = f'{self.job.clone_path}/slurm_output'
            self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt'
    
        def get_batch_properties(self, batch_script):
            # cmd_out = ""
            if self.job.down_scoping:
                stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '',
                                          f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE,
                                          text=True).stdout
                cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
            else:
                with open(batch_script, 'r') as f:
                    cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')])
            return cmd_out
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
    
        def get_run_properties(self):
            batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
            self._run_properties = f'{sbatch_path} --wait {self.get_batch_properties(batch_script)} ' \
                                   f'{self.job.get_parameters()} --output={self.slurm_output_file}'
            return self._run_properties
    
        def custom_run_setup(self, **kwargs):
            if kwargs["script_execution"] and kwargs["do_inbetween_processing"]:
                logging.debug('Creating so file')
                if self.job.down_scoping:
                    self.job.execute(f'/usr/bin/mkdir -p '
                                 f'{self.slurm_output_dir}', '',
                                 f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
                    self.job.execute(f'/usr/bin/touch '
                                 f'{self.slurm_output_file}', '',
                                 f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
                    self.job.execute(f'/usr/bin/chmod '
                                 f'o+r {self.slurm_output_file}', '',
                                 f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
                    self.job.execute(f'/usr/bin/cp /dev/stdin '
                                 f'{self.job.build_path}/chmodPath{self.job.jobid}.sh',
                                 f"{self.job.driver_path}/core/scripts/chmodPath.sh",
                                 f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                                 skip_env=True, srun_wrap=True)
                    self.job.execute(f'/usr/local_rwth/bin/zsh {self.job.build_path}/chmodPath{self.job.jobid}.sh',
                                 f'{self.slurm_output_dir}',
                                 f"{self.job.driver_path}/core/scripts/execHelper.sh", srun_wrap=True)
                else:
                    os.makedirs(self.slurm_output_dir, exist_ok=True)
                    os.system(f'touch {self.slurm_output_file}')
    
        def inbetween_processing(self, main_proc):
            logging.debug(f'Starting inbetween processing')
            return subprocess.Popen(f'tail -F {self.slurm_output_file}'.split())
    
        def has_post_run_script(self):
            return True
    
        def get_post_run_script(self):
            return Slurm.get_run_script(self)
    
        def get_post_run_properties(self):
            return Slurm.get_run_properties(self)
    
        def get_run_script(self):
            tmp = os.getenv("TMP")
            with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
                filedata = file.read()
            filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}')
            with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
                file.write(filedata)
            return f'{tmp}/wrapper{self.job.jobid}'
    
    class Default(Sbatch):
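        """Default mode: submit the CI script section itself via `sbatch --wait`.

        The script section is copied into the clone path and wrapped by
        batchWrapper.sh; unlike Sbatch there is no user-provided batch script
        and no post-run step.
        """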
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
        def get_run_properties(self):
            parameter_string = self.job.get_parameters()
            self._run_properties = f'{sbatch_path} --wait --output={self.slurm_output_file} {parameter_string}'
            return self._run_properties
    
        def get_run_script(self):
            self.job.execute(f'/usr/bin/cp /dev/stdin '
                             f'{self.job.clone_path}/script.',
                             f"{Slurm.get_run_script(self)}",
                             f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                             skip_env=True, srun_wrap=True)
            tmp = os.getenv("TMP")
            with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
                filedata = file.read()
            filedata = filedata.replace('replaceme', f'{self.job.clone_path}/script.')
            with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
                file.write(filedata)
            return f'{tmp}/wrapper{self.job.jobid}'
    
        def has_post_run_script(self):
            return False
    
    class Batch(Slurm):
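        """Run the user-provided BATCH_SCRIPT through srun.

        The script's #SBATCH directives are translated into srun options;
        the CI script section runs afterwards as a post-processing step.
        """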
    
        def __init__(self, job):
            Slurm.__init__(self, job)
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/execHelper.sh"
            return self._combiner_script
    
        def get_batch_properties(self, batch_script):
            # cmd_out = ""
            if self.job.down_scoping:
                stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '',
                                          f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE,
                                          text=True).stdout
                cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
            else:
                with open(batch_script, 'r') as f:
                    cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')])
            return cmd_out
    
        def get_run_properties(self):
            batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
            self._run_properties = ' '.join([f'{srun_path}', self.get_batch_properties(batch_script)])
            print('Warning: The contents of the script section in the CI definition '
                  'will be used as a post-processing script in Batch mode.')
    
            return self._run_properties + f' {self.job.shell_path}'
    
        def get_run_properties_postfix(self):
            return ""
    
        def get_run_script(self):
            self._run_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
            return self._run_script
    
        def has_post_run_script(self):
            return True
    
        def get_post_run_script(self):
            return Slurm.get_run_script(self)
    
        def get_post_run_properties(self):
            return Slurm.get_run_properties(self)
    
        def cleanup(self):
            self.job.execute(f'/usr/bin/rm '
                             f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
            ModeBase.cleanup(self)
            self.job.execute(f'scancel {self.slurm_simple_job_id}', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh")
    
    
    class Singularity_Batch(Default):
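        """Sbatch submission (as in Default) executed inside a Singularity container.

        The container is taken from the CONTAINER variable; container name,
        srun wrapper, job parameters and output file are handed to the batch
        job via an --export-file environment file.
        """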
        container = ''
    
        @staticmethod
        def escape(s):
            return s.replace('/', '\\/')
    
        def custom_run_setup(self, **kwargs):
            Sbatch.custom_run_setup(self, **kwargs)
            if kwargs["script_execution"] and kwargs["do_inbetween_processing"]:
                logging.debug('Creating param file')
                # write env_file
                with open(f'{self.job.tmp_dir}/env{self.job.jobid}', 'w') as file:
                    file.write(f"CONTAINER={self.container}\0EXEC_WRAPPER={srun_path}\0PARAMS={self.job.get_parameters()}\0OUTPUT_FILE=--output={self.slurm_output_file}")
    
    
                self.job.execute(f'/usr/bin/cp /dev/stdin '
                                 f'{self.slurm_output_dir}/batchEnv{self.job.jobid}',
                                 f"{self.job.tmp_dir}/env{self.job.jobid}",
                                 f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                                 skip_env=True, srun_wrap=True)
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
        def get_run_script(self):
            self.job.execute(f'/usr/bin/cp /dev/stdin '
                             f'{self.job.clone_path}/script{self.job.jobid}.sh',
                             f'{Slurm.get_run_script(self)}', f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                             skip_env=True, srun_wrap=True)
            # move wrapper to user
            tmp = os.getenv("TMP")
            with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file:
                filedata = file.read()
            filedata = filedata.replace('replaceme', f'{self.job.clone_path}/singularity{self.job.jobid}.sh < {self.job.clone_path}/script{self.job.jobid}.sh')
            with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
                file.write(filedata)
            return f'{tmp}/wrapper{self.job.jobid}'
    
        def get_run_properties(self):
            self._run_properties = Default.get_run_properties(self)
            # get container for singularity
            self.container = get_cenv('CONTAINER')
            if self.container is None:
                ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
            if os.path.exists(self.container):
                self.container = f'{self.job.clone_path}/{self.container}'
            # Generation of the singularity script within user space
            self.job.execute(f'/usr/bin/cp /dev/stdin '
                             f'{self.job.clone_path}/singularity{self.job.jobid}.sh',
                             self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                             skip_env=True, srun_wrap=True)
            # add singularity specific parameter to the properties
            property_split = self._run_properties.split(" ")
            property_split.insert(1, f'--export-file={self.slurm_output_dir}/batchEnv{self.job.jobid}')
            self._run_properties = " ".join(property_split)
            return self._run_properties
    
        def cleanup(self):
            ModeBase.cleanup(self)
            self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh")
    
        def get_run_properties_postfix(self):
            self.run_properties_postfix = ""
            # get container for singularity
            self.container = get_cenv('CONTAINER')
            if self.container is None:
                ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
            if os.path.exists(self.container):
                self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh '
            else:
                self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh '
            return self.run_properties_postfix
    
    
    class Singularity(Slurm):
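        """srun execution (as in Slurm) inside a Singularity container.

        The container is taken from the CONTAINER variable; the run step is
        wrapped by singularityRunstep.sh, or singularityLocalRunstep.sh if
        the container exists as a local path.
        """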
        container = ''
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
        def __init__(self, job):
            Slurm.__init__(self, job)
    
        def get_run_script(self):
            self._run_script = Slurm.get_run_script(self)
            return self._run_script
    
        def get_run_properties(self):
            self._run_properties = Slurm.get_run_properties(self)
            # get container for singularity
            self.container = get_cenv('CONTAINER')
            if self.container is None:
                ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
            if os.path.exists(self.container):
                self.container = f'{self.job.clone_path}/{self.container}'
            # Generation of the singularity script within user space
            self.job.execute(f'/usr/bin/cp /dev/stdin '
                             f'{self.job.user_path}/script{self.job.jobid}.sh',
                             self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/pipeHelper.sh",
                             skip_env=True, srun_wrap=True)
            # add singularity specific parameter to the properties
            property_split = self._run_properties.split(" ")
            property_split.insert(1, f'--export=CONTAINER={self.container}')
            property_split.append(f"{self.job.user_path}/script{self.job.jobid}.sh")
            self._run_properties = " ".join(property_split)
            return self._run_properties
    
        def cleanup(self):
            self.job.execute(f'/usr/bin/rm '
                             f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
            ModeBase.cleanup(self)
            self.job.execute(f'/usr/bin/rm '
                             f'{self.job.user_path}/script{self.job.jobid}.sh', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
            self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh")
    
        def get_run_properties_postfix(self):
            self.run_properties_postfix = ""
            # get container for singularity
            self.container = get_cenv('CONTAINER')
            if self.container is None:
                ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
            if os.path.exists(self.container):
                self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh '
            else:
                self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh '
            return self.run_properties_postfix
    
    
    class SingleSlurmJobAcrossStages(Slurm):
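        """Reuse a single Slurm allocation across all stages of a pipeline.

        The first job allocates with salloc and stores the job id in a shared
        file keyed by the CUSTOM_ENV_PARVAR_* variables; later stages attach
        to that allocation via srun --jobid. The allocation is cancelled once
        END_SINGLE_SLURM_JOB is set to '1'.
        """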
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
            return self._combiner_script
    
        def get_run_properties_postfix(self):
            return ""
    
        def get_jobid_from_file(self, path):
            with open(path, 'r') as node_index_fp:
                return node_index_fp.readline().strip()
    
        @staticmethod
        def get_node_id_str(variables):
            if variables:
                return ''.join([os.getenv(v).replace('/', '') for v in variables])
            return 'Sequential'
    
        @staticmethod
        def get_env_for_single_slurm_job(variables):
            return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables}
    
        def __init__(self, job):
            Slurm.__init__(self, job)
            self.slurm_job_id = None
            self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
            os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True)
            os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True)
            self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
            self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
            self.tmp_dir = None
    
        def get_run_properties(self):
            if not os.path.isfile(self.slurm_jobid_file):
                salloc_command = ['/opt/slurm/current/bin/salloc', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] + \
                                 self.job.get_parameters().split(" ")
                salloc_command.extend(self.slurm_vars)
                salloc_out = self.job.execute(' '.join(salloc_command), '',
                                              f"{self.job.driver_path}/core/scripts/runHelper.sh",
                                              text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
                logging.debug(f'run_properties salloc_command={salloc_command}')
                logging.debug(f'run_properties salloc_out={salloc_out}')
                self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
                with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp:
                    slurm_jobid_fp.write(self.slurm_job_id + '\n')
                with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp:
                    slurm_jobid_fp.write(self.slurm_job_id + '\n')
                logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}')
            else:
                self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
                logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}')
    
            tmp_dir_srun_out = self.job.execute(f'{srun_path} --jobid {self.slurm_job_id} /usr/bin/printenv TMP', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=False,
                             text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.split('\n')
            logging.debug(f'srun tmp_dir output (unfiltered): {tmp_dir_srun_out}')
            ignores = ['error', 'slurmstepd']
            self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)]
            logging.debug(f'srun tmp_dir output: {self.tmp_dir}')
            self.tmp_dir = self.tmp_dir[0]
    
    
            self._run_properties = Slurm.get_run_properties(self).split()
    
            additional_env = []
            for k, v in self.get_env_for_single_slurm_job(self.id_vars).items():
                additional_env.append(f"{k}={v}")
            # only pass --export when PARVAR-derived variables are present
            if additional_env:
                self._run_properties.insert(1, f'--export=' + ",".join(additional_env))
            self._run_properties.insert(1, f'--jobid={self.slurm_job_id}')
            self._run_properties = ' '.join(self._run_properties)
            return self._run_properties
    
        def get_run_script(self):
            self._run_script = Slurm.get_run_script(self)
            return self._run_script
    
        def get_simple_script_exec(self):
            if self.job.args[1] == 'run':
                return self.get_run_properties()
            return Slurm.get_simple_script_exec(self)
    
        def cleanup(self):
            if get_cenv('END_SINGLE_SLURM_JOB') == '1':
                scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel '
                                               f'{self.get_jobid_from_file(self.slurm_jobid_file)}', '',
                                               f"{self.job.driver_path}/core/scripts/runHelper.sh",
                                               text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
                logging.debug(f'cleanup res={scancel_out}')
                try:
                    os.remove(self.slurm_jobid_file)
                except FileNotFoundError:
                    pass
                # Cleanup the directory with the jobIds of the whole pipeline
                try:
                    os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds')
                except (FileNotFoundError, OSError):
                    pass
    
                try:
                    os.rmdir(f'{self.job.shared_tmp}')
                except (FileNotFoundError, OSError):
                    pass
            self.job.execute(f'/usr/bin/rm '
                             f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                             f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True)
            ModeBase.cleanup(self)
    
    
    class SSH(ModeBase):
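        """Execute the CI script on a remote host via `ssh -T`.

        The host is taken from CI_SSH_HOST; this mode is only available when
        the runner is not down-scoping.
        """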
        def __init__(self, job):
            ModeBase.__init__(self, job)
            self.dest_node = get_cenv('CI_SSH_HOST')
            if not self.dest_node:
                ModeBase.abort(self, "Using ssh mode but no node specified. Specify: CI_SSH_HOST")
    
        def get_env_setup(self):
            return f' {self.job.driver_path}/core/scripts/ssh.env '
    
        def get_run_properties(self):
            return f'ssh -T {self.dest_node}'
    
        # TODO: Move this to ModeBase (is same for Slurm, except Batch)
        def get_run_script(self):
            self._run_script = f'{self.job.exec_script}'
            return self._run_script
    
        def get_combiner_script(self):
            self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh"
            return self._combiner_script
    
        def get_simple_script_exec(self):
            return f"ssh -T {self.dest_node}"
    
        def cleanup(self):
            ModeBase.cleanup(self)
    
        def get_custom_prepare(self):
            pass
    
        @staticmethod
        def has_post_run_script():
            return False
    
    
    # Resolve the mode requested via CI_MODE, validate it against the mode lists
    # above, and return an instance of the matching class.
    def get_mode(job):
        mode = get_cenv('CI_MODE', 'Default').strip()
        if mode not in ALL_MODES:
            print(f'Error: tried to use unknown ci mode {mode}, valid modes are {ALL_MODES}')
            logging.error(f'Error: tried to use unknown ci mode {mode}')
            exit(1)
        if job.down_scoping and mode in XLOCAL_MODES:
            print(f'Error: tried to use local only ci mode {mode}, valid modes for down_scoping are {VALID_MODES + DOWN_MODES}')
            logging.error(f'Error: tried to use local only ci mode {mode}')
            exit(1)
        if not job.down_scoping and mode in DOWN_MODES:
            print(f'Error: tried to use downscope only ci mode {mode}, valid modes for local are {VALID_MODES + XLOCAL_MODES}')
            logging.error(f'Error: tried to use downscope only ci mode {mode}')
            exit(1)
        return getattr(importlib.import_module('core.modes'), mode)(job)
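
    # Illustrative usage sketch (not part of this module): a driver that already
    # holds a job object could resolve and drive a mode roughly like this. The
    # `job` object and the call order are assumptions; only get_mode() and the
    # method names above come from this file.
    #
    #     from core.modes import get_mode
    #
    #     mode = get_mode(job)                  # e.g. CI_MODE='Slurm' -> Slurm(job)
    #     mode.custom_prepare()                 # mode-specific setup
    #     launcher = mode.get_run_properties()  # launch prefix, e.g. 'srun ... zsh -l'
    #     script = mode.get_run_script()        # script to hand to the launcher
    #     ...
    #     mode.cleanup()                        # cancel helper jobs, remove temp files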