From 1444c710ea2192298750586b83e91f7f4b4ca49b Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Wed, 29 Oct 2025 16:51:24 +1100 Subject: [PATCH 01/11] Move ML job types to models --- internal/clients/elasticsearch/ml_job.go | 24 ++---------------------- internal/models/ml.go | 22 ++++++++++++++++++---- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/internal/clients/elasticsearch/ml_job.go b/internal/clients/elasticsearch/ml_job.go index fc51037cd..560dbc52d 100644 --- a/internal/clients/elasticsearch/ml_job.go +++ b/internal/clients/elasticsearch/ml_job.go @@ -15,25 +15,6 @@ import ( "github.com/hashicorp/terraform-plugin-framework/diag" ) -// MLJobStats represents the statistics structure for an ML job -type MLJobStats struct { - Jobs []MLJob `json:"jobs"` -} - -// MLJob represents a single ML job in the stats response -type MLJob struct { - JobId string `json:"job_id"` - State string `json:"state"` - Node *MLJobNode `json:"node,omitempty"` -} - -// MLJobNode represents the node information for an ML job -type MLJobNode struct { - Id string `json:"id"` - Name string `json:"name"` - Attributes map[string]interface{} `json:"attributes"` -} - // OpenMLJob opens a machine learning job func OpenMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string) diag.Diagnostics { var diags diag.Diagnostics @@ -120,7 +101,7 @@ func CloseMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string, } // GetMLJobStats retrieves the stats for a specific machine learning job -func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId string) (*MLJob, diag.Diagnostics) { +func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId string) (*models.MLJob, diag.Diagnostics) { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() @@ -148,8 +129,7 @@ func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId stri if diags.HasError() { return nil, diags } - - var jobStats MLJobStats + var jobStats models.MLJobStats if err := json.NewDecoder(res.Body).Decode(&jobStats); err != nil { diags.AddError("Failed to decode ML job stats response", err.Error()) return nil, diags diff --git a/internal/models/ml.go b/internal/models/ml.go index 8389d5409..427f3b2d8 100644 --- a/internal/models/ml.go +++ b/internal/models/ml.go @@ -124,8 +124,22 @@ type DatafeedNode struct { // DatafeedRunning represents the running state of a datafeed type DatafeedRunning struct { - RealTimeConfigured bool `json:"real_time_configured"` - RealTimeRunning bool `json:"real_time_running"` - SearchInterval *int64 `json:"search_interval,omitempty"` - LastEndTime *time.Time `json:"last_end_time,omitempty"` + +// MLJobStats represents the statistics structure for an ML job +type MLJobStats struct { + Jobs []MLJob `json:"jobs"` +} + +// MLJob represents a single ML job in the stats response +type MLJob struct { + JobId string `json:"job_id"` + State string `json:"state"` + Node *MLJobNode `json:"node,omitempty"` +} + +// MLJobNode represents the node information for an ML job +type MLJobNode struct { + Id string `json:"id"` + Name string `json:"name"` + Attributes map[string]interface{} `json:"attributes"` } From b1d7de83dcb0001a4e74771531bc2b941ac315ae Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Wed, 29 Oct 2025 16:52:46 +1100 Subject: [PATCH 02/11] Extract reading datafeed state --- internal/elasticsearch/ml/datafeed/delete.go | 25 ++++----- .../elasticsearch/ml/datafeed/state_utils.go | 56 ++++++++++++++----- 
internal/elasticsearch/ml/datafeed/update.go | 7 +-- 3 files changed, 56 insertions(+), 32 deletions(-) diff --git a/internal/elasticsearch/ml/datafeed/delete.go b/internal/elasticsearch/ml/datafeed/delete.go index 0a66c8102..d4f196f55 100644 --- a/internal/elasticsearch/ml/datafeed/delete.go +++ b/internal/elasticsearch/ml/datafeed/delete.go @@ -2,7 +2,6 @@ package datafeed import ( "context" - "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" "github.com/hashicorp/terraform-plugin-framework/diag" @@ -44,22 +43,18 @@ func (r *datafeedResource) delete(ctx context.Context, req resource.DeleteReques } func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId string) (bool, diag.Diagnostics) { - var diags diag.Diagnostics - // Check current state - currentState, err := r.getDatafeedState(ctx, datafeedId) - if err != nil { - // If we can't get the state, try to extract the error details - if err.Error() == fmt.Sprintf("datafeed %s not found", datafeedId) { - // Datafeed does not exist, nothing to stop - return false, diags - } - diags.AddError("Failed to get datafeed state", err.Error()) + currentState, diags := GetDatafeedState(ctx, r.client, datafeedId) + if diags.HasError() { return false, diags } + if currentState == nil { + return false, nil + } + // If the datafeed is not running, nothing to stop - if currentState != "started" && currentState != "starting" { + if *currentState != "started" && *currentState != "starting" { return false, diags } @@ -71,9 +66,9 @@ func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId str } // Wait for the datafeed to reach stopped state - err = r.waitForDatafeedState(ctx, datafeedId, "stopped") - if err != nil { - diags.AddError("Failed to wait for datafeed to stop", fmt.Sprintf("Datafeed %s did not stop within timeout: %s", datafeedId, err.Error())) + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "stopped") + diags.Append(waitDiags...) 
+ if diags.HasError() { return true, diags } diff --git a/internal/elasticsearch/ml/datafeed/state_utils.go b/internal/elasticsearch/ml/datafeed/state_utils.go index 9c80f540c..bee9c2b95 100644 --- a/internal/elasticsearch/ml/datafeed/state_utils.go +++ b/internal/elasticsearch/ml/datafeed/state_utils.go @@ -2,35 +2,65 @@ package datafeed import ( "context" + "errors" "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/asyncutils" + "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" + "github.com/hashicorp/terraform-plugin-framework/diag" ) -// getDatafeedState returns the current state of a datafeed -func (r *datafeedResource) getDatafeedState(ctx context.Context, datafeedId string) (string, error) { - statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, r.client, datafeedId) +// GetDatafeedState returns the current state of a datafeed +func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string) (*string, diag.Diagnostics) { + statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) if diags.HasError() { - return "", fmt.Errorf("failed to get datafeed stats: %v", diags) + return nil, diags } if statsResponse == nil { - return "", fmt.Errorf("datafeed %s not found", datafeedId) + return nil, nil } - return statsResponse.State, nil + return &statsResponse.State, nil } -// waitForDatafeedState waits for a datafeed to reach the desired state -func (r *datafeedResource) waitForDatafeedState(ctx context.Context, datafeedId, desiredState string) error { +var terminalDatafeedStates = map[string]struct{}{ + "stopped": {}, + "started": {}, +} + +var errDatafeedInUndesiredState = errors.New("datafeed stuck in undesired state") + +// WaitForDatafeedState waits for a datafeed to reach the desired state +func WaitForDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId, desiredState string) (bool, diag.Diagnostics) { stateChecker := func(ctx context.Context) (bool, error) { - currentState, err := r.getDatafeedState(ctx, datafeedId) - if err != nil { - return false, err + currentState, diags := GetDatafeedState(ctx, client, datafeedId) + if diags.HasError() { + return false, diagutil.FwDiagsAsError(diags) + } + + if currentState == nil { + return false, fmt.Errorf("datafeed %s not found", datafeedId) + } + + if *currentState == desiredState { + return true, nil + } + + _, isInTerminalState := terminalDatafeedStates[*currentState] + if isInTerminalState { + return false, fmt.Errorf("%w: datafeed is in state [%s] but desired state is [%s]", errDatafeedInUndesiredState, *currentState, desiredState) } - return currentState == desiredState, nil + + return false, nil + } + + err := asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker) + if errors.Is(err, errDatafeedInUndesiredState) { + return false, nil } - return asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker) + return err == nil, diagutil.FrameworkDiagFromError(err) } diff --git a/internal/elasticsearch/ml/datafeed/update.go b/internal/elasticsearch/ml/datafeed/update.go index 8e967def8..9ea75e991 100644 --- a/internal/elasticsearch/ml/datafeed/update.go +++ b/internal/elasticsearch/ml/datafeed/update.go @@ -2,7 +2,6 @@ package datafeed import ( "context" - "fmt" 
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" @@ -62,9 +61,9 @@ func (r *datafeedResource) update(ctx context.Context, req resource.UpdateReques } // Wait for the datafeed to reach started state - err := r.waitForDatafeedState(ctx, datafeedId, "started") - if err != nil { - resp.Diagnostics.AddError("Failed to wait for datafeed to start", fmt.Sprintf("Datafeed %s did not start within timeout: %s", datafeedId, err.Error())) + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "started") + resp.Diagnostics.Append(waitDiags...) + if resp.Diagnostics.HasError() { return } } From 4d0a34752a5a6b519a520480c534625b81a2c85b Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Wed, 29 Oct 2025 16:54:51 +1100 Subject: [PATCH 03/11] Add ML datafeed state resource --- .../elasticsearch_ml_datafeed_state.md | 192 +++++++++++++++++ .../import.sh | 1 + .../resource.tf | 113 ++++++++++ go.mod | 1 + go.sum | 2 + .../ml/datafeed_state/acc_test.go | 121 +++++++++++ .../elasticsearch/ml/datafeed_state/create.go | 27 +++ .../elasticsearch/ml/datafeed_state/delete.go | 68 ++++++ .../elasticsearch/ml/datafeed_state/models.go | 44 ++++ .../elasticsearch/ml/datafeed_state/read.go | 72 +++++++ .../ml/datafeed_state/resource-description.md | 3 + .../ml/datafeed_state/resource.go | 32 +++ .../elasticsearch/ml/datafeed_state/schema.go | 95 +++++++++ .../started/datafeed_state.tf | 76 +++++++ .../stopped/datafeed_state.tf | 76 +++++++ .../create/datafeed_state.tf | 76 +++++++ .../with_times/datafeed_state.tf | 87 ++++++++ .../elasticsearch/ml/datafeed_state/update.go | 199 ++++++++++++++++++ internal/elasticsearch/ml/job_state/update.go | 4 +- internal/models/ml.go | 25 ++- provider/plugin_framework.go | 2 + 21 files changed, 1309 insertions(+), 7 deletions(-) create mode 100644 docs/resources/elasticsearch_ml_datafeed_state.md create mode 100644 examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh create mode 100644 examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf create mode 100644 internal/elasticsearch/ml/datafeed_state/acc_test.go create mode 100644 internal/elasticsearch/ml/datafeed_state/create.go create mode 100644 internal/elasticsearch/ml/datafeed_state/delete.go create mode 100644 internal/elasticsearch/ml/datafeed_state/models.go create mode 100644 internal/elasticsearch/ml/datafeed_state/read.go create mode 100644 internal/elasticsearch/ml/datafeed_state/resource-description.md create mode 100644 internal/elasticsearch/ml/datafeed_state/resource.go create mode 100644 internal/elasticsearch/ml/datafeed_state/schema.go create mode 100644 internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf create mode 100644 internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf create mode 100644 internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf create mode 100644 internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf create mode 100644 internal/elasticsearch/ml/datafeed_state/update.go diff --git a/docs/resources/elasticsearch_ml_datafeed_state.md b/docs/resources/elasticsearch_ml_datafeed_state.md new file mode 100644 index 000000000..700e2b790 --- /dev/null +++ b/docs/resources/elasticsearch_ml_datafeed_state.md @@ 
-0,0 +1,192 @@
+---
+# generated by https://github.com/hashicorp/terraform-plugin-docs
+page_title: "elasticstack_elasticsearch_ml_datafeed_state Resource - terraform-provider-elasticstack"
+subcategory: "Ml"
+description: |-
+  Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed.
+  Note: Starting a non-realtime datafeed (i.e. with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the state attribute via the resource lifecycle https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes for non-realtime datafeeds.
+---
+
+# elasticstack_elasticsearch_ml_datafeed_state (Resource)
+
+Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed.
+
+Note: Starting a non-realtime datafeed (i.e. with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the `state` attribute via the [resource lifecycle](https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes) for non-realtime datafeeds.
+
+## Example Usage
+
+```terraform
+## The following resources set up a realtime ML datafeed.
+resource "elasticstack_elasticsearch_index" "ml_datafeed_index" {
+  name = "ml-datafeed-data"
+  mappings = jsonencode({
+    properties = {
+      "@timestamp" = {
+        type = "date"
+      }
+      value = {
+        type = "double"
+      }
+      user = {
+        type = "keyword"
+      }
+    }
+  })
+}
+
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "example" {
+  job_id      = "example-anomaly-job"
+  description = "Example anomaly detection job"
+
+  analysis_config {
+    bucket_span = "15m"
+    detectors {
+      function      = "mean"
+      field_name    = "value"
+      by_field_name = "user"
+    }
+  }
+
+  data_description {
+    time_field = "@timestamp"
+  }
+}
+
+resource "elasticstack_elasticsearch_ml_datafeed" "example" {
+  datafeed_id = "example-datafeed"
+  job_id      = elasticstack_elasticsearch_ml_anomaly_detection_job.example.job_id
+  indices     = [elasticstack_elasticsearch_index.ml_datafeed_index.name]
+
+  query = jsonencode({
+    bool = {
+      must = [
+        {
+          range = {
+            "@timestamp" = {
+              gte = "now-7d"
+            }
+          }
+        }
+      ]
+    }
+  })
+}
+
+resource "elasticstack_elasticsearch_ml_datafeed_state" "example" {
+  datafeed_id = elasticstack_elasticsearch_ml_datafeed.example.datafeed_id
+  state       = "started"
+  force       = false
+}
+
+## A non-realtime datafeed will automatically stop once all data has been processed.
+## It's recommended to ignore changes to the `state` attribute via the resource lifecycle for such datafeeds.
+ +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "non-realtime" { + job_id = "non-realtime-anomaly-job" + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "non-realtime" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "non-realtime" { + datafeed_id = "non-realtime-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "non-realtime" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.non-realtime.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + lifecycle { + ignore_changes = ["state"] + } +} +``` + + +## Schema + +### Required + +- `datafeed_id` (String) Identifier for the ML datafeed. +- `state` (String) The desired state for the ML datafeed. Valid values are `started` and `stopped`. + +### Optional + +- `datafeed_timeout` (String) Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`. +- `elasticsearch_connection` (Block List, Deprecated) Elasticsearch connection configuration block. (see [below for nested schema](#nestedblock--elasticsearch_connection)) +- `end` (String) The time that the datafeed should end collecting data. When not specified, the datafeed continues in real-time. This property must be specified in RFC 3339 format. +- `force` (Boolean) When stopping a datafeed, use to forcefully stop it. +- `start` (String) The time that the datafeed should start collecting data. When not specified, the datafeed starts in real-time. This property must be specified in RFC 3339 format. +- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts)) + +### Read-Only + +- `id` (String) Internal identifier of the resource + + +### Nested Schema for `elasticsearch_connection` + +Optional: + +- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch +- `bearer_token` (String, Sensitive) Bearer Token to use for authentication to Elasticsearch +- `ca_data` (String) PEM-encoded custom Certificate Authority certificate +- `ca_file` (String) Path to a custom Certificate Authority certificate +- `cert_data` (String) PEM encoded certificate for client auth +- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth +- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) schema and port number. +- `es_client_authentication` (String, Sensitive) ES Client Authentication field to be used with the JWT token +- `headers` (Map of String, Sensitive) A list of headers to be sent with each request to Elasticsearch. 
+- `insecure` (Boolean) Disable TLS certificate validation
+- `key_data` (String, Sensitive) PEM encoded private key for client auth
+- `key_file` (String) Path to a file containing the PEM encoded private key for client auth
+- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch.
+- `username` (String) Username to use for API authentication to Elasticsearch.
+
+
+
+### Nested Schema for `timeouts`
+
+Optional:
+
+- `create` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
+- `update` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
+
+## Import
+
+Import is supported using the following syntax:
+
+The [`terraform import` command](https://developer.hashicorp.com/terraform/cli/commands/import) can be used, for example:
+
+```shell
+terraform import elasticstack_elasticsearch_ml_datafeed_state.example my-datafeed-id
+```
diff --git a/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh
new file mode 100644
index 000000000..925813279
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh
@@ -0,0 +1 @@
+terraform import elasticstack_elasticsearch_ml_datafeed_state.example my-datafeed-id
\ No newline at end of file
diff --git a/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf
new file mode 100644
index 000000000..a123b3609
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf
@@ -0,0 +1,113 @@
+## The following resources set up a realtime ML datafeed.
+resource "elasticstack_elasticsearch_index" "ml_datafeed_index" {
+  name = "ml-datafeed-data"
+  mappings = jsonencode({
+    properties = {
+      "@timestamp" = {
+        type = "date"
+      }
+      value = {
+        type = "double"
+      }
+      user = {
+        type = "keyword"
+      }
+    }
+  })
+}
+
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "example" {
+  job_id      = "example-anomaly-job"
+  description = "Example anomaly detection job"
+
+  analysis_config {
+    bucket_span = "15m"
+    detectors {
+      function      = "mean"
+      field_name    = "value"
+      by_field_name = "user"
+    }
+  }
+
+  data_description {
+    time_field = "@timestamp"
+  }
+}
+
+resource "elasticstack_elasticsearch_ml_datafeed" "example" {
+  datafeed_id = "example-datafeed"
+  job_id      = elasticstack_elasticsearch_ml_anomaly_detection_job.example.job_id
+  indices     = [elasticstack_elasticsearch_index.ml_datafeed_index.name]
+
+  query = jsonencode({
+    bool = {
+      must = [
+        {
+          range = {
+            "@timestamp" = {
+              gte = "now-7d"
+            }
+          }
+        }
+      ]
+    }
+  })
+}
+
+resource "elasticstack_elasticsearch_ml_datafeed_state" "example" {
+  datafeed_id = elasticstack_elasticsearch_ml_datafeed.example.datafeed_id
+  state       = "started"
+  force       = false
+}
+
+## A non-realtime datafeed will automatically stop once all data has been processed.
+## It's recommended to ignore changes to the `state` attribute via the resource lifecycle for such datafeeds.
+ +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "non-realtime" { + job_id = "non-realtime-anomaly-job" + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "non-realtime" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "non-realtime" { + datafeed_id = "non-realtime-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "non-realtime" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.non-realtime.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + lifecycle { + ignore_changes = ["state"] + } +} \ No newline at end of file diff --git a/go.mod b/go.mod index 1b2e89060..e2b83f34f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/hashicorp/terraform-plugin-framework v1.16.1 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0 github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0 + github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0 github.com/hashicorp/terraform-plugin-framework-validators v0.19.0 github.com/hashicorp/terraform-plugin-go v0.29.0 github.com/hashicorp/terraform-plugin-log v0.9.0 diff --git a/go.sum b/go.sum index b036aaa1f..64d8ab1d4 100644 --- a/go.sum +++ b/go.sum @@ -617,6 +617,8 @@ github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0 h1:SJXL5FfJJm17 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0/go.mod h1:p0phD0IYhsu9bR4+6OetVvvH59I6LwjXGnTVEr8ox6E= github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0 h1:jblRy1PkLfPm5hb5XeMa3tezusnMRziUGqtT5epSYoI= github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0/go.mod h1:5jm2XK8uqrdiSRfD5O47OoxyGMCnwTcl8eoiDgSa+tc= +github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0 h1:v3DapR8gsp3EM8fKMh6up9cJUFQ2iRaFsYLP8UJnCco= +github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0/go.mod h1:c3PnGE9pHBDfdEVG9t1S1C9ia5LW+gkFR0CygXlM8ak= github.com/hashicorp/terraform-plugin-framework-validators v0.19.0 h1:Zz3iGgzxe/1XBkooZCewS0nJAaCFPFPHdNJd8FgE4Ow= github.com/hashicorp/terraform-plugin-framework-validators v0.19.0/go.mod h1:GBKTNGbGVJohU03dZ7U8wHqc2zYnMUawgCN+gC0itLc= github.com/hashicorp/terraform-plugin-go v0.29.0 h1:1nXKl/nSpaYIUBU1IG/EsDOX0vv+9JxAltQyDMpq5mU= diff --git a/internal/elasticsearch/ml/datafeed_state/acc_test.go b/internal/elasticsearch/ml/datafeed_state/acc_test.go new file mode 100644 index 000000000..b75ab5ab9 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/acc_test.go @@ -0,0 +1,121 @@ +package datafeed_state_test + +import ( + "fmt" + "testing" + + "github.com/elastic/terraform-provider-elasticstack/internal/acctest" + "github.com/hashicorp/terraform-plugin-testing/config" + sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + 
"github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" +) + +func TestAccResourceMLDatafeedState_basic(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("started"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "started"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "force", "false"), + ), + }, + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("stopped"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "stopped"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "force", "false"), + ), + }, + }, + }) +} + +func TestAccResourceMLDatafeedState_import(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("create"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + }, + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("create"), + ResourceName: "elasticstack_elasticsearch_ml_datafeed_state.test", + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIdentifierAttribute: "datafeed_id", + ImportStateVerifyIgnore: []string{"force", "datafeed_timeout", "id"}, + ImportStateIdFunc: func(s *terraform.State) (string, error) { + rs, ok := s.RootModule().Resources["elasticstack_elasticsearch_ml_datafeed_state.test"] + if !ok { + return "", fmt.Errorf("not found: %s", "elasticstack_elasticsearch_ml_datafeed_state.test") + } + return rs.Primary.Attributes["datafeed_id"], nil + }, + ConfigVariables: config.Variables{ + "job_id": 
config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + }, + }, + }) +} + +func TestAccResourceMLDatafeedState_withTimes(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("with_times"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "started"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "start", "2024-01-01T00:00:00Z"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "end", "2024-01-02T00:00:00Z"), + ), + }, + }, + }) +} diff --git a/internal/elasticsearch/ml/datafeed_state/create.go b/internal/elasticsearch/ml/datafeed_state/create.go new file mode 100644 index 000000000..d5773730a --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/create.go @@ -0,0 +1,27 @@ +package datafeed_state + +import ( + "context" + "time" + + "github.com/hashicorp/terraform-plugin-framework/resource" +) + +func (r *mlDatafeedStateResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var data MLDatafeedStateData + diags := req.Plan.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + // Get create timeout + createTimeout, fwDiags := data.Timeouts.Create(ctx, 5*time.Minute) // Default 5 minutes + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + diags = r.update(ctx, req.Plan, &resp.State, createTimeout) + resp.Diagnostics.Append(diags...) +} diff --git a/internal/elasticsearch/ml/datafeed_state/delete.go b/internal/elasticsearch/ml/datafeed_state/delete.go new file mode 100644 index 000000000..b9d9e85a2 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/delete.go @@ -0,0 +1,68 @@ +package datafeed_state + +import ( + "context" + "fmt" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-log/tflog" +) + +func (r *mlDatafeedStateResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + var data MLDatafeedStateData + diags := req.State.Get(ctx, &data) + resp.Diagnostics.Append(diags...) 
+ if resp.Diagnostics.HasError() { + return + } + + client, fwDiags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + datafeedId := data.DatafeedId.ValueString() + currentState, fwDiags := datafeed.GetDatafeedState(ctx, client, datafeedId) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + if currentState == nil { + // Datafeed already doesn't exist, nothing to do + tflog.Info(ctx, fmt.Sprintf("ML datafeed %s not found during delete", datafeedId)) + return + } + + // If the datafeed is started, stop it when deleting the resource + if *currentState == "started" { + tflog.Info(ctx, fmt.Sprintf("Stopping ML datafeed %s during delete", datafeedId)) + + // Parse timeout duration + timeout, parseErrs := data.Timeout.Parse() + resp.Diagnostics.Append(parseErrs...) + if resp.Diagnostics.HasError() { + return + } + + force := data.Force.ValueBool() + fwDiags = elasticsearch.StopDatafeed(ctx, client, datafeedId, force, timeout) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + // Wait for the datafeed to stop + _, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, "stopped") + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + tflog.Info(ctx, fmt.Sprintf("ML datafeed %s successfully stopped during delete", datafeedId)) + } +} diff --git a/internal/elasticsearch/ml/datafeed_state/models.go b/internal/elasticsearch/ml/datafeed_state/models.go new file mode 100644 index 000000000..3d4e8da4e --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/models.go @@ -0,0 +1,44 @@ +package datafeed_state + +import ( + "strconv" + + "github.com/elastic/terraform-provider-elasticstack/internal/utils" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +type MLDatafeedStateData struct { + Id types.String `tfsdk:"id"` + ElasticsearchConnection types.List `tfsdk:"elasticsearch_connection"` + DatafeedId types.String `tfsdk:"datafeed_id"` + State types.String `tfsdk:"state"` + Force types.Bool `tfsdk:"force"` + Timeout customtypes.Duration `tfsdk:"datafeed_timeout"` + Start timetypes.RFC3339 `tfsdk:"start"` + End timetypes.RFC3339 `tfsdk:"end"` + Timeouts timeouts.Value `tfsdk:"timeouts"` +} + +func (d MLDatafeedStateData) GetStartAsString() (string, diag.Diagnostics) { + return d.getTimeAttributeAsString(d.Start) +} + +func (d MLDatafeedStateData) GetEndAsString() (string, diag.Diagnostics) { + return d.getTimeAttributeAsString(d.End) +} + +func (d MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (string, diag.Diagnostics) { + if !utils.IsKnown(val) { + return "", nil + } + + valTime, diags := val.ValueRFC3339Time() + if diags.HasError() { + return "", diags + } + return strconv.FormatInt(valTime.Unix(), 10), nil +} diff --git a/internal/elasticsearch/ml/datafeed_state/read.go b/internal/elasticsearch/ml/datafeed_state/read.go new file mode 100644 index 000000000..c38a40201 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/read.go @@ -0,0 +1,72 @@ +package datafeed_state + +import ( + "context" + "time" + + 
"github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +func (r *mlDatafeedStateResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var data MLDatafeedStateData + diags := req.State.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + readData, diags := r.read(ctx, data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + if readData == nil { + resp.State.RemoveResource(ctx) + return + } + + resp.Diagnostics.Append(resp.State.Set(ctx, readData)...) +} + +func (r *mlDatafeedStateResource) read(ctx context.Context, data MLDatafeedStateData) (*MLDatafeedStateData, diag.Diagnostics) { + client, diags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + if diags.HasError() { + return nil, diags + } + + datafeedId := data.DatafeedId.ValueString() + // Check if the datafeed exists by getting its stats + datafeedStats, getDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + diags.Append(getDiags...) + if diags.HasError() { + return nil, diags + } + + if datafeedStats == nil { + return nil, diags + } + + // Update the data with current information + data.State = types.StringValue(datafeedStats.State) + + if datafeedStats.State == "started" { + if datafeedStats.RunningState == nil { + diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.") + } + + data.Start = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.StartMS)) + if datafeedStats.RunningState.RealTimeConfigured { + data.End = timetypes.NewRFC3339Null() + } else { + data.End = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.EndMS)) + } + } + + return &data, diags +} diff --git a/internal/elasticsearch/ml/datafeed_state/resource-description.md b/internal/elasticsearch/ml/datafeed_state/resource-description.md new file mode 100644 index 000000000..4f01d2398 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/resource-description.md @@ -0,0 +1,3 @@ +Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed. + +Note: Starting a non-realtime datafeed (i.e with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the `state` attribute via the [resource lifecycle](https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes) for non-realtime datafeeds. 
\ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/resource.go b/internal/elasticsearch/ml/datafeed_state/resource.go new file mode 100644 index 000000000..d8fde7969 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/resource.go @@ -0,0 +1,32 @@ +package datafeed_state + +import ( + "context" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" +) + +func NewMLDatafeedStateResource() resource.Resource { + return &mlDatafeedStateResource{} +} + +type mlDatafeedStateResource struct { + client *clients.ApiClient +} + +func (r *mlDatafeedStateResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = req.ProviderTypeName + "_elasticsearch_ml_datafeed_state" +} + +func (r *mlDatafeedStateResource) Configure(_ context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) { + client, diags := clients.ConvertProviderData(req.ProviderData) + resp.Diagnostics.Append(diags...) + r.client = client +} + +func (r *mlDatafeedStateResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + // Retrieve import ID and save to datafeed_id attribute + resource.ImportStatePassthroughID(ctx, path.Root("datafeed_id"), req, resp) +} diff --git a/internal/elasticsearch/ml/datafeed_state/schema.go b/internal/elasticsearch/ml/datafeed_state/schema.go new file mode 100644 index 000000000..a789591dd --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/schema.go @@ -0,0 +1,95 @@ +package datafeed_state + +import ( + "context" + _ "embed" + "regexp" + + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/booldefault" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringdefault" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/schema/validator" + + providerschema "github.com/elastic/terraform-provider-elasticstack/internal/schema" +) + +func (r *mlDatafeedStateResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = GetSchema() +} + +//go:embed resource-description.md +var description string + +func GetSchema() schema.Schema { + return schema.Schema{ + MarkdownDescription: description, + Blocks: map[string]schema.Block{ + "elasticsearch_connection": providerschema.GetEsFWConnectionBlock("elasticsearch_connection", false), + }, + Attributes: map[string]schema.Attribute{ + "id": schema.StringAttribute{ + MarkdownDescription: "Internal identifier of the resource", + Computed: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "datafeed_id": schema.StringAttribute{ + MarkdownDescription: "Identifier for the ML datafeed.", + Required: true, + PlanModifiers: 
[]planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 64), + stringvalidator.RegexMatches(regexp.MustCompile(`^[a-zA-Z0-9_-]+$`), "must contain only alphanumeric characters, hyphens, and underscores"), + }, + }, + "state": schema.StringAttribute{ + MarkdownDescription: "The desired state for the ML datafeed. Valid values are `started` and `stopped`.", + Required: true, + Validators: []validator.String{ + stringvalidator.OneOf("started", "stopped"), + }, + }, + "force": schema.BoolAttribute{ + MarkdownDescription: "When stopping a datafeed, use to forcefully stop it.", + Optional: true, + Computed: true, + Default: booldefault.StaticBool(false), + }, + "start": schema.StringAttribute{ + MarkdownDescription: "The time that the datafeed should start collecting data. When not specified, the datafeed starts in real-time. This property must be specified in RFC 3339 format.", + CustomType: timetypes.RFC3339Type{}, + Optional: true, + Computed: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "end": schema.StringAttribute{ + MarkdownDescription: "The time that the datafeed should end collecting data. When not specified, the datafeed continues in real-time. This property must be specified in RFC 3339 format.", + CustomType: timetypes.RFC3339Type{}, + Optional: true, + }, + "datafeed_timeout": schema.StringAttribute{ + MarkdownDescription: "Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`.", + Optional: true, + Computed: true, + Default: stringdefault.StaticString("30s"), + CustomType: customtypes.DurationType{}, + }, + "timeouts": timeouts.Attributes(context.Background(), timeouts.Opts{ + Create: true, + Update: true, + }), + }, + } +} diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf new file mode 100644 index 000000000..2a5964cdd --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = 
[elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf new file mode 100644 index 000000000..da42e54d2 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "stopped" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf new file mode 100644 index 000000000..2a5964cdd --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" 
"test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf new file mode 100644 index 000000000..3ac6a2abc --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf @@ -0,0 +1,87 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] + + lifecycle { + ignore_changes = ["state"] + } +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/update.go b/internal/elasticsearch/ml/datafeed_state/update.go 
new file mode 100644 index 000000000..d646b5ee2 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/update.go @@ -0,0 +1,199 @@ +package datafeed_state + +import ( + "context" + "fmt" + "time" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/tfsdk" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-log/tflog" +) + +func (r *mlDatafeedStateResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + var data MLDatafeedStateData + diags := req.Plan.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + // Get update timeout + updateTimeout, fwDiags := data.Timeouts.Update(ctx, 5*time.Minute) // Default 5 minutes + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + diags = r.update(ctx, req.Plan, &resp.State, updateTimeout) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } +} + +func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, state *tfsdk.State, operationTimeout time.Duration) diag.Diagnostics { + var data MLDatafeedStateData + diags := plan.Get(ctx, &data) + if diags.HasError() { + return diags + } + + client, fwDiags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + datafeedId := data.DatafeedId.ValueString() + desiredState := data.State.ValueString() + + // Create context with timeout + ctx, cancel := context.WithTimeout(ctx, operationTimeout) + defer cancel() + + // First, get the current datafeed stats to check if the datafeed exists and its current state + datafeedStats, fwDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + if datafeedStats == nil { + diags.AddError( + "ML Datafeed not found", + fmt.Sprintf("ML datafeed %s does not exist", datafeedId), + ) + return diags + } + + // Perform state transition if needed + inDesiredState, fwDiags := r.performStateTransition(ctx, client, data, datafeedStats.State) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + // Generate composite ID + compId, sdkDiags := client.ID(ctx, datafeedId) + if len(sdkDiags) > 0 { + for _, d := range sdkDiags { + diags.AddError(d.Summary, d.Detail) + } + return diags + } + + // Set the response state + data.Id = types.StringValue(compId.String()) + + var finalData *MLDatafeedStateData + if inDesiredState { + var getDiags diag.Diagnostics + finalData, getDiags = r.read(ctx, data) + diags.Append(getDiags...) + if diags.HasError() { + return diags + } + } else { + statsAfterUpdate, fwDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + diags.Append(fwDiags...) 
+		if diags.HasError() {
+			return diags
+		}
+
+		if statsAfterUpdate == nil {
+			diags.AddError(
+				"ML Datafeed not found",
+				fmt.Sprintf("ML datafeed %s does not exist after successful update", datafeedId),
+			)
+			return diags
+		}
+
+		if statsAfterUpdate.TimingStats.SearchCount < datafeedStats.TimingStats.SearchCount {
+			diags.AddError(
+				"Datafeed did not successfully transition to the desired state",
+				fmt.Sprintf("[%s] datafeed did not settle into the [%s] state. The current state is [%s]", datafeedId, desiredState, statsAfterUpdate.State),
+			)
+			return diags
+		}
+
+		if data.Start.IsUnknown() {
+			data.Start = timetypes.NewRFC3339Null()
+		}
+
+		finalData = &data
+	}
+
+	if finalData == nil {
+		diags.AddError("Failed to read datafeed stats after update", fmt.Sprintf("The datafeed was successfully transitioned to the %s state, but could not be read after this change", desiredState))
+		return diags
+	}
+
+	diags.Append(state.Set(ctx, finalData)...)
+	return diags
+}
+
+// performStateTransition handles the ML datafeed state transition process
+func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, currentState string) (bool, diag.Diagnostics) {
+	datafeedId := data.DatafeedId.ValueString()
+	desiredState := data.State.ValueString()
+	force := data.Force.ValueBool()
+
+	// Parse timeout duration
+	timeout, parseErrs := data.Timeout.Parse()
+	if parseErrs.HasError() {
+		return false, parseErrs
+	}
+
+	// Return early if no state change is needed
+	if currentState == desiredState {
+		tflog.Debug(ctx, fmt.Sprintf("ML datafeed %s is already in desired state %s", datafeedId, desiredState))
+		return false, nil
+	}
+
+	// Initiate the state change
+	switch desiredState {
+	case "started":
+		start, diags := data.GetStartAsString()
+		if diags.HasError() {
+			return false, diags
+		}
+		end, endDiags := data.GetEndAsString()
+		diags.Append(endDiags...)
+		if diags.HasError() {
+			return false, diags
+		}
+
+		startDiags := elasticsearch.StartDatafeed(ctx, client, datafeedId, start, end, timeout)
+		diags.Append(startDiags...)
+		if diags.HasError() {
+			return false, diags
+		}
+	case "stopped":
+		if diags := elasticsearch.StopDatafeed(ctx, client, datafeedId, force, timeout); diags.HasError() {
+			return false, diags
+		}
+	default:
+		return false, diag.Diagnostics{
+			diag.NewErrorDiagnostic(
+				"Invalid state",
+				fmt.Sprintf("Invalid state %s. Valid states are 'started' and 'stopped'", desiredState),
+			),
+		}
+	}
+
+	// Wait for state transition to complete
+	inDesiredState, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, desiredState)
+	if diags.HasError() {
+		return false, diags
+	}
+
+	tflog.Info(ctx, fmt.Sprintf("ML datafeed %s successfully transitioned to state %s", datafeedId, desiredState))
+	return inDesiredState, nil
+}
diff --git a/internal/elasticsearch/ml/job_state/update.go b/internal/elasticsearch/ml/job_state/update.go
index 03be71df3..ff99a656d 100644
--- a/internal/elasticsearch/ml/job_state/update.go
+++ b/internal/elasticsearch/ml/job_state/update.go
@@ -72,7 +72,7 @@ func (r *mlJobStateResource) update(ctx context.Context, plan tfsdk.Plan, state
 	}
 
 	// Perform state transition if needed
-	fwDiags = r.performStateTransition(ctx, client, data, *currentState, operationTimeout)
+	fwDiags = r.performStateTransition(ctx, client, data, *currentState)
 	diags.Append(fwDiags...)
if diags.HasError() { return diags @@ -97,7 +97,7 @@ func (r *mlJobStateResource) update(ctx context.Context, plan tfsdk.Plan, state } // performStateTransition handles the ML job state transition process -func (r *mlJobStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLJobStateData, currentState string, operationTimeout time.Duration) diag.Diagnostics { +func (r *mlJobStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLJobStateData, currentState string) diag.Diagnostics { jobId := data.JobId.ValueString() desiredState := data.State.ValueString() force := data.Force.ValueBool() diff --git a/internal/models/ml.go b/internal/models/ml.go index 427f3b2d8..a102a3189 100644 --- a/internal/models/ml.go +++ b/internal/models/ml.go @@ -106,11 +106,12 @@ type DatafeedStatsResponse struct { // DatafeedStats represents the statistics for a single datafeed type DatafeedStats struct { - DatafeedId string `json:"datafeed_id"` - State string `json:"state"` - Node *DatafeedNode `json:"node,omitempty"` - AssignmentExplanation *string `json:"assignment_explanation,omitempty"` - RunningState *DatafeedRunning `json:"running_state,omitempty"` + DatafeedId string `json:"datafeed_id"` + State string `json:"state"` + Node *DatafeedNode `json:"node,omitempty"` + AssignmentExplanation *string `json:"assignment_explanation,omitempty"` + RunningState *DatafeedRunning `json:"running_state,omitempty"` + TimingStats *DatafeedTimingStats `json:"timing_stats"` } // DatafeedNode represents the node information for a datafeed @@ -124,6 +125,20 @@ type DatafeedNode struct { // DatafeedRunning represents the running state of a datafeed type DatafeedRunning struct { + RealTimeConfigured bool `json:"real_time_configured"` + RealTimeRunning bool `json:"real_time_running"` + SearchInterval DatafeedSearchInterval `json:"search_interval,omitempty"` + LastEndTime *time.Time `json:"last_end_time,omitempty"` +} + +type DatafeedTimingStats struct { + SearchCount int `json:"search_count"` +} + +type DatafeedSearchInterval struct { + StartMS int64 `json:"start_ms"` + EndMS int64 `json:"end_ms"` +} // MLJobStats represents the statistics structure for an ML job type MLJobStats struct { diff --git a/provider/plugin_framework.go b/provider/plugin_framework.go index 14a0d9939..0685d4228 100644 --- a/provider/plugin_framework.go +++ b/provider/plugin_framework.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/index/indices" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/anomaly_detection_job" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed_state" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/job_state" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/api_key" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/role_mapping" @@ -128,5 +129,6 @@ func (p *Provider) Resources(ctx context.Context) []func() resource.Resource { anomaly_detection_job.NewAnomalyDetectionJobResource, security_detection_rule.NewSecurityDetectionRuleResource, job_state.NewMLJobStateResource, + datafeed_state.NewMLDatafeedStateResource, } } From f281fd721f30aae8bda7df0c27f2f7e382286e56 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 13:03:12 +1100 Subject: [PATCH 
04/11] State constants --- internal/elasticsearch/ml/datafeed/delete.go | 4 ++-- .../elasticsearch/ml/datafeed/state_utils.go | 21 +++++++++++++------ internal/elasticsearch/ml/datafeed/update.go | 2 +- .../elasticsearch/ml/datafeed_state/delete.go | 4 ++-- .../elasticsearch/ml/datafeed_state/read.go | 3 ++- .../elasticsearch/ml/datafeed_state/schema.go | 3 ++- .../elasticsearch/ml/datafeed_state/update.go | 8 +++---- 7 files changed, 28 insertions(+), 17 deletions(-) diff --git a/internal/elasticsearch/ml/datafeed/delete.go b/internal/elasticsearch/ml/datafeed/delete.go index d4f196f55..cd24b5aad 100644 --- a/internal/elasticsearch/ml/datafeed/delete.go +++ b/internal/elasticsearch/ml/datafeed/delete.go @@ -54,7 +54,7 @@ func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId str } // If the datafeed is not running, nothing to stop - if *currentState != "started" && *currentState != "starting" { + if *currentState != StateStarted && *currentState != StateStarting { return false, diags } @@ -66,7 +66,7 @@ func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId str } // Wait for the datafeed to reach stopped state - _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "stopped") + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, StateStopped) diags.Append(waitDiags...) if diags.HasError() { return true, diags diff --git a/internal/elasticsearch/ml/datafeed/state_utils.go b/internal/elasticsearch/ml/datafeed/state_utils.go index bee9c2b95..522ebfb16 100644 --- a/internal/elasticsearch/ml/datafeed/state_utils.go +++ b/internal/elasticsearch/ml/datafeed/state_utils.go @@ -12,8 +12,16 @@ import ( "github.com/hashicorp/terraform-plugin-framework/diag" ) +type State string + +const ( + StateStopped State = "stopped" + StateStarted State = "started" + StateStarting State = "starting" +) + // GetDatafeedState returns the current state of a datafeed -func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string) (*string, diag.Diagnostics) { +func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string) (*State, diag.Diagnostics) { statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) if diags.HasError() { return nil, diags @@ -23,18 +31,19 @@ func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId return nil, nil } - return &statsResponse.State, nil + state := State(statsResponse.State) + return &state, nil } -var terminalDatafeedStates = map[string]struct{}{ - "stopped": {}, - "started": {}, +var terminalDatafeedStates = map[State]struct{}{ + StateStopped: {}, + StateStarted: {}, } var errDatafeedInUndesiredState = errors.New("datafeed stuck in undesired state") // WaitForDatafeedState waits for a datafeed to reach the desired state -func WaitForDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId, desiredState string) (bool, diag.Diagnostics) { +func WaitForDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string, desiredState State) (bool, diag.Diagnostics) { stateChecker := func(ctx context.Context) (bool, error) { currentState, diags := GetDatafeedState(ctx, client, datafeedId) if diags.HasError() { diff --git a/internal/elasticsearch/ml/datafeed/update.go b/internal/elasticsearch/ml/datafeed/update.go index 9ea75e991..c7eb776ea 100644 --- a/internal/elasticsearch/ml/datafeed/update.go +++ b/internal/elasticsearch/ml/datafeed/update.go @@ -61,7 +61,7 @@ func (r *datafeedResource) 
update(ctx context.Context, req resource.UpdateReques } // Wait for the datafeed to reach started state - _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "started") + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, StateStarted) resp.Diagnostics.Append(waitDiags...) if resp.Diagnostics.HasError() { return diff --git a/internal/elasticsearch/ml/datafeed_state/delete.go b/internal/elasticsearch/ml/datafeed_state/delete.go index b9d9e85a2..f6b4f5d6c 100644 --- a/internal/elasticsearch/ml/datafeed_state/delete.go +++ b/internal/elasticsearch/ml/datafeed_state/delete.go @@ -39,7 +39,7 @@ func (r *mlDatafeedStateResource) Delete(ctx context.Context, req resource.Delet } // If the datafeed is started, stop it when deleting the resource - if *currentState == "started" { + if *currentState == datafeed.StateStarted { tflog.Info(ctx, fmt.Sprintf("Stopping ML datafeed %s during delete", datafeedId)) // Parse timeout duration @@ -57,7 +57,7 @@ func (r *mlDatafeedStateResource) Delete(ctx context.Context, req resource.Delet } // Wait for the datafeed to stop - _, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, "stopped") + _, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, datafeed.StateStopped) resp.Diagnostics.Append(diags...) if resp.Diagnostics.HasError() { return diff --git a/internal/elasticsearch/ml/datafeed_state/read.go b/internal/elasticsearch/ml/datafeed_state/read.go index c38a40201..c40a7b5e9 100644 --- a/internal/elasticsearch/ml/datafeed_state/read.go +++ b/internal/elasticsearch/ml/datafeed_state/read.go @@ -6,6 +6,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" "github.com/hashicorp/terraform-plugin-framework/diag" "github.com/hashicorp/terraform-plugin-framework/resource" @@ -55,7 +56,7 @@ func (r *mlDatafeedStateResource) read(ctx context.Context, data MLDatafeedState // Update the data with current information data.State = types.StringValue(datafeedStats.State) - if datafeedStats.State == "started" { + if datafeed.State(datafeedStats.State) == datafeed.StateStarted { if datafeedStats.RunningState == nil { diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.") } diff --git a/internal/elasticsearch/ml/datafeed_state/schema.go b/internal/elasticsearch/ml/datafeed_state/schema.go index a789591dd..fe27a794a 100644 --- a/internal/elasticsearch/ml/datafeed_state/schema.go +++ b/internal/elasticsearch/ml/datafeed_state/schema.go @@ -5,6 +5,7 @@ import ( _ "embed" "regexp" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" @@ -56,7 +57,7 @@ func GetSchema() schema.Schema { MarkdownDescription: "The desired state for the ML datafeed. 
Valid values are `started` and `stopped`.", Required: true, Validators: []validator.String{ - stringvalidator.OneOf("started", "stopped"), + stringvalidator.OneOf(string(datafeed.StateStarted), string(datafeed.StateStopped)), }, }, "force": schema.BoolAttribute{ diff --git a/internal/elasticsearch/ml/datafeed_state/update.go b/internal/elasticsearch/ml/datafeed_state/update.go index d646b5ee2..7078f4850 100644 --- a/internal/elasticsearch/ml/datafeed_state/update.go +++ b/internal/elasticsearch/ml/datafeed_state/update.go @@ -140,9 +140,9 @@ func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, s } // performStateTransition handles the ML datafeed state transition process -func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, currentState string) (bool, diag.Diagnostics) { +func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, currentState datafeed.State) (bool, diag.Diagnostics) { datafeedId := data.DatafeedId.ValueString() - desiredState := data.State.ValueString() + desiredState := datafeed.State(data.State.ValueString()) force := data.Force.ValueBool() // Parse timeout duration @@ -159,7 +159,7 @@ func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, cl // Initiate the state change switch desiredState { - case "started": + case datafeed.StateStarted: start, diags := data.GetStartAsString() if diags.HasError() { return false, diags @@ -175,7 +175,7 @@ func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, cl if diags.HasError() { return false, diags } - case "stopped": + case datafeed.StateStopped: if diags := elasticsearch.StopDatafeed(ctx, client, datafeedId, force, timeout); diags.HasError() { return false, diags } From 0151a268f75fd0cfe7d0f434853b307264e58712 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 13:14:40 +1100 Subject: [PATCH 05/11] State constants --- internal/elasticsearch/ml/datafeed_state/update.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ml/datafeed_state/update.go b/internal/elasticsearch/ml/datafeed_state/update.go index 7078f4850..131cf9b8b 100644 --- a/internal/elasticsearch/ml/datafeed_state/update.go +++ b/internal/elasticsearch/ml/datafeed_state/update.go @@ -74,7 +74,7 @@ func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, s } // Perform state transition if needed - inDesiredState, fwDiags := r.performStateTransition(ctx, client, data, datafeedStats.State) + inDesiredState, fwDiags := r.performStateTransition(ctx, client, data, datafeed.State(datafeedStats.State)) diags.Append(fwDiags...) 
if diags.HasError() { return diags From 674559b293e3c1f2a8edc4df39d44c9a44270569 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 14:07:31 +1100 Subject: [PATCH 06/11] Add test for timeouts on ML job state --- .../elasticsearch/ml/datafeed_state/create.go | 6 ++ .../elasticsearch/ml/datafeed_state/update.go | 5 ++ .../elasticsearch/ml/job_state/acc_test.go | 20 +++++++ internal/elasticsearch/ml/job_state/create.go | 6 ++ .../timeouts/job_state.tf | 58 +++++++++++++++++++ internal/elasticsearch/ml/job_state/update.go | 5 ++ 6 files changed, 100 insertions(+) create mode 100644 internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf diff --git a/internal/elasticsearch/ml/datafeed_state/create.go b/internal/elasticsearch/ml/datafeed_state/create.go index d5773730a..c7cfa84cf 100644 --- a/internal/elasticsearch/ml/datafeed_state/create.go +++ b/internal/elasticsearch/ml/datafeed_state/create.go @@ -2,8 +2,10 @@ package datafeed_state import ( "context" + "fmt" "time" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/hashicorp/terraform-plugin-framework/resource" ) @@ -23,5 +25,9 @@ func (r *mlDatafeedStateResource) Create(ctx context.Context, req resource.Creat } diags = r.update(ctx, req.Plan, &resp.State, createTimeout) + if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout)) + } + resp.Diagnostics.Append(diags...) } diff --git a/internal/elasticsearch/ml/datafeed_state/update.go b/internal/elasticsearch/ml/datafeed_state/update.go index 131cf9b8b..0351ffd06 100644 --- a/internal/elasticsearch/ml/datafeed_state/update.go +++ b/internal/elasticsearch/ml/datafeed_state/update.go @@ -7,6 +7,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" "github.com/hashicorp/terraform-plugin-framework/diag" @@ -32,6 +33,10 @@ func (r *mlDatafeedStateResource) Update(ctx context.Context, req resource.Updat } diags = r.update(ctx, req.Plan, &resp.State, updateTimeout) + if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout)) + } + resp.Diagnostics.Append(diags...) 
if resp.Diagnostics.HasError() { return diff --git a/internal/elasticsearch/ml/job_state/acc_test.go b/internal/elasticsearch/ml/job_state/acc_test.go index aa91e2e5e..bc7ab1f89 100644 --- a/internal/elasticsearch/ml/job_state/acc_test.go +++ b/internal/elasticsearch/ml/job_state/acc_test.go @@ -115,3 +115,23 @@ func TestAccResourceMLJobStateImport(t *testing.T) { }, }) } + +func TestAccResourceMLJobState_timeouts(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("timeouts"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "index_name": config.StringVariable(indexName), + }, + ExpectError: regexp.MustCompile("Operation timed out"), + }, + }, + }) +} diff --git a/internal/elasticsearch/ml/job_state/create.go b/internal/elasticsearch/ml/job_state/create.go index 10e7914cd..0fc6f2e0f 100644 --- a/internal/elasticsearch/ml/job_state/create.go +++ b/internal/elasticsearch/ml/job_state/create.go @@ -2,8 +2,10 @@ package job_state import ( "context" + "fmt" "time" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/hashicorp/terraform-plugin-framework/resource" ) @@ -23,5 +25,9 @@ func (r *mlJobStateResource) Create(ctx context.Context, req resource.CreateRequ } diags = r.update(ctx, req.Plan, &resp.State, createTimeout) + if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout)) + } + resp.Diagnostics.Append(diags...) 
} diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf new file mode 100644 index 000000000..09e475782 --- /dev/null +++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -0,0 +1,58 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state timeout testing with large memory" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "784mb" # Large memory requirement close to cluster limit + } + allow_lazy_open = true # This should cause datafeed to wait for available node +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" + + timeouts = { + create = "10s" + update = "10s" + } +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/job_state/update.go b/internal/elasticsearch/ml/job_state/update.go index ff99a656d..bbb5a135a 100644 --- a/internal/elasticsearch/ml/job_state/update.go +++ b/internal/elasticsearch/ml/job_state/update.go @@ -7,6 +7,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/hashicorp/terraform-plugin-framework/diag" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/tfsdk" @@ -30,6 +31,10 @@ func (r *mlJobStateResource) Update(ctx context.Context, req resource.UpdateRequ } diags = r.update(ctx, req.Plan, &resp.State, updateTimeout) + if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout)) + } + resp.Diagnostics.Append(diags...) 
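+	// NOTE (assumption about this repo's diagutil helpers): FrameworkDiagFromError converts a
+	// Go error into framework diagnostics, so the Contains check against
+	// context.DeadlineExceeded above is what separates a timeout from other failures and lets
+	// it carry remediation advice.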
if resp.Diagnostics.HasError() { return From 0a8b08c23a55e784f797d90a4ddfb940f388f405 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 14:32:35 +1100 Subject: [PATCH 07/11] make fmt --- .../timeouts/job_state.tf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf index 09e475782..9326ab9bb 100644 --- a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf +++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -42,15 +42,15 @@ resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { time_format = "epoch_ms" } analysis_limits = { - model_memory_limit = "784mb" # Large memory requirement close to cluster limit + model_memory_limit = "784mb" # Large memory requirement close to cluster limit } - allow_lazy_open = true # This should cause datafeed to wait for available node + allow_lazy_open = true # This should cause datafeed to wait for available node } resource "elasticstack_elasticsearch_ml_job_state" "test" { job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id state = "opened" - + timeouts = { create = "10s" update = "10s" From 0e53350c0a8e201b59dc04b393a3fa7186b61f96 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 14:39:08 +1100 Subject: [PATCH 08/11] Allow 2G ML models --- .github/workflows/test.yml | 4 +++- docker-compose.yml | 1 + .../TestAccResourceMLJobState_timeouts/timeouts/job_state.tf | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b12f5ce97..0c5b2f407 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -63,6 +63,8 @@ jobs: xpack.security.enabled: true xpack.security.authc.api_key.enabled: true xpack.security.authc.token.enabled: true + xpack.ml.use_auto_machine_memory_percent: true + xpack.ml.max_model_memory_limit: 2g xpack.watcher.enabled: true xpack.license.self_generated.type: trial repositories.url.allowed_urls: https://example.com/* @@ -70,7 +72,7 @@ jobs: ELASTIC_PASSWORD: ${{ env.ELASTIC_PASSWORD }} ports: - 9200:9200 - options: --health-cmd="curl http://localhost:9200/_cluster/health" --health-interval=10s --health-timeout=5s --health-retries=10 + options: --memory=2g --health-cmd="curl http://localhost:9200/_cluster/health" --health-interval=10s --health-timeout=5s --health-retries=10 kibana: image: docker.elastic.co/kibana/kibana:${{ matrix.version }} env: diff --git a/docker-compose.yml b/docker-compose.yml index a86b12398..37c7378d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,7 @@ services: xpack.security.http.ssl.enabled: false xpack.license.self_generated.type: trial xpack.ml.use_auto_machine_memory_percent: true + xpack.ml.max_model_memory_limit: 2g xpack.security.authc.api_key.enabled: true xpack.security.authc.token.enabled: true xpack.watcher.enabled: true diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf index 9326ab9bb..468e607c1 100644 --- a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf +++ 
b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -42,7 +42,7 @@ resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { time_format = "epoch_ms" } analysis_limits = { - model_memory_limit = "784mb" # Large memory requirement close to cluster limit + model_memory_limit = "2g" # Large memory requirement close to cluster limit } allow_lazy_open = true # This should cause datafeed to wait for available node } From 9ed8173fd2cbe4aad9e41759af4cfd4f2d78a454 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 19:31:14 +1100 Subject: [PATCH 09/11] Create a custom type for model memory limit ES doesn't return the size as provided, so ensure 1024mb = 1024m = 1g etc --- .../ml/anomaly_detection_job/models_tf.go | 9 +- .../ml/anomaly_detection_job/schema.go | 5 +- .../elasticsearch/ml/job_state/acc_test.go | 40 ++ .../timeouts/job_state.tf | 7 +- .../utils/customtypes/memory_size_type.go | 68 ++++ .../customtypes/memory_size_type_test.go | 45 ++ .../utils/customtypes/memory_size_value.go | 187 +++++++++ .../customtypes/memory_size_value_test.go | 384 ++++++++++++++++++ 8 files changed, 737 insertions(+), 8 deletions(-) create mode 100644 internal/utils/customtypes/memory_size_type.go create mode 100644 internal/utils/customtypes/memory_size_type_test.go create mode 100644 internal/utils/customtypes/memory_size_value.go create mode 100644 internal/utils/customtypes/memory_size_value_test.go diff --git a/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go b/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go index d2c3e033f..0b4c67ec3 100644 --- a/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go +++ b/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/utils" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" fwdiags "github.com/hashicorp/terraform-plugin-framework/diag" "github.com/hashicorp/terraform-plugin-framework/types" @@ -81,8 +82,8 @@ type RuleConditionTFModel struct { // AnalysisLimitsTFModel represents analysis limits configuration type AnalysisLimitsTFModel struct { - CategorizationExamplesLimit types.Int64 `tfsdk:"categorization_examples_limit"` - ModelMemoryLimit types.String `tfsdk:"model_memory_limit"` + CategorizationExamplesLimit types.Int64 `tfsdk:"categorization_examples_limit"` + ModelMemoryLimit customtypes.MemorySize `tfsdk:"model_memory_limit"` } // DataDescriptionTFModel represents data description configuration @@ -558,9 +559,9 @@ func (tfModel *AnomalyDetectionJobTFModel) convertAnalysisLimitsFromAPI(ctx cont } if apiLimits.ModelMemoryLimit != "" { - analysisLimitsTF.ModelMemoryLimit = types.StringValue(apiLimits.ModelMemoryLimit) + analysisLimitsTF.ModelMemoryLimit = customtypes.NewMemorySizeValue(apiLimits.ModelMemoryLimit) } else { - analysisLimitsTF.ModelMemoryLimit = types.StringNull() + analysisLimitsTF.ModelMemoryLimit = customtypes.NewMemorySizeNull() } analysisLimitsObjectValue, d := types.ObjectValueFrom(ctx, getAnalysisLimitsAttrTypes(), analysisLimitsTF) diff --git a/internal/elasticsearch/ml/anomaly_detection_job/schema.go b/internal/elasticsearch/ml/anomaly_detection_job/schema.go index d3f4a0bfc..ccaefbd5c 100644 --- a/internal/elasticsearch/ml/anomaly_detection_job/schema.go +++ b/internal/elasticsearch/ml/anomaly_detection_job/schema.go @@ -4,6 
+4,7 @@ import ( "context" "regexp" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" "github.com/hashicorp/terraform-plugin-framework-validators/int64validator" "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" @@ -250,9 +251,7 @@ func GetSchema() schema.Schema { "model_memory_limit": schema.StringAttribute{ MarkdownDescription: "The approximate maximum amount of memory resources that are required for analytical processing.", Optional: true, - Validators: []validator.String{ - stringvalidator.RegexMatches(regexp.MustCompile(`^\d+[kmgtKMGT]?[bB]?$`), "must be a valid memory size (e.g., 10mb, 1gb)"), - }, + CustomType: customtypes.MemorySizeType{}, }, }, }, diff --git a/internal/elasticsearch/ml/job_state/acc_test.go b/internal/elasticsearch/ml/job_state/acc_test.go index bc7ab1f89..0cf881150 100644 --- a/internal/elasticsearch/ml/job_state/acc_test.go +++ b/internal/elasticsearch/ml/job_state/acc_test.go @@ -1,15 +1,20 @@ package job_state_test import ( + "encoding/json" "fmt" + "maps" "regexp" + "slices" "testing" "github.com/elastic/terraform-provider-elasticstack/internal/acctest" + "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/hashicorp/terraform-plugin-testing/config" sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/terraform" + "github.com/stretchr/testify/require" ) func TestAccResourceMLJobState(t *testing.T) { @@ -129,9 +134,44 @@ func TestAccResourceMLJobState_timeouts(t *testing.T) { ConfigVariables: config.Variables{ "job_id": config.StringVariable(jobID), "index_name": config.StringVariable(indexName), + "job_memory": config.StringVariable(GetMaxMLJobMemory(t)), }, ExpectError: regexp.MustCompile("Operation timed out"), }, }, }) } + +func GetMaxMLJobMemory(t *testing.T) string { + client, err := clients.NewAcceptanceTestingClient() + require.NoError(t, err) + + esClient, err := client.GetESClient() + require.NoError(t, err) + + resp, err := esClient.ML.GetMemoryStats() + require.NoError(t, err) + + defer resp.Body.Close() + type mlNode struct { + Memory struct { + ML struct { + MaxInBytes int64 `json:"max_in_bytes"` + } `json:"ml"` + } `json:"mem"` + } + var mlMemoryStats struct { + Nodes map[string]mlNode `json:"nodes"` + } + + err = json.NewDecoder(resp.Body).Decode(&mlMemoryStats) + require.NoError(t, err) + + nodes := slices.Collect(maps.Values(mlMemoryStats.Nodes)) + nodeWithMaxMemory := slices.MaxFunc(nodes, func(a, b mlNode) int { + return int(b.Memory.ML.MaxInBytes - a.Memory.ML.MaxInBytes) + }) + + maxAvailableMemoryInMB := nodeWithMaxMemory.Memory.ML.MaxInBytes / (1024 * 1024) + return fmt.Sprintf("%dmb", maxAvailableMemoryInMB+50) +} diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf index 468e607c1..062355b55 100644 --- a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf +++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -8,6 +8,11 @@ variable "index_name" { type = string } +variable "job_memory" { + description = "The ML job memory limit" + type = string +} + provider "elasticstack" { 
elasticsearch {} } @@ -42,7 +47,7 @@ resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { time_format = "epoch_ms" } analysis_limits = { - model_memory_limit = "2g" # Large memory requirement close to cluster limit + model_memory_limit = var.job_memory } allow_lazy_open = true # This should cause datafeed to wait for available node } diff --git a/internal/utils/customtypes/memory_size_type.go b/internal/utils/customtypes/memory_size_type.go new file mode 100644 index 000000000..da783a788 --- /dev/null +++ b/internal/utils/customtypes/memory_size_type.go @@ -0,0 +1,68 @@ +package customtypes + +import ( + "context" + "fmt" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/hashicorp/terraform-plugin-go/tftypes" +) + +var ( + _ basetypes.StringTypable = (*MemorySizeType)(nil) +) + +type MemorySizeType struct { + basetypes.StringType +} + +// String returns a human readable string of the type name. +func (t MemorySizeType) String() string { + return "customtypes.MemorySizeType" +} + +// ValueType returns the Value type. +func (t MemorySizeType) ValueType(ctx context.Context) attr.Value { + return MemorySize{} +} + +// Equal returns true if the given type is equivalent. +func (t MemorySizeType) Equal(o attr.Type) bool { + other, ok := o.(MemorySizeType) + + if !ok { + return false + } + + return t.StringType.Equal(other.StringType) +} + +// ValueFromString returns a StringValuable type given a StringValue. +func (t MemorySizeType) ValueFromString(ctx context.Context, in basetypes.StringValue) (basetypes.StringValuable, diag.Diagnostics) { + return MemorySize{ + StringValue: in, + }, nil +} + +// ValueFromTerraform returns a Value given a tftypes.Value. This is meant to convert the tftypes.Value into a more convenient Go type +// for the provider to consume the data with. 
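+// NOTE (assumption, not verified against the framework sources): the plugin framework calls
+// ValueFromTerraform when decoding plan and state data, so overriding it here is what makes
+// attributes declared with CustomType: MemorySizeType{} surface MemorySize values, with their
+// semantic equality, instead of plain basetypes.StringValue.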
+func (t MemorySizeType) ValueFromTerraform(ctx context.Context, in tftypes.Value) (attr.Value, error) { + attrValue, err := t.StringType.ValueFromTerraform(ctx, in) + if err != nil { + return nil, err + } + + stringValue, ok := attrValue.(basetypes.StringValue) + if !ok { + return nil, fmt.Errorf("unexpected value type of %T", attrValue) + } + + stringValuable, diags := t.ValueFromString(ctx, stringValue) + if diags.HasError() { + return nil, fmt.Errorf("unexpected error converting StringValue to StringValuable: %v", diags) + } + + return stringValuable, nil +} diff --git a/internal/utils/customtypes/memory_size_type_test.go b/internal/utils/customtypes/memory_size_type_test.go new file mode 100644 index 000000000..6c81ca1f7 --- /dev/null +++ b/internal/utils/customtypes/memory_size_type_test.go @@ -0,0 +1,45 @@ +package customtypes + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/stretchr/testify/require" +) + +func TestMemorySizeType_String(t *testing.T) { + require.Equal(t, "customtypes.MemorySizeType", MemorySizeType{}.String()) +} + +func TestMemorySizeType_ValueType(t *testing.T) { + require.Equal(t, MemorySize{}, MemorySizeType{}.ValueType(context.Background())) +} + +func TestMemorySizeType_Equal(t *testing.T) { + tests := []struct { + name string + typ MemorySizeType + other attr.Type + expected bool + }{ + { + name: "equal to another MemorySizeType", + typ: MemorySizeType{}, + other: MemorySizeType{}, + expected: true, + }, + { + name: "not equal to different type", + typ: MemorySizeType{}, + other: DurationType{}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, tt.typ.Equal(tt.other)) + }) + } +} diff --git a/internal/utils/customtypes/memory_size_value.go b/internal/utils/customtypes/memory_size_value.go new file mode 100644 index 000000000..891e7ed4f --- /dev/null +++ b/internal/utils/customtypes/memory_size_value.go @@ -0,0 +1,187 @@ +package customtypes + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/attr/xattr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" +) + +var ( + _ basetypes.StringValuable = (*MemorySize)(nil) + _ basetypes.StringValuableWithSemanticEquals = (*MemorySize)(nil) + _ xattr.ValidateableAttribute = (*MemorySize)(nil) +) + +// memoryPattern matches memory size strings with optional 'b' suffix +var memoryPattern = regexp.MustCompile(`^(\d+)([kmgtKMGT])?[bB]?$`) + +type MemorySize struct { + basetypes.StringValue +} + +// Type returns a MemorySizeType. +func (v MemorySize) Type(_ context.Context) attr.Type { + return MemorySizeType{} +} + +// Equal returns true if the given value is equivalent. 
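+// Note that this is a strict comparison of the underlying strings: "1g" and "1024mb"
+// describe the same amount of memory but are not Equal. Use StringSemanticEquals for a
+// unit-aware comparison.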
+func (v MemorySize) Equal(o attr.Value) bool { + other, ok := o.(MemorySize) + + if !ok { + return false + } + + return v.StringValue.Equal(other.StringValue) +} + +func (t MemorySize) ValidateAttribute(ctx context.Context, req xattr.ValidateAttributeRequest, resp *xattr.ValidateAttributeResponse) { + if t.IsNull() || t.IsUnknown() { + return + } + + valueString := t.ValueString() + if !memoryPattern.MatchString(valueString) { + resp.Diagnostics.AddAttributeError( + req.Path, + "Invalid memory size string value", + fmt.Sprintf("A string value was provided that is not a valid memory size format\n\nGiven value \"%s\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", valueString), + ) + } +} + +// StringSemanticEquals returns true if the given memory size string value is semantically equal to the current memory size string value. +// When compared, the memory sizes are parsed into bytes and the byte values compared. +func (v MemorySize) StringSemanticEquals(_ context.Context, newValuable basetypes.StringValuable) (bool, diag.Diagnostics) { + var diags diag.Diagnostics + + newValue, ok := newValuable.(MemorySize) + if !ok { + diags.AddError( + "Semantic Equality Check Error", + "An unexpected value type was received while performing semantic equality checks. "+ + "Please report this to the provider developers.\n\n"+ + "Expected Value Type: "+fmt.Sprintf("%T", v)+"\n"+ + "Got Value Type: "+fmt.Sprintf("%T", newValuable), + ) + + return false, diags + } + + if v.IsNull() { + return newValue.IsNull(), diags + } + + if v.IsUnknown() { + return newValue.IsUnknown(), diags + } + + vParsed, diags := v.ConvertToMB() + if diags.HasError() { + return false, diags + } + + newParsed, diags := newValue.ConvertToMB() + if diags.HasError() { + return false, diags + } + + return vParsed == newParsed, diags +} + +// ConvertToMB parses the memory size string and returns the equivalent number of megabytes. +// Supports units: k/K (kilobytes), m/M (megabytes), g/G (gigabytes), t/T (terabytes) +// The 'b' suffix is optional and ignored. +// Note: As per ML documentation, values are rounded down to the nearest MB for consistency. 
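+// Examples, taken from the unit tests below: "2gb" yields 2048, "1024k" yields 1,
+// "1500000" (plain bytes) rounds down to 1, and "1048575" rounds down to 0.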
+func (v MemorySize) ConvertToMB() (int64, diag.Diagnostics) { + var diags diag.Diagnostics + + if v.IsNull() { + diags.Append(diag.NewErrorDiagnostic("Memory Size Parse error", "memory size string value is null")) + return 0, diags + } + + if v.IsUnknown() { + diags.Append(diag.NewErrorDiagnostic("Memory Size Parse Error", "memory size string value is unknown")) + return 0, diags + } + + valueString := v.ValueString() + matches := memoryPattern.FindStringSubmatch(valueString) + if len(matches) != 3 { + diags.Append(diag.NewErrorDiagnostic("Memory Size Parse Error", + fmt.Sprintf("invalid memory size format: %s", valueString))) + return 0, diags + } + + // Parse the numeric part + numStr := matches[1] + num, err := strconv.ParseInt(numStr, 10, 64) + if err != nil { + diags.Append(diag.NewErrorDiagnostic("Memory Size Parse Error", + fmt.Sprintf("invalid number in memory size: %s", numStr))) + return 0, diags + } + + // Parse the unit part (if present) and calculate bytes + unit := strings.ToLower(matches[2]) + var bytes int64 + + switch unit { + case "k": + bytes = num * 1024 + case "m": + bytes = num * 1024 * 1024 + case "g": + bytes = num * 1024 * 1024 * 1024 + case "t": + bytes = num * 1024 * 1024 * 1024 * 1024 + case "": // no unit = bytes + bytes = num + default: + diags.Append(diag.NewErrorDiagnostic("Memory Size Parse Error", + fmt.Sprintf("unsupported memory unit: %s", unit))) + return 0, diags + } + + // Round down to the nearest MB (1024*1024 = 1048576 bytes) as per ML documentation + const mbInBytes = 1024 * 1024 + roundedMB := bytes / mbInBytes + + return roundedMB, diags +} + +// NewMemorySizeNull creates a MemorySize with a null value. Determine whether the value is null via IsNull method. +func NewMemorySizeNull() MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringNull(), + } +} + +// NewMemorySizeUnknown creates a MemorySize with an unknown value. Determine whether the value is unknown via IsUnknown method. +func NewMemorySizeUnknown() MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringUnknown(), + } +} + +// NewMemorySizeValue creates a MemorySize with a known value. Access the value via ValueString method. +func NewMemorySizeValue(value string) MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringValue(value), + } +} + +// NewMemorySizePointerValue creates a MemorySize with a null value if nil or a known value. Access the value via ValueStringPointer method. 
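+// This mirrors basetypes.NewStringPointerValue, so call sites holding a *string can adopt
+// MemorySize without extra nil handling.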
+func NewMemorySizePointerValue(value *string) MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringPointerValue(value), + } +} diff --git a/internal/utils/customtypes/memory_size_value_test.go b/internal/utils/customtypes/memory_size_value_test.go new file mode 100644 index 000000000..9b8dea262 --- /dev/null +++ b/internal/utils/customtypes/memory_size_value_test.go @@ -0,0 +1,384 @@ +package customtypes + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/attr/xattr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/stretchr/testify/require" +) + +func TestMemorySize_Type(t *testing.T) { + require.Equal(t, MemorySizeType{}, MemorySize{}.Type(context.Background())) +} + +func TestMemorySize_Equal(t *testing.T) { + tests := []struct { + name string + expectedEqual bool + val MemorySize + otherVal attr.Value + }{ + { + name: "not equal if the other value is not a memory size", + expectedEqual: false, + val: NewMemorySizeValue("128mb"), + otherVal: basetypes.NewBoolValue(true), + }, + { + name: "not equal if the memory sizes are not equal", + expectedEqual: false, + val: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("256mb"), + }, + { + name: "not equal if the memory sizes are semantically equal but string values are not equal", + expectedEqual: false, + val: NewMemorySizeValue("1gb"), + otherVal: NewMemorySizeValue("1024mb"), + }, + { + name: "equal if the memory size string values are equal", + expectedEqual: true, + val: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("128mb"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expectedEqual, tt.val.Equal(tt.otherVal)) + }) + } +} + +func TestMemorySize_ValidateAttribute(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + expectedDiags diag.Diagnostics + }{ + { + name: "unknown is valid", + memorySize: NewMemorySizeNull(), + }, + { + name: "null is valid", + memorySize: NewMemorySizeUnknown(), + }, + { + name: "valid memory sizes are valid - bytes", + memorySize: NewMemorySizeValue("1024"), + }, + { + name: "valid memory sizes are valid - kilobytes", + memorySize: NewMemorySizeValue("128k"), + }, + { + name: "valid memory sizes are valid - kilobytes with B", + memorySize: NewMemorySizeValue("128kb"), + }, + { + name: "valid memory sizes are valid - megabytes", + memorySize: NewMemorySizeValue("128m"), + }, + { + name: "valid memory sizes are valid - megabytes with B", + memorySize: NewMemorySizeValue("128mb"), + }, + { + name: "valid memory sizes are valid - uppercase megabytes", + memorySize: NewMemorySizeValue("128MB"), + }, + { + name: "valid memory sizes are valid - gigabytes", + memorySize: NewMemorySizeValue("2g"), + }, + { + name: "valid memory sizes are valid - gigabytes with B", + memorySize: NewMemorySizeValue("2gb"), + }, + { + name: "valid memory sizes are valid - terabytes", + memorySize: NewMemorySizeValue("1t"), + }, + { + name: "non-memory strings are invalid", + memorySize: NewMemorySizeValue("not a memory size"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"not a memory size\"\nExpected format: 
number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + { + name: "negative numbers are invalid", + memorySize: NewMemorySizeValue("-128mb"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"-128mb\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + { + name: "float numbers are invalid", + memorySize: NewMemorySizeValue("128.5mb"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"128.5mb\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp := xattr.ValidateAttributeResponse{} + + tt.memorySize.ValidateAttribute( + context.Background(), + xattr.ValidateAttributeRequest{ + Path: path.Root("memory_size"), + }, + &resp, + ) + + if tt.expectedDiags == nil { + require.Nil(t, resp.Diagnostics) + } else { + require.Equal(t, tt.expectedDiags, resp.Diagnostics) + } + }) + } +} + +func TestMemorySize_StringSemanticEquals(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + otherVal basetypes.StringValuable + expectedEqual bool + expectedErrorDiags bool + }{ + { + name: "should error if the other value is not a memory size", + memorySize: NewMemorySizeValue("128mb"), + otherVal: basetypes.NewStringValue("128mb"), + expectedEqual: false, + expectedErrorDiags: true, + }, + { + name: "two null values are semantically equal", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeNull(), + expectedEqual: true, + }, + { + name: "null is not equal to unknown", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeUnknown(), + expectedEqual: false, + }, + { + name: "null is not equal to a string value", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + }, + { + name: "two unknown values are semantically equal", + memorySize: NewMemorySizeUnknown(), + otherVal: NewMemorySizeUnknown(), + expectedEqual: true, + }, + { + name: "unknown is not equal to a string value", + memorySize: NewMemorySizeUnknown(), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + }, + { + name: "two equal values are semantically equal", + memorySize: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - gb to mb", + memorySize: NewMemorySizeValue("2g"), + otherVal: NewMemorySizeValue("2048m"), + expectedEqual: true, + }, + { + name: "two semantically equal values - gb with B to mb", + memorySize: NewMemorySizeValue("2gb"), + otherVal: NewMemorySizeValue("2048mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - different case", + memorySize: NewMemorySizeValue("128MB"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - kb to bytes (rounded to MB)", + memorySize: NewMemorySizeValue("2048k"), + otherVal: NewMemorySizeValue("2097152"), + expectedEqual: true, + }, + { + name: "bytes that don't round to same MB are not equal", + memorySize: NewMemorySizeValue("1048576"), 
// exactly 1MB + otherVal: NewMemorySizeValue("1048575"), // 1 byte less, rounds to 0MB + expectedEqual: false, + }, + { + name: "partial MB values round down to same MB", + memorySize: NewMemorySizeValue("1500000"), // ~1.43MB, rounds to 1MB + otherVal: NewMemorySizeValue("1048576"), // exactly 1MB + expectedEqual: true, + }, + { + name: "errors if this value is invalid", + memorySize: NewMemorySizeValue("not a memory size"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + expectedErrorDiags: true, + }, + { + name: "errors if the other value is invalid", + memorySize: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("not a memory size"), + expectedEqual: false, + expectedErrorDiags: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + isEqual, diags := tt.memorySize.StringSemanticEquals(context.Background(), tt.otherVal) + + require.Equal(t, tt.expectedEqual, isEqual) + require.Equal(t, tt.expectedErrorDiags, diags.HasError()) + }) + } +} + +func TestMemorySize_ParseBytes(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + expectedBytes int64 + expectedError bool + }{ + { + name: "null value should error", + memorySize: NewMemorySizeNull(), + expectedError: true, + }, + { + name: "unknown value should error", + memorySize: NewMemorySizeUnknown(), + expectedError: true, + }, + { + name: "bytes without unit", + memorySize: NewMemorySizeValue("1048576"), // exactly 1MB + expectedBytes: 1048576, + }, + { + name: "bytes without unit - rounds down", + memorySize: NewMemorySizeValue("1048575"), // 1 byte less than 1MB + expectedBytes: 0, // rounds down to 0MB + }, + { + name: "bytes without unit - partial MB rounds down", + memorySize: NewMemorySizeValue("1500000"), // ~1.43MB + expectedBytes: 1048576, // rounds down to 1MB + }, + { + name: "kilobytes", + memorySize: NewMemorySizeValue("1024k"), // exactly 1MB + expectedBytes: 1024 * 1024, + }, + { + name: "kilobytes - partial MB rounds down", + memorySize: NewMemorySizeValue("1000k"), // ~976KB, rounds down to 0MB + expectedBytes: 0, + }, + { + name: "kilobytes with B suffix", + memorySize: NewMemorySizeValue("1024kb"), // exactly 1MB + expectedBytes: 1024 * 1024, + }, + { + name: "megabytes", + memorySize: NewMemorySizeValue("128m"), + expectedBytes: 128 * 1024 * 1024, + }, + { + name: "megabytes with B suffix", + memorySize: NewMemorySizeValue("128mb"), + expectedBytes: 128 * 1024 * 1024, + }, + { + name: "uppercase megabytes", + memorySize: NewMemorySizeValue("128MB"), + expectedBytes: 128 * 1024 * 1024, + }, + { + name: "gigabytes", + memorySize: NewMemorySizeValue("2g"), + expectedBytes: 2 * 1024 * 1024 * 1024, + }, + { + name: "gigabytes with B suffix", + memorySize: NewMemorySizeValue("2gb"), + expectedBytes: 2 * 1024 * 1024 * 1024, + }, + { + name: "terabytes", + memorySize: NewMemorySizeValue("1t"), + expectedBytes: 1024 * 1024 * 1024 * 1024, + }, + { + name: "terabytes with B suffix", + memorySize: NewMemorySizeValue("1tb"), + expectedBytes: 1024 * 1024 * 1024 * 1024, + }, + { + name: "invalid format", + memorySize: NewMemorySizeValue("not a memory size"), + expectedError: true, + }, + { + name: "invalid number", + memorySize: NewMemorySizeValue("abcmb"), + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bytes, diags := tt.memorySize.ConvertToMB() + + if tt.expectedError { + require.True(t, diags.HasError()) + } else { + require.False(t, diags.HasError()) + require.Equal(t, tt.expectedBytes, 
bytes) + } + }) + } +} From 0b6542cd588d8179dd040997db752cc939d70cd5 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Mon, 10 Nov 2025 19:44:35 +1100 Subject: [PATCH 10/11] Fix tests --- .../customtypes/memory_size_value_test.go | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/utils/customtypes/memory_size_value_test.go b/internal/utils/customtypes/memory_size_value_test.go index 9b8dea262..35bdc6e60 100644 --- a/internal/utils/customtypes/memory_size_value_test.go +++ b/internal/utils/customtypes/memory_size_value_test.go @@ -279,7 +279,7 @@ func TestMemorySize_ParseBytes(t *testing.T) { tests := []struct { name string memorySize MemorySize - expectedBytes int64 + expectedMB int64 expectedError bool }{ { @@ -293,69 +293,69 @@ func TestMemorySize_ParseBytes(t *testing.T) { expectedError: true, }, { - name: "bytes without unit", - memorySize: NewMemorySizeValue("1048576"), // exactly 1MB - expectedBytes: 1048576, + name: "bytes without unit", + memorySize: NewMemorySizeValue("1048576"), // exactly 1MB + expectedMB: 1, }, { - name: "bytes without unit - rounds down", - memorySize: NewMemorySizeValue("1048575"), // 1 byte less than 1MB - expectedBytes: 0, // rounds down to 0MB + name: "bytes without unit - rounds down", + memorySize: NewMemorySizeValue("1048575"), // 1 byte less than 1MB + expectedMB: 0, // rounds down to 0MB }, { - name: "bytes without unit - partial MB rounds down", - memorySize: NewMemorySizeValue("1500000"), // ~1.43MB - expectedBytes: 1048576, // rounds down to 1MB + name: "bytes without unit - partial MB rounds down", + memorySize: NewMemorySizeValue("1500000"), // ~1.43MB + expectedMB: 1, // rounds down to 1MB }, { - name: "kilobytes", - memorySize: NewMemorySizeValue("1024k"), // exactly 1MB - expectedBytes: 1024 * 1024, + name: "kilobytes", + memorySize: NewMemorySizeValue("1024k"), // exactly 1MB + expectedMB: 1, }, { - name: "kilobytes - partial MB rounds down", - memorySize: NewMemorySizeValue("1000k"), // ~976KB, rounds down to 0MB - expectedBytes: 0, + name: "kilobytes - partial MB rounds down", + memorySize: NewMemorySizeValue("1000k"), // ~976KB, rounds down to 0MB + expectedMB: 0, }, { - name: "kilobytes with B suffix", - memorySize: NewMemorySizeValue("1024kb"), // exactly 1MB - expectedBytes: 1024 * 1024, + name: "kilobytes with B suffix", + memorySize: NewMemorySizeValue("1024kb"), // exactly 1MB + expectedMB: 1, }, { - name: "megabytes", - memorySize: NewMemorySizeValue("128m"), - expectedBytes: 128 * 1024 * 1024, + name: "megabytes", + memorySize: NewMemorySizeValue("128m"), + expectedMB: 128, }, { - name: "megabytes with B suffix", - memorySize: NewMemorySizeValue("128mb"), - expectedBytes: 128 * 1024 * 1024, + name: "megabytes with B suffix", + memorySize: NewMemorySizeValue("128mb"), + expectedMB: 128, }, { - name: "uppercase megabytes", - memorySize: NewMemorySizeValue("128MB"), - expectedBytes: 128 * 1024 * 1024, + name: "uppercase megabytes", + memorySize: NewMemorySizeValue("128MB"), + expectedMB: 128, }, { - name: "gigabytes", - memorySize: NewMemorySizeValue("2g"), - expectedBytes: 2 * 1024 * 1024 * 1024, + name: "gigabytes", + memorySize: NewMemorySizeValue("2g"), + expectedMB: 2 * 1024, }, { - name: "gigabytes with B suffix", - memorySize: NewMemorySizeValue("2gb"), - expectedBytes: 2 * 1024 * 1024 * 1024, + name: "gigabytes with B suffix", + memorySize: NewMemorySizeValue("2gb"), + expectedMB: 2 * 1024, }, { - name: "terabytes", - memorySize: NewMemorySizeValue("1t"), - expectedBytes: 1024 * 1024 
* 1024 * 1024, + name: "terabytes", + memorySize: NewMemorySizeValue("1t"), + expectedMB: 1024 * 1024, }, { - name: "terabytes with B suffix", - memorySize: NewMemorySizeValue("1tb"), - expectedBytes: 1024 * 1024 * 1024 * 1024, + name: "terabytes with B suffix", + memorySize: NewMemorySizeValue("1tb"), + expectedMB: 1024 * 1024, }, { name: "invalid format", @@ -377,7 +377,7 @@ func TestMemorySize_ParseBytes(t *testing.T) { require.True(t, diags.HasError()) } else { require.False(t, diags.HasError()) - require.Equal(t, tt.expectedBytes, bytes) + require.Equal(t, tt.expectedMB, bytes) } }) } From cab9208fb18e52d93de9a313439adbf41991d775 Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Tue, 11 Nov 2025 08:58:53 +1100 Subject: [PATCH 11/11] Fix tests --- Makefile | 2 +- .../elasticsearch/ml/job_state/acc_test.go | 40 ------------------- .../timeouts/job_state.tf | 7 +--- 3 files changed, 2 insertions(+), 47 deletions(-) diff --git a/Makefile b/Makefile index 431fa7df8..e3343ad9d 100644 --- a/Makefile +++ b/Makefile @@ -101,7 +101,7 @@ setup-kibana-fleet: ## Creates the agent and integration policies required to ru .PHONY: docker-clean docker-clean: ## Try to remove provisioned nodes and assigned network - @ docker compose -f $(COMPOSE_FILE) down + @ docker compose -f $(COMPOSE_FILE) down -v .PHONY: copy-kibana-ca copy-kibana-ca: ## Copy Kibana CA certificate to local machine diff --git a/internal/elasticsearch/ml/job_state/acc_test.go b/internal/elasticsearch/ml/job_state/acc_test.go index 0cf881150..bc7ab1f89 100644 --- a/internal/elasticsearch/ml/job_state/acc_test.go +++ b/internal/elasticsearch/ml/job_state/acc_test.go @@ -1,20 +1,15 @@ package job_state_test import ( - "encoding/json" "fmt" - "maps" "regexp" - "slices" "testing" "github.com/elastic/terraform-provider-elasticstack/internal/acctest" - "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/hashicorp/terraform-plugin-testing/config" sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/terraform" - "github.com/stretchr/testify/require" ) func TestAccResourceMLJobState(t *testing.T) { @@ -134,44 +129,9 @@ func TestAccResourceMLJobState_timeouts(t *testing.T) { ConfigVariables: config.Variables{ "job_id": config.StringVariable(jobID), "index_name": config.StringVariable(indexName), - "job_memory": config.StringVariable(GetMaxMLJobMemory(t)), }, ExpectError: regexp.MustCompile("Operation timed out"), }, }, }) } - -func GetMaxMLJobMemory(t *testing.T) string { - client, err := clients.NewAcceptanceTestingClient() - require.NoError(t, err) - - esClient, err := client.GetESClient() - require.NoError(t, err) - - resp, err := esClient.ML.GetMemoryStats() - require.NoError(t, err) - - defer resp.Body.Close() - type mlNode struct { - Memory struct { - ML struct { - MaxInBytes int64 `json:"max_in_bytes"` - } `json:"ml"` - } `json:"mem"` - } - var mlMemoryStats struct { - Nodes map[string]mlNode `json:"nodes"` - } - - err = json.NewDecoder(resp.Body).Decode(&mlMemoryStats) - require.NoError(t, err) - - nodes := slices.Collect(maps.Values(mlMemoryStats.Nodes)) - nodeWithMaxMemory := slices.MaxFunc(nodes, func(a, b mlNode) int { - return int(b.Memory.ML.MaxInBytes - a.Memory.ML.MaxInBytes) - }) - - maxAvailableMemoryInMB := nodeWithMaxMemory.Memory.ML.MaxInBytes / (1024 * 1024) - return fmt.Sprintf("%dmb", maxAvailableMemoryInMB+50) -} diff --git 
a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf index 062355b55..698b7e2f6 100644 --- a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf +++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -8,11 +8,6 @@ variable "index_name" { type = string } -variable "job_memory" { - description = "The ML job memory limit" - type = string -} - provider "elasticstack" { elasticsearch {} } @@ -47,7 +42,7 @@ resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { time_format = "epoch_ms" } analysis_limits = { - model_memory_limit = var.job_memory + model_memory_limit = "2gb" } allow_lazy_open = true # This should cause datafeed to wait for available node }
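
A note on the expectedBytes to expectedMB rename in patch 10/11: the updated expectations imply the parser now reports whole mebibytes and floors any partial megabyte, which is why both 1048575 bytes and "1000k" map to 0. One comment carried through the rename is slightly off: under the 1024-based units the rest of the table assumes, "1000k" is 1000KiB, roughly 0.98MB, not ~976KB. The following is a minimal, self-contained sketch of the conversion those expectations describe; toMB is a hypothetical name, and this is not the provider's actual implementation.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toMB converts an Elasticsearch-style memory size string ("1048576",
// "1024k", "128MB", "2g", "1tb") to whole mebibytes, rounding down.
// Unit-less values are bytes; units are 1024-based, case-insensitive,
// with an optional trailing "b".
func toMB(s string) (int64, error) {
	v := strings.TrimSuffix(strings.ToLower(strings.TrimSpace(s)), "b")

	multiplier := int64(1) // plain bytes by default
	switch {
	case strings.HasSuffix(v, "k"):
		multiplier, v = 1<<10, strings.TrimSuffix(v, "k")
	case strings.HasSuffix(v, "m"):
		multiplier, v = 1<<20, strings.TrimSuffix(v, "m")
	case strings.HasSuffix(v, "g"):
		multiplier, v = 1<<30, strings.TrimSuffix(v, "g")
	case strings.HasSuffix(v, "t"):
		multiplier, v = 1<<40, strings.TrimSuffix(v, "t")
	}

	n, err := strconv.ParseInt(v, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid memory size %q: %w", s, err)
	}

	// Integer division floors: 1048575 bytes and "1000k" both yield 0.
	return n * multiplier / (1 << 20), nil
}

func main() {
	for _, in := range []string{"1048576", "1048575", "1024k", "1000k", "128MB", "2g", "1tb"} {
		mb, err := toMB(in)
		fmt.Println(in, "->", mb, err)
	}
}

Run against the table's inputs, the floor cases fall out of the final integer division: "1048575" and "1000k" print 0, while "1048576" and "1024k" print 1.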
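Also in patch 11/11, docker-clean now runs docker compose down -v. The -v flag removes the named and anonymous volumes behind the stack along with the containers, so the next run provisions a fresh Elasticsearch data directory instead of inheriting jobs and indices left behind by a previous acceptance-test run, which is presumably part of making these tests repeatable.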
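Finally, one detail of the GetMaxMLJobMemory helper deleted in patch 11/11 is worth recording in case it is ever reinstated. slices.MaxFunc expects a comparator that returns a negative value when a sorts before b, but the helper returned int(b.Memory.ML.MaxInBytes - a.Memory.ML.MaxInBytes). That inverts the ordering, so the helper actually selected the node with the least ML memory (the int64-to-int conversion is also fragile on 32-bit platforms). On a multi-node cluster the resulting limit plus 50mb could still fit on a larger node, letting the job open instead of timing out as the test intends; hardcoding model_memory_limit to "2gb" sidesteps the helper entirely. A corrected comparator, sketched with simplified stand-in types rather than the original structs:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// mlNode is a stand-in for the node entries decoded from the ML memory
// stats response; only the field the comparator reads is kept.
type mlNode struct {
	MaxInBytes int64
}

func main() {
	nodes := []mlNode{{512 << 20}, {2 << 30}, {1 << 30}}

	// Buggy shape from the deleted helper: returning b-a inverts the
	// ordering, so MaxFunc selects the node with the LEAST memory.
	least := slices.MaxFunc(nodes, func(a, b mlNode) int {
		return int(b.MaxInBytes - a.MaxInBytes)
	})

	// Corrected: cmp.Compare returns <0 when a < b, so MaxFunc picks
	// the node with the MOST memory, as the helper's name promised.
	most := slices.MaxFunc(nodes, func(a, b mlNode) int {
		return cmp.Compare(a.MaxInBytes, b.MaxInBytes)
	})

	fmt.Printf("buggy: %dMB, fixed: %dMB\n", least.MaxInBytes>>20, most.MaxInBytes>>20)
	// buggy: 512MB, fixed: 2048MB
}

cmp.Compare returns -1, 0, or 1 in ascending order, which is exactly the contract slices.MaxFunc needs to hand back the genuinely largest element.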