Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core.async/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/target
/classes
/checkouts
pom.xml
pom.xml.asc
*.jar
*.class
/.lein-*
/.nrepl-port
.hgignore
.hg/
8 changes: 8 additions & 0 deletions core.async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# finagle-clojure/core.async

Clojure core.async adapters for finagle-clojure.

### Dependency

[finagle-clojure/core.async "0.4.2-SNAPSHOT"]

15 changes: 15 additions & 0 deletions core.async/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
(defproject finagle-clojure/core.async "0.4.2-SNAPSHOT"
:description "Clojure core.async integration with finagle-clojure"
:url "https://github.com/twitter/finagle-clojure"
:license {:name "Apache License, Version 2.0"
:url "https://www.apache.org/licenses/LICENSE-2.0"}
:scm {:name "git" :url "http://github.com/finagle/finagle-clojure"}
:plugins [[lein-midje "3.2"]]
:profiles {:test {:dependencies [[midje "1.8.3" :exclusions [org.clojure/clojure]]]}
:dev [:test {:dependencies [[org.clojure/clojure "1.8.0"]]}]
:1.7 [:test {:dependencies [[org.clojure/clojure "1.7.0"]]}]
:1.6 [:test {:dependencies [[org.clojure/clojure "1.6.0"]]}]
:1.5 [:test {:dependencies [[org.clojure/clojure "1.5.1"]]}]
:1.4 [:test {:dependencies [[org.clojure/clojure "1.4.0"]]}]}
:dependencies [[finagle-clojure/core "0.4.2-SNAPSHOT"]
[org.clojure/core.async "0.2.374"]])
71 changes: 71 additions & 0 deletions core.async/src/finagle_clojure/core_async.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
(ns finagle-clojure.core-async
"Adapters to use Futures with core.async."
(:require [finagle-clojure.futures :as f]
[finagle-clojure.scala :as scala]
[clojure.core.async :as a])
(:import [com.twitter.util Future]))

(defn ^:no-doc throw-error
[o]
(if (instance? Throwable o)
(throw o)
o))

;; this needs to be a macro so it expands in the scope of the enclosing go block
;; otherwise the async take complains that it isn't in a go block
;; throw-error needs to be public so it's visible in the macro
(defmacro <?
"Similar to `clojure.core.async/<!`, but will throw instances of Throwable.

*Arguments*:

* `c`: a core.async chan

*Returns*:

The value from `c`, or throws it if it's `Throwable`."
[c]
`(throw-error (a/<! ~c)))

(defn <??
"Similar to `clojure.core.async/<!!`, but will throw instances of Throwable.

*Arguments*:

* `c`: a core.async chan

*Returns*:

The value from `c`, or throws it if it's `Throwable`."
[c]
(throw-error (a/<!! c)))

(defn- enqueue-to-chan
[c]
(fn [v]
;; Close the channel after the value has been put into channel c
;; to make sure it isn't closed before the value has been submitted.
(a/put! c v (fn [_] (a/close! c)))
scala/unit))

(defn future->chan
"Enqueues the value or Throwable that a Future is defined with to a channel.
If no chan is provided a new `promise-chan` will be created and returned.

*Arguments*:

* `f`: a Future
* `c`: (optional) a core.async chan

*Returns*:

The chan to which the result of Future `f` will be enqueued.

See the helper fns [[<??]] & [[<?]] to take a value fro a chan and throw
it if it's an instance of `Throwable`."
([^Future f]
(future->chan f (a/promise-chan)))
([^Future f c]
(f/on-success* f (enqueue-to-chan c))
(f/on-failure* f (enqueue-to-chan c))
c))
17 changes: 17 additions & 0 deletions core.async/test/finagle_clojure/core_async_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
(ns finagle-clojure.core-async-test
(:require [finagle-clojure.core-async :refer :all]
[finagle-clojure.futures :as f]
[clojure.core.async :as a]
[midje.sweet :refer :all]))

(facts "future->chan"
(<?? (future->chan (f/exception (Exception.)))) => (throws Exception)
(<?? (future->chan (f/value :value))) => :value
(a/<!! (a/go (<? (future->chan (f/value :value))))) => :value
(let [c (a/chan 1)
e (Exception.)]
(a/<!! (future->chan (f/exception e) c)) => e
(a/<!! c) => nil)
(let [c (a/chan 1)]
(a/<!! (future->chan (f/value :value) c)) => :value
(a/<!! c) => nil))