From af56a23bf0523196be8ee9d09ddec2f0a68db9d3 Mon Sep 17 00:00:00 2001 From: Adrian Schmitz <adrian.schmitz@rwth-aachen.de> Date: Fri, 19 May 2023 12:45:53 +0200 Subject: [PATCH] Resolve "fix timing issue for async jobs" --- core/job.py | 51 +-------------------------------- core/modes/base.py | 2 +- core/modes/slurm/batch.py | 4 +-- core/modes/slurm/singularity.py | 4 +-- core/modes/slurm/srun.py | 2 +- core/utility/defines.py | 2 +- core/utility/executor.py | 5 +++- 7 files changed, 12 insertions(+), 58 deletions(-) diff --git a/core/job.py b/core/job.py index f7133ad..8363812 100644 --- a/core/job.py +++ b/core/job.py @@ -44,7 +44,7 @@ class Job: @property def build_path(self): - return f'{self.user_path}/builds/{get_cenv("CI_CONCURRENT_PROJECT_ID")}/{get_cenv("CI_PROJECT_PATH_SLUG")}' + return f'{self.user_path}/cache/{get_cenv("CI_CONCURRENT_PROJECT_ID")}/{get_cenv("CI_PROJECT_PATH_SLUG")}' @property def cache_path(self): @@ -206,55 +206,6 @@ class Job: parameter_string += f'{v} ' return parameter_string - def execute(self, run_properties, run_script, helper_script, install_env=False, skip_env=False, - script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs): - if self.down_scoping: - wrapper = '' - if srun_wrap: - wrapper = modes.get_srun_cmd() - wrapper += (f' --jobid={self._mode.slurm_simple_job_id} ' if self._mode.slurm_simple_job_id else ' ') - if install_env: - wrapper += f' --export=CUSTOM_SHELL_CONFIG={self.shell_config}' - run_properties = f'sudo -u {self.account} {wrapper} {run_properties}' - os.chdir("/work") - command = [helper_script] - if run_properties: - command.extend([run_properties]) - if script_execution: - setup_env_scripts = self._mode.get_env_setup() - if setup_env_scripts: - command.extend(setup_env_scripts.split()) - command.extend(run_script.split(' ')) - if script_execution: - self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution, - do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs) - logging.info(f'Executing with env: {str(self.custom_env)}') - logging.info(f'Executing command: {str(command)}') - os.chdir('/tmp') - main_proc = subprocess.Popen(command, - env=(dict(os.environ, **{x: self.custom_env[x] - for x in self.custom_env}) if not skip_env - else os.environ), - **kwargs) - if script_execution and do_inbetween_processing: - side_proc = self._mode.inbetween_processing(main_proc) - stdout, stderr = main_proc.communicate() - logging.debug(f'Finished main processing {main_proc.pid}') - if script_execution and do_inbetween_processing and side_proc: - logging.debug(f'Terminating side_proc {side_proc.pid}') - side_proc.terminate() - logging.debug(f'Terminated side_proc {side_proc.pid}') - job_was_canceled = get_cenv('CI_JOB_STATUS') == 'canceled' - cmd_return_code = 1 if (int(main_proc.returncode) != 0 or job_was_canceled) else 0 - if script_execution and (job_was_canceled or not self.allow_failure): - utility.update_json_kv(self.error_code_file, self.jobid, cmd_return_code) - if script_execution and cmd_return_code != 0: - if self._mode and not self.allow_failure: - self._mode.cleanup_on_failure() - sys.exit(cmd_return_code) - # FIXME: do not rely on internal implementation of subprocess.run - return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr) - def run(self): if not self.allow_failure: utility.update_json_kv(self.error_code_file, self.jobid, -1) diff --git a/core/modes/base.py b/core/modes/base.py index 5647bac..3082004 100644 --- a/core/modes/base.py +++ b/core/modes/base.py @@ -133,7 +133,7 @@ class ModeBase(ABC): scancel_out = self.executor.cancel(f'--name=CI_{self.job.pipeline_id}') logging.debug(f'doing cleanup res={scancel_out}') os.system(f'rm -r {self.job.shared_tmp}') - man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID")) + man.remove_id_mapping(f"{self.job.runner_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID")) try: os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}') except (FileNotFoundError, OSError): diff --git a/core/modes/slurm/batch.py b/core/modes/slurm/batch.py index 5a44485..cd1ea3f 100644 --- a/core/modes/slurm/batch.py +++ b/core/modes/slurm/batch.py @@ -40,11 +40,11 @@ class Sbatch(Slurm, ABC): wrapper_add=f"/usr/bin/chmod o+r {self.slurm_output_file}") self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", wrapper_add=f"/usr/bin/cp /dev/stdin " - f"{self.job.build_path}/chmodPath{self.job.jobid}.sh", + f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh", script=f"{self.job.runner_path}/core/scripts/chmodPath.sh") self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh", wrapper_add=f"{self.job.shell_path} " - f"{self.job.build_path}/chmodPath{self.job.jobid}.sh", + f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh", script=f"{self.slurm_output_dir}") else: os.makedirs(self.slurm_output_dir, exist_ok=True) diff --git a/core/modes/slurm/singularity.py b/core/modes/slurm/singularity.py index 918884d..ee14d71 100644 --- a/core/modes/slurm/singularity.py +++ b/core/modes/slurm/singularity.py @@ -32,7 +32,7 @@ class Singularity(Slurm, ABC): ModeBase.cleanup(self) self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", wrapper_add=f'/usr/bin/rm {self.job.user_path}/script{self.job.jobid}.sh') - self.executor.cancel(self.slurm_simple_job_id) + self.executor.cancel(f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}') def get_singularity_script(self): script = "" @@ -100,7 +100,7 @@ class Singularity_Batch(Default, ABC): def cleanup(self): ModeBase.cleanup(self) - self.executor.cancel(self.slurm_simple_job_id) + self.executor.cancel(f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}') def get_singularity_script(self): script = "" diff --git a/core/modes/slurm/srun.py b/core/modes/slurm/srun.py index b31c322..b97e900 100644 --- a/core/modes/slurm/srun.py +++ b/core/modes/slurm/srun.py @@ -68,4 +68,4 @@ class Slurm(ModeBase, ABC): self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh') ModeBase.cleanup(self) - self.executor.cancel(self.slurm_simple_job_id) + self.executor.cancel() diff --git a/core/utility/defines.py b/core/utility/defines.py index 44ff909..92d93dd 100644 --- a/core/utility/defines.py +++ b/core/utility/defines.py @@ -1,6 +1,6 @@ import logging name = 'Aixcellenz CI Driver' -version = '0.5.0' +version = '0.5.1' debug = False stdout_logging = False diff --git a/core/utility/executor.py b/core/utility/executor.py index ca207a8..fa352a8 100644 --- a/core/utility/executor.py +++ b/core/utility/executor.py @@ -165,7 +165,10 @@ class Slurm_Executor(Executor, ABC): logging.debug(management_out) return management_out - def cancel(self, jobid): + def cancel(self, jobid=""): + if jobid == "": + self.set_internal_slurm_job() + jobid = self.simple_job_id scancel_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh", allocator=self.get_scancel_cmd(), params=f'{jobid}', -- GitLab