11require 'delegate'
22require 'concurrent/executor/executor'
33require 'concurrent/logging'
4+ require 'concurrent/atomic/synchronization'
45
56module Concurrent
67
78 # Ensures passed jobs in a serialized order never running at the same time.
89 class SerializedExecution
910 include Logging
11+ include Synchronization
1012
1113 Job = Struct . new ( :executor , :args , :block ) do
1214 def call
@@ -15,9 +17,10 @@ def call
1517 end
1618
1719 def initialize
18- @being_executed = false
19- @stash = [ ]
20- @mutex = Mutex . new
20+ synchronize do
21+ @being_executed = false
22+ @stash = [ ]
23+ end
2124 end
2225
2326 # Submit a task to the executor for asynchronous processing.
@@ -33,23 +36,36 @@ def initialize
3336 #
3437 # @raise [ArgumentError] if no task is given
3538 def post ( executor , *args , &task )
36- return nil if task . nil?
37-
38- job = Job . new executor , args , task
39-
40- begin
41- @mutex . lock
42- post = if @being_executed
43- @stash << job
44- false
45- else
46- @being_executed = true
47- end
48- ensure
49- @mutex . unlock
39+ posts [ [ executor , args , task ] ]
40+ true
41+ end
42+
43+ # As {#post} but allows to submit multiple tasks at once, it's guaranteed that they will not
44+ # be interleaved by other tasks.
45+ #
46+ # @param [Array<Array(Executor, Array<Object>, Proc)>] posts array of triplets where
47+ # first is a {Executor}, second is array of args for task, third is a task (Proc)
48+ def posts ( posts )
49+ # if can_overflow?
50+ # raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
51+ # end
52+
53+ return nil if posts . empty?
54+
55+ jobs = posts . map { |executor , args , task | Job . new executor , args , task }
56+
57+ job_to_post = synchronize do
58+ if @being_executed
59+ @stash . push ( *jobs )
60+ nil
61+ else
62+ @being_executed = true
63+ @stash . push ( *jobs [ 1 ..-1 ] )
64+ jobs . first
65+ end
5066 end
5167
52- call_job job if post
68+ call_job job_to_post if job_to_post
5369 true
5470 end
5571
@@ -78,11 +94,8 @@ def call_job(job)
7894 def work ( job )
7995 job . call
8096 ensure
81- begin
82- @mutex . lock
97+ synchronize do
8398 job = @stash . shift || ( @being_executed = false )
84- ensure
85- @mutex . unlock
8699 end
87100
88101 call_job job if job
@@ -98,7 +111,7 @@ class SerializedExecutionDelegator < SimpleDelegator
98111 include SerialExecutor
99112
100113 def initialize ( executor )
101- @executor = executor
114+ @executor = executor
102115 @serializer = SerializedExecution . new
103116 super ( executor )
104117 end
0 commit comments