11import logging
22
33from fabric .api import hide , settings , local
4- from multiprocessing import Queue
4+ from math import floor
5+ from multiprocessing import Queue , cpu_count
56from time import sleep
67
78
@@ -22,6 +23,7 @@ def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, pa
2223 self .verbose = verbose
2324
2425 self .config_replset = False
26+ self .cpu_count = cpu_count ()
2527 self .response_queue = Queue ()
2628 self .threads = []
2729 self ._summary = {}
@@ -65,6 +67,18 @@ def wait(self):
6567 raise Exception , "Not all mongodump threads completed successfully!" , None
6668
6769 def run (self ):
70+ # decide how many parallel dump workers each mongodump gets (mongodump 3.2+ only): cpu count divided by the # of shards, rounded down, capped at 8 workers per dump to protect the db
71+ self .threads_per_dump = 0
72+ self .threads_per_dump_max = 8
73+ if tuple (self .version .split ("." )) >= tuple ("3.2.0" .split ("." )):
74+ self .threads_per_dump = 1
75+ if self .cpu_count > len (self .secondaries ):
76+ self .threads_per_dump = int (floor (self .cpu_count / len (self .secondaries )))
77+ if self .threads_per_dump > self .threads_per_dump_max :
78+ self .threads_per_dump = self .threads_per_dump_max
79+ else :
80+ logging .warn ("Threading unsupported by mongodump version %s. Use mongodump 3.2.0 or greater to enable per-dump threading." % self .version )
81+
6882 # backup a secondary from each shard:
6983 for shard in self .secondaries :
7084 secondary = self .secondaries [shard ]
@@ -77,6 +91,7 @@ def run(self):
7791 self .authdb ,
7892 self .base_dir ,
7993 self .binary ,
94+ self .threads_per_dump ,
8095 self .dump_gzip ,
8196 self .verbose
8297 )
@@ -87,7 +102,7 @@ def run(self):
87102
88103 # start all threads and wait
89104 logging .info (
90- "Starting backups in threads using mongodump %s (inline gzip: %s)" % (self .version , str (self .dump_gzip )))
105+ "Starting backups using mongodump %s (inline gzip: %s, threads per dump: %i )" % (self .version , str (self .dump_gzip ), self . threads_per_dump ))
91106 for thread in self .threads :
92107 thread .start ()
93108 self .wait ()
@@ -107,6 +122,7 @@ def run(self):
107122 self .authdb ,
108123 self .base_dir ,
109124 self .binary ,
125+ self .threads_per_dump ,
110126 self .dump_gzip ,
111127 self .verbose
112128 )]
0 commit comments