1- """Custom Resource to setup SRA IAM resources in the management account.
1+ """Lambda module to setup SRA KMS resources in the management account.
22
33Version: 1.0
44
2929import json
3030
3131
32- class sra_kms:
32+ class SRAKMS:
33+ """Class to represent SRA KMS resources."""
34+
3335 # Setup Default Logger
3436 LOGGER = logging.getLogger(__name__)
3537 log_level: str = os.environ.get("LOG_LEVEL", "INFO")
@@ -55,12 +57,12 @@ class sra_kms:
5557 raise ValueError("Unexpected error executing Lambda function. Review CloudWatch logs for details.") from None
5658
5759 def create_kms_key(self, kms_client: KMSClient, key_policy: str, description: str = "Key description") -> str:
58- """Create KMS key
60+ """Create KMS key.
5961
6062 Args:
6163 kms_client (KMSClient): KMS boto3 client
62- key_policy (dict ): key policy
63- description (str, optional ): Description of KMS key. Defaults to "Key description".
64+ key_policy (str ): key policy
65+ description (str): Description of KMS key. Defaults to "Key description".
6466
6567 Returns:
6668 str: KMS key id
@@ -75,18 +77,47 @@ def create_kms_key(self, kms_client: KMSClient, key_policy: str, description: st
7577 return key_response["KeyMetadata"]["KeyId"]
7678
7779 def create_alias(self, kms_client: KMSClient, alias_name: str, target_key_id: str) -> None:
80+ """Create KMS alias.
81+
82+ Args:
83+ kms_client (KMSClient): KMS boto3 client
84+ alias_name (str): KMS alias name
85+ target_key_id (str): KMS key id
86+ """
7887 self.LOGGER.info(f"Create KMS alias: {alias_name}")
7988 kms_client.create_alias(AliasName=alias_name, TargetKeyId=target_key_id)
8089
8190 def delete_alias(self, kms_client: KMSClient, alias_name: str) -> None:
91+ """Delete KMS alias.
92+
93+ Args:
94+ kms_client (KMSClient): KMS boto3 client
95+ alias_name (str): KMS alias name
96+ """
8297 self.LOGGER.info(f"Delete KMS alias: {alias_name}")
8398 kms_client.delete_alias(AliasName=alias_name)
8499
85100 def schedule_key_deletion(self, kms_client: KMSClient, key_id: str, pending_window_in_days: int = 30) -> None:
101+ """Schedule KMS key deletion.
102+
103+ Args:
104+ kms_client (KMSClient): KMS boto3 client
105+ key_id (str): KMS key id
106+ pending_window_in_days (int): Number of days to wait before deleting the key. Defaults to 30.
107+ """
86108 self.LOGGER.info(f"Schedule deletion of key: {key_id} in {pending_window_in_days} days")
87109 kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=pending_window_in_days)
88110
89111 def search_key_policies(self, kms_client: KMSClient, key_policy: str) -> tuple[bool, str]:
112+ """Search KMS keys for a specific policy.
113+
114+ Args:
115+ kms_client (KMSClient): KMS boto3 client
116+ key_policy (str): key policy
117+
118+ Returns:
119+ tuple[bool, str]: True if policy is found, False if not found
120+ """
90121 for key in self.list_all_keys(kms_client):
91122 self.LOGGER.info(f"Examining state of key: {key['KeyId']}")
92123 if kms_client.describe_key(KeyId=key["KeyId"])["KeyMetadata"]["KeyState"] != "Enabled":
@@ -103,20 +134,45 @@ def search_key_policies(self, kms_client: KMSClient, key_policy: str) -> tuple[b
103134 self.LOGGER.info(f"Key policy match found for key {key['KeyId']} policy {policy}: {policy_body}")
104135 self.LOGGER.info(f"Attempted to match to: {expected_key_policy}")
105136 return True, key["KeyId"]
106- else:
107- self.LOGGER.info(f"No key policy match found for key {key['KeyId']} policy {policy}: {policy_body}")
108- self.LOGGER.info(f"Attempted to match to: {expected_key_policy}")
137+ self.LOGGER.info(f"No key policy match found for key {key['KeyId']} policy {policy}: {policy_body}")
138+ self.LOGGER.info(f"Attempted to match to: {expected_key_policy}")
109139 return False, "None"
110140
111141 def list_key_policies(self, kms_client: KMSClient, key_id: str) -> list:
142+ """List KMS key policies.
143+
144+ Args:
145+ kms_client (KMSClient): KMS boto3 client
146+ key_id (str): KMS key id
147+
148+ Returns:
149+ list: list of KMS key policies
150+ """
112151 response = kms_client.list_key_policies(KeyId=key_id)
113152 return response["PolicyNames"]
114153
115154 def list_all_keys(self, kms_client: KMSClient) -> list:
155+ """List all KMS keys.
156+
157+ Args:
158+ kms_client (KMSClient): KMS boto3 client
159+
160+ Returns:
161+ list: list of KMS keys
162+ """
116163 response = kms_client.list_keys()
117164 return response["Keys"]
118165
119166 def check_key_exists(self, kms_client: KMSClient, key_id: str) -> tuple[bool, DescribeKeyResponseTypeDef]:
167+ """Check if a KMS key exists.
168+
169+ Args:
170+ kms_client (KMSClient): KMS boto3 client
171+ key_id (str): KMS key id
172+
173+ Returns:
174+ tuple[bool, DescribeKeyResponseTypeDef]: True if key exists, False otherwise, and key description
175+ """
120176 try:
121177 response: DescribeKeyResponseTypeDef = kms_client.describe_key(KeyId=key_id)
122178 return True, response
@@ -127,7 +183,7 @@ def check_alias_exists(self, kms_client: KMSClient, alias_name: str) -> tuple[bo
127183 """Check if an alias exists in KMS.
128184
129185 Args:
130- kms_client (kms_client ): KMS boto3 client
186+ kms_client (KMSClient ): KMS boto3 client
131187 alias_name (str): alias name to check for
132188
133189 Returns:
0 commit comments