diff --git a/core/modes/slurm/shared.py b/core/modes/slurm/shared.py
index 7c4e9f6ca9ecc4da297a66f9daa58706dc664b06..723484d2922dc39c595a3c62e99cf206d578334d 100644
--- a/core/modes/slurm/shared.py
+++ b/core/modes/slurm/shared.py
@@ -39,6 +39,7 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
         logging.debug(f'run_properties salloc_out={salloc_out}')
         try:
             self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
+            self.simple_job_id = self.slurm_job_id
         except:
             print(salloc_out, file=sys.stderr)
             exit(1)
@@ -52,8 +53,10 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
             self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
             logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}')
 
+        self.executor.simple_job_id = self.slurm_job_id
+
         tmp_dir_srun_out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
-                                                            wrapper_add=f'/usr/bin/printenv TMP').split('\n')
+                                                            wrapper_add=f'/usr/bin/printenv TMP', force_allocator=True).split('\n')
         logging.debug(f'srun tmp_dir output (unfitered): {tmp_dir_srun_out}')
         ignores = ['error', 'slurmstepd']
         self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)]
@@ -74,15 +77,23 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
         return parameters
 
     def get_simple_run_wrapper(self):
-        if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
+        if (not self.job.downscope and self.job.args[1] != 'cleanup') or (self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job):
            return self.get_run_wrapper()
         return Slurm.get_run_wrapper(self)
 
     def get_simple_run_parameters(self):
-        if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
+        if (not self.job.downscope and self.job.args[1] != 'cleanup') or (self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job):
            return self.get_run_parameters()
         return Slurm.get_simple_run_parameters(self)
 
+    def run_simple_script(self):
+        out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+                                               params=self.get_simple_run_parameters(),
+                                               wrapper_add=self.get_simple_run_wrapper(),
+                                               script=self.get_simple_run_script(),
+                                               force_allocator=True)
+        print(out)
+
     def cleanup(self):
         if get_cenv('END_SINGLE_SLURM_JOB') == '1':
             scancel_out = self.executor.cancel(f'{self.get_jobid_from_file(self.slurm_jobid_file)}')
diff --git a/core/utility/executor.py b/core/utility/executor.py
index 365795081ab41a9b8dee5b6894c092768df360e4..c807e91c9e28199eb096b01118fee309735a967f 100644
--- a/core/utility/executor.py
+++ b/core/utility/executor.py
@@ -14,6 +14,7 @@ class Executor(ABC):
         self.job = job
         if downscope:
             self.downscope_add = f"sudo -u {self.job.account}"
+        self.downscope = downscope
 
     # Executes internal management functions, e.g., setup scripts etc.
     @abstractmethod
@@ -25,6 +26,8 @@ class Executor(ABC):
     def run_direct(self, params="", wrapper_add="", script=""):
         pass
 
+    # TODO: Use https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
+    # to capture and print live output
     def execute(self, helper_script='', allocator='', params='', wrapper_add='', pre_exec_scripts=[],
                 target_script='', skip_env=False, run_async=False, main_script=False, install_env=False, **kwargs):
         if main_script:
@@ -117,6 +120,9 @@ class Slurm_Executor(Executor, ABC):
         return srun_out == ""
 
     def set_internal_slurm_job(self):
+        if not self.downscope:
+            return
+
         if self.simple_job_id:
             if self.is_job_alive(self.simple_job_id):
                 return
@@ -142,7 +148,9 @@ class Slurm_Executor(Executor, ABC):
                 man.add_id_mapping(f"{self.job.runner_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
                                    self.simple_job_id)
             except AttributeError:
-                self.job.mode.abort(f'Could not allocate a Slurm job for internal usage')
+                self.job.mode.abort(f'Could not allocate a Slurm job for internal usage\n'
+                                    f'runner_path={self.job.runner_path}'
+                                    f'\nallocator_out={salloc_out}')
 
     def allocator(self, params=""):
         logging.debug(f'allocating job for pipeline {self.job.pipeline_id}')
@@ -154,14 +162,15 @@ class Slurm_Executor(Executor, ABC):
         logging.debug(f' salloc output: {salloc_out}')
         return salloc_out
 
-    def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False):
+    def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False, force_allocator=False):
         if helper_script == '':
             helper_script = f"{self.job.scripts_path}/runHelper.sh"
 
-        self.set_internal_slurm_job()
+        if self.downscope:
+            self.set_internal_slurm_job()
 
         management_out = self.execute(helper_script=helper_script,
-                                      allocator=self.get_srun_cmd(),
-                                      params=f'--jobid={self.simple_job_id} {self.get_default_params()} {params}',
+                                      allocator=self.get_srun_cmd() if (self.downscope or force_allocator) else '',
+                                      params=f'--jobid={self.simple_job_id} {self.get_default_params()} {params}' if (self.downscope or force_allocator) else '',
                                       wrapper_add=wrapper_add, #pre_exec_scripts=[self.job.shell_config],
                                       target_script=script, install_env=install_env,
@@ -170,9 +179,11 @@ class Slurm_Executor(Executor, ABC):
         return management_out
 
     def cancel(self, jobid=""):
-        if jobid == "":
+        if jobid == "" and self.downscope:
             self.set_internal_slurm_job()
             jobid = self.simple_job_id
+        elif jobid == "" and not self.downscope:
+            return ''
         scancel_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                    allocator=self.get_scancel_cmd(),
                                    params=f'{jobid}',
@@ -181,14 +192,14 @@ class Slurm_Executor(Executor, ABC):
         return scancel_out
 
     def run_direct(self, params="", wrapper_add="", script=""):
-        srun_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+        out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                 allocator=self.get_srun_cmd(),
                                 params=f'{self.get_default_params()} {params}',
                                 target_script=script,
                                 wrapper_add=wrapper_add, main_script=True,
                                 text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
-        logging.debug(srun_out)
-        return srun_out
+        logging.debug(out)
+        return out
 
     def run_batched(self, params="", wrapper_add="", script=""):
         sbatch_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
diff --git a/utility/.gitlab/.template.yml b/utility/.gitlab/.template.yml
index 97e339a3ed543b90c39f0ef62261a2036e990630..a10401770350708718c85e8a2cb485492d20c95d 100644
--- a/utility/.gitlab/.template.yml
+++ b/utility/.gitlab/.template.yml
@@ -143,6 +143,7 @@ single-build-job:
   script:
     - echo "JOBID ${SLURM_JOB_ID}"
     - echo "Building on $(hostname) into $TMP"
+    - $SHELL utility/.gitlab/sbatch.sh
     - !reference [.partition-check, script]
 
 single-unit-test-job:
diff --git a/utility/.gitlab/sbatch.sh b/utility/.gitlab/sbatch.sh
index 8bd094cf9a896927e0a38870f457944fabe251a8..cc68a0f750b82851de03bbda05d57741ce0a4fce 100755
--- a/utility/.gitlab/sbatch.sh
+++ b/utility/.gitlab/sbatch.sh
@@ -12,7 +12,7 @@ module list
 
 module load Python
 
-for i in $(seq 20);
+for i in $(seq 60);
 do
     sleep 1
     echo $i
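
The TODO added above Executor.execute refers to streaming a subprocess's output while the process is still running (the linked Stack Overflow question). The sketch below is only an illustration of that pattern in plain Python, under the assumption that execute would switch from collecting stdout after exit to reading it line by line; run_streaming and the example command are hypothetical names, not part of this change.

import subprocess

def run_streaming(cmd):
    # Hypothetical helper (not in the repository): echo each line of the
    # child's output as soon as it appears, and still return the complete
    # output at the end, as the TODO in Executor.execute suggests.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, text=True)
    lines = []
    for line in proc.stdout:          # yields lines as the process produces them
        print(line, end='', flush=True)
        lines.append(line)
    proc.wait()
    return ''.join(lines)

# Illustrative usage: stream the sbatch helper's countdown live
# output = run_streaming(['bash', 'utility/.gitlab/sbatch.sh'])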