2626import csv
2727import ctypes
2828import json
29- import multiprocessing as mp
29+ import multiprocessing
3030import optparse
3131import os
3232import signal
3333import sys
3434import time
3535import traceback
36+ import six
3637from multiprocessing .queues import Queue , SimpleQueue
3738
3839from rethinkdb import ast , errors , query , utils_common
@@ -110,12 +111,12 @@ def __init__(
110111 self .query_runner = query_runner
111112
112113 # reporting information
113- self ._bytes_size = mp .Value (ctypes .c_longlong , - 1 )
114- self ._bytes_read = mp .Value (ctypes .c_longlong , - 1 )
114+ self ._bytes_size = multiprocessing .Value (ctypes .c_longlong , - 1 )
115+ self ._bytes_read = multiprocessing .Value (ctypes .c_longlong , - 1 )
115116
116- self ._total_rows = mp .Value (ctypes .c_longlong , - 1 )
117- self ._rows_read = mp .Value (ctypes .c_longlong , 0 )
118- self ._rows_written = mp .Value (ctypes .c_longlong , 0 )
117+ self ._total_rows = multiprocessing .Value (ctypes .c_longlong , - 1 )
118+ self ._rows_read = multiprocessing .Value (ctypes .c_longlong , 0 )
119+ self ._rows_written = multiprocessing .Value (ctypes .c_longlong , 0 )
119120
120121 # source
121122 if hasattr (source , 'read' ):
@@ -1083,15 +1084,21 @@ def import_tables(options, sources, files_ignored=None):
10831084
10841085 tables = dict (((x .db , x .table ), x ) for x in sources ) # (db, table) => table
10851086
1086- ctx = mp .get_context (mp .get_start_method ())
1087+ if six .PY3 :
1088+ ctx = multiprocessing .get_context (multiprocessing .get_start_method ())
1089+ error_queue = SimpleQueue (ctx = ctx )
1090+ warning_queue = SimpleQueue (ctx = ctx )
1091+ timing_queue = SimpleQueue (ctx = ctx )
1092+ else :
1093+ error_queue = SimpleQueue ()
1094+ warning_queue = SimpleQueue ()
1095+ timing_queue = SimpleQueue ()
1096+
10871097 max_queue_size = options .clients * 3
1088- work_queue = mp .Manager ().Queue (max_queue_size )
1089- error_queue = SimpleQueue (ctx = ctx )
1090- warning_queue = SimpleQueue (ctx = ctx )
1091- exit_event = mp .Event ()
1092- interrupt_event = mp .Event ()
1098+ work_queue = multiprocessing .Manager ().Queue (max_queue_size )
10931099
1094- timing_queue = SimpleQueue (ctx = ctx )
1100+ exit_event = multiprocessing .Event ()
1101+ interrupt_event = multiprocessing .Event ()
10951102
10961103 errors = []
10971104 warnings = []
@@ -1168,7 +1175,7 @@ def drain_queues():
11681175 try :
11691176 # - start the progress bar
11701177 if not options .quiet :
1171- progress_bar = mp .Process (
1178+ progress_bar = multiprocessing .Process (
11721179 target = update_progress ,
11731180 name = "progress bar" ,
11741181 args = (sources , options .debug , exit_event , progress_bar_sleep )
@@ -1180,7 +1187,7 @@ def drain_queues():
11801187 writers = []
11811188 pools .append (writers )
11821189 for i in range (options .clients ):
1183- writer = mp .Process (
1190+ writer = multiprocessing .Process (
11841191 target = table_writer ,
11851192 name = "table writer %d" %
11861193 i ,
@@ -1204,7 +1211,7 @@ def drain_queues():
12041211 # add a workers to fill up the readers pool
12051212 while len (readers ) < options .clients :
12061213 table = next (file_iter )
1207- reader = mp .Process (
1214+ reader = multiprocessing .Process (
12081215 target = table .read_to_queue ,
12091216 name = "table reader %s.%s" %
12101217 (table .db ,
0 commit comments