shared.py
    from core.modes.common import *
    from core.modes.base import ModeBase
    from .srun import Slurm
    
    
    class SingleSlurmJobAcrossStages(Slurm, ABC):
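        """Run all sub-steps of a CI job within one Slurm allocation shared across pipeline stages.

        The allocation is created once per node id with ``salloc --no-shell``, its
        job id is written to a file on the shared filesystem, and later stages of
        the pipeline reuse that job id instead of allocating a new job.
        """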
        substeps_in_shared_job = ('build_script', 'step_script', 'get_sources',
                                  'upload_artifacts_on_success', 'upload_artifacts_on_failure')
    
        def get_jobid_from_file(self, path):
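            """Return the Slurm job id stored in the first line of ``path``."""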
            with open(path, 'r') as node_index_fp:
                return node_index_fp.readline().strip()
    
        @staticmethod
        def get_node_id_str(variables):
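            """Concatenate the values of the given environment variables into a node id string.

            Slashes are stripped so the result can be used in a file name; without
            parallelisation variables the id is 'Sequential'.
            """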
            if variables:
                return ''.join([os.getenv(v).replace('/', '') for v in variables])
            return 'Sequential'
    
        @staticmethod
        def get_env_for_single_slurm_job(variables):
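            """Map the CUSTOM_ENV_PARVAR_* variables to their bare names and current values."""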
            return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables}
    
        def __init__(self, job):
            Slurm.__init__(self, job)
            self.slurm_job_id = None
            self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')]
            os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True)
            os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True)
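            # Keep one job-id file per node id: one copy on the shared filesystem and
            # one in the job-local tmp directory.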
            self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
            self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
            self.tmp_dir = None
    
        def get_custom_config(self):
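            """Reuse the Slurm allocation recorded for this node id, or create a new one.

            Afterwards, TMP is queried inside the allocation and stored in ``self.tmp_dir``.
            """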
            if not os.path.isfile(self.slurm_jobid_file):
                params = ['--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] + \
                         self.job.get_parameters().split(" ")
                salloc_out = self.executor.allocator(params=' '.join(params))
                logging.debug(f'run_properties salloc_out={salloc_out}')
                self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
                with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp:
                    slurm_jobid_fp.write(self.slurm_job_id + '\n')
                with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp:
                    slurm_jobid_fp.write(self.slurm_job_id + '\n')
                logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}')
            else:
                self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
                logging.info(f'Using slurm_job_id={self.slurm_job_id}, found {self.slurm_jobid_file}')
    
            tmp_dir_srun_out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                                                wrapper_add=f'--jobid {self.slurm_job_id} /usr/bin/printenv TMP').split('\n')
            logging.debug(f'srun tmp_dir output (unfiltered): {tmp_dir_srun_out}')
            ignores = ['error', 'slurmstepd']
            self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)]
            logging.debug(f'srun tmp_dir output: {self.tmp_dir}')
            self.tmp_dir = self.tmp_dir[0]
    
        def get_run_parameters(self):
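            """Extend the base srun parameters with the PARVAR environment exports and the shared job id."""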
            self.get_custom_config()
    
            parameters = Slurm.get_run_parameters(self)
            additional_env = []
            for k, v in self.get_env_for_single_slurm_job(self.id_vars).items():
                additional_env.append(f"{k}={v}")
            if additional_env:
                parameters += ' --export=' + ",".join(additional_env)
            parameters += f' --jobid={self.slurm_job_id}'
            return parameters
    
        def get_simple_run_wrapper(self):
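            """Route sub-steps that belong to the shared job through the shared-job run wrapper."""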
            if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
                return self.get_run_wrapper()
            return Slurm.get_run_wrapper(self)
    
        def get_simple_run_parameters(self):
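            """Route sub-steps that belong to the shared job through the shared-job run parameters."""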
            if self.job.args[1] == 'run' and self.job.args[3] in self.substeps_in_shared_job:
                return self.get_run_parameters()
            return Slurm.get_simple_run_parameters(self)
    
        def cleanup(self):
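            """Cancel the shared Slurm job and remove its bookkeeping files at the end of the pipeline.

            The allocation is only cancelled when END_SINGLE_SLURM_JOB is '1'; the
            per-job wrapper script is removed unconditionally.
            """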
            if get_cenv('END_SINGLE_SLURM_JOB') == '1':
                scancel_out = self.executor.cancel(self.get_jobid_from_file(self.slurm_jobid_file))
                logging.debug(f'cleanup res={scancel_out}')
                try:
                    os.remove(self.slurm_jobid_file)
                except FileNotFoundError:
                    pass
                # Clean up the directory holding the job ids of the whole pipeline
                try:
                    os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds')
                except OSError:
                    pass
    
                try:
                    os.rmdir(f'{self.job.shared_tmp}')
                except OSError:
                    pass
            self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                             wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh')
            ModeBase.cleanup(self)