diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f98418cf7059453e8a17a0c944c4e33080b87305..16f78ec2a66ad0ce09698a907089c1ca9cb8f7b0 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -23,16 +23,17 @@ stages:  # List of stages for jobs, and their order of execution
 
 variables:
-  SLURM_TIME: "00:05:00"
-  SLURM_PARTITION: "c18m_low"
+  SLURM_ENV_SLURM_TIMELIMIT: "00:05:00"
+  SLURM_ENV_SLURM_PARTITION: "c18m_low"
+  SLURM_ENV_SRUN_CPUS_PER_TASK: "1"
   GIT_STRATEGY: clone
 
 default:
-  tags: ["custom"]
+  tags: ["downscope"]
 
 .parallel-job:
   variables:
-    SLURM_CPUS_PER_TASK: "6"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "2"
     CI_MODE: "SingleSlurmJobAcrossStages"
   parallel:
     matrix:
@@ -41,12 +42,12 @@ default:
 
 .sequential-job:
   variables:
-    SLURM_CPUS_PER_TASK: "1"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "1"
     CI_MODE: "Slurm"
 
 .shared-sequential-job:
   variables:
-    SLURM_CPUS_PER_TASK: "1"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "1"
     CI_MODE: "SingleSlurmJobAcrossStages"
 
@@ -55,6 +56,7 @@ parallel-build-job:  # This job runs in the build stage, which runs first.
   stage: build
   variables:
     BEGIN_SINGLE_SLURM_JOB: "1"
+    SLURM_ENV_SLURM_TIMELIMIT: "00:15:00"
   script:
     - echo "JOBID ${SLURM_JOB_ID}"
     - echo "NODELIST ${SLURM_JOB_NODELIST}"
@@ -147,18 +149,20 @@ shared-deploy-job:
 
 fail-exit-code-job-Singularity:  # This job runs in the build stage, which runs first.
   stage: fail  # MUST FAIL
   variables:
-    SLURM_CPUS_PER_TASK: "1"
     CI_MODE: "Singularity"
     CONTAINER: "tensorflow"
   script:
-    - cd rewagha
+    - which tensorboard
+    - hostname
+    - whoami
+    - echo $PATH
+    - cd eorwgkp
   allow_failure: true
 
 fail-timeout-job-Singularity:  # This job runs in the build stage, which runs first.
   stage: fail  # MUST FAIL
   variables:
-    SLURM_CPUS_PER_TASK: "1"
-    SLURM_TIME: "00:01:00"
+    SLURM_ENV_SLURM_TIMELIMIT: "00:01:00"
     CI_MODE: "Singularity"
     CONTAINER: "tensorflow"
   script:
@@ -168,8 +172,6 @@ fail-timeout-job-Singularity:  # This job runs in the build stage, which ru
 
 fail-exit-code-job:  # This job runs in the build stage, which runs first.
   stage: fail  # MUST FAIL
-  variables:
-    SLURM_CPUS_PER_TASK: "1"
   script:
     - cd rewagha
   allow_failure: true
@@ -177,8 +179,7 @@ fail-exit-code-job:  # This job runs in the build stage, which runs first.
 fail-timeout-job:  # This job runs in the build stage, which runs first.
   stage: fail  # MUST FAIL
   variables:
-    SLURM_CPUS_PER_TASK: "1"
-    SLURM_TIME: "00:01:00"
+    SLURM_ENV_SLURM_TIMELIMIT: "00:01:00"
   script:
     - echo "Compiling the code..."
     - sleep 1200
@@ -187,7 +188,7 @@ fail-timeout-job:  # This job runs in the build stage, which runs first.
 build-job:  # This job runs in the build stage, which runs first.
   stage: build
   variables:
-    SLURM_CPUS_PER_TASK: "2"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "2"
   script:
     - echo "Compiling the code..."
     - echo "Compile complete."
@@ -195,15 +196,17 @@ build-job:  # This job runs in the build stage, which runs first.
 build-job-Singularity:  # This job runs in the build stage, which runs first.
   stage: build
   variables:
-    SLURM_CPUS_PER_TASK: "2"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "2"
     CI_MODE: "Singularity"
     CONTAINER: "tensorflow"
   script:
-    - echo "Compiling the code..."
-    - echo "Compile complete."
+    - which tensorboard
+    - hostname
+    - whoami
+    - echo $PATH
 
 batch-job:  # This job runs in the build stage, which runs first.
-  stage: build
+  stage: unit-test
   variables:
     CI_MODE: "Batch"
     BATCH_SCRIPT: "batch.sh"
@@ -222,7 +225,7 @@ fail-batch-job:  # This job runs in the build stage, which runs first.
 
 unit-test-job:  # This job runs in the test stage.
   stage: test  # It only starts when the job in the build stage completes successfully.
   variables:
-    SLURM_CPUS_PER_TASK: "4"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "4"
   script:
     - echo "Running unit tests... This will take about 60 seconds."
     - sleep 60
@@ -231,7 +234,7 @@ unit-test-job:  # This job runs in the test stage.
 lint-test-job:  # This job also runs in the test stage.
   stage: test  # It can run at the same time as unit-test-job (in parallel).
   variables:
-    SLURM_CPUS_PER_TASK: "8"
+    SLURM_ENV_SRUN_CPUS_PER_TASK: "8"
   script:
     - echo "Linting code... This will take about 10 seconds."
     - sleep 10
@@ -266,7 +269,6 @@ deploy-job:  # This job runs in the deploy stage.
   stage: deploy  # It only runs when *both* jobs in the test stage complete successfully.
   script:
     - zip -r Runner.zip driver.py core/ JSONAccountManager.py
-    - zip -d Runner.zip core/authentication/JSONTest.py
   artifacts:
     paths:
       - Runner.zip
diff --git a/Installer.py b/Installer.py
index 81dc7c0637af745052ab41f685435fc1939c1f52..d3d281a09eda71ddb39310cb86fff537628fe8db 100644
--- a/Installer.py
+++ b/Installer.py
@@ -1,5 +1,7 @@
-import argparse, os
+import argparse
 import json
+import os
+
 import rsa
 
 mainpath = os.getcwd()
@@ -68,12 +70,12 @@ def CLI():
             "default": "id_rsa.pub",
             "help": "The path where the key for encrypting the mapping is stored."
         },
-        { "flags": ["-cc", "--concurrency"],
-          "type": int,
-          "metavar": "<name>",
-          "default": "100",
-          "help": "Defines how many concurrent runners shall be available."
-        },
+        {"flags": ["-cc", "--concurrency"],
+         "type": int,
+         "metavar": "<name>",
+         "default": "100",
+         "help": "Defines how many concurrent runners shall be available."
+         },
         {
             "flags": ["-mp", "--mapping-Path"],
             "type": str,
@@ -107,6 +109,20 @@ def CLI():
             "flags": ["-rg", "--register"],
             "action": "store_true",
             "help": "Register a new runner with the same config."
+        },
+        {
+            "flags": ["-aes", "--aes-key-path"],
+            "type": str,
+            "metavar": "<name>",
+            "default": "Keyfile",
+            "help": "The path to the AES key file."
+        },
+        {
+            "flags": ["-s", "--shell-installation"],
+            "type": str,
+            "metavar": "<name>",
+            "default": "/usr/bin/env bash",
+            "help": "The path to the preferred shell to be used."
+        }
     ]
 
@@ -116,18 +132,14 @@ def CLI():
         if p.get("action"):
             parser.add_argument(
                 *p.get("flags"),
-                required=p.get("required"),
                 default=p.get("default"),
                 action=p.get("action"),
                 help=p.get("help")
             )
         else:
             parser.add_argument(
                 *p.get("flags"),
-                required=p.get("required"),
-                type=p.get("type"),
                 choices=p.get("choices"),
-                nargs=p.get("nargs"),
                 default=p.get("default"),
                 const=p.get("const"),
                 metavar=p.get("metavar"),
@@ -136,13 +147,15 @@ def CLI():
     return parser.parse_args()
 
+
 # create initial keys
 def create_keys(priv_key_path, pub_key_path):
-    (pubkey, privkey) = rsa.newkeys(2048)
+    (pubkey, private_key) = rsa.newkeys(2048)
     with open(f"{pub_key_path}", "w") as text_file:
         text_file.write(pubkey.save_pkcs1().decode('ascii'))
     with open(f"{priv_key_path}", "w") as text_file:
-        text_file.write(privkey.save_pkcs1().decode('ascii'))
+        text_file.write(private_key.save_pkcs1().decode('ascii'))
+
 
 # create initial mapping
 def create_mapping(priv_key_path, pub_key_path, map_path):
@@ -150,6 +163,7 @@ def create_mapping(priv_key_path, pub_key_path, map_path):
     with open(map_path, "w") as text_file:
         text_file.write('')
 
+
 # Perform the initial installation of the Gitlab runner
 def doInstall(Token, Url, Name, Tags, install_Path, shell_file):
     install_Path = install_Path.replace("$HOME", os.getenv("HOME"))
@@ -160,6 +174,9 @@ def doInstall(Token, Url, Name, Tags, install_Path, shell_file):
 
     os.system("pip install rsa")
     os.system("pip install PyJWT")
+    os.system("pip install cryptography")
+    os.system("pip install pycryptodome")
+    os.system("pip install filelock")
 
     # Download GitLab Runner
     os.system("curl -LJO \"https://gitlab-runner-downloads.s3.amazonaws.com/latest/rpm/gitlab-runner_amd64.rpm\"")
@@ -173,19 +190,23 @@ def doInstall(Token, Url, Name, Tags, install_Path, shell_file):
     # Register Gitlab Runner
     os.system(
         f'./usr/bin/gitlab-runner register --non-interactive --url "{Url}" --registration-token "{Token}"\
-        --executor "custom" --description "{Name}" --tag-list "{Tags}" --run-untagged="false" --locked="false" --access-level="not_protected"\
+        --executor "custom" --description "{Name}" --tag-list "{Tags}" --run-untagged="false" --locked="false" '
+        f'--access-level="not_protected"\
         --custom-prepare-exec="{install_Path}/main.sh" --custom-config-exec="{install_Path}/main.sh"\
         --custom-run-exec="{install_Path}/main.sh" --custom-cleanup-exec="{install_Path}/main.sh"\
         --custom-prepare-args="prepare" --custom-config-args="config"\
         --custom-run-args="run" --custom-cleanup-args="cleanup"')
 
-    generateDriverConfig(install_Path, args.downscope, args.key_Path, args.mapping_Path, args.user_Path)
+    generateDriverConfig(install_Path, args.downscope, args.key_Path, args.mapping_Path, args.user_Path,
+                         args.aes_key_path, args.shell_installation)
     generateRunnerMain(install_Path)
     os.system(f"sed -i 's/concurrent = 1/concurrent = {args.concurrency}/g' $HOME/.gitlab-runner/config.toml")
 
     # Download driver
-    os.system('curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/raw/Runner.zip?job=deploy-job"')
+    os.system(
+        'curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/raw/'
+        'Runner.zip?job=deploy-job"')
     os.system(f'unzip {install_Path}/Runner.zip')
 
     # create mapping for downscoping
@@ -203,7 +224,7 @@ def doInstall(Token, Url, Name, Tags, install_Path, shell_file):
     setRunstepDependencies(install_Path)
 
     # Start Runner
-    #os.system("screen -dm ./usr/bin/gitlab-runner run")
+    # os.system("screen -dm ./usr/bin/gitlab-runner run")
 
 
 def add_registration(install_Path, Url, Token, Name, Tags):
@@ -222,17 +243,18 @@ def add_registration(install_Path, Url, Token, Name, Tags):
 def doUninstall(install_Path):
     install_Path = install_Path.replace("$HOME", os.getenv("HOME"))
     # Directory Setup
-    #os.system("pkill screen")
+    # os.system("pkill screen")
     os.system(f"rm -rf {install_Path}")
 
+
 # Update the GitLab runner
 def doUpdate(install_Path):
     install_Path = install_Path.replace("$HOME", os.getenv("HOME"))
     # Directory Setup
     os.chdir(f"{install_Path}")
-    #os.system("pkill screen")
+    # os.system("pkill screen")
 
     os.system("curl -LJO \"https://gitlab-runner-downloads.s3.amazonaws.com/latest/rpm/gitlab-runner_amd64.rpm\"")
@@ -243,23 +265,22 @@ def doUpdate(install_Path):
     os.system("chmod +x usr/bin/gitlab-runner")
 
     # Download driver
-    os.system('curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/file/Runner.zip?job=deploy"')
+    os.system(
+        'curl -LJO "https://git-ce.rwth-aachen.de/adrianschmitz2/runner-mirror/-/jobs/artifacts/main/file/'
+        'Runner.zip?job=deploy"')
     os.system('unzip Runner.zip')
 
     # Restart Runner
-    #os.system("screen -dm ./usr/bin/gitlab-runner run")
+    # os.system("screen -dm ./usr/bin/gitlab-runner run")
 
 
-# Generate the main path depending on the install path
+# Generate the main path depending on the installation path
 def generateRunnerMain(install_Path):
     install_Path = install_Path.replace("$HOME", os.getenv("HOME"))
     if os.path.exists(f'{install_Path}/main.sh'):
         os.remove(f"{install_Path}/main.sh")
     file = open(f"{install_Path}/main.sh", "w")
-    main = []
-    main.append("#!/bin/bash\n")
-    main.append("module load python\n")
-    main.append(f"python3 {install_Path}/driver.py $@\n")
+    main = ["#!/bin/bash\n", "module load python\n", f"python3 {install_Path}/driver.py $@\n"]
     file.writelines(main)
     file.close()
     os.system(f"chmod +x {install_Path}/main.sh")
@@ -278,7 +299,7 @@ def setRunstepDependencies(install_Path):
 
 
 # Generate the config file for the custom driver
-def generateDriverConfig(install_Path, downscope, key_path, mapping_path, user_path):
+def generateDriverConfig(install_Path, downscope, key_path, mapping_path, user_path, aes_key_path, shell_installation):
     install_Path = install_Path.replace("$HOME", os.getenv("HOME"))
     config_json = {}
@@ -302,14 +323,23 @@ def generateDriverConfig(install_Path, downscope, key_path, mapping_path, user_p
     # Define where the mapping is located, either relative to the $HOME directory of the
     # installing user or as an absolute path
     if mapping_path.startswith("/") or mapping_path.startswith("$"):
-        config_json["Map Path"] = f'{install_Path}/{mapping_path}'
+        config_json["Map Path"] = mapping_path
     else:
        config_json["Map Path"] = f'{install_Path}/{mapping_path}'
 
+    # Define where the AES key for encrypting the mapping file is located, either relative to the $HOME
+    # directory of the installing user or as an absolute path
+    if aes_key_path.startswith("/") or aes_key_path.startswith("$"):
+        config_json["AES Path"] = aes_key_path
+    else:
+        config_json["AES Path"] = f'{install_Path}/{aes_key_path}'
+
     # Define where the build and cache paths shall be constructed, relative to the $WORK directory of the downscoped
     # user.
     config_json["User Path"] = user_path
 
+    # Define where the shell to run the generated CI scripts is located.
+ config_json["Shell Install"] = shell_installation # Create File if os.path.exists(f'{install_Path}/config.json'): @@ -318,6 +348,7 @@ def generateDriverConfig(install_Path, downscope, key_path, mapping_path, user_p json.dump(config_json, file) file.close() + def run(): global args args = CLI() @@ -343,12 +374,12 @@ def run(): exit(1) if args.install: - if args.registration_token == None or args.registration_url == None: + if args.registration_token is None or args.registration_url is None: print("registration token and registration url must be defined for installation!") exit(1) - doInstall(args.registration_token, args.registration_url, args.runner_name, args.tag_list, args.install_Path \ - , args.shell_file) + doInstall(args.registration_token, args.registration_url, args.runner_name, args.tag_list, args.install_Path, + args.shell_file) if args.remove: doUninstall(args.install_Path) @@ -357,6 +388,8 @@ def run(): doUpdate(args.install_Path) if args.register: - add_registration(args.install_Path, args.registration_url, args.registration_token, args.runner_name, args.tag_list) + add_registration(args.install_Path, args.registration_url, args.registration_token, args.runner_name, + args.tag_list) + run() diff --git a/JSONAccountManager.py b/JSONAccountManager.py index 05780450d4627ff6fd3de87ed94e41d736cfc201..ef618344022751b12a4f70ee75a8a5e5da12c628 100644 --- a/JSONAccountManager.py +++ b/JSONAccountManager.py @@ -1,8 +1,10 @@ import argparse -import json -import rsa +import os.path +import string +import random import os +import core.authentication.EncryptionManager as encrypt # Define Commandline interface def CLI(): @@ -75,6 +77,12 @@ def CLI(): "metavar": "<del>", "help": "The deletion date of the coupling." }, + { + "flags": ["-aes", "--aes-encryption-key-path"], + "type": str, + "metavar": "<aes>", + "help": "Path to the AES key." 
+        },
     ]
 
     parser = argparse.ArgumentParser()
@@ -104,55 +112,22 @@
     return parser.parse_args()
 
 
-def load_priv_key(path):
-    path = os.path.join(os.path.dirname(__file__), path)
-    with open(path, mode='rb') as privatefile:
-        keydata = privatefile.read()
-    return rsa.PrivateKey.load_pkcs1(keydata)
-
-
-def load_pub_key(path):
-    path = os.path.join(os.path.dirname(__file__), path)
-    with open(path, mode='rb') as pubfile:
-        keydata = pubfile.read()
-    return rsa.PublicKey.load_pkcs1(keydata)
-
-
-def create_keys(priv_key_path, pub_key_path):
-    (pubkey, privkey) = rsa.newkeys(2048)
-    with open(f"{pub_key_path}", "w") as text_file:
-        text_file.write(pubkey.save_pkcs1().decode('ascii'))
-    with open(f"{priv_key_path}", "w") as text_file:
-        text_file.write(privkey.save_pkcs1().decode('ascii'))
+def get_random_string(length: int) -> str:
+    # choose from ASCII letters and digits
+    letters = string.ascii_letters + string.digits
+    result_str = ''.join(random.choice(letters) for i in range(length))
+    return result_str
 
-
-def create_mapping(priv_key_path, pub_key_path, map_path):
-    create_keys(priv_key_path, pub_key_path)
+
+def create_mapping(priv_key_path, pub_key_path, map_path, AES_key):
+    encrypt.create_keys(priv_key_path, pub_key_path)
     with open(map_path, "w") as text_file:
         text_file.write('')
+    key_phrase = get_random_string(16)
+    encrypt.set_AES_key(key_phrase, AES_key, pub_key_path)
 
-def read_mapping(key_path, map_path):
-    with open(map_path, mode='rb') as file:
-        file.seek(0)
-        data = file.read()
-    try:
-        json_file = rsa.decrypt(data, load_priv_key(key_path))
-        search_file = json.loads(json_file)
-        return search_file
-    except:
-        return {}
-
-
-def write_mapping(mapping, key_path, map_path):
-    json_file = json.dumps(mapping)
-    encrypted = rsa.encrypt(json_file.encode('ascii'), load_pub_key(key_path))
-    with open(map_path, "wb") as text_file:
-        text_file.write(encrypted)
-
-
-def add_user_account(url, uid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date):
-    mapping = read_mapping(priv_key_path, map_path)
+def add_user_account(url, uid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date, AES_key):
+    mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
     if url not in mapping:
         mapping[url] = {"uid": {}, "pid": {}}
     if uid in mapping[url]["uid"]:
@@ -160,11 +135,11 @@ def add_user_account(url, uid, priv_key_path, pub_key_path, map_path, cluster_ac
         exit(1)
     uid_dict = {"acc": cluster_acc, "delete": delete_date}
     mapping[url]["uid"][uid] = uid_dict
-    write_mapping(mapping, pub_key_path, map_path)
+    encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
 
 
-def add_project_account(url, pid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date):
-    mapping = read_mapping(priv_key_path, map_path)
+def add_project_account(url, pid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date, AES_key):
+    mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
     if url not in mapping:
         mapping[url] = {"uid": {}, "pid": {}}
     if pid in mapping[url]["pid"]:
@@ -172,36 +147,37 @@ def add_project_account(url, pid, priv_key_path, pub_key_path, map_path, cluster
         exit(1)
     pid_dict = {"acc": cluster_acc, "delete": delete_date}
     mapping[url]["pid"][pid] = pid_dict
-    write_mapping(mapping, pub_key_path, map_path)
+    encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
 
 
-def remove_user_account(url, uid, priv_key_path, pub_key_path, map_path):
-    mapping = read_mapping(priv_key_path, map_path)
+def remove_user_account(url, uid, priv_key_path, pub_key_path, map_path, AES_key):
+    mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
     cluster_account = mapping[url]["uid"][uid]["acc"]
     del mapping[url]["uid"][uid]
-    write_mapping(mapping, pub_key_path, map_path)
+    encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
     print(f"Removed CI access for cluster account: {cluster_account}")
 
 
-def remove_project_account(url, pid, priv_key_path, pub_key_path, map_path):
-    mapping = read_mapping(priv_key_path, map_path)
+def remove_project_account(url, pid, priv_key_path, pub_key_path, map_path, AES_key):
+    mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
     cluster_account = mapping[url]["pid"][pid]["acc"]
     del mapping[url]["pid"][pid]
-    write_mapping(mapping, pub_key_path, map_path)
+    encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
     print(f"Removed CI access for cluster account: {cluster_account}")
 
 
-def remove_url(url, priv_key_path, pub_key_path, map_path):
-    mapping = read_mapping(priv_key_path, map_path)
+def remove_url(url, priv_key_path, pub_key_path, map_path, AES_key):
+    mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
     removed_serv_accounts = []
     for x in mapping[url]["pid"]:
         removed_serv_accounts.append(mapping[url]["pid"][x]["acc"])
     for x in mapping[url]["uid"]:
         removed_serv_accounts.append(mapping[url]["uid"][x]["acc"])
     del mapping[url]
-    write_mapping(mapping, pub_key_path, map_path)
+    encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
     print(f"Removed CI access for cluster accounts: {removed_serv_accounts}")
 
+
 def run():
     args = CLI()
 
@@ -214,22 +190,25 @@ def run():
         exit(1)
 
     if args.create:
-        if args.private_key_path is None or args.public_key_path is None or args.mapping_path is None:
+        if args.private_key_path is None or args.public_key_path is None or args.mapping_path is None or args.aes_encryption_key_path is None:
             print("Arguments for private/public key and mapping path must be provided.")
             exit(1)
         else:
-            create_mapping(args.private_key_path, args.public_key_path, args.mapping_path)
+            create_mapping(os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path),
+                           os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
 
     if args.remove_mapping:
         if args.gitlab_url is None:
             print("Arguments for gitlab url must be provided.")
             exit(1)
         if args.gitlab_project_id is not None:
-            remove_project_account(args.gitlab_url, args.gitlab_project_id, args.private_key_path, args.public_key_path,
-                                   args.mapping_path)
+            remove_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path),
+                                   os.path.abspath(args.public_key_path),
+                                   os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
         elif args.gitlab_user_id is not None:
-            remove_user_account(args.gitlab_url, args.gitlab_user_id, args.private_key_path, args.public_key_path,
-                                args.mapping_path)
+            remove_user_account(args.gitlab_url, args.gitlab_user_id, os.path.abspath(args.private_key_path),
+                                os.path.abspath(args.public_key_path),
+                                os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
         else:
             print("Arguments for gitlab project id or gitlab user id must be provided.")
             exit(1)
@@ -239,7 +218,8 @@ def run():
             print("Argument for gitlab url must be provided.")
             exit(1)
         else:
-            remove_url(args.gitlab_url, args.private_key_path, args.public_key_path, args.mapping_path,)
+            remove_url(args.gitlab_url, os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path),
+                       os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
 
     if args.add_mapping:
         if args.gitlab_url is None:
@@ -252,13 +232,17 @@ def run():
             print("Argument for delete date must be provided.")
             exit(1)
         if args.gitlab_project_id is not None:
-            add_project_account(args.gitlab_url, args.gitlab_project_id, args.private_key_path, args.public_key_path,
-                                args.mapping_path, args.cluster_account_name, args.delete_date)
+            add_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path),
+                                os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path),
+                                args.cluster_account_name, args.delete_date,
+                                os.path.abspath(args.aes_encryption_key_path))
         elif args.gitlab_user_id is not None:
-            add_user_account(args.gitlab_url, args.gitlab_user_id, args.private_key_path, args.public_key_path,
-                             args.mapping_path, args.cluster_account_name, args.delete_date)
+            add_user_account(args.gitlab_url, args.gitlab_user_id, os.path.abspath(args.private_key_path),
+                             os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path),
+                             args.cluster_account_name, args.delete_date, os.path.abspath(args.aes_encryption_key_path))
         else:
             print("Arguments for gitlab project id or gitlab user id must be provided.")
             exit(1)
 
+
 run()
\ No newline at end of file
diff --git a/core/authentication/EncryptionManager.py b/core/authentication/EncryptionManager.py
new file mode 100644
index 0000000000000000000000000000000000000000..658e205585615e29647eecd6e3e9f9f4feda1a35
--- /dev/null
+++ b/core/authentication/EncryptionManager.py
@@ -0,0 +1,93 @@
+import json
+
+import rsa
+import os
+import base64
+import hashlib
+from Crypto import Random
+from Crypto.Cipher import AES
+
+
+def load_priv_key(path):
+    path = os.path.join(os.path.dirname(__file__), path)
+    with open(path, mode='rb') as private_file:
+        key_data = private_file.read()
+    return rsa.PrivateKey.load_pkcs1(key_data)
+
+
+def load_pub_key(path):
+    path = os.path.join(os.path.dirname(__file__), path)
+    with open(path, mode='rb') as public_file:
+        key_data = public_file.read()
+    return rsa.PublicKey.load_pkcs1(key_data)
+
+
+def create_keys(private_key_path, pub_key_path):
+    (pubkey, private_key) = rsa.newkeys(2048)
+    with open(f"{pub_key_path}", "w") as text_file:
+        text_file.write(pubkey.save_pkcs1().decode('ascii'))
+    with open(f"{private_key_path}", "w") as text_file:
+        text_file.write(private_key.save_pkcs1().decode('ascii'))
+
+
+def read_mapping(key_path, map_path, AES_key):
+    key = get_AES_key(AES_key, key_path)
+    cipher = AESCipher(key)
+    with open(map_path, mode='rb') as file:
+        file.seek(0)
+        data = file.read()
+    try:
+        json_file = cipher.decrypt(data)
+        search_file = json.loads(json_file)
+        return search_file
+    except ValueError:
+        return {}
+
+
+def write_mapping(mapping, key_path, map_path, AES_key):
+    key = get_AES_key(AES_key, key_path)
+    cipher = AESCipher(key)
+    json_file = json.dumps(mapping)
+    encrypted = cipher.encrypt(json_file)
+    with open(map_path, "wb") as text_file:
+        text_file.write(encrypted)
+
+
+def get_AES_key(file_path: str, key_path: str) -> str:
+    with open(file_path, mode='rb') as file:
+        file.seek(0)
+        data = file.read()
+    key = rsa.decrypt(data, load_priv_key(key_path))
+    return key.decode('ascii')
+
+
+def set_AES_key(key_phrase: str, file_path: str, key_path: str) -> None:
+    encrypted = rsa.encrypt(key_phrase.encode('ascii'), load_pub_key(key_path))
+    with open(file_path, "wb") as text_file:
+        text_file.write(encrypted)
+
+
+class AESCipher(object):
+
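+    # Symmetric cipher for the mapping file: the key material is the SHA-256 digest of the
+    # passphrase, and each message is AES-CBC encrypted with a fresh random IV that is
+    # prepended to the base64-encoded ciphertext (see encrypt/decrypt below).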
+    def __init__(self, key):
+        self.bs = AES.block_size
+        self.key = hashlib.sha256(key.encode()).digest()
+
+    def encrypt(self, raw):
+        raw = self._pad(raw)
+        iv = Random.new().read(AES.block_size)
+        cipher = AES.new(self.key, AES.MODE_CBC, iv)
+        return base64.b64encode(iv + cipher.encrypt(raw.encode()))
+
+    def decrypt(self, enc):
+        enc = base64.b64decode(enc)
+        iv = enc[:AES.block_size]
+        cipher = AES.new(self.key, AES.MODE_CBC, iv)
+        return self._unpad(cipher.decrypt(enc[AES.block_size:])).decode('utf-8')
+
+    def _pad(self, s):
+        return s + (self.bs - len(s) % self.bs) * chr(self.bs - len(s) % self.bs)
+
+    @staticmethod
+    def _unpad(s):
+        return s[:-ord(s[len(s) - 1:])]
diff --git a/core/authentication/JSONManager.py b/core/authentication/JSONManager.py
index a4a6f1b9d297e1b2951d6fdf3444a6b97118e9fc..0946c8389c300525b1ef1d7ce05ccf22da1c5c8e 100644
--- a/core/authentication/JSONManager.py
+++ b/core/authentication/JSONManager.py
@@ -1,41 +1,43 @@
-import time
 import json
-import rsa
-import os
-
-def load_priv_key(path):
-    path = os.path.join(os.path.dirname(__file__), path)
-    with open(path, mode='rb') as privatefile:
-        keydata = privatefile.read()
-    return rsa.PrivateKey.load_pkcs1(keydata)
-
-
-def load_pub_key(path):
-    path = os.path.join(os.path.dirname(__file__), path)
-    with open(path, mode='rb') as pubfile:
-        keydata = pubfile.read()
-    return rsa.PublicKey.load_pkcs1(keydata)
-
-
-def create_keys():
-    (pubkey, privkey) = rsa.newkeys(2048)
-    with open("/home/ppl/Runner/id_rsa.pub", "w") as text_file:
-        text_file.write(pubkey.save_pkcs1().decode('ascii'))
-    with open("/home/ppl/Runner/id_rsa", "w") as text_file:
-        text_file.write(privkey.save_pkcs1().decode('ascii'))
-
-def get_account(url, pid, uid, key_path, map_path):
-    with open(map_path, mode='rb') as file:
-        data = file.read()
-    json_file = rsa.decrypt(data, load_priv_key(key_path))
-    search_file = json.loads(json_file)
+
+import core.authentication.EncryptionManager as encrypt
+from filelock import FileLock
+
+
+def add_id_mapping(path, CI_id, slurm_id):
+    lock = FileLock(f"{path}.lock")
+    with lock:
+        try:
+            with open(path, "r") as file:
+                mapping = json.loads(file.read())
+        except IOError:
+            mapping = {}
+        mapping[CI_id] = slurm_id
+        new_mapping = json.dumps(mapping)
+        with open(path, "w") as file:
+            file.write(new_mapping)
+
+
+def remove_id_mapping(path, CI_id):
+    lock = FileLock(f"{path}.lock")
+    with lock:
+        with open(path, "r") as file:
+            mapping = json.loads(file.read())
+        del mapping[CI_id]
+        new_mapping = json.dumps(mapping)
+        with open(path, "w") as file:
+            file.write(new_mapping)
+
+
+def get_account(url, pid, uid, key_path, map_path, aes_path):
+    search_file = encrypt.read_mapping(key_path, map_path, aes_path)
     instance = search_file[url]
-    result = instance["uid"][uid]
-    if result == None:
-        result = instance["pid"][pid]
-    if result == None:
-        print("Cannot assign GitLab user/project to cluster account. Please register here: TODO")
-        exit(1)
-    return result["acc"]
-
-
+    result = None
+    try:
+        result = instance["uid"][uid]["acc"]
+    except KeyError:
+        try:
+            result = instance["pid"][pid]["acc"]
+        except KeyError:
+            print("Cannot assign GitLab user/project to cluster account. Please register here: TODO")
Please register here: TODO") + exit(1) + return result diff --git a/core/authentication/JSONTest.py b/core/authentication/JSONTest.py deleted file mode 100644 index 41b6a55a591c2bc8464d6aaedc94eefb0e1b8670..0000000000000000000000000000000000000000 --- a/core/authentication/JSONTest.py +++ /dev/null @@ -1,11 +0,0 @@ -import core.authentication.JSONManager as man -import json -import rsa - -def create_testset(): - dict = {"https://git-ce.rwth-aachen.de" : {"pid" : {}, "uid" : {"2076": "tester1"}}} - json_file = json.dumps(dict) - man.create_keys() - encrypted = rsa.encrypt(json_file.encode('ascii'), man.load_pub_key("id_rsa.pub")) - with open("/home/ppl/Runner/Assignments.txt", "wb") as text_file: - text_file.write(encrypted) diff --git a/core/authentication/JWTManager.py b/core/authentication/JWTManager.py index 984e6f571ee1a254ec42a1a668e2662969b910a5..3f661a08bf7666baef50f3e907c901d57137bb11 100644 --- a/core/authentication/JWTManager.py +++ b/core/authentication/JWTManager.py @@ -1,13 +1,14 @@ import jwt import time + def get_UID_PID(JWT, url): jwks_client = jwt.PyJWKClient(url) signing_key = jwks_client.get_signing_key_from_jwt(JWT) # wait for token to be valid time.sleep(2) data = jwt.decode(JWT, - signing_key.key, - algorithms=["RS256"], - options={"verify_exp": False}) + signing_key.key, + algorithms=["RS256"], + options={"verify_exp": False}) return data["user_id"], data["project_id"] diff --git a/core/authentication/authmanager.py b/core/authentication/authmanager.py deleted file mode 100644 index de5bda363a3538efff72376c3722249a3c5584e0..0000000000000000000000000000000000000000 --- a/core/authentication/authmanager.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -import subprocess -import pwd -import time -import colorama - - -def demote(user, uid, gid): - def demote_function(): - print("starting") - print('uid, gid = %d, %d' % (os.getuid(), os.getgid())) - os.chdir(f"/home/{user}") - print(os.getgroups()) - # initgroups must be run before we lose the privilege to set it! 
-        os.initgroups(user, gid)
-        os.setgid(gid)
-        # this must be run last
-        os.setuid(uid)
-        print("finished demotion")
-        print('uid, gid = %d, %d' % (os.getuid(), os.getgid()))
-        print(os.getgroups())
-    return demote_function
-
-
-def get_user(user):
-    uid = pwd.getpwnam(user).pw_uid
-    gid = pwd.getpwnam(user).pw_gid
-    print('uid, gid = %d, %d' % (os.getuid(), os.getgid()))
-    print('User: uid, gid = %d, %d' % (uid, gid))
-    return uid, gid
-
-
-def run_task(user, cmd):
-    return_code = 0
-    err = ""
-    out = ""
-    print("requesting rights for user: " + user)
-    uid, gid = get_user(user)
-    proc = subprocess.Popen(
-        cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        preexec_fn=demote(user, uid, gid),
-        shell=True
-    )
-
-    # process can execute normally, no exceptions at startup
-    process_running = True
-    while process_running:
-        if proc.poll() is not None:
-            process_running = False
-        # half a second of resolution
-        time.sleep(0.5)
-    return_code = proc.returncode
-    out = proc.stderr.read()
-    err = proc.stdout.read()
-    colorama.init()
-    clean_print(err)
-    clean_print(out)
-
-def clean_print(string):
-    print(str(string)[2:-1].replace("\\n","\n"))
diff --git a/core/job.py b/core/job.py
index 125abff9023a84ab4a2b2ec2635d2e71c603bebb..99135c1036c933c921290adc9c34fa936ce63a24 100644
--- a/core/job.py
+++ b/core/job.py
@@ -8,22 +8,31 @@ import time
 
 import core.modes as modes
 from core.utility import get_cenv, defines
+import core.utility as utility
 import core.authentication.JWTManager as jwt
 import core.authentication.JSONManager as man
 
-class Job():
+
+class Job:
     _mode = None
     user_path = 'Runner'
     key_path = None
     runner_path = None
     map_path = None
+    aes_path = None
     down_scoping = True
     jobid = None
+    pipeline_id = None
     account = None
     _home = None
     driver_path = None
-    scripts_dir = ''
     exec_script = ''
+    allow_failure = False
+    shared_tmp = ''
+    tmp_dir = ''
+    stage_tmp_dir = ''
+    error_code_file = ''
+    errcode_dict = dict()
     mode_name = ''
 
     @property
@@ -71,7 +80,7 @@ class Job():
             import pdb_attach
             import random
             random.seed(int(get_cenv("CI_JOB_ID")))
-            port = random.randint(1024,65535)
+            port = random.randint(1024, 65535)
             pdb_attach.listen(port)
             if self.args[1] != 'config':
                 print(f'Debug mode on listening on port {port}', sys.stderr)
@@ -82,10 +91,11 @@ class Job():
         replacements_base = ['HOME', 'WORK']
         REPLACEMENTS = {f'${s}': os.getenv(s, '') for s in replacements_base}
 
+
         def rep(s):
-            rep = dict((re.escape(k), v) for k, v in REPLACEMENTS.items())
-            pattern = re.compile("|".join(rep.keys()))
-            return pattern.sub(lambda m: rep[re.escape(m.group(0))], s)
+            rep_value = dict((re.escape(k), v) for k, v in REPLACEMENTS.items())
+            pattern = re.compile("|".join(rep_value.keys()))
+            return pattern.sub(lambda m: rep_value[re.escape(m.group(0))], s)
 
         file = open(f'{self.driver_path}/config.json', mode='r')
         CONFIG_MAPPING = json.load(file)
@@ -98,61 +108,78 @@ class Job():
         self.key_path = rep(CONFIG_MAPPING["Key Path"])
         self.map_path = rep(CONFIG_MAPPING["Map Path"])
         self.runner_path = rep(CONFIG_MAPPING["Runner Path"])
+        self.aes_path = rep(CONFIG_MAPPING["AES Path"])
+        self.shell_path = rep(CONFIG_MAPPING["Shell Install"])
 
         if self.down_scoping:
             uid, pid = jwt.get_UID_PID(get_cenv('CI_JOB_JWT'), f"{get_cenv('CI_SERVER_URL')}/-/jwks")
-            self.account = man.get_account(get_cenv('CI_SERVER_URL'), pid, uid, self.key_path, self.map_path)
+            self.account = man.get_account(get_cenv('CI_SERVER_URL'), pid, uid, self.key_path, self.map_path,
+                                           self.aes_path)
             self.user_path = f'/work/{self.account}/{CONFIG_MAPPING["User Path"]}'
Path"]}' if self.account is None: - logging.error(f"No mapping for GitLab project: {get_cenv('CI_PROJECT_NAME')}, or GitLab user: {get_cenv('GITLAB_USER_NAME')} available. Please register CI support for to acces the Runner") + logging.error( + f"No mapping for GitLab project: {get_cenv('CI_PROJECT_NAME')}, or GitLab user: " + f"{get_cenv('GITLAB_USER_NAME')} available. Please register CI support for to access the Runner") else: self.user_path = f"{os.getenv('WORK')}/{CONFIG_MAPPING['User Path']}" - def __setup(self): self.__get_config() self.jobid = get_cenv('CI_JOB_ID') - self.scripts_dir = f'{self.runner_path}/scripts/{get_cenv("CI_PIPELINE_ID")}/{self.jobid}' - os.makedirs(self.scripts_dir, exist_ok=True) - + self.pipeline_id = get_cenv('CI_PIPELINE_ID') + self.allow_failure = get_cenv('ALLOW_FAILURE', False) + self.driver_work = os.getenv("HOME") + self.shared_tmp = f'{self.driver_work}/shared_tmp/{self.pipeline_id}' + self.concurrent_tmp = f'{self.driver_work}/concurrent_tmp/{get_cenv("CI_CONCURRENT_PROJECT_ID")}' + self.tmp_dir = f'{self.concurrent_tmp}/{self.pipeline_id}' + self.stage_tmp_dir = f'{self.tmp_dir}/stages/{get_cenv("CI_JOB_STAGE")}' def __init__(self, args, driver_path): + self._cmd = None self.args = args self.driver_path = driver_path self.__setup() self.mode_name = get_cenv('CI_MODE', 'Slurm').strip() logging.basicConfig(filename=f'{self.jobid}.log', encoding='utf-8', level=logging.DEBUG) + self._mode = modes.get_mode(self) if self.args[1] == 'config': self.config() elif self.args[1] == 'prepare': - os.system('hostname') - print(get_cenv('CI_PROJECT_PATH')) - print(get_cenv('GITLAB_USER_NAME')) + self.prepare() elif self.args[1] == 'run': self.run() elif self.args[1] == 'cleanup': self.cleanup() else: - logging.error(f'Unknown driver event: {args[1] }') - + logging.error(f'Unknown driver event: {args[1]}') + + def prepare(self): + if os.path.isdir(self.concurrent_tmp): + for p in os.listdir(f'{self.concurrent_tmp}'): + if p != self.pipeline_id and os.path.isdir(f'{self.concurrent_tmp}/{p}'): + print(f'Removing {self.concurrent_tmp}/{p}') + os.system(f'rm -r {self.concurrent_tmp}/{p}') + os.system('hostname') + print(get_cenv('CI_PROJECT_PATH'), flush=True) + print(get_cenv('GITLAB_USER_NAME'), flush=True) + self._mode.get_custom_prepare() def config(self): - os.system(f'mkdir -p {self.scripts_dir}') - os.system(f'mkdir -p {self.runner_path}/jobIds') builder = "{ \"builds_dir\": \"" builder += self.build_path builder += "\", \"cache_dir\": \"" builder += self.cache_path - builder += "\", \"builds_dir_is_shared\": true, \"hostname\": \"custom-hostname\", \"driver\": { \"name\": \"" + builder += "\", \"builds_dir_is_shared\": true, \"hostname\": \"custom-hostname\", \"driver\": " \ + "{ \"name\": \"" builder += defines.name builder += "\", \"version\": \"" builder += defines.version builder += "\" }, \"job_env\" : { \"CUSTOM_ENVIRONMENT\": \"example\" }}" -# print(get_cenv("CI_CONCURRENT_PROJECT_ID"), sys.stderr) -# print(os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID"), sys.stderr) -# print(builder, file=sys.stderr) + # print(get_cenv("CI_CONCURRENT_PROJECT_ID"), sys.stderr) + # print(os.getenv("CUSTOM_ENV_CI_CONCURRENT_PROJECT_ID"), sys.stderr) + # print(builder, file=sys.stderr) print(builder) def generate_sudo_env(self): @@ -165,29 +192,41 @@ class Job(): def execute(self, run_properties, run_script, command): if self.down_scoping: - run_properties = f'sudo {self.generate_sudo_env()} -u {self.account} --shell ' + run_properties + run_properties = f'sudo 
             os.chdir("/work")
         if run_properties == '':
             command = [run_script]
         else:
             command.extend([run_properties, run_script])
+        print(self.custom_env, sys.stderr, flush=True)
         print(command, sys.stderr, flush=True)
         cmd_ret = subprocess.run(command,
                                  env=dict(os.environ, **{x: self.custom_env[x]
-                                 for x in self.custom_env})
-                                 ).returncode
-        if int(cmd_ret) != 0:
-            exit(1)
+                                                         for x in self.custom_env})
+                                 ).returncode
+        job_was_canceled = get_cenv('CI_JOB_STATUS') == 'canceled'
+        cmd_ret = 1 if (int(cmd_ret) != 0 or job_was_canceled) else 0
+        if job_was_canceled or not self.allow_failure:
+            utility.update_json_kv(self.error_code_file, self.jobid, cmd_ret)
+        if cmd_ret != 0:
+            if self._mode and not self.allow_failure:
+                self._mode.cleanup_on_failure()
+            exit(cmd_ret)
 
     def run(self):
+        if not self.allow_failure:
+            self.error_code_file = f'{self.stage_tmp_dir}/{self.args[3]}.json'
+            os.makedirs(self.stage_tmp_dir, exist_ok=True)
+            utility.update_json_kv(self.error_code_file, self.jobid, -1)
+
         self.exec_script = self.args[2]
         command = [f"{self.driver_path}/core/scripts/pipeHelper.sh"]
         run_properties = ''
-        run_script = ''
         if self.args[3] not in ['build_script', 'step_script']:
             run_script = self.exec_script
+            if self.down_scoping:
+                run_properties = self._mode.get_simple_script_exec()
         else:
-            self._mode = modes.get_mode(self)
             command = [self._mode.get_combiner_script()]
             run_properties += self._mode.get_run_properties()
             run_script = self._mode.get_run_script()
@@ -200,12 +239,6 @@
         self.execute(run_properties, run_script, command)
         exit(0)
 
-
     def cleanup(self):
         self._mode = modes.get_mode(self)
-        # Gets handled in the cleanup function of the mode for SingleSlurmJobAcrossStages
         self._mode.cleanup()
-#        if not isinstance(self._mode, modes.SingleSlurmJobAcrossStages):
-#            print(f'Removing own script {self.exec_script}', sys.stderr)
-#            os.system(f'rm -r {self.scripts_dir}')
-#            print(f'Removed own script {self.exec_script}', sys.stderr)
diff --git a/core/modes/__init__.py b/core/modes/__init__.py
index bdccd5c883e7440196d253e1b694421eb4d4fad3..d9f766be07365b1ba0a1e4107c5a6fd77ae35979 100644
--- a/core/modes/__init__.py
+++ b/core/modes/__init__.py
@@ -4,16 +4,30 @@ import subprocess
 import re
 import importlib
 import sys
+from filelock import FileLock
+import json
 
+import core.authentication.JSONManager as man
 from core.utility import get_cenv
+import core.utility as utility
 
 VALID_MODES = ['Slurm', 'Singularity', 'Batch', 'SingleSlurmJobAcrossStages']
 
+srun_path = "srun"  # "/usr/local_host/bin/srun"
+
+
 class ModeBase(ABC):
     _run_script = ''
     _run_properties = ''
     _custom_env = dict()
+    slurm_jobid_file = None
     _combiner_script = ''
+    run_properties_postfix = None
+
+    @staticmethod
+    def get_jobid_from_file(path):
+        with open(path, 'r') as node_index_fp:
+            return node_index_fp.readline().strip()
 
     @property
     def run_script(self):
@@ -58,18 +72,108 @@
     def get_post_run_properties(self):
         pass
 
+    @abstractmethod
+    def get_custom_prepare(self):
+        pass
+
+    @abstractmethod
+    def get_simple_script_exec(self):
+        pass
+
+    def get_run_properties_postfix(self):
+        pass
+
+    def cleanup_on_failure(self):
+        print(f'Cleanup on failure: {self.slurm_jobid_file}', flush=True)
+        if not self.slurm_jobid_file:
+            return
+        command = []
+        if self.job.down_scoping:
+            command = f"sudo -u {self.job.account} ".split()
+        print(str(command + ['/opt/slurm/current/bin/scancel', f'{self.get_jobid_from_file(self.slurm_jobid_file)}']),
+              sys.stderr)
+        subprocess.run(
+            command + ['/opt/slurm/current/bin/scancel', f'{ModeBase.get_jobid_from_file(self.slurm_jobid_file)}'])
+        print(f'Cleanup on failure: canceled {ModeBase.get_jobid_from_file(self.slurm_jobid_file)}', flush=True)
+
     @abstractmethod
     def cleanup(self):
-        os.system(f'rm -r {self.job.scripts_dir}')
+        if self.job.allow_failure:
+            return
+        cc_tmp = os.path.split(self.job.concurrent_tmp)[0]
+        do_cleanup = 0
+        for cc_id in os.listdir(cc_tmp):
+            pipeline_path = f'{cc_tmp}/{cc_id}/{self.job.pipeline_id}/stages/{get_cenv("CI_JOB_STAGE")}'
+            if not os.path.isdir(pipeline_path):
+                continue
+            for error_file in os.listdir(pipeline_path):
+                if not error_file.endswith('.json'):
+                    continue
+
+                lock = FileLock(f'{pipeline_path}/{error_file}.lock')
+                with lock:
+                    with open(f'{pipeline_path}/{error_file}', 'r+') as error_file_fd:
+                        error_file_fd.seek(0)
+                        error_codes = json.load(error_file_fd)
+                        for jobid in error_codes:
+                            if error_codes[jobid] == -1:
+                                # do_cleanup = -1
+                                return
+                            elif error_codes[jobid] == 1:
+                                do_cleanup = 1
+
+        if do_cleanup == 1 and self.job.shared_tmp and os.path.isdir(self.job.shared_tmp):
+            command = []
+            if self.job.down_scoping:
+                command = f"sudo -u {self.job.account} ".split()
+            command.extend(['/opt/slurm/current/bin/scancel', f'--name=CI_{self.job.pipeline_id}'])
+            # scancel_out = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
+            os.system(f'rm -r {self.job.shared_tmp}')
+        man.remove_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"))
         try:
-            os.rmdir(f'{self.job.runner_path}/scripts/{get_cenv("CI_PIPELINE_ID")}')
-        except (FileNotFoundError, OSError) as e:
+            os.rmdir(f'{self.job.runner_path}/scripts/{self.job.pipeline_id}')
+        except (FileNotFoundError, OSError):
             pass
 
+    def abort(self, error_str, exit_code=1):
+        utility.update_json_kv(self.job.error_code_file, self.job.jobid, exit_code)
+        ModeBase.cleanup(self)
+        print(error_str, flush=True)
+        exit(exit_code)
+
 
 class Slurm(ModeBase):
     slurm_vars = []
 
+    def get_run_properties_postfix(self):
+        return ""
+
+    def get_custom_prepare(self):
+        downscoping_command = ['sudo', '-u', self.job.account]
+        salloc_command = ['/opt/slurm/current/bin/salloc', '--no-shell', f'--job-name=CI_{self.job.pipeline_id}']
+        if self.job.down_scoping:
+            salloc_command = [f"{self.job.driver_path}/core/scripts/runHelper.sh",
+                              ' '.join(downscoping_command + salloc_command)]
+        salloc_out = subprocess.run(salloc_command, text=True, stdout=subprocess.PIPE,
+                                    stderr=subprocess.STDOUT).stdout
+        print(salloc_command)
+        print(salloc_out)
+        self.slurm_simple_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
+        man.add_id_mapping(f"{self.job.driver_path}/SlurmIDMapping.json", get_cenv("CI_JOB_ID"),
+                           self.slurm_simple_job_id)
+
+    def get_simple_script_exec(self):
+        return f"{srun_path} --jobid={self.slurm_simple_job_id} {self.job.shell_path} -l"
+
+    @staticmethod
+    def env_copy():
+        user = os.getenv("USER")
+        env = {k: v for k, v in os.environ.items() if not v.__contains__(user) or k == "PATH"}
+        export_env = ""
+        for k, v in env.items():
+            export_env = f'{export_env}{k}="{v}",'
+        return export_env[:-1]
+
     def get_combiner_script(self):
         self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh"
         return self._combiner_script
@@ -78,11 +182,10 @@ class Slurm(ModeBase):
         for variable in os.environ:
             if variable.startswith("CUSTOM_ENV_SLURM_ENV_"):
                 temp = variable.replace("CUSTOM_ENV_SLURM_ENV_", "")
-                if temp.startswith("SLURM_"):
+                if any(temp.startswith(s) for s in ['SLURM', 'SRUN']):
                     self._custom_env[temp] = os.environ[variable]
                 else:
-                    print("Error: Only Slurm environment variables allowed.")
-                    exit(1)
+                    ModeBase.abort(self, f"Error: Only Slurm environment variables allowed: {variable}")
 
     def get_slurm_variables(self):
         self.slurm_vars.append(f"--time={get_cenv('SLURM_TIME', '00:10:00')}")
@@ -90,26 +193,31 @@ class Slurm(ModeBase):
         self.slurm_vars.append(f"--nodes={get_cenv('SLURM_NUM_NODES', '1')}")
 
         num_gpus = get_cenv('SLURM_NUM_GPUS')
-        if num_gpus != None:
+        if num_gpus is not None:
             self.slurm_vars.append(f"--gres=gpu:{num_gpus}")
 
-        partition = get_cenv('SLURM_PARTITION')
-        if partition != None:
+        partition = get_cenv('SLURM_PARTITION')
+        if partition is not None:
             self.slurm_vars.append(f"--partition={partition}")
 
     def set_srun_cmd(self):
-        prop_list = ['srun', f'--job-name=CI_{self.job.jobid}']
+        prop_list = [f'{srun_path}', f'--job-name=CI_{self.job.jobid}']
         prop_list.extend(self.slurm_vars)
         self._run_properties = ' '.join(prop_list)
 
     def __init__(self, job):
         ModeBase.__init__(self, job)
-        self.get_slurm_variables()
+        try:
+            with open(f"{self.job.driver_path}/SlurmIDMapping.json", "r") as file:
+                mapping = json.loads(file.read())
+                self.slurm_simple_job_id = mapping[get_cenv("CI_JOB_ID")]
+        except (IOError, KeyError):
+            self.slurm_simple_job_id = None
 
     def get_run_properties(self):
         self.set_srun_cmd()
         self.set_slurm_env()
-        return self._run_properties + ' /usr/bin/env bash'
+        return self._run_properties + f' {self.job.shell_path} -l'
 
     def get_run_script(self):
         self._run_script = self.job.exec_script
@@ -131,16 +239,17 @@
 
 class Batch(Slurm):
     def __init__(self, job):
-        ModeBase.__init__(self, job,)
+        Slurm.__init__(self, job)
 
     def get_combiner_script(self):
         self._combiner_script = f"{self.job.driver_path}/core/scripts/execHelper.sh"
         return self._combiner_script
 
     def get_batch_properties(self, batch_script):
-        cmd_out = ""
+        # cmd_out = ""
         if self.job.down_scoping:
-            command = [f"{self.job.driver_path}/core/scripts/runHelper.sh", f"sudo -u {self.job.account} --shell srun /usr/bin/env cat {batch_script}"]
+            command = [f"{self.job.driver_path}/core/scripts/runHelper.sh",
+                       f"sudo -u {self.job.account} {srun_path} /usr/bin/cat {batch_script}"]
             stdout = subprocess.run(command, stdout=subprocess.PIPE, text=True).stdout
             cmd_out = ' '.join([l.split()[1] for l in stdout.split("\n") if l.startswith('#SBATCH')])
         else:
@@ -150,11 +259,14 @@ class Batch(Slurm):
 
     def get_run_properties(self):
         batch_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
-        self._run_properties = ' '.join(['srun', self.get_batch_properties(batch_script)])
+        self._run_properties = ' '.join([f'{srun_path}', self.get_batch_properties(batch_script)])
 
         print('Warning: The contents of the script section in the CI definition '
               'will be used as a post-processing script in the batch mode.')
-        return self._run_properties + ' /usr/bin/env bash'
+        return self._run_properties + f' {self.job.shell_path}'
+
+    def get_run_properties_postfix(self):
+        return ""
 
     def get_run_script(self):
         self._run_script = f'{self.job.clone_path}/{get_cenv("BATCH_SCRIPT")}'
@@ -188,22 +300,48 @@ class Singularity(Slurm):
         return self._run_script
 
     def get_run_properties(self):
+        self._run_properties = Slurm.get_run_properties(self)
         # get container for singularity
         self.container = get_cenv('CONTAINER')
         if self.container is None:
-            print("Error: No container defined, use variable CONTAINER")
-            exit(1)
-        self._run_properties = Slurm.get_run_properties(self)
+            ModeBase.abort(self, "Error: No container defined, use variable CONTAINER")
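+        # a CONTAINER that exists as a file is resolved relative to the repository clone and run
+        # as a local image; otherwise it is treated as a module name (see get_run_properties_postfix)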
defined, use variable CONTAINER") if os.path.exists(self.container): self.container = f'{self.job.clone_path}/{self.container}' - self._run_properties += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh {self.container} ' - else: - self._run_properties += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh {self.container} ' + # Generation of the singularity script within user space + if self.job.down_scoping: + print([f"{self.job.driver_path}/core/scripts/pipeHelper.sh", f'sudo -u {self.job.account} ' + f'{srun_path} --jobid={self.slurm_simple_job_id} /usr/bin/cp /dev/stdin ' + f'/work/{self.job.account}/script{self.job.jobid}.sh', + self.get_run_properties_postfix()], flush=True) + subprocess.run([f"{self.job.driver_path}/core/scripts/pipeHelper.sh", f'sudo -u {self.job.account} ' + f'{srun_path} --jobid={self.slurm_simple_job_id} /usr/bin/cp /dev/stdin ' + f'/work/{self.job.account}/script{self.job.jobid}.sh', + self.get_run_properties_postfix()]) + # add singularity specific parameter to the properties + property_split = self._run_properties.split(" ") + property_split.insert(1, f'--export=CONTAINER={self.container}') + property_split.append(f"/work/{self.job.account}/script{self.job.jobid}.sh") + self._run_properties = " ".join(property_split) return self._run_properties def cleanup(self): + subprocess.run([f"{self.job.driver_path}/core/scripts/runHelper.sh", f'sudo -u {self.job.account} ' + f'{srun_path} --jobid={self.slurm_simple_job_id} /usr/bin/rm ' + f'/work/{self.job.account}/script{self.job.jobid}.sh']) ModeBase.cleanup(self) + def get_run_properties_postfix(self): + self.run_properties_postfix = "" + # get container for singularity + self.container = get_cenv('CONTAINER') + if self.container is None: + ModeBase.abort(self, "Error: No container defined, use variable CONTAINER") + if os.path.exists(self.container): + self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityLocalRunstep.sh ' + else: + self.run_properties_postfix += f' {self.job.runner_path}/core/scripts/singularityRunstep.sh ' + return self.run_properties_postfix + def has_post_run_script(self): return False @@ -215,47 +353,46 @@ class Singularity(Slurm): class SingleSlurmJobAcrossStages(Slurm): - def get_combiner_script(self): self._combiner_script = f"{self.job.driver_path}/core/scripts/pipeHelper.sh" return self._combiner_script + def get_run_properties_postfix(self): + return "" + def get_jobid_from_file(self, path): with open(path, 'r') as node_index_fp: return node_index_fp.readline().strip() - def get_node_id_str(self, vars): - if vars and len(vars): - return ''.join([os.getenv(v).replace('/', '') for v in vars]) + @staticmethod + def get_node_id_str(variables): + if variables and len(variables): + return ''.join([os.getenv(v).replace('/', '') for v in variables]) return 'Sequential' - def get_env_for_single_slurm_job(self, vars): - return {v.replace('CUSTOM_ENV_PARVAR_', '') : os.getenv(v) for v in vars} + @staticmethod + def get_env_for_single_slurm_job(variables): + return {v.replace('CUSTOM_ENV_PARVAR_', ''): os.getenv(v) for v in variables} def __init__(self, job): Slurm.__init__(self, job) - self.tmp_path = f'{self.job.driver_path}/tmp/{get_cenv("CI_PIPELINE_ID")}/' - os.makedirs(self.tmp_path, exist_ok=True) self.slurm_job_id = None self.id_vars = [v for v in os.environ if v.startswith('CUSTOM_ENV_PARVAR_')] - os.makedirs(f'{self.tmp_path}/SlurmJobIds/', exist_ok=True) - os.makedirs(f'{self.tmp_path}/CiJobIds/', exist_ok=True) - self.slurm_jobid_file 
-        self.ci_jobid_file = f'{self.tmp_path}/CiJobIds/{self.get_node_id_str(self.id_vars)}.txt'
+        os.makedirs(f'{self.job.shared_tmp}/SlurmJobIds', exist_ok=True)
+        os.makedirs(f'{self.job.tmp_dir}/SlurmJobIds', exist_ok=True)
+        self.slurm_jobid_file = f'{self.job.shared_tmp}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
+        self.cc_slurm_jobid_file = f'{self.job.tmp_dir}/SlurmJobIds/{self.get_node_id_str(self.id_vars)}.txt'
 
     def get_run_properties(self):
-
-        # Collect the CI Job Ids of all CI Jobs that run in the same slurm job to delete
-        # the scripts later
-        with open(self.ci_jobid_file, 'a+') as ci_jobid_fp:
-            ci_jobid_fp.write(f'{self.job.jobid}\n')
-
         if not os.path.isfile(self.slurm_jobid_file):
-            downscoping_command = ['sudo', '-u', self.job.account, '--shell', '/usr/bin/env', 'bash', '-c']
-            salloc_command = ['salloc', '--no-shell', f'--job-name=CI_{self.job.jobid}']
+            downscoping_command = ['sudo', self.job.generate_sudo_env(), '-u', self.job.account]
+            num_cores = get_cenv("SLURM_ENV_SRUN_CPUS_PER_TASK")
+            salloc_command = ['/opt/slurm/current/bin/salloc', f'--cpus-per-task={num_cores}', '--no-shell',
+                              f'--job-name=CI_{self.job.pipeline_id}']
             salloc_command.extend(self.slurm_vars)
             if self.job.down_scoping:
-                salloc_command = downscoping_command + [' '.join(salloc_command)]
+                salloc_command = [f"{self.job.driver_path}/core/scripts/runHelper.sh",
+                                  ' '.join(downscoping_command + salloc_command)]
             salloc_out = subprocess.run(salloc_command, text=True, stdout=subprocess.PIPE,
                                         stderr=subprocess.STDOUT).stdout
             print(salloc_command)
@@ -263,14 +400,22 @@ class SingleSlurmJobAcrossStages(Slurm):
             self.slurm_job_id = re.search(r'salloc: job (\d+)', salloc_out).group(1)
             with open(self.slurm_jobid_file, 'w+') as slurm_jobid_fp:
                 slurm_jobid_fp.write(self.slurm_job_id + '\n')
+            with open(self.cc_slurm_jobid_file, 'w+') as slurm_jobid_fp:
+                slurm_jobid_fp.write(self.slurm_job_id + '\n')
             print(f'Starting new slurm job, could not find {self.slurm_jobid_file}')
         else:
             self.slurm_job_id = self.get_jobid_from_file(self.slurm_jobid_file)
             print(f'Using slurm job {self.slurm_job_id}, could find {self.slurm_jobid_file}')
 
         self._run_properties = Slurm.get_run_properties(self).split()
-        self._custom_env = self.get_env_for_single_slurm_job(self.id_vars)
-        self._custom_env['SLURM_JOB_ID'] = self.slurm_job_id
+
+        additional_env = []
+        for k, v in self.get_env_for_single_slurm_job(self.id_vars).items():
+            additional_env.append(f"{k}={v}")
+        # self._custom_env = self.get_env_for_single_slurm_job(self.id_vars)
+        # self._custom_env['SLURM_JOB_ID'] = self.slurm_job_id
+        if additional_env:
+            self._run_properties.insert(2, f'--export=' + ",".join(additional_env))
         self._run_properties.insert(1, f'--jobid={self.slurm_job_id}')
         self._run_properties = ' '.join(self._run_properties)
         return self._run_properties
@@ -280,40 +425,27 @@ class SingleSlurmJobAcrossStages(Slurm):
         return self._run_script
 
     def cleanup(self):
-        #ModeBase.cleanup(self)
+        ModeBase.cleanup(self)
         if get_cenv('END_SINGLE_SLURM_JOB') == '1':
             command = []
             if self.job.down_scoping:
-                command = f"sudo -u {self.job.account} --shell /usr/bin/env bash -c ".split()
-            print(str(command + ['scancel', f'{self.get_jobid_from_file(self.slurm_jobid_file)}']), sys.stderr)
-            subprocess.run(command + ['scancel', f'{self.get_jobid_from_file(self.slurm_jobid_file)}'])
+                command = f"sudo -u {self.job.account}".split()
+            scancel_command = ['/opt/slurm/current/bin/scancel', f'{self.get_jobid_from_file(self.slurm_jobid_file)}']
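+            # END_SINGLE_SLURM_JOB=1 marks the final stage: cancel the shared allocation, going
+            # through runHelper.sh so that scancel runs under the downscoped account when enabled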
+            print(str(command + scancel_command), sys.stderr)
+            subprocess.run([f"{self.job.driver_path}/core/scripts/runHelper.sh", ' '.join(command + scancel_command)])
             try:
                 os.remove(self.slurm_jobid_file)
             except FileNotFoundError:
                 pass
 
-            # Cleanup the directory with the jobIds of the whole pipeline
-            try:
-                os.rmdir(f'{self.tmp_path}/SlurmJobIds')
-            except (FileNotFoundError, OSError) as e:
-                pass
-
-        with open(self.ci_jobid_file, 'r') as ci_jobid_fp:
-            for tmp_job_id in ci_jobid_fp.readlines():
-                os.system(f'rm -r {self.job.runner_path}/scripts/{get_cenv("CI_PIPELINE_ID")}/{tmp_job_id}')
-
-        try:
-            os.remove(self.ci_jobid_file)
-        except FileNotFoundError:
-            pass
-
+            # Cleanup the directory with the jobIds of the whole pipeline
             try:
-                os.rmdir(f'{self.tmp_path}/CiJobIds')
-            except (FileNotFoundError, OSError) as e:
+                os.rmdir(f'{self.job.shared_tmp}/SlurmJobIds')
+            except (FileNotFoundError, OSError):
                 pass
             try:
-                os.rmdir(f'{self.tmp_path}')
-            except (FileNotFoundError, OSError) as e:
+                os.rmdir(f'{self.job.shared_tmp}')
+            except (FileNotFoundError, OSError):
                 pass
 
     def has_post_run_script(self):
@@ -325,6 +457,7 @@
     def get_post_run_properties(self):
         return None
 
+
 # Get the possible modes from the CI script
 def get_mode(job):
     mode = get_cenv('CI_MODE', 'Slurm').strip()
diff --git a/core/scripts/singularityLocalRunstep.sh b/core/scripts/singularityLocalRunstep.sh
index 75e52cac3c075d0c0bf47b13d2277f6e561cef4e..5d7661cc6a5309bb19eb35a1e502cc63e0c61a99 100755
--- a/core/scripts/singularityLocalRunstep.sh
+++ b/core/scripts/singularityLocalRunstep.sh
@@ -2,4 +2,4 @@
 
 module load CONTAINERS
 
-singularity shell --nv $1
+singularity shell -B /work:/work --nv $CONTAINER
diff --git a/core/scripts/singularityRunstep.sh b/core/scripts/singularityRunstep.sh
index 59195c0101cb36a7fb634d6020191e9a16dc8946..27e6b9351e8a24c7f29705b4561b60dd15ba140c 100755
--- a/core/scripts/singularityRunstep.sh
+++ b/core/scripts/singularityRunstep.sh
@@ -2,6 +2,6 @@
 
 module load CONTAINERS
 
-module load $1
+module load $CONTAINER
 
-singularity shell --nv $R_CONTAINER
+singularity shell -B /work:/work --nv $R_CONTAINER
diff --git a/core/utility/__init__.py b/core/utility/__init__.py
index 36c8650dbd92f817bcd9e921d6a91ad0d056e98d..ab079ae029d044f936e628c72d127d619a92f8d8 100644
--- a/core/utility/__init__.py
+++ b/core/utility/__init__.py
@@ -1,4 +1,30 @@
 import os
+import json
+from filelock import FileLock
+
+
+def update_json_kv(path, key, value):
+    lock = FileLock(f"{path}.lock")
+    with lock:
+        with open(path, 'a+') as f:
+            f.seek(0)
+            try:
+                tmp_d = json.load(f)
+                # print(f'Found json file with {str(tmp_d)}', flush=True)
+            except Exception:
+                tmp_d = dict()
+                # print(f'Error while loading json file {path}', flush=True)
+                # print(e, flush=True)
+            try:
+                tmp_d[key] = max(value, tmp_d[key])
+            except KeyError:
+                tmp_d[key] = value
+            f.seek(0)
+            f.truncate()
+            json.dump(tmp_d, f, indent=2)
+            f.flush()
+            # print(f'dumped {tmp_d} to {f}', flush=True)
+
 
 def get_cenv(env_str, default=None):
     return os.getenv(f'CUSTOM_ENV_{env_str}', default)
diff --git a/core/utility/defines.py b/core/utility/defines.py
index c96bd5ca8aa5283b7f8064bb107156ae4abcfbb5..50a693040d9827d42eb198e5a6eb01c576f986d6 100644
--- a/core/utility/defines.py
+++ b/core/utility/defines.py
@@ -1,4 +1,4 @@
 name = 'Aixcellenz CI Driver'
-version = '0.2.1'
+version = '0.2.2'
 
 debug=False
diff --git a/driver.py b/driver.py
index d5a4e7334e0e9930fa57a2216e6401b952851318..36b17af9dbe8248a515eea2c61ac1cf06e415a32 100644
--- a/driver.py
+++ b/driver.py
@@ -10,13 +10,12 @@ user_path = ""
 runner_path = ""
 down_scoping = True
 
-
-#handle()
+# handle()
 if len(argv) < 2:
     print("Error: no argument")
     exit(1)
 
-#if argv[1] != 'config':  # Do not use print in this step
+# if argv[1] != 'config':  # Do not use print in this step
 #    print(f'Starting event with {str(argv)}', sys.stderr)
 myJob = Job(argv, os.path.abspath(os.path.dirname(__file__)))
-#if argv[1] != 'config':  # Do not use print in this step
+# if argv[1] != 'config':  # Do not use print in this step
 #    print(f'Finished event with {str(argv)}', sys.stderr)