Commit 78f8c2fb authored by Felix Tomski

Introduce default slurm params

Introduces `SLURM_DPARAM` environment variables, which are also applied to internal Slurm jobs. `SLURM_PARAM` variables overwrite these defaults.
parent b9ad2496
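
For illustration only (not part of this commit): a minimal Python sketch of the intended precedence, where `collect_params` stands in for the patched `Job.get_parameters` and the `CUSTOM_ENV_*` values are hypothetical examples. Defaults gathered from `SLURM_DPARAM_*` are placed in front of the job's `SLURM_PARAM_*` options, so when an option such as `-p` appears twice, the later job-level value is the one meant to take effect.

import os

def collect_params(prefix):
    # Sketch of the patched behaviour: gather every CUSTOM_ENV_<prefix>* value.
    return ' '.join(v for k, v in os.environ.items()
                    if k.startswith(f'CUSTOM_ENV_{prefix}'))

# Hypothetical values as the GitLab runner would export them:
os.environ['CUSTOM_ENV_SLURM_DPARAM_PARTITION'] = '-p devel'
os.environ['CUSTOM_ENV_SLURM_PARAM_PARTITION'] = '-p c18m'

# Defaults first, job-level parameters second, mirroring
# f'{self.get_default_params()} {params}' in the executor:
print(f"{collect_params('SLURM_DPARAM')} {collect_params('SLURM_PARAM')}")
# -> "-p devel -p c18m"; per the commit message, the job-level "-p c18m" wins.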
@@ -199,11 +199,11 @@ class Job:
         logging.info(f'ci_mode: {get_cenv("CI_MODE")}')
         logging.info(f'ci_job_id: {get_cenv("CI_JOB_ID")}')
 
-    def get_parameters(self):
-        parameters = {k: v for k, v in os.environ.items() if k.startswith("CUSTOM_ENV_SLURM_PARAM")}
+    def get_parameters(self, prefixes=['SLURM_PARAM']):
         parameter_string = ''
-        for _, v in parameters.items():
-            parameter_string += f'{v} '
+        # NOTE: Preserve order of prefixes, so that later prefixes overwrite prior ones
+        for prefix in prefixes:
+            parameter_string += ' '.join([v for k,v in os.environ.items() if k.startswith(f'CUSTOM_ENV_{prefix}')])
         return parameter_string
 
     def run(self):
@@ -66,12 +66,12 @@ class SingleSlurmJobAcrossStages(Slurm, ABC):
         return parameters
 
     def get_simple_run_wrapper(self):
-        if self.job.args[1] == 'run':
+        if self.job.args[1] == 'run' and self.job.args[3] in ['build_script', 'step_script', 'get_sources']:
             return self.get_run_wrapper()
         return Slurm.get_run_wrapper(self)
 
     def get_simple_run_parameters(self):
-        if self.job.args[1] == 'run':
+        if self.job.args[1] == 'run' and self.job.args[3] in ['build_script', 'step_script', 'get_sources']:
             return self.get_run_parameters()
         return Slurm.get_simple_run_parameters(self)
@@ -51,6 +51,8 @@ class Executor(ABC):
             time.sleep(1)
         stdout, stderr = main_proc.communicate()
         logging.debug(f'Finished main processing {main_proc.pid}')
+        logging.debug(f'---stdout---\n {stdout}')
+        logging.debug(f'---stderr---\n {stderr}')
         if main_script and run_async and side_proc:
             logging.debug(f'Terminating side_proc {side_proc.pid}')
             side_proc.terminate()
@@ -62,6 +64,7 @@ class Executor(ABC):
         if main_script and cmd_return_code != 0:
             if self.job.mode and not self.job.allow_failure:
                 self.job.mode.cleanup_on_failure()
+            print(subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr).stdout)
             sys.exit(cmd_return_code)
         # FIXME: do not rely on internal implementation of subprocess.run
         return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr)
@@ -100,6 +103,9 @@ class Slurm_Executor(Executor, ABC):
     def __init__(self, job, downscope=False):
         Executor.__init__(self, job, downscope=downscope)
 
+    def get_default_params(self):
+        return self.job.get_parameters(['SLURM_DPARAM'])
+
     def is_job_alive(self, job_id):
         srun_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                 allocator=self.get_srun_cmd(),
@@ -142,7 +148,7 @@ class Slurm_Executor(Executor, ABC):
         logging.debug(f'allocating job for pipeline {self.job.pipeline_id}')
         salloc_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                   allocator=self.get_salloc_cmd(),
-                                  params=params,
+                                  params=f'{self.get_default_params()} {params}',
                                   text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
         #print(salloc_out)
         logging.debug(f' salloc output: {salloc_out}')
@@ -155,7 +161,7 @@
         self.set_internal_slurm_job()
         management_out = self.execute(helper_script=helper_script,
                                       allocator=self.get_srun_cmd(),
-                                      params=f"--jobid={self.simple_job_id} " + params,
+                                      params=f'--jobid={self.simple_job_id} {self.get_default_params()} {params}',
                                       wrapper_add=wrapper_add,
                                       target_script=script, install_env=install_env,
                                       text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
@@ -176,7 +182,7 @@
     def run_direct(self, params="", wrapper_add="", script=""):
         srun_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                 allocator=self.get_srun_cmd(),
-                                params=params,
+                                params=f'{self.get_default_params()} {params}',
                                 target_script=script,
                                 wrapper_add=wrapper_add, main_script=True,
                                 text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
@@ -186,7 +192,7 @@
     def run_batched(self, params="", wrapper_add="", script=""):
         sbatch_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                   allocator=self.get_sbatch_cmd(),
-                                  params=f'--export=NONE {params}',
+                                  params=f'--export=NONE {self.get_default_params()} {params}',
                                   wrapper_add=wrapper_add,
                                   target_script=script,
                                   main_script=True, run_async=True,
@@ -35,7 +35,7 @@ default:
   variables:
     SLURM_PARAM_TIMELIMIT: "-t 10:00"
-    SLURM_PARAM_PARTITION: "-p devel"
+    SLURM_DPARAM_PARTITION: "-p devel"
     SLURM_PARAM_CPUS: "-c 1"
     GIT_STRATEGY: clone
@@ -58,8 +58,26 @@ variables:
   SLURM_PARAM_CPUS: "-c 1"
   CI_MODE: "SingleSlurmJobAcrossStages"
 
+.partition-check:
+  variables:
+    _PARTITION: "c18m"
+    SLURM_PARAM_PARTITION: "-p ${_PARTITION}"
+  script:
+    - echo "SLURM_JOB_PARTITION ${SLURM_JOB_PARTITION}"
+    - |
+      if [[ "${SLURM_JOB_PARTITION}" != "${_PARTITION}" ]]
+      then
+        echo "${SLURM_JOB_PARTITION} != ${_PARTITION}"
+        exit 1
+      else
+        echo "${SLURM_JOB_PARTITION} == ${_PARTITION}"
+      fi
+
 parallel-build-job: # This job runs in the build stage, which runs first.
-  extends: .parallel-job
+  extends:
+    - .parallel-job
+    - .partition-check
   stage: build
   variables:
     BEGIN_SINGLE_SLURM_JOB: "1"
@@ -70,6 +88,7 @@ parallel-build-job: # This job runs in the build stage, which runs first.
     - echo "NODEINDEX ${CI_NODE_INDEX}"
     - echo "Building on $(hostname) into $TMP/$X"
     - echo "${X}${Y}" > $TMP/$X
+    - !reference [.partition-check, script]
 
 parallel-unit-test-job: # This job runs in the test stage.
   stage: unit-test # It only starts when the job in the build stage completes successfully.
@@ -111,10 +130,13 @@ parallel-extensive-test-job: # This job runs in the test stage.
 single-build-job:
   stage: build
-  extends: .sequential-job
+  extends:
+    - .sequential-job
+    - .partition-check
   script:
     - echo "JOBID ${SLURM_JOB_ID}"
     - echo "Building on $(hostname) into $TMP"
+    - !reference [.partition-check, script]
 
 single-unit-test-job:
   stage: unit-test