11"""Custom Resource to setup SRA Config resources in the organization.
22
3- Version: 0.1
3+ Version: 1.0
44
55Config module for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
66
1414import os
1515from time import sleep
1616
17- # import re
18- # from time import sleep
19- from typing import TYPE_CHECKING
20-
21- # , Literal, Optional, Sequence, Union
17+ from typing import TYPE_CHECKING , Literal , Optional
18+ from typing import cast
2219
2320import boto3
2421from botocore .config import Config
3330 from mypy_boto3_cloudformation import CloudFormationClient
3431 from mypy_boto3_organizations import OrganizationsClient
3532 from mypy_boto3_config import ConfigServiceClient
33+ from mypy_boto3_config .type_defs import DescribeConfigRulesResponseTypeDef , ConfigRuleTypeDef , ScopeTypeDef
3634 from mypy_boto3_iam .client import IAMClient
3735 from mypy_boto3_iam .type_defs import CreatePolicyResponseTypeDef , CreateRoleResponseTypeDef , EmptyResponseMetadataTypeDef
3836
@@ -54,13 +52,13 @@ class sra_config:
5452 LOGGER .exception (UNEXPECTED )
5553 raise ValueError ("Unexpected error executing Lambda function. Review CloudWatch logs for details." ) from None
5654
57- def get_organization_config_rules (self ):
55+ def get_organization_config_rules (self ) -> dict :
5856 """Get Organization Config Rules."""
5957 # Get the Organization ID
6058 org_id : str = self .ORG_CLIENT .describe_organization ()["Organization" ]["Id" ]
6159
6260 # Get the Organization Config Rules
63- response = self .ORG_CLIENT .describe_organization_config_rules (
61+ response = self .ORG_CLIENT .describe_organization_config_rules ( # type: ignore
6462 OrganizationConfigRuleNames = ["sra_config_rule" ],
6563 OrganizationId = org_id ,
6664 )
@@ -71,13 +69,13 @@ def get_organization_config_rules(self):
7169 # Return the response
7270 return response
7371
74- def put_organization_config_rule (self ):
72+ def put_organization_config_rule (self ) -> dict :
7573 """Put Organization Config Rule."""
7674 # Get the Organization ID
7775 org_id : str = self .ORG_CLIENT .describe_organization ()["Organization" ]["Id" ]
7876
7977 # Put the Organization Config Rule
80- response = self .ORG_CLIENT .put_organization_config_rule (
78+ response = self .ORG_CLIENT .put_organization_config_rule ( # type: ignore
8179 OrganizationConfigRuleName = "sra_config_rule" ,
8280 OrganizationId = org_id ,
8381 ConfigRuleName = "sra_config_rule" ,
@@ -89,7 +87,7 @@ def put_organization_config_rule(self):
8987 # Return the response
9088 return response
9189
92- def find_config_rule (self , rule_name ) :
90+ def find_config_rule (self , rule_name : str ) -> tuple [ bool , DescribeConfigRulesResponseTypeDef ] :
9391 """Get config rule
9492
9593 Args:
@@ -111,7 +109,7 @@ def find_config_rule(self, rule_name):
111109 except ClientError as e :
112110 if e .response ["Error" ]["Code" ] == "NoSuchConfigRuleException" :
113111 self .LOGGER .info (f"No such config rule: { rule_name } " )
114- return False , {}
112+ return False , cast ( DescribeConfigRulesResponseTypeDef , {})
115113 else :
116114 self .LOGGER .info (f"Unexpected error: { e } " )
117115 raise e
@@ -120,28 +118,29 @@ def find_config_rule(self, rule_name):
120118 return True , response
121119
122120
123- def create_config_rule (self , rule_name , lambda_arn , max_frequency , owner , description , input_params , eval_mode , solution_name , scope = {}):
121+ def create_config_rule (self , rule_name : str , lambda_arn : str ,
122+ max_frequency : Literal ["One_Hour" , "Three_Hours" , "Six_Hours" , "Twelve_Hours" , "TwentyFour_Hours" ],
123+ owner : Literal ["CUSTOM_LAMBDA" , "AWS" ], description : str , input_params : dict ,
124+ eval_mode : Literal ["DETECTIVE" , "PROACTIVE" ], solution_name : str , scope : dict = {}) -> None :
124125 """Create Config Rule."""
125- # Create the Config Rule
126- response = self .CONFIG_CLIENT .put_config_rule (
126+ self .CONFIG_CLIENT .put_config_rule (
127127 ConfigRule = {
128128 "ConfigRuleName" : rule_name ,
129129 "Description" : description ,
130- "Scope" : scope ,
130+ "Scope" : cast ( ScopeTypeDef , scope ) ,
131131 "Source" : {
132132 "Owner" : owner ,
133133 "SourceIdentifier" : lambda_arn ,
134134 "SourceDetails" : [
135135 {
136136 "EventSource" : "aws.config" ,
137- # TODO(liamschn): does messagetype need to be a parameter
137+ # TODO(liamschn): does messagetype need to be a parameter?
138138 "MessageType" : "ScheduledNotification" ,
139139 "MaximumExecutionFrequency" : max_frequency ,
140140 }
141141 ],
142142 },
143143 "InputParameters" : json .dumps (input_params ),
144- # "MaximumExecutionFrequency": max_frequency,
145144 "EvaluationModes" : [
146145 {
147146 'Mode' : eval_mode
@@ -152,12 +151,9 @@ def create_config_rule(self, rule_name, lambda_arn, max_frequency, owner, descri
152151 )
153152
154153 # Log the response
155- sra_config .LOGGER .info (response )
156-
157- # Return the response
158- return response
154+ self .LOGGER .info (f"{ rule_name } config rule created..." )
159155
160- def delete_config_rule (self , rule_name ) :
156+ def delete_config_rule (self , rule_name : str ) -> None :
161157 """Delete Config Rule."""
162158 # Delete the Config Rule
163159 try :
@@ -166,7 +162,7 @@ def delete_config_rule(self, rule_name):
166162 )
167163
168164 # Log the response
169- sra_config .LOGGER .info (f"Deleted { rule_name } config rule succeeded." )
165+ self .LOGGER .info (f"Deleted { rule_name } config rule succeeded." )
170166 except ClientError as e :
171167 if e .response ["Error" ]["Code" ] == "NoSuchConfigRuleException" :
172168 self .LOGGER .info (f"No such config rule: { rule_name } " )
0 commit comments