Skip to content

Commit f7d3dde

Browse files
committed
fix flake8 issues in config rules
1 parent 2ae9582 commit f7d3dde

File tree

1 file changed

+30
-9
lines changed
  • aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_invocation_log_cloudwatch

1 file changed

+30
-9
lines changed

aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_invocation_log_cloudwatch/app.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
"""Config rule to check invocation log for Bedrock environemts.
2+
3+
Version: 1.0
4+
5+
Config rule for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+
SPDX-License-Identifier: MIT-0
9+
"""
110
from typing import Any
211
import boto3
312
import json
@@ -18,9 +27,16 @@
1827
config_client = boto3.client('config', region_name=AWS_REGION)
1928
logs_client = boto3.client('logs', region_name=AWS_REGION)
2029

21-
def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]:
22-
"""Evaluates if Bedrock Model Invocation Logging is properly configured for CloudWatch"""
23-
30+
31+
def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ004
32+
"""Evaluate if Bedrock Model Invocation Logging is properly configured for CloudWatch.
33+
34+
Args:
35+
rule_parameters (dict): Rule parameters from AWS Config rule.
36+
37+
Returns:
38+
tuple[str, str]: Compliance type and annotation message.
39+
"""
2440
# Parse rule parameters
2541
params = json.loads(json.dumps(rule_parameters)) if rule_parameters else {}
2642
check_retention = params.get('check_retention', 'true').lower() == 'true'
@@ -31,7 +47,6 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]:
3147
LOGGER.info(f"Bedrock get_model_invocation_logging_configuration response: {response}")
3248
logging_config = response.get('loggingConfig', {})
3349
LOGGER.info(f"Bedrock Model Invocation Logging Configuration: {logging_config}")
34-
3550
cloudwatch_config = logging_config.get('cloudWatchConfig', {})
3651
LOGGER.info(f"Bedrock Model Invocation config: {cloudwatch_config}")
3752
log_group_name = cloudwatch_config.get('logGroupName', "")
@@ -54,22 +69,28 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]:
5469

5570
if issues:
5671
return 'NON_COMPLIANT', f"CloudWatch logging enabled but {', '.join(issues)}"
57-
else:
58-
return 'COMPLIANT', f"CloudWatch logging properly configured for Bedrock Model Invocation Logging. Log Group: {log_group_name}"
72+
return 'COMPLIANT', f"CloudWatch logging properly configured for Bedrock Model Invocation Logging. Log Group: {log_group_name}"
5973

6074
except Exception as e:
6175
LOGGER.error(f"Error evaluating Bedrock Model Invocation Logging configuration: {str(e)}")
6276
return 'INSUFFICIENT_DATA', f"Error evaluating compliance: {str(e)}"
6377

64-
def lambda_handler(event: dict, context: Any) -> None:
78+
79+
def lambda_handler(event: dict, context: Any) -> None: # noqa: U100
80+
"""Lambda handler.
81+
82+
Args:
83+
event (dict): Lambda event object
84+
context (Any): Lambda context object
85+
"""
6586
LOGGER.info('Evaluating compliance for AWS Config rule')
6687
LOGGER.info(f"Event: {json.dumps(event)}")
6788

6889
invoking_event = json.loads(event['invokingEvent'])
6990
rule_parameters = json.loads(event['ruleParameters']) if 'ruleParameters' in event else {}
7091

7192
compliance_type, annotation = evaluate_compliance(rule_parameters)
72-
93+
7394
evaluation = {
7495
'ComplianceResourceType': 'AWS::::Account',
7596
'ComplianceResourceId': event['accountId'],
@@ -86,4 +107,4 @@ def lambda_handler(event: dict, context: Any) -> None:
86107
ResultToken=event['resultToken']
87108
)
88109

89-
LOGGER.info("Compliance evaluation complete.")
110+
LOGGER.info("Compliance evaluation complete.")

0 commit comments

Comments
 (0)