Commit ff045057 authored by Felix Tomski

Remove internal slurm jobs if downscope is disabled

parent a1af5de0
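
In short: the executor now records whether downscoping (running commands as the mapped CI user via sudo) is enabled, and it only allocates, reuses, or cancels its internal Slurm job when it is. A condensed, hypothetical model of the new guard clauses — stub logic and a made-up job id, not the runner's real classes:

    # Hypothetical, condensed model of this commit's guard clauses.
    class Executor:
        def __init__(self, downscope=False):
            self.downscope = downscope      # new: the flag is stored on the executor
            self.simple_job_id = None       # id of the internal Slurm job, if any

        def set_internal_slurm_job(self):
            if not self.downscope:
                return                      # new: no internal job when downscope is off
            if self.simple_job_id is None:
                self.simple_job_id = '12345'    # stands in for salloc + id mapping

        def cancel(self, jobid=''):
            if jobid == '' and self.downscope:
                self.set_internal_slurm_job()
                jobid = self.simple_job_id
            elif jobid == '' and not self.downscope:
                return ''                   # new: nothing to scancel without downscope
            return f'scancel {jobid}'       # stands in for the real scancel execution

    assert Executor(downscope=False).cancel() == ''
    assert Executor(downscope=True).cancel() == 'scancel 12345'
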
@@ -39,6 +39,7 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
         logging.debug(f'run_properties salloc_out={salloc_out}')
         try:
             self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
+            self.simple_job_id = self.slurm_job_id
         except:
             print(salloc_out, file=sys.stderr)
             exit(1)
@@ -52,8 +53,10 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
             self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
             logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}')
+            self.executor.simple_job_id = self.slurm_job_id
         tmp_dir_srun_out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
-                                                            wrapper_add=f'/usr/bin/printenv TMP').split('\n')
+                                                            wrapper_add=f'/usr/bin/printenv TMP', force_allocator=True).split('\n')
         logging.debug(f'srun tmp_dir output (unfitered): {tmp_dir_srun_out}')
         ignores = ['error', 'slurmstepd']
         self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)]
@@ -74,15 +77,23 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
         return parameters

     def get_simple_run_wrapper(self):
-        if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
+        if (not self.job.downscope and self.job.args[1] != 'cleanup') or (self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job):
             return self.get_run_wrapper()
         return Slurm.get_run_wrapper(self)

     def get_simple_run_parameters(self):
-        if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
+        if (not self.job.downscope and self.job.args[1] != 'cleanup') or (self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job):
             return self.get_run_parameters()
         return Slurm.get_simple_run_parameters(self)

+    def run_simple_script(self):
+        out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+                                               params=self.get_simple_run_parameters(),
+                                               wrapper_add=self.get_simple_run_wrapper(),
+                                               script=self.get_simple_run_script(),
+                                               force_allocator=True)
+        print(out)
+
     def cleanup(self):
         if get_cenv('END_SINGLE_SLURM_JOB') == '1':
             scancel_out = self.executor.cancel(f'{self.get_jobid_from_file(self.slurm_jobid_file)}')
...
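The reworked condition above routes, when downscoping is disabled, every step except cleanup through the shared-job wrapper and parameters, not only the substeps listed in substeps_in_shared_job; the new run_simple_script then forces an srun allocation so those steps still execute inside the shared job. A minimal, hypothetical version of the dispatch predicate (placeholder substep names):

    # Hypothetical predicate mirroring get_simple_run_wrapper / get_simple_run_parameters.
    def uses_shared_job(downscope, args, shared_substeps):
        step = args[1]                                # e.g. 'run' or 'cleanup'
        substep = args[3] if len(args) > 3 else None
        return (not downscope and step != 'cleanup') or \
               (step == 'run' and substep in shared_substeps)

    shared = {'build_substep'}                                                 # placeholder name
    assert uses_shared_job(False, ['ci', 'run', 'x', 'anything'], shared)      # downscope off: shared job
    assert not uses_shared_job(False, ['ci', 'cleanup'], shared)               # cleanup stays on the simple path
    assert uses_shared_job(True, ['ci', 'run', 'x', 'build_substep'], shared)  # listed substep: shared job
    assert not uses_shared_job(True, ['ci', 'run', 'x', 'other'], shared)      # otherwise the simple path
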
@@ -14,6 +14,7 @@ class Executor(ABC):
         self.job = job
         if downscope:
             self.downscope_add = f"sudo -u {self.job.account}"
+        self.downscope = downscope

     # Executes internal management functions, e.g., setup scripts etc.
     @abstractmethod
@@ -25,6 +26,8 @@
     def run_direct(self, params="", wrapper_add="", script=""):
         pass

+    # TODO: Use https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
+    # to capture and print live output
     def execute(self, helper_script='', allocator='', params='', wrapper_add='', pre_exec_scripts=[],
                 target_script='', skip_env=False, run_async=False, main_script=False, install_env=False, **kwargs):
         if main_script:
@@ -117,6 +120,9 @@ class Slurm_Executor(Executor, ABC):
         return srun_out == ""

     def set_internal_slurm_job(self):
+        if not self.downscope:
+            return
         if self.simple_job_id:
             if self.is_job_alive(self.simple_job_id):
                 return
@@ -142,7 +148,9 @@
             man.add_id_mapping(f"{self.job.runner_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
                                self.simple_job_id)
         except AttributeError:
-            self.job.mode.abort(f'Could not allocate a Slurm job for internal usage')
+            self.job.mode.abort(f'Could not allocate a Slurm job for internal usage\n'
+                                f'runner_path={self.job.runner_path}'
+                                f'\nallocator_out={salloc_out}')

     def allocator(self, params=""):
         logging.debug(f'allocating job for pipeline {self.job.pipeline_id}')
@@ -154,14 +162,15 @@
         logging.debug(f' salloc output: {salloc_out}')
         return salloc_out

-    def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False):
+    def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False, force_allocator=False):
         if helper_script == '':
             helper_script = f"{self.job.scripts_path}/runHelper.sh"
-        self.set_internal_slurm_job()
+        if self.downscope:
+            self.set_internal_slurm_job()
         management_out = self.execute(helper_script=helper_script,
-                                      allocator=self.get_srun_cmd(),
-                                      params=f'--jobid={self.simple_job_id} {self.get_default_params()} {params}',
+                                      allocator=self.get_srun_cmd() if (self.downscope or force_allocator) else '',
+                                      params=f'--jobid={self.simple_job_id} {self.get_default_params()} {params}' if (self.downscope or force_allocator) else '',
                                       wrapper_add=wrapper_add,
                                       #pre_exec_scripts=[self.job.shell_config],
                                       target_script=script, install_env=install_env,
@@ -170,9 +179,11 @@
         return management_out

     def cancel(self, jobid=""):
-        if jobid == "":
+        if jobid == "" and self.downscope:
             self.set_internal_slurm_job()
             jobid = self.simple_job_id
+        elif jobid == "" and not self.downscope:
+            return ''
         scancel_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                    allocator=self.get_scancel_cmd(),
                                    params=f'{jobid}',
@@ -181,14 +192,14 @@
         return scancel_out

     def run_direct(self, params="", wrapper_add="", script=""):
-        srun_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
-                                allocator=self.get_srun_cmd(),
-                                params=f'{self.get_default_params()} {params}',
-                                target_script=script,
-                                wrapper_add=wrapper_add, main_script=True,
-                                text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
-        logging.debug(srun_out)
-        return srun_out
+        out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+                           allocator=self.get_srun_cmd(),
+                           params=f'{self.get_default_params()} {params}',
+                           target_script=script,
+                           wrapper_add=wrapper_add, main_script=True,
+                           text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
+        logging.debug(out)
+        return out

     def run_batched(self, params="", wrapper_add="", script=""):
         sbatch_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
...
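Pulling the Slurm_Executor changes together: management_handler now prefixes the helper with the srun command and a --jobid parameter only when downscoping is enabled or the caller passes force_allocator=True, and in the forced case the shared job id has to be injected by the driver (the new self.executor.simple_job_id = self.slurm_job_id assignment in the first file). A hypothetical sketch of just that command assembly, with a made-up default parameter:

    # Hypothetical sketch of the allocator/params selection in management_handler.
    def build_command(downscope, force_allocator, simple_job_id, helper_script,
                      default_params='--partition=ci'):    # placeholder default params
        use_allocation = downscope or force_allocator
        allocator = 'srun' if use_allocation else ''
        params = f'--jobid={simple_job_id} {default_params}' if use_allocation else ''
        return ' '.join(p for p in (allocator, params, helper_script) if p)

    # Downscope disabled, nothing forced: the helper runs directly, no srun.
    print(build_command(False, False, None, 'runHelper.sh'))
    # Downscope disabled, but the driver injected its shared job id and forces srun.
    print(build_command(False, True, 67890, 'runHelper.sh'))
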
@@ -143,6 +143,7 @@ single-build-job:
   script:
     - echo "JOBID ${SLURM_JOB_ID}"
     - echo "Building on $(hostname) into $TMP"
+    - $SHELL utility/.gitlab/sbatch.sh
     - !reference [.partition-check, script]

 single-unit-test-job:
...
@@ -12,7 +12,7 @@
 module list
 module load Python

-for i in $(seq 20);
+for i in $(seq 60);
 do
     sleep 1
     echo $i