1+ """Config rule to check knowledge base S3 bucket configuration for Bedrock environments.
2+
3+ Version: 1.0
4+
5+ Config rule for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+ SPDX-License-Identifier: MIT-0
9+ """
import json
import logging
import os
from typing import Any

import boto3
from botocore.exceptions import ClientError

# Setup Default Logger
LOGGER = logging.getLogger(__name__)
# Default to the string "INFO" so log_level is always a string; Logger.setLevel
# accepts level names, matching the string an operator sets in LOG_LEVEL.
log_level: str = os.environ.get("LOG_LEVEL", "INFO")
LOGGER.setLevel(log_level)
# Lazy %-style args so the message is only formatted when the level is enabled.
LOGGER.info("boto3 version: %s", boto3.__version__)

# Get AWS region from environment variable (set automatically by the Lambda runtime)
AWS_REGION = os.environ.get("AWS_REGION")

# Initialize AWS clients once per execution environment (reused across invocations)
bedrock_agent_client = boto3.client("bedrock-agent", region_name=AWS_REGION)
s3_client = boto3.client("s3", region_name=AWS_REGION)
config_client = boto3.client("config", region_name=AWS_REGION)

def _get_kb_bucket_names(knowledge_base_id: str) -> list[str]:
    """Return the S3 bucket names backing all S3 data sources of a knowledge base.

    Uses list_data_sources (a knowledge base can have several data sources;
    get_knowledge_base does not return them) and skips non-S3 data sources.

    Args:
        knowledge_base_id (str): The Bedrock knowledge base ID.

    Returns:
        list[str]: Bucket names extracted from each S3 data source's bucketArn.
    """
    bucket_names: list[str] = []
    paginator = bedrock_agent_client.get_paginator("list_data_sources")
    for page in paginator.paginate(knowledgeBaseId=knowledge_base_id):
        for ds_summary in page["dataSourceSummaries"]:
            data_source = bedrock_agent_client.get_data_source(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=ds_summary["dataSourceId"],
            )["dataSource"]
            s3_config = data_source.get("dataSourceConfiguration", {}).get("s3Configuration")
            if s3_config:
                # bucketArn has the form "arn:aws:s3:::bucket-name"
                bucket_names.append(s3_config["bucketArn"].split(":::")[-1])
    return bucket_names


def _check_bucket_issues(bucket_name: str, rule_parameters: dict) -> list[str]:
    """Return the list of required configurations missing from one S3 bucket.

    Each check is individually toggled by a rule parameter ("true"/"false",
    default "true"): check_retention, check_encryption, check_access_logging,
    check_object_locking, check_versioning.

    Args:
        bucket_name (str): Name of the S3 bucket to inspect.
        rule_parameters (dict): Rule parameters from the AWS Config rule.

    Returns:
        list[str]: Human-readable names of the missing configurations.
    """

    def enabled(param: str) -> bool:
        """True when the given check is enabled (default on)."""
        return rule_parameters.get(param, "true").lower() == "true"

    issues: list[str] = []

    # Retention: at least one lifecycle rule with an Expiration action
    if enabled("check_retention"):
        try:
            lifecycle = s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
            if not any(rule.get("Expiration") for rule in lifecycle.get("Rules", [])):
                issues.append("retention")
        except ClientError as e:
            if e.response["Error"]["Code"] != "NoSuchLifecycleConfiguration":
                raise  # unexpected failure (e.g. AccessDenied) — surface, don't mask
            issues.append("retention")

    # Encryption: any server-side encryption configuration counts
    if enabled("check_encryption"):
        try:
            encryption = s3_client.get_bucket_encryption(Bucket=bucket_name)
            if not encryption.get("ServerSideEncryptionConfiguration"):
                issues.append("encryption")
        except ClientError:
            # API raises when no encryption configuration exists
            issues.append("encryption")

    # Server access logging
    if enabled("check_access_logging"):
        logging_config = s3_client.get_bucket_logging(Bucket=bucket_name)
        if not logging_config.get("LoggingEnabled"):
            issues.append("access logging")

    # Object lock
    if enabled("check_object_locking"):
        try:
            lock_config = s3_client.get_bucket_object_lock_configuration(Bucket=bucket_name)
            if not lock_config.get("ObjectLockConfiguration"):
                issues.append("object locking")
        except ClientError:
            # API raises when object lock was never enabled on the bucket
            issues.append("object locking")

    # Versioning must be explicitly Enabled (not Suspended/absent)
    if enabled("check_versioning"):
        versioning = s3_client.get_bucket_versioning(Bucket=bucket_name)
        if versioning.get("Status") != "Enabled":
            issues.append("versioning")

    return issues


def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]:
    """Evaluate if Bedrock Knowledge Base S3 buckets have required configurations.

    Iterates every knowledge base, resolves the S3 bucket behind each S3 data
    source, and checks retention, encryption, access logging, object locking,
    and versioning per the rule parameters.

    Args:
        rule_parameters (dict): Rule parameters from AWS Config rule.

    Returns:
        tuple[str, str]: Compliance type (COMPLIANT | NON_COMPLIANT |
            INSUFFICIENT_DATA) and annotation.
    """
    try:
        non_compliant_buckets = []
        kb_paginator = bedrock_agent_client.get_paginator("list_knowledge_bases")
        for page in kb_paginator.paginate():
            for kb in page["knowledgeBaseSummaries"]:
                for bucket_name in _get_kb_bucket_names(kb["knowledgeBaseId"]):
                    issues = _check_bucket_issues(bucket_name, rule_parameters)
                    if issues:
                        non_compliant_buckets.append(f"{bucket_name} (missing: {', '.join(issues)})")

        if non_compliant_buckets:
            return "NON_COMPLIANT", f"The following KB S3 buckets are non-compliant: {'; '.join(non_compliant_buckets)}"
        return "COMPLIANT", "All Knowledge Base S3 buckets meet the required configurations"

    except Exception as e:
        LOGGER.error(f"Error evaluating Knowledge Base S3 bucket configurations: {str(e)}")
        # "ERROR" is not a valid Config ComplianceType and would make
        # put_evaluations reject the result; INSUFFICIENT_DATA is the valid value.
        return "INSUFFICIENT_DATA", f"Error evaluating compliance: {str(e)}"
110+
def lambda_handler(event: dict, context: Any) -> None:
    """Entry point invoked by AWS Config: evaluate and report compliance.

    Args:
        event (dict): Lambda event object
        context (Any): Lambda context object
    """
    LOGGER.info("Evaluating compliance for AWS Config rule")
    LOGGER.info(f"Event: {json.dumps(event)}")

    invoking_event = json.loads(event["invokingEvent"])
    if "ruleParameters" in event:
        rule_parameters = json.loads(event["ruleParameters"])
    else:
        rule_parameters = {}

    compliance_type, annotation = evaluate_compliance(rule_parameters)

    LOGGER.info(f"Compliance evaluation result: {compliance_type}")
    LOGGER.info(f"Annotation: {annotation}")

    # Report the account-level evaluation back to AWS Config.
    config_client.put_evaluations(
        Evaluations=[
            {
                "ComplianceResourceType": "AWS::::Account",
                "ComplianceResourceId": event["accountId"],
                "ComplianceType": compliance_type,
                "Annotation": annotation,
                "OrderingTimestamp": invoking_event["notificationCreationTime"],
            }
        ],
        ResultToken=event["resultToken"],
    )

    LOGGER.info("Compliance evaluation complete.")