diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 6a6fc6f503216378ef2cb52dd4127f3a437a5157..f9217ae24da9b9e496c1da42168f26ba97c32c25 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -26,7 +26,7 @@ variables:
 
 .local-template:
   variables:
-    RUNNER_TAG: "custom2"
+    RUNNER_TAG: "ja664344"
 #    CI_LOG_STDOUT: "0"
 
 .run-template:
diff --git a/core/authentication/JSONManager.py b/core/authentication/JSONManager.py
index cadbc7e4e72fec0c1248197bf03cbe6e58669b22..8972e7e993276174bf87c71d66268d9c144ab764 100644
--- a/core/authentication/JSONManager.py
+++ b/core/authentication/JSONManager.py
@@ -23,7 +23,10 @@ def remove_id_mapping(path, CI_id):
     with lock:
         with open(path, "r") as file:
             mapping = json.loads(file.read())
-        del mapping[CI_id]
+        try:
+            del mapping[CI_id]
+        except KeyError:
+            logging.warning(f'No mapping entry found for {CI_id}; nothing to remove')
         new_mapping = json.dumps(mapping)
         with open(path, "w") as file:
             file.write(new_mapping)
diff --git a/core/job.py b/core/job.py
index 8325351e957cbf18be3793786268ebdc6efac83c..f4be259ff0696b02a82643a671328a16cb0d3f86 100644
--- a/core/job.py
+++ b/core/job.py
@@ -155,7 +155,12 @@ class Job:
         self.args = args
         self.driver_path = driver_path
         self.mode_name = get_cenv('CI_MODE', 'Slurm').strip()
+        self._mode = None
+        self.error_code_file = None  # stays None for steps without error tracking; checked in ModeBase.abort
         self.__setup()
+        if not self.allow_failure and self.args[1] == 'run':
+            self.error_code_file = f'{self.stage_tmp_dir}/{self.args[3]}.json'
+            os.makedirs(self.stage_tmp_dir, exist_ok=True)
         logging.info(f'Started step {self.args[1]} with args={self.args}')
         self._mode = modes.get_mode(self)
 
@@ -261,9 +266,10 @@
             command.extend([run_script])
         else:
             command.extend([run_properties, run_script])
-        self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution,
-                                    do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs)
-        logging.info(f'Executing with env: {str(self.custom_env)}')
+        if script_execution:
+            self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution,
+                                        do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs)
+            logging.info(f'Executing with env: {str(self.custom_env)}')
         logging.info(f'Executing command: {str(command)}')
         os.chdir('/tmp')
         main_proc = subprocess.Popen(command,
@@ -292,8 +298,6 @@
 
     def run(self):
         if not self.allow_failure:
-            self.error_code_file = f'{self.stage_tmp_dir}/{self.args[3]}.json'
-            os.makedirs(self.stage_tmp_dir, exist_ok=True)
             utility.update_json_kv(self.error_code_file, self.jobid, -1)
 
         self.exec_script = self.args[2]
diff --git a/core/modes/__init__.py b/core/modes/__init__.py
index 5e1c1603c455745ed37cae4c95b407288a6fffd5..c5d5589b71304bd6f0125006fb2dc5a32dba92b4 100644
--- a/core/modes/__init__.py
+++ b/core/modes/__init__.py
@@ -149,7 +149,8 @@ class ModeBase(ABC):
         pass
 
     def abort(self, error_str, exit_code=1):
-        utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code)
+        if self.job.error_code_file:
+            utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code)
         ModeBase.cleanup(self)
         logging.debug(f'Aborting with error: {error_str}')
         exit(exit_code)
@@ -162,17 +163,36 @@ class Slurm(ModeBase):
     def get_run_properties_postfix(self):
        return ""
 
-    def custom_prepare(self):
-        if not self.slurm_simple_job_id:
-            salloc_command = ['/opt/slurm/current/bin/salloc', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}']
-            salloc_out = self.job.execute(' '.join(salloc_command), '', f"{self.job.driver_path}/core/scripts/runHelper.sh",
-                                          text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
-            logging.debug(f'custom_prepare salloc_command={salloc_command}')
-            logging.debug(f'custom_prepare salloc_out={salloc_out}')
+    def set_internal_slurm_job(self):
+        if self.slurm_simple_job_id:
+            return
+
+        try:
+            with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file:
+                mapping = json.loads(file.read())
+                self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")]
+                return
+        except (IOError, KeyError):
+            self.slurm_simple_job_id = None
+            logging.warning('Could not read internal Slurm job ID from mapping file')
+
+        salloc_command = ['/opt/slurm/current/bin/salloc', '--cpus-per-task=1', '--ntasks=1', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}']
+        salloc_out = self.job.execute(' '.join(salloc_command), '', f"{self.job.driver_path}/core/scripts/runHelper.sh",
+                                      text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
+        logging.debug(f'set_internal_slurm_job salloc_command={salloc_command}')
+        logging.debug(f'set_internal_slurm_job salloc_out={salloc_out}')
+        try:
             self.slurm_simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
-            logging.info(f'Using slurm_job_id={self.slurm_simple_job_id}')
-            man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
-                               self.slurm_simple_job_id)
+            logging.info(f'Using internal Slurm job ID {self.slurm_simple_job_id}')
+            man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
+                               self.slurm_simple_job_id)
+        except AttributeError:  # re.search found no job ID in the salloc output
+            self.abort('Could not allocate a Slurm job for internal use')
+
+
+
+    def custom_prepare(self):
+        self.set_internal_slurm_job()
 
         # install gitlab runner if necessary
         self.job.execute(f'/usr/bin/mkdir -p {self.job.user_path}', "",
@@ -216,12 +236,8 @@ class Slurm(ModeBase):
 
     def __init__(self, job):
         ModeBase.__init__(self, job)
-        try:
-            with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file:
-                mapping = json.loads(file.read())
-                self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")]
-        except (IOError, KeyError):
-            self.slurm_simple_job_id = None
+        self.slurm_simple_job_id = None
+        self.set_internal_slurm_job()
 
     def get_run_properties(self):
         self.set_srun_cmd()
@@ -233,6 +249,7 @@ class Slurm(ModeBase):
         return self._run_script
     def cleanup(self):
+        self.set_internal_slurm_job()  # ensure the internal Slurm job ID is resolved
         self.job.execute(f'/usr/bin/rm '
                          f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                          f"{self.job.driver_path}/core/scripts/runHelper.sh",
                          srun_wrap=True)
@@ -564,8 +581,6 @@ class SingleSlurmJobAcrossStages(Slurm):
         os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True)
         self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
         self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
-        if os.path.isfile(self.slurm_jobid_file):
-            self.slurm_simple_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
         self.tmp_dir = None
 
     def get_run_properties(self):
@@ -603,8 +618,6 @@ class SingleSlurmJobAcrossStages(Slurm):
         additional_env = []
         for k, v in self.get_env_for_single_slurm_job(self.id_vars).items():
             additional_env.append(f"{k}={v}")
-        # self._custom_env = self.get_env_for_single_slurm_job(self.id_vars)
-        # self._custom_env['SLURM_JOB_ID'] = self.slurm_job_id
         if not additional_env.__sizeof__() == 0:
             self._run_properties.insert(1, f'--export=' + ",".join(additional_env))
         self._run_properties.insert(1, f'--jobid={self.slurm_job_id}')
@@ -615,8 +628,12 @@
         self._run_script = Slurm.get_run_script(self)
         return self._run_script
 
+    def get_simple_script_exec(self):
+        if self.job.args[1] == 'run':
+            return self.get_run_properties()
+        return Slurm.get_simple_script_exec(self)
+
     def cleanup(self):
-        ModeBase.cleanup(self)
         if get_cenv('END_SINGLE_SLURM_JOB') == '1':
             scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel '
                                            f'{self.get_jobid_from_file(self.slurm_jobid_file)}', '',
@@ -637,10 +654,10 @@ class SingleSlurmJobAcrossStages(Slurm):
                 os.rmdir(f'{self.job.shared_tmp}')
             except (FileNotFoundError, OSError):
                 pass
-        #self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh")
         self.job.execute(f'/usr/bin/rm '
                          f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '',
                          f"{self.job.driver_path}/core/scripts/runHelper.sh",
                          srun_wrap=True)
+        ModeBase.cleanup(self)
 
 class SSH(ModeBase):