From 4f48e7eb9e31f886e5032b87b9efaa3a17ab515d Mon Sep 17 00:00:00 2001 From: Adrian Schmitz <a.schmitz@itc.rwth-aachen.de> Date: Mon, 9 Jan 2023 10:27:33 +0100 Subject: [PATCH] merge fix Signed-off-by: Adrian Schmitz <a.schmitz@itc.rwth-aachen.de> --- core/job.py | 77 +++++++++++++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/core/job.py b/core/job.py index d7fc227..8325351 100644 --- a/core/job.py +++ b/core/job.py @@ -141,12 +141,13 @@ class Job: self.log_path = f'{self.log_path}/{self.account}/{pipeline_creation_day_utc}/{self.pipeline_id}' os.makedirs(self.log_path, exist_ok=True) logging.basicConfig(filename=f'{self.log_path}/{self.jobid}.log', - format=f'[%(asctime)s] [%(filename)s:%(lineno)s:%(funcName)s] %(levelname)s:%(message)s', + format=f'[%(asctime)s] [%(filename)s:%(lineno)s:%(funcName)s] %(levelname)s:%(message)s', datefmt='%Y-%m-%d %H:%M:%S', encoding='utf-8', level=defines.log_level) logging.getLogger('filelock').setLevel(logging.INFO) - if not get_cenv('CI_LOG_STDOUT', 'False') in ['0', 'NULL', 'False', 'false', 'FALSE'] and defines.stdout_logging and self.args[1] != 'config': + if not get_cenv('CI_LOG_STDOUT', 'False') in ['0', 'NULL', 'False', 'false', + 'FALSE'] and defines.stdout_logging and self.args[1] != 'config': logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) def __init__(self, args, driver_path): @@ -156,7 +157,7 @@ class Job: self.mode_name = get_cenv('CI_MODE', 'Slurm').strip() self.__setup() logging.info(f'Started step {self.args[1]} with args={self.args}') - + self._mode = modes.get_mode(self) if self.args[1] == 'config': self.config() @@ -180,7 +181,7 @@ class Job: self._mode.custom_prepare() def config(self): - # Allocate the Slurm job here already to get set build dir to Slurm's tmp + # Allocate the Slurm job here already to get set build dir to Slurm's tmp _build_path = self.build_path _cache_path = self.cache_path _builds_dir_is_shared = True @@ -189,20 +190,20 @@ class Job: _build_path = f'{self._mode.tmp_dir}/builds' _cache_path = f'{self._mode.tmp_dir}/cache' _builds_dir_is_shared = False - builder = { 'builds_dir' : _build_path, - 'cache_dir': _cache_path, - 'builds_dir_is_shared': _builds_dir_is_shared, - 'hostname': 'custom-hostname', - 'driver': {'name': defines.name, 'version': defines.version}, - 'job_env': {'GIT_DEPTH': '1'} } + builder = {'builds_dir': _build_path, + 'cache_dir': _cache_path, + 'builds_dir_is_shared': _builds_dir_is_shared, + 'hostname': 'custom-hostname', + 'driver': {'name': defines.name, 'version': defines.version}, + 'job_env': {'GIT_DEPTH': '1'}} print(json.dumps(builder)) logging.debug(builder) pipeline_ci_variables = ["CI_PROJECT_ID", "CI_PROJECT_PATH", "GITLAB_USER_NAME", - "GITLAB_USER_ID", "CI_PIPELINE_ID", "CI_PIPELINE_URL", - "CI_JOB_STARTED_AT", "CI_PIPELINE_SOURCE", "CI_COMMIT_SHA", - "CI_COMMIT_AUTHOR", "GITLAB_USER_EMAIL", "CI_COMMIT_BRANCH"] + "GITLAB_USER_ID", "CI_PIPELINE_ID", "CI_PIPELINE_URL", + "CI_JOB_STARTED_AT", "CI_PIPELINE_SOURCE", "CI_COMMIT_SHA", + "CI_COMMIT_AUTHOR", "GITLAB_USER_EMAIL", "CI_COMMIT_BRANCH"] pipeline_info = {k: get_cenv(k) for k in pipeline_ci_variables} pipeline_info['cluser_account'] = self.account pipeline_info_path = f'{self.log_path}/pipeline_info.json' @@ -213,21 +214,16 @@ class Job: logging.info(f'ci_mode: {get_cenv("CI_MODE")}') logging.info(f'ci_job_id: {get_cenv("CI_JOB_ID")}') - def generate_sudo_env(self, skip_env=False): - if not skip_env and self.custom_env: - logging.debug(f'custom_env size = {self.custom_env.__sizeof__()}, len = {len(self.custom_env)}') - env_string = ' ' - logging.debug(self.custom_env) - for x in self.custom_env: - env_string += x + "=" + self.custom_env[x] + ' ' - return env_string + ' ' - return '' - + def get_parameters(self): + parameters = {k: v for k, v in os.environ.items() if k.startswith("CUSTOM_ENV_SLURM_PARAM")} + parameter_string = '' + for _, v in parameters.items(): + parameter_string += f'{v} ' + return parameter_string def popen_wrapper(self, run_properties, run_script, helper_script, skip_env=False, **kwargs): if self.down_scoping: - logging.info(f'custom_env is "{self.generate_sudo_env(skip_env=skip_env)}"') - run_properties = f'sudo {self.generate_sudo_env(skip_env=skip_env)}-u {self.account} ' + run_properties + run_properties = f'sudo -u {self.account} ' + run_properties os.chdir("/work") command = [] if helper_script: @@ -244,13 +240,13 @@ class Job: logging.info(f'Executing command: {str(command)}') os.chdir('/tmp') return subprocess.Popen(command, - env=(dict(os.environ, **{x: self.custom_env[x] + env=(dict(os.environ, **{x: self.custom_env[x] for x in self.custom_env}) if not skip_env - else os.environ), - **kwargs) - + else os.environ), + **kwargs) - def execute(self, run_properties, run_script, helper_script, install_env=False ,skip_env=False, script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs): + def execute(self, run_properties, run_script, helper_script, install_env=False, skip_env=False, + script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs): if self.down_scoping: wrapper = '' if srun_wrap: @@ -258,22 +254,23 @@ class Job: wrapper += (f' --jobid={self._mode.slurm_simple_job_id} ' if self._mode.slurm_simple_job_id else ' ') if install_env: wrapper += f' --export=CUSTOM_SHELL_CONFIG={self.shell_config}' - run_properties = f'sudo {self.generate_sudo_env(skip_env=skip_env)} -u {self.account} {wrapper} {run_properties}' + run_properties = f'sudo -u {self.account} {wrapper} {run_properties}' os.chdir("/work") command = [helper_script] if run_properties == '': command.extend([run_script]) else: command.extend([run_properties, run_script]) - self._mode.custom_run_setup(install_env=install_env ,skip_env=skip_env, script_execution=script_execution, do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap,**kwargs) + self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution, + do_inbetween_processing=do_inbetween_processing, srun_wrap=srun_wrap, **kwargs) logging.info(f'Executing with env: {str(self.custom_env)}') logging.info(f'Executing command: {str(command)}') os.chdir('/tmp') main_proc = subprocess.Popen(command, - env=(dict(os.environ, **{x: self.custom_env[x] - for x in self.custom_env}) if not skip_env - else os.environ), - **kwargs) + env=(dict(os.environ, **{x: self.custom_env[x] + for x in self.custom_env}) if not skip_env + else os.environ), + **kwargs) if script_execution and do_inbetween_processing: side_proc = self._mode.inbetween_processing(main_proc) stdout, stderr = main_proc.communicate() @@ -286,14 +283,13 @@ class Job: cmd_return_code = 1 if (int(main_proc.returncode) != 0 or job_was_canceled) else 0 if script_execution and (job_was_canceled or not self.allow_failure): utility.update_json_kv(self.error_code_file, self.jobid, cmd_return_code) - if script_execution and cmd_return_code != 0: + if script_execution and cmd_return_code != 0: if self._mode and not self.allow_failure: self._mode.cleanup_on_failure() exit(cmd_return_code) - #FIXME: do not rely on internal implementation of subprocess.run + # FIXME: do not rely on internal implementation of subprocess.run return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr) - def run(self): if not self.allow_failure: self.error_code_file = f'{self.stage_tmp_dir}/{self.args[3]}.json' @@ -312,7 +308,8 @@ class Job: run_properties += self._mode.get_run_properties() run_script = self._mode.get_run_script() do_inbetween_processing = True - self.execute(run_properties, run_script, command, script_execution=True, do_inbetween_processing=do_inbetween_processing) + self.execute(run_properties, run_script, command, script_execution=True, + do_inbetween_processing=do_inbetween_processing) if self.args[3] in ['build_script', 'step_script'] and self._mode.has_post_run_script(): command = f"{self.driver_path}/core/scripts/pipeHelper.sh" -- GitLab