Commit 86eb1178 authored by Felix Tomski

Fix ssh mode

parent af56a23b
@@ -22,11 +22,11 @@ variables:
 .downscope-template:
   variables:
-    RUNNER_TAG: "downscope2"
+    RUNNER_TAG: "testing"

 .local-template:
   variables:
-    RUNNER_TAG: "custom2"
+    RUNNER_TAG: "ja664344-dev"

 .run-template:
   stage: run
@@ -41,7 +41,7 @@ class Sbatch(Slurm, ABC):
         self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
                                          wrapper_add=f"/usr/bin/cp /dev/stdin "
                                                      f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
-                                         script=f"{self.job.runner_path}/core/scripts/chmodPath.sh")
+                                         script=f"{self.job.scripts_path}/chmodPath.sh")
         self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh",
                                          wrapper_add=f"{self.job.shell_path} "
                                                      f"{self.job.clone_path}/chmodPath{self.job.jobid}.sh",
 from core.modes.common import *
 from core.modes.base import ModeBase
+from core.utility.executor import SshExecutor

 class SSH(ModeBase):
@@ -8,6 +9,7 @@ class SSH(ModeBase):
         self.dest_node = get_cenv('CI_SSH_HOST')
         if not self.dest_node:
             ModeBase.abort(self, "Using ssh mode but no node specified. Specify: CI_SSH_HOST")
+        self.executor = SshExecutor(job, job.down_scoping)

     def get_env_setup(self):
         return f' {self.job.driver_path}/core/scripts/ssh.env '
@@ -24,8 +26,15 @@ class SSH(ModeBase):
             self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh"
         return self._combiner_script

     def get_simple_script_exec(self):
         return f"ssh -T {self.dest_node}"

+    def run_simple_script(self):
+        out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+                                               params=f'-T {self.dest_node}',
+                                               script=self.get_simple_run_script())
+    def run_main_script(self):
+        out = self.executor.run_direct(params=f'-T {self.dest_node}', script=self.get_run_script())
+        print(out)

     def cleanup(self):
         ModeBase.cleanup(self)
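For context, a minimal sketch of why routing the SSH mode through the new SshExecutor matters: the old get_simple_script_exec only produced a bare ssh prefix, while the executor also applies the sudo down-scope prefix and sources ssh.env before the target script. The host and account names below are hypothetical.

# Hypothetical values; at runtime these come from CI_SSH_HOST and the job object.
dest_node = "login18-1"
account = "ab123456"

# Previous behaviour: only a plain ssh prefix, no down-scoping, no ssh.env setup.
old_prefix = f"ssh -T {dest_node}"                  # 'ssh -T login18-1'

# With SshExecutor and downscope=True, the wrapper string is assembled as
# '<downscope_add> <allocator> <params> <wrapper_add>' inside Executor.execute().
downscope_add = f"sudo -u {account}"
new_prefix = f"{downscope_add} ssh -T {dest_node}"  # 'sudo -u ab123456 ssh -T login18-1'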
@@ -10,52 +10,33 @@ def async_process(file):
 class Executor(ABC):
     downscope_add = ""
     simple_job_id = ""

     def __init__(self, job, downscope=False):
         self.job = job
         if downscope:
             self.downscope_add = f"sudo -u {self.job.account}"

     def set_simple_job_id(self, job_id):
         self.simple_job_id = job_id

     # Allocates a batch job with optional user specifications and returns the string
     @abstractmethod
     def allocator(self, params=""):
         pass

     # Executes internal management functions, e.g., setup scripts etc.
     @abstractmethod
     def management_handler(self, params="", wrapper_add="", script=""):
         pass

     # Cancels a batch job based on its id
     @abstractmethod
     def cancel(self, jobid):
         pass

     # runs a script in the batch system with direct output
     @abstractmethod
     def run_direct(self, params="", wrapper_add="", script=""):
         pass

     # runs a script without direct output, e.g., a batch script or multinode jobs
     @abstractmethod
     def run_batched(self, params="", wrapper_add="", script=""):
         pass

-    def execute(self, helper_script='', allocator='', params='', wrapper_add='',
+    def execute(self, helper_script='', allocator='', params='', wrapper_add='', pre_exec_scripts=[],
                 target_script='', skip_env=False, run_async=False, main_script=False, install_env=False, **kwargs):
         if main_script:
             self.job.mode.custom_run_setup(main_script=main_script, run_async=run_async, **kwargs)
             logging.info(f'Executing with env: {str(self.job.custom_env)}')
         else:
             params += (f' --jobid={self.job.mode.slurm_simple_job_id} ' if self.job.mode.slurm_simple_job_id else ' ')
         if install_env:
             params += f' --export=CUSTOM_SHELL_CONFIG={self.job.shell_config}'
-        command = [helper_script, f'{self.downscope_add} {allocator} {params} {wrapper_add}',
-                   f'{target_script}']
+        command = [helper_script]
+        command.extend([f'{self.downscope_add} {allocator} {params} {wrapper_add}'])
+        command.extend(pre_exec_scripts)
+        command.append(target_script)
         logging.info(f'Executing command: {str(command)}')
         os.chdir('/tmp')
         main_proc = subprocess.Popen(command,
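As a rough, self-contained sketch of what the reworked assembly produces for an SSH management call (the paths, user, and host below are made up for illustration; the real values come from the job object at runtime):

# Hypothetical stand-ins for values normally taken from the job object.
helper_script = "/runner/core/scripts/xPipeHelper.sh"
downscope_add = "sudo -u ab123456"
allocator = "ssh"
params = "-T login18-1"
wrapper_add = ""
pre_exec_scripts = ["/runner/core/scripts/ssh.env"]
target_script = "/runner/core/scripts/setup.sh"

# Same construction order as Executor.execute() after this commit:
# helper script, combined wrapper string, pre-exec scripts, target script.
command = [helper_script]
command.extend([f'{downscope_add} {allocator} {params} {wrapper_add}'])
command.extend(pre_exec_scripts)
command.append(target_script)

print(command)
# ['/runner/core/scripts/xPipeHelper.sh',
#  'sudo -u ab123456 ssh -T login18-1 ',
#  '/runner/core/scripts/ssh.env',
#  '/runner/core/scripts/setup.sh']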
@@ -91,6 +72,7 @@ class Slurm_Executor(Executor, ABC):
     sbatch_path = "sbatch"  # "/usr/local_host/bin/sbatch"
     salloc_path = "/opt/slurm/current/bin/salloc"
     scancel_path = "scancel"
+    simple_job_id = ""

     def _get_slurm_cmd(self, base):
         add_args = ''
@@ -195,3 +177,30 @@ class Slurm_Executor(Executor, ABC):
                                    main_script=True, run_async=True,
                                    text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
         logging.debug(f'sbatch output: {sbatch_out}')
+
+
+class SshExecutor(Executor, ABC):
+    def __init__(self, job, downscope=False):
+        Executor.__init__(self, job, downscope=downscope)
+
+    def run_direct(self, params="", wrapper_add="", script=""):
+        self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh",
+                     allocator='ssh',
+                     params=params,
+                     target_script=script,
+                     wrapper_add=wrapper_add, main_script=True)
+        return ''
+
+    def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False):
+        if helper_script == '':
+            helper_script = f"{self.job.scripts_path}/runHelper.sh"
+        management_out = self.execute(helper_script=helper_script,
+                                      allocator='ssh',
+                                      params=params,
+                                      wrapper_add=wrapper_add,
+                                      pre_exec_scripts=[f'{self.job.scripts_path}/ssh.env'],
+                                      target_script=script, install_env=install_env,
+                                      text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
+        logging.debug(management_out)
+        return management_out
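A rough usage sketch of the new SshExecutor, with the job object stubbed out. Illustrative only: in the real runner the job carries more state that execute() reads (mode, custom_env, shell_config), and the helper scripts must exist on the runner host.

from types import SimpleNamespace

# Stub standing in for the runner's job object; only the attributes used
# directly by SshExecutor in this diff are provided (hypothetical values).
job = SimpleNamespace(account="ab123456",
                      scripts_path="/runner/core/scripts")

executor = SshExecutor(job, downscope=True)

# Management call: runs a setup script on the SSH host, sourcing ssh.env
# first via pre_exec_scripts and capturing the output.
out = executor.management_handler(params="-T login18-1",
                                  script="/runner/core/scripts/chmodPath.sh")

# Main script: output is passed through directly (main_script=True).
executor.run_direct(params="-T login18-1",
                    script="/runner/core/scripts/run.sh")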
@@ -2,7 +2,7 @@
   variables:
     CI_MODE: "SSH"

-.ssh-build-job:
+ssh-build-job:
   extends: .ssh-job
   stage: build
   script:
@@ -16,5 +16,5 @@
     - path.env
   parallel:
     matrix:
-      - CI_SSH_HOST: ['login18-1', 'login18-beta']
+      - CI_SSH_HOST: ['login18-1']
@@ -197,6 +197,7 @@ build-job:  # This job runs in the build stage, which runs first.
   variables:
     SLURM_PARAM_CPUS: "-c 2"
   script:
+    - module list
     - echo "Compiling the code..."
     - echo "Compile complete."
@@ -16,5 +16,7 @@
 #SBATCH --ntasks=1
 #SBATCH --cpus-per-task=2

+module list
+module load Python
 echo 'Hello World'
@@ -9,6 +9,8 @@
 #SBATCH --nodes=1
 #SBATCH --ntasks=1

+module list
+module load Python
 for i in $(seq 20);
 do