1+ """Config rule to check knowledge base vector store secret configuration for Bedrock environments.
2+
3+ Version: 1.0
4+
5+ Config rule for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+ SPDX-License-Identifier: MIT-0
9+ """
10+ import json
11+ import logging
12+ import os
13+ from typing import Any
14+
15+ import boto3
16+ from botocore .exceptions import ClientError
17+
18+ # Setup Default Logger
19+ LOGGER = logging .getLogger (__name__ )
20+ log_level = os .environ .get ("LOG_LEVEL" , logging .INFO )
21+ LOGGER .setLevel (log_level )
22+ LOGGER .info (f"boto3 version: { boto3 .__version__ } " )
23+
24+ # Get AWS region from environment variable
25+ AWS_REGION = os .environ .get ("AWS_REGION" )
26+
27+ # Initialize AWS clients
28+ bedrock_agent_client = boto3 .client ("bedrock-agent" , region_name = AWS_REGION )
29+ secretsmanager_client = boto3 .client ("secretsmanager" , region_name = AWS_REGION )
30+ config_client = boto3 .client ("config" , region_name = AWS_REGION )
31+
32+ def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]:
33+ """Evaluate if Bedrock Knowledge Base vector stores are using KMS encrypted secrets.
34+
35+ Args:
36+ rule_parameters (dict): Rule parameters from AWS Config rule.
37+
38+ Returns:
39+ tuple[str, str]: Compliance type and annotation message.
40+ """
41+ try :
42+ non_compliant_kbs = []
43+ paginator = bedrock_agent_client .get_paginator ("list_knowledge_bases" )
44+
45+ for page in paginator .paginate ():
46+ for kb in page ["knowledgeBaseSummaries" ]:
47+ kb_id = kb ["knowledgeBaseId" ]
48+ kb_name = kb .get ("name" , kb_id )
49+
50+ try :
51+ # Get knowledge base details
52+ kb_details = bedrock_agent_client .get_knowledge_base (knowledgeBaseId = kb_id )
53+ vector_store = kb_details .get ("vectorStoreConfiguration" )
54+
55+ if vector_store :
56+ secret_arn = vector_store .get ("secretArn" )
57+ if not secret_arn :
58+ non_compliant_kbs .append (f"{ kb_name } (no secret configured)" )
59+ continue
60+
61+ try :
62+ # Check if secret uses CMK
63+ secret_details = secretsmanager_client .describe_secret (SecretId = secret_arn )
64+ if not secret_details .get ("KmsKeyId" ):
65+ non_compliant_kbs .append (f"{ kb_name } (secret not using CMK)" )
66+ except ClientError as e :
67+ LOGGER .error (f"Error checking secret { secret_arn } : { str (e )} " )
68+ if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
69+ non_compliant_kbs .append (f"{ kb_name } (secret access denied)" )
70+ else :
71+ raise
72+
73+ except ClientError as e :
74+ LOGGER .error (f"Error checking knowledge base { kb_name } : { str (e )} " )
75+ if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
76+ non_compliant_kbs .append (f"{ kb_name } (access denied)" )
77+ else :
78+ raise
79+
80+ if non_compliant_kbs :
81+ return "NON_COMPLIANT" , f"The following knowledge bases have vector store secret issues: { '; ' .join (non_compliant_kbs )} "
82+ return "COMPLIANT" , "All knowledge base vector stores are using KMS encrypted secrets"
83+
84+ except Exception as e :
85+ LOGGER .error (f"Error evaluating Bedrock Knowledge Base vector store secrets: { str (e )} " )
86+ return "ERROR" , f"Error evaluating compliance: { str (e )} "
87+
88+ def lambda_handler (event : dict , context : Any ) -> None :
89+ """Lambda handler.
90+
91+ Args:
92+ event (dict): Lambda event object
93+ context (Any): Lambda context object
94+ """
95+ LOGGER .info ("Evaluating compliance for AWS Config rule" )
96+ LOGGER .info (f"Event: { json .dumps (event )} " )
97+
98+ invoking_event = json .loads (event ["invokingEvent" ])
99+ rule_parameters = json .loads (event ["ruleParameters" ]) if "ruleParameters" in event else {}
100+
101+ compliance_type , annotation = evaluate_compliance (rule_parameters )
102+
103+ evaluation = {
104+ "ComplianceResourceType" : "AWS::::Account" ,
105+ "ComplianceResourceId" : event ["accountId" ],
106+ "ComplianceType" : compliance_type ,
107+ "Annotation" : annotation ,
108+ "OrderingTimestamp" : invoking_event ["notificationCreationTime" ],
109+ }
110+
111+ LOGGER .info (f"Compliance evaluation result: { compliance_type } " )
112+ LOGGER .info (f"Annotation: { annotation } " )
113+
114+ config_client .put_evaluations (Evaluations = [evaluation ], ResultToken = event ["resultToken" ])
115+
116+ LOGGER .info ("Compliance evaluation complete." )
0 commit comments