1+ """Config rule to check OpenSearch vector store encryption for Bedrock Knowledge Base.
2+
3+ Version: 1.0
4+
5+ Config rule for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+ SPDX-License-Identifier: MIT-0
9+ """
10+ import json
11+ import logging
12+ import os
13+ from typing import Any
14+
15+ import boto3
16+ from botocore .exceptions import ClientError
17+
18+ # Setup Default Logger
19+ LOGGER = logging .getLogger (__name__ )
20+ log_level = os .environ .get ("LOG_LEVEL" , logging .INFO )
21+ LOGGER .setLevel (log_level )
22+ LOGGER .info (f"boto3 version: { boto3 .__version__ } " )
23+
24+ # Get AWS region from environment variable
25+ AWS_REGION = os .environ .get ("AWS_REGION" )
26+
27+ # Initialize AWS clients
28+ bedrock_agent_client = boto3 .client ("bedrock-agent" , region_name = AWS_REGION )
29+ opensearch_client = boto3 .client ("opensearch" , region_name = AWS_REGION )
30+ config_client = boto3 .client ("config" , region_name = AWS_REGION )
31+
32+ def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]:
33+ """Evaluate if Bedrock Knowledge Base OpenSearch vector stores are encrypted with KMS CMK.
34+
35+ Args:
36+ rule_parameters (dict): Rule parameters from AWS Config rule.
37+
38+ Returns:
39+ tuple[str, str]: Compliance type and annotation message.
40+ """
41+ try :
42+ non_compliant_kbs = []
43+ paginator = bedrock_agent_client .get_paginator ("list_knowledge_bases" )
44+
45+ for page in paginator .paginate ():
46+ for kb in page ["knowledgeBaseSummaries" ]:
47+ kb_id = kb ["knowledgeBaseId" ]
48+ kb_name = kb .get ("name" , kb_id )
49+
50+ try :
51+ # Get knowledge base details
52+ kb_details = bedrock_agent_client .get_knowledge_base (knowledgeBaseId = kb_id )
53+ vector_store = kb_details .get ("vectorStoreConfiguration" )
54+
55+ if vector_store and vector_store .get ("vectorStoreType" ) == "OPENSEARCH" :
56+ # Extract OpenSearch domain information
57+ opensearch_config = vector_store .get ("opensearchServerlessConfiguration" ) or vector_store .get ("opensearchConfiguration" )
58+
59+ if not opensearch_config :
60+ non_compliant_kbs .append (f"{ kb_name } (missing OpenSearch configuration)" )
61+ continue
62+
63+ # Check if it's OpenSearch Serverless or standard OpenSearch
64+ if "collectionArn" in opensearch_config :
65+ # OpenSearch Serverless - always encrypted with AWS owned key at minimum
66+ collection_id = opensearch_config ["collectionArn" ].split ("/" )[- 1 ]
67+ try :
68+ collection = opensearch_client .get_security_policy (
69+ Name = collection_id ,
70+ Type = "encryption"
71+ )
72+ # Check if using customer managed key
73+ security_policy = collection .get ("securityPolicyDetail" , {})
74+ if security_policy .get ("Type" ) == "encryption" :
75+ encryption_policy = security_policy .get ("SecurityPolicies" , [])[0 ]
76+ kms_key_arn = encryption_policy .get ("KmsARN" , "" )
77+
78+ # If not using customer managed key
79+ if not kms_key_arn or "aws/opensearchserverless" in kms_key_arn :
80+ non_compliant_kbs .append (f"{ kb_name } (OpenSearch Serverless not using CMK)" )
81+ except ClientError as e :
82+ LOGGER .error (f"Error checking OpenSearch Serverless collection: { str (e )} " )
83+ non_compliant_kbs .append (f"{ kb_name } (error checking OpenSearch Serverless)" )
84+ else :
85+ # Standard OpenSearch
86+ domain_endpoint = opensearch_config .get ("endpoint" , "" )
87+ if not domain_endpoint :
88+ non_compliant_kbs .append (f"{ kb_name } (missing OpenSearch domain endpoint)" )
89+ continue
90+
91+ # Extract domain name from endpoint
92+ domain_name = domain_endpoint .split ("." )[0 ]
93+
94+ try :
95+ domain = opensearch_client .describe_domain (DomainName = domain_name )
96+ encryption_config = domain .get ("DomainStatus" , {}).get ("EncryptionAtRestOptions" , {})
97+
98+ # Check if encryption is enabled and using CMK
99+ if not encryption_config .get ("Enabled" , False ):
100+ non_compliant_kbs .append (f"{ kb_name } (OpenSearch domain encryption not enabled)" )
101+ elif not encryption_config .get ("KmsKeyId" ):
102+ non_compliant_kbs .append (f"{ kb_name } (OpenSearch domain not using CMK)" )
103+ except ClientError as e :
104+ LOGGER .error (f"Error checking OpenSearch domain: { str (e )} " )
105+ non_compliant_kbs .append (f"{ kb_name } (error checking OpenSearch domain)" )
106+
107+ except ClientError as e :
108+ LOGGER .error (f"Error checking knowledge base { kb_id } : { str (e )} " )
109+ if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
110+ non_compliant_kbs .append (f"{ kb_name } (access denied)" )
111+ else :
112+ raise
113+
114+ if non_compliant_kbs :
115+ return "NON_COMPLIANT" , f"The following knowledge bases have OpenSearch vector stores not encrypted with CMK: { '; ' .join (non_compliant_kbs )} "
116+ return "COMPLIANT" , "All knowledge base OpenSearch vector stores are encrypted with KMS CMK"
117+
118+ except Exception as e :
119+ LOGGER .error (f"Error evaluating Bedrock Knowledge Base OpenSearch encryption: { str (e )} " )
120+ return "ERROR" , f"Error evaluating compliance: { str (e )} "
121+
122+ def lambda_handler (event : dict , context : Any ) -> None :
123+ """Lambda handler.
124+
125+ Args:
126+ event (dict): Lambda event object
127+ context (Any): Lambda context object
128+ """
129+ LOGGER .info ("Evaluating compliance for AWS Config rule" )
130+ LOGGER .info (f"Event: { json .dumps (event )} " )
131+
132+ invoking_event = json .loads (event ["invokingEvent" ])
133+ rule_parameters = json .loads (event ["ruleParameters" ]) if "ruleParameters" in event else {}
134+
135+ compliance_type , annotation = evaluate_compliance (rule_parameters )
136+
137+ evaluation = {
138+ "ComplianceResourceType" : "AWS::::Account" ,
139+ "ComplianceResourceId" : event ["accountId" ],
140+ "ComplianceType" : compliance_type ,
141+ "Annotation" : annotation ,
142+ "OrderingTimestamp" : invoking_event ["notificationCreationTime" ],
143+ }
144+
145+ LOGGER .info (f"Compliance evaluation result: { compliance_type } " )
146+ LOGGER .info (f"Annotation: { annotation } " )
147+
148+ config_client .put_evaluations (Evaluations = [evaluation ], ResultToken = event ["resultToken" ])
149+
150+ LOGGER .info ("Compliance evaluation complete." )
0 commit comments