Callback으로 받은 데이터를 Coroutines에서 활용하는 방법! Flow 활용



개인 광고 영역

Callback Listener에서 전달되어오는 값을 Coroutines Flow에 전달하려면 어떻게 할 수 있을까?에서 시작한 고민을 글로 정리해 둔다.

이 글은 Flow에 대한 설명을 하지는 않을 것이지만 이를 해결하기 위해 사용한 방법은 아래와 같다.

  • Callback으로 들어온 값은 Coroutines의 suspend가 아니다.
  • flow에 emit 하려면 suspend 내에 있어야 한다.
  • SharedFlow와 StateFlow을 사용할 순 있다.

이 글은 방안에 대한 정리만 한 것일 뿐 전반적인 설명을 담고 있지는 않다. 그러니 참고용으론 활용할 수 있겠지 만 잘 이해하지 못하고 사용한다면 사용치 않는 것을 권장한다.

이 글에 도움 될 링크는 아래와 같다.


이 글에서 알아볼 내용

  • Callback으로 들어온 값을 Coroutines에 던져 처리하기 위한 방법을 설명한다.
  • SharedFlow와 StateFlow를 통한 해결 방법을 알아본다.


Callback을 Coroutines으로 던지는 방법은?

다음과 같은 callback block이 있다. 여기서 실시간으로 들어오는 값을 순서를 보장하며 Coroutines의 flow가 처리할 수 있도록 만들어주는 게 목표이다.

시작하기 전에 결과는 가장 위험한 방법을 통해 이를 해결 할 수 있는데, 바로 runBlocking이다.

사용하시는 분이 runBlocking을 잘 알고 있다면 사용하는 걸 말리지는 않겠으나, runBlocking을 잘 모른다면 절대로 쓰지 말 것을 권장한다.

최소한 runBlocking이 어떠한 문제가 있고, 어떻게 동작하는지는 알고 있어 한다.

구글 검색하면 이분이 작성한 좋은 글이 있으니 참고하시길

그리고 저도 드로이드나이츠에서 발표했던 내용이 있으니 참고하시길


Callback은 suspend가 아니다.

callback을 suspend로 처리하려는 이유는 이후의 데이터 처리를 스트림으로 바꾸고, 비동기 처리하기 위함이다.

  • 이미 만들어진 CallbackListener으로 받은 데이터를 Coroutines으로 던질 수 있어야 한다.
  • UI/IO 상관없이 Coroutines으로 값을 던질 수 있어야 한다.

위와 같은데 사실 같은 이야기다. 어디선가 들어오는 값이 UI/IO 상관없이 Coroutines으로 넘어가 이 값을 처리할 수 있어야 한다.

어디선가 들어오는 값은 연속적일 수도 있고, 비 연속 적일 수도 있는데, 아래와 같다.

class CallbackExample {

    @Test
    fun test() = runBlocking {
        val callback = object : CallbackTestListener {
            override fun onCallback(value: Int) {
                // 여기에서 처리가 필요
                println("callback $value")
            }
        }

        (1..10).forEach {
            callback.onCallback(it)
        }
    }
}

interface CallbackTestListener {

    fun onCallback(value: Int)
}


RxJava로는?

RxJava를 이용해 참고용으로 만들면 아래와 같을 수 있다.

이 테스트 코드에서는 이전 값부터 다시 불러오기 위해서 ReplaySubject를 이용해 처리했지만, 결과적으로 코루틴을 활용해 이와 동일한 처리를 하고 싶은 것이다.

class CallbackExample {

    private val subject = ReplaySubject.create<Int>()

    @Test
    fun test() = runBlocking {
        val callback = object : CallbackTestListener {
            override fun onCallback(value: Int) {
                // 여기에서 처리가 필요
                println("callback $value")
                subject.onNext(value)
            }
        }

        (1..10).forEach {
            callback.onCallback(it)
        }

        subject.subscribe {
            println("Update data $it")
        }
        delay(1000)
    }
}

interface CallbackTestListener {

    fun onCallback(value: Int)
}

이 코드의 결과는 아래와 같다.

Update data 1
... 생략
Update data 10


StateFlow와 SharedFlow

StateFlow and SharedFlow을 이용해 볼 수 있는데, 각각 차례대로 보도록 하자.


StateFlow

먼저 StateFlow를 활용해 볼 수 있어 보이지만 실상은 사용할 수 없다.

StateFlow는 모든 값의 마지막 값 만을 가질 수 있도록 설계되었다.

그러니 들어오는 값을 바로 넘겨주는 형태로 제공할 수 있고, 이 또한 여러 번의 값이 들어오면 무시된다.

class CallbackExample {

    private val stateFlow = MutableStateFlow(-1)

    @Test
    fun test() = runBlocking {
        val callback = object : CallbackTestListener {
            override fun onCallback(value: Int) {
                // 여기에서 처리가 필요
                println("callback $value")
                stateFlow.value = value
            }
        }

        (1..10).forEach {
            callback.onCallback(it)
        }

        launch {
            stateFlow.collect {
                println("Update data $it")
            }
        }
        delay(1000)
    }
}

이 코드의 결과는 Update data 10이 한번 출력되고 끝난다.

내부 코드에 대한 설명은 간단하게 하면 value는 아래와 같고,

public override var value: T
     get() = NULL.unbox(_state.value)
     set(value) { updateState(null, value ?: NULL) }

updateState의 일부 코드를 살펴보면 아래와 같다.

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
    synchronized(this) {
       // 동기 처리 후 값을 변경한다.
    }
}

그리고 다른 tryEmit이나 emit역시 value를 변경하는 정도의 처리만 한다.

결과적으로 stateFlow를 사용해야 하는 조건이라면 정말 상태 값을 던져서 처리해야 하는 정도의 수준으로만 접근해야 한다.


SharedFlow

이번엔 SharedFlow를 살펴보자. 코드는 동일하지만 SharedFlow는 일반 UI에서 처리가 불가능하다.

class CallbackExample {

    private val sharedFlow = MutableSharedFlow<Int>()

    @Test
    fun test() = runBlocking {
        val callback = object : CallbackTestListener {
            override fun onCallback(value: Int) {
                // 여기에서 처리가 필요
                println("callback $value")
                sharedFlow.emit(value)
                // emit은 suspend block이 아니기 때문에 오류가 발생한다.
            }
        }

        launch {
            sharedFlow.collect {
                println("Update data $it")
            }
        }

        (1..10).forEach {
            callback.onCallback(it)
        }
        delay(1000)
    }
}

그래서 코드를 아래처럼 수정해주고, 테스트 해보자.

launch {
    sharedFlow.emit(value)
}

얼핏 보면 순서대로 잘 들어온 것처럼 보인다. 하지만 콜백이 하나만 들어올 리 없고, launch가 현 코드에서처럼 매우 잘 동작할리 없다.

Update data 1
... 생략
Update data 10

모드 같은 Job에서 동작하는 코드라서 사실문제없이 순차적인 처리를 잘하고 있다.

하지만 실 코드에서는 이렇게 쓸 일이 없고, 라이브러리르 형태로 만들려고 할 것이다. 다음 글을 이어서 보자.


서로 다른 Coroutines 만들어 처리해보자.

위에서처럼 하나의 Coroutines에서 동작하기보단 서로 다른 Coroutines에서 collect 해 사용하게 될 것이다.

그래서 서로 다른 Coroutines을 보기 위해서 아래처럼 수정했다. 그리고 2개의 서로 다른 callback을 요청한다.

class CallbackExample {

    @Test
    fun test() = runBlocking {
        val test = TestTest()
        launch {
            test.sharedFlow.collect {
                println("Update data $it")
                delay(100)
            }
        }

        launch {
            (1..10).forEach {
                test.callback.onCallback(it)
            }
        }
        launch {
            (1..10).forEach {
                test.callback.onCallback(it * 100)
            }
        }
        delay(1000)
    }
}

class TestTest : CoroutineScope {
    val sharedFlow = MutableSharedFlow<Int>()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO

    val callback = object : CallbackTestListener {
        override fun onCallback(value: Int) {
            // 여기에서 처리가 필요
            launch {
                println("callback $value")
                sharedFlow.emit(value)
            }
        }
    }
}

이 코드의 주입은 순서대로 들어가지는 않고, 심지어 꺼낼 때도 순서대로 나오지 않는다.

1부터 들었으면, 1이 가장 먼저 나와야 하지만 9가 가장 먼저 나왔다.

callback 1
callback 7
... 생략
callback 400
callback 100
... 생략
callback 900
callback 800
Update data 9
Update data 2
... 생략
Update data 1000
Update data 6

그 말은 실제 내부 코드에서 9가 가장 먼저 들어갔고, 그래서 9가 가정 먼저 나왔다는 말이 된다.

emit 다음에 출력을 하나 더 해보면 그 이유는 알 수 있다.


그래서 어떻게 쓰는 게 좋을까?

결과는 처음에도 적었지만 runBlocking을 활용해야 한다.

하지만 runBlocking은 문제가 심각해진다.

val callback = object : CallbackTestListener {
    override fun onCallback(value: Int) {
        // 여기에서 처리가 필요
        runBlocking {
            println("callback $value")
            sharedFlow.emit(value)
            println("--- callback emit end $value")
        }
    }
}

onCallback을 호출하는 코드의 위치가 Main인 UI에서 호출하게 되면 생각지도 못하게 ANR이 발생할 수 있다.

이 코드는 내부에서 한 번 더 감싸고, 외부 호출 시 suspend를 보장하도록 만들어주는 게 중요하다.

suspend fun updateValue(value: Int) = withContext(Dispatchers.IO) {
    callback.onCallback(value)
}

private val callback = object : CallbackTestListener {
  // 생략
}

핵심은 UI 블락이 없도록 만드는 것이다.


Flow랑 함께 사용하려면

flow랑 함께 사용하려면 구조를 변경해야 하는데, sharedFlow도 불필요해진다. 이 방법을 사실 추천하지만 이 역시 runBlocking을 내부적으로 활용하고 있기에 주의해야 한다.

callbackFlow를 활용하면 flow로 값을 emit 시킬 수 있다.

테스트를 위한 코드라 조금 길게 작성했다. 먼저 callbackFlow 안에서 callback을 등록하고, 초기화하면 되겠다.

그리고 꼭 필요한 awaitClose를 사용해야 한다. 이 코드는 close 처리가 없는 단순한 코드이다.

class TestTest {

    // test를 위한 코드
    fun updateValue(value: Int) {
        callback?.onCallback(value)
    }

    // test를 위한 코드
    private var callback: CallbackTestListener? = null

    @OptIn(ExperimentalCoroutinesApi::class)
    fun createCallback(): Flow<Int> = callbackFlow {
        callback = object : CallbackTestListener {
            override fun onCallback(value: Int) {
                // 여기에서 처리가 필요
                println("callback $value")
                sendBlocking(value)
            }
        }
        awaitClose {
            callback = null
        }
    }
}

이를 테스트하기 위한 코드는 아래와 같다.

앞에서 작성한 코드와 크게 다르지는 않다.

class CallbackExample {

    @Test
    fun test() = runBlocking {
        val test = TestTest()
        launch {
            test.createCallback().collect {
                println("Update data $it")
                delay(100)
            }
        }

        // test를 위한 코드
        delay(100)
        launch {
            (1..10).forEach {
                test.updateValue(it)
            }
        }
        launch {
            (1..10).forEach {
                test.updateValue(it * 100)
            }
        }
        delay(1000)
    }
}


주의할 내용

계속 언급했으나, 여기서 또 한 번 언급한다.

callbackFlow에서 이용하는 sendBlocking이 중요한데 아래와 같다.

결국 runBlocking을 활용하고 있다. offer에서 처리가 되었다면 runBlocking으로 넘어가지는 않겠지만 언제든 넘어갈 수 있다는 말이다.

public fun <E> SendChannel<E>.sendBlocking(element: E) {
    // fast path
    if (offer(element))
        return
    // slow path
    runBlocking {
        send(element)
    }
}

UI에서 이런 코드를 호출하는 것 자체가 위험하다. UI를 Blocking 시키고, 처리가 다 끝날 때까지 놔주지 않기 때문이다.

결국 시간이 오래 걸리는 잡에 이런 코드를 사용하면 runBlocking에 의해 ANR이 발생할 수 있다.


마무리

콜백을 coroutines에서 처리하기 위한 방법을 알아보았다. 결론은 runBlocking 활용으로 해결 아닌 해결은 할 수 있다.

정말 본인이 꼭 필요한 코드를 잘 알고 쓴다면 이 코드는 문제가 없다. 어떠한 위치에서 사용할지를 잘 판단하고 쓰도록 하자. 그렇지 않다면 쓰지 않는 편이 100만 배 좋다.



About Taehwan

My name is Taehwan Kwon. I have developed Android for 6 years and blog has been active for eight years.

Comments