Skip to content
Snippets Groups Projects
Select Git revision
  • 50a7abd19e1a3ff86307c333be5e7a8ef346e949
  • main default protected
  • vac_in_initial_conditions
3 results

final_plots.py

Blame
  • batch.py 6.05 KiB
    from core.modes.common import *
    from .srun import Slurm
    from core.modes.base import ModeBase
    from ...utility.executor import async_process
    
    
    def get_batch_properties(self, batch_script):
        # cmd_out = ""
        if self.job.down_scoping:
            stdout = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                                      wrapper_add=f"/usr/bin/cat {batch_script}")
            cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
        else:
            with open(batch_script, 'r') as f:
                cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')])
        return cmd_out
    
    
    class Sbatch(Slurm, ABC):
        def __init__(self, job):
            Slurm.__init__(self, job)
            self.slurm_output_dir = f'{self.job.clone_path}/slurm_output'
            self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt'
    
        def get_run_parameters(self):
            batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
            parameters = f'--wait {get_batch_properties(self, batch_script)} {self.job.get_parameters()} ' \
                         f'--output={self.slurm_output_file}'
            return parameters
    
        def custom_run_setup(self, **kwargs):
            if kwargs["main_script"] and kwargs["run_async"]:
                logging.debug('Creating so file')
                if self.job.down_scoping:
                    self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                                     wrapper_add=f"/usr/bin/mkdir -p {self.slurm_output_dir}")
                    self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                                     wrapper_add=f"/usr/bin/touch {self.slurm_output_file}")
                    self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh",
                                                     wrapper_add=f"/usr/bin/chmod o+r {self.slurm_output_file}")
                    self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                                     wrapper_add=f"/usr/bin/cp /dev/stdin  "
                                                                 f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
                                                     script=f"{self.job.scripts_path}/chmodPath.sh")
                    self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh",
                                                     wrapper_add=f"{self.job.shell_path} "
                                                                 f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
                                                     script=f"{self.slurm_output_dir}")
                else:
                    os.makedirs(self.slurm_output_dir, exist_ok=True)
                    os.system(f'touch {self.slurm_output_file}')
    
        def inbetween_processing(self):
            async_process(self.slurm_output_file)
    
        def run_main_script(self):
            self.executor.run_batched(params=self.get_run_parameters(),
                                      script=self.get_run_script())
    
        def get_post_run_script(self):
            return Slurm.get_run_script(self)
    
        def get_post_run_parameters(self):
            return Slurm.get_run_parameters(self)
    
        def get_post_run_wrapper(self):
            return Slurm.get_run_wrapper(self)
    
        def run_post_script(self):
            self.executor.run_direct(params=self.get_post_run_parameters(), wrapper_add=self.get_post_run_wrapper(),
                                     script=self.get_post_run_script())
    
        def get_run_script(self):
            tmp = os.getenv("TMP")
            with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as file:
                filedata = file.read()
            filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}')
            with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
                file.write(filedata)
            return f'{tmp}/wrapper{self.job.jobid}'
    
    
    class Batch(Slurm, ABC):
    
        def __init__(self, job):
            Slurm.__init__(self, job, )
    
        def get_run_parameters(self):
            batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
            parameters = ' '.join([f'{srun_path}', get_batch_properties(self, batch_script)])
            print('Warning: The contents of the script section in the CI definition '
                  'will be used as a post-processing script in the batch mode.')
            return parameters
    
        def get_run_wrapper(self):
            return f'{self.job.shell_path}'
    
        def get_run_script(self):
            return f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
    
        def get_post_run_script(self):
            return Slurm.get_run_script(self)
    
        def get_post_run_parameters(self):
            return Slurm.get_run_parameters(self)
    
        def get_post_run_wrapper(self):
            return Slurm.get_run_wrapper(self)
    
        def run_post_script(self):
            self.executor.run_direct(params=self.get_post_run_parameters(), wrapper_add=self.get_post_run_wrapper(),
                                     script=self.get_post_run_script())
    
    
    class Default(Sbatch, ABC):
    
        def get_run_parameters(self):
            parameter_string = self.job.get_parameters()
            parameters = f'--wait --output={self.slurm_output_file} {parameter_string}'
            return parameters
    
        def get_run_script(self):
            self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                             wrapper_add=f"/usr/bin/cp /dev/stdin {self.job.clone_path}/script.",
                                             script=f"{self.job.exec_script}")
            tmp = os.getenv("TMP")
            with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as file:
                filedata = file.read()
            filedata = filedata.replace('replaceme', f'{self.job.clone_path}/script.')
            with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file:
                file.write(filedata)
            return f'{tmp}/wrapper{self.job.jobid}'
    
        def run_post_script(self):
            pass