diff --git a/detection_rules/index_mappings.py b/detection_rules/index_mappings.py
index bfcbd13316f..f3017145ad4 100644
--- a/detection_rules/index_mappings.py
+++ b/detection_rules/index_mappings.py
@@ -160,25 +160,33 @@ def get_simulated_index_template_mappings(elastic_client: Elasticsearch, name: s


 def prune_mappings_of_unsupported_types(
-    integration: str, stream: str, stream_mappings: dict[str, Any], log: Callable[[str], None]
+    debug_str_data_source: str, stream_mappings: dict[str, Any], log: Callable[[str], None]
 ) -> dict[str, Any]:
     """Prune fields with unsupported types (ES|QL) from the provided mappings."""
     nested_multifields = find_nested_multifields(stream_mappings)
     for field in nested_multifields:
-        field_name = str(field).split(".fields.")[0].replace(".", ".properties.") + ".fields"
+        parts = str(field).split(".fields.")[0].split(".")
+        base_name = ".properties.".join(parts)
+        field_name = f"{base_name}.fields"
         log(
-            f"Warning: Nested multi-field `{field}` found in `{integration}-{stream}`. "
+            f"Warning: Nested multi-field `{field}` found in `{debug_str_data_source}`. "
             f"Removing parent field from schema for ES|QL validation."
         )
         delete_nested_key_from_dict(stream_mappings, field_name)

     nested_flattened_fields = find_flattened_fields_with_subfields(stream_mappings)
     for field in nested_flattened_fields:
-        field_name = str(field).split(".fields.")[0].replace(".", ".properties.") + ".fields"
+        # Remove both .fields and .properties entries for flattened fields;
+        # .properties entries can occur when the mapping is merged with non-ECS or custom schemas
+        parts = str(field).split(".fields.")[0].split(".")
+        base_name = ".properties.".join(parts)
+        field_name = f"{base_name}.fields"
+        property_name = f"{base_name}.properties"
         log(
-            f"Warning: flattened field `{field}` found in `{integration}-{stream}` with sub fields. "
+            f"Warning: flattened field `{field}` found in `{debug_str_data_source}` with sub fields. "
             f"Removing parent field from schema for ES|QL validation."
         )
         delete_nested_key_from_dict(stream_mappings, field_name)
+        delete_nested_key_from_dict(stream_mappings, property_name)

     return stream_mappings
@@ -222,7 +230,7 @@ def prepare_integration_mappings(  # noqa: PLR0913
         for stream in package_schema:
             flat_schema = package_schema[stream]
             stream_mappings = flat_schema_to_index_mapping(flat_schema)
-            stream_mappings = prune_mappings_of_unsupported_types(integration, stream, stream_mappings, log)
+            stream_mappings = prune_mappings_of_unsupported_types(f"{integration}-{stream}", stream_mappings, log)
             utils.combine_dicts(integration_mappings, deepcopy(stream_mappings))
             index_lookup[f"{integration}-{stream}"] = stream_mappings

@@ -246,12 +254,13 @@ def get_index_to_package_lookup(indices: list[str], index_lookup: dict[str, Any]
     return index_lookup_indices


-def get_filtered_index_schema(
+def get_filtered_index_schema(  # noqa: PLR0913
     indices: list[str],
     index_lookup: dict[str, Any],
     ecs_schema: dict[str, Any],
     non_ecs_mapping: dict[str, Any],
     custom_mapping: dict[str, Any],
+    log: Callable[[str], None],
 ) -> tuple[dict[str, Any], dict[str, Any]]:
     """Check if the provided indices are known based on the integration format.
     Returns the combined schema."""
@@ -304,7 +313,7 @@ def get_filtered_index_schema(
             # Need to use a merge here to not overwrite existing fields
             utils.combine_dicts(base, deepcopy(non_ecs_mapping.get(match, {})))
             utils.combine_dicts(base, deepcopy(custom_mapping.get(match, {})))
-            filtered_index_lookup[match] = base
+            filtered_index_lookup[match] = prune_mappings_of_unsupported_types(match, base, log)
             utils.combine_dicts(combined_mappings, deepcopy(base))

     # Reduce the index lookup to only the matched indices (remote/Kibana schema validation source of truth)
@@ -403,7 +412,7 @@ def find_nested_multifields(mapping: dict[str, Any], path: str = "") -> list[Any


 def find_flattened_fields_with_subfields(mapping: dict[str, Any], path: str = "") -> list[str]:
-    """Recursively search for fields of type 'flattened' that have a 'fields' key in Elasticsearch mappings."""
+    """Recursively search for fields of type 'flattened' that have a 'fields' or 'properties' key in Elasticsearch mappings."""
     flattened_fields_with_subfields: list[str] = []

     for field, properties in mapping.items():
@@ -413,6 +422,9 @@ def find_flattened_fields_with_subfields(mapping: dict[str, Any], path: str = ""
         # Check if the field is of type 'flattened' and has a 'fields' key
         if properties.get("type") == "flattened" and "fields" in properties:  # type: ignore[reportUnknownVariableType]
             flattened_fields_with_subfields.append(current_path)  # type: ignore[reportUnknownVariableType]
+        # Check if the field is of type 'flattened' and has a 'properties' key
+        if properties.get("type") == "flattened" and "properties" in properties:  # type: ignore[reportUnknownVariableType]
+            flattened_fields_with_subfields.append(current_path)  # type: ignore[reportUnknownVariableType]

         # Recurse into subfields
         if "properties" in properties:
@@ -487,8 +499,7 @@ def prepare_mappings(  # noqa: PLR0913
     # and also at a per index level as custom schemas can override non-ecs fields and/or indices
     non_ecs_schema = ecs.flatten(non_ecs_schema)
     non_ecs_schema = utils.convert_to_nested_schema(non_ecs_schema)
-    non_ecs_schema = prune_mappings_of_unsupported_types("non-ecs", "non-ecs", non_ecs_schema, log)
-    non_ecs_mapping = prune_mappings_of_unsupported_types("non-ecs", "non-ecs", non_ecs_mapping, log)
+    non_ecs_schema = prune_mappings_of_unsupported_types("non-ecs", non_ecs_schema, log)

     # Load custom schema and convert to index mapping format (nested schema)
     custom_mapping: dict[str, Any] = {}
@@ -498,7 +509,6 @@ def prepare_mappings(  # noqa: PLR0913
         index_mapping = ecs.flatten(index_mapping)
         index_mapping = utils.convert_to_nested_schema(index_mapping)
         custom_mapping.update({index: index_mapping})
-    custom_mapping = prune_mappings_of_unsupported_types("custom", "custom", custom_mapping, log)

     # Load ECS in an index mapping format (nested schema)
     current_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
@@ -506,7 +516,7 @@ def prepare_mappings(  # noqa: PLR0913

     # Filter combined mappings based on the provided indices
     combined_mappings, index_lookup = get_filtered_index_schema(
-        indices, index_lookup, ecs_schema, non_ecs_mapping, custom_mapping
+        indices, index_lookup, ecs_schema, non_ecs_mapping, custom_mapping, log
     )

     index_lookup.update({"rule-ecs-index": ecs_schema})
diff --git a/pyproject.toml b/pyproject.toml
index 53cecd544e6..d8be95f048c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "detection_rules"
-version = "1.5.8"
+version = "1.5.9"
 description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
 readme = "README.md"
 requires-python = ">=3.12"
diff --git a/tests/test_rules_remote.py b/tests/test_rules_remote.py
index 91743a8e6a5..e74d9631acd 100644
--- a/tests/test_rules_remote.py
+++ b/tests/test_rules_remote.py
@@ -218,3 +218,20 @@ def test_esql_filtered_keep(self):
         """
         with pytest.raises(EsqlSchemaError):
             _ = RuleCollection().load_dict(production_rule)
+
+    def test_esql_non_ecs_schema_conflict_resolution(self):
+        """Test that an ESQL rule with a known conflict between the non-ECS and integration schemas is handled correctly."""
+        file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
+        original_production_rule = load_rule_contents(file_path)
+        production_rule = deepcopy(original_production_rule)[0]
+        production_rule["metadata"]["integration"] = ["azure", "o365"]
+        production_rule["rule"]["query"] = """
+        from logs-azure.signinlogs-* metadata _id, _version, _index
+        | where @timestamp > now() - 30 minutes
+        and event.dataset in ("azure.signinlogs")
+        and event.outcome == "success"
+        and azure.signinlogs.properties.user_id is not null
+        | keep
+        event.outcome
+        """
+        _ = RuleCollection().load_dict(production_rule)
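
For reviewers, a minimal, self-contained sketch of the pruning behavior this diff changes. `delete_nested_key_from_dict` below is a simplified stand-in for the repo helper, and the mapping shape is an illustrative assumption modeled on the `azure.signinlogs.properties` case exercised by the new test; the real code path runs through `prune_mappings_of_unsupported_types`.

```python
from typing import Any


def delete_nested_key_from_dict(data: dict[str, Any], dotted_key: str) -> None:
    """Simplified stand-in for the repo helper: drop the leaf addressed by a dotted path."""
    node: Any = data
    parts = dotted_key.split(".")
    for part in parts[:-1]:
        node = node.get(part)
        if not isinstance(node, dict):
            return  # path does not exist; nothing to prune
    node.pop(parts[-1], None)


# Illustrative nested index mapping: merging the azure integration schema with a
# non-ECS/custom schema left the flattened field `azure.signinlogs.properties`
# with both a `fields` multi-field entry and a `properties` subtree.
mappings: dict[str, Any] = {
    "azure": {
        "properties": {
            "signinlogs": {
                "properties": {
                    "properties": {
                        "type": "flattened",
                        "fields": {"raw": {"type": "keyword"}},
                        "properties": {"user_id": {"type": "keyword"}},
                    },
                },
            },
        },
    },
}

# Same path construction as the new prune_mappings_of_unsupported_types():
# interleave `.properties.` between the dotted field parts to address the mapping tree.
field = "azure.signinlogs.properties"  # as reported by find_flattened_fields_with_subfields()
parts = field.split(".fields.")[0].split(".")
base_name = ".properties.".join(parts)  # "azure.properties.signinlogs.properties.properties"
delete_nested_key_from_dict(mappings, f"{base_name}.fields")
delete_nested_key_from_dict(mappings, f"{base_name}.properties")

# Only the conflicting subtrees are removed; the flattened field itself survives,
# so ES|QL validation no longer trips over `azure.signinlogs.properties.*` sub-fields.
assert mappings["azure"]["properties"]["signinlogs"]["properties"]["properties"] == {"type": "flattened"}
```

Note that a field like the one above carries both a `fields` and a `properties` subtree, so the two new checks in `find_flattened_fields_with_subfields` may report the same path twice; both deletions are idempotent, so the duplicate report is harmless.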