목적
저번 글에 이어지는 글이다. 저번 글에서 스레드가 변경돼도 MDC를 유지하기 위해서 MDC를 스트림 콘텍스트에 저장해 두고, 로깅하기 직전에 스트림 콘텍스트에서 MDC를 꺼내와서 현재의 스레드 로컬에 주입해 주는 방법을 사용했다
하지만 위 방법을 로깅하기 직전에 매번 스트림 콘텍스트에서 MDC를 꺼내와서 현재의 스레드 로컬에 주입해야 하는 불편함이 있다. 이번 글에서는 이 문제를 해결해 보도록 하겠다
아래는 문제 해결을 위한 기본 코드이다. 아래 코드에서는 'Third' 로깅하기 직전 상위 스트림에서 스트림 콘텍스트에서 MDC를 꺼내와 스레드 로컬에 주입하는 작업을 빠트렸기 때문에 'Third' 로깅의 MDC 값이 null 인걸 확인할 수 있다
object ReactorMdc {
const val MDC_TRACE_ID_KEY = "traceId"
fun putMdcToContext(): Context {
val traceId = MDC.get(MDC_TRACE_ID_KEY) // MDC에서 traceId 가져오기
return if (traceId != null) {
Context.of(MDC_TRACE_ID_KEY, traceId)
} else {
Context.empty()
}
}
fun <T> withMdcLogging(signal: Signal<T>) {
signal.contextView.getOrEmpty<String>(MDC_TRACE_ID_KEY).ifPresent { traceId ->
MDC.put(MDC_TRACE_ID_KEY, traceId)
}
}
}
fun getData(): String? {
val traceId = java.util.UUID.randomUUID().toString()
MDC.put("traceId", traceId)
log("First")
return webClient.get()
.uri("/delay")
.retrieve()
.bodyToMono(String::class.java)
.doOnEach(ReactorMdc::withMdcLogging)
.doOnNext { log("Second") }
.publishOn(Schedulers.boundedElastic()) // here
.doOnNext { log("Third") }
.contextWrite { ReactorMdc.putMdcToContext() }
.block()
}
/*
결과
INFO 93600 --- [nio-8080-exec-1] c.p.application.WebClientContextService : First doOnNext (thread: http-nio-8080-exec-1, MDC: 29f5956e-b121-4476-bb71-e03081267be4)
INFO 93600 --- [ctor-http-nio-2] c.p.application.WebClientContextService : Second doOnNext (thread: reactor-http-nio-2, MDC: 29f5956e-b121-4476-bb71-e03081267be4)
INFO 93600 --- [oundedElastic-1] c.p.application.WebClientContextService : Third doOnNext (thread: boundedElastic-1, MDC: null)
*/
해결 1
해결을 위한 코드는 아래와 같다
@Configuration
class ReactorHookConfig {
private val MDC_HOOK = "mdcHook"
@PostConstruct
fun addHook() {
Hooks.onEachOperator(
MDC_HOOK,
Operators.lift { scannable, subscriber -> MdcContextLifter(subscriber) }
)
}
}
private class MdcContextLifter<T>(private val coreSubscriber: CoreSubscriber<T>) : CoreSubscriber<T> {
override fun onSubscribe(subscription: Subscription) {
coreSubscriber.onSubscribe(subscription)
}
override fun onNext(t: T) {
injectMdcFromContext(coreSubscriber.currentContext())
coreSubscriber.onNext(t)
}
override fun onError(throwable: Throwable) {
injectMdcFromContext(coreSubscriber.currentContext())
coreSubscriber.onError(throwable)
}
override fun onComplete() {
injectMdcFromContext(coreSubscriber.currentContext())
coreSubscriber.onComplete()
}
override fun currentContext(): Context {
return coreSubscriber.currentContext()
}
private fun injectMdcFromContext(context: Context?) {
context?.getOrEmpty<String>(MDC_TRACE_ID_KEY)?.ifPresent { traceId ->
MDC.put(MDC_TRACE_ID_KEY, traceId)
}
}
}
위의 코드를 작성하면, 아래 코드처럼 webclient 호출 로직에서 doOnEach를 통해 콘텍스트에서 MDC로 값을 추출하는 로직을 제거해도(2) MDC가 잘 전파되는 것을 확인할 수 있다
fun getData(): String? {
val traceId = java.util.UUID.randomUUID().toString()
MDC.put("traceId", traceId)
log("First")
return delayWebClient.get()
.uri("/delay")
.retrieve()
.bodyToMono(String::class.java)
// .doOnEach(ReactorMdc::withMdcLogging) // 2
.doOnNext { log("Second") }
.publishOn(Schedulers.boundedElastic())
.doOnNext { log("Third") }
.contextWrite { ReactorMdc.putMdcToContext() } // 1
.block()
}
/*
결과
2025-03-22T14:41:27.411+09:00 INFO 39196 --- [nio-8080-exec-2] c.p.application.WebClientContextService : First doOnNext (thread: http-nio-8080-exec-2, MDC: f9646191-dbe1-491e-b720-9f634d028181)
2025-03-22T14:41:28.351+09:00 INFO 39196 --- [ctor-http-nio-2] c.p.application.WebClientContextService : Second doOnNext (thread: reactor-http-nio-2, MDC: f9646191-dbe1-491e-b720-9f634d028181)
2025-03-22T14:41:28.354+09:00 INFO 39196 --- [oundedElastic-1] c.p.application.WebClientContextService : Third doOnNext (thread: boundedElastic-1, MDC: f9646191-dbe1-491e-b720-9f634d028181)
*/
어떻게 이런 일이 가능한 걸까? 이런 동작이 가능한 이유는 ReactorHookConfig에서 콘텍스트에서 MDC를 추출해서 스레드 로컬에 주입해 주는 역할을 하는 프락시를 정의하고 hook으로 등록해 줬기 때문이다. 리액터에서 Publisher와 Subscriber는 보통 아래의 그림처럼 동작한다
publisher에서 subscriber로 데이터를 전달하는데, 이때 publisher는 subscriber의 onNext 메서드를 호출해서 전달한다. 이 상황에서 ReactorHookConfig처럼 MDC를 추출해서 스레드 로컬에 주입하는 sbuscriber를 정의하고 hook에 등록해 주면 연결관계는 아래처럼 변하게 된다
위 그림처럼 스트림의 모든 연산 사이사이에 MDC를 추출해서 스레드 로컬에 주입하는 프락시가 만들어지게 되고, 자연스럽게 MDC가 전파되게 되는 것이다. Flux -> map -> filter -> subscriber와 같은 스트림이 있으면 Flux -> proxy -> map -> proxy -> filter -> proxy -> subscriber와 같은 형태가 되는 것이다
이런 프락시의 기능은 MDC 전파 말고도 다양한 방법으로 활용할 수 있으니 알고 있으면 좋다
추가적으로 살펴보기. Operators.lift 는 뭘까?
Operators.lift는 (Scannable, CoreSubscriber) -> CoreSubscriber 형태의 함수를 입력으로 받고, (Publisher) -> Pbulisher 형태의 함수를 반환하는 함수이다. 즉 함수를 입력으로 넣고 함수를 반환하는 함수이다. 그리고 Operators.lift를 통해 만들어진 함수는 아래와 같은 함수에 적용될 수 있다
Flux.transform(Function)
Mono.transform(Function)
Hooks.onEachOperator(Function)
Hooks.onLastOperator(Function)
우리가 실제 로직을 넣을 수 있는 부분은 Operators.lift의 입력인 (Scannable, CoreSubscriber) -> CoreSubscriber 형태의 함수이다. (Scannable, CoreSubscriber)의 CoreSubscriber는 기존의 subscriber이고 반환되는 CoreSubscriber는 우리가 원하는 기능을 가지면서 기존 subscriber를 감싸고 있는 프락시 subscriber라고 생각하면 이해하기 쉬울 것이다
기존 subscriber를 건들지 않으면서도 프락시를 통해 기능을 추가하고 싶을 때 그 기능을 (Scannable, CoreSubscriber) -> CoreSubscriber와 같은 형태로 정의한다. 이 함수를 A라 하겠다. 그리고 A함수를 적용해야 하는데, Flux.trasform / Mono.transform / Hooks.onEachOperator / Hooks.onLastOperator와 같은 함수는 인자로 (Pubisher) -> Publisher형태의 함수를 받는다. 따라서 A함수를 (Pubisher) -> Publisher형태로 바꿔줄 필요가 있는데 그 기능을 하는 것이 Operators.lift 함수이다
Operators.lift는 결국 (Scannable, CoreSubscriber) -> CoreSubscriber를 (Pubisher) -> Publisher의 형태로 바꿔주는 함수이다. 이때 (Pubisher) -> Publisher는 publishe.1과 subscriber.1 사이에 구독이 발생해서 연결이 지어질 때 publisher.1 <-> subscriber.1이 바로 연결되도록 하지 않고 publisher.1 <-> proxy subscriber.1 <-> subscriber.1과 같은 형태로 연결되도록 한다. 이것이 기본 콘셉트이다
'WEB개발 > 백엔드' 카테고리의 다른 글
Spring boot 에서 종료되지 않은 스레드 모니터링 하기 (0) | 2025.05.05 |
---|---|
코루틴 withTimeoutOrNull과 Webclient 에 대해서 (2) | 2025.04.06 |
코루틴 내부에서 예외가 발생했을 때 (0) | 2025.03.08 |
코루틴의 async 내에서 예외 발생 시 동작 (0) | 2025.03.03 |
TransactionalEventListener 정리 (0) | 2025.02.23 |