Something went wrong on our end
Select Git revision
-
Felix Tomski authored
batch.py 6.05 KiB
from core.modes.common import *
from .srun import Slurm
from core.modes.base import ModeBase
from ...utility.executor import async_process
def _first_sbatch_arguments(lines):
    """Return the first argument of every line in *lines* starting with ``#SBATCH``.

    Directive lines that carry no argument (a bare ``#SBATCH``) are skipped
    instead of raising ``IndexError``.
    """
    arguments = []
    for line in lines:
        if not line.startswith('#SBATCH'):
            continue
        tokens = line.split()
        # tokens[0] is the directive itself; only forward what follows it.
        # NOTE(review): only the first token after the directive is kept, so a
        # space-separated form such as ``#SBATCH -N 2`` yields just ``-N``.
        if len(tokens) > 1:
            arguments.append(tokens[1])
    return arguments


def get_batch_properties(self, batch_script):
    """Collect the ``#SBATCH`` options of *batch_script* as one string.

    Args:
        self: Mode object providing ``job.down_scoping`` and, when that flag
            is truthy, ``job.scripts_path`` and ``executor.management_handler``.
        batch_script: Path of the batch script to inspect.

    Returns:
        The first argument of each ``#SBATCH`` line, joined by single spaces
        (an empty string when there is none).
    """
    if self.job.down_scoping:
        # The script is read through the management handler (``/usr/bin/cat``
        # via runHelper.sh) rather than opened directly -- presumably because
        # it is only readable by the down-scoped user; confirm with the caller.
        stdout = self.executor.management_handler(
            helper_script=f"{self.job.scripts_path}/runHelper.sh",
            wrapper_add=f"/usr/bin/cat {batch_script}")
        lines = stdout.split("\n")
    else:
        with open(batch_script, 'r') as f:
            lines = f.readlines()
    return ' '.join(_first_sbatch_arguments(lines))
class Sbatch(Slurm, ABC):
    """Slurm mode whose main script is run through ``executor.run_batched``.

    The run parameters contain ``--wait`` and ``--output=<slurm_output_file>``;
    the output file lives in ``<clone_path>/slurm_output``.  The post-run step
    reuses the plain ``Slurm`` script, parameters and wrapper.
    """

    def __init__(self, job):
        """Initialise the ``Slurm`` base and derive the per-job output paths."""
        Slurm.__init__(self, job)
        self.slurm_output_dir = f'{self.job.clone_path}/slurm_output'
        self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt'

    def get_run_parameters(self):
        """Return the parameter string for the batched main run.

        It consists of ``--wait``, the ``#SBATCH`` options extracted from the
        script named by ``get_cenv("BATCH_SCRIPT")`` inside the clone path, the
        job's own parameters and the ``--output`` redirection.
        """
        batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
        parameters = f'--wait {get_batch_properties(self, batch_script)} {self.job.get_parameters()} ' \
                     f'--output={self.slurm_output_file}'
        return parameters

    def custom_run_setup(self, **kwargs):
        """Create the Slurm output file ahead of an asynchronous main run.

        Nothing happens unless ``kwargs["main_script"]`` and
        ``kwargs["run_async"]`` are both truthy.
        """
        if not (kwargs["main_script"] and kwargs["run_async"]):
            return

        logging.debug('Creating so file')
        if self.job.down_scoping:
            run_helper = f"{self.job.scripts_path}/runHelper.sh"
            chmod_script = f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh"

            # Create the directory and a file that others may read, all through
            # the management handler instead of this process.
            self.executor.management_handler(helper_script=run_helper,
                                             wrapper_add=f"/usr/bin/mkdir -p {self.slurm_output_dir}")
            self.executor.management_handler(helper_script=run_helper,
                                             wrapper_add=f"/usr/bin/touch {self.slurm_output_file}")
            self.executor.management_handler(helper_script=run_helper,
                                             wrapper_add=f"/usr/bin/chmod o+r {self.slurm_output_file}")
            # Copy chmodPath.sh into the clone path, then run that copy with
            # the job's shell on the output directory.
            # NOTE(review): presumably this opens up the path leading to the
            # output file -- confirm against chmodPath.sh.
            self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                             wrapper_add=f"/usr/bin/cp /dev/stdin {chmod_script}",
                                             script=f"{self.job.scripts_path}/chmodPath.sh")
            self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh",
                                             wrapper_add=f"{self.job.shell_path} {chmod_script}",
                                             script=self.slurm_output_dir)
        else:
            os.makedirs(self.slurm_output_dir, exist_ok=True)
            # Same effect as ``touch`` (create if missing, refresh timestamps)
            # but without a shell: the path is used literally, exactly as
            # ``os.makedirs`` used it above, and failures raise instead of
            # being silently dropped.
            with open(self.slurm_output_file, 'a'):
                os.utime(self.slurm_output_file, None)

    def inbetween_processing(self):
        """Pass the Slurm output file to ``async_process``."""
        async_process(self.slurm_output_file)

    def run_main_script(self):
        """Run the wrapper script through the executor's batched path."""
        self.executor.run_batched(params=self.get_run_parameters(),
                                  script=self.get_run_script())

    def get_post_run_script(self):
        """Return the plain ``Slurm`` run script, bypassing this class's override."""
        return Slurm.get_run_script(self)

    def get_post_run_parameters(self):
        """Return the plain ``Slurm`` run parameters, bypassing this class's override."""
        return Slurm.get_run_parameters(self)

    def get_post_run_wrapper(self):
        """Return the plain ``Slurm`` run wrapper."""
        return Slurm.get_run_wrapper(self)

    def run_post_script(self):
        """Run the post-processing step directly with the ``Slurm`` settings."""
        self.executor.run_direct(params=self.get_post_run_parameters(),
                                 wrapper_add=self.get_post_run_wrapper(),
                                 script=self.get_post_run_script())

    def get_run_script(self):
        """Write a per-job wrapper script and return its path.

        ``batchWrapper.sh`` is read from the scripts path, every occurrence of
        the placeholder ``replaceme`` is replaced by the batch script path, and
        the result is written to ``$TMP/wrapper<jobid>``.
        """
        # NOTE(review): if TMP is unset this is None and the target becomes
        # the relative path 'None/wrapper<jobid>'.
        tmp = os.getenv("TMP")
        wrapper_path = f'{tmp}/wrapper{self.job.jobid}'
        with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as file:
            filedata = file.read()
        filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}')
        with open(wrapper_path, 'w') as file:
            file.write(filedata)
        return wrapper_path
class Batch(Slurm, ABC):
    """Slurm mode that runs the ``BATCH_SCRIPT`` file with the job's shell.

    The script section of the CI definition is not the main payload here; it
    is executed afterwards through the plain ``Slurm`` settings.
    """

    def __init__(self, job):
        """Initialise the ``Slurm`` base for *job*."""
        Slurm.__init__(self, job)

    def get_run_parameters(self):
        """Return ``srun_path`` followed by the script's ``#SBATCH`` options.

        Also prints a warning that the CI script section is only used for
        post-processing in this mode.
        """
        script_path = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
        launcher = f'{srun_path}'
        sbatch_options = get_batch_properties(self, script_path)
        combined = ' '.join((launcher, sbatch_options))
        print('Warning: The contents of the script section in the CI definition '
              'will be used as a post-processing script in the batch mode.')
        return combined

    def get_run_wrapper(self):
        """Return the job's shell path, used as the wrapper of the main run."""
        return f'{self.job.shell_path}'

    def get_run_script(self):
        """Return the path of the batch script inside the clone directory."""
        return f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'

    def get_post_run_script(self):
        """Return the plain ``Slurm`` run script, bypassing this class's override."""
        return Slurm.get_run_script(self)

    def get_post_run_parameters(self):
        """Return the plain ``Slurm`` run parameters, bypassing this class's override."""
        return Slurm.get_run_parameters(self)

    def get_post_run_wrapper(self):
        """Return the plain ``Slurm`` run wrapper, bypassing this class's override."""
        return Slurm.get_run_wrapper(self)

    def run_post_script(self):
        """Run the post-processing step directly with the ``Slurm`` settings."""
        post_params = self.get_post_run_parameters()
        post_wrapper = self.get_post_run_wrapper()
        post_script = self.get_post_run_script()
        self.executor.run_direct(params=post_params,
                                 wrapper_add=post_wrapper,
                                 script=post_script)
class Default(Sbatch, ABC):
    """``Sbatch`` variant that submits the job's ``exec_script`` and has no post step."""

    def get_run_parameters(self):
        """Return ``--wait``, the ``--output`` redirection and the job parameters."""
        job_parameters = self.job.get_parameters()
        return f'--wait --output={self.slurm_output_file} {job_parameters}'

    def get_run_script(self):
        """Stage the job script in the clone path and return a wrapper for it.

        ``exec_script`` is handed to xPipeHelper.sh, which copies its stdin to
        ``<clone_path>/script.``.  A copy of ``batchWrapper.sh`` with the
        placeholder ``replaceme`` pointing at that file is then written to
        ``$TMP/wrapper<jobid>``, and that path is returned.
        """
        staged_script = f'{self.job.clone_path}/script.'
        self.executor.management_handler(
            helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
            wrapper_add=f"/usr/bin/cp /dev/stdin {staged_script}",
            script=f"{self.job.exec_script}",
        )

        tmp_dir = os.getenv("TMP")
        wrapper_path = f'{tmp_dir}/wrapper{self.job.jobid}'
        with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as template:
            wrapper_text = template.read().replace('replaceme', staged_script)
        with open(wrapper_path, 'w') as wrapper:
            wrapper.write(wrapper_text)
        return wrapper_path

    def run_post_script(self):
        """Do nothing: this mode has no post-processing step."""