Select Git revision
Floor.uasset
-
David Gilbert authored
shared.py 4.73 KiB
from core.modes.common import *
from core.modes.base import ModeBase
from .srun import Slurm
class SingleSlurmJobAcrossStages(Slurm, ABC):
    """Slurm mode that runs selected CI substeps inside one shared Slurm job.

    The id of the shared Slurm job is persisted in a text file under
    ``<job.shared_tmp>/SlurmJobIds``. Whoever does not find that file
    requests a new allocation and writes the id; everyone else reuses the id
    read from the file. The file name is derived from the values of all
    ``CUSTOM_ENV_PARVAR_*`` environment variables (or ``Sequential`` when
    there are none).
    """

    # Substeps that are executed inside the shared Slurm job.
    substeps_in_shared_job = ('build_script', 'step_script', 'get_sources',
                              'upload_artifacts_on_success', 'upload_artifacts_on_failure')

    def get_jobid_from_file(self, path):
        """Return the first line of the file at *path*, stripped of whitespace.

        Raises:
            OSError: if the file cannot be opened (e.g. ``FileNotFoundError``).
        """
        with open(path, 'r') as node_index_fp:
            return node_index_fp.readline().strip()

    @staticmethod
    def get_node_id_str(variables):
        """Build the identifier used as file name for the shared job id.

        Concatenates the values of the environment variables named in
        *variables*, with every ``/`` removed. Returns ``'Sequential'`` when
        *variables* is empty or ``None``. A variable that is not set
        contributes an empty string instead of raising.
        """
        if not variables:
            return 'Sequential'
        # NOTE(review): the result depends on the order of *variables*; callers
        # pass names in os.environ iteration order -- confirm that order is
        # stable across the jobs that must share one allocation.
        return ''.join(os.getenv(name, '').replace('/', '') for name in variables)

    @staticmethod
    def get_env_for_single_slurm_job(variables):
        """Map each name in *variables*, with ``CUSTOM_ENV_PARVAR_`` removed, to its value."""
        return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables}

    def __init__(self, job):
        """Initialise the mode and prepare the job-id bookkeeping files.

        Creates the ``SlurmJobIds`` directories below ``job.shared_tmp`` and
        ``job.tmp_dir`` and computes the paths of the two job-id files.
        """
        Slurm.__init__(self, job)
        self.slurm_job_id = None
        self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]

        node_id = self.get_node_id_str(self.id_vars)
        shared_id_dir = f'{self.job.shared_tmp}/SlurmJobIds'
        local_id_dir = f'{self.job.tmp_dir}/SlurmJobIds'
        os.makedirs(shared_id_dir, exist_ok=True)
        os.makedirs(local_id_dir, exist_ok=True)
        self.slurm_jobid_file = f'{shared_id_dir}/{node_id}.txt'
        self.cc_slurm_jobid_file = f'{local_id_dir}/{node_id}.txt'
        self.tmp_dir = None

    def get_custom_config(self):
        """Determine ``self.slurm_job_id`` and ``self.tmp_dir``.

        If the shared job-id file does not exist, a new allocation is
        requested through ``self.executor.allocator`` and its id is written
        to both job-id files; otherwise the id is read from the shared file.
        Afterwards the value of ``TMP`` inside that job is queried through
        ``self.executor.management_handler`` and stored in ``self.tmp_dir``.

        Raises:
            RuntimeError: if the allocator output contains no
                ``salloc: job <id>`` line, or if the ``TMP`` query yields no
                usable output line.
        """
        if not os.path.isfile(self.slurm_jobid_file):
            alloc_params = ' '.join(['--no-shell',
                                     f'--job-name=CI_{self.job.pipeline_id}',
                                     self.job.get_parameters()])
            salloc_out = self.executor.allocator(params=alloc_params)
            logging.debug(f'run_properties salloc_out={salloc_out}')
            job_match = re.search(r'salloc: job (\d+)', salloc_out)
            if job_match is None:
                # Fail with the allocator output instead of an opaque
                # AttributeError on None.
                raise RuntimeError(f'Could not find a Slurm job id in the allocator output: {salloc_out!r}')
            self.slurm_job_id = job_match.group(1)
            for jobid_path in (self.slurm_jobid_file, self.cc_slurm_jobid_file):
                with open(jobid_path, 'w') as slurm_jobid_fp:
                    slurm_jobid_fp.write(self.slurm_job_id + '\n')
            logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}')
        else:
            self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
            logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}')

        tmp_dir_srun_out = self.executor.management_handler(
            helper_script=f"{self.job.scripts_path}/runHelper.sh",
            wrapper_add=f'--jobid {self.slurm_job_id} /usr/bin/printenv TMP').split('\n')
        logging.debug(f'srun tmp_dir output (unfitered): {tmp_dir_srun_out}')
        # Drop lines that look like diagnostics rather than the printed value.
        ignores = ('error', 'slurmstepd')
        candidates = [line for line in tmp_dir_srun_out
                      if not any(marker in line for marker in ignores)]
        logging.debug(f'srun tmp_dir output: {candidates}')
        if not candidates:
            raise RuntimeError(f'Could not determine TMP of slurm job {self.slurm_job_id}: {tmp_dir_srun_out!r}')
        self.tmp_dir = candidates[0]

    def get_run_parameters(self):
        """Return the run parameters, extended to target the shared job.

        Resolves the shared job first (see ``get_custom_config``), then
        appends an ``--export=`` list built from the ``CUSTOM_ENV_PARVAR_*``
        variables (only if there are any) and ``--jobid=<shared job id>`` to
        the parameters produced by ``Slurm.get_run_parameters``.
        """
        self.get_custom_config()
        parameters = Slurm.get_run_parameters(self)
        additional_env = [f'{key}={value}'
                          for key, value in self.get_env_for_single_slurm_job(self.id_vars).items()]
        if additional_env:
            parameters += ' --export=' + ','.join(additional_env)
        parameters += f' --jobid={self.slurm_job_id}'
        return parameters

    def _runs_in_shared_job(self):
        """Return True if the current invocation is a 'run' of a shared-job substep."""
        return self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job

    def get_simple_run_wrapper(self):
        """Return the run wrapper for the current invocation."""
        if self._runs_in_shared_job():
            return self.get_run_wrapper()
        # NOTE(review): falls back to Slurm.get_run_wrapper, not
        # Slurm.get_simple_run_wrapper -- kept as in the original; confirm
        # this is intended.
        return Slurm.get_run_wrapper(self)

    def get_simple_run_parameters(self):
        """Return shared-job run parameters for shared substeps, else Slurm's simple ones."""
        if self._runs_in_shared_job():
            return self.get_run_parameters()
        return Slurm.get_simple_run_parameters(self)

    def cleanup(self):
        """Clean up after the job and, if requested, end the shared Slurm job.

        When ``END_SINGLE_SLURM_JOB`` is ``'1'`` the shared job is cancelled
        and its id file plus the (then empty) bookkeeping directories are
        removed on a best-effort basis. The wrapper script is removed and
        ``ModeBase.cleanup`` is called in every case.
        """
        if get_cenv('END_SINGLE_SLURM_JOB') == '1':
            try:
                shared_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
            except FileNotFoundError:
                # Without the id file there is nothing to cancel; keep going
                # so the remaining cleanup below still runs.
                shared_job_id = None
                logging.warning(f'cleanup: {self.slurm_jobid_file} not found, skipping cancel')
            if shared_job_id is not None:
                scancel_out = self.executor.cancel(f'{shared_job_id}')
                logging.debug(f'cleanup res={scancel_out}')
            try:
                os.remove(self.slurm_jobid_file)
            except FileNotFoundError:
                pass
            # Cleanup the directory with the jobIds of the whole pipeline.
            # os.rmdir only removes empty directories; a missing or non-empty
            # directory raises OSError and is deliberately ignored.
            for leftover_dir in (f'{self.job.shared_tmp}/SlurmJobIds', f'{self.job.shared_tmp}'):
                try:
                    os.rmdir(leftover_dir)
                except OSError:
                    pass
        self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                         wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh')
        # NOTE(review): calls ModeBase.cleanup directly rather than
        # Slurm.cleanup -- presumably to bypass Slurm's own cleanup; confirm.
        ModeBase.cleanup(self)