1- """ CloudFormation Resource serialization, deserialization, and validation """
1+ """CloudFormation Resource serialization, deserialization, and validation"""
22
33import inspect
44import re
55from abc import ABC , ABCMeta , abstractmethod
66from contextlib import suppress
7+ from enum import Enum
78from typing import Any , Callable , Dict , List , Optional , Tuple , Type , TypeVar
89
910from samtranslator .compat import pydantic
@@ -163,7 +164,7 @@ def __init__(
163164 self .set_resource_attribute (attr , value )
164165
165166 @classmethod
166- def get_supported_resource_attributes (cls ): # type: ignore[no-untyped-def]
167+ def get_supported_resource_attributes (cls ) -> Tuple [ str , ...]:
167168 """
168169 A getter method for the supported resource attributes
169170 returns: a tuple that contains the name of all supported resource attributes
@@ -205,7 +206,7 @@ def from_dict(cls, logical_id: str, resource_dict: Dict[str, Any], relative_id:
205206
206207 resource = cls (logical_id , relative_id = relative_id )
207208
208- resource ._validate_resource_dict (logical_id , resource_dict ) # type: ignore[no-untyped-call]
209+ resource ._validate_resource_dict (logical_id , resource_dict )
209210
210211 # Default to empty properties dictionary. If customers skip the Properties section, an empty dictionary
211212 # accurately captures the intent.
@@ -247,7 +248,7 @@ def _validate_logical_id(logical_id: Optional[Any]) -> str:
247248 raise InvalidResourceException (str (logical_id ), "Logical ids must be alphanumeric." )
248249
249250 @classmethod
250- def _validate_resource_dict (cls , logical_id , resource_dict ): # type: ignore[no-untyped-def]
251+ def _validate_resource_dict (cls , logical_id : str , resource_dict : Dict [ str , Any ]) -> None :
251252 """Validates that the provided resource dict contains the correct Type string, and the required Properties dict.
252253
253254 :param dict resource_dict: the resource dict to validate
@@ -335,22 +336,82 @@ def __setattr__(self, name, value): # type: ignore[no-untyped-def]
335336 )
336337
337338 # Note: For compabitliy issue, we should ONLY use this with new abstraction/resources.
338- def validate_properties_and_return_model (self , cls : Type [RT ]) -> RT :
339+ def validate_properties_and_return_model (self , cls : Type [RT ], collect_all_errors : bool = False ) -> RT :
339340 """
340341 Given a resource properties, return a typed object from the definitions of SAM schema model
341342
342- param:
343- resource_properties: properties from input template
343+ Args:
344344 cls: schema models
345+ collect_all_errors: If True, collect all validation errors. If False (default), only first error.
345346 """
346347 try :
347348 return cls .parse_obj (self ._generate_resource_dict ()["Properties" ])
348349 except pydantic .error_wrappers .ValidationError as e :
350+ if collect_all_errors :
351+ # Comprehensive error collection with union type consolidation
352+ error_messages = self ._format_all_errors (e .errors ()) # type: ignore[arg-type]
353+ raise InvalidResourceException (self .logical_id , " " .join (error_messages )) from e
349354 error_properties : str = ""
350355 with suppress (KeyError ):
351356 error_properties = "." .join (str (x ) for x in e .errors ()[0 ]["loc" ])
352357 raise InvalidResourceException (self .logical_id , f"Property '{ error_properties } ' is invalid." ) from e
353358
359+ def _format_all_errors (self , errors : List [Dict [str , Any ]]) -> List [str ]:
360+ """Format all validation errors, consolidating union type errors in single pass."""
361+ type_mapping = {
362+ "not a valid dict" : "dictionary" ,
363+ "not a valid int" : "integer" ,
364+ "not a valid float" : "number" ,
365+ "not a valid list" : "list" ,
366+ "not a valid str" : "string" ,
367+ }
368+
369+ # Group errors by path in a single pass
370+ path_to_errors : Dict [str , Dict [str , Any ]] = {}
371+
372+ for error in errors :
373+ property_path = "." .join (str (x ) for x in error ["loc" ])
374+ raw_message = error .get ("msg" , "" )
375+
376+ # Extract type for union consolidation
377+ extracted_type = None
378+ for pattern , type_name in type_mapping .items ():
379+ if pattern in raw_message :
380+ extracted_type = type_name
381+ break
382+
383+ if property_path not in path_to_errors :
384+ path_to_errors [property_path ] = {"types" : [], "error" : error }
385+
386+ if extracted_type :
387+ path_to_errors [property_path ]["types" ].append (extracted_type )
388+
389+ # Format messages based on collected data
390+ result = []
391+ for path , data in path_to_errors .items ():
392+ unique_types = list (dict .fromkeys (data ["types" ])) # Remove duplicates, preserve order
393+
394+ if len (unique_types ) > 1 :
395+ # Multiple types - consolidate with union
396+ type_text = " or " .join (unique_types )
397+ result .append (f"Property '{ path } ' value must be { type_text } ." )
398+ else :
399+ # Single or no types - format normally
400+ result .append (self ._format_single_error (data ["error" ]))
401+
402+ return result
403+
404+ def _format_single_error (self , error : Dict [str , Any ]) -> str :
405+ """Format a single Pydantic error into user-friendly message."""
406+ property_path = "." .join (str (x ) for x in error ["loc" ])
407+ raw_message = error ["msg" ]
408+
409+ if error ["type" ] == "value_error.missing" :
410+ return f"Property '{ property_path } ' is required."
411+ if "extra fields not permitted" in raw_message :
412+ return f"Property '{ property_path } ' is an invalid property."
413+ return f"Property '{ property_path } ' { raw_message .lower ()} ."
414+
354415 def validate_properties (self ) -> None :
355416 """Validates that the required properties for this Resource have been populated, and that all properties have
356417 valid values.
@@ -481,6 +542,16 @@ def to_cloudformation(self, **kwargs: Any) -> List[Any]:
481542 """
482543
483544
545+ class ValidationRule (Enum ):
546+ MUTUALLY_EXCLUSIVE = "mutually_exclusive"
547+ MUTUALLY_INCLUSIVE = "mutually_inclusive"
548+ CONDITIONAL_REQUIREMENT = "conditional_requirement"
549+
550+
551+ # Simple tuple-based rules: (rule_type, [property_names])
552+ PropertyRule = Tuple [ValidationRule , List [str ]]
553+
554+
484555class SamResourceMacro (ResourceMacro , metaclass = ABCMeta ):
485556 """ResourceMacro that specifically refers to SAM (AWS::Serverless::*) resources."""
486557
@@ -536,14 +607,14 @@ def get_resource_references(self, generated_cfn_resources, supported_resource_re
536607 def _construct_tag_list (
537608 self , tags : Optional [Dict [str , Any ]], additional_tags : Optional [Dict [str , Any ]] = None
538609 ) -> List [Dict [str , Any ]]:
539- if not bool (tags ):
540- tags = {}
610+ tags_dict : Dict [str , Any ] = tags or {}
541611
542612 if additional_tags is None :
543613 additional_tags = {}
544614
615+ # At this point tags is guaranteed to be a Dict[str, Any] since we set it to {} if it was falsy
545616 for tag in self ._RESERVED_TAGS :
546- self ._check_tag (tag , tags ) # type: ignore[no-untyped-call]
617+ self ._check_tag (tag , tags_dict )
547618
548619 sam_tag = {self ._SAM_KEY : self ._SAM_VALUE }
549620
@@ -553,6 +624,40 @@ def _construct_tag_list(
553624 # customer's knowledge.
554625 return get_tag_list (sam_tag ) + get_tag_list (additional_tags ) + get_tag_list (tags )
555626
627+ @staticmethod
628+ def propagate_tags_combine (
629+ resources : List [Resource ], tags : Optional [Dict [str , Any ]], propagate_tags : Optional [bool ] = False
630+ ) -> None :
631+ """
632+ Propagates tags to the resources
633+ Similar to propagate_tags() method but this method will combine provided tags with existing resource tags.
634+
635+ Note:
636+ - This method created after propagate_tags() to combine propagate tags with resource tags create during to_cloudformation()
637+ - Create this method because updating propagate_tags() will cause regression issue;
638+ - Use this method for new resource if you want to assign combined tags, not replace.
639+
640+ :param propagate_tags: Whether we should pass the tags to generated resources.
641+ :param resources: List of generated resources
642+ :param tags: dictionary of tags to propagate to the resources.
643+
644+ :return: None
645+ """
646+ if not propagate_tags or not tags :
647+ return
648+
649+ for resource in resources :
650+ if hasattr (resource , "Tags" ):
651+ if resource .Tags :
652+ propagated_tags = get_tag_list (tags )
653+ combined_tags = [
654+ {"Key" : k , "Value" : v }
655+ for k , v in {tag ["Key" ]: tag ["Value" ] for tag in resource .Tags + propagated_tags }.items ()
656+ ]
657+ resource .Tags = combined_tags
658+ else :
659+ resource .assign_tags (tags )
660+
556661 @staticmethod
557662 def propagate_tags (
558663 resources : List [Resource ], tags : Optional [Dict [str , Any ]], propagate_tags : Optional [bool ] = False
@@ -572,7 +677,7 @@ def propagate_tags(
572677 for resource in resources :
573678 resource .assign_tags (tags )
574679
575- def _check_tag (self , reserved_tag_name , tags ): # type: ignore[no-untyped-def]
680+ def _check_tag (self , reserved_tag_name : str , tags : Dict [ str , Any ]) -> None :
576681 if reserved_tag_name in tags :
577682 raise InvalidResourceException (
578683 self .logical_id ,
@@ -582,6 +687,71 @@ def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def]
582687 "input." ,
583688 )
584689
690+ def validate_before_transform (self , schema_class : Optional [Type [RT ]], collect_all_errors : bool = False ) -> None :
691+ if not hasattr (self , "__validation_rules__" ):
692+ return
693+
694+ rules = self .__validation_rules__
695+ validated_model = (
696+ self .validate_properties_and_return_model (schema_class , collect_all_errors ) if schema_class else None
697+ )
698+
699+ error_messages = []
700+
701+ for rule_type , properties in rules :
702+ present = [prop for prop in properties if self ._get_property_value (prop , validated_model ) is not None ]
703+ if rule_type == ValidationRule .MUTUALLY_EXCLUSIVE :
704+ # Check if more than one property exists
705+ if len (present ) > 1 :
706+ prop_names = [f"'{ p } '" for p in present ]
707+ error_messages .append (f"Cannot specify { self ._combine_string (prop_names )} together." )
708+
709+ elif rule_type == ValidationRule .MUTUALLY_INCLUSIVE :
710+ # If any property in the group is present, then all properties in the group must be present
711+ # Check if some but not all properties from the group are present
712+ missing_some_properties = 0 < len (present ) < len (properties )
713+ if missing_some_properties :
714+ error_messages .append (f"Properties must be used together: { self ._combine_string (properties )} ." )
715+
716+ elif (
717+ rule_type == ValidationRule .CONDITIONAL_REQUIREMENT
718+ and self ._get_property_value (properties [0 ], validated_model ) is not None
719+ and self ._get_property_value (properties [1 ], validated_model ) is None
720+ ):
721+ # First property requires second property
722+ error_messages .append (f"'{ properties [0 ]} ' requires '{ properties [1 ]} '." )
723+
724+ # If there are any validation errors, raise a single exception with all error messages
725+ if error_messages :
726+ raise InvalidResourceException (self .logical_id , "\n " .join (error_messages ))
727+
728+ def _combine_string (self , words : List [str ]) -> str :
729+ return ", " .join (words [:- 1 ]) + (" and " + words [- 1 ] if len (words ) > 1 else words [0 ] if words else "" )
730+
731+ def _get_property_value (self , prop : str , validated_model : Any = None ) -> Any :
732+ """Original property value getter. Supports nested properties with dot notation."""
733+ if "." not in prop :
734+ # Simple property - use existing logic for direct attributes
735+ return getattr (self , prop , None )
736+
737+ # Nested property - use validated model
738+ if validated_model is None :
739+ return None
740+
741+ try :
742+ # Navigate through nested properties using dot notation
743+ value = validated_model
744+ for part in prop .split ("." ):
745+ if hasattr (value , part ):
746+ value = getattr (value , part )
747+ if value is None :
748+ return None
749+ else :
750+ return None
751+ return value
752+ except Exception :
753+ return None
754+
585755
586756class ResourceTypeResolver :
587757 """ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to
@@ -643,7 +813,7 @@ def get_all_resources(self) -> Dict[str, Any]:
643813 """Return a dictionary of all resources from the SAM template."""
644814 return self .resources
645815
646- def get_resource_by_logical_id (self , _input : str ) -> Dict [str , Any ]:
816+ def get_resource_by_logical_id (self , _input : str ) -> Optional [ Dict [str , Any ] ]:
647817 """
648818 Recursively find resource with matching Logical ID that are present in the template and returns the value.
649819 If it is not in template, this method simply returns the input unchanged.
@@ -661,16 +831,16 @@ def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]:
661831__all__ : List [str ] = [
662832 "IS_DICT" ,
663833 "IS_STR" ,
664- "Validator" ,
665- "any_type" ,
666- "is_type" ,
667- "PropertyType" ,
668- "Property" ,
669- "PassThroughProperty" ,
670834 "MutatedPassThroughProperty" ,
835+ "PassThroughProperty" ,
836+ "Property" ,
837+ "PropertyType" ,
671838 "Resource" ,
672839 "ResourceMacro" ,
673- "SamResourceMacro" ,
674- "ResourceTypeResolver" ,
675840 "ResourceResolver" ,
841+ "ResourceTypeResolver" ,
842+ "SamResourceMacro" ,
843+ "Validator" ,
844+ "any_type" ,
845+ "is_type" ,
676846]
0 commit comments