1+ """Lambda module to use SRA STS service resources in the organization.
2+
3+ Version: 0.1
4+
5+ STS module for SRA in the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples
6+
7+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
8+ SPDX-License-Identifier: MIT-0
9+ """
110import logging
211import os
312from typing import Any
716from botocore .config import Config
817import botocore .exceptions
918
10- class sra_sts :
19+
20+ class SRASTS :
21+ """Class to manage STS resources."""
22+
1123 PROFILE = "default"
1224
1325 UNEXPECTED = "Unexpected!"
14- # TODO(liamschn): this needs to be made into an SSM parameter
1526 CONFIGURATION_ROLE : str = ""
1627 BOTO3_CONFIG = Config (retries = {"max_attempts" : 10 , "mode" : "standard" })
1728 PARTITION : str = ""
@@ -22,16 +33,24 @@ class sra_sts:
2233 log_level : str = os .environ .get ("LOG_LEVEL" , "INFO" )
2334 LOGGER .setLevel (log_level )
2435
25- def __init__ (self , profile : str = "default" ) -> None :
36+ def __init__ (self , profile : str = "default" ) -> None :
37+ """Initialize class object.
38+
39+ Args:
40+ profile (str): AWS credentials profile name. Defaults to "default".
41+
42+ Raises:
43+ ValueError: Error message
44+ """
2645 self .PROFILE = profile
27- print (f"STS PROFILE INFO : { self .PROFILE } " )
46+ self . LOGGER . info (f"Initial PROFILE: { self .PROFILE } " )
2847
2948 try :
3049 if self .PROFILE != "default" :
3150 self .MANAGEMENT_ACCOUNT_SESSION = boto3 .Session (profile_name = self .PROFILE )
32- print (f"STS INFO: { self .MANAGEMENT_ACCOUNT_SESSION .client ('sts' ).get_caller_identity ()} " )
51+ self . LOGGER . info (f"STS INFO: { self .MANAGEMENT_ACCOUNT_SESSION .client ('sts' ).get_caller_identity ()} " )
3352 else :
34- print (f"STS PROFILE AGAIN : { self .PROFILE } " )
53+ self . LOGGER . info (f"Subsequent PROFILE: { self .PROFILE } " )
3554 self .MANAGEMENT_ACCOUNT_SESSION = boto3 .Session ()
3655
3756 self .STS_CLIENT = self .MANAGEMENT_ACCOUNT_SESSION .client ("sts" )
@@ -48,7 +67,7 @@ def __init__(self, profile: str="default") -> None:
4867
4968 else :
5069 self .LOGGER .info (f"Error: { error } " )
51- raise error
70+ raise ValueError ( f"Error: { error } " ) from None
5271
5372 try :
5473 self .MANAGEMENT_ACCOUNT = self .STS_CLIENT .get_caller_identity ().get ("Account" )
@@ -59,18 +78,19 @@ def __init__(self, profile: str="default") -> None:
5978 self .LOGGER .info ("Token has expired, please re-run with proper credentials set." )
6079 else :
6180 self .LOGGER .info (f"Error: { error } " )
62- raise error
81+ raise ValueError ( f"Error: { error } " ) from None
6382
6483 def assume_role (self , account : str , role_name : str , service : str , region_name : str ) -> Any :
6584 """Get boto3 client assumed into an account for a specified service.
6685
6786 Args:
6887 account: aws account id
88+ role_name: aws role name
6989 service: aws service
7090 region_name: aws region
7191
7292 Returns:
73- client : boto3 client
93+ Any : boto3 client
7494 """
7595 self .LOGGER .info (f"ASSUME ROLE CALLER ID INFO: { self .MANAGEMENT_ACCOUNT_SESSION .client ('sts' ).get_caller_identity ()} " )
7696 self .LOGGER .info (f"ASSUME ROLE ACCOUNT (CLIENT): { account } ; ROLE NAME: { role_name } ; SERVICE: { service } ; REGION: { region_name } " )
@@ -81,32 +101,29 @@ def assume_role(self, account: str, role_name: str, service: str, region_name: s
81101 RoleSessionName = "SRA-AssumeCrossAccountRole" ,
82102 DurationSeconds = 900 ,
83103 )
84- assumed_client = self .MANAGEMENT_ACCOUNT_SESSION .client (
85- service , # type: ignore
104+ return self .MANAGEMENT_ACCOUNT_SESSION .client (
105+ service , # type: ignore
86106 region_name = region_name ,
87107 aws_access_key_id = sts_response ["Credentials" ]["AccessKeyId" ],
88108 aws_secret_access_key = sts_response ["Credentials" ]["SecretAccessKey" ],
89109 aws_session_token = sts_response ["Credentials" ]["SessionToken" ],
90110 )
91- return assumed_client
92- else :
93- assumed_client = self .MANAGEMENT_ACCOUNT_SESSION .client (
94- service , # type: ignore
95- region_name = region_name ,
96- config = self .BOTO3_CONFIG )
97- return assumed_client
98-
111+ return self .MANAGEMENT_ACCOUNT_SESSION .client (
112+ service , # type: ignore
113+ region_name = region_name ,
114+ config = self .BOTO3_CONFIG )
99115
100116 def assume_role_resource (self , account : str , role_name : str , service : str , region_name : str ) -> Any :
101117 """Get boto3 resource assumed into an account for a specified service.
102118
103119 Args:
104120 account: aws account id
121+ role_name: aws role name
105122 service: aws service
106123 region_name: aws region
107124
108125 Returns:
109- client : boto3 client
126+ Any : boto3 client
110127 """
111128 self .LOGGER .info (f"ASSUME ROLE CALLER ID INFO: { self .MANAGEMENT_ACCOUNT_SESSION .client ('sts' ).get_caller_identity ()} " )
112129 self .LOGGER .info (f"ASSUME ROLE ACCOUNT (RESOURCE): { account } ; ROLE NAME: { role_name } ; SERVICE: { service } ; REGION: { region_name } " )
@@ -116,16 +133,23 @@ def assume_role_resource(self, account: str, role_name: str, service: str, regio
116133 RoleSessionName = "SRA-AssumeCrossAccountRole" ,
117134 DurationSeconds = 900 ,
118135 )
119- assumed_resource = self .MANAGEMENT_ACCOUNT_SESSION .resource (
120- service , # type: ignore
136+ return self .MANAGEMENT_ACCOUNT_SESSION .resource (
137+ service , # type: ignore
121138 region_name = region_name ,
122139 aws_access_key_id = sts_response ["Credentials" ]["AccessKeyId" ],
123140 aws_secret_access_key = sts_response ["Credentials" ]["SecretAccessKey" ],
124141 aws_session_token = sts_response ["Credentials" ]["SessionToken" ],
125142 )
126- return assumed_resource
127143
128144 def get_lambda_execution_role (self ) -> str :
145+ """Get the current lambda execution role arn.
146+
147+ Raises:
148+ ValueError: Unexpected error getting caller identity
149+
150+ Returns:
151+ str: lambda execution role arn
152+ """
129153 try :
130154 response = self .STS_CLIENT .get_caller_identity ()
131155 return response ["Arn" ]
0 commit comments