Skip to content

Commit d1daf23

Browse files
author
Andrew Sawyers
committed
- multiprocessing SimpleQueue takes a context as a required arg
lookup the context and pass it into class initialization of the SimpleQueue - for simplification, follow example patterns of importing multiprocessing as mp - multiprocessing.Queue can cause surprising results which are avoided using a queue manager Manager(). See https://docs.python.org/3.7/library/multiprocessing.html - optparse passes in self to the check_existing_file, so set as _ - optparse calls the callback with many more args than originally set up for. - fix env lookup variables which were missed when rebirthdb merged back with rethinkdb
1 parent 66edbbb commit d1daf23

File tree

3 files changed

+46
-39
lines changed

3 files changed

+46
-39
lines changed

rethinkdb/_export.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import ctypes
2424
import datetime
2525
import json
26-
import multiprocessing
26+
import multiprocessing as mp
2727
import numbers
2828
import optparse
2929
import os
@@ -259,11 +259,12 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
259259
with sindex_counter.get_lock():
260260
sindex_counter.value += len(table_info["indexes"])
261261
# -- start the writer
262-
task_queue = SimpleQueue()
262+
ctx = mp.get_context(mp.get_start_method())
263+
task_queue = SimpleQueue(ctx=ctx)
263264
writer = None
264265
if options.format == "json":
265266
filename = directory + "/%s/%s.json" % (db, table)
266-
writer = multiprocessing.Process(
267+
writer = mp.Process(
267268
target=json_writer,
268269
args=(
269270
filename,
@@ -273,7 +274,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
273274
options.format))
274275
elif options.format == "csv":
275276
filename = directory + "/%s/%s.csv" % (db, table)
276-
writer = multiprocessing.Process(
277+
writer = mp.Process(
277278
target=csv_writer,
278279
args=(
279280
filename,
@@ -283,7 +284,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
283284
error_queue))
284285
elif options.format == "ndjson":
285286
filename = directory + "/%s/%s.ndjson" % (db, table)
286-
writer = multiprocessing.Process(
287+
writer = mp.Process(
287288
target=json_writer,
288289
args=(
289290
filename,
@@ -388,12 +389,13 @@ def update_progress(progress_info, options):
388389

389390
def run_clients(options, workingDir, db_table_set):
390391
# Spawn one client for each db.table, up to options.clients at a time
391-
exit_event = multiprocessing.Event()
392+
exit_event = mp.Event()
392393
processes = []
393-
error_queue = SimpleQueue()
394-
interrupt_event = multiprocessing.Event()
395-
sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0)
396-
hook_counter = multiprocessing.Value(ctypes.c_longlong, 0)
394+
ctx = mp.get_context(mp.get_start_method())
395+
error_queue = SimpleQueue(ctx=ctx)
396+
interrupt_event = mp.Event()
397+
sindex_counter = mp.Value(ctypes.c_longlong, 0)
398+
hook_counter = mp.Value(ctypes.c_longlong, 0)
397399

398400
signal.signal(signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event))
399401
errors = []
@@ -405,8 +407,8 @@ def run_clients(options, workingDir, db_table_set):
405407

406408
tableSize = int(options.retryQuery("count", query.db(db).table(table).info()['doc_count_estimates'].sum()))
407409

408-
progress_info.append((multiprocessing.Value(ctypes.c_longlong, 0),
409-
multiprocessing.Value(ctypes.c_longlong, tableSize)))
410+
progress_info.append((mp.Value(ctypes.c_longlong, 0),
411+
mp.Value(ctypes.c_longlong, tableSize)))
410412
arg_lists.append((db, table,
411413
workingDir,
412414
options,
@@ -428,7 +430,7 @@ def run_clients(options, workingDir, db_table_set):
428430
processes = [process for process in processes if process.is_alive()]
429431

430432
if len(processes) < options.clients and len(arg_lists) > 0:
431-
newProcess = multiprocessing.Process(target=export_table, args=arg_lists.pop(0))
433+
newProcess = mp.Process(target=export_table, args=arg_lists.pop(0))
432434
newProcess.start()
433435
processes.append(newProcess)
434436

rethinkdb/_import.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import csv
2727
import ctypes
2828
import json
29-
import multiprocessing
29+
import multiprocessing as mp
3030
import optparse
3131
import os
3232
import signal
@@ -110,12 +110,12 @@ def __init__(
110110
self.query_runner = query_runner
111111

112112
# reporting information
113-
self._bytes_size = multiprocessing.Value(ctypes.c_longlong, -1)
114-
self._bytes_read = multiprocessing.Value(ctypes.c_longlong, -1)
113+
self._bytes_size = mp.Value(ctypes.c_longlong, -1)
114+
self._bytes_read = mp.Value(ctypes.c_longlong, -1)
115115

116-
self._total_rows = multiprocessing.Value(ctypes.c_longlong, -1)
117-
self._rows_read = multiprocessing.Value(ctypes.c_longlong, 0)
118-
self._rows_written = multiprocessing.Value(ctypes.c_longlong, 0)
116+
self._total_rows = mp.Value(ctypes.c_longlong, -1)
117+
self._rows_read = mp.Value(ctypes.c_longlong, 0)
118+
self._rows_written = mp.Value(ctypes.c_longlong, 0)
119119

120120
# source
121121
if hasattr(source, 'read'):
@@ -957,7 +957,7 @@ def table_writer(tables, options, work_queue, error_queue, warning_queue, exit_e
957957
nesting_depth=MAX_NESTING_DEPTH),
958958
durability=options.durability,
959959
conflict=conflict_action,
960-
ignore_write_hook=True))
960+
))
961961

962962
if res["errors"] > 0:
963963
raise RuntimeError("Error when importing into table '%s.%s': %s" % (db, table, res["first_error"]))
@@ -1083,13 +1083,15 @@ def import_tables(options, sources, files_ignored=None):
10831083

10841084
tables = dict(((x.db, x.table), x) for x in sources) # (db, table) => table
10851085

1086-
work_queue = Queue(options.clients * 3)
1087-
error_queue = SimpleQueue()
1088-
warning_queue = SimpleQueue()
1089-
exit_event = multiprocessing.Event()
1090-
interrupt_event = multiprocessing.Event()
1086+
ctx = mp.get_context(mp.get_start_method())
1087+
max_queue_size = options.clients * 3
1088+
work_queue = mp.Manager().Queue(max_queue_size)
1089+
error_queue = SimpleQueue(ctx=ctx)
1090+
warning_queue = SimpleQueue(ctx=ctx)
1091+
exit_event = mp.Event()
1092+
interrupt_event = mp.Event()
10911093

1092-
timing_queue = SimpleQueue()
1094+
timing_queue = SimpleQueue(ctx=ctx)
10931095

10941096
errors = []
10951097
warnings = []
@@ -1166,7 +1168,7 @@ def drain_queues():
11661168
try:
11671169
# - start the progress bar
11681170
if not options.quiet:
1169-
progress_bar = multiprocessing.Process(
1171+
progress_bar = mp.Process(
11701172
target=update_progress,
11711173
name="progress bar",
11721174
args=(sources, options.debug, exit_event, progress_bar_sleep)
@@ -1178,7 +1180,7 @@ def drain_queues():
11781180
writers = []
11791181
pools.append(writers)
11801182
for i in range(options.clients):
1181-
writer = multiprocessing.Process(
1183+
writer = mp.Process(
11821184
target=table_writer,
11831185
name="table writer %d" %
11841186
i,
@@ -1202,7 +1204,7 @@ def drain_queues():
12021204
# add a workers to fill up the readers pool
12031205
while len(readers) < options.clients:
12041206
table = next(file_iter)
1205-
reader = multiprocessing.Process(
1207+
reader = mp.Process(
12061208
target=table.read_to_queue,
12071209
name="table reader %s.%s" %
12081210
(table.db,

rethinkdb/utils_common.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ def format_epilog(self, formatter):
151151
return self.epilog or ''
152152

153153
def __init__(self, *args, **kwargs):
154-
155154
# -- Type Checkers
156155

157156
def check_tls_option(opt_str, value):
@@ -178,7 +177,7 @@ def check_positive_int(opt_str, value):
178177

179178
return int(value)
180179

181-
def check_existing_file(opt_str, value):
180+
def check_existing_file(_, opt_str, value):
182181
if not os.path.isfile(value):
183182
raise optparse.OptionValueError('%s value was not an existing file: %s' % (opt_str, value))
184183

@@ -207,7 +206,10 @@ def file_contents(opt_str, value):
207206

208207
# -- Callbacks
209208

210-
def combined_connect_action(value, parser):
209+
def combined_connect_action(obj, opt, value, parser, *args, **kwargs):
210+
"""optparse.takeaction() calls the callback (which this is set as)
211+
with the following args: self, opt, value, parser *args, **kwargs
212+
"""
211213
res = self.__connectRegex.match(value)
212214
if not res:
213215
raise optparse.OptionValueError("Invalid 'host:port' format: %s" % value)
@@ -295,15 +297,15 @@ def take_action(self, action, dest, opt, value, values, parser):
295297
help='driver port of a rethinkdb server',
296298
type='int',
297299
default=os.environ.get(
298-
'REBIRTHDB_DRIVER_PORT',
300+
'RETHINKDB_DRIVER_PORT',
299301
net.DEFAULT_PORT))
300302
connection_group.add_option(
301303
'--host-name',
302304
dest='hostname',
303305
metavar='HOST',
304306
help='host and driver port of a rethinkdb server',
305307
default=os.environ.get(
306-
'REBIRTHDB_HOSTNAME',
308+
'RETHINKDB_HOSTNAME',
307309
'localhost'))
308310
connection_group.add_option(
309311
'-u',
@@ -312,7 +314,7 @@ def take_action(self, action, dest, opt, value, values, parser):
312314
metavar='USERNAME',
313315
help='user name to connect as',
314316
default=os.environ.get(
315-
'REBIRTHDB_USER',
317+
'RETHINKDB_USER',
316318
'admin'))
317319
connection_group.add_option(
318320
'-p',
@@ -344,12 +346,13 @@ def parse_args(self, *args, **kwargs):
344346

345347
# - validate ENV variables
346348

347-
if 'REBIRTHDB_DRIVER_PORT' in os.environ:
348-
driver_port = os.environ['REBIRTHDB_DRIVER_PORT']
349+
if 'RETHINKDB_DRIVER_PORT' in os.environ:
350+
driver_port = os.environ['RETHINKDB_DRIVER_PORT']
349351

350352
if not isinstance(driver_port, int) or driver_port < 1:
351-
self.error('ENV variable REBIRTHDB_DRIVER_PORT is not a useable integer: %s'
352-
% os.environ['REBIRTHDB_DRIVER_PORT'])
353+
self.error('ENV variable RETHINKDB_DRIVER_PORT is not a useable '
354+
'integer: %s'
355+
% os.environ['RETHINKDB_DRIVER_PORT'])
353356

354357
# - parse options
355358

0 commit comments

Comments
 (0)