코루틴 공식 가이드 자세히 읽기 — Part 3

Myungpyo Shim
10 min readJan 23, 2019

--

공식 가이드 읽기 (3 / 8)

Korean [English]

(목차로 돌아가기)

채널(Channels)

지연 값(Deferred values)을 사용하면 서로 다른 코루틴들이 손쉽게 하나의 값을 공유할 수 있습니다. 채널을 이용하면 서로 다른 코루틴들간에 데이터 스트림을 공유할 수 있습니다. 채널은 BlockingQueue 와 유사하게 동작하지만 값을 읽고 쓰기 위한 함수의 이름이 다릅니다. 큐와 비교하면 값을 큐에 넣기 위해 put() 대신 send() 를 사용하고, 큐에서 값을 꺼내기 위해서 take() 대신 recieve() 를 사용합니다.

<출력 결과>

1
4
9
16
25
Done!

채널의 순회와 닫기 (Closing and iteration over channels)

큐(queue)와 달리 채널은 닫을 수 있고, 닫힌 채널은 더이상 값이 전달되지 않을 것임을 나타냅니다. 어떤 채널의 데이터를 수신하는 쪽에서는 이 점을 이용하여 채널을 순회하는 함수들을 사용할 수 있습니다(ex> for loop).

채널을 닫는 요청을 하게되면 내부적으로 채널이 닫힐 것임을 나타내는 값을 채널 큐의 마지막에 추가 하는 방식으로 동작하기 때문에, 이 닫힘 값이 채널에 전달되기 전에 도착했던 값들은 수신하는 쪽에서 모두 수신할 수 있음이 보장됩니다.

채널 프로듀서 만들기 (Building Channel Producers)

코루틴이 어떤 데이터 스트림(연속된 값 들의 흐름)을 생성해내는 일은 꽤나 흔한일이며, 이것은 우리가 동시성 코드에서 흔히 접할 수 있던 프로듀서-컨수머 패턴의 일부 입니다. 여러분은 이러한 프로듀서의 생성 작업을 추상화 하기 위해서 채널을 파라미터로 전달받는 생성 함수로 만들 수도 있습니다.
하지만 이러한 함수는 “함수는 반드시 결과가 반환되어야 한다”는 상식에 맞지 않게 됩니다.

코루틴 프레임워크 에서는 이러한 프로듀서의 생성 작업을 용이하게 하기 위해서 produce{ }라는 코루틴 빌더와 이렇게 생성된 프로듀서가 생성하는 값들의 수신을 돕기 위한 consumeEach() 라는 확장 함수가 제공됩니다. (consumeEach 는 for 를 대체함)

파이프라인 (Pipeline)

파이프라인이란 하나의 코루틴이 데이터 스트림 (무한 스트림 포함)을 생산해내고 다른 하나 이상의 코루틴들이 이 스트림을 수신 받아 필요한 작업을 수행 한 후 가공된 결과를 다시 전송하는 패턴을 말합니다.

위 예제는
1) 일련의 정수를 생성해내는 프로듀서를 produce { } 빌더를 통해 생성하고,
2) 그 결과인 ReceiveChannel 을 전달 된 수의 두배수를 생성해내는 프로듀서를 만들때 전달하여 파이프라인을 구성하고 그 결과를 출력하는 예제입니다.

[정수 생성 프로듀서] -> [두배수 변환 프로듀서] -> 출력

파이프라인을 이용한 소수 생성 (Prime numbers with pipeline)

코루틴 파이프라인을 이용하여 소수를 생성해내는 조금 더 무거운 작업을 수행해 봅시다.

우리는 먼저 numbersFrom() 프로듀서를 이용하여 2부터 시작되는 정수의 무한 스트림 채널을 만듭니다. 그리고 filter() 프로듀서를 더해 파이프라인을 구성하는데 filter() 프로듀서는 수신 채널과 소수를 전달 받아 수신되는 값들 중에서 소수와 나누어 떨어지는 값들을 제외하고 송신합니다.
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

여기서 우리는 runBlocking() 을 이용하여 메인스레드에서 모든 코루틴을 수행하였기 때문에 이들의 취소를 위해 코루틴들의 참조 리스트를 유지할 필요가 없이 10개의 소수를 출력한 후 cancelChildren() 확장함수를 이용하여 자식 코루틴들을 한번에 취소할 수 있습니다.

이것과 동일한 기능을 수행하는 파이프라인을 표준 라이브러리의 buildIterator { }를 이용하여 만들 수 있습니다. produce { }를 buildIterator { }로, send()를 yield()로, receive()를 next()로 ReceiveChannel을 Iterator로 변경하고 코루틴 스코프를 제거하면 됩니다.

이렇게 채널이 아닌 buildIterator를 이용한 구현으로 변경하고나면 더이상 runBlocking { }은 필요하지 않게됩니다. 하지만 채널을 사용하는 파이프라인 구현 방식은 멀티 코어 환경에서 Dispatchers.Default 컨텍스트를 사용하여 동시 수행의 이점을 누릴 수 있습니다.

앞서 설명한 예제를 채널을 이용한 구현에서 buildIterator를 이용한 구현으로 변경해보면 다음과 같습니다.

어쨌든 두 가지 방식 모두 소수를 찾기 위한 방법으로는 실용적이지 못합니다. 보통 파이프라인을 사용할 경우 다른 중단 호출들과 함께 작업을 수행하게 되는데(원격 서비스로의 비동기 호출 같은) 이러한 파이프라인의 기능은buildSquence 나 buildIterator 같은 것들로는 만들어 낼 수 없습니다. 그 이유는 이것들은 비동기를 전반적으로 지원하는 produce { }와는 달리 임의의 시점에 실행 중단을 허용하지 않기 때문입니다.

Fan-out

하나의 채널로 부터 두개 이상의 수신 코루틴들이 데이터를 분배하여 수신 받을 수 있습니다.

produceNumbers{}는 주기적으로 초당 10개의 정수를 생산해 내는 프로듀서 입니다. 그리고 launchProcessor{} 프로세서 코루틴은 그들의 id 와 수신받은 정수를 출력하는 동작만 수행하는 코루틴이며 main 함수에서 5개 생성 됩니다. 이를 실행하면 다음과 유사한 결과를 얻습니다.

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

예제에서 마지막에 producer 코루틴이 자신의 채널을 닫고있는데, 이는 이 채널을 순회(iteration)하고 있는 프로세서 코루틴들이 종료되도록 만듭니다.

또한 프로세서 코루틴은 consumeEach 확장 함수가 아니라 for-loop 으로 채널을 순회하고 있는데, 이것은 지금과 같이 코루틴이 두개 이상 함께 사용될 때 유용한 방법입니다. for-loop 을 사용하면 프로세서 코루틴들 중에 하나가 실패하더라도 나머지 프로세서 코루틴들은 연산을 이어나가게 만들 수 있습니다. 반대로 consumeEach 확장 함수를 이용했다면 각 프로세서 코루틴들은 다른 프로세서 코루틴의 정상 종료 혹은 비정상 종료에 대해서 취소 이벤트를 전파 받고 모두 종료될 것입니다.

위 예제는 프로세서 코루틴의 채널 순회 방식을 consumeEach 로 바꾸고, 메인 함수에서 프로세서 코루틴 중 하나가 200ms 후에 취소되도록 한 것입니다. 이를 실행하면 프로세서 하나가 취소됨에 따라 나머지 프로세서들도 취소되는 것을 확인할 수 있습니다.

Fan-In

두 개 이상의 코루틴들이 동일한 하나의 채널로 데이터를 전송할 수 있습니다. 예를들어 우리가 문자열 채널을 하나 가지고 있다고 가정해 봅시다. 그리고 일정 시간마다 특정 문자열을 반복적으로 이 채널로 전송하는 중단 함수가 있다고 생각해 봅시다.

이제, 우리가 문자열을 전송하는 코루틴 몇 개를 동시에 실행하면 어떻게 동작 하는지 다음 예제를 통해 살펴봅시다. (예제에서는 코루틴들을 메인 코루틴의 자식 코루틴으로 만들고 메인스레드 컨텍스트에서 수행되도록 합니다.)

<결과>

Foo
Foo
Bar
Foo
Foo
Bar

버퍼가 있는 채널 (Buffered channels)

지금까지 살펴본 채널들은 버퍼가 없습니다. 버퍼가 없는 채널들은 송신자와 수신자가 만나야만 데이터를 전달합니다 (랑데뷰). 만약 송신이 먼저 일어나면 수신이 일어날때까지 송신자는 중단 됩니다. 반대로 수신이 먼저 일어나면 송신이 일어날 때 까지 수신자는 중단 됩니다.

Channel<T>() 팩토리 함수와 produce { } 빌더는 모두 capacity 라는 버퍼 사이즈를 설정할 수 있는 옵셔널 파라미터를 가지고 있습니다. 버퍼는 송신자가 중단되기 전에 버퍼의 수용량 만큼 더 송신할 수 있도록 해줍니다. 최대 수용량이 지정된 BlockingQueue 와 유사하게 수용량의 최대치에 도달하면 송신자는 중단됩니다.

위 예제는 수용량이 4인 채널을 생성한 후 생성된 채널로 10개의 정수를 송신하는 코드 입니다.

<결과>

Try to send 0 : Done
Try to send 1 : Done
Try to send 2 : Done
Try to send 3 : Done
Try to send 4 :

수신하는 코루틴이 없기 때문에 5번째 송신에서 송신자가 중단되어 있음을 알 수 있습니다. (Done 이 출력되지 않음)

채널의 공정성 (Channels are fair)

2개 이상의 코루틴들이 하나의 채널로 송/수신을 수행한다면 실행 순서는 그 호출 순서에 따라 공정하게 할당되며 FIFO (First-In First-Out) 방식으로 스케쥴링 됩니다. 다시말해, 처음 receive() 를 호출한 코루틴이 데이터를 먼저 수신합니다. 다음 예제는 ping 코루틴과 pong 코루틴이 table 채널로부터 공유된 ball 오브젝트를 수신하는 예제입니다.

“ping” 코루틴이 먼저 시작했기 때문에 먼저 공을 수신합니다. 이 때, 주석의 내용처럼 delay() 함수를 지운다고 해서 그 순서가 변경되지는 않습니다. 왜냐하면 “ping” 코루틴이 송신하고 바로 수신으로 돌아간다고 하더라도 이미 “pong” 코루틴이 수신 대기 중이기 때문입니다.

(하지만 간혹 채널이 공정하지 못한 실행을 보이기도 하는데 이것은 사용되는 Executor 의 특성 때문입니다. 이슈 참고)

Ticker channels

Ticker channel 은 마지막 수신 이후에 지정된 지연 시간이 지나면 Unit 오브젝트를 송신하는 채널입니다. 이 자체로는 큰 실용성이 없을 수 있지만 시간 기반으로 무언가 처리해야 하는 파이프라인 같은 곳에서는 의미가 있을 수 있습니다.

이 채널을 만들기 위해서는 ticker() 팩토리 함수를 사용하면 되고, 더이상 필요가 없을 경우에는 RecieveChannel 에 cancel() 함수를 호출하면 됩니다.

ticker() 팩토리 함수를 이용해 채널을 생성하면 기본 TickerMode 가 FIXED_PERIOD 입니다. 이 모드는 수신자가 지연되는 것을 인지하고 지연이 발생하면 다음 송신을 그에 맞게 조절하여 데이터 발생을 지정된 지연 시간에 최대한 맞게 맞추어 줍니다.

<결과>

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: kotlin.Unit
Next element is ready in 100 ms: null
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

반면, ticker 생성 시 다음과 같이 FIXED_DELAY 를 사용하면 다음의 결과와 같이 수신 후 지정한 지연 시간이 지나야 다음 데이터를 수신할 수 있습니다.

<결과>

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: null

--

--

No responses yet