Skip to content

Commit f57c703

Browse files
author
Gábor Boros
committed
Fix smaller issues
1 parent 50aafde commit f57c703

File tree

2 files changed

+24
-18
lines changed

2 files changed

+24
-18
lines changed

rethinkdb/_export.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import ctypes
2424
import datetime
2525
import json
26-
import multiprocessing as mp
26+
import multiprocessing
2727
import numbers
2828
import optparse
2929
import os
@@ -35,6 +35,8 @@
3535
import traceback
3636
from multiprocessing.queues import SimpleQueue
3737

38+
import six
39+
3840
from rethinkdb import errors, query, utils_common
3941
from rethinkdb.logger import default_logger
4042

@@ -259,12 +261,16 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
259261
with sindex_counter.get_lock():
260262
sindex_counter.value += len(table_info["indexes"])
261263
# -- start the writer
262-
ctx = mp.get_context(mp.get_start_method())
263-
task_queue = SimpleQueue(ctx=ctx)
264+
if six.PY3:
265+
ctx = multiprocessing.get_context(multiprocessing.get_start_method())
266+
task_queue = SimpleQueue(ctx=ctx)
267+
else:
268+
task_queue = SimpleQueue()
269+
264270
writer = None
265271
if options.format == "json":
266272
filename = directory + "/%s/%s.json" % (db, table)
267-
writer = mp.Process(
273+
writer = multiprocessing.Process(
268274
target=json_writer,
269275
args=(
270276
filename,
@@ -274,7 +280,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
274280
options.format))
275281
elif options.format == "csv":
276282
filename = directory + "/%s/%s.csv" % (db, table)
277-
writer = mp.Process(
283+
writer = multiprocessing.Process(
278284
target=csv_writer,
279285
args=(
280286
filename,
@@ -284,7 +290,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
284290
error_queue))
285291
elif options.format == "ndjson":
286292
filename = directory + "/%s/%s.ndjson" % (db, table)
287-
writer = mp.Process(
293+
writer = multiprocessing.Process(
288294
target=json_writer,
289295
args=(
290296
filename,
@@ -389,13 +395,13 @@ def update_progress(progress_info, options):
389395

390396
def run_clients(options, workingDir, db_table_set):
391397
# Spawn one client for each db.table, up to options.clients at a time
392-
exit_event = mp.Event()
398+
exit_event = multiprocessing.Event()
393399
processes = []
394-
ctx = mp.get_context(mp.get_start_method())
400+
ctx = multiprocessing.get_context(multiprocessing.get_start_method())
395401
error_queue = SimpleQueue(ctx=ctx)
396-
interrupt_event = mp.Event()
397-
sindex_counter = mp.Value(ctypes.c_longlong, 0)
398-
hook_counter = mp.Value(ctypes.c_longlong, 0)
402+
interrupt_event = multiprocessing.Event()
403+
sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0)
404+
hook_counter = multiprocessing.Value(ctypes.c_longlong, 0)
399405

400406
signal.signal(signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event))
401407
errors = []
@@ -407,8 +413,8 @@ def run_clients(options, workingDir, db_table_set):
407413

408414
tableSize = int(options.retryQuery("count", query.db(db).table(table).info()['doc_count_estimates'].sum()))
409415

410-
progress_info.append((mp.Value(ctypes.c_longlong, 0),
411-
mp.Value(ctypes.c_longlong, tableSize)))
416+
progress_info.append((multiprocessing.Value(ctypes.c_longlong, 0),
417+
multiprocessing.Value(ctypes.c_longlong, tableSize)))
412418
arg_lists.append((db, table,
413419
workingDir,
414420
options,
@@ -430,7 +436,7 @@ def run_clients(options, workingDir, db_table_set):
430436
processes = [process for process in processes if process.is_alive()]
431437

432438
if len(processes) < options.clients and len(arg_lists) > 0:
433-
newProcess = mp.Process(target=export_table, args=arg_lists.pop(0))
439+
newProcess = multiprocessing.Process(target=export_table, args=arg_lists.pop(0))
434440
newProcess.start()
435441
processes.append(newProcess)
436442

rethinkdb/utils_common.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def check_minimum_version(options, minimum_version='1.6'):
129129
version_string = options.retryQuery('get server version', query.db(
130130
'rethinkdb').table('server_status')[0]['process']['version'])
131131

132-
matches = re.match(r'rethinkdb (?P<version>(\d+)\.(\d+)\.(\d+)).*', version_string)
132+
matches = re.match(r'(rethinkdb|rebirthdb) (?P<version>(\d+)\.(\d+)\.(\d+)).*', version_string)
133133

134134
if not matches:
135135
raise RuntimeError("invalid version string format: %s" % version_string)
@@ -285,11 +285,11 @@ def take_action(self, action, dest, opt, value, values, parser):
285285
'--connect',
286286
dest='driver_port',
287287
metavar='HOST:PORT',
288-
help='host and client port of a rethinkdb node to connect (default: localhost:%d)' %
289-
net.DEFAULT_PORT,
288+
help='host and client port of a rethinkdb node to connect (default: localhost:%d)' % net.DEFAULT_PORT,
290289
action='callback',
291290
callback=combined_connect_action,
292-
type='string')
291+
type='str'
292+
)
293293
connection_group.add_option(
294294
'--driver-port',
295295
dest='driver_port',

0 commit comments

Comments
 (0)