RxJava와 Coroutine 함께 사용하기라는 삽질?
개인 광고 영역
이전 글에서 RxJava와 Coroutines을 각각 알아보고, Coroutines Flow도 아주 간단하게 살펴봤다.
RxJava와 Coroutines을 간단하게 알아보자.
2 번째 글에서는 샘플을 위주로 살펴보려고 한다. RxJava 샘플을 만들고, RxJava와 Coroutines을 엮어서 사용하는 방법을 알아보려고 한다.
실제 개발에서 이렇게 사용하지 않으리라는 보장은 없고, Thread와 Coroutines을 엮어서 사용할 수도 있으니, 하단에 짧은 샘플을 만들어본다.
이 글을 보기 전에
- 이전 글에서 Thread와 Coroutines을 간단하게 소개했기에 궁금하다면 이전 글을 확인
- RxJava와 Coroutines을 엮어서 사용하는 방법은 공식 Github 소스에 나온다.
- 별도 배포하는 라이브러리도 있다.
- 여기에서는 다루지 않는다.(과거에 이미 했었다)
무슨 샘플을 만들까?
ID와 Password 입력받고, 버튼을 활성화화하는 샘플을 RxJava로 만들어 보았다.
그럼 간략하게 정리해보자.
- ID가 입력되었는지 확인할 필요가 있다.
- Password가 입력되었는지 확인할 필요가 있다.
- ID/Password가 입력되었으니 버튼을 활성화한다.
- 버튼을 눌러서 다음 이벤트를 처리할 수 있다.
여기서는 어디까지나 2개의 데이터를 입력받고, 버튼을 활성화하는 이벤트만을 테스트할 것이다.
RxJava로 구현해보자.
Input event를 처리할 subject 추가
RxJava의 다양한 도구 중 Hot observable인 BehaviorSubject - 링크을 사용했다.
- ID/Password를 입력을 대기하는 Subject 2개 생성
private val emailSubject = BehaviorSubject.create<String>()
private val passwordSubject = BehaviorSubject.create<String>()
LiveData로 이벤트를 받을 수도 있고, RxJava로도 받을 수 있다. Unit Test니깐 테스트를 위한 RxJava 하나 더 추가하자
private val enableLoginSubject = BehaviorSubject.create<Boolean>()
emailSubject, passwordSubject에 값을 넘겨줄 때는 onNext(““)로 넘겨줄 수 있다.
2개의 Input event를 처리할 combineLatest
2개의 subject를 합쳐서 하나의 스트림으로 확인하고, 처리하기 위해서 combineLatest - 링크를 이용하여 2개의 응답을 기다려보자.
키보드 입력에 따라 하나씩 전달될 것이다.
emailSubject.onNext("email")
passwordSubject.onNext("password")
2 값이 순차적으로 들어오고 combineLatest의 BiFunction을 통해 값을 통합적으로 받고, 각 입력 값이 isNotEmpty()
가 아니어야 버튼을 활성화한다.
RxJava의 filter를 이용해서 true인 경우만 데이터를 흘려보낼 수 있으나, 그러면 버튼이 활성화된 이후 돌아가지 않기에 적지 않아야 한다.
@Before
fun setUp() {
Observable.combineLatest<String, String, Boolean>(
idSubject,
passwordSubject,
BiFunction { t1, t2 ->
t1.isNotEmpty() && t2.isNotEmpty()
})
.subscribe {
enableLoginSubject.onNext(it)
}
}
이게 일반적인 로그인을 RxJava를 활용해서 만들어본 샘플이다.
응답은 RxJava로 왔으니, Coroutines과 역어보자
Unit Test에서 Subscribe 처리 시에는 TestObserver
val testObserver = TestObserver<Boolean>()
enableLoginSubject.subscribe(testObserver)
testObserver.awaitCount(1)
testObserver.assertResult(true)
하지만 코루틴이 메인 글이니 Coroutines을 사용해보도록 하자.
suspendCancellableCoroutine
suspendCancellableCoroutine - 링크이라는게 있다.
Suspends coroutine similar to suspendCoroutine, but provide an implementation of CancellableContinuation to the block. This function throws CancellationException if the coroutine is cancelled or completed while suspended.
간단하게 응답이 오기 전까지는 lock 시키고, 응답을 resumeWith/resumeWithException을 주면 그때 되돌아간다.
- resumeWith : 응답 값을 return 시킨다.
- resumeWithException : Exception에 대해 return 한다.
suspendCancellableCoroutine을 조금 더 살펴보면 Generic T를 리턴해준다. resumeWith을 통해 응답을 넘겨주거나, resumeWithException을 이용해 Exception을 리턴해주는 Coroutine suspend 함수이다.
내부의 처리는 lambdas 처리를 위한 block에 CancellableContinuation을 넘겨주는 부분을 가지고 있다. 마지막 줄의 cancellable의 getResult 호출 시 T 리턴해주는 형태이다.
결국 lock을 통해 응답이 오기 전까지 기다리는 coroutine block을 구현해두고 사용하고 있다.
/**
* Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
*/
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
// NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
// method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
// cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
suspendCancellableCoroutine과 RxJava 엮어보기
결국 Coroutines block에서 동작해야 하기에 runBlocking을 활용해야 한다.
runBlocking은 뒤에서 좀 더 알아보겠다.
키보드에서 값을 입력한 것처럼 구현하기 위해서 다음 코드를 추가했다.
launch {
delay(800)
passwordSubject.onNext("123456")
}
emailSubject.onNext("[email protected]")
email과 password를 시간차로 입력했다.
그리고 suspendCancellableCoroutine에서 Boolean을 최종 리턴 받도록 구현하고, continuation에 RxJava의 응답이 성공했을 때와 실패했을 때 처리를 하도록 구현했다.
onNext = 성공 시 boolean의 값을 처리하도록 하고, 이때 runCatching {}을 활용해서 Coroutine Result
invokeOnCancellation은 필요하면 구현해주어야 한다. 일정 시간 후 취소시키는 행동이 필요하다면 필요하다.
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
@Test
fun test() = runBlocking {
val ret = suspendCancellableCoroutine<Boolean> { continuation ->
enableLoginSubject
.subscribeOn(Schedulers.io())
.subscribe({
continuation.resumeWith(kotlin.runCatching { it })
}, {
continuation.resumeWithException(it)
})
continuation.invokeOnCancellation {
try {
cancel()
} catch (e: Exception) {
}
}
}
assert(ret)
}
결과는
결과야 매우 간단하게 아래와 같은데 위에서 보았던 코드에서 좀 더 로그를 담아보았다.
start
send password
wait suspend
send email
t1 [email protected] t2 123456
out!!!! true
Coroutine으로는 못하나?
코루틴으로도 RxJava 형태로 구현한다고 하면 가능은 하나, 그냥 기존 함수를 사용하는 간단한 방법으로 접근하는 게 좋다.
email/password 입력이 들어오면 Android LiveData에 값을 담아두고, 체크하는 매우 단순한 방식 말이다.
coroutine에 익숙해지면 오히려 RxJava가 좋은 라이브러리 인건 알 수 있지만 그렇다고 coroutine이 불편하지도 않다.
사실 이런 작업에 굳이 비동기 처리가 필요한가?에 대한 질문에 그럴 수도 아닐 수도 있다고 말할 수 있다.
하려고 한다면 매우 거대해 보이는 channels - 링크을 사용해본다거나? actors - 링크을 사용해본다거나 할 수 있다.
함정을 통해 Coroutine 주의할 점을 찾아보자
이건 어디까지나 비동기를 이해하기 쉽게 만든 함정을 하나 만들었다.
이 코드는 Unit Test
라는 점을 한 번 더 상기하길 바란다.
Flow 샘플 코드 - 링크를 그대로 가져와 함정 카드를 만들었다.
fun call(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun runBlockingTest() {
GlobalScope.launch {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value ->
println(value)
}
}
}
@Test fun callTest() {
println("subroutines") // subroutines 출력
runBlockingTest() // Coroutines // 호출만 하고,
println("subroutines") // subroutines 출력 하고 callTest는 종료한다.
}
먼저 결과를 한번 보자. 생각한 결과가 맞는가?
subroutines
subroutines
아무런 동작도 없이 그냥 callTest가 종료되었다. 원하는 동작이 아닐 것이다.
주의 깊게 보아야 할 부분을 찾아보면 callTest()
와 runBlockingTest()
이다.
runBlockingTest에 잘 보면 GlobalScope.launch {}
를 호출했다. 결국 저 코드는 어디선가 호출하고 끝남을 대기하지 않겠다는 의미이다.
특히나 Unit Test
는 callTest()
내의 메소드 호출만 하면 끝난다.
해결해보자
해결 방법은 매우 간단하다 의도적으로 runBlocking을 제거한 상태이다. runBlocking을 추가하면 자연스럽게 해결된다.
fun runBlockingTest() = runBlocking {
/* GlobalScope.launch { // Unit Test에서는 runBlocking을 사용해야 테스트가 편한다. */
// ... 위 코드 참고
}
}
runBlocking 추가만으로 원하는 결과를 볼 수 있다.
subroutines
Flow started
I'm not blocked 1
1
100
I'm not blocked 2
2
100
I'm not blocked 3
3
100
subroutines
이 코드는 Unit Test 임을 다시 한번 강조하며, Coroutine은 함수 호출 후 완료를 대기한다. return을 만나면 이전으로 돌아가 다음 라인을 실행한다.
어디까지나 Coroutine 안에 있다는 조건이다.
runBlocking을 좀 더 알아보자
Android에서 runBlocking은 Unit Test에서만 사용해야 한다.
UI에서 runBlocking을 사용하면 무슨 문제가 생길까?
class TestActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
test()
}
private fun test() = runBlocking {
delay(15000L)
CoroutineScope(Dispatchers.Main).launch {
Toast.makeText(
this@TestActivity,
"lazy Toast!",
Toast.LENGTH_SHORT)
.show()
}
}
}
위 코드는 사실 비동기가 아니다. runBlocking의 내부 코드를 보면 아래와 같다.
public fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T): T {
// 생략
}
runBlocking 초기화에서 CoroutineContext인 Thread를 지정하지 않았기에 EmptyCoroutineContext으로 동작하는데, 호출한 곳의 Thread를 그대로 따르게 된다.
Android UI에서 runBlocking 사용 시 UI Thread에서 호출했기에 UI Thread를 그대로 따른다.
그래서 비동기 작업을 하는 것이 아닌 그냥 UI에 응답을 주지 않는 UI 동기 작업을 진행한다.
심각하면 ANR(Application Not Responding) - 링크이 발생한다.
ANR 발생 시 일정 시간 동안 App에서 응답을 주지 않았기에 앱을 강제 종료할 수 있다.
마무리
이번 글에서 중요한 점을 정리하면 다음과 같다.
- RxJava를 한 번에 다 수정하지는 못하지만 함께 쓰고 싶다면 suspendCancellableCoroutine을 사용할 수 있다
- Android UI 환경에서는 runBlocking을 사용하지 말아야 한다. 절대
- Android Unit Test 환경에서는 runBlocking을 사용할 수 있다.
Comments