import kopf import base64 import kubernetes import json from utils.utils import unlock_bw, bw_sync_interval from lookups.bitwarden_lookup import BitwardenLookupHandler from jinja2 import Environment, BaseLoader def render_template(logger, template): jinja_template = Environment(loader=BaseLoader()).from_string(template) jinja_template.globals.update({ "bitwarden_lookup": BitwardenLookupHandler(logger).bitwarden_lookup, }) return jinja_template.render() def create_template_secret(logger, secret, filename, template): = {}[filename] = str( base64.b64encode( render_template(logger, template).encode("utf-8")), "utf-8") return secret @kopf.on.create('') def create_managed_secret(spec, name, namespace, logger, body, **kwargs): template = spec.get('template') filename = spec.get('filename') secret_name = spec.get('name') secret_namespace = spec.get('namespace') labels = spec.get('labels') custom_annotations = spec.get('annotations') custom_secret_type = spec.get('secretType') unlock_bw(logger) api = kubernetes.client.CoreV1Api() annotations = { "managed": "", "managedObject": f"{namespace}/{name}" } if custom_annotations: annotations.update(custom_annotations) if not custom_secret_type: custom_secret_type = 'Opaque' if not labels: labels = {} secret = kubernetes.client.V1Secret() secret.metadata = kubernetes.client.V1ObjectMeta( name=secret_name, annotations=annotations, labels=labels) secret.type = custom_secret_type secret = create_template_secret(logger, secret, filename, template) # Garbage collection will delete the generated secret if the owner # Is not in the same namespace as the generated secret if secret_namespace == namespace: kopf.append_owner_reference(secret) api.create_namespaced_secret( secret_namespace, secret )"Secret {secret_namespace}/{secret_name} has been created") @kopf.on.update('') @kopf.timer('', interval=bw_sync_interval) def update_managed_secret( spec, status, name, namespace, logger, body, **kwargs): template = spec.get('template') filename = spec.get('filename') secret_name = spec.get('name') secret_namespace = spec.get('namespace') labels = spec.get('labels') custom_annotations = spec.get('annotations') custom_secret_type = spec.get('secretType') if not custom_secret_type: custom_secret_type = 'Opaque' old_config = None old_secret_name = None old_secret_namespace = None old_secret_type = None if '' in body.metadata.annotations: old_config = json.loads( body.metadata.annotations['']) old_secret_name = old_config['spec'].get('name') old_secret_namespace = old_config['spec'].get('namespace') old_secret_type = old_config['spec'].get('secretType') secret_name = spec.get('name') secret_namespace = spec.get('namespace') if not old_secret_type: old_secret_type = 'Opaque' if old_config is not None and ( old_secret_name != secret_name or old_secret_namespace != secret_namespace or old_secret_type != custom_secret_type): # If the name of the secret or the namespace of the secret is different # We have to delete the secret an recreate it"Secret name or namespace changed, let's recreate it") delete_managed_secret( old_config['spec'], name, namespace, logger, **kwargs) create_managed_secret(spec, name, namespace, logger, body, **kwargs) return unlock_bw(logger) api = kubernetes.client.CoreV1Api() annotations = { "managed": "", "managedObject": f"{namespace}/{name}" } if custom_annotations: annotations.update(custom_annotations) if not labels: labels = {} secret = kubernetes.client.V1Secret() secret.metadata = kubernetes.client.V1ObjectMeta( name=secret_name, annotations=annotations, labels=labels) secret.type = custom_secret_type secret = create_template_secret(logger, secret, filename, template) # Garbage collection will delete the generated secret if the owner # Is not in the same namespace as the generated secret if secret_namespace == namespace: kopf.append_owner_reference(secret) try: api.replace_namespaced_secret( name=secret_name, body=secret, namespace="{}".format(secret_namespace)) f"Secret {secret_namespace}/{secret_name} has been updated") except BaseException as e: logger.warn( f"Could not update secret {secret_namespace}/{secret_name}!") logger.warn( f"Exception: {e}" ) @kopf.on.delete('') def delete_managed_secret(spec, name, namespace, logger, **kwargs): secret_name = spec.get('name') secret_namespace = spec.get('namespace') api = kubernetes.client.CoreV1Api() try: api.delete_namespaced_secret(secret_name, secret_namespace) f"Secret {secret_namespace}/{secret_name} has been deleted") except BaseException: logger.warn( f"Could not delete secret {secret_namespace}/{secret_name}!")