diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 6a6fc6f503216378ef2cb52dd4127f3a437a5157..3af6cf4ecd130dd55ae9e822d75c113103894319 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -27,21 +27,29 @@ variables:
 .local-template:
   variables:
     RUNNER_TAG: "custom2"
-    # CI_LOG_STDOUT: "0"
 .run-template:
   stage: run
   trigger:
-    include: .template.yml
+    include: "/utility/.gitlab/.template.yml"
     strategy: depend
 .deploy-template:
   # This job runs in the deploy stage.
   stage: deploy
   # It only runs when *both* jobs in the test stage complete successfully.
   script:
-    - zip -r Runner.zip driver.py core/ JSONAccountManager.py
+    - zip -r Runner.zip driver.py core/ utility/runtime/scripts JSONAccountManager.py
+    - module load Python
+    - python -m venv venv
+    - source venv/bin/activate
+    - pip install -r requirements.txt
+    - pip install -r utility/install/requirements-static.txt
+    - mkdir -p $TMP/deploy_build
+    - $HOME/.local/bin/pyinstaller --onefile --clean --strip --workpath=$TMP/deploy_build driver.py
+    - staticx dist/driver static_driver.exe
   artifacts:
     paths:
       - Runner.zip
+      - static_driver.exe
   tags:
     - $RUNNER_TAG
 
@@ -53,18 +61,12 @@ downscope-test:
 local-test:
   trigger:
     include:
-      - .template.yml
-      - .localTemplate.yml
+      - "/utility/.gitlab/.template.yml"
+      - "/utility/.gitlab/.localTemplate.yml"
   extends:
     - .local-template
     - .run-template
 
-downscope-deploy:
-  extends:
-    - .downscope-template
-    - .deploy-template
-  needs: ["downscope-test"]
-
 local-deploy:
   extends:
     - .local-template
diff --git a/Installer.py b/Installer.py
old mode 100644
new mode 100755
index 05a6909505738c4df2b2037a2cef345f3a3990b7..652af03a6c43997a02fc3ad2903a76ef6eba2b96
--- a/Installer.py
+++ b/Installer.py
@@ -1,167 +1,219 @@
+#!/usr/bin/env python3
 import argparse
-import json
 import os
+import sys
+import subprocess
 
-import rsa
 
-mainpath = os.getcwd()
-args = ""
+def add_parameters(parser, param_list):
+    for p in param_list:
+        if p.get("action"):
+            parser.add_argument(
+                *p.get("flags"),
+                required=p.get("required"),
+                default=p.get("default"),
+                action=p.get("action"),
+                help=p.get("help")
+            )
+        else:
+            parser.add_argument(
+                *p.get("flags"),
+                required=p.get("required"),
+                type=p.get("type"),
+                choices=p.get("choices"),
+                nargs=p.get("nargs"),
+                default=p.get("default"),
+                const=p.get("const"),
+                metavar=p.get("metavar"),
+                help=p.get("help")
+            )
 
 
 # Define Commandline interface
 def CLI():
-    parameters = [
+    subcommands = {
+        'install': {
+            'aliases': ['i'],
+            'f': install
+        },
+        'remove': {
+            'aliases': ['rm'],
+            'f': uninstall
+        },
+        'update': {
+            'aliases': ['u'],
+            'f': update
+        },
+        'register': {
+            'aliases': ['r'],
+            'f': add_registration
+        },
+    }
+    parameters = {
+        'install': [
         {
-            "flags": ['-i', '--install'],
+            "flags": ['-se', '--static-exe'],
             "action": "store_true",
-            "help": "Install a gitlab runner."
+            "help": "Make the custom executor a static executable."
         },
         {
-            "flags": ['-t', '--registration-token'],
-            "type": str,
-            "metavar": "<token>",
-            "help": "The registration token provided by the Gitlab Instance."
+            "flags": ['-scc', '--static-contain-config'],
+            "action": "store_true",
+            "help": "Include the config inside the static executable"
         },
         {
-            "flags": ["-url", "--registration-url"],
+            "flags": ["-mp", "--mapping-path"],
             "type": str,
-            "metavar": "<url>",
-            "help": "Url to the used Gitlab Instance."
+            "metavar": "<name>",
+            "default": "Assignments.txt",
+            "help": "The path where the user mapping is stored."
}, { - "flags": ["-n", "--runner-name"], - "type": str, - "metavar": "<name>", - "default": "Cluster-Runner", - "help": "The name of the Gitlab Runner." + "flags": ["-d", "--downscope"], + "action": "store_true", + "help": "Activate the downscoping mechanism." }, { - "flags": ["-tag", "--tag-list"], + "flags": ["-sf", "--shell-file"], "type": str, - "metavar": "<tags>", - "default": "cluster", - "help": "A list of tags for the GitLab Runner." + "metavar": "<name>", + "default": ".zshrc", + "help": "The configuration file of your systems default shell." }, { - "flags": ["-ip", "--install-Path"], + "flags": ["-aes", "--aes-key-path"], "type": str, "metavar": "<name>", - "default": "$HOME/Runner", - "help": "The path where the runner is installed." + "default": "Keyfile", + "help": "The path to the aes key file." }, { - "flags": ["-up", "--user-Path"], + "flags": ["-s", "--shell-installation"], "type": str, "metavar": "<name>", - "default": "Runner", - "help": "The path where the user data is stored." + "default": "/usr/bin/env bash", + "help": "The path to the preferred shell to be used." }, { - "flags": ["-kp", "--key-Path"], + "flags": ["-kp", "--key-path"], "type": str, "metavar": "<name>", "default": "id_rsa", "help": "The path where the key for decrypting the mapping is stored." }, { - "flags": ["-lp", "--log-Path"], + "flags": ["-lp", "--log-path"], "type": str, "metavar": "<name>", - "default": None, + "default": 'logs', "help": "Path to the directory used to store logs." }, { - "flags": ["-pkp", "--public-key-Path"], + "flags": ["-pkp", "--public-key-path"], "type": str, "metavar": "<name>", "default": "id_rsa.pub", "help": "The path where the key for encrypting the mapping is stored." }, - { "flags": ["-cc", "--concurrency"], - "type": int, - "metavar": "<name>", - "default": "100", - "help": "Defines how many concurrent runners shall be available." - }, { - "flags": ["-mp", "--mapping-Path"], + "flags": ["-rup", "--rt-utility-path"], "type": str, "metavar": "<name>", - "default": "Assignments.txt", - "help": "The path where the user mapping is stored." + "default": "utility/runtime", + "help": "Path where utility files are located. Downscoped users need read access." }, { - "flags": ["-d", "--downscope"], - "action": "store_true", - "help": "Activate the downscoping mechanism." + "flags": ["-rip", "--runner-install-path"], + "type": str, + "metavar": "<name>", + "default": "utility/runtime/bin", + "help": "The path where the runner is installed." }, { - "flags": ["-u", "--update"], - "action": "store_true", - "help": "Update the Gitlab Runner installation." + "flags": ["-up", "--user-path"], + "type": str, + "metavar": "<name>", + "default": "/home/$USER/.cache/ci_driver", + "help": "The path where the user data is stored. Only $USER is expanded if downscoping is enabled. Downscoped user needs rw access." }, + ], + 'remove': [], + 'update': [], + 'register': [ { - "flags": ["-rm", "--remove"], - "action": "store_true", - "help": "Uninstall the Gitlab Runner." + "flags": ['-t', '--registration-token'], + "type": str, + "metavar": "<token>", + "help": "The registration token provided by the Gitlab Instance." }, { - "flags": ["-sf", "--shell-file"], + "flags": ["-url", "--registration-url"], "type": str, - "metavar": "<name>", - "default": ".zshrc", - "help": "The configuration file of your systems default shell." + "metavar": "<url>", + "help": "Url to the used Gitlab Instance." 
}, { - "flags": ["-rg", "--register"], - "action": "store_true", - "help": "Register a new runner with the same config." + "flags": ["-n", "--runner-name"], + "type": str, + "metavar": "<name>", + "default": "Cluster-Runner", + "help": "The name of the Gitlab Runner." }, { - "flags": ["-aes", "--aes-key-path"], + "flags": ["-tag", "--tag-list"], "type": str, - "metavar": "<name>", - "default": "Keyfile", - "help": "The path to the aes key file." + "metavar": "<tags>", + "default": "cluster", + "help": "A list of tags for the GitLab Runner." }, + { "flags": ["-cc", "--concurrency"], + "type": int, + "metavar": "<name>", + "default": "100", + "help": "Maximum number of jobs the runner will execute concurrently." + }, + ], + 'global': [ { - "flags": ["-s", "--shell-installation"], + "flags": ["-ip", "--install-path"], "type": str, "metavar": "<name>", - "default": "/usr/bin/env bash", - "help": "The path to the preferred shell to be used." - } + "default": "./install", + "help": "The path where the runner is installed." + }, ] + } - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(prog='Aixcellenz CI Driver Installer') + add_parameters(parser, parameters['global']) + subparsers = parser.add_subparsers(help='sub-command help') - for p in parameters: - if p.get("action"): - parser.add_argument( - *p.get("flags"), - required=p.get("required"), - default=p.get("default"), - action=p.get("action"), - help=p.get("help") - ) - else: - parser.add_argument( - *p.get("flags"), - required=p.get("required"), - type=p.get("type"), - choices=p.get("choices"), - nargs=p.get("nargs"), - default=p.get("default"), - const=p.get("const"), - metavar=p.get("metavar"), - help=p.get("help") - ) + for cmd in subcommands: + subcmd_parser = subparsers.add_parser(cmd, help=f'{cmd} help', aliases=subcommands[cmd]['aliases']) + subcmd_parser.set_defaults(func=subcommands[cmd]['f']) + add_parameters(subcmd_parser, parameters[cmd]) - return parser.parse_args() + ret = parser.parse_args() + ret.install_path = os.path.abspath(os.path.expandvars(os.path.expanduser(ret.install_path))) + + # NOTE: Do *not* include the user path here. It is expanded at runtime. 
+ args_to_expand = ['runner_install_path', 'shell_installation', 'mapping_path', + 'public_key_path', 'key_path', 'log_path', 'rt_utility_path'] + for arg in args_to_expand: + vars(ret)[arg] = os.path.expandvars(os.path.expanduser(vars(ret)[arg])) + if not os.path.isabs(vars(ret)[arg]): + vars(ret)[arg] = os.path.join(ret.install_path, vars(ret)[arg]) + + ret.shell_file = os.path.expandvars(os.path.expanduser(ret.shell_file)) + if not os.path.isabs(ret.shell_file): + ret.shell_file = os.path.join(ret.rt_utility_path, ret.shell_file) + + return ret # create initial keys def create_keys(priv_key_path, pub_key_path): + import rsa (pubkey, private_key) = rsa.newkeys(2048) with open(f"{pub_key_path}", "w") as text_file: text_file.write(pubkey.save_pkcs1().decode('ascii')) @@ -170,248 +222,156 @@ def create_keys(priv_key_path, pub_key_path): # create initial mapping -def create_mapping(priv_key_path, pub_key_path, map_path): +def create_mapping(priv_key_path, pub_key_path): create_keys(priv_key_path, pub_key_path) - with open(map_path, "w") as text_file: + with open(args.map_path, "w") as text_file: text_file.write('') # Perform the initial installation of the Gitlab runner -def doInstall(Token, Url, Name, Tags, install_Path, shell_file): - install_Path = install_Path.replace("$HOME", os.getenv("HOME")) - # Directory Setup - if not os.path.exists(install_Path): - os.mkdir(install_Path) - os.chdir(install_Path) - - os.system("pip install rsa") - os.system("pip install PyJWT") - os.system("pip install cryptography") - os.system("pip install pycryptodome") - os.system("pip install filelock") - - # Download GitLab Runner - os.system("curl -LJO \"https://gitlab-runner-downloads.s3.amazonaws.com/latest/rpm/gitlab-runner_amd64.rpm\"") - - # Install GitLab Runner Locally - os.system("rpm2cpio ./gitlab-runner_amd64.rpm | cpio -idmv") - - # Make GitLab Runner executable - os.system(f"chmod +x {install_Path}/usr/bin/gitlab-runner") - - # Register Gitlab Runner - os.system( - f'./usr/bin/gitlab-runner register --non-interactive --url "{Url}" --registration-token "{Token}"\ - --executor "custom" --description "{Name}" --tag-list "{Tags}" --run-untagged="false" --locked="false" ' - f'--access-level="not_protected"\ - --custom-prepare-exec="{install_Path}/main.sh" --custom-config-exec="{install_Path}/main.sh"\ - --custom-run-exec="{install_Path}/main.sh" --custom-cleanup-exec="{install_Path}/main.sh"\ - --custom-prepare-args="prepare" --custom-config-args="config"\ - --custom-run-args="run" --custom-cleanup-args="cleanup"') - - generateDriverConfig(install_Path, args.downscope, args.key_Path, args.mapping_Path, args.user_Path, - args.aes_key_path, args.shell_installation, args.log_Path, args.shell_file) - generateRunnerMain(install_Path) +def install(): + os.system("pip install -r requirements.txt") + + os.makedirs(args.rt_utility_path, exist_ok=True) + # In case we don't perform an in-source installation + # TODO: Install compiled python files + if os.path.dirname(os.path.abspath(sys.argv[0])) != args.install_path: + os.makedirs(args.install_path, exist_ok=True) + if not args.static_exe: + os.system(f'cp -r core driver.py {args.install_path}') + os.system(f'cp -r settings JSONAccountManager.py {args.install_path}') + print(f'Installing utility scripts to {args.rt_utility_path}') + os.system(f'cp -r utility/runtime/scripts {args.rt_utility_path}') + #os.chdir(args.install_path) + + if args.static_exe: + create_static_exe() + + _runner_install_path = args.runner_install_path + from shutil import which + if 
which("gitlab-runner") is None: + os.makedirs(args.runner_install_path, exist_ok=True) + os.system(f'curl -L --output {args.runner_install_path}/gitlab-runner "https://gitlab-runner-downloads.s3.amazonaws.com/latest/binaries/gitlab-runner-linux-amd64"') + os.system(f"chmod +x {args.runner_install_path}/gitlab-runner") + else: + _runner_install_path = os.path.dirname(os.path.abspath(which("gitlab-runner"))) - os.system(f"sed -i 's/concurrent = 1/concurrent = {args.concurrency}/g' $HOME/.gitlab-runner/config.toml") + # Let the user rather do that with another invocation of the installer + #add_registration(_runner_install_path) - # Download driver - os.system( - 'curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/raw/' - 'Runner.zip?job=deploy"') - os.system(f'unzip {install_Path}/Runner.zip') + config_dir = f'{args.install_path}/settings' + if args.static_exe and not args.static_contain_config: + config_dir = os.getenv('XDG_CONFIG_HOME', f"{os.getenv('HOME')}/.config") + '/ci_driver' + os.makedirs(config_dir, exist_ok=True) + generate_driver_config(config_dir) + generate_runner_main(args.static_exe) # create mapping for downscoping if args.downscope: - create_mapping(args.key_Path, args.public_key_Path, args.mapping_Path) + create_mapping(args.key_path, args.public_key_path) # enable the gitlab-runner globally, necessary for artifacts - from shutil import which - if which("gitlab-runner") is None: - HOME = os.getenv("HOME") - file = open(f"{HOME}/{shell_file}", "r+") - new_shell_file = f"export PATH=$PATH:{install_Path}/usr/bin\n" - file.writelines(new_shell_file) - file.close() + # we need to do this regardless of whether the gitlab-runner is in the current PATH + with open(args.shell_file, "a") as shell_fp: + shell_fp.writelines(f"export PATH=$PATH:{_runner_install_path}\n") - setRunstepDependencies(install_Path) - # Start Runner - # os.system("screen -dm ./usr/bin/gitlab-runner run") + set_utility_permissions(_runner_install_path, ['gitlab-runner']) + set_utility_permissions(args.rt_utility_path) -def add_registration(install_Path, Url, Token, Name, Tags): +def add_registration(runner_path=''): # Register Gitlab Runner + if runner_path: runner_path += '/' os.system( - f'gitlab-runner register --non-interactive --url "{Url}" --registration-token "{Token}"\ - --executor "custom" --description "{Name}" --tag-list "{Tags}" --run-untagged="false" --locked="false" \ + f'{runner_path}gitlab-runner register --non-interactive --url "{args.registration_url}" --registration-token "{args.registration_token}"\ + --executor "custom" --description "{args.runner_name}" --tag-list "{args.tag_list}" --run-untagged="false" --locked="false" \ --access-level="not_protected"\ - --custom-prepare-exec="{install_Path}/main.sh" --custom-config-exec="{install_Path}/main.sh"\ - --custom-run-exec="{install_Path}/main.sh" --custom-cleanup-exec="{install_Path}/main.sh"\ + --custom-prepare-exec="{args.install_path}/main.sh" --custom-config-exec="{args.install_path}/main.sh"\ + --custom-run-exec="{args.install_path}/main.sh" --custom-cleanup-exec="{args.install_path}/main.sh"\ --custom-prepare-args="prepare" --custom-config-args="config"\ --custom-run-args="run" --custom-cleanup-args="cleanup"') + os.system(f"sed -i 's/concurrent = 1/concurrent = {args.concurrency}/g' $HOME/.gitlab-runner/config.toml") # Uninstall the Gitlab runner and remove all associated processes -def doUninstall(install_Path): - install_Path = install_Path.replace("$HOME", os.getenv("HOME")) - # Directory 
Setup - # os.system("pkill screen") - - os.system(f"rm -rf {install_Path}") +def uninstall(): + os.system(f"rm -rf {args.install_path}") + #TODO: remove all potential absolute paths as well # Update the GitLab runner -def doUpdate(install_Path): - install_Path = install_Path.replace("$HOME", os.getenv("HOME")) - # Directory Setup - os.chdir(f"{install_Path}") - - # os.system("pkill screen") - - os.system("curl -LJO \"https://gitlab-runner-downloads.s3.amazonaws.com/latest/rpm/gitlab-runner_amd64.rpm\"") - - # Install GitLab Runner Locally - os.system("rpm2cpio ./gitlab-runner_amd64.rpm | cpio -idmv") - - # Make GitLab Runner executable - os.system("chmod +x usr/bin/gitlab-runner") +def update(): + # TODO: Should be optional +# os.system(f'curl -L --output {args.runner_install_path}/gitlab-runner "https://gitlab-runner-downloads.s3.amazonaws.com/latest/binaries/gitlab-runner-linux-amd64"') +# os.system(f"chmod +x {args.runner_install_path}/gitlab-runner") # Download driver os.system( 'curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/file/' - 'Runner.zip?job=deploy"') + 'Runner.zip?job=deploy-local"') os.system('unzip Runner.zip') - # Restart Runner - # os.system("screen -dm ./usr/bin/gitlab-runner run") - # Generate the main path depending on the installation path -def generateRunnerMain(install_Path): - install_Path = install_Path.replace("$HOME", os.getenv("HOME")) - if os.path.exists(f'{install_Path}/main.sh'): - os.remove(f"{install_Path}/main.sh") - file = open(f"{install_Path}/main.sh", "w") - main = ["#!/bin/bash\n", "module load python > /dev/null 2>&1\n", f"python3 {install_Path}/driver.py $@\n"] - file.writelines(main) - file.close() - os.system(f"chmod +x {install_Path}/main.sh") - - -def setRunstepDependencies(install_Path): - currentPath = '' - for path in install_Path.split('/'): - currentPath += path + '/' - os.system(f'chmod o+rx {currentPath}') - os.system(f'chmod o+rx {currentPath}core') - os.system(f'chmod -R o+rx {currentPath}core/scripts') - os.system(f'chmod o+rx {currentPath}usr/') - os.system(f'chmod -R o+rx {currentPath}usr/bin') - os.system(f'chmod -R o+rx {currentPath}usr/bin/gitlab-runner') +def generate_runner_main(is_exe_static=False): + execution_line = f'{args.install_path}/static_driver.exe' if is_exe_static else f'python3 {args.install_path}/driver.py' + with open(f"{args.install_path}/main.sh", "w") as fp: + fp.writelines(["#!/usr/bin/bash\n", + "#module load python > /dev/null 2>&1\n", + f"{execution_line} $@\n"]) + os.system(f"chmod +x {args.install_path}/main.sh") + + +def set_utility_permissions(base_path, executables=None): + currentpath = '' + for path in base_path.split('/'): + currentpath += path + '/' + subprocess.run(['chmod', 'o+rx', currentpath], env=os.environ, capture_output=True) + + if executables: + for executable in executables: + os.system(f'chmod o+rx {os.path.join(base_path, executable)}') + else: + for f in os.listdir(base_path): + if os.path.isfile(f): + os.system(f'chmod o+rx {f}') # Generate the config file for the custom driver -def generateDriverConfig(install_Path, downscope, key_path, mapping_path, user_path, aes_key_path, shell_installation, - log_path, shell_config): - install_Path = install_Path.replace("$HOME", os.getenv("HOME")) - +def generate_driver_config(dest_path): config_json = {} # Define whether downscoping shall be used or not, must be the first parameter. 
- if downscope: - config_json["Downscope"] = "True" - else: - config_json["Downscope"] = "False" - - # Define where the driver is installed, relative to the $HOME directory of the installing user. - config_json["Runner Path"] = install_Path - - # Define where the key for decrypting the mapping is located, either relative to the $HOME directory of the - # installing user or as an absolute path - if key_path.startswith("/") or key_path.startswith("$"): - config_json["Key Path"] = key_path - else: - config_json["Key Path"] = f'{install_Path}/{key_path}' - - # Define where the mapping is located, either relative to the $HOME directory of the - # installing user or as an absolute path - if mapping_path.startswith("/") or mapping_path.startswith("$"): - config_json["Map Path"] = mapping_path - else: - config_json["Map Path"] = f'{install_Path}/{mapping_path}' - - # Define where the AES key for encrypting the mapping file, either relative to the $HOME directory of the - # installing user or as an absolute path - if aes_key_path.startswith("/") or aes_key_path.startswith("$"): - config_json["AES Path"] = aes_key_path - else: - config_json["AES Path"] = f'{install_Path}/{aes_key_path}' - - if log_path == None: - config_json["Log Path"] = f'{install_Path}/logs' - elif log_path.startswith("/") or log_path.startswith("$"): - config_json["Log Path"] = log_path - else: - config_json["Log Path"] = f'{install_Path}/{log_path}' - - config_json["Shell Config"] = shell_config - - # Define where the build and cache paths shall be constructed, relative to the $WORK directory of the downscoped - # user. - config_json["User Path"] = user_path - - # Define where the shell to run the generated CI scripts is located. - config_json["Shell Install"] = shell_installation - - # Create File - if os.path.exists(f'{install_Path}/config.json'): - os.remove(f'{install_Path}/config.json') - file = open(f'{install_Path}/config.json', "w") - json.dump(config_json, file) - file.close() - - -def run(): + config_json["Downscope"] = "True" if args.downscope else "False" + + config_json["Runner Path"] = args.install_path + config_json["Key Path"] = args.key_path + config_json["Map Path"] = args.mapping_path + config_json["AES Path"] = args.aes_key_path + config_json["Log Path"] = args.log_path + config_json["Shell Config"] = args.shell_file + config_json["User Path"] = args.user_path + config_json["Shell Install"] = args.shell_installation + config_json["Runtime Utility Path"] = args.rt_utility_path + + with open(f'{dest_path}/config.json', "w") as fp_config: + import json + json.dump(config_json, fp_config, indent=4) + + +def create_static_exe(): + print('Creating static executable') + os.system('pip install -r utility/install/requirements-static.txt') + env = os.environ + if args.static_contain_config: + env.update({'STATIC_ADD_DATA': f'--add-data \"{args.install_path}/settings:settings\"'}) + res = subprocess.run(['./utility/install/build_static_driver.sh', f'{args.install_path}/static_driver.exe'], env=env, capture_output=True, text=True) + print(res.stdout) + print(res.stderr) + + + +if __name__ == '__main__': global args args = CLI() - - print(args) - - flag_sum = 0 - - if args.install: - flag_sum += 1 - - if args.remove: - flag_sum += 1 - - if args.update: - flag_sum += 1 - - if args.register: - flag_sum += 1 - - if flag_sum > 1: - print("-i, -rm, rg and -u may not be used in conjunction!") - exit(1) - - if args.install: - if args.registration_token is None or args.registration_url is None: - print("registration token and 
registration url must be defined for installation!") - exit(1) - - doInstall(args.registration_token, args.registration_url, args.runner_name, args.tag_list, args.install_Path, - args.shell_file) - - if args.remove: - doUninstall(args.install_Path) - - if args.update: - doUpdate(args.install_Path) - - if args.register: - add_registration(args.install_Path, args.registration_url, args.registration_token, args.runner_name, - args.tag_list) - - -run() + args.func() diff --git a/JSONAccountManager.py b/JSONAccountManager.py index ef618344022751b12a4f70ee75a8a5e5da12c628..6b47c42167b4f0af56e3998544e12fbd475ad07b 100644 --- a/JSONAccountManager.py +++ b/JSONAccountManager.py @@ -3,6 +3,7 @@ import os.path import string import random import os +import sys import core.authentication.EncryptionManager as encrypt @@ -132,7 +133,7 @@ def add_user_account(url, uid, priv_key_path, pub_key_path, map_path, cluster_ac mapping[url] = {"uid": {}, "pid": {}} if uid in mapping[url]["uid"]: print(f"Mapping for user: {uid} at gitlab instance: {url} already present.") - exit(1) + sys.exit(1) uid_dict = {"acc": cluster_acc, "delete": delete_date} mapping[url]["uid"][uid] = uid_dict encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key) @@ -144,7 +145,7 @@ def add_project_account(url, pid, priv_key_path, pub_key_path, map_path, cluster mapping[url] = {"uid": {}, "pid": {}} if pid in mapping[url]["pid"]: print(f"Mapping for project: {pid} at gitlab instance: {url} already present.") - exit(1) + sys.exit(1) pid_dict = {"acc": cluster_acc, "delete": delete_date} mapping[url]["pid"][pid] = pid_dict encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key) @@ -183,16 +184,16 @@ def run(): if args.remove_mapping and args.add_mapping and args.remove_url: print("Remove(-url) and add cannot be used in the same call.") - exit(1) + sys.exit(1) if args.gitlab_user_id is not None and args.gitlab_project_id is not None: print("Gitlab project id and gitlab user id cannot be provided in the same call.") - exit(1) + sys.exit(1) if args.create: if args.private_key_path is None or args.public_key_path is None or args.mapping_path is None or args.aes_encryption_key_path is None: print("Arguments for private/public key and mapping path must be provided.") - exit(1) + sys.exit(1) else: create_mapping(os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path)) @@ -200,7 +201,7 @@ def run(): if args.remove_mapping: if args.gitlab_url is None: print("Arguments for gitlab url must be provided.") - exit(1) + sys.exit(1) if args.gitlab_project_id is not None: remove_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path), @@ -211,12 +212,12 @@ def run(): os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path)) else: print("Arguments for gitlab project id or gitlab user id must be provided.") - exit(1) + sys.exit(1) if args.remove_url: if args.gitlab_url is None: print("Argument for gitlab url must be provided.") - exit(1) + sys.exit(1) else: remove_url(args.gitlab_url, os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path)) @@ -224,13 +225,13 @@ def run(): if args.add_mapping: if args.gitlab_url is None: print("Argument for gitlab url must be provided.") - exit(1) + sys.exit(1) if args.cluster_account_name is None: 
print("Argument for cluster account name must be provided.") - exit(1) + sys.exit(1) if args.delete_date is None: print("Argument for delete date must be provided.") - exit(1) + sys.exit(1) if args.gitlab_project_id is not None: add_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path), @@ -242,7 +243,7 @@ def run(): args.cluster_account_name, args.delete_date, os.path.abspath(args.aes_encryption_key_path)) else: print("Arguments for gitlab project id or gitlab user id must be provided.") - exit(1) + sys.exit(1) run() \ No newline at end of file diff --git a/core/authentication/JSONManager.py b/core/authentication/JSONManager.py index 8972e7e993276174bf87c71d66268d9c144ab764..6f6315314a9b23695433d0b8f261f2f77095bb8b 100644 --- a/core/authentication/JSONManager.py +++ b/core/authentication/JSONManager.py @@ -2,6 +2,7 @@ import json import core.authentication.EncryptionManager as encrypt from filelock import FileLock import logging +import sys def add_id_mapping(path, CI_id, slurm_id): @@ -43,5 +44,5 @@ def get_account(url, pid, uid, key_path, map_path, aes_path): result = instance["pid"][pid]["acc"] except KeyError: logging.error("Cannot assign GitLab user/project to cluster account. Please register here: TODO") - exit(1) + sys.exit(1) return result diff --git a/core/job.py b/core/job.py index 286765dc2282e45a5da6058fd1032a42af9ce910..f7133ad7df0156c77ed04042e2b9a6e71d7a6c3e 100644 --- a/core/job.py +++ b/core/job.py @@ -3,8 +3,8 @@ import subprocess import os import logging import sys -import re -import time +from time import sleep +import importlib.resources as importlib_resources import core.modes as modes from core.utility import get_cenv, defines @@ -23,8 +23,6 @@ class Job: jobid = None pipeline_id = None account = None - _home = None - driver_path = None exec_script = '' allow_failure = False shared_tmp = '' @@ -44,22 +42,6 @@ class Job: return self.mode.custom_env return dict() - @property - def cmd(self): - return self.mode.cmd - - @cmd.setter - def cmd(self, cmd): - self._cmd = cmd - - @property - def home(self): - if not self._home: - if not self.down_scoping: - return os.getenv("WORK") - return f"/work/{self.account}" - return self._home - @property def build_path(self): return f'{self.user_path}/builds/{get_cenv("CI_CONCURRENT_PROJECT_ID")}/{get_cenv("CI_PROJECT_PATH_SLUG")}' @@ -82,45 +64,44 @@ class Job: pdb_attach.listen(port) if self.args[1] != 'config': logging.debug(f'Debug mode on listening on port {port}') - time.sleep(100) + sleep(100) except ImportError: if self.args[1] != 'config': logging.warning('Debug mode on but pdb_attach module not found') - replacements_base = ['HOME', 'WORK', 'HPCWORK'] - REPLACEMENTS = {f'${s}': os.getenv(s, '') for s in replacements_base} - def rep(s): - rep_value = dict((re.escape(k), v) for k, v in REPLACEMENTS.items()) - pattern = re.compile("|".join(rep_value.keys())) - return pattern.sub(lambda m: rep_value[re.escape(m.group(0))], s) - - with open(f'{self.driver_path}/config.json', mode='r') as file: - CONFIG_MAPPING = json.load(file) - - if CONFIG_MAPPING["Downscope"] == 'True' or CONFIG_MAPPING["Downscope"] == 'true': - self.down_scoping = True - else: - self.down_scoping = False - self.key_path = rep(CONFIG_MAPPING["Key Path"]) - self.map_path = rep(CONFIG_MAPPING["Map Path"]) - self.runner_path = rep(CONFIG_MAPPING["Runner Path"]) - self.aes_path = rep(CONFIG_MAPPING["AES Path"]) - self.shell_path = 
rep(CONFIG_MAPPING["Shell Install"]) - self.log_path = rep(CONFIG_MAPPING["Log Path"]) - self.shell_config = rep(CONFIG_MAPPING["Shell Config"]) + return os.path.expanduser(os.path.expandvars(s)) + + try: + settings = json.loads(importlib_resources.read_text('settings', 'config.json')) + except: + config_dir = os.getenv("XDG_CONFIG_HOME", f"{os.getenv('HOME')}/.config") + '/ci_driver' + with open(f'{config_dir}/config.json') as config_fp: + settings = json.load(config_fp) + + self.down_scoping = (settings["Downscope"] == 'True' or settings["Downscope"] == 'true') + self.key_path = rep(settings["Key Path"]) + self.map_path = rep(settings["Map Path"]) + self.runner_path = rep(settings["Runner Path"]) + self.aes_path = rep(settings["AES Path"]) + self.shell_path = rep(settings["Shell Install"]) + self.log_path = rep(settings["Log Path"]) + self.shell_config = rep(settings["Shell Config"]) + self.rt_utility_path = rep(settings["Runtime Utility Path"]) + self.scripts_path = f'{self.rt_utility_path}/scripts' if self.down_scoping: - uid, pid = jwt.get_UID_PID(get_cenv('CI_JOB_JWT'), f"{get_cenv('CI_SERVER_URL')}/-/jwks") + uid, pid = jwt.get_UID_PID(get_cenv('CI_JOB_JWT'), + f"{get_cenv('CI_SERVER_URL')}/-/jwks") self.account = man.get_account(get_cenv('CI_SERVER_URL'), pid, uid, self.key_path, self.map_path, self.aes_path) - self.user_path = f'/work/{self.account}/{CONFIG_MAPPING["User Path"]}' + self.user_path = settings["User Path"].replace("$USER", self.account) if self.account is None: logging.error( f"No mapping for GitLab project: {get_cenv('CI_PROJECT_NAME')}, or GitLab user: " f"{get_cenv('GITLAB_USER_NAME')} available. Please register CI support for to access the Runner") else: - self.user_path = f"{os.getenv('WORK')}/{CONFIG_MAPPING['User Path']}" - self.shell_config = f'/home/{self.account}/{self.shell_config}' + self.user_path = rep(settings['User Path']) + def __setup(self): self.__get_config() @@ -129,9 +110,11 @@ class Job: self.jobid = get_cenv('CI_JOB_ID') self.pipeline_id = get_cenv('CI_PIPELINE_ID') self.allow_failure = get_cenv('ALLOW_FAILURE', False) - self.driver_work = os.getenv("HOME") - self.shared_tmp = f'{self.driver_work}/shared_tmp/{self.pipeline_id}' - self.concurrent_tmp = f'{self.driver_work}/{get_cenv("CI_RUNNER_ID")}/concurrent_tmp/{get_cenv("CI_CONCURRENT_ID")}' + self.driver_cache = os.getenv('XDG_CACHE_HOME', f'{os.getenv("HOME")}/.cache') + '/ci_driver' + os.makedirs(self.driver_cache, exist_ok=True) + self.shared_tmp = f'{self.driver_cache}/shared_tmp/{self.pipeline_id}' + self.concurrent_tmp = f'{self.driver_cache}/{get_cenv("CI_RUNNER_ID")}/concurrent_tmp/{get_cenv("CI_CONCURRENT_ID")}' + #os.makedirs(self.concurrent_tmp, exist_ok=True) self.tmp_dir = f'{self.concurrent_tmp}/{self.pipeline_id}' self.stage_tmp_dir = f'{self.tmp_dir}/stages/{get_cenv("CI_JOB_STAGE")}' self.setup_logging() @@ -150,10 +133,8 @@ class Job: 'FALSE'] and defines.stdout_logging and self.args[1] != 'config': logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) - def __init__(self, args, driver_path): - self._cmd = None + def __init__(self, args): self.args = args - self.driver_path = driver_path self.mode_name = get_cenv('CI_MODE', 'Slurm').strip() self._mode = None self.__setup() @@ -190,7 +171,7 @@ class Job: _cache_path = self.cache_path _builds_dir_is_shared = True if get_cenv('CI_MODE') == 'SingleSlurmJobAcrossStages': - self._mode.get_run_properties() + self._mode.get_custom_config() _build_path = f'{self._mode.tmp_dir}/builds' _cache_path = 
f'{self._mode.tmp_dir}/cache' _builds_dir_is_shared = False @@ -225,36 +206,12 @@ class Job: parameter_string += f'{v} ' return parameter_string - def popen_wrapper(self, run_properties, run_script, helper_script, skip_env=False, **kwargs): - if self.down_scoping: - run_properties = f'sudo -u {self.account} ' + run_properties - os.chdir("/work") - command = [] - if helper_script: - command.append(helper_script) - if not run_properties: - command.extend([run_script]) - else: - command.extend([run_properties, run_script]) - else: - command = f'{run_properties} {run_script}' - logging.info(f'Command before {command.strip()}') - command = command.strip().split(' ') - logging.info(f'Executing with env: {str(self.custom_env)}') - logging.info(f'Executing command: {str(command)}') - os.chdir('/tmp') - return subprocess.Popen(command, - env=(dict(os.environ, **{x: self.custom_env[x] - for x in self.custom_env}) if not skip_env - else os.environ), - **kwargs) - def execute(self, run_properties, run_script, helper_script, install_env=False, skip_env=False, script_execution=False, do_inbetween_processing=False, srun_wrap=False, **kwargs): if self.down_scoping: wrapper = '' if srun_wrap: - wrapper = 'srun' + wrapper = modes.get_srun_cmd() wrapper += (f' --jobid={self._mode.slurm_simple_job_id} ' if self._mode.slurm_simple_job_id else ' ') if install_env: wrapper += f' --export=CUSTOM_SHELL_CONFIG={self.shell_config}' @@ -263,9 +220,10 @@ class Job: command = [helper_script] if run_properties: command.extend([run_properties]) - setup_env_scripts = self._mode.get_env_setup() - if setup_env_scripts: - command.extend(setup_env_scripts.split()) + if script_execution: + setup_env_scripts = self._mode.get_env_setup() + if setup_env_scripts: + command.extend(setup_env_scripts.split()) command.extend(run_script.split(' ')) if script_execution: self._mode.custom_run_setup(install_env=install_env, skip_env=skip_env, script_execution=script_execution, @@ -293,34 +251,19 @@ class Job: if script_execution and cmd_return_code != 0: if self._mode and not self.allow_failure: self._mode.cleanup_on_failure() - exit(cmd_return_code) + sys.exit(cmd_return_code) # FIXME: do not rely on internal implementation of subprocess.run return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr) def run(self): if not self.allow_failure: utility.update_json_kv(self.error_code_file, self.jobid, -1) - self.exec_script = self.args[2] - command = f"{self.driver_path}/core/scripts/xPipeHelper.sh" - run_properties = '' - do_inbetween_processing = False if self.args[3] not in ['build_script', 'step_script']: - run_properties = self._mode.get_simple_script_exec() - run_script = self.exec_script + self._mode.run_simple_script() else: - command = self._mode.get_combiner_script() - run_properties += self._mode.get_run_properties() - run_script = self._mode.get_run_script() - do_inbetween_processing = True - self.execute(run_properties, run_script, command, script_execution=True, - do_inbetween_processing=do_inbetween_processing) - - if self.args[3] in ['build_script', 'step_script'] and self._mode.has_post_run_script(): - command = f"{self.driver_path}/core/scripts/xPipeHelper.sh" - run_properties = self._mode.get_post_run_properties() - run_script = self._mode.get_post_run_script() - self.execute(run_properties, run_script, command, script_execution=True) + self._mode.run_main_script() + self._mode.run_post_script() def cleanup(self): self._mode = modes.get_mode(self) diff --git a/core/modes/__init__.py 
b/core/modes/__init__.py index 3f680d60e3791f8eebc14f56acc4e30f56b16554..c04dd7695f305ae4dd3401cdc811610b31597289 100644 --- a/core/modes/__init__.py +++ b/core/modes/__init__.py @@ -1,705 +1,27 @@ from abc import ABC, abstractmethod import os -import subprocess -import re import importlib -import sys -from filelock import FileLock -import json -import core.authentication.JSONManager as man import logging from core.utility import get_cenv import core.utility as utility +from core.modes.ssh import SSH +from core.modes.slurm import ( + Slurm, + Sbatch, + Batch, + Default, + SingleSlurmJobAcrossStages, + Singularity, + Singularity_Batch, +) + VALID_MODES = ['Default', 'Slurm', 'Singularity', 'Batch', 'SingleSlurmJobAcrossStages', 'Sbatch'] DOWN_MODES = ['Singularity_Batch'] XLOCAL_MODES = ['SSH'] ALL_MODES = VALID_MODES + XLOCAL_MODES + DOWN_MODES -srun_path = "srun" # "/usr/local_host/bin/srun" -sbatch_path = "sbatch" # "/usr/local_host/bin/sbatch" - - -class ModeBase(ABC): - _run_script = '' - _run_properties = '' - _custom_env = dict() - slurm_jobid_file = None - _combiner_script = '' - run_properties_postfix = None - - @staticmethod - def get_jobid_from_file(path): - with open(path, 'r') as node_index_fp: - return node_index_fp.readline().strip() - - @property - def run_script(self): - return self._run_script - - @property - def combiner_script(self): - return self._combiner_script - - @property - def run_properties(self): - return self._run_properties - - @abstractmethod - def get_combiner_script(self): - pass - - @property - def custom_env(self): - return self._custom_env - - def __init__(self, job): - self.job = job - - @abstractmethod - def get_run_properties(self): - pass - - @abstractmethod - def get_run_script(self): - pass - - def has_post_run_script(self): - return False - - def get_post_run_script(self): - return None - - def custom_run_setup(self, **kwargs): - pass - - def get_post_run_properties(self): - return None - - def custom_prepare(self): - pass - - def get_simple_script_exec(self): - return None - - def inbetween_processing(self, main_proc=None): - pass - - @staticmethod - def get_env_setup(): - return '' - - def get_run_properties_postfix(self): - pass - - def cleanup_on_failure(self): - if not self.slurm_jobid_file or not os.path.isfile(self.slurm_jobid_file): - logging.debug(f'Skipping cleanup_on_failure due to missing slurm jobid file') - return - command = [] - if self.job.down_scoping: - command = f"sudo -u {self.job.account} ".split() - logging.debug(f'cleanup_on_failure command: {command}') - scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel ' - f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", text=True, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - logging.debug(f'cleanup_on_failure result: {scancel_out}') - - @abstractmethod - def cleanup(self): - if self.job.allow_failure: - return - cc_tmp = os.path.split(self.job.concurrent_tmp)[0] - do_cleanup = 0 - for cc_id in os.listdir(cc_tmp): - pipeline_path = f'{cc_tmp}/{cc_id}/{self.job.pipeline_id}/stages/{get_cenv("CI_JOB_STAGE")}' - if not os.path.isdir(pipeline_path): - continue - for error_file in os.listdir(pipeline_path): - if not error_file.endswith('.json'): - continue - - lock = FileLock(f'{pipeline_path}/{error_file}.lock') - with lock: - with open(f'{pipeline_path}/{error_file}', 'r+') as error_file_fd: - error_file_fd.seek(0) - error_codes = json.load(error_file_fd) - for jobid in error_codes: - if 
error_codes[jobid] == -1: - # do_cleanup = -1 - return - elif error_codes[jobid] == 1: - do_cleanup = 1 - - if do_cleanup == 1 and self.job.shared_tmp and os.path.isdir(self.job.shared_tmp): - command = [] - if self.job.down_scoping: - command = f"sudo -u {self.job.account} ".split() - command.extend(['/opt/slurm/current/bin/scancel', f'--name=CI_{self.job.pipeline_id}']) - logging.debug(f'doing cleanup shared_tmp={self.job.shared_tmp} command={command}') - scancel_out = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - logging.debug(f'doing cleanup res={scancel_out}') - os.system(f'rm -r {self.job.shared_tmp}') - man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID")) - try: - os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}') - except (FileNotFoundError, OSError): - pass - - def abort(self, error_str, exit_code=1): - if self.job.error_code_file: - utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code) - ModeBase.cleanup(self) - logging.debug(f'Aborting with error: {error_str}') - exit(exit_code) - - -class Slurm(ModeBase): - slurm_vars = [] - considered_env_prefixes = ['SLURM', 'SRUN', 'SALLOC'] - - def get_run_properties_postfix(self): - return "" - - def set_internal_slurm_job(self): - if self.slurm_simple_job_id: - return - - try: - with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file: - mapping = json.loads(file.read()) - self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")] - if self.job.down_scoping: - if not subprocess.run([f'{self.job.driver_path}/core/scripts/runHelper.sh', - f'sudo -u {self.job.account} srun --jobid={self.slurm_simple_job_id} /usr/bin/zsh -l -c echo']).returncode: - return - else: - man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID")) - except (IOError, KeyError): - self.slurm_simple_job_id = None - logging.warning(f'Could not read internal Slurm jobid from mapping file') - - salloc_command = ['/opt/slurm/current/bin/salloc', '--cpus-per-task=1', '--ntasks=1', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] - salloc_out = self.job.execute(' '.join(salloc_command), '', f"{self.job.driver_path}/core/scripts/runHelper.sh", - text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - logging.debug(f'custom_prepare salloc_command={salloc_command}') - logging.debug(f'custom_prepare salloc_out={salloc_out}') - try: - self.slurm_simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1) - logging.info(f'Using internal Slurm jobid {self.slurm_simple_job_id}') - man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"), - self.slurm_simple_job_id) - except (AttributeError): - self.abort(f'Could not allocate a Slurm job for internal usage') - - - - def custom_prepare(self): - self.set_internal_slurm_job() - # install gitlab runner if necessary - self.job.execute(f'/usr/bin/mkdir -p {self.job.user_path}', - "", - f"{self.job.driver_path}/core/scripts/runHelper.sh", - skip_env=True, srun_wrap=True) - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.user_path}/wrapper{self.job.jobid}.sh', - f"{self.job.driver_path}/core/scripts/zshWrapper.sh", - f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - git_runner_command = [f'{self.job.shell_path} -l', - f"{self.job.user_path}/wrapper{self.job.jobid}.sh"] - self.job.execute(' '.join(git_runner_command), - 
f'{self.job.driver_path}/core/scripts/runnerInstallHelper.sh', - f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", srun_wrap=True, install_env=True) - - def get_simple_script_exec(self): - return f"{srun_path} --jobid={self.slurm_simple_job_id} {self.job.shell_path} -l " \ - f"{self.job.user_path}/wrapper{self.job.jobid}.sh" - - @staticmethod - def env_copy(): - user = os.getenv("USER") - env = {k: v for k, v in os.environ.items() if not v.__contains__(user) or k == "PATH"} - export_env = "" - for k, v in env.items(): - export_env = f'{export_env}{k}="{v}",' - return export_env[:-1] - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def set_slurm_env(self): - self._run_properties = self.job.get_parameters() - - def set_srun_cmd(self): - prop_list = [f'{srun_path}', f'--job-name=CI_{self.job.jobid}'] - prop_list.extend(self.slurm_vars) - self._run_properties = ' '.join(prop_list) - - def __init__(self, job): - ModeBase.__init__(self, job) - self.slurm_simple_job_id = None - self.set_internal_slurm_job() - - def get_run_properties(self): - self.set_srun_cmd() - self.set_slurm_env() - return f'{srun_path} ' + self._run_properties + f' {self.job.shell_path} -l' - - def get_run_script(self): - self._run_script = self.job.exec_script - return self._run_script - - def cleanup(self): - self.set_internal_slurm_job() - self.job.execute(f'/usr/bin/rm ' - f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", - srun_wrap=True) - ModeBase.cleanup(self) - self.job.execute(f'scancel {self.slurm_simple_job_id}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh") - - -class Sbatch(Slurm): - def __init__(self, job): - Slurm.__init__(self, job) - self.does_inbetween_processing = True - self.slurm_output_dir = f'{self.job.clone_path}/slurm_output' - self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt' - - def get_batch_properties(self, batch_script): - # cmd_out = "" - if self.job.down_scoping: - stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE, - text=True).stdout - cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')]) - else: - with open(batch_script, 'r') as f: - cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')]) - return cmd_out - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - - def get_run_properties(self): - batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' - self._run_properties = f'{sbatch_path} --wait {self.get_batch_properties(batch_script)} ' \ - f'{self.job.get_parameters()} --output={self.slurm_output_file}' - return self._run_properties - - def custom_run_setup(self, **kwargs): - if kwargs["script_execution"] and kwargs["do_inbetween_processing"]: - logging.debug('Creating so file') - if self.job.down_scoping: - self.job.execute(f'/usr/bin/mkdir -p ' - f'{self.slurm_output_dir}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - self.job.execute(f'/usr/bin/touch ' - f'{self.slurm_output_file}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - self.job.execute(f'/usr/bin/chmod ' - f'o+r {self.slurm_output_file}', '', - 
f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.build_path}/chmodPath{self.job.jobid}.sh', - f"{self.job.driver_path}/core/scripts/chmodPath.sh", - f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - self.job.execute(f'/usr/local_rwth/bin/zsh {self.job.build_path}/chmodPath{self.job.jobid}.sh', - f'{self.slurm_output_dir}', - f"{self.job.driver_path}/core/scripts/execHelper.sh", srun_wrap=True) - else: - os.makedirs(self.slurm_output_dir, exist_ok=True) - os.system(f'touch {self.slurm_output_file}') - - def inbetween_processing(self, main_proc): - logging.debug(f'Starting inbetween processing') - return subprocess.Popen(f'tail -F {self.slurm_output_file}'.split()) - - def has_post_run_script(self): - return True - - def get_post_run_script(self): - return Slurm.get_run_script(self) - - def get_post_run_properties(self): - return Slurm.get_run_properties(self) - - def get_run_script(self): - tmp = os.getenv("TMP") - with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file: - filedata = file.read() - filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}') - with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: - file.write(filedata) - return f'{tmp}/wrapper{self.job.jobid}' - -class Default(Sbatch): - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def get_run_properties(self): - parameter_string = self.job.get_parameters() - self._run_properties = f'{sbatch_path} --wait --output={self.slurm_output_file} {parameter_string}' - return self._run_properties - - def get_run_script(self): - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.clone_path}/script.', - f"{Slurm.get_run_script(self)}", - f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - tmp = os.getenv("TMP") - with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file: - filedata = file.read() - filedata = filedata.replace('replaceme', f'{self.job.clone_path}/script.') - with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: - file.write(filedata) - return f'{tmp}/wrapper{self.job.jobid}' - - def has_post_run_script(self): - return False - -class Batch(Slurm): - - def __init__(self, job): - Slurm.__init__(self, job, ) - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/execHelper.sh" - return self._combiner_script - - def get_batch_properties(self, batch_script): - # cmd_out = "" - if self.job.down_scoping: - stdout = self.job.execute(f"{srun_path} /usr/bin/cat {batch_script}", '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", stdout=subprocess.PIPE, - text=True).stdout - cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')]) - else: - with open(batch_script, 'r') as f: - cmd_out = ' '.join([l.split()[1] for l in f.readlines() if l.startswith('#SBATCH')]) - return cmd_out - - def get_run_properties(self): - batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' - self._run_properties = ' '.join([f'{srun_path}', self.get_batch_properties(batch_script)]) - print('Warning: The contents of the script section in the CI definition ' - 'will be used as a post-processing script in the batch mode.') - - return self._run_properties + f' {self.job.shell_path}' - - def 
get_run_properties_postfix(self): - return "" - - def get_run_script(self): - self._run_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' - return self._run_script - - def has_post_run_script(self): - return True - - def get_post_run_script(self): - return Slurm.get_run_script(self) - - def get_post_run_properties(self): - return Slurm.get_run_properties(self) - - def cleanup(self): - self.job.execute(f'/usr/bin/rm ' - f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - ModeBase.cleanup(self) - self.job.execute(f'scancel {self.slurm_simple_job_id}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh") - - -class Singularity_Batch(Default): - container = '' - - @staticmethod - def escape(s): - return s.replace('/', '\/') - - def custom_run_setup(self, **kwargs): - Sbatch.custom_run_setup(self, **kwargs) - if kwargs["script_execution"] and kwargs["do_inbetween_processing"]: - logging.debug('Creating param file') - # write env_file - with open(f'{self.job.tmp_dir}/env{self.job.jobid}', 'w') as file: - file.write(f"CONTAINER={self.container}\0EXEC_WRAPPER={srun_path}\0PARAMS={self.job.get_parameters()}\0OUTPUT_FILE=--output={self.slurm_output_file}") - - - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.slurm_output_dir}/batchEnv{self.job.jobid}', - f"{self.job.tmp_dir}/env{self.job.jobid}", - f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def get_run_script(self): - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.clone_path}/script{self.job.jobid}.sh', - f'{Slurm.get_run_script(self)}', f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - # move wrapper to user - tmp = os.getenv("TMP") - with open(f'{self.job.driver_path}/core/scripts/batchWrapper.sh', 'r') as file: - filedata = file.read() - filedata = filedata.replace('replaceme', f'{self.job.clone_path}/singularity{self.job.jobid}.sh < {self.job.clone_path}/script{self.job.jobid}.sh') - with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: - file.write(filedata) - return f'{tmp}/wrapper{self.job.jobid}' - - def get_run_properties(self): - self._run_properties = Default.get_run_properties(self) - # get container for singularity - self.container = get_cenv('CONTAINER') - if self.container is None: - ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") - if os.path.exists(self.container): - self.container = f'{self.job.clone_path}/{self.container}' - # Generation of the singularity script within user space - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.clone_path}/singularity{self.job.jobid}.sh', - self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - # add singularity specific parameter to the properties - property_split = self._run_properties.split(" ") - property_split.insert(1, f'--export-file={self.slurm_output_dir}/batchEnv{self.job.jobid}') - self._run_properties = " ".join(property_split) - return self._run_properties - - def cleanup(self): - ModeBase.cleanup(self) - self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh") - - def get_run_properties_postfix(self): - self.run_properties_postfix = "" - # get container for singularity - 
self.container = get_cenv('CONTAINER') - if self.container is None: - ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") - if os.path.exists(self.container): - self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh ' - else: - self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh ' - return self.run_properties_postfix - - -class Singularity(Slurm): - container = '' - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def __init__(self, job): - Slurm.__init__(self, job) - - def get_run_script(self): - self._run_script = Slurm.get_run_script(self) - return self._run_script - - def get_run_properties(self): - self._run_properties = Slurm.get_run_properties(self) - # get container for singularity - self.container = get_cenv('CONTAINER') - if self.container is None: - ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") - if os.path.exists(self.container): - self.container = f'{self.job.clone_path}/{self.container}' - # Generation of the singularity script within user space - self.job.execute(f'/usr/bin/cp /dev/stdin ' - f'{self.job.user_path}/script{self.job.jobid}.sh', - self.get_run_properties_postfix(), f"{self.job.driver_path}/core/scripts/xPipeHelper.sh", - skip_env=True, srun_wrap=True) - # add singularity specific parameter to the properties - property_split = self._run_properties.split(" ") - property_split.insert(1, f'--export=CONTAINER={self.container}') - property_split.append(f"{self.job.user_path}/script{self.job.jobid}.sh") - self._run_properties = " ".join(property_split) - return self._run_properties - - def cleanup(self): - self.job.execute(f'/usr/bin/rm ' - f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - ModeBase.cleanup(self) - self.job.execute(f'/usr/bin/rm ' - f'{self.job.user_path}/script{self.job.jobid}.sh', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - self.job.execute(f'scancel {self.slurm_simple_job_id}', '', f"{self.job.driver_path}/core/scripts/runHelper.sh") - - def get_run_properties_postfix(self): - self.run_properties_postfix = "" - # get container for singularity - self.container = get_cenv('CONTAINER') - if self.container is None: - ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") - if os.path.exists(self.container): - self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh ' - else: - self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh ' - return self.run_properties_postfix - - -class SingleSlurmJobAcrossStages(Slurm): - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def get_run_properties_postfix(self): - return "" - - def get_jobid_from_file(self, path): - with open(path, 'r') as node_index_fp: - return node_index_fp.readline().strip() - - @staticmethod - def get_node_id_str(variables): - if variables and len(variables): - return ''.join([os.getenv(v).replace('/', '') for v in variables]) - return 'Sequential' - - @staticmethod - def get_env_for_single_slurm_job(variables): - return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables} - - def __init__(self, job): - Slurm.__init__(self, job) - 
self.slurm_job_id = None - self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')] - os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True) - os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True) - self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt' - self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt' - self.tmp_dir = None - - def get_run_properties(self): - if not os.path.isfile(self.slurm_jobid_file): - salloc_command = ['/opt/slurm/current/bin/salloc', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] + \ - self.job.get_parameters().split(" ") - salloc_command.extend(self.slurm_vars) - salloc_out = self.job.execute(' '.join(salloc_command), '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", - text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - logging.debug(f'run_properties salloc_command={salloc_command}') - logging.debug(f'run_properties salloc_out={salloc_out}') - self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1) - with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp: - slurm_jobid_fp.write(self.slurm_job_id + '\n') - with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp: - slurm_jobid_fp.write(self.slurm_job_id + '\n') - logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}') - else: - self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file) - logging.info(f'Using slurm_job_id={self.slurm_job_id}, could find {self.slurm_jobid_file}') - - tmp_dir_srun_out = self.job.execute(f'{srun_path} --jobid {self.slurm_job_id} /usr/bin/printenv TMP', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=False, - text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.split('\n') - logging.debug(f'srun tmp_dir output (unfitered): {tmp_dir_srun_out}') - ignores = ['error', 'slurmstepd'] - self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x for s in ignores)] - logging.debug(f'srun tmp_dir output: {self.tmp_dir}') - self.tmp_dir = self.tmp_dir[0] - - - self._run_properties = Slurm.get_run_properties(self).split() - - additional_env = [] - for k, v in self.get_env_for_single_slurm_job(self.id_vars).items(): - additional_env.append(f"{k}={v}") - if not additional_env.__sizeof__() == 0: - self._run_properties.insert(1, f'--export=' + ",".join(additional_env)) - self._run_properties.insert(1, f'--jobid={self.slurm_job_id}') - self._run_properties = ' '.join(self._run_properties) - return self._run_properties - - def get_run_script(self): - self._run_script = Slurm.get_run_script(self) - return self._run_script - - def get_simple_script_exec(self): - if self.job.args[1] == 'run': - return self.get_run_properties() - return Slurm.get_simple_script_exec(self) - - def cleanup(self): - if get_cenv('END_SINGLE_SLURM_JOB') == '1': - scancel_out = self.job.execute(f'/opt/slurm/current/bin/scancel ' - f'{self.get_jobid_from_file(self.slurm_jobid_file)}', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", - text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout - logging.debug(f'cleanup res={scancel_out}') - try: - os.remove(self.slurm_jobid_file) - except FileNotFoundError: - pass - # Cleanup the directory with the jobIds of the whole pipeline - try: - os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds') - except (FileNotFoundError, OSError): - pass - - try: - os.rmdir(f'{self.job.shared_tmp}') - except 
(FileNotFoundError, OSError): - pass - self.job.execute(f'/usr/bin/rm ' - f'{self.job.user_path}/wrapper{self.job.jobid}.sh', '', - f"{self.job.driver_path}/core/scripts/runHelper.sh", srun_wrap=True) - ModeBase.cleanup(self) - - -class SSH(ModeBase): - def __init__(self, job): - ModeBase.__init__(self, job) - self.dest_node = get_cenv('CI_SSH_HOST') - if not self.dest_node: - ModeBase.abort(self, "Using ssh mode but no node specified. Specify: CI_SSH_HOST") - - def get_env_setup(self): - return f' {self.job.driver_path}/core/scripts/ssh.env ' - - def get_run_properties(self): - return f'ssh -T {self.dest_node}' - - # TODO: Move this to ModeBase (is same for Slurm, except Batch) - def get_run_script(self): - self._run_script = f'{self.job.exec_script}' - return self._run_script - - def get_combiner_script(self): - self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" - return self._combiner_script - - def get_simple_script_exec(self): - return f"ssh -T {self.dest_node}" - - def cleanup(self): - ModeBase.cleanup(self) - - def get_custom_prepare(self): - pass - - @staticmethod - def has_post_run_script(): - return False - # Get the possible modes from the CI script def get_mode(job): diff --git a/core/modes/base.py b/core/modes/base.py new file mode 100644 index 0000000000000000000000000000000000000000..5647bac8a5972162625c6729f3868b905ccf9d6b --- /dev/null +++ b/core/modes/base.py @@ -0,0 +1,147 @@ +from abc import ABC, abstractmethod +import os +import subprocess +import re +import importlib +import sys +from filelock import FileLock +import json +import core.authentication.JSONManager as man +import logging + +from core.utility import get_cenv +import core.utility as utility +from core.utility.executor import Executor + + +class ModeBase(ABC): + _custom_env = dict() + slurm_jobid_file = None + executor = None + + @staticmethod + def get_jobid_from_file(path): + with open(path, 'r') as node_index_fp: + return node_index_fp.readline().strip() + + @abstractmethod + def get_combiner_script(self): + pass + + @property + def custom_env(self): + return self._custom_env + + def __init__(self, job): + self.job = job + self.executor = Executor.__init__(job, job.down_scoping) + + def get_run_parameters(self): + return '' + + def get_run_wrapper(self): + return '' + + def get_run_script(self): + return '' + + def get_simple_run_script(self): + return self.job.args[2] + + def get_simple_run_parameters(self): + return '' + + def get_simple_run_wrapper(self): + return '' + + def run_simple_script(self): + pass + + def run_main_script(self): + pass + + def run_post_script(self): + pass + + def get_post_run_script(self): + return None + + def custom_run_setup(self, **kwargs): + return '' + + def get_post_run_parameters(self): + return '' + + def get_post_run_wrapper(self): + return '' + + def custom_prepare(self): + pass + + def get_simple_script_exec(self): + return None + + def inbetween_processing(self): + pass + + @staticmethod + def get_env_setup(): + return '' + + def get_custom_config(self): + return None + + def cleanup_on_failure(self): + if not self.slurm_jobid_file or not os.path.isfile(self.slurm_jobid_file): + logging.debug(f'Skipping cleanup_on_failure due to missing slurm jobid file') + return + command = [] + if self.job.down_scoping: + command = f"sudo -u {self.job.account} ".split() + logging.debug(f'cleanup_on_failure command: {command}') + scancel_out = self.executor.cancel(f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}') + 
logging.debug(f'cleanup_on_failure result: {scancel_out}') + + @abstractmethod + def cleanup(self): + if self.job.allow_failure: + return + cc_tmp = os.path.split(self.job.concurrent_tmp)[0] + do_cleanup = 0 + for cc_id in os.listdir(cc_tmp): + pipeline_path = f'{cc_tmp}/{cc_id}/{self.job.pipeline_id}/stages/{get_cenv("CI_JOB_STAGE")}' + if not os.path.isdir(pipeline_path): + continue + for error_file in os.listdir(pipeline_path): + if not error_file.endswith('.json'): + continue + + lock = FileLock(f'{pipeline_path}/{error_file}.lock') + with lock: + with open(f'{pipeline_path}/{error_file}', 'r+') as error_file_fd: + error_file_fd.seek(0) + error_codes = json.load(error_file_fd) + for jobid in error_codes: + if error_codes[jobid] == -1: + # do_cleanup = -1 + return + elif error_codes[jobid] == 1: + do_cleanup = 1 + + if do_cleanup == 1 and self.job.shared_tmp and os.path.isdir(self.job.shared_tmp): + logging.debug(f'doing cleanup shared_tmp={self.job.shared_tmp} command=--name=CI_{self.job.pipeline_id}') + scancel_out = self.executor.cancel(f'--name=CI_{self.job.pipeline_id}') + logging.debug(f'doing cleanup res={scancel_out}') + os.system(f'rm -r {self.job.shared_tmp}') + man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID")) + try: + os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}') + except (FileNotFoundError, OSError): + pass + + def abort(self, error_str, exit_code=1): + if self.job.error_code_file: + utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code) + ModeBase.cleanup(self) + logging.debug(f'Aborting with error: {error_str}') + exit(exit_code) diff --git a/core/modes/common.py b/core/modes/common.py new file mode 100644 index 0000000000000000000000000000000000000000..7e7b303aaddb7ecaa32f8757c1c79751ec2572f3 --- /dev/null +++ b/core/modes/common.py @@ -0,0 +1,17 @@ +from abc import ABC, abstractmethod +import os +import subprocess +import re +import importlib +import sys +from filelock import FileLock +import json +import core.authentication.JSONManager as man +import logging +import time + +from core.utility import get_cenv +import core.utility as utility + +srun_path = "srun" # "/usr/local_host/bin/srun" +sbatch_path = "sbatch" # "/usr/local_host/bin/sbatch" diff --git a/core/modes/slurm/__init__.py b/core/modes/slurm/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7094e23db6a04eff3b8e8390e2c6d90f0b2a8924 --- /dev/null +++ b/core/modes/slurm/__init__.py @@ -0,0 +1,11 @@ +from .srun import Slurm +from .shared import SingleSlurmJobAcrossStages +from .batch import ( + Sbatch, + Batch, + Default, +) +from .singularity import ( + Singularity, + Singularity_Batch, +) diff --git a/core/modes/slurm/batch.py b/core/modes/slurm/batch.py new file mode 100644 index 0000000000000000000000000000000000000000..5a44485d5a9894768e33dcad83cc27e26e99f7ed --- /dev/null +++ b/core/modes/slurm/batch.py @@ -0,0 +1,135 @@ +from core.modes.common import * +from .srun import Slurm +from core.modes.base import ModeBase +from ...utility.executor import async_process + + +def get_batch_properties(self, batch_script): + # cmd_out = "" + if self.job.down_scoping: + stdout = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f"/usr/bin/cat {batch_script}") + cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')]) + else: + with open(batch_script, 'r') as f: + cmd_out = ' '.join([l.split()[1] for l in f.readlines() if 
l.startswith('#SBATCH')]) + return cmd_out + + +class Sbatch(Slurm, ABC): + def __init__(self, job): + Slurm.__init__(self, job) + self.slurm_output_dir = f'{self.job.clone_path}/slurm_output' + self.slurm_output_file = f'{self.slurm_output_dir}/so_{self.job.jobid}.txt' + + def get_run_parameters(self): + batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' + parameters = f'--wait {get_batch_properties(self, batch_script)} {self.job.get_parameters()} ' \ + f'--output={self.slurm_output_file}' + return parameters + + def custom_run_setup(self, **kwargs): + if kwargs["main_script"] and kwargs["run_async"]: + logging.debug('Creating so file') + if self.job.down_scoping: + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f"/usr/bin/mkdir -p {self.slurm_output_dir}") + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f"/usr/bin/touch {self.slurm_output_file}") + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f"/usr/bin/chmod o+r {self.slurm_output_file}") + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f"/usr/bin/cp /dev/stdin " + f"{self.job.build_path}/chmodPath{self.job.jobid}.sh", + script=f"{self.job.runner_path}/core/scripts/chmodPath.sh") + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/execHelper.sh", + wrapper_add=f"{self.job.shell_path} " + f"{self.job.build_path}/chmodPath{self.job.jobid}.sh", + script=f"{self.slurm_output_dir}") + else: + os.makedirs(self.slurm_output_dir, exist_ok=True) + os.system(f'touch {self.slurm_output_file}') + + def inbetween_processing(self): + async_process(self.slurm_output_file) + + def run_main_script(self): + self.executor.run_batched(params=self.get_run_parameters(), + script=self.get_run_script()) + + def get_post_run_script(self): + return Slurm.get_run_script(self) + + def get_post_run_parameters(self): + return Slurm.get_run_parameters(self) + + def get_post_run_wrapper(self): + return Slurm.get_run_wrapper(self) + + def run_post_script(self): + self.executor.run_direct(params=self.get_post_run_parameters(), wrapper_add=self.get_post_run_wrapper(), + script=self.get_post_run_script()) + + def get_run_script(self): + tmp = os.getenv("TMP") + with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as file: + filedata = file.read() + filedata = filedata.replace('replaceme', f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}') + with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: + file.write(filedata) + return f'{tmp}/wrapper{self.job.jobid}' + + +class Batch(Slurm, ABC): + + def __init__(self, job): + Slurm.__init__(self, job, ) + + def get_run_parameters(self): + batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' + parameters = ' '.join([f'{srun_path}', get_batch_properties(self, batch_script)]) + print('Warning: The contents of the script section in the CI definition ' + 'will be used as a post-processing script in the batch mode.') + return parameters + + def get_run_wrapper(self): + return f'{self.job.shell_path}' + + def get_run_script(self): + return f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}' + + def get_post_run_script(self): + return Slurm.get_run_script(self) + + def get_post_run_parameters(self): + return Slurm.get_run_parameters(self) + + def get_post_run_wrapper(self): + return Slurm.get_run_wrapper(self) + + def run_post_script(self): + 
self.executor.run_direct(params=self.get_post_run_parameters(), wrapper_add=self.get_post_run_wrapper(), + script=self.get_post_run_script()) + + +class Default(Sbatch, ABC): + + def get_run_parameters(self): + parameter_string = self.job.get_parameters() + parameters = f'--wait --output={self.slurm_output_file} {parameter_string}' + return parameters + + def get_run_script(self): + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f"/usr/bin/cp /dev/stdin {self.job.clone_path}/script.", + script=f"{self.job.exec_script}") + tmp = os.getenv("TMP") + with open(f'{self.job.scripts_path}/batchWrapper.sh', 'r') as file: + filedata = file.read() + filedata = filedata.replace('replaceme', f'{self.job.clone_path}/script.') + with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: + file.write(filedata) + return f'{tmp}/wrapper{self.job.jobid}' + + def run_post_script(self): + pass diff --git a/core/modes/slurm/shared.py b/core/modes/slurm/shared.py new file mode 100644 index 0000000000000000000000000000000000000000..52fe507b47892e5b59a4f981c2bb9c0610aef084 --- /dev/null +++ b/core/modes/slurm/shared.py @@ -0,0 +1,98 @@ +from core.modes.common import * +from core.modes.base import ModeBase +from .srun import Slurm + + +class SingleSlurmJobAcrossStages(Slurm, ABC): + + def get_jobid_from_file(self, path): + with open(path, 'r') as node_index_fp: + return node_index_fp.readline().strip() + + @staticmethod + def get_node_id_str(variables): + if variables and len(variables): + return ''.join([os.getenv(v).replace('/', '') for v in variables]) + return 'Sequential' + + @staticmethod + def get_env_for_single_slurm_job(variables): + return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables} + + def __init__(self, job): + Slurm.__init__(self, job) + self.slurm_job_id = None + self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')] + os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True) + os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True) + self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt' + self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt' + self.tmp_dir = None + + def get_custom_config(self): + if not os.path.isfile(self.slurm_jobid_file): + params = ['--no-shell', f'--job-name=CI_{self.job.pipeline_id}'] + \ + self.job.get_parameters().split(" ") + salloc_out = self.executor.allocator(params=' '.join(params)) + logging.debug(f'run_properties salloc_out={salloc_out}') + self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1) + with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp: + slurm_jobid_fp.write(self.slurm_job_id + '\n') + with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp: + slurm_jobid_fp.write(self.slurm_job_id + '\n') + logging.info(f'Started new slurm_job_id={self.slurm_job_id}, could not find {self.slurm_jobid_file}') + else: + self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file) + logging.info(f'Using slurm_job_id={self.slurm_job_id}, found {self.slurm_jobid_file}') + + tmp_dir_srun_out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'--jobid {self.slurm_job_id} /usr/bin/printenv TMP').split('\n') + logging.debug(f'srun tmp_dir output (unfiltered): {tmp_dir_srun_out}') + ignores = ['error', 'slurmstepd'] + self.tmp_dir = [x for x in tmp_dir_srun_out if all(s not in x
for s in ignores)] + logging.debug(f'srun tmp_dir output: {self.tmp_dir}') + self.tmp_dir = self.tmp_dir[0] + + def get_run_parameters(self): + self.get_custom_config() + + parameters = Slurm.get_run_parameters(self) + additional_env = [] + for k, v in self.get_env_for_single_slurm_job(self.id_vars).items(): + additional_env.append(f"{k}={v}") + if len(additional_env) != 0: + parameters += f' --export=' + ",".join(additional_env) + parameters += f' --jobid={self.slurm_job_id}' + return parameters + + def get_simple_run_wrapper(self): + if self.job.args[1] == 'run': + return self.get_run_wrapper() + return Slurm.get_run_wrapper(self) + + def get_simple_run_parameters(self): + if self.job.args[1] == 'run': + return self.get_run_parameters() + return Slurm.get_simple_run_parameters(self) + + def cleanup(self): + if get_cenv('END_SINGLE_SLURM_JOB') == '1': + scancel_out = self.executor.cancel(f'{self.get_jobid_from_file(self.slurm_jobid_file)}') + logging.debug(f'cleanup res={scancel_out}') + try: + os.remove(self.slurm_jobid_file) + except FileNotFoundError: + pass + # Cleanup the directory with the jobIds of the whole pipeline + try: + os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds') + except (FileNotFoundError, OSError): + pass + + try: + os.rmdir(f'{self.job.shared_tmp}') + except (FileNotFoundError, OSError): + pass + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh') + ModeBase.cleanup(self) diff --git a/core/modes/slurm/singularity.py b/core/modes/slurm/singularity.py new file mode 100644 index 0000000000000000000000000000000000000000..918884da6c22ccbea8bb0057eef0128ea7d59f58 --- /dev/null +++ b/core/modes/slurm/singularity.py @@ -0,0 +1,115 @@ +from core.modes.common import * +from .srun import Slurm +from .batch import Default, Sbatch +from core.modes.base import ModeBase + + +class Singularity(Slurm, ABC): + container = '' + + def get_run_parameters(self): + parameters = Slurm.get_run_parameters(self) + # get container for singularity + self.container = get_cenv('CONTAINER') + if self.container is None: + ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") + if os.path.exists(self.container): + self.container = f'{self.job.clone_path}/{self.container}' + # add singularity specific parameter + return parameters + f' --export=CONTAINER={self.container}' + + def get_run_wrapper(self): + # Generation of the singularity script within user space + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f'/usr/bin/cp /dev/stdin ' + f'{self.job.user_path}/script{self.job.jobid}.sh', + script=self.get_singularity_script()) + return Slurm.get_run_wrapper(self) + f" {self.job.user_path}/script{self.job.jobid}.sh" + + def cleanup(self): + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh') + ModeBase.cleanup(self) + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'/usr/bin/rm {self.job.user_path}/script{self.job.jobid}.sh') + self.executor.cancel(self.slurm_simple_job_id) + + def get_singularity_script(self): + script = "" + # get container for singularity + self.container = get_cenv('CONTAINER') + if self.container is None: + ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") + if os.path.exists(self.container): + script 
+= f'{self.job.scripts_path}/singularityLocalRunstep.sh' + else: + script += f'{self.job.scripts_path}/singularityRunstep.sh' + return script + + +class Singularity_Batch(Default, ABC): + container = '' + + @staticmethod + def escape(s): + return s.replace('/', '\/') + + def custom_run_setup(self, **kwargs): + Sbatch.custom_run_setup(self, **kwargs) + if kwargs["main_script"] and kwargs["run_async"]: + logging.debug('Creating param file') + # write env_file + with open(f'{self.job.tmp_dir}/env{self.job.jobid}', 'w') as file: + file.write(f"CONTAINER={self.container}\0EXEC_WRAPPER={srun_path}\0PARAMS={self.job.get_parameters()}" + f"\0OUTPUT_FILE=--output={self.slurm_output_file}") + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f'/usr/bin/cp /dev/stdin ' + f'{self.slurm_output_dir}/batchEnv{self.job.jobid}', + script=f"{self.job.tmp_dir}/env{self.job.jobid}") + + def get_run_script(self): + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f'/usr/bin/cp /dev/stdin ' + f'{self.job.clone_path}/script{self.job.jobid}.sh', + script=f'{Slurm.get_run_script(self)}') + # Generation of the singularity script within user space + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f'/usr/bin/cp /dev/stdin ' + f'{self.job.clone_path}/singularity{self.job.jobid}.sh', + script=self.get_singularity_script()) + # move wrapper to user + tmp = os.getenv("TMP") + with open(f'{self.job.runner_path}/core/scripts/batchWrapper.sh', 'r') as file: + filedata = file.read() + filedata = filedata.replace('replaceme', + f'{self.job.clone_path}/singularity{self.job.jobid}.sh < ' + f'{self.job.clone_path}/script{self.job.jobid}.sh') + with open(f'{tmp}/wrapper{self.job.jobid}', 'w') as file: + file.write(filedata) + return f'{tmp}/wrapper{self.job.jobid}' + + def get_run_parameters(self): + parameters = Default.get_run_parameters(self) + # get container for singularity + self.container = get_cenv('CONTAINER') + if self.container is None: + ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") + if os.path.exists(self.container): + self.container = f'{self.job.clone_path}/{self.container}' + return parameters + f' --export-file={self.slurm_output_dir}/batchEnv{self.job.jobid}' + + def cleanup(self): + ModeBase.cleanup(self) + self.executor.cancel(self.slurm_simple_job_id) + + def get_singularity_script(self): + script = "" + # get container for singularity + self.container = get_cenv('CONTAINER') + if self.container is None: + ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") + if os.path.exists(self.container): + script += f'{self.job.scripts_path}/singularityLocalRunstep.sh' + else: + script += f'{self.job.scripts_path}/singularityRunstep.sh' + return script diff --git a/core/modes/slurm/srun.py b/core/modes/slurm/srun.py new file mode 100644 index 0000000000000000000000000000000000000000..b31c322f4c4e9b486c762e830db9157680979a75 --- /dev/null +++ b/core/modes/slurm/srun.py @@ -0,0 +1,71 @@ +from core.modes.common import * +from core.modes.base import ModeBase +from core.utility.executor import Slurm_Executor + + +class Slurm(ModeBase, ABC): + considered_env_prefixes = ['SLURM', 'SRUN', 'SALLOC'] + executor = None + + def custom_prepare(self): + # install gitlab runner if necessary + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'/usr/bin/mkdir -p 
{self.job.user_path}') + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=f'/usr/bin/cp /dev/stdin ' + f'{self.job.user_path}/wrapper{self.job.jobid}.sh', + script=f"{self.job.scripts_path}/zshWrapper.sh") + + git_runner_command = [f'{self.job.shell_path} -l', + f"{self.job.user_path}/wrapper{self.job.jobid}.sh"] + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + wrapper_add=' '.join(git_runner_command), + script=f"{self.job.scripts_path}/runnerInstallHelper.sh", + install_env=True) + + @staticmethod + def env_copy(): + user = os.getenv("USER") + env = {k: v for k, v in os.environ.items() if not v.__contains__(user) or k == "PATH"} + export_env = "" + for k, v in env.items(): + export_env = f'{export_env}{k}="{v}",' + return export_env[:-1] + + def get_combiner_script(self): + return f"{self.job.scripts_path}/xPipeHelper.sh" + + def __init__(self, job): + ModeBase.__init__(self, job) + self.slurm_simple_job_id = None + self.executor = Slurm_Executor(job, job.down_scoping) + + def get_run_parameters(self): + return f'--job-name=CI_{self.job.jobid} ' + self.job.get_parameters() + + def get_run_wrapper(self): + return f' {self.job.shell_path} -l' + + def get_run_script(self): + return self.job.exec_script + + def get_simple_run_wrapper(self): + return f"{self.job.shell_path} -l {self.job.user_path}/wrapper{self.job.jobid}.sh" + + def run_main_script(self): + out = self.executor.run_direct(params=self.get_run_parameters(), wrapper_add=self.get_run_wrapper(), + script=self.get_run_script()) + print(out) + + def run_simple_script(self): + out = self.executor.management_handler(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + params=self.get_simple_run_parameters(), + wrapper_add=self.get_simple_run_wrapper(), + script=self.get_simple_run_script()) + print(out) + + def cleanup(self): + self.executor.management_handler(helper_script=f"{self.job.scripts_path}/runHelper.sh", + wrapper_add=f'/usr/bin/rm {self.job.user_path}/wrapper{self.job.jobid}.sh') + ModeBase.cleanup(self) + self.executor.cancel(self.slurm_simple_job_id) diff --git a/core/modes/ssh.py b/core/modes/ssh.py new file mode 100644 index 0000000000000000000000000000000000000000..9fea7bd834ec119c0a962723cfa934fcc2e4cfa9 --- /dev/null +++ b/core/modes/ssh.py @@ -0,0 +1,38 @@ +from core.modes.common import * +from core.modes.base import ModeBase + + +class SSH(ModeBase): + def __init__(self, job): + ModeBase.__init__(self, job) + self.dest_node = get_cenv('CI_SSH_HOST') + if not self.dest_node: + ModeBase.abort(self, "Using ssh mode but no node specified. 
Specify: CI_SSH_HOST") + + def get_env_setup(self): + return f' {self.job.driver_path}/core/scripts/ssh.env ' + + def get_run_properties(self): + return f'ssh -T {self.dest_node}' + + # TODO: Move this to ModeBase (is same for Slurm, except Batch) + def get_run_script(self): + self._run_script = f'{self.job.exec_script}' + return self._run_script + + def get_combiner_script(self): + self._combiner_script = f"{self.job.driver_path}/core/scripts/xPipeHelper.sh" + return self._combiner_script + + def get_simple_script_exec(self): + return f"ssh -T {self.dest_node}" + + def cleanup(self): + ModeBase.cleanup(self) + + def get_custom_prepare(self): + pass + + @staticmethod + def has_post_run_script(): + return False diff --git a/core/scripts/zshWrapper.sh b/core/scripts/zshWrapper.sh deleted file mode 100755 index a0d93f7032c8e21e62e78dc80e8ddcf1f1bbd7e2..0000000000000000000000000000000000000000 --- a/core/scripts/zshWrapper.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/local_rwth/bin/zsh - -source $HOME/.zshrc - -zsh \ No newline at end of file diff --git a/core/utility/defines.py b/core/utility/defines.py index b2a5a6603edaa295a1fb57b29a7f4491572a5e8d..44ff909ec0753be44813b6a07bfcc6984353ab46 100644 --- a/core/utility/defines.py +++ b/core/utility/defines.py @@ -1,7 +1,7 @@ import logging name = 'Aixcellenz CI Driver' -version = '0.3.1' +version = '0.5.0' debug = False -stdout_logging = True +stdout_logging = False log_level = logging.DEBUG diff --git a/core/utility/executor.py b/core/utility/executor.py new file mode 100644 index 0000000000000000000000000000000000000000..ca207a85e3144754f523a059c2955fa2e2711719 --- /dev/null +++ b/core/utility/executor.py @@ -0,0 +1,194 @@ +from core.modes.common import * + +from core import utility + + +def async_process(file): + return subprocess.Popen(f'tail -F {file}'.split()) + + +class Executor(ABC): + downscope_add = "" + + simple_job_id = "" + + def __init__(self, job, downscope=False): + self.job = job + if downscope: + self.downscope_add = f"sudo -u {self.job.account}" + + def set_simple_job_id(self, job_id): + self.simple_job_id = job_id + + # Allocates a batch job with optional user specifications and returns the string + @abstractmethod + def allocator(self, params=""): + pass + + # Executes internal management functions, e.g., setup scripts etc. 
+ @abstractmethod + def management_handler(self, params="", wrapper_add="", script=""): + pass + + # Cancels a batch job based on its id + @abstractmethod + def cancel(self, jobid): + pass + + # runs a script in the batch system with direct output + @abstractmethod + def run_direct(self, params="", wrapper_add="", script=""): + pass + + # runs a script without direct output, e.g., a batch script or multinode jobs + @abstractmethod + def run_batched(self, params="", wrapper_add="", script=""): + pass + + def execute(self, helper_script='', allocator='', params='', wrapper_add='', + target_script='', skip_env=False, run_async=False, main_script=False, install_env=False, **kwargs): + if main_script: + self.job.mode.custom_run_setup(main_script=main_script, run_async=run_async, **kwargs) + logging.info(f'Executing with env: {str(self.job.custom_env)}') + else: + params += (f' --jobid={self.job.mode.slurm_simple_job_id} ' if self.job.mode.slurm_simple_job_id else ' ') + if install_env: + params += f' --export=CUSTOM_SHELL_CONFIG={self.job.shell_config}' + command = [helper_script, f'{self.downscope_add} {allocator} {params} {wrapper_add}', + f'{target_script}'] + logging.info(f'Executing command: {str(command)}') + os.chdir('/tmp') + main_proc = subprocess.Popen(command, + env=(dict(os.environ, **{x: self.job.custom_env[x] + for x in self.job.custom_env}) if not skip_env + else os.environ), **kwargs) + side_proc = None + if main_script and run_async: + logging.debug(f'Starting inbetween processing') + logging.debug(f'Reading from File: {self.job.mode.slurm_output_file}') + side_proc = async_process(self.job.mode.slurm_output_file) + time.sleep(1) + stdout, stderr = main_proc.communicate() + logging.debug(f'Finished main processing {main_proc.pid}') + if main_script and run_async and side_proc: + logging.debug(f'Terminating side_proc {side_proc.pid}') + side_proc.terminate() + logging.debug(f'Terminated side_proc {side_proc.pid}') + job_was_canceled = get_cenv('CI_JOB_STATUS') == 'canceled' + cmd_return_code = 1 if (int(main_proc.returncode) != 0 or job_was_canceled) else 0 + if main_script and (job_was_canceled or not self.job.allow_failure): + utility.update_json_kv(self.job.error_code_file, self.job.jobid, cmd_return_code) + if main_script and cmd_return_code != 0: + if self.job.mode and not self.job.allow_failure: + self.job.mode.cleanup_on_failure() + sys.exit(cmd_return_code) + # FIXME: do not rely on internal implementation of subprocess.run + return subprocess.CompletedProcess(main_proc.args, main_proc.returncode, stdout, stderr) + + +class Slurm_Executor(Executor, ABC): + srun_path = "srun --export=NONE" # "/usr/local_host/bin/srun" + sbatch_path = "sbatch" # "/usr/local_host/bin/sbatch" + salloc_path = "/opt/slurm/current/bin/salloc" + scancel_path = "scancel" + + def _get_slurm_cmd(self, base): + add_args = '' + constraint = get_cenv("SLURM_CONSTRAINT") + if not constraint: + constraint = get_cenv("SALLOC_CONSTRAINT") + + if constraint: + add_args += f' --constraint={constraint}' + + return f'{base}{add_args}' + + def get_srun_cmd(self): + return self._get_slurm_cmd(self.srun_path) + + def get_sbatch_cmd(self): + return self._get_slurm_cmd(self.sbatch_path) + + def get_salloc_cmd(self): + return self._get_slurm_cmd(self.salloc_path) + + def get_scancel_cmd(self): + return self._get_slurm_cmd(self.scancel_path) + + def __init__(self, job, downscope=False): + Executor.__init__(self, job, downscope=downscope) + + def set_internal_slurm_job(self): + if self.simple_job_id: + return + + 
try: + with open(f"{self.job.runner_path}/SlurmIDMapping.json", "r") as file: + mapping = json.loads(file.read()) + self.simple_job_id = mapping[get_cenv("CI_JOB_ID")] + return + except (IOError, KeyError): + self.simple_job_id = None + logging.warning(f'Could not read internal Slurm jobid from mapping file') + + salloc_out = self.allocator(params=' '.join(['--cpus-per-task=1', '--ntasks=1', '--no-shell', + f'--job-name=CI_{self.job.pipeline_id}'])) + try: + self.simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1) + logging.info(f'Using internal Slurm jobid {self.simple_job_id}') + man.add_id_mapping(f"{self.job.runner_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"), + self.simple_job_id) + except AttributeError: + self.job.mode.abort(f'Could not allocate a Slurm job for internal usage') + + def allocator(self, params=""): + logging.debug(f'allocating job for pipeline {self.job.pipeline_id}') + salloc_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh", + allocator=self.get_salloc_cmd(), + params=params, + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + #print(salloc_out) + logging.debug(f' salloc output: {salloc_out}') + return salloc_out + + def management_handler(self, helper_script="", params="", wrapper_add="", script="", install_env=False): + if helper_script == '': + helper_script = f"{self.job.scripts_path}/runHelper.sh" + + self.set_internal_slurm_job() + management_out = self.execute(helper_script=helper_script, + allocator=self.get_srun_cmd(), + params=f"--jobid={self.simple_job_id} " + params, + wrapper_add=wrapper_add, + target_script=script, install_env=install_env, + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + logging.debug(management_out) + return management_out + + def cancel(self, jobid): + scancel_out = self.execute(helper_script=f"{self.job.scripts_path}/runHelper.sh", + allocator=self.get_scancel_cmd(), + params=f'{jobid}', + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + logging.debug(f'scancel output: {scancel_out}') + return scancel_out + + def run_direct(self, params="", wrapper_add="", script=""): + srun_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + allocator=self.get_srun_cmd(), + params=params, + target_script=script, + wrapper_add=wrapper_add, main_script=True, + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + logging.debug(srun_out) + return srun_out + + def run_batched(self, params="", wrapper_add="", script=""): + sbatch_out = self.execute(helper_script=f"{self.job.scripts_path}/xPipeHelper.sh", + allocator=self.get_sbatch_cmd(), + params=params, + wrapper_add=wrapper_add, + target_script=script, + main_script=True, run_async=True, + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout + logging.debug(f'sbatch output: {sbatch_out}') diff --git a/driver.py b/driver.py index aec82b7a95a39d45235bcbcaffc31feb0e1b23b0..28e147f635d5d9abf97ada4c80a655d82178f442 100644 --- a/driver.py +++ b/driver.py @@ -1,17 +1,9 @@ import sys -import os from core.job import Job -argv = sys.argv -account = "" -user_path = "" -runner_path = "" -down_scoping = True - -# handle() -if len(argv) < 2: +if len(sys.argv) < 2: print("Error: no argument") - exit(1) -myJob = Job(argv, os.path.abspath(os.path.dirname(__file__))) + sys.exit(1) +myJob = Job(sys.argv) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 
0000000000000000000000000000000000000000..9be7a2eea70d49b2b0b4721c57fe5891fcc1c67a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +rsa +PyJWT +cryptography +pycryptodome +filelock diff --git a/settings/__init__.py b/settings/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.localTemplate.yml b/utility/.gitlab/.localTemplate.yml similarity index 95% rename from .localTemplate.yml rename to utility/.gitlab/.localTemplate.yml index 9196f8a38bebf665abd00aa824f3201e0725fca9..b2533bfa3f4bf1368fca0c7a0911fae4fe332f88 100644 --- a/.localTemplate.yml +++ b/utility/.gitlab/.localTemplate.yml @@ -2,7 +2,7 @@ variables: CI_MODE: "SSH" -ssh-build-job: +.ssh-build-job: extends: .ssh-job stage: build script: diff --git a/.template.yml b/utility/.gitlab/.template.yml similarity index 98% rename from .template.yml rename to utility/.gitlab/.template.yml index e0a763ff40ee22c02f9753c581b43d85d7c5fb5d..687db5a8ffd956a395777fc6beacee2afd8e3dba 100644 --- a/.template.yml +++ b/utility/.gitlab/.template.yml @@ -229,15 +229,16 @@ batch-job: # This job runs in the build stage, which runs first. stage: unit-test variables: CI_MODE: "Batch" - BATCH_SCRIPT: "batch.sh" + BATCH_SCRIPT: "utility/.gitlab/batch.sh" script: - echo "I do nothing" + allow_failure: true sbatch-job: # This job runs in the build stage, which runs first. stage: unit-test variables: CI_MODE: "Sbatch" - BATCH_SCRIPT: "sbatch.sh" + BATCH_SCRIPT: "utility/.gitlab/sbatch.sh" script: - echo "I do nothing" diff --git a/batch.sh b/utility/.gitlab/batch.sh similarity index 91% rename from batch.sh rename to utility/.gitlab/batch.sh index 1bd8bea3537c201d9fe7384c37c495ae1040b2ce..7d0b762e41dfa7d54eaae45d6987bd1c3414c0a2 100755 --- a/batch.sh +++ b/utility/.gitlab/batch.sh @@ -5,7 +5,7 @@ ###SBATCH --reservation=<advanced-reservation-id> ### File / path where STDOUT will be written, the %j is the job id -###SBATCH --output=$HOME/out.txt +###SBATCH --output=$PWD/out.txt ### Request time and memory #SBATCH --time=00:05:00 diff --git a/sbatch.sh b/utility/.gitlab/sbatch.sh similarity index 100% rename from sbatch.sh rename to utility/.gitlab/sbatch.sh diff --git a/utility/install/build_static_driver.sh b/utility/install/build_static_driver.sh new file mode 100755 index 0000000000000000000000000000000000000000..fa884abfbe4cfc4b2c09da3bd77b3ea0981dd467 --- /dev/null +++ b/utility/install/build_static_driver.sh @@ -0,0 +1,19 @@ +#!/usr/bin/bash + +EXE_NAME=driver +: "${TMP:=/tmp}" +WORKPATH=$(mktemp -d -p "${TMP}") + +function cleanup() +{ + rm -r $WORKPATH dist ${EXE_NAME}.spec +} + +NAME='static_${EXE_NAME}.exe' +if [ "$#" -ge 1 ]; then + NAME="$1" +fi + +pyinstaller --onefile --clean --strip --workpath $WORKPATH ${STATIC_ADD_DATA} ${EXE_NAME}.py +staticx dist/$EXE_NAME $NAME +trap cleanup EXIT TERM INT HUP diff --git a/utility/install/requirements-static.txt b/utility/install/requirements-static.txt new file mode 100644 index 0000000000000000000000000000000000000000..732cea4c7e2ee781df12ede3ec70c07917b84b43 --- /dev/null +++ b/utility/install/requirements-static.txt @@ -0,0 +1,5 @@ +pyinstaller +patchelf==0.18 +patchelf-wrapper +scons +staticx diff --git a/core/scripts/batch.env b/utility/runtime/scripts/batch.env similarity index 100% rename from core/scripts/batch.env rename to utility/runtime/scripts/batch.env diff --git a/core/scripts/batchWrapper.sh b/utility/runtime/scripts/batchWrapper.sh similarity index 100% rename from core/scripts/batchWrapper.sh rename to 
utility/runtime/scripts/batchWrapper.sh diff --git a/core/scripts/chmodPath.sh b/utility/runtime/scripts/chmodPath.sh similarity index 100% rename from core/scripts/chmodPath.sh rename to utility/runtime/scripts/chmodPath.sh diff --git a/core/scripts/execHelper.sh b/utility/runtime/scripts/execHelper.sh similarity index 100% rename from core/scripts/execHelper.sh rename to utility/runtime/scripts/execHelper.sh diff --git a/core/scripts/pipeHelper.sh b/utility/runtime/scripts/pipeHelper.sh similarity index 100% rename from core/scripts/pipeHelper.sh rename to utility/runtime/scripts/pipeHelper.sh diff --git a/core/scripts/runHelper.sh b/utility/runtime/scripts/runHelper.sh similarity index 100% rename from core/scripts/runHelper.sh rename to utility/runtime/scripts/runHelper.sh diff --git a/core/scripts/runnerInstallHelper.sh b/utility/runtime/scripts/runnerInstallHelper.sh similarity index 92% rename from core/scripts/runnerInstallHelper.sh rename to utility/runtime/scripts/runnerInstallHelper.sh index 5f0bf0c5121f8ac93786d42b0f1a0bbc90cc5cd8..43221b3ebcddea5e62633b6963678a899186ea03 100755 --- a/core/scripts/runnerInstallHelper.sh +++ b/utility/runtime/scripts/runnerInstallHelper.sh @@ -1,4 +1,4 @@ -#!/usr/local_rwth/bin/zsh +#!/usr/bin/zsh source $HOME/.zshrc diff --git a/core/scripts/singularityLocalRunstep.sh b/utility/runtime/scripts/singularityLocalRunstep.sh similarity index 63% rename from core/scripts/singularityLocalRunstep.sh rename to utility/runtime/scripts/singularityLocalRunstep.sh index 3481154b29f1e8e233029952a83a9aa5a180a80f..7fa22844925ccfad8faad65d13f122617bc42f8a 100755 --- a/core/scripts/singularityLocalRunstep.sh +++ b/utility/runtime/scripts/singularityLocalRunstep.sh @@ -1,5 +1,5 @@ #!/bin/bash -module load CONTAINERS +module load $CONTAINER -$EXEC_WRAPPER $PARAMS $OUTPUT_FILE singularity shell -B /work:/work --nv $CONTAINER +$EXEC_WRAPPER $PARAMS $OUTPUT_FILE singularity shell -B /work:/work --nv $R_CONTAINER diff --git a/core/scripts/singularityRunstep.sh b/utility/runtime/scripts/singularityRunstep.sh similarity index 83% rename from core/scripts/singularityRunstep.sh rename to utility/runtime/scripts/singularityRunstep.sh index dc0e6d5cdddfc70bbdfd4b5d253c63fca54d8337..7fa22844925ccfad8faad65d13f122617bc42f8a 100755 --- a/core/scripts/singularityRunstep.sh +++ b/utility/runtime/scripts/singularityRunstep.sh @@ -1,7 +1,5 @@ #!/bin/bash -module load CONTAINERS - module load $CONTAINER $EXEC_WRAPPER $PARAMS $OUTPUT_FILE singularity shell -B /work:/work --nv $R_CONTAINER diff --git a/core/scripts/xPipeHelper.sh b/utility/runtime/scripts/xPipeHelper.sh similarity index 100% rename from core/scripts/xPipeHelper.sh rename to utility/runtime/scripts/xPipeHelper.sh diff --git a/utility/runtime/scripts/zshWrapper.sh b/utility/runtime/scripts/zshWrapper.sh new file mode 100755 index 0000000000000000000000000000000000000000..029328c7c4bebeee375786e66e712f837fea4599 --- /dev/null +++ b/utility/runtime/scripts/zshWrapper.sh @@ -0,0 +1,5 @@ +#!/usr/bin/zsh + +source $HOME/.zshrc + +zsh \ No newline at end of file
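
A minimal, self-contained sketch of the "#SBATCH" parsing used by get_batch_properties() in core/modes/slurm/batch.py (non-downscoped branch); the helper name and the example directives below are illustrative only, not part of the patch:

# Illustrative sketch: take the first token after each "#SBATCH" directive in the
# job's batch script and join them into one parameter string for srun/sbatch,
# mirroring get_batch_properties() above.
def sbatch_directives_to_params(batch_script_path):
    with open(batch_script_path, 'r') as f:
        return ' '.join(line.split()[1] for line in f if line.startswith('#SBATCH'))

# A script containing "#SBATCH --time=00:05:00" and "#SBATCH --ntasks=1"
# yields the string "--time=00:05:00 --ntasks=1"; commented-out directives
# such as "###SBATCH --reservation=..." are ignored.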