Skip to content
Snippets Groups Projects
Commit 06349b67 authored by Felix Tomski's avatar Felix Tomski
Browse files

Rework accountmanager

parent ce5053c6
No related branches found
No related tags found
No related merge requests found
......@@ -14,9 +14,13 @@
# For more information, see: https://docs.gitlab.com/ee/ci/yaml/index.html#stages
stages: # List of stages for jobs, and their order of execution
- unittest
- run
- deploy
include:
- local: 'utility/.gitlab/.unittest.yml'
variables:
CI_LOG_STDOUT: "1"
......
#!/usr/bin/env python3
import argparse
import string
import random
import os
import sys
import logging
from pprint import pprint
import core.authentication.EncryptionManager as encrypt
import core.utility.cli as cli
# Define Commandline interface
def CLI(_args):
mod_common_pars = [
{
"flags": ["-uid", "--gitlab-user-id"],
"type": str,
"metavar": "<uid>",
"help": "The gitlab uid of the assigned user."
},
{
"flags": ["-pid", "--gitlab-project-id"],
"type": str,
"metavar": "<pid>",
"help": "The gitlab pid of the assigned project."
},
{
"flags": ['-url', '--gitlab-url'],
"type": str,
"metavar": "<pub>",
"required": True,
"help": "Path to the public key file."
},
]
subcommands = {
'init': {
'aliases': ['i'],
'f': init_mapping
},
'remove': {
'aliases': ['rm'],
'f': remove
},
'add': {
'aliases': ['a'],
'f': add
},
'print': {
'aliases': ['p'],
'f': print_mapping
},
}
parameters = {
'init': [
{
"flags": ['-pub', '--pub-key-file'],
"type": str,
"metavar": "<file>",
"default": "key.pub",
"help": "Path to the public key file.",
},
],
'remove': [
*mod_common_pars,
],
'add': [
{
"flags": ["-del", "--delete-date"],
"type": str,
"metavar": "<del>",
"default": "never",
"help": "The deletion date of the coupling. Default: never"
},
{
"flags": ["-acc", "--cluster-account-name"],
"type": str,
"metavar": "<acc>",
"required": True,
"help": "Name of the assigned cluster account."
},
*mod_common_pars,
],
'print': [],
'global': [
{
"flags": ["-priv", "--priv-key-file"],
"type": str,
"metavar": "<file>",
"default": "key",
"help": "Path to the private key file."
},
{
"flags": ["-aes", "--aes-key-file"],
"type": str,
"metavar": "<file>",
"default": "aes.txt",
"help": "Path to the AES key."
},
{
"flags": ["-map", "--mapping-file"],
"type": str,
"metavar": "<file>",
"default": "mapping.txt",
"help": "Path to the account mapping file."
},
{
"flags": ["-bp", "--base-path"],
"type": str,
"metavar": "<path>",
"default": "./",
"help": "Interpret keys, mapping etc. relative to this path. Default: cwd"
},
{
"flags": ["-lf", "--log-file"],
"type": str,
"metavar": "<file>",
"default": "history.log",
"help": "Log history in this file. Default: history.log"
},
{
"flags": ["-ll", "--log-level"],
"type": str,
"metavar": "<DEBUG|INFO|WARNING|ERROR|CRITICAL>",
"default": "INFO",
"help": "Log level. Default: INFO"
},
]
}
parser = argparse.ArgumentParser(prog='Aixcellenz CI AccountManager')
cli.add_parameters(parser, parameters['global'])
subparsers = parser.add_subparsers(help='sub-command help')
for cmd in subcommands:
subcmd_parser = subparsers.add_parser(cmd, help=f'{cmd} help', aliases=subcommands[cmd]['aliases'])
subcmd_parser.set_defaults(func=subcommands[cmd]['f'])
cli.add_parameters(subcmd_parser, parameters[cmd])
ret = parser.parse_args(_args)
ret.base_path = os.path.abspath(os.path.expandvars(os.path.expanduser(ret.base_path)))
os.makedirs(ret.base_path, exist_ok=True)
args_to_expand = ['mapping_file', 'priv_key_file', 'pub_key_file', 'aes_key_file',
'log_file']
for arg in args_to_expand:
if arg in ret:
tmp = os.path.expandvars(os.path.expanduser(vars(ret)[arg]))
if not os.path.isabs(tmp):
vars(ret)[arg] = os.path.join(ret.base_path, tmp)
return ret
def get_random_string(length: int) -> str:
# choose from all lowercase letter
letters = string.ascii_letters + string.digits
result_str = ''.join(random.choice(letters) for _ in range(length))
return result_str
def _init_mapping(priv_key_file, pub_key_file, mapping_file, aes_key_file):
if not (os.path.isfile(priv_key_file) and os.path.isfile(pub_key_file)):
encrypt.create_keys(priv_key_file, pub_key_file)
with open(mapping_file, "w") as text_file:
text_file.write('')
if not os.path.isfile(aes_key_file):
encrypt.set_AES_key(get_random_string(16), aes_key_file, pub_key_file)
logger.info(f'Added initial mapping at {mapping_file} with aes_key at {aes_key_file}')
def init_mapping(args):
#pprint(args)
if os.path.isfile(args.mapping_file):
logger.error(f'Mapping at {args.mapping_file} already exists. (aborting)')
sys.exit(1)
_init_mapping(args.priv_key_file, args.pub_key_file,
args.mapping_file, args.aes_key_file)
def _add_id(url, id, priv_key_file, mapping_file, cluster_acc, delete_date, aes_key_file, id_type='uid'):
mapping = encrypt.read_mapping(priv_key_file, mapping_file, aes_key_file)
if url not in mapping:
mapping[url] = {"uid": {}, "pid": {}}
if id in mapping[url][id_type]:
logger.error(f"Mapping for project={id} at gitlab instance={url} already present (aborting)")
sys.exit(1)
id_dict = {"acc": cluster_acc, "delete": delete_date}
mapping[url][id_type][id] = id_dict
encrypt.write_mapping(mapping, priv_key_file, mapping_file, aes_key_file)
logger.info(f'Added CI access for url={url}, id_type={id_type}, id={id}, acc={cluster_acc}, delete={delete_date} at mapping={mapping_file}')
def add(args):
if args.gitlab_user_id:
_add_id(args.gitlab_url, args.gitlab_user_id, args.priv_key_file, args.mapping_file,
args.cluster_account_name, args.delete_date, args.aes_key_file, 'uid')
if args.gitlab_project_id:
_add_id(args.gitlab_url, args.gitlab_project_id, args.priv_key_file, args.mapping_file,
args.cluster_account_name, args.delete_date, args.aes_key_file, 'pid')
if not args.gitlab_user_id and not args.gitlab_project_id:
logger.debug(f'Could not add due to missing user or project id')
def _remove_id(url, id, priv_key_file, mapping_file, aes_key_file, id_type='uid'):
try:
mapping = encrypt.read_mapping(priv_key_file, mapping_file, aes_key_file)
cluster_account = mapping[url][id_type][id]["acc"]
del mapping[url][id_type][id]
encrypt.write_mapping(mapping, priv_key_file, mapping_file, aes_key_file)
logger.info(f"Removed CI access for cluster account={cluster_account}")
except KeyError as e:
logger.error(f'Could not remove id={id}({id_type}) from mapping={mapping_file}. (aborting)')
sys.exit(1)
def _remove_url(url, priv_key_file, mapping_file, aes_key_file):
try:
mapping = encrypt.read_mapping(priv_key_file, mapping_file, aes_key_file)
del mapping[url]
encrypt.write_mapping(mapping, priv_key_file, mapping_file, aes_key_file)
logger.info(f"Removed CI access for url={url}")
except KeyError as e:
logger.error(f'Could not remove url={url} from mapping={mapping_file} (aborting)')
sys.exit(1)
def remove(args):
if args.gitlab_user_id:
_remove_id(args.gitlab_url, args.gitlab_user_id, args.priv_key_file, args.mapping_file,
args.aes_key_file, 'uid')
if args.gitlab_project_id:
_remove_id(args.gitlab_url, args.gitlab_project_id, args.priv_key_file, args.mapping_file,
args.aes_key_file, 'pid')
if not args.gitlab_user_id and not args.gitlab_project_id:
_remove_url(args.gitlab_url, args.priv_key_file, args.mapping_file, args.aes_key_file)
def _get_mapping(args):
return encrypt.read_mapping(args.priv_key_file, args.mapping_file, args.aes_key_file)
def _id_present(args, id, id_type='uid'):
try:
encrypt.read_mapping(args.priv_key_file, args.mapping_file, args.aes_key_file)[args.gitlab_url][id_type][id]
return True
except:
return False
def print_mapping(args):
pprint(_get_mapping(args))
def _setup_logging(level, filename, std=True):
global logger
logger = logging.getLogger(__file__)
logger.setLevel(level)
file_handler = logging.FileHandler(filename, encoding='utf-8')
formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
file_handler.setLevel(level)
if std:
std_handler = logging.StreamHandler()
std_handler.setLevel(level)
logger.addHandler(std_handler)
logger.addHandler(file_handler)
if __name__ == '__main__':
args = CLI(sys.argv[1:])
_setup_logging(getattr(logging, args.log_level.upper(), 20), args.log_file)
args.func(args)
\ No newline at end of file
......@@ -4,29 +4,7 @@ import os
import sys
import subprocess
def add_parameters(parser, param_list):
for p in param_list:
if p.get("action"):
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
default=p.get("default"),
action=p.get("action"),
help=p.get("help")
)
else:
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
type=p.get("type"),
choices=p.get("choices"),
nargs=p.get("nargs"),
default=p.get("default"),
const=p.get("const"),
metavar=p.get("metavar"),
help=p.get("help")
)
import core.utility.cli as cli
# Define Commandline interface
......@@ -185,13 +163,13 @@ def CLI():
}
parser = argparse.ArgumentParser(prog='Aixcellenz CI Driver Installer')
add_parameters(parser, parameters['global'])
cli.add_parameters(parser, parameters['global'])
subparsers = parser.add_subparsers(dest='cmd_name', help='sub-command help')
for cmd in subcommands:
subcmd_parser = subparsers.add_parser(cmd, help=f'{cmd} help', aliases=subcommands[cmd]['aliases'])
subcmd_parser.set_defaults(func=subcommands[cmd]['f'])
add_parameters(subcmd_parser, parameters[cmd])
cli.add_parameters(subcmd_parser, parameters[cmd])
ret = parser.parse_args()
ret.install_path = os.path.abspath(os.path.expandvars(os.path.expanduser(ret.install_path)))
......
import argparse
import os.path
import string
import random
import os
import sys
import core.authentication.EncryptionManager as encrypt
# Define Commandline interface
def CLI():
parameters = [
{
"flags": ['-c', '--create'],
"action": "store_true",
"help": "Create a key pair and a mapping file."
},
{
"flags": ['-rm', '--remove-mapping'],
"action": "store_true",
"help": "Remove a uid/pid from the mapping file."
},
{
"flags": ['-rmu', '--remove-url'],
"action": "store_true",
"help": "Remove a url from the mapping file."
},
{
"flags": ['-add', '--add-mapping'],
"action": "store_true",
"help": "Remove a uid/pid and cluster account to the mapping file."
},
{
"flags": ['-url', '--gitlab-url'],
"type": str,
"metavar": "<pub>",
"help": "Path to the public key file."
},
{
"flags": ['-pub', '--public-key-path'],
"type": str,
"metavar": "<pub>",
"help": "Path to the public key file."
},
{
"flags": ["-priv", "--private-key-path"],
"type": str,
"metavar": "<priv>",
"help": "Path to the private key file."
},
{
"flags": ["-map", "--mapping-path"],
"type": str,
"metavar": "<mapping>",
"help": "Path to the account mapping file."
},
{
"flags": ["-acc", "--cluster-account-name"],
"type": str,
"metavar": "<acc>",
"help": "Name of the assigned cluster account."
},
{
"flags": ["-uid", "--gitlab-user-id"],
"type": str,
"metavar": "<uid>",
"help": "The gitlab uid of the assigned user."
},
{
"flags": ["-pid", "--gitlab-project-id"],
"type": str,
"metavar": "<pid>",
"help": "The gitlab pid of the assigned project."
},
{
"flags": ["-del", "--delete-date"],
"type": str,
"metavar": "<del>",
"help": "The deletion date of the coupling."
},
{
"flags": ["-aes", "--aes-encryption-key-path"],
"type": str,
"metavar": "<aes>",
"help": "Path to the AES key."
},
]
parser = argparse.ArgumentParser()
for p in parameters:
if p.get("action"):
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
default=p.get("default"),
action=p.get("action"),
help=p.get("help")
)
else:
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
type=p.get("type"),
choices=p.get("choices"),
nargs=p.get("nargs"),
default=p.get("default"),
const=p.get("const"),
metavar=p.get("metavar"),
help=p.get("help")
)
return parser.parse_args()
def get_random_string(length: int) -> str:
# choose from all lowercase letter
letters = string.ascii_letters + string.digits
result_str = ''.join(random.choice(letters) for i in range(length))
return result_str
def create_mapping(priv_key_path, pub_key_path, map_path, AES_key):
encrypt.create_keys(priv_key_path, pub_key_path)
with open(map_path, "w") as text_file:
text_file.write('')
string = get_random_string(16)
encrypt.set_AES_key(string, AES_key, pub_key_path)
def add_user_account(url, uid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date, AES_key):
mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
if url not in mapping:
mapping[url] = {"uid": {}, "pid": {}}
if uid in mapping[url]["uid"]:
print(f"Mapping for user: {uid} at gitlab instance: {url} already present.")
sys.exit(1)
uid_dict = {"acc": cluster_acc, "delete": delete_date}
mapping[url]["uid"][uid] = uid_dict
encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
def add_project_account(url, pid, priv_key_path, pub_key_path, map_path, cluster_acc, delete_date, AES_key):
mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
if url not in mapping:
mapping[url] = {"uid": {}, "pid": {}}
if pid in mapping[url]["pid"]:
print(f"Mapping for project: {pid} at gitlab instance: {url} already present.")
sys.exit(1)
pid_dict = {"acc": cluster_acc, "delete": delete_date}
mapping[url]["pid"][pid] = pid_dict
encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
def remove_user_account(url, uid, priv_key_path, pub_key_path, map_path, AES_key):
mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
cluster_account = mapping[url]["uid"][uid]["acc"]
del mapping[url]["uid"][uid]
encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
print(f"Removed CI access for cluster account: {cluster_account}")
def remove_project_account(url, pid, priv_key_path, pub_key_path, map_path, AES_key):
mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
cluster_account = mapping[url]["pid"][pid]["acc"]
del mapping[url]["pid"][pid]
encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
print(f"Removed CI access for cluster account: {cluster_account}")
def remove_url(url, priv_key_path, pub_key_path, map_path, AES_key):
mapping = encrypt.read_mapping(priv_key_path, map_path, AES_key)
removed_serv_accounts = []
for x in mapping[url]["pid"]:
removed_serv_accounts.append(mapping[url]["pid"][x]["acc"])
for x in mapping[url]["uid"]:
removed_serv_accounts.append(mapping[url]["uid"][x]["acc"])
del mapping[url]
encrypt.write_mapping(mapping, priv_key_path, map_path, AES_key)
print(f"Removed CI access for cluster accounts: {removed_serv_accounts}")
def run():
args = CLI()
if args.remove_mapping and args.add_mapping and args.remove_url:
print("Remove(-url) and add cannot be used in the same call.")
sys.exit(1)
if args.gitlab_user_id is not None and args.gitlab_project_id is not None:
print("Gitlab project id and gitlab user id cannot be provided in the same call.")
sys.exit(1)
if args.create:
if args.private_key_path is None or args.public_key_path is None or args.mapping_path is None or args.aes_encryption_key_path is None:
print("Arguments for private/public key and mapping path must be provided.")
sys.exit(1)
else:
create_mapping(os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path),
os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
if args.remove_mapping:
if args.gitlab_url is None:
print("Arguments for gitlab url must be provided.")
sys.exit(1)
if args.gitlab_project_id is not None:
remove_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path),
os.path.abspath(args.public_key_path),
os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
elif args.gitlab_user_id is not None:
remove_user_account(args.gitlab_url, args.gitlab_user_id, os.path.abspath(args.private_key_path),
os.path.abspath(args.public_key_path),
os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
else:
print("Arguments for gitlab project id or gitlab user id must be provided.")
sys.exit(1)
if args.remove_url:
if args.gitlab_url is None:
print("Argument for gitlab url must be provided.")
sys.exit(1)
else:
remove_url(args.gitlab_url, os.path.abspath(args.private_key_path), os.path.abspath(args.public_key_path),
os.path.abspath(args.mapping_path), os.path.abspath(args.aes_encryption_key_path))
if args.add_mapping:
if args.gitlab_url is None:
print("Argument for gitlab url must be provided.")
sys.exit(1)
if args.cluster_account_name is None:
print("Argument for cluster account name must be provided.")
sys.exit(1)
if args.delete_date is None:
print("Argument for delete date must be provided.")
sys.exit(1)
if args.gitlab_project_id is not None:
add_project_account(args.gitlab_url, args.gitlab_project_id, os.path.abspath(args.private_key_path),
os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path),
args.cluster_account_name, args.delete_date,
os.path.abspath(args.aes_encryption_key_path))
elif args.gitlab_user_id is not None:
add_user_account(args.gitlab_url, args.gitlab_user_id, os.path.abspath(args.private_key_path),
os.path.abspath(args.public_key_path), os.path.abspath(args.mapping_path),
args.cluster_account_name, args.delete_date, os.path.abspath(args.aes_encryption_key_path))
else:
print("Arguments for gitlab project id or gitlab user id must be provided.")
sys.exit(1)
run()
\ No newline at end of file
......@@ -9,14 +9,12 @@ from Crypto.Cipher import AES
def load_priv_key(path):
path = os.path.join(os.path.dirname(__file__), path)
with open(path, mode='rb') as private_file:
key_data = private_file.read()
return rsa.PrivateKey.load_pkcs1(key_data)
def load_pub_key(path):
path = os.path.join(os.path.dirname(__file__), path)
with open(path, mode='rb') as public_file:
key_data = public_file.read()
return rsa.PublicKey.load_pkcs1(key_data)
......
def add_parameters(parser, param_list):
for p in param_list:
if p.get("action"):
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
default=p.get("default"),
action=p.get("action"),
help=p.get("help")
)
else:
parser.add_argument(
*p.get("flags"),
required=p.get("required"),
type=p.get("type"),
choices=p.get("choices"),
nargs=p.get("nargs"),
default=p.get("default"),
const=p.get("const"),
metavar=p.get("metavar"),
help=p.get("help")
)
import shutil
import tempfile
import os
import unittest
import AccountManager as manager
class TestAccManager(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.global_args = [f'--base-path={self.test_dir}']
manager._setup_logging('DEBUG', f'{self.test_dir}/history.log', std=False)
def _run(self, command, command_opts=[]):
self.args = manager.CLI(self.global_args + [command] + command_opts)
self.args.func(self.args)
def _init(self):
self._run('init')
def _add_id(self, id, url, accname, id_type='uid'):
s_type = 'user' if id_type=='uid' else 'project'
self._run('add', [f'--gitlab-{s_type}-id={id}', f'--gitlab-url={url}', f'--cluster-account-name={accname}'])
def _rm_id(self, id, url, id_type='uid'):
s_type = 'user' if id_type=='uid' else 'project'
self._run('remove', [f'--gitlab-{s_type}-id={id}', f'--gitlab-url={url}'])
def _rm_url(self, url):
self._run('remove', [f'--gitlab-url={url}'])
def tearDown(self):
# with open(f'{self.test_dir}/history.log', 'r') as history:
# print(history.readlines())
shutil.rmtree(self.test_dir)
class TestInit(TestAccManager):
def test_init(self):
self._init()
files = (self.args.priv_key_file, self.args.pub_key_file,
self.args.mapping_file, self.args.aes_key_file)
for file in files:
self.assertTrue(os.path.isfile(file))
class TestAdd(TestAccManager):
def _test_add_id(self, id='0', url='gitlab.com', accname='john', id_type='uid'):
self._init()
self._add_id(id, url, accname, id_type)
self.assertTrue(manager._get_mapping(self.args)[url][id_type][id]['acc'] == accname)
def test_add_uid(self):
self._test_add_id()
def test_add_pid(self):
self._test_add_id(id_type='pid')
class TestRemove(TestAccManager):
def _test_rm_id(self, id='0', url='gitlab.com', accname='john', id_type='uid'):
self._init()
self._add_id(id, url, accname, id_type)
self.assertTrue(manager._get_mapping(self.args)[url][id_type][id]['acc'] == accname)
self._rm_id(id, url, id_type)
self.assertFalse(manager._id_present(self.args, id, id_type))
def test_remove_uid(self):
self._test_rm_id()
def test_remove_pid(self):
self._test_rm_id(id_type='pid')
if __name__ == '__main__':
unittest.main()
unittest:
stage: unittest
tags: ['docker']
needs: []
image: ubuntu:22.04
before_script:
- export DEBIAN_FRONTEND=noninteractive
- apt-get update
- apt-get install -y --no-install-recommends python3 python3-pip
- pip3 install -r requirements.txt
- pip3 install pytest
script:
- python3 -m pytest -o junit_suite_name=acc_manager --junitxml=report.xml test/account_manager
artifacts:
when: always
reports:
junit: report.xml
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment