From 080b253019a6966e4a78f9a043dbcff2573fc914 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 7 Nov 2025 13:29:09 +0000 Subject: [PATCH 1/2] remove file plugin --- internal/file/file_manager_service.go | 14 +- internal/file/file_operator.go | 2 +- internal/file/file_plugin.go | 429 -------------- internal/file/file_plugin_test.go | 527 ----------------- internal/file/file_service_operator.go | 4 +- .../fake_file_manager_service_interface.go | 3 + .../filefakes/fake_file_operator_interface.go | 528 ++++++++++++++++++ internal/plugin/plugin_manager.go | 23 +- internal/plugin/plugin_manager_test.go | 5 - internal/resource/resource_plugin.go | 452 ++++++++++++--- internal/resource/resource_plugin_test.go | 32 +- 11 files changed, 944 insertions(+), 1075 deletions(-) delete mode 100644 internal/file/file_plugin.go delete mode 100644 internal/file/file_plugin_test.go create mode 100644 internal/file/filefakes/fake_file_operator_interface.go diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 6c7ed4afb..ffb3daedf 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -27,10 +27,10 @@ import ( ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate -//counterfeiter:generate . fileOperator +//counterfeiter:generate . FileOperatorInterface //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate -//counterfeiter:generate . fileManagerServiceInterface +//counterfeiter:generate . FileManagerServiceInterface const ( maxAttempts = 5 @@ -40,7 +40,7 @@ const ( ) type ( - fileOperator interface { + FileOperatorInterface interface { Write(ctx context.Context, fileContent []byte, fileName, filePermissions string) error CreateFileDirectories(ctx context.Context, fileName string) error WriteChunkedFile( @@ -60,7 +60,7 @@ type ( MoveFile(ctx context.Context, sourcePath, destPath string) error } - fileServiceOperatorInterface interface { + FileServiceOperatorInterface interface { File(ctx context.Context, file *mpi.File, tempFilePath, expectedHash string) error UpdateOverview(ctx context.Context, instanceID string, filesToUpdate []*mpi.File, configPath string, iteration int) error @@ -76,7 +76,7 @@ type ( UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) } - fileManagerServiceInterface interface { + FileManagerServiceInterface interface { ConfigApply(ctx context.Context, configApplyRequest *mpi.ConfigApplyRequest) (writeStatus model.WriteStatus, err error) Rollback(ctx context.Context, instanceID string) error @@ -98,8 +98,8 @@ type ( type FileManagerService struct { manifestLock *sync.RWMutex agentConfig *config.Config - fileOperator fileOperator - fileServiceOperator fileServiceOperatorInterface + fileOperator FileOperatorInterface + fileServiceOperator FileServiceOperatorInterface // map of files and the actions performed on them during config apply fileActions map[string]*model.FileCache // key is file path // map of the files currently on disk, used to determine the file action during config apply diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index c54482199..a7339ed83 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -30,7 +30,7 @@ type FileOperator struct { manifestLock *sync.RWMutex } -var _ fileOperator = (*FileOperator)(nil) +var _ FileOperatorInterface = (*FileOperator)(nil) // FileOperator only purpose is to write files, diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go deleted file mode 100644 index a9747b7b3..000000000 --- a/internal/file/file_plugin.go +++ /dev/null @@ -1,429 +0,0 @@ -// Copyright (c) F5, Inc. -// -// This source code is licensed under the Apache License, Version 2.0 license found in the -// LICENSE file in the root directory of this source tree. - -package file - -import ( - "context" - "log/slog" - "sync" - - "github.com/nginx/agent/v3/pkg/files" - "github.com/nginx/agent/v3/pkg/id" - - mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" - "github.com/nginx/agent/v3/internal/bus" - "github.com/nginx/agent/v3/internal/config" - "github.com/nginx/agent/v3/internal/grpc" - "github.com/nginx/agent/v3/internal/logger" - "github.com/nginx/agent/v3/internal/model" - "google.golang.org/protobuf/types/known/timestamppb" -) - -var _ bus.Plugin = (*FilePlugin)(nil) - -// The file plugin only writes, deletes and checks hashes of files -// the file plugin does not care about the instance type - -type FilePlugin struct { - manifestLock *sync.RWMutex - messagePipe bus.MessagePipeInterface - config *config.Config - conn grpc.GrpcConnectionInterface - fileManagerService fileManagerServiceInterface - serverType model.ServerType -} - -func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, - serverType model.ServerType, manifestLock *sync.RWMutex, -) *FilePlugin { - return &FilePlugin{ - config: agentConfig, - conn: grpcConnection, - serverType: serverType, - manifestLock: manifestLock, - } -} - -func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - ctx = context.WithValue( - ctx, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - slog.DebugContext(ctx, "Starting file plugin") - - fp.messagePipe = messagePipe - fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config, fp.manifestLock) - - return nil -} - -func (fp *FilePlugin) Close(ctx context.Context) error { - ctx = context.WithValue( - ctx, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - slog.InfoContext(ctx, "Closing file plugin") - - return fp.conn.Close(ctx) -} - -func (fp *FilePlugin) Info() *bus.Info { - name := "file" - if fp.serverType.String() == model.Auxiliary.String() { - name = "auxiliary-file" - } - - return &bus.Info{ - Name: name, - } -} - -func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { - ctxWithMetadata := fp.config.NewContextWithLabels(ctx) - - if logger.ServerType(ctx) == "" { - ctxWithMetadata = context.WithValue( - ctxWithMetadata, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - } - - if logger.ServerType(ctxWithMetadata) == fp.serverType.String() { - switch msg.Topic { - case bus.ConnectionResetTopic: - fp.handleConnectionReset(ctxWithMetadata, msg) - case bus.ConnectionCreatedTopic: - slog.DebugContext(ctxWithMetadata, "File plugin received connection created message") - fp.fileManagerService.SetIsConnected(true) - case bus.NginxConfigUpdateTopic: - fp.handleNginxConfigUpdate(ctxWithMetadata, msg) - case bus.ConfigUploadRequestTopic: - fp.handleConfigUploadRequest(ctxWithMetadata, msg) - case bus.ConfigApplyRequestTopic: - fp.handleConfigApplyRequest(ctxWithMetadata, msg) - case bus.ConfigApplyCompleteTopic: - fp.handleConfigApplyComplete(ctxWithMetadata, msg) - case bus.ReloadSuccessfulTopic: - fp.handleReloadSuccess(ctxWithMetadata, msg) - case bus.ConfigApplyFailedTopic: - fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg) - default: - slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic) - } - } -} - -func (fp *FilePlugin) Subscriptions() []string { - if fp.serverType == model.Auxiliary { - return []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - } - } - - return []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - bus.ConfigApplyRequestTopic, - bus.ConfigApplyFailedTopic, - bus.ReloadSuccessfulTopic, - bus.ConfigApplyCompleteTopic, - } -} - -func (fp *FilePlugin) enableWatchers(ctx context.Context, - configContext *model.NginxConfigContext, - instanceID string, -) { - enableWatcher := &model.EnableWatchers{ - ConfigContext: configContext, - InstanceID: instanceID, - } - - fp.messagePipe.Process(ctx, &bus.Message{ - Data: enableWatcher, - Topic: bus.EnableWatchersTopic, - }) -} - -func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received connection reset message") - if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { - var reconnect bool - err := fp.conn.Close(ctx) - if err != nil { - slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err) - } - fp.conn = newConnection - - reconnect = fp.fileManagerService.IsConnected() - fp.fileManagerService.ResetClient(ctx, fp.conn.FileServiceClient()) - fp.fileManagerService.SetIsConnected(reconnect) - - slog.DebugContext(ctx, "File manager service client reset successfully") - } -} - -func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config apply complete message") - response, ok := msg.Data.(*mpi.DataPlaneResponse) - - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", msg.Data) - return - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - fp.fileManagerService.ClearCache() - fp.enableWatchers(ctx, &model.NginxConfigContext{}, response.GetInstanceId()) -} - -func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received reload success message", "data", msg.Data) - - successMessage, ok := msg.Data.(*model.ReloadSuccess) - - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ReloadSuccess", "payload", msg.Data) - return - } - - fp.fileManagerService.ClearCache() - fp.enableWatchers(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId()) - - if successMessage.ConfigContext.Files != nil { - slog.DebugContext(ctx, "Changes made during config apply, update files on disk") - updateError := fp.fileManagerService.UpdateCurrentFilesOnDisk( - ctx, - files.ConvertToMapOfFiles(successMessage.ConfigContext.Files), - true, - ) - if updateError != nil { - slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) - } - } - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse}) -} - -func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config failed message") - - data, ok := msg.Data.(*model.ConfigApplyMessage) - if data.InstanceID == "" || !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", - "payload", msg.Data) - fp.fileManagerService.ClearCache() - - return - } - - err := fp.fileManagerService.Rollback(ctx, data.InstanceID) - if err != nil { - rollbackResponse := fp.createDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Rollback failed", data.InstanceID, err.Error()) - - applyResponse := fp.createDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback failed", data.InstanceID, data.Error.Error()) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) - - return - } - - // Send RollbackWriteTopic with Correlation and Instance ID for use by resource plugin - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.RollbackWriteTopic, Data: data}) -} - -func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config apply request message") - var response *mpi.DataPlaneResponse - correlationID := logger.CorrelationID(ctx) - - managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data) - - return - } - - request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) - if !requestOk { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", - "payload", msg.Data) - - return - } - - configApplyRequest := request.ConfigApplyRequest - instanceID := configApplyRequest.GetOverview().GetConfigVersion().GetInstanceId() - - writeStatus, err := fp.fileManagerService.ConfigApply(ctx, configApplyRequest) - - switch writeStatus { - case model.NoChange: - slog.DebugContext(ctx, "No changes required for config apply request") - dpResponse := fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_OK, - "Config apply successful, no files to change", - instanceID, - "", - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: dpResponse}) - - return - case model.Error: - slog.ErrorContext( - ctx, - "Failed to apply config changes", - "instance_id", instanceID, - "error", err, - ) - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed", - instanceID, - err.Error(), - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) - - return - case model.RollbackRequired: - slog.ErrorContext( - ctx, - "Failed to apply config changes, rolling back", - "instance_id", instanceID, - "error", err, - ) - - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Config apply failed, rolling back config", - instanceID, - err.Error(), - ) - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - - rollbackErr := fp.fileManagerService.Rollback( - ctx, - instanceID, - ) - if rollbackErr != nil { - rollbackResponse := fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback failed", - instanceID, - rollbackErr.Error()) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: rollbackResponse}) - - return - } - - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback successful", - instanceID, - err.Error()) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) - - return - case model.OK: - slog.DebugContext(ctx, "Changes required for config apply request") - // Send WriteConfigSuccessfulTopic with Correlation and Instance ID for use by resource plugin - data := &model.ConfigApplyMessage{ - CorrelationID: correlationID, - InstanceID: instanceID, - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.WriteConfigSuccessfulTopic, Data: data}) - } -} - -func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received nginx config update message") - nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) - - return - } - - fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) -} - -func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config upload request message") - managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) - if !ok { - slog.ErrorContext( - ctx, - "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data, - ) - - return - } - - configUploadRequest := managementPlaneRequest.GetConfigUploadRequest() - - correlationID := logger.CorrelationID(ctx) - - updatingFilesError := fp.fileManagerService.ConfigUpload(ctx, configUploadRequest) - - response := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: correlationID, - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Successfully updated all files", - }, - } - - if updatingFilesError != nil { - response.CommandResponse.Status = mpi.CommandResponse_COMMAND_STATUS_FAILURE - response.CommandResponse.Message = "Failed to update all files" - response.CommandResponse.Error = updatingFilesError.Error() - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) -} - -func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, - message, instanceID, err string, -) *mpi.DataPlaneResponse { - return &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: correlationID, - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: status, - Message: message, - Error: err, - }, - InstanceId: instanceID, - } -} diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go deleted file mode 100644 index 2955dacf3..000000000 --- a/internal/file/file_plugin_test.go +++ /dev/null @@ -1,527 +0,0 @@ -// Copyright (c) F5, Inc. -// -// This source code is licensed under the Apache License, Version 2.0 license found in the -// LICENSE file in the root directory of this source tree. - -package file - -import ( - "context" - "errors" - "os" - "sync" - "testing" - "time" - - "github.com/nginx/agent/v3/internal/bus/busfakes" - "google.golang.org/protobuf/types/known/timestamppb" - - mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" - "github.com/nginx/agent/v3/api/grpc/mpi/v1/v1fakes" - "github.com/nginx/agent/v3/internal/bus" - "github.com/nginx/agent/v3/internal/file/filefakes" - "github.com/nginx/agent/v3/internal/grpc/grpcfakes" - "github.com/nginx/agent/v3/internal/model" - "github.com/nginx/agent/v3/pkg/files" - "github.com/nginx/agent/v3/pkg/id" - "github.com/nginx/agent/v3/test/helpers" - "github.com/nginx/agent/v3/test/protos" - "github.com/nginx/agent/v3/test/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestFilePlugin_Info(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Command, &sync.RWMutex{}) - assert.Equal(t, "file", filePlugin.Info().Name) -} - -func TestFilePlugin_Close(t *testing.T) { - ctx := context.Background() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - filePlugin.Close(ctx) - - assert.Equal(t, 1, fakeGrpcConnection.CloseCallCount()) -} - -func TestFilePlugin_Subscriptions(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Command, &sync.RWMutex{}) - assert.Equal( - t, - []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - bus.ConfigApplyRequestTopic, - bus.ConfigApplyFailedTopic, - bus.ReloadSuccessfulTopic, - bus.ConfigApplyCompleteTopic, - }, - filePlugin.Subscriptions(), - ) - - readOnlyFilePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Auxiliary, &sync.RWMutex{}) - assert.Equal(t, []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - }, readOnlyFilePlugin.Subscriptions()) -} - -func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { - ctx := context.Background() - - fileMeta := protos.FileMeta("/etc/nginx/nginx/conf", "") - - message := &model.NginxConfigContext{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeFileServiceClient.UpdateOverviewReturns(&mpi.UpdateOverviewResponse{ - Overview: nil, - }, nil) - - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.NginxConfigUpdateTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return fakeFileServiceClient.UpdateOverviewCallCount() == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) -} - -func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { - ctx := context.Background() - tempDir := t.TempDir() - - filePath := tempDir + "/nginx.conf" - fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") - fileHash := files.GenerateHash(fileContent) - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ - ConfigApplyRequest: protos.CreateConfigApplyRequest(protos.FileOverview(filePath, fileHash)), - }, - } - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - agentConfig := types.AgentConfig() - agentConfig.AllowedDirectories = []string{tempDir} - - tests := []struct { - message *mpi.ManagementPlaneRequest - configApplyReturnsErr error - name string - configApplyStatus model.WriteStatus - }{ - { - name: "Test 1 - Success", - configApplyReturnsErr: nil, - configApplyStatus: model.OK, - message: message, - }, - { - name: "Test 2 - Fail, Rollback", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.RollbackRequired, - message: message, - }, - { - name: "Test 3 - Fail, No Rollback", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.Error, - message: message, - }, - { - name: "Test 4 - Fail to cast payload", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.Error, - message: nil, - }, - { - name: "Test 5 - No changes needed", - configApplyReturnsErr: nil, - configApplyStatus: model.NoChange, - message: message, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} - fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) - messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - filePlugin.fileManagerService = fakeFileManagerService - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyRequestTopic, Data: test.message}) - - messages := messagePipe.Messages() - - switch { - case test.configApplyStatus == model.OK: - assert.Equal(t, bus.WriteConfigSuccessfulTopic, messages[0].Topic) - assert.Len(t, messages, 1) - - _, ok := messages[0].Data.(*model.ConfigApplyMessage) - assert.True(t, ok) - case test.configApplyStatus == model.RollbackRequired: - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - assert.Len(t, messages, 2) - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) - assert.Equal(t, "Config apply failed, rolling back config", - dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) - dataPlaneResponse, ok = messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Config apply failed, rollback successful", - dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus()) - case test.configApplyStatus == model.NoChange: - assert.Len(t, messages, 1) - - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, bus.ConfigApplyCompleteTopic, messages[0].Topic) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_OK, - response.GetCommandResponse().GetStatus(), - ) - case test.message == nil: - assert.Empty(t, messages) - default: - assert.Len(t, messages, 1) - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) - assert.Equal(t, "Config apply failed", dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) - } - }) - } -} - -func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) { - ctx := context.Background() - - tempDir := os.TempDir() - testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") - defer helpers.RemoveFileWithErrorCheck(t, testFile.Name()) - fileMeta := protos.FileMeta(testFile.Name(), "") - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ - ConfigUploadRequest: &mpi.ConfigUploadRequest{ - Overview: &mpi.FileOverview{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - { - FileMeta: fileMeta, - }, - }, - ConfigVersion: &mpi.ConfigVersion{ - InstanceId: "123", - Version: "f33ref3d32d3c32d3a", - }, - }, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return fakeFileServiceClient.UpdateFileCallCount() == 2 }, - 2*time.Second, - 10*time.Millisecond, - ) - - messages := messagePipe.Messages() - assert.Len(t, messages, 1) - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_OK, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) -} - -func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { - ctx := context.Background() - - fileMeta := protos.FileMeta("/unknown/file.conf", "") - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ - ConfigUploadRequest: &mpi.ConfigUploadRequest{ - Overview: &mpi.FileOverview{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - { - FileMeta: fileMeta, - }, - }, - ConfigVersion: protos.CreateConfigVersion(), - }, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return len(messagePipe.Messages()) == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) - - assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) - - messages := messagePipe.Messages() - assert.Len(t, messages, 1) - - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) -} - -func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { - ctx := context.Background() - instanceID := protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId() - - tests := []struct { - name string - rollbackReturns error - instanceID string - }{ - { - name: "Test 1 - Rollback Success", - rollbackReturns: nil, - instanceID: instanceID, - }, - { - name: "Test 2 - Rollback Fail", - rollbackReturns: errors.New("something went wrong"), - instanceID: instanceID, - }, - - { - name: "Test 3 - Fail to cast payload", - rollbackReturns: errors.New("something went wrong"), - instanceID: "", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - mockFileManager.RollbackReturns(test.rollbackReturns) - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - - data := &model.ConfigApplyMessage{ - CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - InstanceID: test.instanceID, - Error: errors.New("something went wrong with config apply"), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) - - messages := messagePipe.Messages() - - switch { - case test.rollbackReturns == nil: - assert.Equal(t, bus.RollbackWriteTopic, messages[0].Topic) - assert.Len(t, messages, 1) - - case test.instanceID == "": - assert.Empty(t, messages) - default: - rollbackMessage, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Rollback failed", rollbackMessage.GetCommandResponse().GetMessage()) - assert.Equal(t, test.rollbackReturns.Error(), rollbackMessage.GetCommandResponse().GetError()) - applyMessage, ok := messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Config apply failed, rollback failed", - applyMessage.GetCommandResponse().GetMessage()) - assert.Equal(t, data.Error.Error(), applyMessage.GetCommandResponse().GetError()) - assert.Len(t, messages, 2) - } - }) - } -} - -func TestFilePlugin_Process_ConfigApplyReloadSuccessTopic(t *testing.T) { - ctx := context.Background() - instance := protos.NginxOssInstance([]string{}) - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - - expectedResponse := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", - }, - InstanceId: instance.GetInstanceMeta().GetInstanceId(), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: &model.ReloadSuccess{ - ConfigContext: &model.NginxConfigContext{}, - DataPlaneResponse: expectedResponse, - }}) - - messages := messagePipe.Messages() - - watchers, ok := messages[0].Data.(*model.EnableWatchers) - assert.True(t, ok) - assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) - assert.Equal(t, &model.NginxConfigContext{}, watchers.ConfigContext) - assert.Equal(t, instance.GetInstanceMeta().GetInstanceId(), watchers.InstanceID) - - response, ok := messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic) - - assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetError(), response.GetCommandResponse().GetError()) - assert.Equal(t, expectedResponse.GetMessageMeta().GetCorrelationId(), response.GetMessageMeta().GetCorrelationId()) - - assert.Equal(t, expectedResponse.GetInstanceId(), response.GetInstanceId()) -} - -func TestFilePlugin_Process_ConfigApplyCompleteTopic(t *testing.T) { - ctx := context.Background() - instance := protos.NginxOssInstance([]string{}) - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - expectedResponse := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", - }, - InstanceId: instance.GetInstanceMeta().GetInstanceId(), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: expectedResponse}) - - messages := messagePipe.Messages() - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - - assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetError(), response.GetCommandResponse().GetError()) - assert.Equal(t, expectedResponse.GetMessageMeta().GetCorrelationId(), response.GetMessageMeta().GetCorrelationId()) - - assert.Equal(t, expectedResponse.GetInstanceId(), response.GetInstanceId()) -} diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 19211c600..87aae90c9 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -35,11 +35,11 @@ import ( type FileServiceOperator struct { fileServiceClient mpi.FileServiceClient agentConfig *config.Config - fileOperator fileOperator + fileOperator FileOperatorInterface isConnected *atomic.Bool } -var _ fileServiceOperatorInterface = (*FileServiceOperator)(nil) +var _ FileServiceOperatorInterface = (*FileServiceOperator)(nil) func NewFileServiceOperator(agentConfig *config.Config, fileServiceClient mpi.FileServiceClient, manifestLock *sync.RWMutex, diff --git a/internal/file/filefakes/fake_file_manager_service_interface.go b/internal/file/filefakes/fake_file_manager_service_interface.go index f2af670fe..10016297e 100644 --- a/internal/file/filefakes/fake_file_manager_service_interface.go +++ b/internal/file/filefakes/fake_file_manager_service_interface.go @@ -6,6 +6,7 @@ import ( "sync" v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/model" ) @@ -645,3 +646,5 @@ func (fake *FakeFileManagerServiceInterface) recordInvocation(key string, args [ } fake.invocations[key] = append(fake.invocations[key], args) } + +var _ file.FileManagerServiceInterface = new(FakeFileManagerServiceInterface) diff --git a/internal/file/filefakes/fake_file_operator_interface.go b/internal/file/filefakes/fake_file_operator_interface.go new file mode 100644 index 000000000..0b191b979 --- /dev/null +++ b/internal/file/filefakes/fake_file_operator_interface.go @@ -0,0 +1,528 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package filefakes + +import ( + "bufio" + "context" + "sync" + + v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/file" + "github.com/nginx/agent/v3/internal/model" + "google.golang.org/grpc" +) + +type FakeFileOperatorInterface struct { + CreateFileDirectoriesStub func(context.Context, string) error + createFileDirectoriesMutex sync.RWMutex + createFileDirectoriesArgsForCall []struct { + arg1 context.Context + arg2 string + } + createFileDirectoriesReturns struct { + result1 error + } + createFileDirectoriesReturnsOnCall map[int]struct { + result1 error + } + MoveFileStub func(context.Context, string, string) error + moveFileMutex sync.RWMutex + moveFileArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 string + } + moveFileReturns struct { + result1 error + } + moveFileReturnsOnCall map[int]struct { + result1 error + } + ReadChunkStub func(context.Context, uint32, *bufio.Reader, uint32) (v1.FileDataChunk_Content, error) + readChunkMutex sync.RWMutex + readChunkArgsForCall []struct { + arg1 context.Context + arg2 uint32 + arg3 *bufio.Reader + arg4 uint32 + } + readChunkReturns struct { + result1 v1.FileDataChunk_Content + result2 error + } + readChunkReturnsOnCall map[int]struct { + result1 v1.FileDataChunk_Content + result2 error + } + WriteStub func(context.Context, []byte, string, string) error + writeMutex sync.RWMutex + writeArgsForCall []struct { + arg1 context.Context + arg2 []byte + arg3 string + arg4 string + } + writeReturns struct { + result1 error + } + writeReturnsOnCall map[int]struct { + result1 error + } + WriteChunkedFileStub func(context.Context, string, string, *v1.FileDataChunkHeader, grpc.ServerStreamingClient[v1.FileDataChunk]) error + writeChunkedFileMutex sync.RWMutex + writeChunkedFileArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 string + arg4 *v1.FileDataChunkHeader + arg5 grpc.ServerStreamingClient[v1.FileDataChunk] + } + writeChunkedFileReturns struct { + result1 error + } + writeChunkedFileReturnsOnCall map[int]struct { + result1 error + } + WriteManifestFileStub func(context.Context, map[string]*model.ManifestFile, string, string) error + writeManifestFileMutex sync.RWMutex + writeManifestFileArgsForCall []struct { + arg1 context.Context + arg2 map[string]*model.ManifestFile + arg3 string + arg4 string + } + writeManifestFileReturns struct { + result1 error + } + writeManifestFileReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectories(arg1 context.Context, arg2 string) error { + fake.createFileDirectoriesMutex.Lock() + ret, specificReturn := fake.createFileDirectoriesReturnsOnCall[len(fake.createFileDirectoriesArgsForCall)] + fake.createFileDirectoriesArgsForCall = append(fake.createFileDirectoriesArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.CreateFileDirectoriesStub + fakeReturns := fake.createFileDirectoriesReturns + fake.recordInvocation("CreateFileDirectories", []interface{}{arg1, arg2}) + fake.createFileDirectoriesMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectoriesCallCount() int { + fake.createFileDirectoriesMutex.RLock() + defer fake.createFileDirectoriesMutex.RUnlock() + return len(fake.createFileDirectoriesArgsForCall) +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectoriesCalls(stub func(context.Context, string) error) { + fake.createFileDirectoriesMutex.Lock() + defer fake.createFileDirectoriesMutex.Unlock() + fake.CreateFileDirectoriesStub = stub +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectoriesArgsForCall(i int) (context.Context, string) { + fake.createFileDirectoriesMutex.RLock() + defer fake.createFileDirectoriesMutex.RUnlock() + argsForCall := fake.createFileDirectoriesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectoriesReturns(result1 error) { + fake.createFileDirectoriesMutex.Lock() + defer fake.createFileDirectoriesMutex.Unlock() + fake.CreateFileDirectoriesStub = nil + fake.createFileDirectoriesReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) CreateFileDirectoriesReturnsOnCall(i int, result1 error) { + fake.createFileDirectoriesMutex.Lock() + defer fake.createFileDirectoriesMutex.Unlock() + fake.CreateFileDirectoriesStub = nil + if fake.createFileDirectoriesReturnsOnCall == nil { + fake.createFileDirectoriesReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.createFileDirectoriesReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) MoveFile(arg1 context.Context, arg2 string, arg3 string) error { + fake.moveFileMutex.Lock() + ret, specificReturn := fake.moveFileReturnsOnCall[len(fake.moveFileArgsForCall)] + fake.moveFileArgsForCall = append(fake.moveFileArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + }{arg1, arg2, arg3}) + stub := fake.MoveFileStub + fakeReturns := fake.moveFileReturns + fake.recordInvocation("MoveFile", []interface{}{arg1, arg2, arg3}) + fake.moveFileMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperatorInterface) MoveFileCallCount() int { + fake.moveFileMutex.RLock() + defer fake.moveFileMutex.RUnlock() + return len(fake.moveFileArgsForCall) +} + +func (fake *FakeFileOperatorInterface) MoveFileCalls(stub func(context.Context, string, string) error) { + fake.moveFileMutex.Lock() + defer fake.moveFileMutex.Unlock() + fake.MoveFileStub = stub +} + +func (fake *FakeFileOperatorInterface) MoveFileArgsForCall(i int) (context.Context, string, string) { + fake.moveFileMutex.RLock() + defer fake.moveFileMutex.RUnlock() + argsForCall := fake.moveFileArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeFileOperatorInterface) MoveFileReturns(result1 error) { + fake.moveFileMutex.Lock() + defer fake.moveFileMutex.Unlock() + fake.MoveFileStub = nil + fake.moveFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) MoveFileReturnsOnCall(i int, result1 error) { + fake.moveFileMutex.Lock() + defer fake.moveFileMutex.Unlock() + fake.MoveFileStub = nil + if fake.moveFileReturnsOnCall == nil { + fake.moveFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.moveFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) ReadChunk(arg1 context.Context, arg2 uint32, arg3 *bufio.Reader, arg4 uint32) (v1.FileDataChunk_Content, error) { + fake.readChunkMutex.Lock() + ret, specificReturn := fake.readChunkReturnsOnCall[len(fake.readChunkArgsForCall)] + fake.readChunkArgsForCall = append(fake.readChunkArgsForCall, struct { + arg1 context.Context + arg2 uint32 + arg3 *bufio.Reader + arg4 uint32 + }{arg1, arg2, arg3, arg4}) + stub := fake.ReadChunkStub + fakeReturns := fake.readChunkReturns + fake.recordInvocation("ReadChunk", []interface{}{arg1, arg2, arg3, arg4}) + fake.readChunkMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeFileOperatorInterface) ReadChunkCallCount() int { + fake.readChunkMutex.RLock() + defer fake.readChunkMutex.RUnlock() + return len(fake.readChunkArgsForCall) +} + +func (fake *FakeFileOperatorInterface) ReadChunkCalls(stub func(context.Context, uint32, *bufio.Reader, uint32) (v1.FileDataChunk_Content, error)) { + fake.readChunkMutex.Lock() + defer fake.readChunkMutex.Unlock() + fake.ReadChunkStub = stub +} + +func (fake *FakeFileOperatorInterface) ReadChunkArgsForCall(i int) (context.Context, uint32, *bufio.Reader, uint32) { + fake.readChunkMutex.RLock() + defer fake.readChunkMutex.RUnlock() + argsForCall := fake.readChunkArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeFileOperatorInterface) ReadChunkReturns(result1 v1.FileDataChunk_Content, result2 error) { + fake.readChunkMutex.Lock() + defer fake.readChunkMutex.Unlock() + fake.ReadChunkStub = nil + fake.readChunkReturns = struct { + result1 v1.FileDataChunk_Content + result2 error + }{result1, result2} +} + +func (fake *FakeFileOperatorInterface) ReadChunkReturnsOnCall(i int, result1 v1.FileDataChunk_Content, result2 error) { + fake.readChunkMutex.Lock() + defer fake.readChunkMutex.Unlock() + fake.ReadChunkStub = nil + if fake.readChunkReturnsOnCall == nil { + fake.readChunkReturnsOnCall = make(map[int]struct { + result1 v1.FileDataChunk_Content + result2 error + }) + } + fake.readChunkReturnsOnCall[i] = struct { + result1 v1.FileDataChunk_Content + result2 error + }{result1, result2} +} + +func (fake *FakeFileOperatorInterface) Write(arg1 context.Context, arg2 []byte, arg3 string, arg4 string) error { + var arg2Copy []byte + if arg2 != nil { + arg2Copy = make([]byte, len(arg2)) + copy(arg2Copy, arg2) + } + fake.writeMutex.Lock() + ret, specificReturn := fake.writeReturnsOnCall[len(fake.writeArgsForCall)] + fake.writeArgsForCall = append(fake.writeArgsForCall, struct { + arg1 context.Context + arg2 []byte + arg3 string + arg4 string + }{arg1, arg2Copy, arg3, arg4}) + stub := fake.WriteStub + fakeReturns := fake.writeReturns + fake.recordInvocation("Write", []interface{}{arg1, arg2Copy, arg3, arg4}) + fake.writeMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperatorInterface) WriteCallCount() int { + fake.writeMutex.RLock() + defer fake.writeMutex.RUnlock() + return len(fake.writeArgsForCall) +} + +func (fake *FakeFileOperatorInterface) WriteCalls(stub func(context.Context, []byte, string, string) error) { + fake.writeMutex.Lock() + defer fake.writeMutex.Unlock() + fake.WriteStub = stub +} + +func (fake *FakeFileOperatorInterface) WriteArgsForCall(i int) (context.Context, []byte, string, string) { + fake.writeMutex.RLock() + defer fake.writeMutex.RUnlock() + argsForCall := fake.writeArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeFileOperatorInterface) WriteReturns(result1 error) { + fake.writeMutex.Lock() + defer fake.writeMutex.Unlock() + fake.WriteStub = nil + fake.writeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) WriteReturnsOnCall(i int, result1 error) { + fake.writeMutex.Lock() + defer fake.writeMutex.Unlock() + fake.WriteStub = nil + if fake.writeReturnsOnCall == nil { + fake.writeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFile(arg1 context.Context, arg2 string, arg3 string, arg4 *v1.FileDataChunkHeader, arg5 grpc.ServerStreamingClient[v1.FileDataChunk]) error { + fake.writeChunkedFileMutex.Lock() + ret, specificReturn := fake.writeChunkedFileReturnsOnCall[len(fake.writeChunkedFileArgsForCall)] + fake.writeChunkedFileArgsForCall = append(fake.writeChunkedFileArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 *v1.FileDataChunkHeader + arg5 grpc.ServerStreamingClient[v1.FileDataChunk] + }{arg1, arg2, arg3, arg4, arg5}) + stub := fake.WriteChunkedFileStub + fakeReturns := fake.writeChunkedFileReturns + fake.recordInvocation("WriteChunkedFile", []interface{}{arg1, arg2, arg3, arg4, arg5}) + fake.writeChunkedFileMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4, arg5) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFileCallCount() int { + fake.writeChunkedFileMutex.RLock() + defer fake.writeChunkedFileMutex.RUnlock() + return len(fake.writeChunkedFileArgsForCall) +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFileCalls(stub func(context.Context, string, string, *v1.FileDataChunkHeader, grpc.ServerStreamingClient[v1.FileDataChunk]) error) { + fake.writeChunkedFileMutex.Lock() + defer fake.writeChunkedFileMutex.Unlock() + fake.WriteChunkedFileStub = stub +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFileArgsForCall(i int) (context.Context, string, string, *v1.FileDataChunkHeader, grpc.ServerStreamingClient[v1.FileDataChunk]) { + fake.writeChunkedFileMutex.RLock() + defer fake.writeChunkedFileMutex.RUnlock() + argsForCall := fake.writeChunkedFileArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFileReturns(result1 error) { + fake.writeChunkedFileMutex.Lock() + defer fake.writeChunkedFileMutex.Unlock() + fake.WriteChunkedFileStub = nil + fake.writeChunkedFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) WriteChunkedFileReturnsOnCall(i int, result1 error) { + fake.writeChunkedFileMutex.Lock() + defer fake.writeChunkedFileMutex.Unlock() + fake.WriteChunkedFileStub = nil + if fake.writeChunkedFileReturnsOnCall == nil { + fake.writeChunkedFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeChunkedFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) WriteManifestFile(arg1 context.Context, arg2 map[string]*model.ManifestFile, arg3 string, arg4 string) error { + fake.writeManifestFileMutex.Lock() + ret, specificReturn := fake.writeManifestFileReturnsOnCall[len(fake.writeManifestFileArgsForCall)] + fake.writeManifestFileArgsForCall = append(fake.writeManifestFileArgsForCall, struct { + arg1 context.Context + arg2 map[string]*model.ManifestFile + arg3 string + arg4 string + }{arg1, arg2, arg3, arg4}) + stub := fake.WriteManifestFileStub + fakeReturns := fake.writeManifestFileReturns + fake.recordInvocation("WriteManifestFile", []interface{}{arg1, arg2, arg3, arg4}) + fake.writeManifestFileMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperatorInterface) WriteManifestFileCallCount() int { + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() + return len(fake.writeManifestFileArgsForCall) +} + +func (fake *FakeFileOperatorInterface) WriteManifestFileCalls(stub func(context.Context, map[string]*model.ManifestFile, string, string) error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = stub +} + +func (fake *FakeFileOperatorInterface) WriteManifestFileArgsForCall(i int) (context.Context, map[string]*model.ManifestFile, string, string) { + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() + argsForCall := fake.writeManifestFileArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeFileOperatorInterface) WriteManifestFileReturns(result1 error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = nil + fake.writeManifestFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) WriteManifestFileReturnsOnCall(i int, result1 error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = nil + if fake.writeManifestFileReturnsOnCall == nil { + fake.writeManifestFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeManifestFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperatorInterface) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createFileDirectoriesMutex.RLock() + defer fake.createFileDirectoriesMutex.RUnlock() + fake.moveFileMutex.RLock() + defer fake.moveFileMutex.RUnlock() + fake.readChunkMutex.RLock() + defer fake.readChunkMutex.RUnlock() + fake.writeMutex.RLock() + defer fake.writeMutex.RUnlock() + fake.writeChunkedFileMutex.RLock() + defer fake.writeChunkedFileMutex.RUnlock() + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeFileOperatorInterface) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ file.FileOperatorInterface = new(FakeFileOperatorInterface) diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index fde011d8b..19f079139 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -16,7 +16,6 @@ import ( "github.com/nginx/agent/v3/internal/collector" "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/resource" @@ -30,7 +29,7 @@ func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin { manifestLock := &sync.RWMutex{} - plugins = addResourcePlugin(plugins, agentConfig) + // plugins = addResourcePlugin(plugins, agentConfig) plugins = addCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) plugins = addAuxiliaryCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) plugins = addCollectorPlugin(ctx, agentConfig, plugins) @@ -39,12 +38,12 @@ func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin { return plugins } -func addResourcePlugin(plugins []bus.Plugin, agentConfig *config.Config) []bus.Plugin { - resourcePlugin := resource.NewResource(agentConfig) - plugins = append(plugins, resourcePlugin) - - return plugins -} +//func addResourcePlugin(plugins []bus.Plugin, agentConfig *config.Config) []bus.Plugin { +// resourcePlugin := resource.NewResource(agentConfig) +// plugins = append(plugins, resourcePlugin) +// +// return plugins +//} func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config, manifestLock *sync.RWMutex, @@ -56,8 +55,8 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, commandPlugin) - filePlugin := file.NewFilePlugin(agentConfig, grpcConnection, model.Command, manifestLock) - plugins = append(plugins, filePlugin) + resourcePlugin := resource.NewResource(agentConfig, grpcConnection, model.Command, manifestLock) + plugins = append(plugins, resourcePlugin) } } else { slog.InfoContext(ctx, "Agent is not connected to a management plane. "+ @@ -77,8 +76,8 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin } else { auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary) plugins = append(plugins, auxCommandPlugin) - readFilePlugin := file.NewFilePlugin(agentConfig, auxGRPCConnection, model.Auxiliary, manifestLock) - plugins = append(plugins, readFilePlugin) + resourceReadPlugin := resource.NewResource(agentConfig, auxGRPCConnection, model.Auxiliary, manifestLock) + plugins = append(plugins, resourceReadPlugin) } } else { slog.DebugContext(ctx, "Agent is not connected to an auxiliary management plane. "+ diff --git a/internal/plugin/plugin_manager_test.go b/internal/plugin/plugin_manager_test.go index 172cb1542..f38e520fd 100644 --- a/internal/plugin/plugin_manager_test.go +++ b/internal/plugin/plugin_manager_test.go @@ -13,7 +13,6 @@ import ( "github.com/nginx/agent/v3/internal/collector" "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/resource" "github.com/nginx/agent/v3/internal/bus" @@ -60,9 +59,7 @@ func TestLoadPlugins(t *testing.T) { expected: []bus.Plugin{ &resource.Resource{}, &command.CommandPlugin{}, - &file.FilePlugin{}, &command.CommandPlugin{}, - &file.FilePlugin{}, &watcher.Watcher{}, }, }, @@ -105,7 +102,6 @@ func TestLoadPlugins(t *testing.T) { expected: []bus.Plugin{ &resource.Resource{}, &command.CommandPlugin{}, - &file.FilePlugin{}, &watcher.Watcher{}, }, }, @@ -136,7 +132,6 @@ func TestLoadPlugins(t *testing.T) { expected: []bus.Plugin{ &resource.Resource{}, &command.CommandPlugin{}, - &file.FilePlugin{}, &collector.Collector{}, &watcher.Watcher{}, }, diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index 036e817b7..c11a3502e 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -10,12 +10,18 @@ import ( "errors" "fmt" "log/slog" + "sync" mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/config" response "github.com/nginx/agent/v3/internal/datasource/proto" + "github.com/nginx/agent/v3/internal/file" + "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/pkg/files" + "github.com/nginx/agent/v3/pkg/id" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/nginx/agent/v3/internal/bus" ) @@ -25,9 +31,13 @@ import ( // This is done in the resource plugin to make the file plugin usable for every type of instance. type Resource struct { - messagePipe bus.MessagePipeInterface - resourceService resourceServiceInterface - agentConfig *config.Config + manifestLock *sync.RWMutex + messagePipe bus.MessagePipeInterface + resourceService resourceServiceInterface + agentConfig *config.Config + conn grpc.GrpcConnectionInterface + fileManagerService file.FileManagerServiceInterface + serverType model.ServerType } type errResponse struct { @@ -44,97 +54,236 @@ type plusAPIErr struct { var _ bus.Plugin = (*Resource)(nil) -func NewResource(agentConfig *config.Config) *Resource { +func NewResource(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, + serverType model.ServerType, manifestLock *sync.RWMutex, +) *Resource { return &Resource{ - agentConfig: agentConfig, + agentConfig: agentConfig, + conn: grpcConnection, + serverType: serverType, + manifestLock: manifestLock, } } func (r *Resource) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, r.serverType.String()), + ) slog.DebugContext(ctx, "Starting resource plugin") r.messagePipe = messagePipe r.resourceService = NewResourceService(ctx, r.agentConfig) + r.fileManagerService = file.NewFileManagerService(r.conn.FileServiceClient(), r.agentConfig, r.manifestLock) return nil } -func (*Resource) Close(ctx context.Context) error { +func (r *Resource) Close(ctx context.Context) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, r.serverType.String()), + ) slog.InfoContext(ctx, "Closing resource plugin") - return nil + return r.conn.Close(ctx) } -func (*Resource) Info() *bus.Info { +func (r *Resource) Info() *bus.Info { + name := "resource" + if r.serverType.String() == model.Auxiliary.String() { + name = "auxiliary-file" + } return &bus.Info{ - Name: "resource", + Name: name, } } -// cyclomatic complexity 11 max is 10 - func (r *Resource) Process(ctx context.Context, msg *bus.Message) { + ctxWithMetadata := r.agentConfig.NewContextWithLabels(ctx) + + if logger.ServerType(ctx) == "" { + ctxWithMetadata = context.WithValue( + ctxWithMetadata, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, r.serverType.String()), + ) + } + switch msg.Topic { case bus.AddInstancesTopic: - slog.DebugContext(ctx, "Resource plugin received add instances message") - instanceList, ok := msg.Data.([]*mpi.Instance) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) - - return + r.handleAddedInstances(ctx, msg) + case bus.UpdatedInstancesTopic: + r.handleUpdatedInstances(ctx, msg) + case bus.DeletedInstancesTopic: + r.handleDeletedInstances(ctx, msg) + case bus.APIActionRequestTopic: + r.handleAPIActionRequest(ctx, msg) + case bus.ConnectionResetTopic: + if logger.ServerType(ctxWithMetadata) == r.serverType.String() { + r.handleConnectionReset(ctxWithMetadata, msg) + } + case bus.ConnectionCreatedTopic: + if logger.ServerType(ctxWithMetadata) == r.serverType.String() { + slog.DebugContext(ctxWithMetadata, "Resource plugin received connection created message") + r.fileManagerService.SetIsConnected(true) + } + case bus.NginxConfigUpdateTopic: + if logger.ServerType(ctxWithMetadata) == r.serverType.String() { + r.handleNginxConfigUpdate(ctxWithMetadata, msg) + } + case bus.ConfigUploadRequestTopic: + if logger.ServerType(ctxWithMetadata) == r.serverType.String() { + r.handleConfigUploadRequest(ctxWithMetadata, msg) + } + case bus.ConfigApplyRequestTopic: + if logger.ServerType(ctxWithMetadata) == r.serverType.String() { + r.handleConfigApplyRequest(ctxWithMetadata, msg) } + default: + slog.DebugContext(ctx, "Resource plugin received unknown topic", "topic", msg.Topic) + } +} - resource := r.resourceService.AddInstances(instanceList) +func (r *Resource) Subscriptions() []string { + subscriptions := []string{ + bus.AddInstancesTopic, + bus.UpdatedInstancesTopic, + bus.DeletedInstancesTopic, + bus.APIActionRequestTopic, + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + } - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) + if r.serverType == model.Command { + subscriptions = append(subscriptions, bus.ConfigApplyRequestTopic) + } + + return subscriptions +} + +func (r *Resource) enableWatchers(ctx context.Context, configContext *model.NginxConfigContext, + instanceID string, +) { + enableWatcher := &model.EnableWatchers{ + ConfigContext: configContext, + InstanceID: instanceID, + } + + r.messagePipe.Process(ctx, &bus.Message{ + Data: enableWatcher, + Topic: bus.EnableWatchersTopic, + }) +} + +func (r *Resource) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received config upload request message") + managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) + if !ok { + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.ManagementPlaneRequest", + "payload", msg.Data, + ) return - case bus.UpdatedInstancesTopic: - slog.DebugContext(ctx, "Resource plugin received update instances message") - instanceList, ok := msg.Data.([]*mpi.Instance) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + } - return + configUploadRequest := managementPlaneRequest.GetConfigUploadRequest() + + correlationID := logger.CorrelationID(ctx) + + updatingFilesError := r.fileManagerService.ConfigUpload(ctx, configUploadRequest) + + dataPlaneResponse := &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: correlationID, + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Successfully updated all files", + }, + } + + if updatingFilesError != nil { + dataPlaneResponse.CommandResponse.Status = mpi.CommandResponse_COMMAND_STATUS_FAILURE + dataPlaneResponse.CommandResponse.Message = "Failed to update all files" + dataPlaneResponse.CommandResponse.Error = updatingFilesError.Error() + } + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dataPlaneResponse}) +} + +func (r *Resource) handleConnectionReset(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received connection reset message") + if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { + var reconnect bool + err := r.conn.Close(ctx) + if err != nil { + slog.ErrorContext(ctx, "Resource plugin: unable to close connection", "error", err) } - resource := r.resourceService.UpdateInstances(ctx, instanceList) + r.conn = newConnection - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) + reconnect = r.fileManagerService.IsConnected() + r.fileManagerService.ResetClient(ctx, r.conn.FileServiceClient()) + r.fileManagerService.SetIsConnected(reconnect) + + slog.DebugContext(ctx, "File manager service client reset successfully") + } +} + +func (r *Resource) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received nginx config update message") + nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) return + } - case bus.DeletedInstancesTopic: - slog.DebugContext(ctx, "Resource plugin received delete instances message") - instanceList, ok := msg.Data.([]*mpi.Instance) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + r.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) +} - return - } - resource := r.resourceService.DeleteInstances(ctx, instanceList) +func (r *Resource) handleAddedInstances(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received add instances message") + instanceList, ok := msg.Data.([]*mpi.Instance) + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) + return + } + + resource := r.resourceService.AddInstances(instanceList) + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) +} + +func (r *Resource) handleUpdatedInstances(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received update instances message") + instanceList, ok := msg.Data.([]*mpi.Instance) + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) return - case bus.WriteConfigSuccessfulTopic: - r.handleWriteConfigSuccessful(ctx, msg) - case bus.RollbackWriteTopic: - r.handleRollbackWrite(ctx, msg) - case bus.APIActionRequestTopic: - r.handleAPIActionRequest(ctx, msg) - default: - slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic) } + resource := r.resourceService.UpdateInstances(ctx, instanceList) + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) } -func (*Resource) Subscriptions() []string { - return []string{ - bus.AddInstancesTopic, - bus.UpdatedInstancesTopic, - bus.DeletedInstancesTopic, - bus.WriteConfigSuccessfulTopic, - bus.RollbackWriteTopic, - bus.APIActionRequestTopic, +func (r *Resource) handleDeletedInstances(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received delete instances message") + instanceList, ok := msg.Data.([]*mpi.Instance) + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + + return } + resource := r.resourceService.DeleteInstances(ctx, instanceList) + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource}) } func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) { @@ -217,67 +366,206 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi } } -func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Resource plugin received write config successful message") - data, ok := msg.Data.(*model.ConfigApplyMessage) +func (r *Resource) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Resource plugin received config apply request message") + + var dataPlaneResponse *mpi.DataPlaneResponse + correlationID := logger.CorrelationID(ctx) + + managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", + "payload", msg.Data) return } - configContext, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) + + request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) + if !requestOk { + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", + "payload", msg.Data) + + return + } + + configApplyRequest := request.ConfigApplyRequest + instanceID := configApplyRequest.GetOverview().GetConfigVersion().GetInstanceId() + + writeStatus, err := r.fileManagerService.ConfigApply(ctx, configApplyRequest) + + switch writeStatus { + case model.NoChange: + slog.DebugContext(ctx, "No changes required for config apply request") + dataPlaneResponse := response.CreateDataPlaneResponse(correlationID, + mpi.CommandResponse_COMMAND_STATUS_OK, + "Config apply successful, no files to change", + instanceID, + "", + ) + r.completeConfigApply(ctx, dataPlaneResponse) + case model.Error: + slog.ErrorContext( + ctx, + "Failed to apply config changes", + "instance_id", instanceID, + "error", err, + ) + dataPlaneResponse = response.CreateDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Config apply failed", + instanceID, + err.Error(), + ) + + r.completeConfigApply(ctx, dataPlaneResponse) + case model.RollbackRequired: + slog.ErrorContext( + ctx, + "Failed to apply config changes, rolling back", + "instance_id", instanceID, + "error", err, + ) + + dataPlaneResponse = response.CreateDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_ERROR, + "Config apply failed, rolling back config", + instanceID, + err.Error(), + ) + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dataPlaneResponse}) + + rollbackErr := r.fileManagerService.Rollback( + ctx, + instanceID, + ) + if rollbackErr != nil { + rollbackResponse := response.CreateDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Config apply failed, rollback failed", + instanceID, + rollbackErr.Error()) + + r.completeConfigApply(ctx, rollbackResponse) + return + } + + dataPlaneResponse = response.CreateDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Config apply failed, rollback successful", + instanceID, + err.Error()) + + r.completeConfigApply(ctx, dataPlaneResponse) + case model.OK: + slog.DebugContext(ctx, "Changes required for config apply request") + r.applyConfig(ctx, correlationID, instanceID) + + } +} + +func (r *Resource) completeConfigApply(ctx context.Context, dpResponse *mpi.DataPlaneResponse) { + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) + r.fileManagerService.ClearCache() + r.enableWatchers(ctx, &model.NginxConfigContext{}, dpResponse.GetInstanceId()) +} + +func (r *Resource) applyConfig(ctx context.Context, correlationID, instanceID string) { + slog.DebugContext(ctx, "Resource plugin received write config successful message") + + configContext, err := r.resourceService.ApplyConfig(ctx, instanceID) if err != nil { - data.Error = err slog.ErrorContext(ctx, "errors found during config apply, "+ "sending error status, rolling back config", "err", err) - dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Config apply failed, rolling back config", data.InstanceID, err.Error()) + dpResponse := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, + "Config apply failed, rolling back config", instanceID, err.Error()) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) + + r.failedConfigApply(ctx, correlationID, instanceID, err) + + return + } + + dpResponse := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, + "Config apply successful", instanceID, "") + + r.reloadSuccessful(ctx, configContext, dpResponse) +} - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) +func (r *Resource) failedConfigApply(ctx context.Context, correlationID, instanceID string, applyErr error) { + if instanceID == "" { + r.fileManagerService.ClearCache() return } - dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Config apply successful", data.InstanceID, "") + err := r.fileManagerService.Rollback(ctx, instanceID) + if err != nil { + rollbackResponse := response.CreateDataPlaneResponse(correlationID, + mpi.CommandResponse_COMMAND_STATUS_ERROR, + "Rollback failed", instanceID, err.Error()) + + applyResponse := response.CreateDataPlaneResponse(correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Config apply failed, rollback failed", instanceID, applyErr.Error()) + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) + + r.completeConfigApply(ctx, applyResponse) - successMessage := &model.ReloadSuccess{ - ConfigContext: configContext, - DataPlaneResponse: dpResponse, + return } - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage}) + r.handleRollbackWrite(ctx, correlationID, instanceID, applyErr) } -func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { +func (r *Resource) handleRollbackWrite(ctx context.Context, correlationID, instanceID string, applyErr error) { slog.DebugContext(ctx, "Resource plugin received rollback write message") - data, ok := msg.Data.(*model.ConfigApplyMessage) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) - return - } - _, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) + _, err := r.resourceService.ApplyConfig(ctx, instanceID) if err != nil { slog.ErrorContext(ctx, "errors found during rollback, sending failure status", "err", err) - rollbackResponse := response.CreateDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, "Rollback failed", data.InstanceID, err.Error()) + rollbackResponse := response.CreateDataPlaneResponse(correlationID, + mpi.CommandResponse_COMMAND_STATUS_ERROR, "Rollback failed", instanceID, err.Error()) - applyResponse := response.CreateDataPlaneResponse(data.CorrelationID, + applyResponse := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed", - data.InstanceID, data.Error.Error()) + instanceID, applyErr.Error()) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) + + r.completeConfigApply(ctx, applyResponse) return } - applyResponse := response.CreateDataPlaneResponse(data.CorrelationID, + applyResponse := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback successful", data.InstanceID, data.Error.Error()) + "Config apply failed, rollback successful", instanceID, applyErr.Error()) - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) + r.completeConfigApply(ctx, applyResponse) +} + +func (r *Resource) reloadSuccessful(ctx context.Context, + configContext *model.NginxConfigContext, dpResponse *mpi.DataPlaneResponse, +) { + r.fileManagerService.ClearCache() + r.enableWatchers(ctx, configContext, dpResponse.GetInstanceId()) + + if configContext.Files != nil { + slog.DebugContext(ctx, "Changes made during config apply, update files on disk") + updateError := r.fileManagerService.UpdateCurrentFilesOnDisk( + ctx, + files.ConvertToMapOfFiles(configContext.Files), + true, + ) + if updateError != nil { + slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) + } + } + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) } diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index 0a7165b17..7dd5f0c73 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -11,8 +11,10 @@ import ( "encoding/json" "errors" "sort" + "sync" "testing" + "github.com/nginx/agent/v3/internal/grpc/grpcfakes" "github.com/nginx/agent/v3/test/stub" "google.golang.org/protobuf/types/known/structpb" @@ -106,7 +108,8 @@ func TestResource_Process(t *testing.T) { fakeResourceService.DeleteInstancesReturns(test.resource) messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -165,7 +168,8 @@ func TestResource_Process_Apply(t *testing.T) { fakeResourceService.ApplyConfigReturns(&model.NginxConfigContext{}, test.applyErr) messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -369,7 +373,8 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -472,7 +477,8 @@ func TestResource_Process_APIAction_UpdateStreamServers(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -614,7 +620,8 @@ func TestResource_Process_APIAction_GetStreamUpstreams(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -803,7 +810,8 @@ func TestResource_Process_Rollback(t *testing.T) { fakeResourceService.ApplyConfigReturns(&model.NginxConfigContext{}, test.rollbackErr) messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) @@ -836,7 +844,8 @@ func TestResource_Process_Rollback(t *testing.T) { } func TestResource_Subscriptions(t *testing.T) { - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) assert.Equal(t, []string{ bus.AddInstancesTopic, @@ -850,7 +859,8 @@ func TestResource_Subscriptions(t *testing.T) { } func TestResource_Info(t *testing.T) { - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) assert.Equal(t, &bus.Info{Name: "resource"}, resourcePlugin.Info()) } @@ -861,7 +871,8 @@ func TestResource_Init(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() messagePipe.RunWithoutInit(ctx) - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = &resourceService err := resourcePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -884,7 +895,8 @@ func runResourceTestHelper(t *testing.T, ctx context.Context, testName string, g } messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewResource(types.AgentConfig()) + resourcePlugin := NewResource(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) resourcePlugin.resourceService = fakeResourceService registerErr := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) From a28e5fefd0e94d2da54a69e1adc51b72e5e68267 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 7 Nov 2025 13:32:24 +0000 Subject: [PATCH 2/2] remove file plugin --- lefthook.yml | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lefthook.yml b/lefthook.yml index 0888c9e0f..8754b6dd1 100644 --- a/lefthook.yml +++ b/lefthook.yml @@ -1,15 +1,15 @@ -# see https://github.com/evilmartians/lefthook for references -pre-push: - follow: true - piped: true # Stop if one of the steps fail - commands: - 1_generate: - run: make generate - 2_lint: - run: make lint - 3_format: - run: make format - 4_check_mod_change: - run: go mod tidy - 5_check_local_changes: - run: make no-local-changes +## see https://github.com/evilmartians/lefthook for references +#pre-push: +# follow: true +# piped: true # Stop if one of the steps fail +# commands: +# 1_generate: +# run: make generate +# 2_lint: +# run: make lint +# 3_format: +# run: make format +# 4_check_mod_change: +# run: go mod tidy +# 5_check_local_changes: +# run: make no-local-changes