RxJava와 Coroutines을 간단하게 알아보자.



개인 광고 영역

약 1년 전 Coroutines을 처음 다루었고, RxJava에 대해서 조금 익숙하게 사용할 시점에 작성했던 글이다.

RxJava와 Kotlin Coroutines 비교해보기 - 링크

그간 지원은 다음과 같다.

  • 2019 Google I/O에서 Coroutines을 적극 도입하기로 하였고,
  • ViewModel/LiveData 등을 위한 CoroutineScope을 제공하고,
  • Coroutines에서도 RxJava Cold Observable과 유사한 Flow를 제공한다.
  • Kotlin/Coroutines은 계속 안정적으로 흘러가고 있다.


이 글을 보기 전에

  • Thread와 Coroutines을 간단하게 소개한다.
  • RxJava와 Coroutines에 대한 간단한 소개를 담는다.
  • RxJava와 Coroutines에 대해서 더 자세한 내용은 본 블로그의 이전 글을 보거나, 공식 문서를 참고하길 바란다.
  • 기본 내용을 다루는 챕터라서 아는 내용이라면 빠르게 넘어가길 바란다.


Java에서 동작하는 건 모두 Thread

Java에서는 new Thread()를 이용하여 Asynchronous 작업을 다룰 수 있는데, 결국 JVM(Java virtual machine) 위에서 동작해야 한다.

JVM(Java virtual machine) 위에서 동작하는 Coroutines은 결국 Thread를 사용할 수밖에 없다.

비동기 처리를 잘 이해하고 있고, Java에서 구현할 수 있는 Thread - 링크를 잘 알고 있다면 RxJava든 Coroutines이 든 접근하기 조금 쉬워진다.


Coroutines

코루틴 역시 Thread와 같은 Asynchronous 작업을 다룰 수 있다. 그냥 JVM 위에서 동작하기 위한 Thread를 이용할 뿐이다.

그리고 다른 점은 Observer pattern이 아닌 함수 return 형태로 응답을 받을 수 있다.

Coroutines은 functions 호촐 후 return을 대기하기 때문이다.

suspend fun loadData(): Item {
    // load data
    delay(500)
    return Item()
}

@Test fun test() = runBlocking {
    val item = loadData()
}


RxJava는?

Coroutines을 살펴보았으니, RxJava를 알아보자. 그전에 ReactiveX 라이브러리 배포는 현재 최신인 3.x까지 안정적인 라이브러리이다.

RxJava는 하나의 라이브러리일 뿐이고, ReactiveX - 링크 공식 문서에 다음과 같은 슬로건을 달고 있다.

ReactiveX : An API for asynchronous programming with observable streams 비동기 프로그래밍을 Observable streams을 활용하는 API이다.

ReactiveX의 Observable pattern, Iterator pattern, Functional programming을 결합한 형태를 가지고 있는데 Observable stream으로 실제 데이터 흐름을 파악할 수 있다.

ReactiveX 공식 홈을 통해 확인할 수 있는데, 다음과 같은 주요 언어들을 지원하고 있으며, 추가적인 언어들은 사이트에서 확인 가능하다.


RxJava

RxJava - 링크는 현재 3.0 정식 버전을 위해 달려가고 있으며, RxJava 보안 패치 및 업데이트에 대한 일정을 공식적으로 언급하고 있다.

  • RxJava 1.x : 2018년 12월 31일 이미 종료되었다.
  • RxJava 2.x : 2020년 12월 31일까지 보안 패치 지원한다.
  • RxJava 3.x : RC 5까지 나온 상태라 곧 배포할듯하다.


RxKotlin

RxKotlin - 링크은 RxJava의 일부 확장 함수를 제공하기 위하여 나온 것이다.

combineLatest, zip, withLatestFrom 등의 확장 함수와 disposable를 처리할 수 있는 간단한 형태의 코드를 제공하고 있다.

/**
 * disposable += observable.subscribe()
 */
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
    add(disposable)
}

/**
 * Add the disposable to a CompositeDisposable.
 * @param compositeDisposable CompositeDisposable to add this disposable to
 * @return this instance
 */
fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable =
        apply { compositeDisposable.add(this) }

어디까지나 확장 기능이기에 RxKotlin을 사용하는 것은 필수가 아니며, RxKotlin을 단독으로 사용할 일 역시 없다.

문서에는 다음과 같이 명시하고 있다.

RxKotlin 문서에 보면 RxKotlin is a lightweight library that adds convenient extension functions to RxJava. 여기서 중요한 구문은 extension functions to RxJava.이다.

RxJava와 Kotlin 조합으로도 충분히 RxKotlin 없이 개발이 가능하다. combineLatest의 경우 RxJava만 사용하더라도 다음과 같은 형태의 코드를 만드는데 무리가 없다.

Observable.combineLatest<String, String, Boolean>(
  Observable.just("AAA"),
  Observable.just("BBB"),
  BiFunction { t1, t2 -> t1 == t2 }
)

2개의 Observable 스트림을 받고, BiFunction에서 이 2개의 값을 비교하여 Boolean을 조합해서 내려주는 형태의 코드이다.

Kotlin을 하면 당연히 RxKotlin을 해야 한다고 생각할 수 있는데 그럴 방법도, 그럴 수도 없다. RxKotlin은 어디까지나 확장 형태의 함수를 제공할 뿐이며, RxJava 2.x까지 만 이를 제공하고 있다.

현 상태로는 RxJava 3에서 이를 지원하지 않을 것 같다.


RxAndroid

RxJava와 RxKotlin을 간단하게 소개했는데, 그럼 안드로이드에서 RxJava를 활용하려면? Android를 위한 RxAndroid - 링크가 있다.

RxAndroid는 RxJava를 기본적으로 포함하고 있다. 다만 문서에 보면 RxAndorid 사용 시 RxJava를 기본으로 추가하길 권장하고 있다.

dependencies {
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0-SNAPSHOT'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    // (see https://github.com/ReactiveX/RxJava/releases for latest 3.x.x version)
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0-RC2'
}

RxJava보다 배포 속도가 느리기에 RxJava의 더 최신 패치 및 최신 feature를 활용하려면 추가해주는 것을 추천한다는 글이다.

RxAndroid에서는 Android의 Main Scheduler(UI Scheduler)를 활용할 수 있도록 AndroidSchedulers.mainThread()를 제공하고 있다.

Observable.just("one", "two", "three", "four", "five")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {/* an Observer */}


RxJava와 Coroutine의 다른 점?

Asynchronous 처리를 도와주는 부분은 동일하다.

그 Asynchronous 처리를 어떠한 형태로 할 것인지가 다를 뿐이다. 이 역시 Thread도 비슷하다.

하지만 하나씩 뜯어보면

  • RxJava
    • Observable pattern으로 구독
    • 구독 후 들어오는 data를 stream 형태로 내려보낸다.
    • 중간중간 데이터가 변환되는 걸 stream을 통해 확인 가능하다.
  • Coroutines
    • 누구나 아는 함수 호출을 하고, return이 오기 전까지 기다린다.
    • 처리가 끝나 return 한 데이터를 가지고 다음 처리를 할 수 있다.
  • Thread
    • 모든 걸 직접 구현해야 한다.
    • Listener도 등록해야 하고, Background, UI를 필요할 때마다 구현해야 한다.
    • 지저분하고 따라가기 힘들고, 복잡해진다. 데이터 흐름을 쉽게 파악하기 어렵다

마지막 Thread의 단점을 RxJava와 Coroutines은 알아서 해준다.

개발자가 알아야 할 처리 양이 적어진다.


RxJava와 Coroutines을 비교하는 게 맞을까?

같은 기능을 한다는 점에서는 비교하는 게 맞을 수 있다.

비교까지는 어렵고 Coroutines 1.3에 추가된 Flow - 링크를 간단하게 살펴보고 이 글을 끝내도록 하겠다.


Flow?

Flow는 Coroutines 1.3에 정식으로 추가된 스펙이고, 이 Asynchronous Flow - 링크는 다음을 구현했다고 한다.

A cold asynchronous data stream that sequentially emits values and completes normally or with an exception.

결국 RxJava의 Cold observable을 구현하고, Data stream을 만들어준 부분인데, RxJava를 안다면 어렵지 않게 접근 가능해 보인다.

그럼 샘플 코드를 하나 보자. for 문으로 1..3까지 emit 하는 간단한 코드이다.

foo() 함수의 collect이 호출되기 전까지는 동작하지 않는데, RxJava에서는 subscribe와 동일하다.

fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

collect를 구현해야 하는데 문서에 나온 샘플 코드를 가져와서 다양한 테스트가 가능하다.

println를 통해 어떤 식으로 동작하는지 체크가 가능하다.

collect는 RxJava 기준 subscribe의 onNext에 해당하며, emit을 호출하면 stream에 데이터가 흐르며 collect에서 하나씩 받아 처리할 수 있다.

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value ->
      println(value)
    }
    println("Calling collect again...")
    flow.collect { value ->
      println(value)
    }
}


Flow 사용에 허용하지 않는 패턴

Flow reference 문서 - 링크에도 나와있지만 flow 사용 시 아래와 같은 부분은 허용하지 않는다.

기본은 Data stream을 방해하는 모든 것을 허용하지 않는다.

  • GlobalScope.launch 생성
  • CoroutineScope().launch 생성
  • launch로 Dispatcher 변경
  • withContext의 사용

반대로 허용하는 경우도 있는데 전혀 의미 없는 코드인데, data stream을 방해하지 않는 동일 coroutine context라 의미 없는 코드이다.

fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)

        coroutineScope {
            emit(100)
        }
    }
}


마무리

RxJava와 Coroutines을 간단하게 비교하기 위해서 살펴보았다. 그럼 2부에서 샘플 코드를 통해 좀 더 알아보자.



About Taehwan

My name is Taehwan Kwon. I have developed Android for 6 years and blog has been active for eight years.

Facebook Comments

Comments