코루틴 flow 란?
순차적으로 데이터를 방출하는 비동기적 스트림이며 정상적으로 끝나거나 예외와 함께 끝난다고 주석에서 설명한다
순차적이면서 비동기라는 건 무슨 의미일까? 아래의 코드를 통해 이해 해봤다
runBlocking {
flow {
emit("Start")
delay(500) // 비동기 대기
emit("Middle")
delay(500) // 비동기 대기
emit("End")
}.collect { value ->
println("Collected: $value at ${System.currentTimeMillis()}")
}
}
위 코드는 "Start", "Middle", "End" 가 순차적으로 출력된다. 하지만 코루틴 기반이기 때문에 delay 함수가 호출될 때 실행하던 스레드를 block 하지 않는다. 이런점에서 flow 가 "순차적" 이고 "비동기적" 이라고 한다. delay 를 호출한 스레드는 block 되지 않기 때문에 다른 코드를 실행할 수 있다.
코루틴 flow 는 언제 사용해야 할까?
코루틴 flow 는 크게 아래 2가지 경우에 사용할 수 있을 것 같다
- 여러 I/O 요청 (외부 API, DB) 스트림을 순차적으로 처리할 때
- 채팅, 영상 시스템
- Reactor 의 Flux, Mono 객체를 코루틴에서 사용할 수 있는 객체로 변환할 때
코루틴 flow 사용할 때 주의점
- 코루틴 내부에서 flow 빌더를 정의해야 한다
- 업스트림과 다운스트림 (terminal operator) 에서 예외처리하는 방법이 다르다. 전자는 catch operator 를 사용하고 후자는 try-catch 구문을 사용한다
- 예외가 발생하면 emit 은 중지된다
- 여러 코루틴에서 병렬적으로 값을 emit 하려면 channelFlow 를 사용해야 한다
Context preservation 과 Exception transparency 특징에 대해서
Flow 인터페이스 구현체는 반드시 두 가지 특징을 만족시켜야 하는데 그것이 바로 "context preservation" 과 "exception transparency" 이다. 이런 특징은 flow 를 사용하여 개발할 때 upstream 에 대한 세부내용을 몰라도 downstream 의 내용을 개발 가능하도록 한다. 두 가지 특징에 대해 각각 살펴보고 이 의미에 대해 다시 해석해보도록 하겠다
context preservation:
flow 는 자신의 실행 컨텍스트를 캡슐화하며, downstream 으로 노출하거나 전파하지 않는다
좀 더 자세히 설명하면 flow 는 메소드 체인에 flowOn 이 없다면 collect 를 포함한 terminal operator 가 호출된 코루틴 컨텍스트를 따르며, 메소드 체인에 flowOn 이 있다면 flowOn 기준으로 업스트림은 flowOn 에 정의한 컨텍스트를 따르고 그 아래는 terminarl operator 가 호출된 컨텍스트를 따른다
runBlocking {
val flowA = flowOf(1, 2, 3)
.map { it + 1 }
.flowOn(newContext) // Changes the upstream context
val filtered = flowA.filter { it == 3 }
withContext(Dispatchers.Main) {
val result = filtered.single()
}
}
이러한 특징 덕분에 외부에서 정의한 flow 를 가져와서 사용할 때 flow 의 down stream 부분은 우리가 정의한 코루틴 컨텍스트에서 실행시킬 수 있다
추가적으로 context preservation 을 지키기 위해서 flow { } 같은 flow builder 내부에 코루틴 컨텍스트를 사용하는 것을 금지한다. 이렇게 하면 terminal operator 를 호출한 코루틴 컨텍스트와 전혀 다른 코루틴 컨텍스트에 실행될 위험이 있기 때문으로 추측된다
Exception Transparency:
(확실하지 않음) 예외 투명성이란 terminal operator 에서 던져진 예외는 try-catch 블록을 통해 예외를 처리해야하며, terminal operator 위의 업스트림에서 발생하는 예외는 try-catch 블록과 같은 외부에 방출되면 안된다는 의미이다
이런 exception transparency 성질을 지키기 위해 catch operator 가 존재하며, catch operator 의 업스트림에서 발생하는 예외는 catch operator 에 의해 처리된다
catch operator 내부에서는 예외를 다시 던지거나, 값을 방출하거나, 로깅등을 할 수 있다
이러한 특징 덕분에 외부에서 정의한 flow 를 가져와서 사용할 때도 외부 flow 가 던지는 예외를 캡슐화된 상태로 사용할 수 있을 것 이라고 추측한다
runBlocking {
try {
flow {
emit(1)
emit(2)
throw Exception("Upstream failure") // This is an upstream exception
}
.map { value ->
if (value == 2) throw Exception("Mapping error") // Another upstream exception
value
}
.catch { e ->
println("Caught upstream exception: $e") // Catches upstream exceptions (Mapping error)
}
.collect { value ->
println(value) // print only 1
}
} catch (e: Exception) {
println("Caught downstream exception: $e") // Catches downstream exceptions
}
}
참고
'WEB개발 > 백엔드' 카테고리의 다른 글
TransactionalEventListener 정리 (0) | 2025.02.23 |
---|---|
플링크 공식문서 정리-Stateful Stream Processing (0) | 2025.02.16 |
Spring MVC & WebClient 에서 MDC 로깅하기 1 (1) | 2025.02.09 |
코루틴 flow 컨텍스트 보존해야 하는 이유 (0) | 2025.02.02 |
토스 서버기술 용어 정리 (0) | 2025.01.26 |