Skip to content
Snippets Groups Projects
Commit af56a23b authored by Adrian Schmitz's avatar Adrian Schmitz
Browse files

Resolve "fix timing issue for async jobs"

parent f9065fbc
Branches
Tags
No related merge requests found
......@@ -44,7 +44,7 @@ class Job:
@property
def build_path(self):
return f'{self.user_path}/builds/{get_cenv("CI_CONCURRENT_PROJECT_ID")}/{get_cenv("CI_PROJECT_PATH_SLUG")}'
return f'{self.user_path}/cache/{get_cenv("CI_CONCURRENT_PROJECT_ID")}/{get_cenv("CI_PROJECT_PATH_SLUG")}'
@property
def cache_path(self):
......@@ -206,55 +206,6 @@ class Job:
parameter_string += f'{v} '
return parameter_string
def execute(self, run_properties, run_script, helper_script, install_env=False, skip_env=False,
script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs):
if self.down_scoping:
wrapper = ''
if srun_wrap:
wrapper = modes.get_srun_cmd()
wrapper += (f' --jobid={self._mode.slurm_simple_job_id} ' if self._mode.slurm_simple_job_id else ' ')
if install_env:
wrapper += f' --export=CUSTOM_SHELL_CONFIG={self.shell_config}'
run_properties = f'sudo -u {self.account} {wrapper} {run_properties}'
os.chdir("/work")
command = [helper_script]
if run_properties:
command.extend([run_properties])
if script_execution:
setup_env_scripts = self._mode.get_env_setup()
if setup_env_scripts:
command.extend(setup_env_scripts.split())
command.extend(run_script.split(' '))
if script_execution:
self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution,
do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs)
logging.info(f'Executing with env: {str(self.custom_env)}')
logging.info(f'Executing command: {str(command)}')
os.chdir('/tmp')
main_proc = subprocess.Popen(command,
env=(dict(os.environ, **{x: self.custom_env[x]
for x in self.custom_env}) if not skip_env
else os.environ),
**kwargs)
if script_execution and do_inbetween_processing:
side_proc = self._mode.inbetween_processing(main_proc)
stdout, stderr = main_proc.communicate()
logging.debug(f'Finished main processing {main_proc.pid}')
if script_execution and do_inbetween_processing and side_proc:
logging.debug(f'Terminating side_proc {side_proc.pid}')
side_proc.terminate()
logging.debug(f'Terminated side_proc {side_proc.pid}')
job_was_canceled = get_cenv('CI_JOB_STATUS') == 'canceled'
cmd_return_code = 1 if (int(main_proc.returncode) != 0 or job_was_canceled) else 0
if script_execution and (job_was_canceled or not self.allow_failure):
utility.update_json_kv(self.error_code_file, self.jobid, cmd_return_code)
if script_execution and cmd_return_code != 0:
if self._mode and not self.allow_failure:
self._mode.cleanup_on_failure()
sys.exit(cmd_return_code)
# FIXME: do not rely on internal implementation of subprocess.run
return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr)
def run(self):
if not self.allow_failure:
utility.update_json_kv(self.error_code_file, self.jobid, -1)
......
......@@ -133,7 +133,7 @@ class ModeBase(ABC):
scancel_out = self.executor.cancel(f'--name=CI_{self.job.pipeline_id}')
logging.debug(f'doing cleanup res={scancel_out}')
os.system(f'rm -r {self.job.shared_tmp}')
man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
man.remove_id_mapping(f"{self.job.runner_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
try:
os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}')
except (FileNotFoundError, OSError):
......
......@@ -40,11 +40,11 @@ class Sbatch(Slurm, ABC):
wrapper_add=f"/usr/bin/chmod o+r {self.slurm_output_file}")
self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
wrapper_add=f"/usr/bin/cp /dev/stdin "
f"{self.job.build_path}/chmodPath{self.job.jobid}.sh",
f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
script=f"{self.job.runner_path}/core/scripts/chmodPath.sh")
self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh",
wrapper_add=f"{self.job.shell_path} "
f"{self.job.build_path}/chmodPath{self.job.jobid}.sh",
f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
script=f"{self.slurm_output_dir}")
else:
os.makedirs(self.slurm_output_dir, exist_ok=True)
......
......@@ -32,7 +32,7 @@ class Singularity(Slurm, ABC):
ModeBase.cleanup(self)
self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
wrapper_add=f'/usr/bin/rm {self.job.user_path}/script{self.job.jobid}.sh')
self.executor.cancel(self.slurm_simple_job_id)
self.executor.cancel(f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}')
def get_singularity_script(self):
script = ""
......@@ -100,7 +100,7 @@ class Singularity_Batch(Default, ABC):
def cleanup(self):
ModeBase.cleanup(self)
self.executor.cancel(self.slurm_simple_job_id)
self.executor.cancel(f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}')
def get_singularity_script(self):
script = ""
......
......@@ -68,4 +68,4 @@ class Slurm(ModeBase, ABC):
self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh')
ModeBase.cleanup(self)
self.executor.cancel(self.slurm_simple_job_id)
self.executor.cancel()
import logging
name = 'Aixcellenz CI Driver'
version = '0.5.0'
version = '0.5.1'
debug = False
stdout_logging = False
......
......@@ -165,7 +165,10 @@ class Slurm_Executor(Executor, ABC):
logging.debug(management_out)
return management_out
def cancel(self, jobid):
def cancel(self, jobid=""):
if jobid == "":
self.set_internal_slurm_job()
jobid = self.simple_job_id
scancel_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh",
allocator=self.get_scancel_cmd(),
params=f'{jobid}',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment