Skip to content

Commit 6613af0

Browse files
authored
Merge pull request #70 from eulerfx/master
bufferByTime
2 parents 2713edc + 49aab21 commit 6613af0

File tree

6 files changed

+78
-8
lines changed

6 files changed

+78
-8
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
### 2.0.15 - 27.09.2017
2+
* NEW: AsyncSeq.bufferByTime
3+
14
### 2.0.14 - 27.09.2017
25
* BUG: Fixed head of line blocking in AsyncSeq.mapAsyncParallel
36

src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ open System.Reflection
44
[<assembly: AssemblyTitleAttribute("FSharp.Control.AsyncSeq.Profile7")>]
55
[<assembly: AssemblyProductAttribute("FSharp.Control.AsyncSeq")>]
66
[<assembly: AssemblyDescriptionAttribute("Asynchronous sequences for F#")>]
7-
[<assembly: AssemblyVersionAttribute("2.0.13")>]
8-
[<assembly: AssemblyFileVersionAttribute("2.0.13")>]
7+
[<assembly: AssemblyVersionAttribute("2.0.14")>]
8+
[<assembly: AssemblyFileVersionAttribute("2.0.14")>]
99
do ()
1010

1111
module internal AssemblyVersionInformation =
12-
let [<Literal>] Version = "2.0.13"
13-
let [<Literal>] InformationalVersion = "2.0.13"
12+
let [<Literal>] Version = "2.0.14"
13+
let [<Literal>] InformationalVersion = "2.0.14"

src/FSharp.Control.AsyncSeq/AssemblyInfo.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ open System.Reflection
44
[<assembly: AssemblyTitleAttribute("FSharp.Control.AsyncSeq")>]
55
[<assembly: AssemblyProductAttribute("FSharp.Control.AsyncSeq")>]
66
[<assembly: AssemblyDescriptionAttribute("Asynchronous sequences for F#")>]
7-
[<assembly: AssemblyVersionAttribute("2.0.13")>]
8-
[<assembly: AssemblyFileVersionAttribute("2.0.13")>]
7+
[<assembly: AssemblyVersionAttribute("2.0.14")>]
8+
[<assembly: AssemblyFileVersionAttribute("2.0.14")>]
99
do ()
1010

1111
module internal AssemblyVersionInformation =
12-
let [<Literal>] Version = "2.0.13"
13-
let [<Literal>] InformationalVersion = "2.0.13"
12+
let [<Literal>] Version = "2.0.14"
13+
let [<Literal>] InformationalVersion = "2.0.14"

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ module internal Utils =
110110
elif i = 1 then return (Choice2Of2 (b.Result, a))
111111
else return! failwith (sprintf "unreachable, i = %d" i) }
112112

113+
static member internal chooseTasks2 (a:Task<'T>) (b:Task) : Async<Choice<'T * Task, Task<'T>>> =
114+
async {
115+
let! ct = Async.CancellationToken
116+
let i = Task.WaitAny( [| (a :> Task);(b) |],ct)
117+
if i = 0 then return (Choice1Of2 (a.Result, b))
118+
elif i = 1 then return (Choice2Of2 (a))
119+
else return! failwith (sprintf "unreachable, i = %d" i) }
120+
113121
type MailboxProcessor<'Msg> with
114122
member __.PostAndAsyncReplyTask (f:TaskCompletionSource<'a> -> 'Msg) : Task<'a> =
115123
let tcs = new TaskCompletionSource<'a>()
@@ -131,6 +139,9 @@ module internal Utils =
131139
let chooseTask (t:Task<'a>) (a:Async<'a>) : Async<'a> =
132140
chooseTaskAsTask t a |> Async.bind Async.awaitTaskCancellationAsError
133141

142+
let toUnit (t:Task) : Task<unit> =
143+
t.ContinueWith (Func<_, _>(fun (_:Task) -> ()))
144+
134145
let taskFault (t:Task<'a>) : Task<'b> =
135146
t
136147
|> extend (fun t ->
@@ -1389,6 +1400,35 @@ module AsyncSeq =
13891400
yield! loop None timeoutMs
13901401
}
13911402

1403+
let bufferByTime (timeMs:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = asyncSeq {
1404+
if (timeMs < 1) then invalidArg "timeMs" "must be positive"
1405+
let buf = new ResizeArray<_>()
1406+
use ie = source.GetEnumerator()
1407+
let rec loop (next:Task<'T option> option, waitFor:Task option) = asyncSeq {
1408+
let! next =
1409+
match next with
1410+
| Some n -> async.Return n
1411+
| None -> ie.MoveNext () |> Async.StartChildAsTask
1412+
let waitFor =
1413+
match waitFor with
1414+
| Some w -> w
1415+
| None -> Task.Delay timeMs
1416+
let! res = Async.chooseTasks2 next waitFor
1417+
match res with
1418+
| Choice1Of2 (Some a,waitFor) ->
1419+
buf.Add a
1420+
yield! loop (None,Some waitFor)
1421+
| Choice1Of2 (None,_) ->
1422+
let arr = buf.ToArray()
1423+
if arr.Length > 0 then
1424+
yield arr
1425+
| Choice2Of2 next ->
1426+
let arr = buf.ToArray()
1427+
buf.Clear()
1428+
yield arr
1429+
yield! loop (Some next, None) }
1430+
yield! loop (None, None) }
1431+
13921432
let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
13931433
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
13941434
let! move2T = Async.StartChildAsTask (ie2.MoveNext())

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,10 @@ module AsyncSeq =
421421
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
422422
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
423423

424+
/// Buffers items from the async sequence by the specified time interval.
425+
/// If no items are received in an intervel and empty array is emitted.
426+
val bufferByTime : timeMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T[]>
427+
424428
/// Merges two async sequences into an async sequence non-deterministically.
425429
/// The resulting async sequence produces elements when any argument sequence produces an element.
426430
val mergeChoice: source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,29 @@ let ``AsyncSeq.bufferByTimeAndCount empty``() =
446446
let actual = AsyncSeq.bufferByCountAndTime 2 10 s |> AsyncSeq.toList
447447
Assert.True((actual = []))
448448

449+
[<Test>]
450+
let ``AsyncSeq.bufferByTime`` () =
451+
452+
let s = asyncSeq {
453+
yield 1
454+
yield 2
455+
do! Async.Sleep 100
456+
yield 3
457+
yield 4
458+
do! Async.Sleep 100
459+
yield 5
460+
yield 6
461+
}
462+
463+
let actual =
464+
s
465+
|> AsyncSeq.bufferByTime 100
466+
|> AsyncSeq.map (List.ofArray)
467+
|> AsyncSeq.toList
468+
469+
let expected = [ [1;2] ; [3;4] ; [5;6] ]
470+
471+
Assert.True ((actual = expected))
449472

450473
[<Test>]
451474
let ``try finally works no exception``() =

0 commit comments

Comments
 (0)