switch to JSON mode of cli
pass around logger
This commit is contained in:
		| @@ -9,12 +9,12 @@ from utils.utils import command_wrapper, unlock_bw | ||||
| def bitwarden_signin(logger, **kwargs): | ||||
|     if 'BW_HOST' in os.environ: | ||||
|         try: | ||||
|             command_wrapper(f"config server {os.getenv('BW_HOST')}") | ||||
|             command_wrapper(logger, f"config server {os.getenv('BW_HOST')}") | ||||
|         except BaseException: | ||||
|             logger.warn("Revieved none zero exit code from server config") | ||||
|             logger.warn("Received non-zero exit code from server config") | ||||
|             logger.warn("This is expected from startup") | ||||
|             pass | ||||
|     else: | ||||
|         logger.info("BW_HOST not set. Assuming SaaS installation") | ||||
|     command_wrapper("login --apikey") | ||||
|     command_wrapper(logger, "login --apikey") | ||||
|     unlock_bw(logger) | ||||
|   | ||||
| @@ -46,7 +46,7 @@ def create_managed_registry_secret(spec, name, namespace, logger, **kwargs): | ||||
|  | ||||
|     unlock_bw(logger) | ||||
|     logger.info(f"Locking up secret with ID: {id}") | ||||
|     secret_json_object = json.loads(get_secret_from_bitwarden(id)) | ||||
|     secret_json_object = get_secret_from_bitwarden(logger, id) | ||||
|  | ||||
|     api = kubernetes.client.CoreV1Api() | ||||
|  | ||||
| @@ -118,7 +118,7 @@ def update_managed_registry_secret( | ||||
|  | ||||
|     unlock_bw(logger) | ||||
|     logger.info(f"Locking up secret with ID: {id}") | ||||
|     secret_json_object = json.loads(get_secret_from_bitwarden(id)) | ||||
|     secret_json_object = get_secret_from_bitwarden(logger, id) | ||||
|  | ||||
|     api = kubernetes.client.CoreV1Api() | ||||
|  | ||||
|   | ||||
| @@ -45,7 +45,7 @@ def create_managed_secret(spec, name, namespace, logger, body, **kwargs): | ||||
|  | ||||
|     unlock_bw(logger) | ||||
|     logger.info(f"Locking up secret with ID: {id}") | ||||
|     secret_json_object = json.loads(get_secret_from_bitwarden(id)) | ||||
|     secret_json_object = get_secret_from_bitwarden(logger, id) | ||||
|  | ||||
|     api = kubernetes.client.CoreV1Api() | ||||
|  | ||||
| @@ -106,7 +106,7 @@ def update_managed_secret( | ||||
|  | ||||
|     unlock_bw(logger) | ||||
|     logger.info(f"Locking up secret with ID: {id}") | ||||
|     secret_json_object = json.loads(get_secret_from_bitwarden(id)) | ||||
|     secret_json_object = get_secret_from_bitwarden(logger, id) | ||||
|  | ||||
|     api = kubernetes.client.CoreV1Api() | ||||
|  | ||||
|   | ||||
| @@ -4,7 +4,7 @@ from utils.utils import get_secret_from_bitwarden, parse_fields_scope, parse_log | ||||
|  | ||||
|  | ||||
| def bitwarden_lookup(id, scope, field): | ||||
|     _secret_json = json.loads(get_secret_from_bitwarden(id)) | ||||
|     _secret_json = get_secret_from_bitwarden(None, id) | ||||
|     if scope == "login": | ||||
|         return parse_login_scope(_secret_json, field) | ||||
|     if scope == "fields": | ||||
|   | ||||
| @@ -7,44 +7,47 @@ class BitwardenCommandException(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| def get_secret_from_bitwarden(id): | ||||
|     return command_wrapper(command=f"get item {id}") | ||||
| def get_secret_from_bitwarden(logger, id): | ||||
|     return command_wrapper(logger, command=f"get item {id}") | ||||
|  | ||||
|  | ||||
| def unlock_bw(logger): | ||||
|     status_output = command_wrapper("status") | ||||
|     status = json.loads(status_output)['status'] | ||||
|     status_output = command_wrapper(logger, "status", False) | ||||
|     status = status_output['data']['template']['status'] | ||||
|     if status == 'unlocked': | ||||
|         logger.info("Already unlocked") | ||||
|         return | ||||
|     token_output = command_wrapper("unlock --passwordenv BW_PASSWORD") | ||||
|     tokens = token_output.split('"')[1::2] | ||||
|     os.environ["BW_SESSION"] = tokens[1] | ||||
|     token_output = command_wrapper(logger, "unlock --passwordenv BW_PASSWORD") | ||||
|     os.environ["BW_SESSION"] = token_output["data"]["raw"] | ||||
|     logger.info("Signin successful. Session exported") | ||||
|  | ||||
|  | ||||
| def command_wrapper(command): | ||||
| def command_wrapper(logger, command, use_success: bool = True): | ||||
|     system_env = dict(os.environ) | ||||
|     sp = subprocess.Popen( | ||||
|         [f"bw {command}"], | ||||
|         [f"bw --response {command}"], | ||||
|         stdout=subprocess.PIPE, | ||||
|         stderr=subprocess.PIPE, | ||||
|         close_fds=True, | ||||
|         shell=True, | ||||
|         env=system_env) | ||||
|     out, err = sp.communicate() | ||||
|     if err: | ||||
|         raise BitwardenCommandException(err) | ||||
|     return out.decode(encoding='UTF-8') | ||||
|     resp = json.loads(out.decode(encoding='UTF-8')) | ||||
|     if os.environ["DEBUG"] != None: | ||||
|         logger.info(resp) | ||||
|     if resp["success"] != None and (not use_success or (use_success and resp["success"] == True)): | ||||
|         return resp | ||||
|     logger.warn(resp) | ||||
|     return None | ||||
|  | ||||
|  | ||||
| def parse_login_scope(secret_json, key): | ||||
|     return secret_json["login"][key] | ||||
|     return secret_json["data"]["login"][key] | ||||
|  | ||||
|  | ||||
| def parse_fields_scope(secret_json, key): | ||||
|     if "fields" not in secret_json: | ||||
|         return None | ||||
|     for entry in secret_json["fields"]: | ||||
|     for entry in secret_json["data"]["fields"]: | ||||
|         if entry['name'] == key: | ||||
|             return entry['value'] | ||||
|   | ||||
		Reference in New Issue
	
	Block a user