Callback으로 전달받은 데이터를 Coroutines의 Channel로 처리해보자.



개인 광고 영역

Callback 이벤트로 받은 데이터를 Coroutines의 Flow를 활용한 방법에 대해서 소개했었다. 최근 Medium에서 Channel을 사용한 방법을 소개한 글이 있어 Channel을 사용하는 방법을 정리해보려고 한다.

여기에서 사용한 샘플은 단순한 테스트를 한 것이고, 네트워크가 포함되어 있지 않다.

다만 단순한 테스트로도 발생하는 문제가 있어 정리해두려고 한다.

참고한 글은 Kotlin Channel and WebSocket Complete Example (Also Why Not Flow)이다.

이 글에 도움 될 문서는 아래와 같다.


이 글에서 알아볼 내용

  • Flow와 Channel의 간단한 사용법을 알아본다.
  • Callback으로 전달된 데이터를 Channel으로 넘기기 위한 방법을 알아본다.
  • 데이터의 순서가 보장되는지 확인해보고, 해결 방법을 알아본다.


Flow 대신 Channel을 간단하게 알아보자.

이전에 작성한 Callback으로 받은 데이터를 Coroutines에서 활용하는 방법! Flow 활용은 SharedFlow와 StateFlow를 활용해 데이터를 처리하는 방법을 정리했다. 결국 callbackFlow를 사용해 데이터를 안전하게 보장하여 처리하는 방법을 익힐 수 있었다.

이번 글에서는 Kotlin Channel and WebSocket Complete Example (Also Why Not Flow)을 참고해 Channel을 활용하는 방법을 정리한다.

Flow를 활용하는 방법과 Channel을 이용하는 방법은 내부적으론 비슷하게 동작하는데, ReceiveChannel을 활용하고 있다.

Flow와 Channel의 차이를 더 잘 알고 싶다면 Roman Elizarov이 작성한 Cold flows, hot channels을 참고하길

이 글을 참고하면 Flow는 Cold이고, Channel은 Hot에 해당한다.

Channel은 옵저버 등록 없이도 데이터를 보관할 수 있다. 상태에 따라서 이를 활용하면 이득일 수 있지만 receive 전까지 데이터를 보관하게 되는 단점이 생긴다.

대신 Flow는 collect을 하기 전에는 데이터를 흘려보내지 않고 대기한다.

그래서 이 둘을 사용하는 데 있어 명확한 차이가 발생한다.


Flow를 활용할 경우

flow를 활용한 0..10까지의 숫자를 stream으로 흘려보내고, 이를 출력하는 코드를 작성했다.

@Test
fun testFlow() = runBlocking {
    flow<Int> {
        (0..10).forEach {
            println("emit $it")
            emit(it) // 1을 입력하면
        }
    }.collect {
        println("collect $it") // 1이 출력되고,
        delay(1) // 1ms를 대기한다. 그동안 blocking 한다.
    }
    delay(100)
}

결과는 아래와 같다. 0이 들어가면 0이 출력되고, 1이 들어가면 1이 출력되는 반복적인 작업을 한다.

emit 0
collect 0
emit 1
collect 1
... 생략
emit 9
collect 9
emit 10
collect 10

Flow는 collect을 하지 않으면 emit도 하지 않기 때문에 별도의 내부 캐싱이 필요치 않는다.


Channel의 동작은

이번엔 Channel을 이용해보자.

Flow와는 다르게 독립적인 구성을 할 수 있다.

@Test
fun testChannel() = runBlocking {
    val channel = Channel<Int>()
    launch {
        (0..10).forEach {
            println("send $it")
            channel.send(it)
        }
    }

    repeat(11) {
        println("receive ${channel.receive()}")
    }
}

Channel의 capacity의 기본값은 RENDEZVOUS으로 설정되어 있다.

Requests a rendezvous channel in the Channel(...) factory function a channel that does not have a buffer.

이 값은 별도의 버퍼를 만들지 않고 처리하는데, 실험해보면 send 2번 정도 보내고 나면 다음 것부터 전송하지 않는다.

지속적인 receive를 해줘야 값이 잘 들어감을 알 수 있다.

send 0
send 1
receive 0
receive 1
// 생략
send 8
send 9
receive 8
receive 9

10개를 모두 입력받고, 10개를 출력하고 싶다면 Channel<Int>(capacity = 10)으로 변경하거나, Channel<Int>(CONFLATED)으로 변경해 주면 되겠다.

  • CONFLATED

Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with [onBufferOverflow = DROP_OLDEST][BufferOverflow.DROP_OLDEST].


그래서?

Flow와 Channel은 서로 적절한 사용처가 있는 건 당연하다. Flow가 항상 좋은 것도 아니고, Channel이 필요 없는 것 역시 아니다.

다만 필요한 경우에 따라 적절하게 활용할 수 있어야 한다.


본론으로 돌아가 Channel을 활용해보자.

Kotlin Channel and WebSocket Complete Example (Also Why Not Flow)에 소개한 내용대로 Channel을 활용하고, 필자가 작성했던 이전 글에서처럼 Callback에 대한 순서가 문제없는지 지켜보자.

단순한 Callback 샘플을 추가해 아래와 같이 구성해보았다. 0에서 100까지의 Int를 String으로 변환하고, 0부터 100까지 순서 꼬임 없이 잘 출력하는지 확인해보자.

class CallbackExample {

    @Test
    fun test() = runBlocking {
        val channel = Channel<String>()
        val callback = object : CallbackListener {
            override fun onCallback(value: String) {
                GlobalScope.launch {
                    println("callback $value")
                    channel.send(value)
                }
            }
        }

        val end = launch {
            while (isActive) {
                println("------ receive !!!! ${channel.receive()}")
            }
        }

        GlobalScope.launch(Dispatchers.IO) {
            (0..100).forEach {
                callback.onCallback("input one - $it")
                println("input one - $it")
            }
        }.join()

        end.cancel()
    }
}

interface CallbackListener {

    fun onCallback(value: String)
}

이 코드의 결과는 너무 길기 때문에 비정상적으로 보이는 결과만 가져와 보겠다.

예상 결과는 0, 1, … 10, 11 -> 0, 1, … 10, 11이 잘 출력되어야 한다.

하지만 단순 테스트 결과에서도 알 수 있는데, 순서를 보장하지 않는다.

중간중간 가끔 이런 일이 발생하기는 하지만 그래도 어디선가 꼬이면 문제가 생길 수 있는 데이터라면 이런 방식의 접근은 잘못되었다고 보면 되겠다.

input one - 8
input one - 9
input one - 10
input one - 11
input one - 12
input one - 13
callback input one - 9
callback input one - 8
input one - 14
callback input one - 11
callback input one - 12
callback input one - 13
callback input one - 10
callback input one - 15


문제를 하나씩 보자.

여기서 발생하는 문제는

  • 서버에서 time stamp를 포함해 순서대로 내려주었다.
  • 하지만 받는 입장에서 순서가 뒤바뀌어 변경되었다.

이 데이터의 순서가 매우 중요하다면 문제가 생길 수 있는데, 채팅에서는 대화의 순서가 꼬일 수도 있다.(임의로 작성한 글이다)

메시지를 이렇게 보내면

xxx야 안녕
나에게 5000원 보내줘
12453535353535
계좌야
010-1234-1234

하지만 받는 곳에서는 순서가 다 꼬여서 아래와 같을 수 있다.

계좌야
나에게 5000원 보내줘
12453535353535
xxx야 안녕
010-1234-1234

이런 메시지가 있다면 순서 보장이 안되어 어떤 이야기를 하고 싶은지도 모르게 될 수도 있다.

정렬을 할 수도 있겠지만 데이터양이 매우 많은 실시간 데이터라면 이런 문제는 매우 복잡해진다.

특히나 돈과 관련한 차트의 순서가 뒤 바뀐다면 더 큰 문제가 생길 수 있다.


코드상 어디가 문제인가?

코드에서는 이 부분이 문제가 있는데,

val callback = object : CallbackListener {
    override fun onCallback(value: String) {
        GlobalScope.launch {
            println("callback $value")
            channel.send(value)
        }
    }
}

channel의 send는 내부적으로 suspend로 구현되어 있다.

public suspend fun send(element: E)

그렇기에 일반적인 Thread인 callback 상황에서 코루틴의 channel에 값을 던지려면 이와 같은 Coroutines에서 처리를 해야 한다.

그래서 이 코드에서는 GlobalScope을 사용한다.

GlobalScope의 launch가 먼저 실행되는 쪽이 먼저 값을 전달하게 된다. 아래 코드에서 볼 수 있듯 callback은 순서대로 8, 9가 들어왔지만 GlobalScope의 launch는 9가 먼저 실행되어 send를 먼저 보내 Channel의 queue에 쌓아두기에 receive을 하더라도 9가 먼저 출력된다.

결국 어떤 Coroutines이 먼저 실행될지는 보장할 수 없기 때문이다.

callback input one - 9
callback input one - 8
callback input one - 11
callback input one - 12
callback input one - 13
callback input one - 10
callback input one - 15

channel은 순서를 보장해 주고, 버퍼 사이즈를 지정함에 있어 버퍼도 가질 수 있다. 다만 GlobalScope이 생성되고, send 하는 이 사이의 시점이 순서를 보장하지 않는다.

그렇기 때문에 이러한 위험성을 안고, Coroutines을 활용하는 방법을 활용하거나, 순서를 보장해 줄 수 있도록 코드를 변경해야 하는데 runBlocking을 활용할 수 있다.

val callback = object : CallbackListener {
    override fun onCallback(value: String) {
        println("callback $value")
        runBlocking {
            println("runBlocking send $value")
            channel.send(value)
        }
//      GlobalScope.launch {
//          println("launch send $value")
//          channel.send(value)
//      }
    }
}


runBlocking은 답은 아니다.

이 코드에서 순서를 보장하기 위해서 runBlocking을 활용해야 한다. 하지만 단순히 작성한 샘플에서는 runBlocking이 문제가 없다.

다만 실제 사용할 때는 주의해야 한다. UI에서 runBlocking을 활용하면 100% 문제가 생긴다. 그렇기에 본인이 명확한 별도의 Thread라고 인지했다면 runBlocking을 사용해도 문제없다.

OkHttp에서 제공하는 WebSocket을 사용할 때는 명확히 별도의 Thread에서 동작함을 인지하고 있다면 써도 괜찮다.


마무리

참고했던 글처럼 WebSocket과 GlobalScope을 사용하는 경우 순서에 대한 보장을 하지 못하는 걸 알 수 있다.

이전 글과도 동일한 결과이지만 한 번 더 정리해 Channel을 활용하는 방법을 정리해보았다.

callbackFlow를 활용하는 것 역시 runBlocking을 활용하는 데 IO를 보장해 주는지 확인해야 한다.

결국 서로 다른 Thread에서 코루틴을 활용해 값을 처리하는 데 있어 알아야 할 것들이 많은데, 이런 부분이 어렵다면 RxJava의 Subject를 활용하는 게 가장 쉽다.



About Taehwan

My name is Taehwan Kwon. I have developed Android for 6 years and blog has been active for eight years.

Comments