Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
- name: Test
run: |
go generate ./api/pb/
go test -failfast -v -timeout=300s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...
go test -failfast -v -timeout=600s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased test timeouts to 10 minutes because Test_MaintainUniqueSources_DeleteOldPartitions/MaintainUniqueSources now takes more than 2 minutes alone, due to calls to PG_SLEEP(60) in MaintainUniqueSources().


- name: Coveralls
uses: coverallsapp/github-action@v2
Expand Down
94 changes: 4 additions & 90 deletions internal/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,97 +507,11 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
// in each metric table in admin.all_distinct_dbname_metrics.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) MaintainUniqueSources() {
sql := "SELECT admin.maintain_tables() WHERE pg_try_advisory_lock(1571543679778230000)"
logger := log.GetLogger(pgw.ctx)

sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
sqlDistinct := `
WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM %s
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
)
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
RETURNING *`

var lock bool
logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
return
}
if !lock {
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
return
}

logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
return
}

for i, tableName := range allDistinctMetricTables {
foundDbnamesMap := make(map[string]bool)
foundDbnamesArr := make([]string, 0)

metricName := strings.Replace(tableName, "public.", "", 1)
// later usage in sqlDroppedTables requires no "public." prefix
allDistinctMetricTables[i] = metricName

logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
continue
}
for _, drDbname := range ret {
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
}

// delete all that are not known and add all that are not there
for k := range foundDbnamesMap {
foundDbnamesArr = append(foundDbnamesArr, k)
}
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
continue
}
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
time.Sleep(time.Minute)
}

cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
logger.Info("Starting maintenance...")
if _, err := pgw.sinkDb.Exec(pgw.ctx, sql); err != nil {
logger.Error("Maintaining 'admin.all_distinct_dbname_metrics' listing table failed:", err)
}
}

Expand Down
25 changes: 14 additions & 11 deletions internal/sinks/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,20 +668,26 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
pgw, err := NewPostgresWriter(ctx, connStr, opts)
r.NoError(err)

// Drop tables for all builtin metrics to avoid timeout in
// in `MaintainUniqueSources` test.
for _, metric := range metrics.GetDefaultBuiltInMetrics() {
_, err = pgw.sinkDb.Exec(pgw.ctx, fmt.Sprintf("DROP TABLE %s;", metric))
a.NoError(err)
}

t.Run("MaintainUniqueSources", func(_ *testing.T) {
// adds an entry to `admin.all_distinct_dbname_metrics`
// creates an empty metric table and adds
// an entry to `admin.all_distinct_dbname_metrics`
err = pgw.SyncMetric("test", "test_metric_1", AddOp)
r.NoError(err)

var numOfEntries int
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(1, numOfEntries)

// manually call the maintenance routine
pgw.MaintainUniqueSources()

// entry should have been deleted, because it has no corresponding entries in `test_metric_1` table.
// entry should have been deleted, because there is now rows in `test_metric_1` table.
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(0, numOfEntries)
Expand All @@ -696,19 +702,16 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
},
}
pgw.flush(message)

// manually call the maintenance routine
pgw.MaintainUniqueSources()

// entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
// an entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(1, numOfEntries)

_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
r.NoError(err)

// the corresponding entry should be deleted
// all entries should be deleted
pgw.MaintainUniqueSources()
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
Expand All @@ -731,8 +734,8 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
_, err = conn.Exec(ctx,
fmt.Sprintf(
`CREATE TABLE subpartitions.test_metric_2_dbname_time
PARTITION OF subpartitions.test_metric_2_dbname
FOR VALUES FROM ('%s') TO ('%s')`,
PARTITION OF subpartitions.test_metric_2_dbname
FOR VALUES FROM ('%s') TO ('%s')`,
boundStart, boundEnd),
)
a.NoError(err)
Expand Down
64 changes: 64 additions & 0 deletions internal/sinks/sql/admin_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,67 @@ BEGIN
RETURN i;
END;
$SQL$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION admin.maintain_tables()
RETURNS VOID
AS
$SQL$
DECLARE
rec record;
metric_name text;
existing_metrics text[];
BEGIN
FOR rec IN SELECT * FROM admin.get_top_level_metric_tables()
LOOP
IF POSITION('public.' IN rec.table_name) = 1 THEN
metric_name = SUBSTRING(rec.table_name FROM POSITION('public.' IN rec.table_name) + LENGTH('public.'));
ELSE
metric_name = rec.table_name;
END IF;

SELECT array_append(existing_metrics, metric_name) INTO existing_metrics;

EXECUTE FORMAT($$SELECT admin.update_listing_table(metric_name => '%s')$$, metric_name);
PERFORM PG_SLEEP(60);
END LOOP;

-- Delete entries for dropped tables
DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics) OR existing_metrics IS NULL;
END;
$SQL$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_name text)
RETURNS VOID
AS
$SQL$
BEGIN

EXECUTE FORMAT(
$$
CREATE TEMP TABLE distinct_dbnames AS
WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM public.%I
UNION
SELECT (SELECT MIN(dbname) FROM public.%I WHERE dbname > t.dbname) FROM t
)
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1;
$$, metric_name, metric_name);

EXECUTE FORMAT(
$$
DELETE FROM admin.all_distinct_dbname_metrics
WHERE dbname NOT IN (SELECT * FROM distinct_dbnames)
AND metric = '%s';
$$, metric_name);

EXECUTE FORMAT(
$$
INSERT INTO admin.all_distinct_dbname_metrics
SELECT d.dbname, '%s' FROM distinct_dbnames AS d
WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s');
$$, metric_name, metric_name);

DROP TABLE distinct_dbnames;

END;
$SQL$ LANGUAGE plpgsql;
Loading