3131config_client = boto3 .client ("config" , region_name = AWS_REGION )
3232
3333
34- def check_opensearch_serverless (collection_id : str , kb_name : str ) -> str | None : # type: ignore
34+ def check_opensearch_serverless (collection_id : str , kb_name : str ) -> str | None : # type: ignore # noqa: CFQ004
3535 """Check OpenSearch Serverless collection encryption.
3636
3737 Args:
@@ -42,19 +42,41 @@ def check_opensearch_serverless(collection_id: str, kb_name: str) -> str | None:
4242 str | None: Error message if non-compliant, None if compliant
4343 """
4444 try :
45- collection = opensearch_serverless_client .get_security_policy (name = collection_id , type = "encryption" )
46- security_policy = collection .get ("securityPolicyDetail" , {})
47- if security_policy .get ("Type" ) == "encryption" :
48- security_policies = security_policy .get ("SecurityPolicies" , [])
49- if isinstance (security_policies , list ) and security_policies :
50- encryption_policy = security_policies [0 ]
51- kms_key_arn = encryption_policy .get ("KmsARN" , "" )
52- if not kms_key_arn or "aws/opensearchserverless" in kms_key_arn :
53- return f"{ kb_name } (OpenSearch Serverless not using CMK)"
45+ # Get collection details to get the collection name
46+ collection_response = opensearch_serverless_client .batch_get_collection (ids = [collection_id ])
47+ LOGGER .info (f"Collection details: { json .dumps (collection_response , default = str )} " )
48+
49+ if not collection_response .get ("collectionDetails" ):
50+ LOGGER .error (f"No collection details found for ID { collection_id } " )
51+ return f"{ kb_name } (OpenSearch Serverless collection not found)"
52+
53+ collection_name = collection_response ["collectionDetails" ][0 ].get ("name" )
54+ if not collection_name :
55+ LOGGER .error (f"No collection name found for ID { collection_id } " )
56+ return f"{ kb_name } (OpenSearch Serverless collection name not found)"
57+
58+ # Get the specific policy details using the collection name
59+ policy_details = opensearch_serverless_client .get_security_policy (name = collection_name , type = "encryption" )
60+ LOGGER .info (f"Policy details for { collection_name } : { json .dumps (policy_details , default = str )} " )
61+
62+ policy_details_dict = json .loads (json .dumps (policy_details , default = str ))
63+ policy_details_dict = policy_details_dict .get ("securityPolicyDetail" , {}).get ("policy" , {})
64+ LOGGER .info (f"Policy details dict (after getting policy): { json .dumps (policy_details_dict , default = str )} " )
65+
66+ if policy_details_dict .get ("AWSOwnedKey" , False ):
67+ LOGGER .info (f"{ kb_name } (OpenSearch Serverless using AWS-owned key instead of CMK)" )
68+ return f"{ kb_name } (OpenSearch Serverless using AWS-owned key instead of CMK)"
69+
70+ kms_key_arn = policy_details_dict .get ("KmsARN" , "" )
71+ if not kms_key_arn :
72+ LOGGER .info (f"{ kb_name } (OpenSearch Serverless not using CMK)" )
73+ return f"{ kb_name } (OpenSearch Serverless not using CMK)"
74+
75+ return None
76+
5477 except ClientError as e :
5578 LOGGER .error (f"Error checking OpenSearch Serverless collection: { str (e )} " )
5679 return f"{ kb_name } (error checking OpenSearch Serverless)"
57- return None
5880
5981
6082def check_opensearch_domain (domain_name : str , kb_name : str ) -> str | None : # type: ignore # noqa: CFQ004
0 commit comments