코루틴 공식 가이드 자세히 읽기 — Part 3 — Dive 2
Fan-out 에서 수신 코루틴 중 하나에 오류가 발생했을 때 수신 방식(for-loop, consumeEach)에 따라 오류 전파 동작이 다른 이유는 무엇일까?
Korean [English]
(목차로 돌아가기)
공식 가이드에서 Fan-out (하나의 채널에 송신자와 수신자가 1:n 인 경우) 시에 수신자들 중 하나에 오류가 발생하면 수신자의 구현에 따라 오류 전파 방식이 다음과 같이 달라짐을 나타냈다.
For-loop : 오류가 발생 하지 않은 수신자들이 나머지 데이터를 수신.
consumeEach() : 오류가 모든 수신자로 전파되어 수신이 중지됨.
이를 확인해 보기 위한 예제는 다음과 같다.
예제를 보면 produceNumbers()
함수를 이용하여 1부터 무한한 수를 송신하는 송신자를 만들고 launchProcessorA()
함수를 이용하여 이를 수신하는 수신자를 5개 생성하고 있다 (repeat).
여기서 5번째 라인을 보면 repeat index가 3일 경우에 수신을 취소하고 있는데 이 launchProcessA는 consumeEach로 구현이 되어있기 때문에 다음과 같은 결과를 보인다.
ProcessorA #0 received 1
ProcessorA #0 received 2Process finished with exit code 0
수신이 취소되고 그에따라 취소 예외가 모든 수신자로 전파되어 Job을 취소하게되고 종료되었다.
launchProcessorA()
를 launchProcessorB()
로 변경하면 결과는 아래와 같이 달라진다.
ProcessorB #0 received 1
ProcessorB #0 received 2
ProcessorB #1 received 3
ProcessorB #2 received 4
ProcessorB #0 received 5
ProcessorB #4 received 6
ProcessorB #1 received 7
ProcessorB #2 received 8
ProcessorB #0 received 9
ProcessorB #4 received 10
ProcessorB #1 received 11
ProcessorB #2 received 12
결과에 보면 ProcessorB #3 은 취소되기 때문에 나타나지 않고 나머지 Processor들이 수신을 이어나감을 확인할 수 있다.
어째서 이런 차이가 발생하는 것일까?
사실 For-loop 방식은 코루틴의 기본적인 오류 전파 방식(자식 코루틴의 취소 오류는 부모로 전파되지 않고 자식만 종료)에 따라 나머지 Processor 들이 동작할 수 있었던 것 뿐이다.
그렇다면 consumeEach()
사용 시 나머지 Processor 들이 모두 취소되는 이유를 확인해보면 우리는 보다 명확히 채널의 동작 방식을 이해하게 될 것이다.
위 코드를 살펴보면 consumeEach()
는 단순히 consume()
을 호출하여 채널을 순회하면서 각각의 수신 이벤트에 대해서 전달된 action 을 호출하는 것을 확인할 수 있다.
이제 consume()
을 확인해보면 파라미터로 전달 된 block() 함수를 호출하는데 block 함수는 해당 채널에 전달된 모든 이벤트를 수신하는 것이 필수사항이다. 이렇게 block() 함수의 구현에서 모든 수신이 끝나게 되면 finally 블록에 의해서 채널의 cancel() 를 호출하게 된다.
지금 ReceiveChannel은 사실 produce { } 확장함수에 의해 생성된 ChannelCoroutine 이다. ChannelCoroutine 의 cancel()의 호출은 최종 적으로 송신 채널을 닫게 된다. 다음은 코루틴에서 채널 관련 클래스들의 관계이다.
ChannelCoroutine은 Channel 인터페이스를 구현하기 때문에 cancel()
을 구현하고 있고, 이를 호출하면 생성자로 전달되어 위임(delegation)된 channel을 cancel 하게 된다. 다음은 ChannelCoroutine 의 코드이다.
그리고 이렇게 채널에 cancel() 이 호출되면 다음과 같은 AbstractChannel 클래스의 구현에 의해
송신 채널을 close 하게 되고, 채널 큐(queue)에 있는 모든 데이터를 삭제하게 된다. (close 는 다이어그램에 보이듯이 송신 채널 인터페이스만 가지고 있다.)
송신 채널을 close 하는 과정을 좀 더 자세히 알아보기 위해 AbstractChannel 의 구현을 살펴보자.
3~7 라인은 채널 큐의 마지막 이벤트가 Close
가 아니면 Close
이벤트를 추가하고, 맞을 경우에는 그 항목을 그대로 사용하여 helpClose()
를 호출하게 되고 helpClose() 는 채널 큐의 Tail -> Head 로 이동하며 이벤트를 제거해 나간다.
지금까지 알아본 것과 같이 consumeEach()
를 통한 수신 중 수신 채널 중 하나에 취소가 발생되면 송신채널을 닫고 모든 이벤트를 제거하기 때문에 모든 수신이 종료되게 된다.
끝.