diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b12f5ce97..0c5b2f407 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -63,6 +63,8 @@ jobs: xpack.security.enabled: true xpack.security.authc.api_key.enabled: true xpack.security.authc.token.enabled: true + xpack.ml.use_auto_machine_memory_percent: true + xpack.ml.max_model_memory_limit: 2g xpack.watcher.enabled: true xpack.license.self_generated.type: trial repositories.url.allowed_urls: https://example.com/* @@ -70,7 +72,7 @@ jobs: ELASTIC_PASSWORD: ${{ env.ELASTIC_PASSWORD }} ports: - 9200:9200 - options: --health-cmd="curl http://localhost:9200/_cluster/health" --health-interval=10s --health-timeout=5s --health-retries=10 + options: --memory=2g --health-cmd="curl http://localhost:9200/_cluster/health" --health-interval=10s --health-timeout=5s --health-retries=10 kibana: image: docker.elastic.co/kibana/kibana:${{ matrix.version }} env: diff --git a/CHANGELOG.md b/CHANGELOG.md index fe238fb3c..8522e719e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,10 @@ - Fix `elasticstack_elasticsearch_snapshot_lifecycle` metadata type conversion causing terraform apply to fail ([#1409](https://github.com/elastic/terraform-provider-elasticstack/issues/1409)) - Add new `elasticstack_elasticsearch_ml_anomaly_detection_job` resource ([#1329](https://github.com/elastic/terraform-provider-elasticstack/pull/1329)) -- Add new `elasticstack_elasticsearch_ml_datafeed` resource ([1340](https://github.com/elastic/terraform-provider-elasticstack/pull/1340)) +- Add new `elasticstack_elasticsearch_ml_datafeed` resource ([#1340](https://github.com/elastic/terraform-provider-elasticstack/pull/1340)) - Add `space_ids` attribute to all Fleet resources to support space-aware Fleet resource management ([#1390](https://github.com/elastic/terraform-provider-elasticstack/pull/1390)) +- Add new `elasticstack_elasticsearch_ml_job_state` resource ([#1337](https://github.com/elastic/terraform-provider-elasticstack/pull/1337)) +- Add new `elasticstack_elasticsearch_ml_datafeed_state` resource ([#1422](https://github.com/elastic/terraform-provider-elasticstack/pull/1422)) ## [0.12.1] - 2025-10-22 - Fix regression restricting the characters in an `elasticstack_elasticsearch_role_mapping` `name`. ([#1373](https://github.com/elastic/terraform-provider-elasticstack/pull/1373)) diff --git a/Makefile b/Makefile index 431fa7df8..e3343ad9d 100644 --- a/Makefile +++ b/Makefile @@ -101,7 +101,7 @@ setup-kibana-fleet: ## Creates the agent and integration policies required to ru .PHONY: docker-clean docker-clean: ## Try to remove provisioned nodes and assigned network - @ docker compose -f $(COMPOSE_FILE) down + @ docker compose -f $(COMPOSE_FILE) down -v .PHONY: copy-kibana-ca copy-kibana-ca: ## Copy Kibana CA certificate to local machine diff --git a/docker-compose.yml b/docker-compose.yml index a86b12398..37c7378d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,7 @@ services: xpack.security.http.ssl.enabled: false xpack.license.self_generated.type: trial xpack.ml.use_auto_machine_memory_percent: true + xpack.ml.max_model_memory_limit: 2g xpack.security.authc.api_key.enabled: true xpack.security.authc.token.enabled: true xpack.watcher.enabled: true diff --git a/docs/resources/elasticsearch_ml_datafeed_state.md b/docs/resources/elasticsearch_ml_datafeed_state.md new file mode 100644 index 000000000..700e2b790 --- /dev/null +++ b/docs/resources/elasticsearch_ml_datafeed_state.md @@ -0,0 +1,192 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "elasticstack_elasticsearch_ml_datafeed_state Resource - terraform-provider-elasticstack" +subcategory: "Ml" +description: |- + Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed. + Note: Starting a non-realtime datafeed (i.e with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the state attribute via the resource lifecycle https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes for non-realtime datafeeds. +--- + +# elasticstack_elasticsearch_ml_datafeed_state (Resource) + +Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed. + +Note: Starting a non-realtime datafeed (i.e with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the `state` attribute via the [resource lifecycle](https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes) for non-realtime datafeeds. + +## Example Usage + +```terraform +## The following resources setup a realtime ML datafeed. +resource "elasticstack_elasticsearch_index" "ml_datafeed_index" { + name = "ml-datafeed-data" + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + user = { + type = "keyword" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "example" { + job_id = "example-anomaly-job" + description = "Example anomaly detection job" + + analysis_config { + bucket_span = "15m" + detectors { + function = "mean" + field_name = "value" + by_field_name = "user" + } + } + + data_description { + time_field = "@timestamp" + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "example" { + datafeed_id = "example-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.example.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + + query = jsonencode({ + bool = { + must = [ + { + range = { + "@timestamp" = { + gte = "now-7d" + } + } + } + ] + } + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "example" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.example.datafeed_id + state = "started" + force = false +} + +## A non-realtime datafeed will automatically stop once all data has been processed. +## It's recommended to ignore changes to the `state` attribute via the resource lifecycle for such datafeeds. + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "non-realtime" { + job_id = "non-realtime-anomaly-job" + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "non-realtime" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "non-realtime" { + datafeed_id = "non-realtime-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "non-realtime" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.non-realtime.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + lifecycle { + ignore_changes = ["state"] + } +} +``` + + +## Schema + +### Required + +- `datafeed_id` (String) Identifier for the ML datafeed. +- `state` (String) The desired state for the ML datafeed. Valid values are `started` and `stopped`. + +### Optional + +- `datafeed_timeout` (String) Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`. +- `elasticsearch_connection` (Block List, Deprecated) Elasticsearch connection configuration block. (see [below for nested schema](#nestedblock--elasticsearch_connection)) +- `end` (String) The time that the datafeed should end collecting data. When not specified, the datafeed continues in real-time. This property must be specified in RFC 3339 format. +- `force` (Boolean) When stopping a datafeed, use to forcefully stop it. +- `start` (String) The time that the datafeed should start collecting data. When not specified, the datafeed starts in real-time. This property must be specified in RFC 3339 format. +- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts)) + +### Read-Only + +- `id` (String) Internal identifier of the resource + + +### Nested Schema for `elasticsearch_connection` + +Optional: + +- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch +- `bearer_token` (String, Sensitive) Bearer Token to use for authentication to Elasticsearch +- `ca_data` (String) PEM-encoded custom Certificate Authority certificate +- `ca_file` (String) Path to a custom Certificate Authority certificate +- `cert_data` (String) PEM encoded certificate for client auth +- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth +- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) schema and port number. +- `es_client_authentication` (String, Sensitive) ES Client Authentication field to be used with the JWT token +- `headers` (Map of String, Sensitive) A list of headers to be sent with each request to Elasticsearch. +- `insecure` (Boolean) Disable TLS certificate validation +- `key_data` (String, Sensitive) PEM encoded private key for client auth +- `key_file` (String) Path to a file containing the PEM encoded private key for client auth +- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch. +- `username` (String) Username to use for API authentication to Elasticsearch. + + + +### Nested Schema for `timeouts` + +Optional: + +- `create` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours). +- `update` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours). + +## Import + +Import is supported using the following syntax: + +The [`terraform import` command](https://developer.hashicorp.com/terraform/cli/commands/import) can be used, for example: + +```shell +terraform import elasticstack_elasticsearch_ml_datafeed_state.example my-datafeed-id +``` diff --git a/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh new file mode 100644 index 000000000..925813279 --- /dev/null +++ b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/import.sh @@ -0,0 +1 @@ +terraform import elasticstack_elasticsearch_ml_datafeed_state.example my-datafeed-id \ No newline at end of file diff --git a/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf new file mode 100644 index 000000000..a123b3609 --- /dev/null +++ b/examples/resources/elasticstack_elasticsearch_ml_datafeed_state/resource.tf @@ -0,0 +1,113 @@ +## The following resources setup a realtime ML datafeed. +resource "elasticstack_elasticsearch_index" "ml_datafeed_index" { + name = "ml-datafeed-data" + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + user = { + type = "keyword" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "example" { + job_id = "example-anomaly-job" + description = "Example anomaly detection job" + + analysis_config { + bucket_span = "15m" + detectors { + function = "mean" + field_name = "value" + by_field_name = "user" + } + } + + data_description { + time_field = "@timestamp" + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "example" { + datafeed_id = "example-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.example.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + + query = jsonencode({ + bool = { + must = [ + { + range = { + "@timestamp" = { + gte = "now-7d" + } + } + } + ] + } + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "example" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.example.datafeed_id + state = "started" + force = false +} + +## A non-realtime datafeed will automatically stop once all data has been processed. +## It's recommended to ignore changes to the `state` attribute via the resource lifecycle for such datafeeds. + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "non-realtime" { + job_id = "non-realtime-anomaly-job" + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "non-realtime" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "non-realtime" { + datafeed_id = "non-realtime-datafeed" + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.non-realtime.job_id + indices = [elasticstack_elasticsearch_index.ml_datafeed_index.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "non-realtime" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.non-realtime.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + lifecycle { + ignore_changes = ["state"] + } +} \ No newline at end of file diff --git a/go.mod b/go.mod index 1b2e89060..e2b83f34f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/hashicorp/terraform-plugin-framework v1.16.1 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0 github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0 + github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0 github.com/hashicorp/terraform-plugin-framework-validators v0.19.0 github.com/hashicorp/terraform-plugin-go v0.29.0 github.com/hashicorp/terraform-plugin-log v0.9.0 diff --git a/go.sum b/go.sum index b036aaa1f..64d8ab1d4 100644 --- a/go.sum +++ b/go.sum @@ -617,6 +617,8 @@ github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0 h1:SJXL5FfJJm17 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0/go.mod h1:p0phD0IYhsu9bR4+6OetVvvH59I6LwjXGnTVEr8ox6E= github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0 h1:jblRy1PkLfPm5hb5XeMa3tezusnMRziUGqtT5epSYoI= github.com/hashicorp/terraform-plugin-framework-timeouts v0.7.0/go.mod h1:5jm2XK8uqrdiSRfD5O47OoxyGMCnwTcl8eoiDgSa+tc= +github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0 h1:v3DapR8gsp3EM8fKMh6up9cJUFQ2iRaFsYLP8UJnCco= +github.com/hashicorp/terraform-plugin-framework-timetypes v0.5.0/go.mod h1:c3PnGE9pHBDfdEVG9t1S1C9ia5LW+gkFR0CygXlM8ak= github.com/hashicorp/terraform-plugin-framework-validators v0.19.0 h1:Zz3iGgzxe/1XBkooZCewS0nJAaCFPFPHdNJd8FgE4Ow= github.com/hashicorp/terraform-plugin-framework-validators v0.19.0/go.mod h1:GBKTNGbGVJohU03dZ7U8wHqc2zYnMUawgCN+gC0itLc= github.com/hashicorp/terraform-plugin-go v0.29.0 h1:1nXKl/nSpaYIUBU1IG/EsDOX0vv+9JxAltQyDMpq5mU= diff --git a/internal/clients/elasticsearch/ml_job.go b/internal/clients/elasticsearch/ml_job.go index fc51037cd..560dbc52d 100644 --- a/internal/clients/elasticsearch/ml_job.go +++ b/internal/clients/elasticsearch/ml_job.go @@ -15,25 +15,6 @@ import ( "github.com/hashicorp/terraform-plugin-framework/diag" ) -// MLJobStats represents the statistics structure for an ML job -type MLJobStats struct { - Jobs []MLJob `json:"jobs"` -} - -// MLJob represents a single ML job in the stats response -type MLJob struct { - JobId string `json:"job_id"` - State string `json:"state"` - Node *MLJobNode `json:"node,omitempty"` -} - -// MLJobNode represents the node information for an ML job -type MLJobNode struct { - Id string `json:"id"` - Name string `json:"name"` - Attributes map[string]interface{} `json:"attributes"` -} - // OpenMLJob opens a machine learning job func OpenMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string) diag.Diagnostics { var diags diag.Diagnostics @@ -120,7 +101,7 @@ func CloseMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string, } // GetMLJobStats retrieves the stats for a specific machine learning job -func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId string) (*MLJob, diag.Diagnostics) { +func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId string) (*models.MLJob, diag.Diagnostics) { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() @@ -148,8 +129,7 @@ func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId stri if diags.HasError() { return nil, diags } - - var jobStats MLJobStats + var jobStats models.MLJobStats if err := json.NewDecoder(res.Body).Decode(&jobStats); err != nil { diags.AddError("Failed to decode ML job stats response", err.Error()) return nil, diags diff --git a/internal/diagutil/context.go b/internal/diagutil/context.go new file mode 100644 index 000000000..5b75f782e --- /dev/null +++ b/internal/diagutil/context.go @@ -0,0 +1,19 @@ +package diagutil + +import ( + "context" + + fwdiag "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-log/tflog" +) + +var contextDeadlineExceededDiags = FrameworkDiagFromError(context.DeadlineExceeded) + +func ContainsContextDeadlineExceeded(ctx context.Context, diags fwdiag.Diagnostics) bool { + if len(contextDeadlineExceededDiags) == 0 { + tflog.Error(ctx, "Expected context deadline exceeded diagnostics to contain at least one error") + return false + } + + return diags.Contains(contextDeadlineExceededDiags[0]) +} diff --git a/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go b/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go index d2c3e033f..0b4c67ec3 100644 --- a/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go +++ b/internal/elasticsearch/ml/anomaly_detection_job/models_tf.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/utils" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" fwdiags "github.com/hashicorp/terraform-plugin-framework/diag" "github.com/hashicorp/terraform-plugin-framework/types" @@ -81,8 +82,8 @@ type RuleConditionTFModel struct { // AnalysisLimitsTFModel represents analysis limits configuration type AnalysisLimitsTFModel struct { - CategorizationExamplesLimit types.Int64 `tfsdk:"categorization_examples_limit"` - ModelMemoryLimit types.String `tfsdk:"model_memory_limit"` + CategorizationExamplesLimit types.Int64 `tfsdk:"categorization_examples_limit"` + ModelMemoryLimit customtypes.MemorySize `tfsdk:"model_memory_limit"` } // DataDescriptionTFModel represents data description configuration @@ -558,9 +559,9 @@ func (tfModel *AnomalyDetectionJobTFModel) convertAnalysisLimitsFromAPI(ctx cont } if apiLimits.ModelMemoryLimit != "" { - analysisLimitsTF.ModelMemoryLimit = types.StringValue(apiLimits.ModelMemoryLimit) + analysisLimitsTF.ModelMemoryLimit = customtypes.NewMemorySizeValue(apiLimits.ModelMemoryLimit) } else { - analysisLimitsTF.ModelMemoryLimit = types.StringNull() + analysisLimitsTF.ModelMemoryLimit = customtypes.NewMemorySizeNull() } analysisLimitsObjectValue, d := types.ObjectValueFrom(ctx, getAnalysisLimitsAttrTypes(), analysisLimitsTF) diff --git a/internal/elasticsearch/ml/anomaly_detection_job/schema.go b/internal/elasticsearch/ml/anomaly_detection_job/schema.go index d3f4a0bfc..ccaefbd5c 100644 --- a/internal/elasticsearch/ml/anomaly_detection_job/schema.go +++ b/internal/elasticsearch/ml/anomaly_detection_job/schema.go @@ -4,6 +4,7 @@ import ( "context" "regexp" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" "github.com/hashicorp/terraform-plugin-framework-validators/int64validator" "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" @@ -250,9 +251,7 @@ func GetSchema() schema.Schema { "model_memory_limit": schema.StringAttribute{ MarkdownDescription: "The approximate maximum amount of memory resources that are required for analytical processing.", Optional: true, - Validators: []validator.String{ - stringvalidator.RegexMatches(regexp.MustCompile(`^\d+[kmgtKMGT]?[bB]?$`), "must be a valid memory size (e.g., 10mb, 1gb)"), - }, + CustomType: customtypes.MemorySizeType{}, }, }, }, diff --git a/internal/elasticsearch/ml/datafeed/delete.go b/internal/elasticsearch/ml/datafeed/delete.go index 0a66c8102..cd24b5aad 100644 --- a/internal/elasticsearch/ml/datafeed/delete.go +++ b/internal/elasticsearch/ml/datafeed/delete.go @@ -2,7 +2,6 @@ package datafeed import ( "context" - "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" "github.com/hashicorp/terraform-plugin-framework/diag" @@ -44,22 +43,18 @@ func (r *datafeedResource) delete(ctx context.Context, req resource.DeleteReques } func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId string) (bool, diag.Diagnostics) { - var diags diag.Diagnostics - // Check current state - currentState, err := r.getDatafeedState(ctx, datafeedId) - if err != nil { - // If we can't get the state, try to extract the error details - if err.Error() == fmt.Sprintf("datafeed %s not found", datafeedId) { - // Datafeed does not exist, nothing to stop - return false, diags - } - diags.AddError("Failed to get datafeed state", err.Error()) + currentState, diags := GetDatafeedState(ctx, r.client, datafeedId) + if diags.HasError() { return false, diags } + if currentState == nil { + return false, nil + } + // If the datafeed is not running, nothing to stop - if currentState != "started" && currentState != "starting" { + if *currentState != StateStarted && *currentState != StateStarting { return false, diags } @@ -71,9 +66,9 @@ func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId str } // Wait for the datafeed to reach stopped state - err = r.waitForDatafeedState(ctx, datafeedId, "stopped") - if err != nil { - diags.AddError("Failed to wait for datafeed to stop", fmt.Sprintf("Datafeed %s did not stop within timeout: %s", datafeedId, err.Error())) + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, StateStopped) + diags.Append(waitDiags...) + if diags.HasError() { return true, diags } diff --git a/internal/elasticsearch/ml/datafeed/state_utils.go b/internal/elasticsearch/ml/datafeed/state_utils.go index 9c80f540c..522ebfb16 100644 --- a/internal/elasticsearch/ml/datafeed/state_utils.go +++ b/internal/elasticsearch/ml/datafeed/state_utils.go @@ -2,35 +2,74 @@ package datafeed import ( "context" + "errors" "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/asyncutils" + "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" + "github.com/hashicorp/terraform-plugin-framework/diag" ) -// getDatafeedState returns the current state of a datafeed -func (r *datafeedResource) getDatafeedState(ctx context.Context, datafeedId string) (string, error) { - statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, r.client, datafeedId) +type State string + +const ( + StateStopped State = "stopped" + StateStarted State = "started" + StateStarting State = "starting" +) + +// GetDatafeedState returns the current state of a datafeed +func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string) (*State, diag.Diagnostics) { + statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) if diags.HasError() { - return "", fmt.Errorf("failed to get datafeed stats: %v", diags) + return nil, diags } if statsResponse == nil { - return "", fmt.Errorf("datafeed %s not found", datafeedId) + return nil, nil } - return statsResponse.State, nil + state := State(statsResponse.State) + return &state, nil +} + +var terminalDatafeedStates = map[State]struct{}{ + StateStopped: {}, + StateStarted: {}, } -// waitForDatafeedState waits for a datafeed to reach the desired state -func (r *datafeedResource) waitForDatafeedState(ctx context.Context, datafeedId, desiredState string) error { +var errDatafeedInUndesiredState = errors.New("datafeed stuck in undesired state") + +// WaitForDatafeedState waits for a datafeed to reach the desired state +func WaitForDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string, desiredState State) (bool, diag.Diagnostics) { stateChecker := func(ctx context.Context) (bool, error) { - currentState, err := r.getDatafeedState(ctx, datafeedId) - if err != nil { - return false, err + currentState, diags := GetDatafeedState(ctx, client, datafeedId) + if diags.HasError() { + return false, diagutil.FwDiagsAsError(diags) + } + + if currentState == nil { + return false, fmt.Errorf("datafeed %s not found", datafeedId) } - return currentState == desiredState, nil + + if *currentState == desiredState { + return true, nil + } + + _, isInTerminalState := terminalDatafeedStates[*currentState] + if isInTerminalState { + return false, fmt.Errorf("%w: datafeed is in state [%s] but desired state is [%s]", errDatafeedInUndesiredState, *currentState, desiredState) + } + + return false, nil + } + + err := asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker) + if errors.Is(err, errDatafeedInUndesiredState) { + return false, nil } - return asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker) + return err == nil, diagutil.FrameworkDiagFromError(err) } diff --git a/internal/elasticsearch/ml/datafeed/update.go b/internal/elasticsearch/ml/datafeed/update.go index 8e967def8..c7eb776ea 100644 --- a/internal/elasticsearch/ml/datafeed/update.go +++ b/internal/elasticsearch/ml/datafeed/update.go @@ -2,7 +2,6 @@ package datafeed import ( "context" - "fmt" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" @@ -62,9 +61,9 @@ func (r *datafeedResource) update(ctx context.Context, req resource.UpdateReques } // Wait for the datafeed to reach started state - err := r.waitForDatafeedState(ctx, datafeedId, "started") - if err != nil { - resp.Diagnostics.AddError("Failed to wait for datafeed to start", fmt.Sprintf("Datafeed %s did not start within timeout: %s", datafeedId, err.Error())) + _, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, StateStarted) + resp.Diagnostics.Append(waitDiags...) + if resp.Diagnostics.HasError() { return } } diff --git a/internal/elasticsearch/ml/datafeed_state/acc_test.go b/internal/elasticsearch/ml/datafeed_state/acc_test.go new file mode 100644 index 000000000..b75ab5ab9 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/acc_test.go @@ -0,0 +1,121 @@ +package datafeed_state_test + +import ( + "fmt" + "testing" + + "github.com/elastic/terraform-provider-elasticstack/internal/acctest" + "github.com/hashicorp/terraform-plugin-testing/config" + sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" +) + +func TestAccResourceMLDatafeedState_basic(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("started"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "started"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "force", "false"), + ), + }, + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("stopped"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "stopped"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "force", "false"), + ), + }, + }, + }) +} + +func TestAccResourceMLDatafeedState_import(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("create"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + }, + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("create"), + ResourceName: "elasticstack_elasticsearch_ml_datafeed_state.test", + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIdentifierAttribute: "datafeed_id", + ImportStateVerifyIgnore: []string{"force", "datafeed_timeout", "id"}, + ImportStateIdFunc: func(s *terraform.State) (string, error) { + rs, ok := s.RootModule().Resources["elasticstack_elasticsearch_ml_datafeed_state.test"] + if !ok { + return "", fmt.Errorf("not found: %s", "elasticstack_elasticsearch_ml_datafeed_state.test") + } + return rs.Primary.Attributes["datafeed_id"], nil + }, + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + }, + }, + }) +} + +func TestAccResourceMLDatafeedState_withTimes(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("with_times"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "datafeed_id": config.StringVariable(datafeedID), + "index_name": config.StringVariable(indexName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "datafeed_id", datafeedID), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "state", "started"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "start", "2024-01-01T00:00:00Z"), + resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.test", "end", "2024-01-02T00:00:00Z"), + ), + }, + }, + }) +} diff --git a/internal/elasticsearch/ml/datafeed_state/create.go b/internal/elasticsearch/ml/datafeed_state/create.go new file mode 100644 index 000000000..161355a7a --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/create.go @@ -0,0 +1,33 @@ +package datafeed_state + +import ( + "context" + "fmt" + "time" + + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" + "github.com/hashicorp/terraform-plugin-framework/resource" +) + +func (r *mlDatafeedStateResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var data MLDatafeedStateData + diags := req.Plan.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + // Get create timeout + createTimeout, fwDiags := data.Timeouts.Create(ctx, 5*time.Minute) // Default 5 minutes + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + diags = r.update(ctx, req.Plan, &resp.State, createTimeout) + if diagutil.ContainsContextDeadlineExceeded(ctx, diags) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout)) + } + + resp.Diagnostics.Append(diags...) +} diff --git a/internal/elasticsearch/ml/datafeed_state/delete.go b/internal/elasticsearch/ml/datafeed_state/delete.go new file mode 100644 index 000000000..f6b4f5d6c --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/delete.go @@ -0,0 +1,68 @@ +package datafeed_state + +import ( + "context" + "fmt" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-log/tflog" +) + +func (r *mlDatafeedStateResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + var data MLDatafeedStateData + diags := req.State.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + client, fwDiags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + datafeedId := data.DatafeedId.ValueString() + currentState, fwDiags := datafeed.GetDatafeedState(ctx, client, datafeedId) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + if currentState == nil { + // Datafeed already doesn't exist, nothing to do + tflog.Info(ctx, fmt.Sprintf("ML datafeed %s not found during delete", datafeedId)) + return + } + + // If the datafeed is started, stop it when deleting the resource + if *currentState == datafeed.StateStarted { + tflog.Info(ctx, fmt.Sprintf("Stopping ML datafeed %s during delete", datafeedId)) + + // Parse timeout duration + timeout, parseErrs := data.Timeout.Parse() + resp.Diagnostics.Append(parseErrs...) + if resp.Diagnostics.HasError() { + return + } + + force := data.Force.ValueBool() + fwDiags = elasticsearch.StopDatafeed(ctx, client, datafeedId, force, timeout) + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + // Wait for the datafeed to stop + _, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, datafeed.StateStopped) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + tflog.Info(ctx, fmt.Sprintf("ML datafeed %s successfully stopped during delete", datafeedId)) + } +} diff --git a/internal/elasticsearch/ml/datafeed_state/models.go b/internal/elasticsearch/ml/datafeed_state/models.go new file mode 100644 index 000000000..4a670e915 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/models.go @@ -0,0 +1,68 @@ +package datafeed_state + +import ( + "strconv" + "time" + + "github.com/elastic/terraform-provider-elasticstack/internal/models" + "github.com/elastic/terraform-provider-elasticstack/internal/utils" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +type MLDatafeedStateData struct { + Id types.String `tfsdk:"id"` + ElasticsearchConnection types.List `tfsdk:"elasticsearch_connection"` + DatafeedId types.String `tfsdk:"datafeed_id"` + State types.String `tfsdk:"state"` + Force types.Bool `tfsdk:"force"` + Timeout customtypes.Duration `tfsdk:"datafeed_timeout"` + Start timetypes.RFC3339 `tfsdk:"start"` + End timetypes.RFC3339 `tfsdk:"end"` + Timeouts timeouts.Value `tfsdk:"timeouts"` +} + +func (d *MLDatafeedStateData) GetStartAsString() (string, diag.Diagnostics) { + return d.getTimeAttributeAsString(d.Start) +} + +func (d *MLDatafeedStateData) GetEndAsString() (string, diag.Diagnostics) { + return d.getTimeAttributeAsString(d.End) +} + +func (d *MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (string, diag.Diagnostics) { + if !utils.IsKnown(val) { + return "", nil + } + + valTime, diags := val.ValueRFC3339Time() + if diags.HasError() { + return "", diags + } + return strconv.FormatInt(valTime.Unix(), 10), nil +} + +func (d *MLDatafeedStateData) SetStartAndEndFromAPI(datafeedStats *models.DatafeedStats) diag.Diagnostics { + var diags diag.Diagnostics + if datafeedStats.RunningState == nil { + diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.") + return diags + } + + if datafeedStats.RunningState.SearchInterval != nil { + d.Start = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.StartMS)) + d.End = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.EndMS)) + } else { + d.Start = timetypes.NewRFC3339Null() + d.End = timetypes.NewRFC3339Null() + } + + if datafeedStats.RunningState.RealTimeConfigured { + d.End = timetypes.NewRFC3339Null() + } + + return diags +} diff --git a/internal/elasticsearch/ml/datafeed_state/read.go b/internal/elasticsearch/ml/datafeed_state/read.go new file mode 100644 index 000000000..05f29eeac --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/read.go @@ -0,0 +1,72 @@ +package datafeed_state + +import ( + "context" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +func (r *mlDatafeedStateResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var data MLDatafeedStateData + diags := req.State.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + readData, diags := r.read(ctx, data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + if readData == nil { + resp.State.RemoveResource(ctx) + return + } + + resp.Diagnostics.Append(resp.State.Set(ctx, readData)...) +} + +func (r *mlDatafeedStateResource) read(ctx context.Context, data MLDatafeedStateData) (*MLDatafeedStateData, diag.Diagnostics) { + client, diags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + if diags.HasError() { + return nil, diags + } + + datafeedId := data.DatafeedId.ValueString() + // Check if the datafeed exists by getting its stats + datafeedStats, getDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + diags.Append(getDiags...) + if diags.HasError() { + return nil, diags + } + + if datafeedStats == nil { + return nil, diags + } + + // Update the data with current information + data.State = types.StringValue(datafeedStats.State) + + // Regenerate composite ID to ensure it's current + compId, sdkDiags := client.ID(ctx, datafeedId) + diags.Append(diagutil.FrameworkDiagsFromSDK(sdkDiags)...) + if diags.HasError() { + return nil, diags + } + + data.Id = types.StringValue(compId.String()) + + if datafeed.State(datafeedStats.State) == datafeed.StateStarted { + diags.Append(data.SetStartAndEndFromAPI(datafeedStats)...) + } + + return &data, diags +} diff --git a/internal/elasticsearch/ml/datafeed_state/resource-description.md b/internal/elasticsearch/ml/datafeed_state/resource-description.md new file mode 100644 index 000000000..4f01d2398 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/resource-description.md @@ -0,0 +1,3 @@ +Manages the state of an existing Elasticsearch ML datafeed by starting or stopping it. This resource does not create or configure a datafeed, but instead manages the operational state of an existing datafeed. + +Note: Starting a non-realtime datafeed (i.e with an absolute end time) will result in the datafeed automatically stopping once all available data has been processed. By default, Terraform will restart the datafeed from the configured start time and reprocess all data again. It's recommended to ignore changes to the `state` attribute via the [resource lifecycle](https://developer.hashicorp.com/terraform/tutorials/state/resource-lifecycle#ignore-changes) for non-realtime datafeeds. \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/resource.go b/internal/elasticsearch/ml/datafeed_state/resource.go new file mode 100644 index 000000000..d8fde7969 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/resource.go @@ -0,0 +1,32 @@ +package datafeed_state + +import ( + "context" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" +) + +func NewMLDatafeedStateResource() resource.Resource { + return &mlDatafeedStateResource{} +} + +type mlDatafeedStateResource struct { + client *clients.ApiClient +} + +func (r *mlDatafeedStateResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = req.ProviderTypeName + "_elasticsearch_ml_datafeed_state" +} + +func (r *mlDatafeedStateResource) Configure(_ context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) { + client, diags := clients.ConvertProviderData(req.ProviderData) + resp.Diagnostics.Append(diags...) + r.client = client +} + +func (r *mlDatafeedStateResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + // Retrieve import ID and save to datafeed_id attribute + resource.ImportStatePassthroughID(ctx, path.Root("datafeed_id"), req, resp) +} diff --git a/internal/elasticsearch/ml/datafeed_state/schema.go b/internal/elasticsearch/ml/datafeed_state/schema.go new file mode 100644 index 000000000..f38f15f69 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/schema.go @@ -0,0 +1,97 @@ +package datafeed_state + +import ( + "context" + _ "embed" + "regexp" + + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/booldefault" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringdefault" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/schema/validator" + + providerschema "github.com/elastic/terraform-provider-elasticstack/internal/schema" +) + +func (r *mlDatafeedStateResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = GetSchema() +} + +//go:embed resource-description.md +var description string + +func GetSchema() schema.Schema { + return schema.Schema{ + MarkdownDescription: description, + Blocks: map[string]schema.Block{ + "elasticsearch_connection": providerschema.GetEsFWConnectionBlock("elasticsearch_connection", false), + }, + Attributes: map[string]schema.Attribute{ + "id": schema.StringAttribute{ + MarkdownDescription: "Internal identifier of the resource", + Computed: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "datafeed_id": schema.StringAttribute{ + MarkdownDescription: "Identifier for the ML datafeed.", + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 64), + stringvalidator.RegexMatches(regexp.MustCompile(`^[a-zA-Z0-9_-]+$`), "must contain only alphanumeric characters, hyphens, and underscores"), + }, + }, + "state": schema.StringAttribute{ + MarkdownDescription: "The desired state for the ML datafeed. Valid values are `started` and `stopped`.", + Required: true, + Validators: []validator.String{ + // We don't allow starting/stopping here since they're transient states + stringvalidator.OneOf(string(datafeed.StateStarted), string(datafeed.StateStopped)), + }, + }, + "force": schema.BoolAttribute{ + MarkdownDescription: "When stopping a datafeed, use to forcefully stop it.", + Optional: true, + Computed: true, + Default: booldefault.StaticBool(false), + }, + "start": schema.StringAttribute{ + MarkdownDescription: "The time that the datafeed should start collecting data. When not specified, the datafeed starts in real-time. This property must be specified in RFC 3339 format.", + CustomType: timetypes.RFC3339Type{}, + Optional: true, + Computed: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "end": schema.StringAttribute{ + MarkdownDescription: "The time that the datafeed should end collecting data. When not specified, the datafeed continues in real-time. This property must be specified in RFC 3339 format.", + CustomType: timetypes.RFC3339Type{}, + Optional: true, + }, + "datafeed_timeout": schema.StringAttribute{ + MarkdownDescription: "Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`.", + Optional: true, + Computed: true, + Default: stringdefault.StaticString("30s"), + CustomType: customtypes.DurationType{}, + }, + "timeouts": timeouts.Attributes(context.Background(), timeouts.Opts{ + Create: true, + Update: true, + }), + }, + } +} diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf new file mode 100644 index 000000000..2a5964cdd --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/started/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf new file mode 100644 index 000000000..da42e54d2 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_basic/stopped/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "stopped" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf new file mode 100644 index 000000000..2a5964cdd --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_import/create/datafeed_state.tf @@ -0,0 +1,76 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf new file mode 100644 index 000000000..3ac6a2abc --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/testdata/TestAccResourceMLDatafeedState_withTimes/with_times/datafeed_state.tf @@ -0,0 +1,87 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "datafeed_id" { + description = "The ML datafeed ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state testing with time range" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "10mb" + } +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" + + lifecycle { + ignore_changes = ["state"] + } +} + +resource "elasticstack_elasticsearch_ml_datafeed" "test" { + datafeed_id = var.datafeed_id + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + indices = [elasticstack_elasticsearch_index.test.name] + query = jsonencode({ + match_all = {} + }) +} + +resource "elasticstack_elasticsearch_ml_datafeed_state" "test" { + datafeed_id = elasticstack_elasticsearch_ml_datafeed.test.datafeed_id + state = "started" + start = "2024-01-01T00:00:00Z" + end = "2024-01-02T00:00:00Z" + datafeed_timeout = "60s" + + depends_on = [ + elasticstack_elasticsearch_ml_datafeed.test, + elasticstack_elasticsearch_ml_job_state.test + ] + + lifecycle { + ignore_changes = ["state"] + } +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/datafeed_state/update.go b/internal/elasticsearch/ml/datafeed_state/update.go new file mode 100644 index 000000000..f188d4cc1 --- /dev/null +++ b/internal/elasticsearch/ml/datafeed_state/update.go @@ -0,0 +1,220 @@ +package datafeed_state + +import ( + "context" + "fmt" + "time" + + "github.com/elastic/terraform-provider-elasticstack/internal/clients" + "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/elastic/terraform-provider-elasticstack/internal/models" + "github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/tfsdk" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-log/tflog" +) + +func (r *mlDatafeedStateResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + var data MLDatafeedStateData + diags := req.Plan.Get(ctx, &data) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + // Get update timeout + updateTimeout, fwDiags := data.Timeouts.Update(ctx, 5*time.Minute) // Default 5 minutes + resp.Diagnostics.Append(fwDiags...) + if resp.Diagnostics.HasError() { + return + } + + diags = r.update(ctx, req.Plan, &resp.State, updateTimeout) + if diagutil.ContainsContextDeadlineExceeded(ctx, diags) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout)) + } + + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } +} + +func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, state *tfsdk.State, operationTimeout time.Duration) diag.Diagnostics { + var data MLDatafeedStateData + diags := plan.Get(ctx, &data) + if diags.HasError() { + return diags + } + + client, fwDiags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + datafeedId := data.DatafeedId.ValueString() + desiredState := data.State.ValueString() + + // Create context with timeout + ctx, cancel := context.WithTimeout(ctx, operationTimeout) + defer cancel() + + // First, get the current datafeed stats to check if the datafeed exists and its current state + datafeedStats, fwDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + if datafeedStats == nil { + diags.AddError( + "ML Datafeed not found", + fmt.Sprintf("ML datafeed %s does not exist", datafeedId), + ) + return diags + } + + // Perform state transition if needed + inDesiredState, fwDiags := r.performStateTransition(ctx, client, data, datafeed.State(datafeedStats.State)) + diags.Append(fwDiags...) + if diags.HasError() { + return diags + } + + // Generate composite ID + compId, sdkDiags := client.ID(ctx, datafeedId) + if len(sdkDiags) > 0 { + for _, d := range sdkDiags { + diags.AddError(d.Summary, d.Detail) + } + return diags + } + + // Set the response state + data.Id = types.StringValue(compId.String()) + + var finalData *MLDatafeedStateData + if inDesiredState { + var getDiags diag.Diagnostics + finalData, getDiags = r.read(ctx, data) + diags.Append(getDiags...) + if diags.HasError() { + return diags + } + } else { + var updateDiags diag.Diagnostics + finalData, updateDiags = r.updateAfterMissedTransition(ctx, client, data, datafeedStats) + diags.Append(updateDiags...) + if diags.HasError() { + return diags + } + } + + if finalData == nil { + diags.AddError("Failed to read datafeed stats after update", fmt.Sprintf("The datafeed was successfully transitioned to the %s state, but could not be read after this change", desiredState)) + return diags + } + + diags.Append(state.Set(ctx, finalData)...) + return diags +} + +func (r *mlDatafeedStateResource) updateAfterMissedTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, datafeedStats *models.DatafeedStats) (*MLDatafeedStateData, diag.Diagnostics) { + datafeedId := data.DatafeedId.ValueString() + statsAfterUpdate, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId) + if diags.HasError() { + return nil, diags + } + + if statsAfterUpdate == nil { + diags.AddError( + "ML Datafeed not found", + fmt.Sprintf("ML datafeed %s does not exist after successful update", datafeedId), + ) + return nil, diags + } + + // It's possible that the datafeed starts, and then immediately stops if there is no (or very little) data to process. + // In this case, the state transition may occur too quickly to be detected by the wait function. + // To handle this, we check if the search count has increased to determine if the datafeed actually started since the update. + if statsAfterUpdate.TimingStats == nil || datafeedStats.TimingStats == nil { + diags.AddWarning("Expected Datafeed to contain timing stats", + fmt.Sprintf("Stats for datafeed %s did not contain timing stats either before or after the update. Before %v - After %v", datafeedId, datafeedStats, statsAfterUpdate)) + } else if statsAfterUpdate.TimingStats.SearchCount <= datafeedStats.TimingStats.SearchCount { + diags.AddError( + "Datafeed did not successfully transition to the desired state", + fmt.Sprintf("[%s] datafeed did not settle into the [%s] state. The current state is [%s]", datafeedId, data.State.ValueString(), statsAfterUpdate.State), + ) + return nil, diags + } + + if data.Start.IsUnknown() { + data.Start = timetypes.NewRFC3339Null() + } + + return &data, nil +} + +// performStateTransition handles the ML datafeed state transition process +func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, currentState datafeed.State) (bool, diag.Diagnostics) { + datafeedId := data.DatafeedId.ValueString() + desiredState := datafeed.State(data.State.ValueString()) + force := data.Force.ValueBool() + + // Parse timeout duration + timeout, parseErrs := data.Timeout.Parse() + if parseErrs.HasError() { + return false, parseErrs + } + + // Return early if no state change is needed + if currentState == desiredState { + tflog.Debug(ctx, fmt.Sprintf("ML datafeed %s is already in desired state %s", datafeedId, desiredState)) + return true, nil + } + + // Initiate the state change + switch desiredState { + case datafeed.StateStarted: + start, diags := data.GetStartAsString() + if diags.HasError() { + return false, diags + } + end, endDiags := data.GetEndAsString() + diags.Append(endDiags...) + if diags.HasError() { + return false, diags + } + + startDiags := elasticsearch.StartDatafeed(ctx, client, datafeedId, start, end, timeout) + diags.Append(startDiags...) + if diags.HasError() { + return false, diags + } + case datafeed.StateStopped: + if diags := elasticsearch.StopDatafeed(ctx, client, datafeedId, force, timeout); diags.HasError() { + return false, diags + } + default: + return false, diag.Diagnostics{ + diag.NewErrorDiagnostic( + "Invalid state", + fmt.Sprintf("Invalid state %s. Valid states are 'started' and 'stopped'", desiredState), + ), + } + } + + // Wait for state transition to complete + inDesiredState, diags := datafeed.WaitForDatafeedState(ctx, client, datafeedId, desiredState) + if diags.HasError() { + return false, diags + } + + tflog.Info(ctx, fmt.Sprintf("ML datafeed %s successfully transitioned to state %s", datafeedId, desiredState)) + return inDesiredState, nil +} diff --git a/internal/elasticsearch/ml/job_state/acc_test.go b/internal/elasticsearch/ml/job_state/acc_test.go index aa91e2e5e..bc7ab1f89 100644 --- a/internal/elasticsearch/ml/job_state/acc_test.go +++ b/internal/elasticsearch/ml/job_state/acc_test.go @@ -115,3 +115,23 @@ func TestAccResourceMLJobStateImport(t *testing.T) { }, }) } + +func TestAccResourceMLJobState_timeouts(t *testing.T) { + jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + Steps: []resource.TestStep{ + { + ProtoV6ProviderFactories: acctest.Providers, + ConfigDirectory: acctest.NamedTestCaseDirectory("timeouts"), + ConfigVariables: config.Variables{ + "job_id": config.StringVariable(jobID), + "index_name": config.StringVariable(indexName), + }, + ExpectError: regexp.MustCompile("Operation timed out"), + }, + }, + }) +} diff --git a/internal/elasticsearch/ml/job_state/create.go b/internal/elasticsearch/ml/job_state/create.go index 10e7914cd..7f33159d9 100644 --- a/internal/elasticsearch/ml/job_state/create.go +++ b/internal/elasticsearch/ml/job_state/create.go @@ -2,8 +2,10 @@ package job_state import ( "context" + "fmt" "time" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/hashicorp/terraform-plugin-framework/resource" ) @@ -23,5 +25,9 @@ func (r *mlJobStateResource) Create(ctx context.Context, req resource.CreateRequ } diags = r.update(ctx, req.Plan, &resp.State, createTimeout) + if diagutil.ContainsContextDeadlineExceeded(ctx, diags) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout)) + } + resp.Diagnostics.Append(diags...) } diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf new file mode 100644 index 000000000..698b7e2f6 --- /dev/null +++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState_timeouts/timeouts/job_state.tf @@ -0,0 +1,58 @@ +variable "job_id" { + description = "The ML job ID" + type = string +} + +variable "index_name" { + description = "The index name" + type = string +} + +provider "elasticstack" { + elasticsearch {} +} + +resource "elasticstack_elasticsearch_index" "test" { + name = var.index_name + deletion_protection = false + mappings = jsonencode({ + properties = { + "@timestamp" = { + type = "date" + } + value = { + type = "double" + } + } + }) +} + +resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" { + job_id = var.job_id + description = "Test job for datafeed state timeout testing with large memory" + analysis_config = { + bucket_span = "1h" + detectors = [{ + function = "count" + detector_description = "count" + }] + } + data_description = { + time_field = "@timestamp" + time_format = "epoch_ms" + } + analysis_limits = { + model_memory_limit = "2gb" + } + allow_lazy_open = true # This should cause datafeed to wait for available node +} + +resource "elasticstack_elasticsearch_ml_job_state" "test" { + job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id + state = "opened" + + timeouts = { + create = "10s" + update = "10s" + } +} \ No newline at end of file diff --git a/internal/elasticsearch/ml/job_state/update.go b/internal/elasticsearch/ml/job_state/update.go index 03be71df3..075fa2259 100644 --- a/internal/elasticsearch/ml/job_state/update.go +++ b/internal/elasticsearch/ml/job_state/update.go @@ -7,6 +7,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch" + "github.com/elastic/terraform-provider-elasticstack/internal/diagutil" "github.com/hashicorp/terraform-plugin-framework/diag" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/tfsdk" @@ -30,6 +31,10 @@ func (r *mlJobStateResource) Update(ctx context.Context, req resource.UpdateRequ } diags = r.update(ctx, req.Plan, &resp.State, updateTimeout) + if diagutil.ContainsContextDeadlineExceeded(ctx, diags) { + diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout)) + } + resp.Diagnostics.Append(diags...) if resp.Diagnostics.HasError() { return @@ -72,7 +77,7 @@ func (r *mlJobStateResource) update(ctx context.Context, plan tfsdk.Plan, state } // Perform state transition if needed - fwDiags = r.performStateTransition(ctx, client, data, *currentState, operationTimeout) + fwDiags = r.performStateTransition(ctx, client, data, *currentState) diags.Append(fwDiags...) if diags.HasError() { return diags @@ -97,7 +102,7 @@ func (r *mlJobStateResource) update(ctx context.Context, plan tfsdk.Plan, state } // performStateTransition handles the ML job state transition process -func (r *mlJobStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLJobStateData, currentState string, operationTimeout time.Duration) diag.Diagnostics { +func (r *mlJobStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLJobStateData, currentState string) diag.Diagnostics { jobId := data.JobId.ValueString() desiredState := data.State.ValueString() force := data.Force.ValueBool() diff --git a/internal/models/ml.go b/internal/models/ml.go index 8389d5409..4e29f4da1 100644 --- a/internal/models/ml.go +++ b/internal/models/ml.go @@ -106,11 +106,12 @@ type DatafeedStatsResponse struct { // DatafeedStats represents the statistics for a single datafeed type DatafeedStats struct { - DatafeedId string `json:"datafeed_id"` - State string `json:"state"` - Node *DatafeedNode `json:"node,omitempty"` - AssignmentExplanation *string `json:"assignment_explanation,omitempty"` - RunningState *DatafeedRunning `json:"running_state,omitempty"` + DatafeedId string `json:"datafeed_id"` + State string `json:"state"` + Node *DatafeedNode `json:"node,omitempty"` + AssignmentExplanation *string `json:"assignment_explanation,omitempty"` + RunningState *DatafeedRunning `json:"running_state,omitempty"` + TimingStats *DatafeedTimingStats `json:"timing_stats"` } // DatafeedNode represents the node information for a datafeed @@ -124,8 +125,36 @@ type DatafeedNode struct { // DatafeedRunning represents the running state of a datafeed type DatafeedRunning struct { - RealTimeConfigured bool `json:"real_time_configured"` - RealTimeRunning bool `json:"real_time_running"` - SearchInterval *int64 `json:"search_interval,omitempty"` - LastEndTime *time.Time `json:"last_end_time,omitempty"` + RealTimeConfigured bool `json:"real_time_configured"` + RealTimeRunning bool `json:"real_time_running"` + SearchInterval *DatafeedSearchInterval `json:"search_interval,omitempty"` + LastEndTime *time.Time `json:"last_end_time,omitempty"` +} + +type DatafeedTimingStats struct { + SearchCount int `json:"search_count"` +} + +type DatafeedSearchInterval struct { + StartMS int64 `json:"start_ms"` + EndMS int64 `json:"end_ms"` +} + +// MLJobStats represents the statistics structure for an ML job +type MLJobStats struct { + Jobs []MLJob `json:"jobs"` +} + +// MLJob represents a single ML job in the stats response +type MLJob struct { + JobId string `json:"job_id"` + State string `json:"state"` + Node *MLJobNode `json:"node,omitempty"` +} + +// MLJobNode represents the node information for an ML job +type MLJobNode struct { + Id string `json:"id"` + Name string `json:"name"` + Attributes map[string]interface{} `json:"attributes"` } diff --git a/internal/utils/customtypes/memory_size_type.go b/internal/utils/customtypes/memory_size_type.go new file mode 100644 index 000000000..da783a788 --- /dev/null +++ b/internal/utils/customtypes/memory_size_type.go @@ -0,0 +1,68 @@ +package customtypes + +import ( + "context" + "fmt" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/hashicorp/terraform-plugin-go/tftypes" +) + +var ( + _ basetypes.StringTypable = (*MemorySizeType)(nil) +) + +type MemorySizeType struct { + basetypes.StringType +} + +// String returns a human readable string of the type name. +func (t MemorySizeType) String() string { + return "customtypes.MemorySizeType" +} + +// ValueType returns the Value type. +func (t MemorySizeType) ValueType(ctx context.Context) attr.Value { + return MemorySize{} +} + +// Equal returns true if the given type is equivalent. +func (t MemorySizeType) Equal(o attr.Type) bool { + other, ok := o.(MemorySizeType) + + if !ok { + return false + } + + return t.StringType.Equal(other.StringType) +} + +// ValueFromString returns a StringValuable type given a StringValue. +func (t MemorySizeType) ValueFromString(ctx context.Context, in basetypes.StringValue) (basetypes.StringValuable, diag.Diagnostics) { + return MemorySize{ + StringValue: in, + }, nil +} + +// ValueFromTerraform returns a Value given a tftypes.Value. This is meant to convert the tftypes.Value into a more convenient Go type +// for the provider to consume the data with. +func (t MemorySizeType) ValueFromTerraform(ctx context.Context, in tftypes.Value) (attr.Value, error) { + attrValue, err := t.StringType.ValueFromTerraform(ctx, in) + if err != nil { + return nil, err + } + + stringValue, ok := attrValue.(basetypes.StringValue) + if !ok { + return nil, fmt.Errorf("unexpected value type of %T", attrValue) + } + + stringValuable, diags := t.ValueFromString(ctx, stringValue) + if diags.HasError() { + return nil, fmt.Errorf("unexpected error converting StringValue to StringValuable: %v", diags) + } + + return stringValuable, nil +} diff --git a/internal/utils/customtypes/memory_size_type_test.go b/internal/utils/customtypes/memory_size_type_test.go new file mode 100644 index 000000000..6c81ca1f7 --- /dev/null +++ b/internal/utils/customtypes/memory_size_type_test.go @@ -0,0 +1,45 @@ +package customtypes + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/stretchr/testify/require" +) + +func TestMemorySizeType_String(t *testing.T) { + require.Equal(t, "customtypes.MemorySizeType", MemorySizeType{}.String()) +} + +func TestMemorySizeType_ValueType(t *testing.T) { + require.Equal(t, MemorySize{}, MemorySizeType{}.ValueType(context.Background())) +} + +func TestMemorySizeType_Equal(t *testing.T) { + tests := []struct { + name string + typ MemorySizeType + other attr.Type + expected bool + }{ + { + name: "equal to another MemorySizeType", + typ: MemorySizeType{}, + other: MemorySizeType{}, + expected: true, + }, + { + name: "not equal to different type", + typ: MemorySizeType{}, + other: DurationType{}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, tt.typ.Equal(tt.other)) + }) + } +} diff --git a/internal/utils/customtypes/memory_size_value.go b/internal/utils/customtypes/memory_size_value.go new file mode 100644 index 000000000..59f1bc0bf --- /dev/null +++ b/internal/utils/customtypes/memory_size_value.go @@ -0,0 +1,187 @@ +package customtypes + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/attr/xattr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" +) + +var ( + _ basetypes.StringValuable = (*MemorySize)(nil) + _ basetypes.StringValuableWithSemanticEquals = (*MemorySize)(nil) + _ xattr.ValidateableAttribute = (*MemorySize)(nil) +) + +// memoryPattern matches memory size strings with optional 'b' suffix +var memoryPattern = regexp.MustCompile(`^(\d+)([kmgtKMGT])?[bB]?$`) + +type MemorySize struct { + basetypes.StringValue +} + +// Type returns a MemorySizeType. +func (v MemorySize) Type(_ context.Context) attr.Type { + return MemorySizeType{} +} + +// Equal returns true if the given value is equivalent. +func (v MemorySize) Equal(o attr.Value) bool { + other, ok := o.(MemorySize) + + if !ok { + return false + } + + return v.StringValue.Equal(other.StringValue) +} + +func (t MemorySize) ValidateAttribute(ctx context.Context, req xattr.ValidateAttributeRequest, resp *xattr.ValidateAttributeResponse) { + if t.IsNull() || t.IsUnknown() { + return + } + + valueString := t.ValueString() + if !memoryPattern.MatchString(valueString) { + resp.Diagnostics.AddAttributeError( + req.Path, + "Invalid memory size string value", + fmt.Sprintf("A string value was provided that is not a valid memory size format\n\nGiven value \"%s\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", valueString), + ) + } +} + +// StringSemanticEquals returns true if the given memory size string value is semantically equal to the current memory size string value. +// When compared, the memory sizes are parsed into bytes and the byte values compared. +func (v MemorySize) StringSemanticEquals(_ context.Context, newValuable basetypes.StringValuable) (bool, diag.Diagnostics) { + var diags diag.Diagnostics + + newValue, ok := newValuable.(MemorySize) + if !ok { + diags.AddError( + "Semantic equality check error", + "An unexpected value type was received while performing semantic equality checks. "+ + "Please report this to the provider developers.\n\n"+ + "Expected Value Type: "+fmt.Sprintf("%T", v)+"\n"+ + "Got Value Type: "+fmt.Sprintf("%T", newValuable), + ) + + return false, diags + } + + if v.IsNull() { + return newValue.IsNull(), diags + } + + if v.IsUnknown() { + return newValue.IsUnknown(), diags + } + + vParsed, diags := v.ConvertToMB() + if diags.HasError() { + return false, diags + } + + newParsed, diags := newValue.ConvertToMB() + if diags.HasError() { + return false, diags + } + + return vParsed == newParsed, diags +} + +// ConvertToMB parses the memory size string and returns the equivalent number of megabytes. +// Supports units: k/K (kilobytes), m/M (megabytes), g/G (gigabytes), t/T (terabytes) +// The 'b' suffix is optional and ignored. +// Note: As per ML documentation, values are rounded down to the nearest MB for consistency. +func (v MemorySize) ConvertToMB() (int64, diag.Diagnostics) { + var diags diag.Diagnostics + + if v.IsNull() { + diags.Append(diag.NewErrorDiagnostic("Memory size parse error", "memory size string value is null")) + return 0, diags + } + + if v.IsUnknown() { + diags.Append(diag.NewErrorDiagnostic("Memory size parse error", "memory size string value is unknown")) + return 0, diags + } + + valueString := v.ValueString() + matches := memoryPattern.FindStringSubmatch(valueString) + if len(matches) != 3 { + diags.Append(diag.NewErrorDiagnostic("Memory size parse error", + fmt.Sprintf("invalid memory size format: %s", valueString))) + return 0, diags + } + + // Parse the numeric part + numStr := matches[1] + num, err := strconv.ParseInt(numStr, 10, 64) + if err != nil { + diags.Append(diag.NewErrorDiagnostic("Memory size parse error", + fmt.Sprintf("invalid number in memory size: %s", numStr))) + return 0, diags + } + + // Parse the unit part (if present) and calculate bytes + unit := strings.ToLower(matches[2]) + var bytes int64 + + switch unit { + case "k": + bytes = num * 1024 + case "m": + bytes = num * 1024 * 1024 + case "g": + bytes = num * 1024 * 1024 * 1024 + case "t": + bytes = num * 1024 * 1024 * 1024 * 1024 + case "": // no unit = bytes + bytes = num + default: + diags.Append(diag.NewErrorDiagnostic("Memory size parse error", + fmt.Sprintf("unsupported memory unit: %s", unit))) + return 0, diags + } + + // Round down to the nearest MB (1024*1024 = 1048576 bytes) as per ML documentation + const mbInBytes = 1024 * 1024 + roundedMB := bytes / mbInBytes + + return roundedMB, diags +} + +// NewMemorySizeNull creates a MemorySize with a null value. Determine whether the value is null via IsNull method. +func NewMemorySizeNull() MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringNull(), + } +} + +// NewMemorySizeUnknown creates a MemorySize with an unknown value. Determine whether the value is unknown via IsUnknown method. +func NewMemorySizeUnknown() MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringUnknown(), + } +} + +// NewMemorySizeValue creates a MemorySize with a known value. Access the value via ValueString method. +func NewMemorySizeValue(value string) MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringValue(value), + } +} + +// NewMemorySizePointerValue creates a MemorySize with a null value if nil or a known value. Access the value via ValueStringPointer method. +func NewMemorySizePointerValue(value *string) MemorySize { + return MemorySize{ + StringValue: basetypes.NewStringPointerValue(value), + } +} diff --git a/internal/utils/customtypes/memory_size_value_test.go b/internal/utils/customtypes/memory_size_value_test.go new file mode 100644 index 000000000..35bdc6e60 --- /dev/null +++ b/internal/utils/customtypes/memory_size_value_test.go @@ -0,0 +1,384 @@ +package customtypes + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/attr/xattr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/stretchr/testify/require" +) + +func TestMemorySize_Type(t *testing.T) { + require.Equal(t, MemorySizeType{}, MemorySize{}.Type(context.Background())) +} + +func TestMemorySize_Equal(t *testing.T) { + tests := []struct { + name string + expectedEqual bool + val MemorySize + otherVal attr.Value + }{ + { + name: "not equal if the other value is not a memory size", + expectedEqual: false, + val: NewMemorySizeValue("128mb"), + otherVal: basetypes.NewBoolValue(true), + }, + { + name: "not equal if the memory sizes are not equal", + expectedEqual: false, + val: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("256mb"), + }, + { + name: "not equal if the memory sizes are semantically equal but string values are not equal", + expectedEqual: false, + val: NewMemorySizeValue("1gb"), + otherVal: NewMemorySizeValue("1024mb"), + }, + { + name: "equal if the memory size string values are equal", + expectedEqual: true, + val: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("128mb"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expectedEqual, tt.val.Equal(tt.otherVal)) + }) + } +} + +func TestMemorySize_ValidateAttribute(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + expectedDiags diag.Diagnostics + }{ + { + name: "unknown is valid", + memorySize: NewMemorySizeNull(), + }, + { + name: "null is valid", + memorySize: NewMemorySizeUnknown(), + }, + { + name: "valid memory sizes are valid - bytes", + memorySize: NewMemorySizeValue("1024"), + }, + { + name: "valid memory sizes are valid - kilobytes", + memorySize: NewMemorySizeValue("128k"), + }, + { + name: "valid memory sizes are valid - kilobytes with B", + memorySize: NewMemorySizeValue("128kb"), + }, + { + name: "valid memory sizes are valid - megabytes", + memorySize: NewMemorySizeValue("128m"), + }, + { + name: "valid memory sizes are valid - megabytes with B", + memorySize: NewMemorySizeValue("128mb"), + }, + { + name: "valid memory sizes are valid - uppercase megabytes", + memorySize: NewMemorySizeValue("128MB"), + }, + { + name: "valid memory sizes are valid - gigabytes", + memorySize: NewMemorySizeValue("2g"), + }, + { + name: "valid memory sizes are valid - gigabytes with B", + memorySize: NewMemorySizeValue("2gb"), + }, + { + name: "valid memory sizes are valid - terabytes", + memorySize: NewMemorySizeValue("1t"), + }, + { + name: "non-memory strings are invalid", + memorySize: NewMemorySizeValue("not a memory size"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"not a memory size\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + { + name: "negative numbers are invalid", + memorySize: NewMemorySizeValue("-128mb"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"-128mb\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + { + name: "float numbers are invalid", + memorySize: NewMemorySizeValue("128.5mb"), + expectedDiags: diag.Diagnostics{ + diag.NewAttributeErrorDiagnostic( + path.Root("memory_size"), + "Invalid memory size string value", + "A string value was provided that is not a valid memory size format\n\nGiven value \"128.5mb\"\nExpected format: number followed by optional unit (k/K, m/M, g/G, t/T) and optional 'b/B' suffix", + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp := xattr.ValidateAttributeResponse{} + + tt.memorySize.ValidateAttribute( + context.Background(), + xattr.ValidateAttributeRequest{ + Path: path.Root("memory_size"), + }, + &resp, + ) + + if tt.expectedDiags == nil { + require.Nil(t, resp.Diagnostics) + } else { + require.Equal(t, tt.expectedDiags, resp.Diagnostics) + } + }) + } +} + +func TestMemorySize_StringSemanticEquals(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + otherVal basetypes.StringValuable + expectedEqual bool + expectedErrorDiags bool + }{ + { + name: "should error if the other value is not a memory size", + memorySize: NewMemorySizeValue("128mb"), + otherVal: basetypes.NewStringValue("128mb"), + expectedEqual: false, + expectedErrorDiags: true, + }, + { + name: "two null values are semantically equal", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeNull(), + expectedEqual: true, + }, + { + name: "null is not equal to unknown", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeUnknown(), + expectedEqual: false, + }, + { + name: "null is not equal to a string value", + memorySize: NewMemorySizeNull(), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + }, + { + name: "two unknown values are semantically equal", + memorySize: NewMemorySizeUnknown(), + otherVal: NewMemorySizeUnknown(), + expectedEqual: true, + }, + { + name: "unknown is not equal to a string value", + memorySize: NewMemorySizeUnknown(), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + }, + { + name: "two equal values are semantically equal", + memorySize: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - gb to mb", + memorySize: NewMemorySizeValue("2g"), + otherVal: NewMemorySizeValue("2048m"), + expectedEqual: true, + }, + { + name: "two semantically equal values - gb with B to mb", + memorySize: NewMemorySizeValue("2gb"), + otherVal: NewMemorySizeValue("2048mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - different case", + memorySize: NewMemorySizeValue("128MB"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: true, + }, + { + name: "two semantically equal values - kb to bytes (rounded to MB)", + memorySize: NewMemorySizeValue("2048k"), + otherVal: NewMemorySizeValue("2097152"), + expectedEqual: true, + }, + { + name: "bytes that don't round to same MB are not equal", + memorySize: NewMemorySizeValue("1048576"), // exactly 1MB + otherVal: NewMemorySizeValue("1048575"), // 1 byte less, rounds to 0MB + expectedEqual: false, + }, + { + name: "partial MB values round down to same MB", + memorySize: NewMemorySizeValue("1500000"), // ~1.43MB, rounds to 1MB + otherVal: NewMemorySizeValue("1048576"), // exactly 1MB + expectedEqual: true, + }, + { + name: "errors if this value is invalid", + memorySize: NewMemorySizeValue("not a memory size"), + otherVal: NewMemorySizeValue("128mb"), + expectedEqual: false, + expectedErrorDiags: true, + }, + { + name: "errors if the other value is invalid", + memorySize: NewMemorySizeValue("128mb"), + otherVal: NewMemorySizeValue("not a memory size"), + expectedEqual: false, + expectedErrorDiags: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + isEqual, diags := tt.memorySize.StringSemanticEquals(context.Background(), tt.otherVal) + + require.Equal(t, tt.expectedEqual, isEqual) + require.Equal(t, tt.expectedErrorDiags, diags.HasError()) + }) + } +} + +func TestMemorySize_ParseBytes(t *testing.T) { + tests := []struct { + name string + memorySize MemorySize + expectedMB int64 + expectedError bool + }{ + { + name: "null value should error", + memorySize: NewMemorySizeNull(), + expectedError: true, + }, + { + name: "unknown value should error", + memorySize: NewMemorySizeUnknown(), + expectedError: true, + }, + { + name: "bytes without unit", + memorySize: NewMemorySizeValue("1048576"), // exactly 1MB + expectedMB: 1, + }, + { + name: "bytes without unit - rounds down", + memorySize: NewMemorySizeValue("1048575"), // 1 byte less than 1MB + expectedMB: 0, // rounds down to 0MB + }, + { + name: "bytes without unit - partial MB rounds down", + memorySize: NewMemorySizeValue("1500000"), // ~1.43MB + expectedMB: 1, // rounds down to 1MB + }, + { + name: "kilobytes", + memorySize: NewMemorySizeValue("1024k"), // exactly 1MB + expectedMB: 1, + }, + { + name: "kilobytes - partial MB rounds down", + memorySize: NewMemorySizeValue("1000k"), // ~976KB, rounds down to 0MB + expectedMB: 0, + }, + { + name: "kilobytes with B suffix", + memorySize: NewMemorySizeValue("1024kb"), // exactly 1MB + expectedMB: 1, + }, + { + name: "megabytes", + memorySize: NewMemorySizeValue("128m"), + expectedMB: 128, + }, + { + name: "megabytes with B suffix", + memorySize: NewMemorySizeValue("128mb"), + expectedMB: 128, + }, + { + name: "uppercase megabytes", + memorySize: NewMemorySizeValue("128MB"), + expectedMB: 128, + }, + { + name: "gigabytes", + memorySize: NewMemorySizeValue("2g"), + expectedMB: 2 * 1024, + }, + { + name: "gigabytes with B suffix", + memorySize: NewMemorySizeValue("2gb"), + expectedMB: 2 * 1024, + }, + { + name: "terabytes", + memorySize: NewMemorySizeValue("1t"), + expectedMB: 1024 * 1024, + }, + { + name: "terabytes with B suffix", + memorySize: NewMemorySizeValue("1tb"), + expectedMB: 1024 * 1024, + }, + { + name: "invalid format", + memorySize: NewMemorySizeValue("not a memory size"), + expectedError: true, + }, + { + name: "invalid number", + memorySize: NewMemorySizeValue("abcmb"), + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bytes, diags := tt.memorySize.ConvertToMB() + + if tt.expectedError { + require.True(t, diags.HasError()) + } else { + require.False(t, diags.HasError()) + require.Equal(t, tt.expectedMB, bytes) + } + }) + } +} diff --git a/provider/plugin_framework.go b/provider/plugin_framework.go index 14a0d9939..0685d4228 100644 --- a/provider/plugin_framework.go +++ b/provider/plugin_framework.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/index/indices" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/anomaly_detection_job" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed" + "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed_state" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/job_state" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/api_key" "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/role_mapping" @@ -128,5 +129,6 @@ func (p *Provider) Resources(ctx context.Context) []func() resource.Resource { anomaly_detection_job.NewAnomalyDetectionJobResource, security_detection_rule.NewSecurityDetectionRuleResource, job_state.NewMLJobStateResource, + datafeed_state.NewMLDatafeedStateResource, } }