36 changes: 23 additions & 13 deletions detection_rules/index_mappings.py
@@ -160,25 +160,33 @@ def get_simulated_index_template_mappings(elastic_client: Elasticsearch, name: s


 def prune_mappings_of_unsupported_types(
-    integration: str, stream: str, stream_mappings: dict[str, Any], log: Callable[[str], None]
+    debug_str_data_source: str, stream_mappings: dict[str, Any], log: Callable[[str], None]
 ) -> dict[str, Any]:
     """Prune fields with unsupported types (ES|QL) from the provided mappings."""
     nested_multifields = find_nested_multifields(stream_mappings)
     for field in nested_multifields:
-        field_name = str(field).split(".fields.")[0].replace(".", ".properties.") + ".fields"
+        parts = str(field).split(".fields.")[0].split(".")
+        base_name = ".properties.".join(parts)
+        field_name = f"{base_name}.fields"
         log(
-            f"Warning: Nested multi-field `{field}` found in `{integration}-{stream}`. "
+            f"Warning: Nested multi-field `{field}` found in `{debug_str_data_source}`. "
             f"Removing parent field from schema for ES|QL validation."
         )
         delete_nested_key_from_dict(stream_mappings, field_name)
     nested_flattened_fields = find_flattened_fields_with_subfields(stream_mappings)
     for field in nested_flattened_fields:
-        field_name = str(field).split(".fields.")[0].replace(".", ".properties.") + ".fields"
+        # Remove both .fields and .properties entries for flattened fields
+        # .properties entries can occur when being merged with non-ecs or custom schemas
+        parts = str(field).split(".fields.")[0].split(".")
+        base_name = ".properties.".join(parts)
+        field_name = f"{base_name}.fields"
+        property_name = f"{base_name}.properties"
         log(
-            f"Warning: flattened field `{field}` found in `{integration}-{stream}` with sub fields. "
+            f"Warning: flattened field `{field}` found in `{debug_str_data_source}` with sub fields. "
             f"Removing parent field from schema for ES|QL validation."
         )
         delete_nested_key_from_dict(stream_mappings, field_name)
+        delete_nested_key_from_dict(stream_mappings, property_name)
     return stream_mappings


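For clarity, here is a minimal sketch of what the refactored key construction computes (the field path below is hypothetical): a path reported by the `find_*` helpers is rewritten into the dotted key paths that `delete_nested_key_from_dict` removes from the nested mapping dict.

```python
# Hypothetical field path, in the dotted format the find_* helpers report:
field = "process.command_line.fields.text"

parts = str(field).split(".fields.")[0].split(".")  # ["process", "command_line"]
base_name = ".properties.".join(parts)              # "process.properties.command_line"
field_name = f"{base_name}.fields"                  # multi-field block to delete
property_name = f"{base_name}.properties"           # merged-in object block to delete

assert field_name == "process.properties.command_line.fields"
assert property_name == "process.properties.command_line.properties"
```

The old one-liner (`.replace(".", ".properties.")`) produced the same `field_name`; factoring out `base_name` lets the flattened-field branch also derive `property_name`, so subfields merged in from non-ecs or custom schemas can be dropped as well.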
@@ -222,7 +230,7 @@ def prepare_integration_mappings(  # noqa: PLR0913
     for stream in package_schema:
         flat_schema = package_schema[stream]
         stream_mappings = flat_schema_to_index_mapping(flat_schema)
-        stream_mappings = prune_mappings_of_unsupported_types(integration, stream, stream_mappings, log)
+        stream_mappings = prune_mappings_of_unsupported_types(f"{integration}-{stream}", stream_mappings, log)
         utils.combine_dicts(integration_mappings, deepcopy(stream_mappings))
         index_lookup[f"{integration}-{stream}"] = stream_mappings

@@ -246,12 +254,13 @@ def get_index_to_package_lookup(indices: list[str], index_lookup: dict[str, Any]
     return index_lookup_indices


-def get_filtered_index_schema(
+def get_filtered_index_schema(  # noqa: PLR0913
     indices: list[str],
     index_lookup: dict[str, Any],
     ecs_schema: dict[str, Any],
     non_ecs_mapping: dict[str, Any],
     custom_mapping: dict[str, Any],
+    log: Callable[[str], None],
 ) -> tuple[dict[str, Any], dict[str, Any]]:
     """Check if the provided indices are known based on the integration format. Returns the combined schema."""

@@ -304,7 +313,7 @@ def get_filtered_index_schema(
         # Need to use a merge here to not overwrite existing fields
         utils.combine_dicts(base, deepcopy(non_ecs_mapping.get(match, {})))
         utils.combine_dicts(base, deepcopy(custom_mapping.get(match, {})))
-        filtered_index_lookup[match] = base
+        filtered_index_lookup[match] = prune_mappings_of_unsupported_types(match, base, log)
         utils.combine_dicts(combined_mappings, deepcopy(base))

     # Reduce the index lookup to only the matched indices (remote/Kibana schema validation source of truth)
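The call-site change above moves pruning after the per-index merge, because two individually clean schemas can create a flattened-vs-object conflict only once combined. A rough sketch under assumed inputs (the `labels` field is hypothetical; `combine_dicts` is the in-place deep merge used above):

```python
from copy import deepcopy

from detection_rules import utils

# An integration types `labels` as flattened, while a custom schema
# contributes object subfields for the same path (each valid on its own).
integration_mapping = {"labels": {"type": "flattened"}}
custom_schema = {"labels": {"properties": {"env": {"type": "keyword"}}}}

base = deepcopy(integration_mapping)
utils.combine_dicts(base, deepcopy(custom_schema))
# base now holds {"labels": {"type": "flattened", "properties": {...}}}:
# a conflict that only exists post-merge, hence pruning the merged result.
```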
@@ -403,7 +412,7 @@ def find_nested_multifields(mapping: dict[str, Any], path: str = "") -> list[Any]


 def find_flattened_fields_with_subfields(mapping: dict[str, Any], path: str = "") -> list[str]:
-    """Recursively search for fields of type 'flattened' that have a 'fields' key in Elasticsearch mappings."""
+    """Recursively search for type 'flattened' that have a 'fields' or 'properties' key in Elasticsearch mappings."""
     flattened_fields_with_subfields: list[str] = []

     for field, properties in mapping.items():
@@ -413,6 +422,9 @@ def find_flattened_fields_with_subfields(mapping: dict[str, Any], path: str = "") -> list[str]:
         # Check if the field is of type 'flattened' and has a 'fields' key
         if properties.get("type") == "flattened" and "fields" in properties:  # type: ignore[reportUnknownVariableType]
             flattened_fields_with_subfields.append(current_path)  # type: ignore[reportUnknownVariableType]
+        # Check if the field is of type 'flattened' and has a 'properties' key
+        if properties.get("type") == "flattened" and "properties" in properties:  # type: ignore[reportUnknownVariableType]
+            flattened_fields_with_subfields.append(current_path)  # type: ignore[reportUnknownVariableType]

         # Recurse into subfields
         if "properties" in properties:
@@ -487,8 +499,7 @@ def prepare_mappings(  # noqa: PLR0913
     # and also at a per index level as custom schemas can override non-ecs fields and/or indices
     non_ecs_schema = ecs.flatten(non_ecs_schema)
     non_ecs_schema = utils.convert_to_nested_schema(non_ecs_schema)
-    non_ecs_schema = prune_mappings_of_unsupported_types("non-ecs", "non-ecs", non_ecs_schema, log)
-    non_ecs_mapping = prune_mappings_of_unsupported_types("non-ecs", "non-ecs", non_ecs_mapping, log)
+    non_ecs_schema = prune_mappings_of_unsupported_types("non-ecs", non_ecs_schema, log)

     # Load custom schema and convert to index mapping format (nested schema)
     custom_mapping: dict[str, Any] = {}
@@ -498,15 +509,14 @@
         index_mapping = ecs.flatten(index_mapping)
         index_mapping = utils.convert_to_nested_schema(index_mapping)
         custom_mapping.update({index: index_mapping})
-    custom_mapping = prune_mappings_of_unsupported_types("custom", "custom", custom_mapping, log)

     # Load ECS in an index mapping format (nested schema)
     current_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
     ecs_schema = get_ecs_schema_mappings(current_version)

     # Filter combined mappings based on the provided indices
     combined_mappings, index_lookup = get_filtered_index_schema(
-        indices, index_lookup, ecs_schema, non_ecs_mapping, custom_mapping
+        indices, index_lookup, ecs_schema, non_ecs_mapping, custom_mapping, log
     )

     index_lookup.update({"rule-ecs-index": ecs_schema})
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "detection_rules"
-version = "1.5.8"
+version = "1.5.9"
 description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
 readme = "README.md"
 requires-python = ">=3.12"
17 changes: 17 additions & 0 deletions tests/test_rules_remote.py
@@ -218,3 +218,20 @@ def test_esql_filtered_keep(self):
         """
         with pytest.raises(EsqlSchemaError):
             _ = RuleCollection().load_dict(production_rule)
+
+    def test_esql_non_ecs_schema_conflict_resolution(self):
+        """Test an ESQL rule that has a known conflict between non_ecs and integrations for correct handling."""
+        file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
+        original_production_rule = load_rule_contents(file_path)
+        production_rule = deepcopy(original_production_rule)[0]
+        production_rule["metadata"]["integration"] = ["azure", "o365"]
+        production_rule["rule"]["query"] = """
+        from logs-azure.signinlogs-* metadata _id, _version, _index
+        | where @timestamp > now() - 30 minutes
+        and event.dataset in ("azure.signinlogs")
+        and event.outcome == "success"
+        and azure.signinlogs.properties.user_id is not null
+        | keep
+        event.outcome
+        """
+        _ = RuleCollection().load_dict(production_rule)