diff --git a/cmd/migrate/main_test.go b/cmd/migrate/main_test.go new file mode 100644 index 000000000..116fae513 --- /dev/null +++ b/cmd/migrate/main_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "os" + "os/exec" + "strings" + "testing" +) + +func TestCLIFunctionality(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // Test that the CLI can be built + buildCmd := exec.Command("go", "build", "-o", "migrate-test") + buildCmd.Env = os.Environ() + output, err := buildCmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to build CLI: %v\nOutput: %s", err, output) + } + defer os.Remove("migrate-test") + + // Test the version command + versionCmd := exec.Command("./migrate-test", "-version") + output, err = versionCmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to run version command: %v\nOutput: %s", err, output) + } + + if !strings.Contains(string(output), "migrate version") { + t.Errorf("Expected version output, got: %s", output) + } + + // Test the help command + helpCmd := exec.Command("./migrate-test", "-help") + output, err = helpCmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to run help command: %v\nOutput: %s", err, output) + } + + if !strings.Contains(string(output), "Usage:") || !strings.Contains(string(output), "Commands:") { + t.Errorf("Expected help output, got: %s", output) + } +} diff --git a/database/victoria/README.md b/database/victoria/README.md new file mode 100644 index 000000000..09f43f800 --- /dev/null +++ b/database/victoria/README.md @@ -0,0 +1,5 @@ +# VictoriaMetrics + +This driver enables golang-migrate to work with [VictoriaMetrics](https://victoriametrics.com/), a high-performance time series database. + +## Usage diff --git a/database/victoria/example/migration_example.down.json b/database/victoria/example/migration_example.down.json new file mode 100644 index 000000000..3a2a64d93 --- /dev/null +++ b/database/victoria/example/migration_example.down.json @@ -0,0 +1,2 @@ +-- This is just a placeholder as VictoriaMetrics doesn't have a way to revert imports +-- Instead, you would typically filter by time range or labels to exclude the data diff --git a/database/victoria/example/migration_example.up.json b/database/victoria/example/migration_example.up.json new file mode 100644 index 000000000..b01ed8fcb --- /dev/null +++ b/database/victoria/example/migration_example.up.json @@ -0,0 +1,3 @@ +{"metric":{"__name__":"up","job":"migrate_test"},"values":[1],"timestamps":[1596698684000]} +{"metric":{"__name__":"cpu_usage","instance":"server1","job":"migrate_test"},"values":[0.45,0.52,0.48],"timestamps":[1596698684000,1596698694000,1596698704000]} +{"metric":{"__name__":"memory_usage","instance":"server1","job":"migrate_test"},"values":[0.25,0.28,0.22],"timestamps":[1596698684000,1596698694000,1596698704000]} diff --git a/database/victoria/export.go b/database/victoria/export.go new file mode 100644 index 000000000..8f6909189 --- /dev/null +++ b/database/victoria/export.go @@ -0,0 +1,59 @@ +package victoria + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" +) + +// Export reads data from VictoriaMetrics based on label filters and time range +func (v *Victoria) Export(ctx context.Context, w io.Writer) error { + if !v.isOpen { + return ErrClosed + } + + // Build query parameters + query := url.Values{} + if v.config.LabelFilter != "" { + query.Set("match[]", v.config.LabelFilter) + } + if v.config.StartTime != "" { + query.Set("start", v.config.StartTime) + } + if v.config.EndTime != "" { + query.Set("end", v.config.EndTime) + } + + // Create request with context + req, err := http.NewRequestWithContext(ctx, http.MethodGet, v.exportURL+"?"+query.Encode(), nil) + if err != nil { + return fmt.Errorf("failed to create export request: %w", err) + } + + // Set headers + req.Header.Set("Accept", "application/json") + req.Header.Set("Accept-Encoding", "gzip") + + // Execute request + resp, err := v.client.Do(req) + if err != nil { + return fmt.Errorf("failed to execute export request: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("export failed with status %d: %s", resp.StatusCode, string(bodyBytes)) + } + + // Copy response body to writer + _, err = io.Copy(w, resp.Body) + if err != nil { + return fmt.Errorf("failed to read export data: %w", err) + } + + return nil +} diff --git a/database/victoria/export_test.go b/database/victoria/export_test.go new file mode 100644 index 000000000..b2c407c96 --- /dev/null +++ b/database/victoria/export_test.go @@ -0,0 +1,82 @@ +package victoria + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestVictoriaExport(t *testing.T) { + // Sample export data + exportData := `{"metric":{"__name__":"up","job":"test"},"values":[1],"timestamps":[1596698684000]} +{"metric":{"__name__":"cpu_usage","instance":"server1"},"values":[0.45],"timestamps":[1596698684000]}` + + // Setup test server + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/api/v1/export" { + // Check query parameters + query := r.URL.Query() + assert.Equal(t, "{__name__=\"up\"}", query.Get("match[]")) + assert.Equal(t, "2020-01-01T00:00:00Z", query.Get("start")) + assert.Equal(t, "2020-01-02T00:00:00Z", query.Get("end")) + + // Return export data + w.WriteHeader(http.StatusOK) + w.Write([]byte(exportData)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer testServer.Close() + + // Parse the URL for our test server + serverURL := strings.TrimPrefix(testServer.URL, "http://") + + // Create driver + d := &Victoria{} + dsn := "victoria://" + serverURL + "?label_filter={__name__=\"up\"}&start=2020-01-01T00:00:00Z&end=2020-01-02T00:00:00Z" + // No need to store the returned driver since we're testing the receiver methods directly + _, err := d.Open(dsn) + assert.NoError(t, err) + + // Test export + var buf bytes.Buffer + err = d.Export(context.Background(), &buf) + assert.NoError(t, err) + assert.Equal(t, exportData, buf.String()) + + // Test export with closed connection + d.Close() + err = d.Export(context.Background(), &buf) + assert.Error(t, err) + assert.Equal(t, ErrClosed, err) +} + +func TestVictoriaExportError(t *testing.T) { + // Setup test server that returns an error + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Internal server error")) + })) + defer testServer.Close() + + // Parse the URL for our test server + serverURL := strings.TrimPrefix(testServer.URL, "http://") + + // Create driver + d := &Victoria{} + dsn := "victoria://" + serverURL + _, err := d.Open(dsn) + assert.NoError(t, err) + + // Test export + var buf bytes.Buffer + err = d.Export(context.Background(), &buf) + assert.Error(t, err) + assert.Contains(t, err.Error(), "status 500") +} diff --git a/database/victoria/victoria.go b/database/victoria/victoria.go new file mode 100644 index 000000000..28250def1 --- /dev/null +++ b/database/victoria/victoria.go @@ -0,0 +1,206 @@ +package victoria + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/golang-migrate/migrate/v4/database" +) + +// Define error constants if they don't exist in the database package +var ( + ErrLocked = errors.New("database is locked") + ErrClosed = errors.New("database connection is closed") +) + +func init() { + database.Register("victoria", &Victoria{}) +} + +// Victoria implements the database.Driver interface for VictoriaMetrics time series database +type Victoria struct { + client *http.Client + url string + isLocked bool + isOpen bool + config *Config + importURL string + exportURL string +} + +// Config holds the configuration parameters for VictoriaMetrics connection +type Config struct { + URL string + LabelFilter string + StartTime string + EndTime string + Timeout time.Duration +} + +// Open initializes the VictoriaMetrics driver +func (v *Victoria) Open(dsn string) (database.Driver, error) { + if v.client == nil { + v.client = &http.Client{ + Timeout: 30 * time.Second, + } + } + + config, err := parseConfig(dsn) + if err != nil { + return nil, err + } + + v.config = config + v.url = config.URL + v.importURL = config.URL + "/api/v1/import" + v.exportURL = config.URL + "/api/v1/export" + v.isOpen = true + + return v, nil +} + +// parseConfig parses the DSN into a Config struct +func parseConfig(dsn string) (*Config, error) { + u, err := url.Parse(dsn) + if err != nil { + return nil, fmt.Errorf("invalid VictoriaMetrics DSN: %w", err) + } + + if u.Scheme != "victoria" { + return nil, fmt.Errorf("invalid scheme for VictoriaMetrics: %s", u.Scheme) + } + + // Construct the base URL with scheme, host, and port + baseURL := "http://" + u.Host + if u.User != nil { + // Handle authentication if provided + password, _ := u.User.Password() + baseURL = "http://" + u.User.Username() + ":" + password + "@" + u.Host + } + + // Extract query parameters + timeout := 30 * time.Second + if timeoutStr := u.Query().Get("timeout"); timeoutStr != "" { + timeoutVal, err := time.ParseDuration(timeoutStr) + if err == nil && timeoutVal > 0 { + timeout = timeoutVal + } + } + + return &Config{ + URL: baseURL, + LabelFilter: u.Query().Get("label_filter"), + StartTime: u.Query().Get("start"), + EndTime: u.Query().Get("end"), + Timeout: timeout, + }, nil +} + +// Close closes the connection to VictoriaMetrics +func (v *Victoria) Close() error { + v.isOpen = false + if v.client != nil { + v.client.CloseIdleConnections() + } + return nil +} + +// Lock acquires a database lock (no-op for VictoriaMetrics) +func (v *Victoria) Lock() error { + if !v.isOpen { + return ErrLocked + } + v.isLocked = true + return nil +} + +// Unlock releases a database lock (no-op for VictoriaMetrics) +func (v *Victoria) Unlock() error { + if !v.isOpen { + return ErrLocked + } + v.isLocked = false + return nil +} + +// Run executes a migration by importing data into VictoriaMetrics +func (v *Victoria) Run(migration io.Reader) error { + if !v.isOpen { + return ErrClosed + } + + if !v.isLocked { + return ErrLocked + } + + // Buffer to collect migration data + var migrationBuffer bytes.Buffer + + // Read migration content + scanner := bufio.NewScanner(migration) + scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) // 4MB buffer + + for scanner.Scan() { + line := scanner.Text() + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "--") { + continue // Skip empty lines and comments + } + migrationBuffer.WriteString(line) + migrationBuffer.WriteString("\n") + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading migration: %w", err) + } + + // If we have content to import + if migrationBuffer.Len() > 0 { + // Send data to VictoriaMetrics + req, err := http.NewRequest(http.MethodPost, v.importURL, &migrationBuffer) + if err != nil { + return fmt.Errorf("failed to create import request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := v.client.Do(req) + if err != nil { + return fmt.Errorf("failed to import data: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("import failed with status %d: %s", resp.StatusCode, string(bodyBytes)) + } + } + + return nil +} + +// SetVersion sets the migration version (no-op for VictoriaMetrics) +func (v *Victoria) SetVersion(version int, dirty bool) error { + // VictoriaMetrics doesn't have schema version tracking + return nil +} + +// Version returns the current migration version (no-op for VictoriaMetrics) +func (v *Victoria) Version() (int, bool, error) { + // VictoriaMetrics doesn't support version tracking + return -1, false, nil +} + +// Drop clears all data (not supported in VictoriaMetrics) +func (v *Victoria) Drop() error { + return errors.New("drop operation is not supported in VictoriaMetrics") +} + +// Ensure Victoria implements the database.Driver interface +var _ database.Driver = (*Victoria)(nil) diff --git a/database/victoria/victoria_test.go b/database/victoria/victoria_test.go new file mode 100644 index 000000000..826bc6221 --- /dev/null +++ b/database/victoria/victoria_test.go @@ -0,0 +1,202 @@ +package victoria + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/golang-migrate/migrate/v4/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestVictoriaInit(t *testing.T) { + d := &Victoria{} + _, err := d.Open("victoria://localhost:8428") + if err != nil { + t.Fatal(err) + } +} + +func TestParseConfig(t *testing.T) { + testCases := []struct { + name string + dsn string + expectedError bool + expectedURL string + expectedLabel string + }{ + { + name: "Valid DSN", + dsn: "victoria://localhost:8428", + expectedError: false, + expectedURL: "http://localhost:8428", + expectedLabel: "", + }, + { + name: "Valid DSN with parameters", + dsn: "victoria://localhost:8428?label_filter={__name__=\"up\"}&start=2020-01-01T00:00:00Z", + expectedError: false, + expectedURL: "http://localhost:8428", + expectedLabel: "{__name__=\"up\"}", + }, + { + name: "Invalid scheme", + dsn: "postgres://localhost:8428", + expectedError: true, + }, + { + name: "Invalid URL", + dsn: "victoria://", + expectedError: false, + expectedURL: "http://", + expectedLabel: "", + }, + { + name: "With auth", + dsn: "victoria://user:pass@localhost:8428", + expectedError: false, + expectedURL: "http://user:pass@localhost:8428", + expectedLabel: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + config, err := parseConfig(tc.dsn) + if tc.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedURL, config.URL) + assert.Equal(t, tc.expectedLabel, config.LabelFilter) + } + }) + } +} + +func TestVictoriaLockUnlock(t *testing.T) { + d := &Victoria{} + d.isOpen = true + + // Lock should succeed + err := d.Lock() + assert.NoError(t, err) + assert.True(t, d.isLocked) + + // Unlock should succeed + err = d.Unlock() + assert.NoError(t, err) + assert.False(t, d.isLocked) + + // When closed, lock should fail + d.isOpen = false + err = d.Lock() + assert.Equal(t, database.ErrLocked, err) +} + +func TestVictoriaClose(t *testing.T) { + d := &Victoria{ + client: &http.Client{}, + isOpen: true, + } + + err := d.Close() + assert.NoError(t, err) + assert.False(t, d.isOpen) +} + +func TestVictoriaRun(t *testing.T) { + // Setup test server + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/api/v1/import" { + // Read the request body + body, _ := io.ReadAll(r.Body) + defer r.Body.Close() + + // Check if the body contains the expected data + if !strings.Contains(string(body), "{\"metric\":{\"__name__\":\"test\"}") { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Successful import + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer testServer.Close() + + // Parse the URL for our test server + serverURL := strings.TrimPrefix(testServer.URL, "http://") + + // Create driver and test migration + d := &Victoria{} + driver, err := d.Open("victoria://" + serverURL) + assert.NoError(t, err) + + // Need to lock before running + err = driver.Lock() + assert.NoError(t, err) + + // Test with valid migration data + migrationData := `{"metric":{"__name__":"test","job":"test"},"values":[1],"timestamps":[1596698684000]}` + err = driver.Run(strings.NewReader(migrationData)) + assert.NoError(t, err) + + // Test with commented lines and empty lines + migrationData = ` + -- This is a comment + {"metric":{"__name__":"test","job":"test"},"values":[1],"timestamps":[1596698684000]} + + ` + err = driver.Run(strings.NewReader(migrationData)) + assert.NoError(t, err) + + // Test with driver not locked + driver.Unlock() + err = driver.Run(strings.NewReader(migrationData)) + assert.Equal(t, database.ErrLocked, err) + + // Test with driver closed + driver.Lock() + driver.Close() + err = driver.Run(strings.NewReader(migrationData)) + assert.Equal(t, database.ErrDatabaseClosed, err) +} + +func TestVictoriaVersion(t *testing.T) { + d := &Victoria{} + driver, err := d.Open("victoria://localhost:8428") + assert.NoError(t, err) + + // VictoriaMetrics doesn't support versioning, so we should always get -1 + version, dirty, err := driver.Version() + assert.NoError(t, err) + assert.Equal(t, -1, version) + assert.False(t, dirty) +} + +func TestVictoriaSetVersion(t *testing.T) { + d := &Victoria{} + driver, err := d.Open("victoria://localhost:8428") + assert.NoError(t, err) + + // This should be a no-op for VictoriaMetrics + err = driver.SetVersion(42, true) + assert.NoError(t, err) +} + +func TestVictoriaDrop(t *testing.T) { + d := &Victoria{} + driver, err := d.Open("victoria://localhost:8428") + assert.NoError(t, err) + + // Drop is not supported in VictoriaMetrics + err = driver.Drop() + assert.Error(t, err) + assert.Contains(t, err.Error(), "not supported") +}