diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/disposable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/disposable.kt index 6f3d73a..f1a6cf0 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/disposable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/disposable.kt @@ -10,10 +10,18 @@ operator fun CompositeDisposable.plusAssign(disposable: Disposable) { add(disposable) } + +/** + * disposable -= observable.subscribe() + */ +operator fun CompositeDisposable.minusAssign(disposable: Disposable) { + delete(disposable) +} + /** * Add the disposable to a CompositeDisposable. * @param compositeDisposable CompositeDisposable to add this disposable to * @return this instance */ fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable = - apply { compositeDisposable.add(this) } + apply { compositeDisposable += this } diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt index ed246db..41519cf 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt @@ -177,3 +177,11 @@ fun Observable>.toMultimap(): Single Iterable>.concatAll(): Observable = Observable.concat(this) + +/** + * Merge multiple Observable by using plus operator + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +operator fun Observable.plus(other: Observable): Observable = + Observable.merge(this, other) \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt index 2ed8cae..fa9ba3c 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt @@ -268,7 +268,19 @@ class ObservableTest { .concatAll() .toList() .subscribe { result -> - Assert.assertEquals((0 until 30).toList(), result) + assertEquals((0 until 30).toList(), result) } } + + @Test fun testMergeWithPlus() { + val output = + Observable.just(1, 2) + + Observable.just(3, 4) + + Observable.just(5, 6) + + Observable.just(7, 8) + + Observable.just(9, 10) + val result = mutableListOf() + output.subscribe { data -> result.add(data) } + assertEquals(result, (1..10).toMutableList()) + } } diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/SubscriptionTests.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SubscriptionTests.kt index 7e03448..e288585 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/SubscriptionTests.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SubscriptionTests.kt @@ -2,28 +2,55 @@ package io.reactivex.rxjava3.kotlin import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.disposables.CompositeDisposable +import org.junit.After +import org.junit.Before import org.junit.Test import java.util.concurrent.TimeUnit class SubscriptionTest { - @Test fun testSubscriptionAddTo() { - val compositeSubscription = CompositeDisposable() - // Create an asynchronous subscription - // The delay ensures that we don't automatically unsubscribe because data finished emitting - val subscription = Observable.just("test") - .delay(100, TimeUnit.MILLISECONDS) - .subscribe() + // Create an asynchronous subscription + // The delay ensures that we don't automatically unsubscribe because data finished emitting + private val subscription = Observable.just("test") + .delay(100, TimeUnit.MILLISECONDS) + .subscribe() + private lateinit var compositeSubscription: CompositeDisposable + + @Before + fun setUp() { + compositeSubscription = CompositeDisposable() + } + + @After + fun tearDown() { + compositeSubscription.dispose() + assert(compositeSubscription.isDisposed) + } + + @Test + fun testSubscriptionAddTo() { assert(!subscription.isDisposed) subscription.addTo(compositeSubscription) assert(compositeSubscription.size() > 0) assert(!subscription.isDisposed) + } - compositeSubscription.dispose() + @Test + fun testSubscriptionAddToWithPlusAssign() { + compositeSubscription += subscription - assert(compositeSubscription.isDisposed) + assert(compositeSubscription.size() > 0) + } + + @Test + fun testSubscriptionDeleteWithMinusAssign() { + this.testSubscriptionAddToWithPlusAssign() + + compositeSubscription -= subscription + + assert(compositeSubscription.size() == 0) } } \ No newline at end of file