Skip to content

Commit 71ecfd6

Browse files
committed
fixes for refactoring due to mypy and flake8; update API calls to appropriately get info
1 parent 578d1da commit 71ecfd6

File tree

1 file changed

+63
-15
lines changed
  • aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_kb_vector_store_secret

1 file changed

+63
-15
lines changed

aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_kb_vector_store_secret/app.py

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232

3333
def check_knowledge_base(kb_id: str, kb_name: str) -> tuple[bool, str]: # noqa: CFQ004
34-
"""Check if a knowledge base's vector store is using KMS encrypted secrets.
34+
"""Check if a knowledge base's vector store is using AWS Secrets Manager for credentials.
3535
3636
Args:
3737
kb_id (str): Knowledge base ID
@@ -45,27 +45,63 @@ def check_knowledge_base(kb_id: str, kb_name: str) -> tuple[bool, str]: # noqa:
4545
"""
4646
try:
4747
kb_details = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
48-
vector_store = kb_details.get("vectorStoreConfiguration")
49-
50-
if not vector_store or not isinstance(vector_store, dict):
51-
return False, f"{kb_name} (no vector store configuration)"
52-
53-
secret_arn = vector_store.get("secretArn")
48+
LOGGER.info(f"KB Details: {json.dumps(kb_details, default=str)}")
49+
50+
# Get the knowledge base object from the response
51+
kb = kb_details.get("knowledgeBase", {})
52+
storage_config = kb.get("storageConfiguration")
53+
LOGGER.info(f"Storage config from kb: {json.dumps(storage_config, default=str)}")
54+
55+
if not storage_config or not isinstance(storage_config, dict):
56+
return False, f"{kb_name} (Vector store configuration missing)"
57+
58+
storage_type = storage_config.get("type")
59+
LOGGER.info(f"Storage type: {storage_type}")
60+
if not storage_type:
61+
return False, f"{kb_name} (Vector store type not specified)"
62+
63+
# Check if storage type is one of the supported types
64+
supported_types = {
65+
"PINECONE": "pineconeConfiguration",
66+
"MONGO_DB_ATLAS": "mongoDbAtlasConfiguration",
67+
"REDIS_ENTERPRISE_CLOUD": "redisEnterpriseCloudConfiguration",
68+
"RDS": "rdsConfiguration"
69+
}
70+
71+
# If storage type is not supported, it's compliant (no credentials needed)
72+
if storage_type not in supported_types:
73+
LOGGER.info(f"Storage type {storage_type} not supported - no credentials needed")
74+
return True, f"{kb_name} (Using unsupported vector store type '{storage_type}' - no credentials required)"
75+
76+
# Get the configuration block for the storage type
77+
config_key = supported_types[storage_type]
78+
LOGGER.info(f"Config key: {config_key}")
79+
type_config = storage_config.get(config_key)
80+
LOGGER.info(f"Type config: {type_config}")
81+
82+
if not type_config or not isinstance(type_config, dict):
83+
return False, f"{kb_name} (Missing configuration for {storage_type} vector store)"
84+
85+
# Check for credentials secret ARN
86+
secret_arn = type_config.get("credentialsSecretArn")
87+
LOGGER.info(f"Secret ARN: {secret_arn}")
5488
if not secret_arn:
55-
return False, f"{kb_name} (no secret configured)"
89+
return False, f"{kb_name} (Missing credentials secret for {storage_type} vector store)"
5690

5791
try:
92+
# Verify the secret exists and is using KMS encryption
5893
secret_details = secretsmanager_client.describe_secret(SecretId=secret_arn)
94+
LOGGER.info(f"Secret details: {secret_details}")
5995
if not secret_details.get("KmsKeyId"):
60-
return False, f"{kb_name} (secret not using CMK)"
61-
return True, ""
96+
return False, f"{kb_name} (Credentials secret for {storage_type} vector store not using CMK encryption)"
97+
return True, f"{kb_name} (Using {storage_type} vector store with CMK-encrypted credentials)"
6298
except ClientError as e:
6399
if e.response["Error"]["Code"] == "AccessDeniedException":
64-
return False, f"{kb_name} (secret access denied)"
100+
return False, f"{kb_name} (Access denied to credentials secret for {storage_type} vector store)"
65101
raise
66102
except ClientError as e:
67103
if e.response["Error"]["Code"] == "AccessDeniedException":
68-
return False, f"{kb_name} (access denied)"
104+
return False, f"{kb_name} (Access denied to knowledge base)"
69105
raise
70106

71107

@@ -80,19 +116,31 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: U100
80116
"""
81117
try:
82118
non_compliant_kbs = []
119+
compliant_kbs = []
83120
paginator = bedrock_agent_client.get_paginator("list_knowledge_bases")
84121

85122
for page in paginator.paginate():
86123
for kb in page["knowledgeBaseSummaries"]:
87124
kb_id = kb["knowledgeBaseId"]
125+
LOGGER.info(f"KB ID: {kb_id}")
88126
kb_name = kb.get("name", kb_id)
127+
LOGGER.info(f"KB Name: {kb_name}")
89128
is_compliant, message = check_knowledge_base(kb_id, kb_name)
90-
if not is_compliant:
129+
if is_compliant:
130+
compliant_kbs.append(message)
131+
else:
91132
non_compliant_kbs.append(message)
92133

93134
if non_compliant_kbs:
94-
return "NON_COMPLIANT", f"The following knowledge bases have vector store secret issues: {'; '.join(non_compliant_kbs)}"
95-
return "COMPLIANT", "All knowledge base vector stores are using KMS encrypted secrets"
135+
return "NON_COMPLIANT", (
136+
"Knowledge base vector store compliance check results:\n"
137+
+ f"Compliant: {'; '.join(compliant_kbs)}\n"
138+
+ f"Non-compliant: {'; '.join(non_compliant_kbs)}"
139+
)
140+
return "COMPLIANT", (
141+
"Knowledge base vector store compliance check results:\n"
142+
+ f"Compliant: {'; '.join(compliant_kbs)}"
143+
)
96144

97145
except Exception as e:
98146
LOGGER.error(f"Error evaluating Bedrock Knowledge Base vector store secrets: {str(e)}")

0 commit comments

Comments
 (0)