1+ """Config rule to check knowledge base data ingestion encryption for Bedrock environments.
2+
3+ Version: 1.0
4+
5+ Config rule for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+ SPDX-License-Identifier: MIT-0
9+ """
10+ import json
11+ import logging
12+ import os
13+ from typing import Any
14+
15+ import boto3
16+ from botocore .exceptions import ClientError
17+
18+ # Setup Default Logger
19+ LOGGER = logging .getLogger (__name__ )
20+ log_level = os .environ .get ("LOG_LEVEL" , logging .INFO )
21+ LOGGER .setLevel (log_level )
22+ LOGGER .info (f"boto3 version: { boto3 .__version__ } " )
23+
24+ # Get AWS region from environment variable
25+ AWS_REGION = os .environ .get ("AWS_REGION" )
26+
27+ # Initialize AWS clients
28+ bedrock_agent_client = boto3 .client ("bedrock-agent" , region_name = AWS_REGION )
29+ config_client = boto3 .client ("config" , region_name = AWS_REGION )
30+
31+ def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]:
32+ """Evaluate if Bedrock Knowledge Base data sources are encrypted with KMS.
33+
34+ Args:
35+ rule_parameters (dict): Rule parameters from AWS Config rule.
36+
37+ Returns:
38+ tuple[str, str]: Compliance type and annotation message.
39+ """
40+ try :
41+ # List all knowledge bases
42+ non_compliant_kbs = []
43+ paginator = bedrock_agent_client .get_paginator ("list_knowledge_bases" )
44+
45+ for page in paginator .paginate ():
46+ for kb in page ["knowledgeBaseSummaries" ]:
47+ kb_id = kb ["knowledgeBaseId" ]
48+ kb_name = kb .get ("name" , kb_id )
49+
50+ # Get data sources for each knowledge base
51+ try :
52+ data_sources = bedrock_agent_client .list_data_sources (
53+ knowledgeBaseId = kb_id
54+ )
55+
56+ # Check if any data source is not encrypted
57+ unencrypted_sources = []
58+ for source in data_sources .get ("dataSourceSummaries" , []):
59+ if not source .get ("serverSideEncryptionConfiguration" , {}).get ("kmsKeyArn" ):
60+ unencrypted_sources .append (source .get ("name" , source ["dataSourceId" ]))
61+
62+ if unencrypted_sources :
63+ non_compliant_kbs .append (f"{ kb_name } (unencrypted sources: { ', ' .join (unencrypted_sources )} )" )
64+
65+ except ClientError as e :
66+ LOGGER .error (f"Error checking data sources for knowledge base { kb_name } : { str (e )} " )
67+ if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
68+ non_compliant_kbs .append (f"{ kb_name } (access denied)" )
69+ else :
70+ raise
71+
72+ if non_compliant_kbs :
73+ return "NON_COMPLIANT" , f"The following knowledge bases have unencrypted data sources: { '; ' .join (non_compliant_kbs )} "
74+ return "COMPLIANT" , "All knowledge base data sources are encrypted with KMS"
75+
76+ except Exception as e :
77+ LOGGER .error (f"Error evaluating Bedrock Knowledge Base encryption: { str (e )} " )
78+ return "ERROR" , f"Error evaluating compliance: { str (e )} "
79+
80+ def lambda_handler (event : dict , context : Any ) -> None : # noqa: U100
81+ """Lambda handler.
82+
83+ Args:
84+ event (dict): Lambda event object
85+ context (Any): Lambda context object
86+ """
87+ LOGGER .info ("Evaluating compliance for AWS Config rule" )
88+ LOGGER .info (f"Event: { json .dumps (event )} " )
89+
90+ invoking_event = json .loads (event ["invokingEvent" ])
91+ rule_parameters = json .loads (event ["ruleParameters" ]) if "ruleParameters" in event else {}
92+
93+ compliance_type , annotation = evaluate_compliance (rule_parameters )
94+
95+ evaluation = {
96+ "ComplianceResourceType" : "AWS::::Account" ,
97+ "ComplianceResourceId" : event ["accountId" ],
98+ "ComplianceType" : compliance_type ,
99+ "Annotation" : annotation ,
100+ "OrderingTimestamp" : invoking_event ["notificationCreationTime" ],
101+ }
102+
103+ LOGGER .info (f"Compliance evaluation result: { compliance_type } " )
104+ LOGGER .info (f"Annotation: { annotation } " )
105+
106+ config_client .put_evaluations (Evaluations = [evaluation ], ResultToken = event ["resultToken" ]) # type: ignore
107+
108+ LOGGER .info ("Compliance evaluation complete." )
0 commit comments