@@ -173,7 +173,6 @@ static void drop_replication_slots(void);
173173static void cleanup_after_server_start (void );
174174static void cleanup_repl_origins (void );
175175static void cleanup_repl_slots (void );
176- static void start_cleanup_worker (MemoryContext task_cxt );
177176static Snapshot build_historic_snapshot (SnapBuild * builder );
178177static void process_task_internal (MemoryContext task_cxt );
179178
@@ -941,20 +940,18 @@ squeeze_worker_main(Datum main_arg)
941940 * replication slots and/or origins that other workers could not remove
942941 * due to server crash. Do that while holding the exclusive lock - that
943942 * also ensures that the other workers wait for the cleanup to finish
944- * instead of complaining about the existing slots / origins.
943+ * before they create new slots / origins, which we might then drop
944+ * accidentally.
945+ *
946+ * If no "standalone" squeeze worker performed the cleanup yet, the
947+ * scheduler must do it now because it'll also create replication slots /
948+ * origins. Those could be dropped by one of the new workers if that
949+ * worker were to perform the cleanup.
945950 */
946- if (!am_i_scheduler && ! workerData -> cleanup_done )
951+ if (!workerData -> cleanup_done )
947952 {
948953 cleanup_after_server_start ();
949954 workerData -> cleanup_done = true;
950-
951- /* Are we assigned a "cleanup-only" task? */
952- if (!OidIsValid (MyWorkerTask -> dbid ))
953- {
954- LWLockRelease (workerData -> lock );
955- ereport (DEBUG1 , (errmsg ("cleanup-only task completed" )));
956- goto done ;
957- }
958955 }
959956
960957 for (i = 0 ; i < workerData -> nslots ; i ++ )
@@ -1073,27 +1070,12 @@ scheduler_worker_loop(void)
10731070 long delay = 0L ;
10741071 int i ;
10751072 MemoryContext sched_cxt , old_cxt ;
1076- bool cleanup_done ;
10771073
10781074 /* Context for allocations which cannot be freed too early. */
10791075 sched_cxt = AllocSetContextCreate (TopMemoryContext ,
10801076 "pg_squeeze scheduler context" ,
10811077 ALLOCSET_DEFAULT_SIZES );
10821078
1083- /*
1084- * This lock does not eliminate all the possible race conditions: e.g. if
1085- * multiple schedulers (one per database) are launched at the same time,
1086- * multiple clean-up workers can be launched. Nevertheless, it makes sense
1087- * as the worker also uses this lock to examine and set the field.
1088- */
1089- LWLockAcquire (workerData -> lock , LW_EXCLUSIVE );
1090- cleanup_done = workerData -> cleanup_done ;
1091- LWLockRelease (workerData -> lock );
1092-
1093- /* Do we need to do cleanup first? */
1094- if (!cleanup_done )
1095- start_cleanup_worker (sched_cxt );
1096-
10971079 while (!got_sigterm )
10981080 {
10991081 StringInfoData query ;
@@ -1863,78 +1845,6 @@ cleanup_repl_slots(void)
18631845 }
18641846}
18651847
1866- static void
1867- start_cleanup_worker (MemoryContext task_cxt )
1868- {
1869- WorkerTask * task ;
1870- bool task_exists ;
1871- int task_idx ;
1872- NameData dummy_name ;
1873- SqueezeWorker * worker ;
1874- MemoryContext old_cxt ;
1875- bool registered ;
1876-
1877- /*
1878- * Create a worker to perform the initial cleanup.
1879- *
1880- * We must be sure that the cleanup has finished before we start to create
1881- * replication slots for other workers, otherwise the "cleanup worker"
1882- * could drop them too.
1883- */
1884- squeezeWorkerCount = 1 ;
1885- squeezeWorkers = (SqueezeWorker * ) MemoryContextAllocZero (task_cxt ,
1886- squeezeWorkerCount *
1887- sizeof (SqueezeWorker ));
1888- task = get_unused_task (InvalidOid , NULL , NULL , & task_idx , & task_exists );
1889- Assert (!task_exists );
1890- if (task == NULL )
1891- /*
1892- * This is unlikely to happen, but possible if too many "standalone"
1893- * workers have been started after our check of the 'cleanup_done'
1894- * flag.
1895- */
1896- ereport (ERROR ,
1897- (errmsg ("the task queue is currently full" )));
1898-
1899- /*
1900- * No specific information needed here. Setting dummy values explicitly
1901- * seem a good practice though.
1902- */
1903- NameStr (dummy_name )[0 ] = '\0' ;
1904- initialize_worker_task (task , -1 , & dummy_name , & dummy_name , NULL ,
1905- false, false, 0 );
1906-
1907- worker = squeezeWorkers ;
1908- StartTransactionCommand ();
1909- /*
1910- * The handle (and possibly other allocations) must survive the current
1911- * transaction.
1912- */
1913- old_cxt = MemoryContextSwitchTo (task_cxt );
1914- registered = start_worker_internal (false, task_idx , & worker -> handle );
1915- MemoryContextSwitchTo (old_cxt );
1916- if (!registered )
1917- {
1918- /*
1919- * The worker could not even get registered, so it won't set its
1920- * status to WTS_UNUSED. Make sure the task does not leak.
1921- */
1922- release_task (worker -> task );
1923-
1924- ereport (ERROR ,
1925- (errmsg ("squeeze worker could not start" )),
1926- (errhint ("consider increasing \"max_worker_processes\" or decreasing \"squeeze.workers_per_database\"" )));
1927-
1928- }
1929- CommitTransactionCommand ();
1930-
1931- /* Wait until the cleanup is done. */
1932- cleanup_workers_and_tasks (false);
1933-
1934- if (!workerData -> cleanup_done )
1935- ereport (ERROR , (errmsg ("failed to perform the initial cleanup" )));
1936- }
1937-
19381848/*
19391849 * Wrapper for SnapBuildInitialSnapshot().
19401850 *