diff --git a/core/job.py b/core/job.py
index d7fc2273750390ad71a7d40dff3de8092dd9dbd0..8325351e957cbf18be3793786268ebdc6efac83c 100644
--- a/core/job.py
+++ b/core/job.py
@@ -141,12 +141,13 @@ class Job:
self.log_path = f'{self.log_path}/{self.account}/{pipeline_creation_day_utc}/{self.pipeline_id}'
os.makedirs(self.log_path, exist_ok=True)
logging.basicConfig(filename=f'{self.log_path}/{self.jobid}.log',
- format=f'[%(asctime)s] [%(filename)s:%(lineno)s:%(funcName)s] %(levelname)s:%(message)s',
+ format='[%(asctime)s] [%(filename)s:%(lineno)s:%(funcName)s] %(levelname)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
encoding='utf-8',
level=defines.log_level)
logging.getLogger('filelock').setLevel(logging.INFO)
- if not get_cenv('CI_LOG_STDOUT', 'False') in ['0', 'NULL', 'False', 'false', 'FALSE'] and defines.stdout_logging and self.args[1] != 'config':
+ if (get_cenv('CI_LOG_STDOUT', 'False') not in ['0', 'NULL', 'False', 'false', 'FALSE']
+ and defines.stdout_logging and self.args[1] != 'config'):
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
def __init__(self, args, driver_path):
@@ -156,7 +157,7 @@ class Job:
self.mode_name = get_cenv('CI_MODE', 'Slurm').strip()
self.__setup()
logging.info(f'Started step {self.args[1]} with args={self.args}')
-
+
self._mode = modes.get_mode(self)
if self.args[1] == 'config':
self.config()
@@ -180,7 +181,7 @@ class Job:
self._mode.custom_prepare()
def config(self):
- # Allocate the Slurm job here already to get set build dir to Slurm's tmp
+ # Allocate the Slurm job here already so the build dir can be set to Slurm's tmp dir
_build_path = self.build_path
_cache_path = self.cache_path
_builds_dir_is_shared = True
@@ -189,20 +190,20 @@ class Job:
_build_path = f'{self._mode.tmp_dir}/builds'
_cache_path = f'{self._mode.tmp_dir}/cache'
_builds_dir_is_shared = False
- builder = { 'builds_dir' : _build_path,
- 'cache_dir': _cache_path,
- 'builds_dir_is_shared': _builds_dir_is_shared,
- 'hostname': 'custom-hostname',
- 'driver': {'name': defines.name, 'version': defines.version},
- 'job_env': {'GIT_DEPTH': '1'} }
+ builder = {'builds_dir': _build_path,
+ 'cache_dir': _cache_path,
+ 'builds_dir_is_shared': _builds_dir_is_shared,
+ 'hostname': 'custom-hostname',
+ 'driver': {'name': defines.name, 'version': defines.version},
+ 'job_env': {'GIT_DEPTH': '1'}}
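+ # Emitted as JSON on stdout below so the runner's config stage picks up builds_dir, cache_dir and driver info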
print(json.dumps(builder))
logging.debug(builder)
pipeline_ci_variables = ["CI_PROJECT_ID", "CI_PROJECT_PATH", "GITLAB_USER_NAME",
- "GITLAB_USER_ID", "CI_PIPELINE_ID", "CI_PIPELINE_URL",
- "CI_JOB_STARTED_AT", "CI_PIPELINE_SOURCE", "CI_COMMIT_SHA",
- "CI_COMMIT_AUTHOR", "GITLAB_USER_EMAIL", "CI_COMMIT_BRANCH"]
+ "GITLAB_USER_ID", "CI_PIPELINE_ID", "CI_PIPELINE_URL",
+ "CI_JOB_STARTED_AT", "CI_PIPELINE_SOURCE", "CI_COMMIT_SHA",
+ "CI_COMMIT_AUTHOR", "GITLAB_USER_EMAIL", "CI_COMMIT_BRANCH"]
pipeline_info = {k: get_cenv(k) for k in pipeline_ci_variables}
pipeline_info['cluser_account'] = self.account
pipeline_info_path = f'{self.log_path}/pipeline_info.json'
@@ -213,21 +214,16 @@ class Job:
logging.info(f'ci_mode: {get_cenv("CI_MODE")}')
logging.info(f'ci_job_id: {get_cenv("CI_JOB_ID")}')
- def generate_sudo_env(self, skip_env=False):
- if not skip_env and self.custom_env:
- logging.debug(f'custom_env size = {self.custom_env.__sizeof__()}, len = {len(self.custom_env)}')
- env_string = ' '
- logging.debug(self.custom_env)
- for x in self.custom_env:
- env_string += x + "=" + self.custom_env[x] + ' '
- return env_string + ' '
- return ''
-
+ def get_parameters(self):
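+ # Collect every CUSTOM_ENV_SLURM_PARAM* variable and join its value into a space-separated Slurm parameter string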
+ parameters = {k: v for k, v in os.environ.items() if k.startswith("CUSTOM_ENV_SLURM_PARAM")}
+ parameter_string = ''
+ for v in parameters.values():
+ parameter_string += f'{v} '
+ return parameter_string
def popen_wrapper(self, run_properties, run_script, helper_script, skip_env=False, **kwargs):
if self.down_scoping:
- logging.info(f'custom_env is "{self.generate_sudo_env(skip_env=skip_env)}"')
- run_properties = f'sudo {self.generate_sudo_env(skip_env=skip_env)}-u {self.account} ' + run_properties
+ run_properties = f'sudo -u {self.account} ' + run_properties
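+ # Run as the mapped cluster account; the job environment is no longer injected via sudo but passed to Popen's env below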
os.chdir("/work")
command = []
if helper_script:
@@ -244,13 +240,13 @@ class Job:
logging.info(f'Executing command: {str(command)}')
os.chdir('/tmp')
return subprocess.Popen(command,
- env=(dict(os.environ, **{x: self.custom_env[x]
+ env=(dict(os.environ, **{x: self.custom_env[x]
for x in self.custom_env}) if not skip_env
- else os.environ),
- **kwargs)
-
+ else os.environ),
+ **kwargs)
- def execute(self, run_properties, run_script, helper_script, install_env=False ,skip_env=False, script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs):
+ def execute(self, run_properties, run_script, helper_script, install_env=False, skip_env=False,
+ script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs):
if self.down_scoping:
wrapper = ''
if srun_wrap:
@@ -258,22 +254,23 @@ class Job:
wrapper += (f' --jobid={self._mode.slurm_simple_job_id} ' if self._mode.slurm_simple_job_id else ' ')
if install_env:
wrapper += f' --export=CUSTOM_SHELL_CONFIG={self.shell_config}'
- run_properties = f'sudo {self.generate_sudo_env(skip_env=skip_env)} -u {self.account} {wrapper} {run_properties}'
+ run_properties = f'sudo -u {self.account} {wrapper} {run_properties}'
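+ # Same as popen_wrapper: sudo only switches to the target account, the custom env is supplied via Popen's env argument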
os.chdir("/work")
command = [helper_script]
if run_properties == '':
command.extend([run_script])
else:
command.extend([run_properties, run_script])
- self._mode.custom_run_setup(install_env=install_env ,skip_env=skip_env, script_execution=script_execution, do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap,**kwargs)
+ self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution,
+ do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs)
logging.info(f'Executing with env: {str(self.custom_env)}')
logging.info(f'Executing command: {str(command)}')
os.chdir('/tmp')
main_proc = subprocess.Popen(command,
- env=(dict(os.environ, **{x: self.custom_env[x]
- for x in self.custom_env}) if not skip_env
- else os.environ),
- **kwargs)
+ env=(dict(os.environ, **{x: self.custom_env[x]
+ for x in self.custom_env}) if not skip_env
+ else os.environ),
+ **kwargs)
if script_execution and do_inbetween_processing:
side_proc = self._mode.inbetween_processing(main_proc)
stdout, stderr = main_proc.communicate()
@@ -286,14 +283,13 @@ class Job:
cmd_return_code = 1 if (int(main_proc.returncode) != 0 or job_was_canceled) else 0
if script_execution and (job_was_canceled or not self.allow_failure):
utility.update_json_kv(self.error_code_file, self.jobid, cmd_return_code)
- if script_execution and cmd_return_code != 0:
+ if script_execution and cmd_return_code != 0:
if self._mode and not self.allow_failure:
self._mode.cleanup_on_failure()
exit(cmd_return_code)
- #FIXME: do not rely on internal implementation of subprocess.run
+ # FIXME: do not rely on internal implementation of subprocess.run
return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr)
-
def run(self):
if not self.allow_failure:
self.error_code_file = f'{self.stage_tmp_dir}/{self.args[3]}.json'
@@ -312,7 +308,8 @@ class Job:
run_properties += self._mode.get_run_properties()
run_script = self._mode.get_run_script()
do_inbetween_processing = True
- self.execute(run_properties, run_script, command, script_execution=True, do_inbetween_processing=do_inbetween_processing)
+ self.execute(run_properties, run_script, command, script_execution=True,
+ do_inbetween_processing=do_inbetween_processing)
if self.args[3] in ['build_script', 'step_script'] and self._mode.has_post_run_script():
command = f"{self.driver_path}/core/scripts/pipeHelper.sh"