1010import json
1111import logging
1212import os
13- from typing import Any
13+ from typing import Any , Optional , Tuple
1414
1515import boto3
16- from botocore .exceptions import ClientError
1716
1817# Setup Default Logger
1918LOGGER = logging .getLogger (__name__ )
2726# Initialize AWS clients
2827bedrock_agent_client = boto3 .client ("bedrock-agent" , region_name = AWS_REGION )
2928config_client = boto3 .client ("config" , region_name = AWS_REGION )
29+ logs_client = boto3 .client ("logs" , region_name = AWS_REGION )
30+ sts_client = boto3 .client ("sts" , region_name = AWS_REGION )
31+
32+
33+ def check_kb_logging (kb_id : str ) -> Tuple [bool , Optional [str ]]:
34+ """Check if knowledge base has CloudWatch logging enabled.
35+
36+ Args:
37+ kb_id (str): Knowledge base ID
38+
39+ Returns:
40+ Tuple[bool, Optional[str]]: (True if logging is enabled, destination type if found)
41+ """
42+ try :
43+ account_id = sts_client .get_caller_identity ()["Account" ]
44+ kb_arn = f"arn:aws:bedrock:{ AWS_REGION } :{ account_id } :knowledge-base/{ kb_id } "
45+ LOGGER .info (f"Checking logging for KB ARN: { kb_arn } " )
46+
47+ # Get delivery sources
48+ delivery_sources = logs_client .describe_delivery_sources ()
49+ LOGGER .info (f"Found { len (delivery_sources .get ('deliverySources' , []))} delivery sources" )
50+
51+ for source in delivery_sources .get ("deliverySources" , []):
52+ LOGGER .info (f"Checking source: { source .get ('name' )} " )
53+ if kb_arn in source .get ("resourceArns" , []):
54+ source_name = source .get ("name" )
55+ LOGGER .info (f"Found matching source name: { source_name } " )
56+ if not source_name :
57+ continue
58+
59+ # Get deliveries to find the delivery ID
60+ LOGGER .info ("Calling describe_deliveries API" )
61+ deliveries = logs_client .describe_deliveries ()
62+ LOGGER .info (f"Found { len (deliveries .get ('deliveries' , []))} deliveries" )
63+
64+ for delivery in deliveries .get ("deliveries" , []):
65+ LOGGER .info (f"Checking delivery: { delivery .get ('id' )} with source name: { delivery .get ('deliverySourceName' )} " )
66+ if delivery .get ("deliverySourceName" ) == source_name :
67+ delivery_id = delivery .get ("id" )
68+ LOGGER .info (f"Found matching delivery ID: { delivery_id } " )
69+ if not delivery_id :
70+ continue
71+
72+ # Get delivery details to get the destination ARN
73+ LOGGER .info (f"Calling get_delivery API with ID: { delivery_id } " )
74+ delivery_details = logs_client .get_delivery (id = delivery_id )
75+ LOGGER .info (f"Delivery details: { delivery_details } " )
76+
77+ delivery_destination_arn = delivery_details .get ("delivery" , {}).get ("deliveryDestinationArn" )
78+ LOGGER .info (f"Found delivery destination ARN: { delivery_destination_arn } " )
79+ if not delivery_destination_arn :
80+ continue
81+
82+ # Get delivery destinations to match the ARN
83+ LOGGER .info ("Calling describe_delivery_destinations API" )
84+ delivery_destinations = logs_client .describe_delivery_destinations ()
85+ LOGGER .info (f"Found { len (delivery_destinations .get ('deliveryDestinations' , []))} delivery destinations" )
86+
87+ for destination in delivery_destinations .get ("deliveryDestinations" , []):
88+ LOGGER .info (f"Checking destination: { destination .get ('name' )} with ARN: { destination .get ('arn' )} " )
89+ if destination .get ("arn" ) == delivery_destination_arn :
90+ destination_type = destination .get ("deliveryDestinationType" )
91+ LOGGER .info (f"Found matching destination with type: { destination_type } " )
92+ return True , destination_type
93+
94+ LOGGER .info ("No matching logging configuration found" )
95+ return False , None
96+
97+ except Exception as e :
98+ LOGGER .error (f"Error checking logging for knowledge base { kb_id } : { str (e )} " )
99+ return False , None
30100
31101
32102def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]: # noqa: CFQ004, U100
@@ -35,9 +105,6 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ0
35105 Args:
36106 rule_parameters (dict): Rule parameters from AWS Config rule.
37107
38- Raises:
39- ClientError: If there is an error checking the knowledge base
40-
41108 Returns:
42109 tuple[str, str]: Compliance type and annotation message.
43110 """
@@ -52,28 +119,22 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: CFQ0
52119 return "COMPLIANT" , "No knowledge bases found in the account"
53120
54121 non_compliant_kbs = []
122+ compliant_kbs = []
55123
56124 # Check each knowledge base for logging configuration
57125 for kb in kb_list :
58126 kb_id = kb ["knowledgeBaseId" ]
59- try :
60- kb_details = bedrock_agent_client .get_knowledge_base (knowledgeBaseId = kb_id )
61-
62- # Check if logging is enabled
63- logging_config = kb_details .get ("loggingConfiguration" , {})
64- if not isinstance (logging_config , dict ) or not logging_config .get ("enabled" , False ):
65- non_compliant_kbs .append (f"{ kb_id } ({ kb .get ('name' , 'unnamed' )} )" )
127+ kb_name = kb .get ("name" , "unnamed" )
66128
67- except ClientError as e :
68- LOGGER .error (f"Error checking knowledge base { kb_id } : { str (e )} " )
69- if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
70- non_compliant_kbs .append (f"{ kb_id } (access denied)" )
71- else :
72- raise
129+ has_logging , destination_type = check_kb_logging (kb_id )
130+ if not has_logging :
131+ non_compliant_kbs .append (f"{ kb_id } ({ kb_name } ) - logging not configured" )
132+ else :
133+ compliant_kbs .append (f"{ kb_id } ({ kb_name } ) - logging configured to { destination_type } " )
73134
74135 if non_compliant_kbs :
75136 return "NON_COMPLIANT" , f"The following knowledge bases do not have logging enabled: { ', ' .join (non_compliant_kbs )} "
76- return "COMPLIANT" , "All knowledge bases have logging enabled"
137+ return "COMPLIANT" , f "All knowledge bases have logging enabled: { ', ' . join ( compliant_kbs ) } "
77138
78139 except Exception as e :
79140 LOGGER .error (f"Error evaluating Bedrock Knowledge Base logging configuration: { str (e )} " )
0 commit comments