목록으로 돌아가기
WebFlux에서 Coroutine으로
2025년 12월 20일
2개 태그
coroutine
kotlin
서론
Spring WebFlux는 리액티브 프로그래밍을 위한 강력한 프레임워크이지만, Reactor의 Mono와 Flux를 사용하는 것은 학습 곡선이 높고 코드가 복잡해질 수 있습니다. Kotlin Coroutine을 사용하면 더 직관적이고 읽기 쉬운 비동기 코드를 작성할 수 있습니다.
이 글에서는 실제 프로젝트에서 WebFlux (Reactor)를 Coroutine으로 마이그레이션한 경험을 공유합니다.
1. 왜 Coroutine으로 마이그레이션했나?
WebFlux (Reactor)의 한계
// WebFlux 방식 @GetMapping("/{userId}/balance") fun getCreditBalance(@PathVariable userId: String): Mono<ResponseEntity<CreditBalanceResponse>> { return creditUseCase.getCreditBalance(userId) .map { balance -> ResponseEntity.ok(CreditBalanceResponse.from(balance)) } .onErrorResume { e -> when (e) { is CreditNotFoundException -> Mono.just(ResponseEntity.notFound().build()) else -> Mono.error(e) } } }
문제점:
- 체이닝이 길어질수록 가독성 저하
- 예외 처리 복잡
- 디버깅 어려움
- Kotlin의
suspend함수와 자연스럽게 통합되지 않음
Coroutine의 장점
// Coroutine 방식 @GetMapping("/{userId}/balance") suspend fun getCreditBalance(@PathVariable userId: String): ResponseEntity<CreditBalanceResponse> { val balance = creditUseCase.getCreditBalance(userId) return ResponseEntity.ok(CreditBalanceResponse.from(balance)) }
장점:
- ✅ 동기 코드처럼 읽기 쉬움
- ✅ 예외 처리가 직관적 (try-catch 사용 가능)
- ✅ 디버깅이 쉬움
- ✅ Kotlin의 언어 기능과 자연스럽게 통합
2. 마이그레이션 전후 비교
Controller 레이어
Before: WebFlux (Reactor)
@RestController @RequestMapping("/credits") class CreditController( private val creditUseCase: CreditUseCase, ) { @GetMapping("/{userId}/balance") fun getCreditBalance(@PathVariable userId: String): Mono<ResponseEntity<CreditBalanceResponse>> { return creditUseCase.getCreditBalance(userId) .map { balance -> ResponseEntity.ok(CreditBalanceResponse.from(balance)) } .onErrorResume { e -> when (e) { is CreditNotFoundException -> Mono.just(ResponseEntity.notFound().build()) else -> Mono.error(e) } } } @PostMapping("/{userId}/issue") fun issueCredit( @PathVariable userId: String, @RequestBody request: IssueCreditRequest, ): Mono<ResponseEntity<CreditBalanceResponse>> { return creditUseCase.issueCredit( userId = userId, amount = request.amount, reason = request.reason, description = request.description, ) .map { balance -> ResponseEntity.status(HttpStatus.CREATED) .body(CreditBalanceResponse.from(balance)) } .onErrorResume { e -> when (e) { is InvalidAmountException -> Mono.just(ResponseEntity.badRequest().build()) else -> Mono.error(e) } } } }
After: Coroutine
@RestController @RequestMapping("/credits") class CreditController( private val creditUseCase: CreditUseCase, ) { @GetMapping("/{userId}/balance") suspend fun getCreditBalance( @PathVariable userId: String, ): ResponseEntity<CreditBalanceResponse> { val balance = creditUseCase.getCreditBalance(userId) return ResponseEntity.ok(CreditBalanceResponse.from(balance)) } @PostMapping("/{userId}/issue") suspend fun issueCredit( @PathVariable userId: String, @Valid @RequestBody request: IssueCreditRequest, ): ResponseEntity<CreditBalanceResponse> { val balance = creditUseCase.issueCredit( userId = userId, amount = request.amount, reason = request.reason, description = request.description, ) return ResponseEntity.status(HttpStatus.CREATED) .body(CreditBalanceResponse.from(balance)) } }
개선점:
- 코드가 50% 이상 간결해짐
- 예외 처리는
@ControllerAdvice에서 일관되게 처리 가능 - 가독성과 유지보수성 향상
Service 레이어
Before: WebFlux (Reactor)
@Service class CreditService( private val creditRepository: CreditRepository, private val transactionalOperator: TransactionalOperator, ) { fun getCreditBalance(userId: String): Mono<CreditBalance> { return creditRepository.findByUserId(userId) .switchIfEmpty(Mono.error(ExceptionMessages.creditNotFound(userId))) } fun issueCredit( userId: String, amount: Int, reason: String?, description: String?, ): Mono<CreditBalance> { return getOrCreateBalance(userId) .flatMap { balance -> balance.issue(amount) processTransaction( balance = balance, amount = amount, transactionType = TransactionType.ISSUED, userId = userId, reason = reason, description = description, ) } .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) .filter { it is OptimisticLockingFailureException }) } private fun processTransaction( balance: CreditBalance, amount: Int, transactionType: TransactionType, userId: String, reason: String?, description: String?, ): Mono<CreditBalance> { return transactionalOperator.transactional( creditRepository.save(balance) .flatMap { savedBalance -> val transaction = CreditTransaction(...) creditRepository.saveTransaction(transaction) .then(Mono.just(savedBalance)) } ) } }
After: Coroutine
@Service class CreditService( private val creditRepository: CreditRepository, private val transactionalOperator: TransactionalOperator, ) : CreditUseCase { override suspend fun getCreditBalance(userId: String): CreditBalance { return creditRepository.findByUserId(userId) ?: throw ExceptionMessages.creditNotFound(userId) } override suspend fun issueCredit( userId: String, amount: Int, reason: String?, description: String?, ): CreditBalance { return processTransactionWithRetry( userId = userId, amount = amount, transactionType = TransactionType.ISSUED, referenceId = null, reason = reason, description = description, getBalanceStrategy = { getOrCreateBalance(userId) }, ) } private suspend fun processTransactionWithRetry(...): CreditBalance { var lastException: OptimisticLockingFailureException? = null var currentDelay = INITIAL_RETRY_DELAY_MS for (attempt in 1..MAX_RETRY_ATTEMPTS) { try { val balance = getBalanceStrategy() return processTransaction(...) } catch (e: OptimisticLockingFailureException) { lastException = e if (attempt == MAX_RETRY_ATTEMPTS) throw e val jitter = Random.nextLong( (currentDelay * 0.75).toLong(), (currentDelay * 1.25).toLong(), ) delay(jitter.coerceAtMost(MAX_RETRY_DELAY_MS)) currentDelay *= 2 } } throw lastException ?: IllegalStateException(...) } private suspend fun processTransaction(...): CreditBalance { when (transactionType) { TransactionType.ISSUED -> balance.issue(amount) TransactionType.DEDUCTED -> balance.deduct(amount) TransactionType.REFUNDED -> balance.refund(amount) } return withTransaction(transactionalOperator) { val savedBalance = creditRepository.save(balance) val transaction = CreditTransaction(...) creditRepository.saveTransaction(transaction) savedBalance } } }
개선점:
- 재시도 로직이 더 명확하고 제어 가능
- 예외 처리가 직관적 (try-catch)
- 코드 흐름이 순차적으로 읽힘
Repository Adapter 레이어
Before: WebFlux (Reactor)
@Component class CreditRepositoryAdapter( private val creditBalanceR2dbcRepository: CreditBalanceR2dbcRepository, ) : CreditRepository { override fun findByUserId(userId: String): Mono<CreditBalance> { return creditBalanceR2dbcRepository.findByUserId(userId) .map { it.toDomain() } } override fun save(balance: CreditBalance): Mono<CreditBalance> { val entity = CreditBalanceEntity.from(balance) return creditBalanceR2dbcRepository.save(entity) .map { it.toDomain() } } }
After: Coroutine
@Component class CreditRepositoryAdapter( private val creditBalanceR2dbcRepository: CreditBalanceR2dbcRepository, ) : CreditRepository { override suspend fun findByUserId(userId: String): CreditBalance? { return creditBalanceR2dbcRepository.findByUserId(userId) .map { it.toDomain() } .awaitSingleOrNull() } override suspend fun save(balance: CreditBalance): CreditBalance { val entity = CreditBalanceEntity.from(balance) return creditBalanceR2dbcRepository.save(entity) .map { it.toDomain() } .awaitSingle() } }
핵심:
- R2DBC Repository는 여전히
Mono/Flux를 반환 awaitSingle(),awaitSingleOrNull()로 코루틴으로 변환kotlinx-coroutines-reactor의존성 필요
3. 의존성 설정
build.gradle.kts
dependencies { // WebFlux (여전히 필요 - R2DBC가 Reactor 기반) implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("org.springframework.boot:spring-boot-starter-data-r2dbc") // Coroutine 지원 implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3") }
중요:
kotlinx-coroutines-reactor: Mono/Flux를 코루틴으로 변환- WebFlux는 여전히 필요 (R2DBC가 Reactor 기반이므로)
4. 트랜잭션 처리
문제: TransactionalOperator는 Mono 기반
R2DBC의 TransactionalOperator는 Mono를 기대하지만, 우리는 코루틴을 사용하고 있습니다.
해결: Bridge 함수 작성
package com.sclass.supportersservice.adapter.outbound.persistence import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.mono import org.springframework.transaction.reactive.TransactionalOperator /** * 코루틴에서 트랜잭션을 처리하는 헬퍼 함수 */ suspend inline fun <T> withTransaction( transactionalOperator: TransactionalOperator, crossinline block: suspend () -> T, ): T { return transactionalOperator.transactional( mono { // 이 블록은 코루틴 컨텍스트에서 실행됨 block() }, ).awaitSingle() }
사용 예시
private suspend fun processTransaction(...): CreditBalance { // 도메인 로직 when (transactionType) { TransactionType.ISSUED -> balance.issue(amount) TransactionType.DEDUCTED -> balance.deduct(amount) TransactionType.REFUNDED -> balance.refund(amount) } // 트랜잭션으로 묶기 return withTransaction(transactionalOperator) { val savedBalance = creditRepository.save(balance) val transaction = CreditTransaction(...) creditRepository.saveTransaction(transaction) savedBalance } }
5. Mono/Flux → Coroutine 변환
주요 변환 함수
| Reactor 타입 | Coroutine 변환 | 설명 |
|---|---|---|
Mono<T> | .awaitSingle() | 단일 값 반환, null 불가 |
Mono<T?> | .awaitSingleOrNull() | 단일 값 반환, null 가능 |
Flux<T> | .collectList().awaitSingle() | 리스트로 수집 |
Flux<T> | .toList().awaitSingle() | 리스트로 수집 (동일) |
실제 사용 예시
// Mono → suspend override suspend fun findByUserId(userId: String): CreditBalance? { return creditBalanceR2dbcRepository.findByUserId(userId) .map { it.toDomain() } .awaitSingleOrNull() // Mono → 코루틴 } // Flux → suspend (List) override suspend fun findTransactionsByUserId( userId: String, page: Int, size: Int, ): List<CreditTransaction> { val offset = page.toLong() * size return creditTransactionR2dbcRepository .findByUserIdOrderByCreatedAtDesc(userId, size, offset) .map { it.toDomain() } .collectList() // Flux → Mono<List> .awaitSingle() // Mono → 코루틴 }
6. 예외 처리 개선
Before: WebFlux
fun getCreditBalance(userId: String): Mono<CreditBalance> { return creditRepository.findByUserId(userId) .switchIfEmpty(Mono.error(CreditNotFoundException(userId))) .onErrorMap { e -> when (e) { is OptimisticLockingFailureException -> TransactionConflictException(e) else -> e } } }
After: Coroutine
suspend fun getCreditBalance(userId: String): CreditBalance { return try { creditRepository.findByUserId(userId) ?: throw CreditNotFoundException(userId) } catch (e: OptimisticLockingFailureException) { throw TransactionConflictException(e) } }
또는 Controller에서 일관된 예외 처리:
@ControllerAdvice class GlobalExceptionHandler { @ExceptionHandler(CreditNotFoundException::class) suspend fun handleCreditNotFound( e: CreditNotFoundException, ): ResponseEntity<ErrorResponse> { return ResponseEntity.status(HttpStatus.NOT_FOUND) .body(ErrorResponse.from(e)) } }
7. 재시도 로직 개선
Before: WebFlux (Reactor Retry)
fun issueCredit(...): Mono<CreditBalance> { return processTransaction(...) .retryWhen( Retry.backoff(3, Duration.ofMillis(100)) .filter { it is OptimisticLockingFailureException } .doBeforeRetry { logger.warn("Retrying... attempt:${it.totalRetries() + 1}") } ) }
문제점:
- 지수 백오프 커스터마이징 어려움
- 재시도 간격 제어 제한적
- Jitter 추가 복잡
After: Coroutine
private suspend fun processTransactionWithRetry(...): CreditBalance { var lastException: OptimisticLockingFailureException? = null var currentDelay = INITIAL_RETRY_DELAY_MS for (attempt in 1..MAX_RETRY_ATTEMPTS) { try { val balance = getBalanceStrategy() return processTransaction(...) } catch (e: OptimisticLockingFailureException) { lastException = e if (attempt == MAX_RETRY_ATTEMPTS) { logger.error( "Optimistic lock failure after {} attempts. UserId: {}, Amount: {}", MAX_RETRY_ATTEMPTS, userId, amount, e ) throw e } // Exponential backoff with jitter val jitter = Random.nextLong( (currentDelay * 0.75).toLong(), (currentDelay * 1.25).toLong(), ) val delayWithJitter = jitter.coerceAtMost(MAX_RETRY_DELAY_MS) logger.warn( "Optimistic lock failure detected, retrying... " + "UserId: {}, Attempt: {}/{}", userId, attempt + 1, MAX_RETRY_ATTEMPTS ) delay(delayWithJitter) currentDelay *= 2 } } throw lastException ?: IllegalStateException(...) }
개선점:
- ✅ 완전한 제어 (지수 백오프 + Jitter)
- ✅ 재시도 로직이 명확함
- ✅ 로깅이 쉬움
- ✅ 최대 지연 시간 제한 가능
8. 성능 및 호환성
성능
- 동일한 성능: Coroutine도 논블로킹이므로 성능 차이 없음
- 오버헤드 최소:
awaitSingle()은 단순 변환일 뿐
호환성
댓글 (0)
아직 댓글이 없습니다. 첫 번째 댓글을 작성해보세요!