diff --git a/Dockerfile b/Dockerfile index 747d259..be2cca3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ RUN apk add wget unzip RUN cd /tmp && wget https://github.com/bitwarden/clients/releases/download/cli-v${BW_VERSION}/bw-linux-${BW_VERSION}.zip && \ unzip /tmp/bw-linux-${BW_VERSION}.zip -FROM alpine:3.17.2 +FROM alpine:3.17.3 ARG PYTHON_VERSION=3.10.10-r0 ARG PIP_VERSION=22.3.1-r1 diff --git a/src/bitwardenCrdOperator.py b/src/bitwardenCrdOperator.py index bb1ce72..5e11bef 100755 --- a/src/bitwardenCrdOperator.py +++ b/src/bitwardenCrdOperator.py @@ -1,20 +1,20 @@ #!/usr/bin/env python3 -import kopf import os +import kopf from utils.utils import command_wrapper, unlock_bw + @kopf.on.startup() def bitwarden_signin(logger, **kwargs): if 'BW_HOST' in os.environ: try: command_wrapper(f"config server {os.getenv('BW_HOST')}") - except: + except BaseException: logger.warn("Revieved none zero exit code from server config") logger.warn("This is expected from startup") pass else: - logger.info(f"BW_HOST not set. Assuming SaaS installation") + logger.info("BW_HOST not set. Assuming SaaS installation") command_wrapper("login --apikey") unlock_bw(logger) - diff --git a/src/dockerlogin.py b/src/dockerlogin.py index aef2c59..1ad0f3c 100644 --- a/src/dockerlogin.py +++ b/src/dockerlogin.py @@ -5,7 +5,14 @@ import json from utils.utils import unlock_bw, get_secret_from_bitwarden -def create_dockerlogin(logger, secret, secret_json, username_ref, password_ref, registry): + +def create_dockerlogin( + logger, + secret, + secret_json, + username_ref, + password_ref, + registry): secret.type = "dockerconfigjson" secret.data = {} auths_dict = {} @@ -15,14 +22,19 @@ def create_dockerlogin(logger, secret, secret_json, username_ref, password_ref, _username = secret_json["login"][username_ref] logger.info(f"Creating login with username: {_username}") _password = secret_json["login"][password_ref] - cred_field = str(base64.b64encode(f"{_username}:{_password}".encode("utf-8")), "utf-8") + cred_field = str( + base64.b64encode( + f"{_username}:{_password}".encode("utf-8")), + "utf-8") reg_auth_dict["auth"] = cred_field registry_dict[registry] = reg_auth_dict auths_dict["auths"] = registry_dict - secret.data[".dockerconfigjson"] = str(base64.b64encode(json.dumps(auths_dict).encode("utf-8")), "utf-8") + secret.data[".dockerconfigjson"] = str(base64.b64encode( + json.dumps(auths_dict).encode("utf-8")), "utf-8") return secret + @kopf.on.create('registry-credential.lerentis.uploadfilter24.eu') def create_managed_registry_secret(spec, name, namespace, logger, **kwargs): username_ref = spec.get('usernameRef') @@ -43,18 +55,34 @@ def create_managed_registry_secret(spec, name, namespace, logger, **kwargs): "managedObject": f"{namespace}/{name}" } secret = kubernetes.client.V1Secret() - secret.metadata = kubernetes.client.V1ObjectMeta(name=secret_name, annotations=annotations) - secret = create_dockerlogin(logger, secret, secret_json_object, username_ref, password_ref, registry) + secret.metadata = kubernetes.client.V1ObjectMeta( + name=secret_name, annotations=annotations) + secret = create_dockerlogin( + logger, + secret, + secret_json_object, + username_ref, + password_ref, + registry) obj = api.create_namespaced_secret( secret_namespace, secret ) - logger.info(f"Registry Secret {secret_namespace}/{secret_name} has been created") + logger.info( + f"Registry Secret {secret_namespace}/{secret_name} has been created") + @kopf.on.update('registry-credential.lerentis.uploadfilter24.eu') @kopf.timer('registry-credential.lerentis.uploadfilter24.eu', interval=900) -def update_managed_registry_secret(spec, status, name, namespace, logger, body, **kwargs): +def update_managed_registry_secret( + spec, + status, + name, + namespace, + logger, + body, + **kwargs): username_ref = spec.get('usernameRef') password_ref = spec.get('passwordRef') @@ -63,22 +91,28 @@ def update_managed_registry_secret(spec, status, name, namespace, logger, body, secret_name = spec.get('name') secret_namespace = spec.get('namespace') - old_config = None old_secret_name = None old_secret_namespace = None if 'kopf.zalando.org/last-handled-configuration' in body.metadata.annotations: - old_config = json.loads(body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) + old_config = json.loads( + body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) old_secret_name = old_config['spec'].get('name') old_secret_namespace = old_config['spec'].get('namespace') secret_name = spec.get('name') secret_namespace = spec.get('namespace') - if old_config is not None and (old_secret_name != secret_name or old_secret_namespace != secret_namespace): + if old_config is not None and ( + old_secret_name != secret_name or old_secret_namespace != secret_namespace): # If the name of the secret or the namespace of the secret is different # We have to delete the secret an recreate it logger.info("Secret name or namespace changed, let's recreate it") - delete_managed_secret(old_config['spec'], name, namespace, logger, **kwargs) + delete_managed_secret( + old_config['spec'], + name, + namespace, + logger, + **kwargs) create_managed_registry_secret(spec, name, namespace, logger, **kwargs) return @@ -93,15 +127,23 @@ def update_managed_registry_secret(spec, status, name, namespace, logger, body, "managedObject": f"{namespace}/{name}" } secret = kubernetes.client.V1Secret() - secret.metadata = kubernetes.client.V1ObjectMeta(name=secret_name, annotations=annotations) - secret = create_dockerlogin(logger, secret, secret_json_object, username_ref, password_ref, registry) + secret.metadata = kubernetes.client.V1ObjectMeta( + name=secret_name, annotations=annotations) + secret = create_dockerlogin( + logger, + secret, + secret_json_object, + username_ref, + password_ref, + registry) try: obj = api.replace_namespaced_secret( name=secret_name, body=secret, namespace="{}".format(secret_namespace)) - logger.info(f"Secret {secret_namespace}/{secret_name} has been updated") - except: + logger.info( + f"Secret {secret_namespace}/{secret_name} has been updated") + except BaseException: logger.warn( f"Could not update secret {secret_namespace}/{secret_name}!") @@ -114,6 +156,8 @@ def delete_managed_secret(spec, name, namespace, logger, **kwargs): try: api.delete_namespaced_secret(secret_name, secret_namespace) - logger.info(f"Secret {secret_namespace}/{secret_name} has been deleted") - except: - logger.warn(f"Could not delete secret {secret_namespace}/{secret_name}!") + logger.info( + f"Secret {secret_namespace}/{secret_name} has been deleted") + except BaseException: + logger.warn( + f"Could not delete secret {secret_namespace}/{secret_name}!") diff --git a/src/kv.py b/src/kv.py index 5428a55..17104e3 100644 --- a/src/kv.py +++ b/src/kv.py @@ -21,13 +21,15 @@ def create_kv(secret, secret_json, content_def): if _secret_scope == "login": value = parse_login_scope(secret_json, _secret_key) if value is None: - raise Exception(f"Field {_secret_key} has no value in bitwarden secret") + raise Exception( + f"Field {_secret_key} has no value in bitwarden secret") secret.data[_secret_ref] = str(base64.b64encode( value.encode("utf-8")), "utf-8") if _secret_scope == "fields": value = parse_fields_scope(secret_json, _secret_key) if value is None: - raise Exception(f"Field {_secret_key} has no value in bitwarden secret") + raise Exception( + f"Field {_secret_key} has no value in bitwarden secret") secret.data[_secret_ref] = str(base64.b64encode( value.encode("utf-8")), "utf-8") return secret @@ -66,7 +68,14 @@ def create_managed_secret(spec, name, namespace, logger, body, **kwargs): @kopf.on.update('bitwarden-secret.lerentis.uploadfilter24.eu') @kopf.timer('bitwarden-secret.lerentis.uploadfilter24.eu', interval=900) -def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs): +def update_managed_secret( + spec, + status, + name, + namespace, + logger, + body, + **kwargs): content_def = body['spec']['content'] id = spec.get('id') @@ -74,17 +83,24 @@ def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs) old_secret_name = None old_secret_namespace = None if 'kopf.zalando.org/last-handled-configuration' in body.metadata.annotations: - old_config = json.loads(body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) + old_config = json.loads( + body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) old_secret_name = old_config['spec'].get('name') old_secret_namespace = old_config['spec'].get('namespace') secret_name = spec.get('name') secret_namespace = spec.get('namespace') - if old_config is not None and (old_secret_name != secret_name or old_secret_namespace != secret_namespace): + if old_config is not None and ( + old_secret_name != secret_name or old_secret_namespace != secret_namespace): # If the name of the secret or the namespace of the secret is different # We have to delete the secret an recreate it logger.info("Secret name or namespace changed, let's recreate it") - delete_managed_secret(old_config['spec'], name, namespace, logger, **kwargs) + delete_managed_secret( + old_config['spec'], + name, + namespace, + logger, + **kwargs) create_managed_secret(spec, name, namespace, logger, body, **kwargs) return @@ -109,8 +125,9 @@ def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs) name=secret_name, body=secret, namespace="{}".format(secret_namespace)) - logger.info(f"Secret {secret_namespace}/{secret_name} has been updated") - except: + logger.info( + f"Secret {secret_namespace}/{secret_name} has been updated") + except BaseException: logger.warn( f"Could not update secret {secret_namespace}/{secret_name}!") @@ -125,6 +142,6 @@ def delete_managed_secret(spec, name, namespace, logger, **kwargs): api.delete_namespaced_secret(secret_name, secret_namespace) logger.info( f"Secret {secret_namespace}/{secret_name} has been deleted") - except: + except BaseException: logger.warn( f"Could not delete secret {secret_namespace}/{secret_name}!") diff --git a/src/lookups/bitwarden_lookup.py b/src/lookups/bitwarden_lookup.py index d15adb0..8a6965b 100644 --- a/src/lookups/bitwarden_lookup.py +++ b/src/lookups/bitwarden_lookup.py @@ -2,9 +2,10 @@ import json from utils.utils import get_secret_from_bitwarden, parse_fields_scope, parse_login_scope + def bitwarden_lookup(id, scope, field): _secret_json = json.loads(get_secret_from_bitwarden(id)) if scope == "login": return parse_login_scope(_secret_json, field) if scope == "fields": - return parse_fields_scope(_secret_json, field) \ No newline at end of file + return parse_fields_scope(_secret_json, field) diff --git a/src/template.py b/src/template.py index 983fa62..8951d72 100644 --- a/src/template.py +++ b/src/template.py @@ -12,17 +12,23 @@ lookup_func_dict = { "bitwarden_lookup": bitwarden_lookup, } + def render_template(template): jinja_template = Environment(loader=BaseLoader()).from_string(template) jinja_template.globals.update(lookup_func_dict) return jinja_template.render() + def create_template_secret(secret, filename, template): secret.type = "Opaque" secret.data = {} - secret.data[filename] = str(base64.b64encode(render_template(template).encode("utf-8")), "utf-8") + secret.data[filename] = str( + base64.b64encode( + render_template(template).encode("utf-8")), + "utf-8") return secret + @kopf.on.create('bitwarden-template.lerentis.uploadfilter24.eu') def create_managed_secret(spec, name, namespace, logger, body, **kwargs): @@ -40,7 +46,8 @@ def create_managed_secret(spec, name, namespace, logger, body, **kwargs): "managedObject": f"{namespace}/{name}" } secret = kubernetes.client.V1Secret() - secret.metadata = kubernetes.client.V1ObjectMeta(name=secret_name, annotations=annotations) + secret.metadata = kubernetes.client.V1ObjectMeta( + name=secret_name, annotations=annotations) secret = create_template_secret(secret, filename, template) obj = api.create_namespaced_secret( @@ -49,9 +56,17 @@ def create_managed_secret(spec, name, namespace, logger, body, **kwargs): logger.info(f"Secret {secret_namespace}/{secret_name} has been created") + @kopf.on.update('bitwarden-template.lerentis.uploadfilter24.eu') @kopf.timer('bitwarden-template.lerentis.uploadfilter24.eu', interval=900) -def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs): +def update_managed_secret( + spec, + status, + name, + namespace, + logger, + body, + **kwargs): template = spec.get('template') filename = spec.get('filename') @@ -62,17 +77,24 @@ def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs) old_secret_name = None old_secret_namespace = None if 'kopf.zalando.org/last-handled-configuration' in body.metadata.annotations: - old_config = json.loads(body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) + old_config = json.loads( + body.metadata.annotations['kopf.zalando.org/last-handled-configuration']) old_secret_name = old_config['spec'].get('name') old_secret_namespace = old_config['spec'].get('namespace') secret_name = spec.get('name') secret_namespace = spec.get('namespace') - if old_config is not None and (old_secret_name != secret_name or old_secret_namespace != secret_namespace): + if old_config is not None and ( + old_secret_name != secret_name or old_secret_namespace != secret_namespace): # If the name of the secret or the namespace of the secret is different # We have to delete the secret an recreate it logger.info("Secret name or namespace changed, let's recreate it") - delete_managed_secret(old_config['spec'], name, namespace, logger, **kwargs) + delete_managed_secret( + old_config['spec'], + name, + namespace, + logger, + **kwargs) create_managed_secret(spec, name, namespace, logger, body, **kwargs) return @@ -85,7 +107,8 @@ def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs) "managedObject": f"{namespace}/{name}" } secret = kubernetes.client.V1Secret() - secret.metadata = kubernetes.client.V1ObjectMeta(name=secret_name, annotations=annotations) + secret.metadata = kubernetes.client.V1ObjectMeta( + name=secret_name, annotations=annotations) secret = create_template_secret(secret, filename, template) try: @@ -93,11 +116,13 @@ def update_managed_secret(spec, status, name, namespace, logger, body, **kwargs) name=secret_name, body=secret, namespace="{}".format(secret_namespace)) - logger.info(f"Secret {secret_namespace}/{secret_name} has been updated") - except: + logger.info( + f"Secret {secret_namespace}/{secret_name} has been updated") + except BaseException: logger.warn( f"Could not update secret {secret_namespace}/{secret_name}!") + @kopf.on.delete('bitwarden-template.lerentis.uploadfilter24.eu') def delete_managed_secret(spec, name, namespace, logger, **kwargs): secret_name = spec.get('name') @@ -106,6 +131,8 @@ def delete_managed_secret(spec, name, namespace, logger, **kwargs): try: api.delete_namespaced_secret(secret_name, secret_namespace) - logger.info(f"Secret {secret_namespace}/{secret_name} has been deleted") - except: - logger.warn(f"Could not delete secret {secret_namespace}/{secret_name}!") + logger.info( + f"Secret {secret_namespace}/{secret_name} has been deleted") + except BaseException: + logger.warn( + f"Could not delete secret {secret_namespace}/{secret_name}!") diff --git a/src/utils/utils.py b/src/utils/utils.py index 3d4a69c..65f4e0a 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -2,12 +2,15 @@ import os import json import subprocess + class BitwardenCommandException(Exception): pass + def get_secret_from_bitwarden(id): return command_wrapper(command=f"get item {id}") + def unlock_bw(logger): status_output = command_wrapper("status") status = json.loads(status_output)['status'] @@ -19,17 +22,26 @@ def unlock_bw(logger): os.environ["BW_SESSION"] = tokens[1] logger.info("Signin successful. Session exported") + def command_wrapper(command): system_env = dict(os.environ) - sp = subprocess.Popen([f"bw {command}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, shell=True, env=system_env) + sp = subprocess.Popen( + [f"bw {command}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + shell=True, + env=system_env) out, err = sp.communicate() if err: raise BitwardenCommandException(err) return out.decode(encoding='UTF-8') + def parse_login_scope(secret_json, key): return secret_json["login"][key] + def parse_fields_scope(secret_json, key): if "fields" not in secret_json: return None