
From WebFlux to Coroutines

December 20, 2025
Tags: coroutine, kotlin

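Introduction

Spring WebFlux is a powerful framework for reactive programming, but working with Reactor's Mono and Flux has a steep learning curve and can make code complex. Kotlin coroutines let you write asynchronous code that is more intuitive and easier to read.

This post shares our experience migrating a real project from WebFlux (Reactor) to coroutines.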


1. Why migrate to coroutines?

Limitations of WebFlux (Reactor)

// WebFlux style
@GetMapping("/{userId}/balance")
fun getCreditBalance(@PathVariable userId: String): Mono<ResponseEntity<CreditBalanceResponse>> {
    return creditUseCase.getCreditBalance(userId)
        .map { balance -> ResponseEntity.ok(CreditBalanceResponse.from(balance)) }
        .onErrorResume { e ->
            when (e) {
                is CreditNotFoundException -> Mono.just(ResponseEntity.notFound().build())
                else -> Mono.error(e)
            }
        }
}

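Problems:

  • Readability degrades as chains grow longer
  • Exception handling is complicated
  • Debugging is difficult
  • Does not integrate naturally with Kotlin suspend functions

Advantages of coroutines

// Coroutine style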
@GetMapping("/{userId}/balance")
suspend fun getCreditBalance(@PathVariable userId: String): ResponseEntity<CreditBalanceResponse> {
    val balance = creditUseCase.getCreditBalance(userId)
    return ResponseEntity.ok(CreditBalanceResponse.from(balance))
}

Advantages:

  • ✅ Reads like synchronous code
  • ✅ Intuitive exception handling (plain try-catch works)
  • ✅ Easier debugging
  • ✅ Integrates naturally with Kotlin language features
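
To make the try-catch point concrete, here is a minimal sketch of how the map/onErrorResume chain from the WebFlux version could look as a suspend function. It uses simplified stand-ins (HttpResult, a hard-coded getCreditBalance) instead of the article's Spring types so that it runs with only kotlinx-coroutines on the classpath; those names are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the article's types, so the sketch runs without Spring.
class CreditNotFoundException(userId: String) : RuntimeException("No credit balance for $userId")
data class CreditBalance(val userId: String, val amount: Int)
data class HttpResult(val status: Int, val body: CreditBalance? = null)

// Hypothetical use case: only "user-1" has a balance.
suspend fun getCreditBalance(userId: String): CreditBalance =
    if (userId == "user-1") CreditBalance(userId, 100) else throw CreditNotFoundException(userId)

// Coroutine counterpart of map { ... }.onErrorResume { ... }: a plain try/catch.
// Any other exception simply propagates, like `else -> Mono.error(e)`.
suspend fun handleGetBalance(userId: String): HttpResult =
    try {
        HttpResult(200, getCreditBalance(userId))
    } catch (e: CreditNotFoundException) {
        HttpResult(404)
    }

fun main() = runBlocking {
    println(handleGetBalance("user-1")) // HttpResult(status=200, body=CreditBalance(userId=user-1, amount=100))
    println(handleGetBalance("user-2")) // HttpResult(status=404, body=null)
}
```

Only the expected exception type is caught, so unrelated failures keep their original stack trace.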

2. Before and after the migration

Controller layer

Before: WebFlux (Reactor)

@RestController
@RequestMapping("/credits")
class CreditController(
    private val creditUseCase: CreditUseCase,
) {
    @GetMapping("/{userId}/balance")
    fun getCreditBalance(@PathVariable userId: String): Mono<ResponseEntity<CreditBalanceResponse>> {
        return creditUseCase.getCreditBalance(userId)
            .map { balance -> ResponseEntity.ok(CreditBalanceResponse.from(balance)) }
            .onErrorResume { e ->
                when (e) {
                    is CreditNotFoundException ->
                        Mono.just(ResponseEntity.notFound().build())
                    else -> Mono.error(e)
                }
            }
    }

    @PostMapping("/{userId}/issue")
    fun issueCredit(
        @PathVariable userId: String,
        @RequestBody request: IssueCreditRequest,
    ): Mono<ResponseEntity<CreditBalanceResponse>> {
        return creditUseCase.issueCredit(
            userId = userId,
            amount = request.amount,
            reason = request.reason,
            description = request.description,
        )
            .map { balance ->
                ResponseEntity.status(HttpStatus.CREATED)
                    .body(CreditBalanceResponse.from(balance))
            }
            .onErrorResume { e ->
                when (e) {
                    is InvalidAmountException ->
                        Mono.just(ResponseEntity.badRequest().build())
                    else -> Mono.error(e)
                }
            }
    }
}

After: Coroutine

@RestController
@RequestMapping("/credits")
class CreditController(
    private val creditUseCase: CreditUseCase,
) {
    @GetMapping("/{userId}/balance")
    suspend fun getCreditBalance(
        @PathVariable userId: String,
    ): ResponseEntity<CreditBalanceResponse> {
        val balance = creditUseCase.getCreditBalance(userId)
        return ResponseEntity.ok(CreditBalanceResponse.from(balance))
    }

    @PostMapping("/{userId}/issue")
    suspend fun issueCredit(
        @PathVariable userId: String,
        @Valid @RequestBody request: IssueCreditRequest,
    ): ResponseEntity<CreditBalanceResponse> {
        val balance = creditUseCase.issueCredit(
            userId = userId,
            amount = request.amount,
            reason = request.reason,
            description = request.description,
        )
        return ResponseEntity.status(HttpStatus.CREATED)
            .body(CreditBalanceResponse.from(balance))
    }
}

Improvements:

  • Handler code became more than 50% more concise
  • Exceptions can be handled consistently in a @ControllerAdvice
  • Better readability and maintainability

Service layer

Before: WebFlux (Reactor)

@Service
class CreditService(
    private val creditRepository: CreditRepository,
    private val transactionalOperator: TransactionalOperator,
) {
    fun getCreditBalance(userId: String): Mono<CreditBalance> {
        return creditRepository.findByUserId(userId)
            .switchIfEmpty(Mono.error(ExceptionMessages.creditNotFound(userId)))
    }

    fun issueCredit(
        userId: String,
        amount: Int,
        reason: String?,
        description: String?,
    ): Mono<CreditBalance> {
        return getOrCreateBalance(userId)
            .flatMap { balance ->
                balance.issue(amount)
                processTransaction(
                    balance = balance,
                    amount = amount,
                    transactionType = TransactionType.ISSUED,
                    userId = userId,
                    reason = reason,
                    description = description,
                )
            }
            .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
                .filter { it is OptimisticLockingFailureException })
    }

    private fun processTransaction(
        balance: CreditBalance,
        amount: Int,
        transactionType: TransactionType,
        userId: String,
        reason: String?,
        description: String?,
    ): Mono<CreditBalance> {
        return transactionalOperator.transactional(
            creditRepository.save(balance)
                .flatMap { savedBalance ->
                    val transaction = CreditTransaction(...)
                    creditRepository.saveTransaction(transaction)
                        .then(Mono.just(savedBalance))
                }
        )
    }
}

After: Coroutine

@Service
class CreditService(
    private val creditRepository: CreditRepository,
    private val transactionalOperator: TransactionalOperator,
) : CreditUseCase {
    override suspend fun getCreditBalance(userId: String): CreditBalance {
        return creditRepository.findByUserId(userId)
            ?: throw ExceptionMessages.creditNotFound(userId)
    }

    override suspend fun issueCredit(
        userId: String,
        amount: Int,
        reason: String?,
        description: String?,
    ): CreditBalance {
        return processTransactionWithRetry(
            userId = userId,
            amount = amount,
            transactionType = TransactionType.ISSUED,
            referenceId = null,
            reason = reason,
            description = description,
            getBalanceStrategy = { getOrCreateBalance(userId) },
        )
    }

    private suspend fun processTransactionWithRetry(...): CreditBalance {
        var lastException: OptimisticLockingFailureException? = null
        var currentDelay = INITIAL_RETRY_DELAY_MS

        for (attempt in 1..MAX_RETRY_ATTEMPTS) {
            try {
                val balance = getBalanceStrategy()
                return processTransaction(...)
            } catch (e: OptimisticLockingFailureException) {
                lastException = e
                if (attempt == MAX_RETRY_ATTEMPTS) throw e

                val jitter = Random.nextLong(
                    (currentDelay * 0.75).toLong(),
                    (currentDelay * 1.25).toLong(),
                )
                delay(jitter.coerceAtMost(MAX_RETRY_DELAY_MS))
                currentDelay *= 2
            }
        }
        throw lastException ?: IllegalStateException(...)
    }

    private suspend fun processTransaction(...): CreditBalance {
        when (transactionType) {
            TransactionType.ISSUED -> balance.issue(amount)
            TransactionType.DEDUCTED -> balance.deduct(amount)
            TransactionType.REFUNDED -> balance.refund(amount)
        }

        return withTransaction(transactionalOperator) {
            val savedBalance = creditRepository.save(balance)
            val transaction = CreditTransaction(...)
            creditRepository.saveTransaction(transaction)
            savedBalance
        }
    }
}

Improvements:

  • Retry logic is more explicit and easier to control
  • Exception handling is intuitive (try-catch)
  • The code reads sequentially, top to bottom

Repository Adapter layer

Before: WebFlux (Reactor)

@Component
class CreditRepositoryAdapter(
    private val creditBalanceR2dbcRepository: CreditBalanceR2dbcRepository,
) : CreditRepository {
    override fun findByUserId(userId: String): Mono<CreditBalance> {
        return creditBalanceR2dbcRepository.findByUserId(userId)
            .map { it.toDomain() }
    }

    override fun save(balance: CreditBalance): Mono<CreditBalance> {
        val entity = CreditBalanceEntity.from(balance)
        return creditBalanceR2dbcRepository.save(entity)
            .map { it.toDomain() }
    }
}

After: Coroutine

@Component
class CreditRepositoryAdapter(
    private val creditBalanceR2dbcRepository: CreditBalanceR2dbcRepository,
) : CreditRepository {
    override suspend fun findByUserId(userId: String): CreditBalance? {
        return creditBalanceR2dbcRepository.findByUserId(userId)
            .map { it.toDomain() }
            .awaitSingleOrNull()
    }

    override suspend fun save(balance: CreditBalance): CreditBalance {
        val entity = CreditBalanceEntity.from(balance)
        return creditBalanceR2dbcRepository.save(entity)
            .map { it.toDomain() }
            .awaitSingle()
    }
}

Key points:

  • The R2DBC repository still returns Mono/Flux
  • awaitSingle() / awaitSingleOrNull() convert those results into suspending calls
  • The kotlinx-coroutines-reactor dependency is required

3. Dependency setup

build.gradle.kts

dependencies {
    // WebFlux (still required: it serves the suspend handlers, and R2DBC is Reactor-based)
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")

    // Coroutine support
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}

Important:

  • kotlinx-coroutines-reactor: bridges Mono/Flux and coroutines
  • WebFlux is still required (suspend handlers are served by WebFlux, and R2DBC is Reactor-based)
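
As a variation on the dependency block, the coroutine artifacts can usually be declared without explicit versions, because Spring Boot's dependency management includes versions for kotlinx-coroutines. The sketch below assumes the Spring Boot Gradle plugin and its dependency management are applied to the project; kotlinx-coroutines-core is then pulled in transitively by kotlinx-coroutines-reactor.

```kotlin
// build.gradle.kts (sketch; assumes Spring Boot's dependency management is applied)
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")

    // No explicit version: resolved to the version managed by Spring Boot.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}
```

Letting the platform pick the version avoids mixing a hand-pinned coroutines release with the one Spring was tested against.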

4. Transaction handling

Problem: TransactionalOperator is Mono-based

The reactive TransactionalOperator used with R2DBC expects a Mono (or Flux), but our code is now written with coroutines.

Solution: write a bridge function

package com.sclass.supportersservice.adapter.outbound.persistence

import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import org.springframework.transaction.reactive.TransactionalOperator

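/**
 * Helper that runs a suspending block inside a reactive transaction.
 *
 * T is limited to non-null types: awaitSingle() throws
 * NoSuchElementException when the Mono completes without a value,
 * which is what mono { } produces when the block returns null.
 */
suspend inline fun <T : Any> withTransaction(
    transactionalOperator: TransactionalOperator,
    crossinline block: suspend () -> T,
): T {
    return transactionalOperator.transactional(
        mono {
            // This block runs as a coroutine wrapped in a Mono
            block()
        },
    ).awaitSingle()
}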
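
One caveat with a bridge like this is worth seeing in isolation: a Mono built with mono { } completes empty when its block returns null, and awaitSingle() throws on an empty Mono. The small sketch below, which only needs kotlinx-coroutines-reactor and Reactor on the classpath, shows the difference between the two await functions.

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // A block that returns null yields a Mono that completes without a value.
    val empty = mono<String> { null }

    // awaitSingleOrNull() maps the empty completion to null.
    println(empty.awaitSingleOrNull()) // null

    // awaitSingle() requires a value and fails on an empty Mono.
    val failure = runCatching { empty.awaitSingle() }.exceptionOrNull()
    println(failure is NoSuchElementException) // true
}
```

If a transactional block may legitimately produce no value, awaiting with awaitSingleOrNull() and a nullable return type is the safer shape.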

Usage example

private suspend fun processTransaction(...): CreditBalance {
    // Domain logic
    when (transactionType) {
        TransactionType.ISSUED -> balance.issue(amount)
        TransactionType.DEDUCTED -> balance.deduct(amount)
        TransactionType.REFUNDED -> balance.refund(amount)
    }

    // Wrap the writes in a single transaction
    return withTransaction(transactionalOperator) {
        val savedBalance = creditRepository.save(balance)
        val transaction = CreditTransaction(...)
        creditRepository.saveTransaction(transaction)
        savedBalance
    }
}
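
As an alternative to a hand-written bridge, Spring Framework ships a Kotlin extension, TransactionalOperator.executeAndAwait, that accepts a suspending lambda directly. The sketch below is not the author's code: CreditWriter, persist, and the trimmed-down CreditBalance, CreditTransaction, and CreditRepository declarations are hypothetical stand-ins, and the explicit null check is there because, depending on the Spring version, the declared return type may be nullable.

```kotlin
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

// Hypothetical stand-ins for the article's domain types and repository port.
data class CreditBalance(val userId: String, val amount: Int)
data class CreditTransaction(val userId: String, val amount: Int)

interface CreditRepository {
    suspend fun save(balance: CreditBalance): CreditBalance
    suspend fun saveTransaction(transaction: CreditTransaction)
}

class CreditWriter(
    private val creditRepository: CreditRepository,
    private val transactionalOperator: TransactionalOperator,
) {
    suspend fun persist(balance: CreditBalance, amount: Int): CreditBalance {
        // executeAndAwait runs the suspending lambda inside a reactive transaction.
        val saved = transactionalOperator.executeAndAwait {
            val savedBalance = creditRepository.save(balance)
            creditRepository.saveTransaction(CreditTransaction(balance.userId, amount))
            savedBalance
        }
        // Some Spring versions declare a nullable result, hence the explicit check.
        return checkNotNull(saved) { "transaction produced no result" }
    }
}
```

Both approaches commit when the lambda returns normally and roll back when it throws; the extension just removes the need to maintain the mono/awaitSingle plumbing yourself.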

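5. Mono/Flux → Coroutine conversion

Key conversion functions

Reactor type | Coroutine conversion          | Description
Mono<T>      | .awaitSingle()                | Returns the single value; throws if the Mono completes empty
Mono<T>      | .awaitSingleOrNull()          | Returns the single value, or null if the Mono completes empty
Flux<T>      | .collectList().awaitSingle()  | Collects all elements into a List
Flux<T>      | .asFlow().toList()            | Collects into a List via a Flow (same result)

Real-world usage examples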

// Mono → suspend
override suspend fun findByUserId(userId: String): CreditBalance? {
    return creditBalanceR2dbcRepository.findByUserId(userId)
        .map { it.toDomain() }
        .awaitSingleOrNull()  // Mono → coroutine
}

// Flux → suspend (List)
override suspend fun findTransactionsByUserId(
    userId: String,
    page: Int,
    size: Int,
): List<CreditTransaction> {
    val offset = page.toLong() * size
    return creditTransactionR2dbcRepository
        .findByUserIdOrderByCreatedAtDesc(userId, size, offset)
        .map { it.toDomain() }
        .collectList()  // Flux → Mono<List>
        .awaitSingle()  // Mono → coroutine
}
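
The conversions can also be tried outside a repository. The following sketch assumes only Reactor and kotlinx-coroutines-reactor on the classpath; convertAll is a hypothetical helper name used for the demonstration.

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper that exercises each conversion once.
suspend fun convertAll(): List<Any?> {
    val value: String = Mono.just("a").awaitSingle()                      // value is required
    val missing: String? = Mono.empty<String>().awaitSingleOrNull()       // empty Mono becomes null
    val collected: List<Int> = Flux.just(1, 2, 3).collectList().awaitSingle()
    val viaFlow: List<Int> = Flux.just(1, 2, 3).asFlow().toList()         // same result through Flow
    return listOf(value, missing, collected, viaFlow)
}

fun main() = runBlocking {
    println(convertAll()) // [a, null, [1, 2, 3], [1, 2, 3]]
}
```

For large result sets, keeping the Flow (asFlow() without toList()) avoids buffering every element in memory.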

6. Improved exception handling

Before: WebFlux

fun getCreditBalance(userId: String): Mono<CreditBalance> {
    return creditRepository.findByUserId(userId)
        .switchIfEmpty(Mono.error(CreditNotFoundException(userId)))
        .onErrorMap { e ->
            when (e) {
                is OptimisticLockingFailureException ->
                    TransactionConflictException(e)
                else -> e
            }
        }
}

After: Coroutine

suspend fun getCreditBalance(userId: String): CreditBalance {
    return try {
        creditRepository.findByUserId(userId)
            ?: throw CreditNotFoundException(userId)
    } catch (e: OptimisticLockingFailureException) {
        throw TransactionConflictException(e)
    }
}

Alternatively, handle exceptions consistently at the controller level:

@ControllerAdvice
class GlobalExceptionHandler {
    @ExceptionHandler(CreditNotFoundException::class)
    suspend fun handleCreditNotFound(
        e: CreditNotFoundException,
    ): ResponseEntity<ErrorResponse> {
        return ResponseEntity.status(HttpStatus.NOT_FOUND)
            .body(ErrorResponse.from(e))
    }
}

7. Improved retry logic

Before: WebFlux (Reactor Retry)

fun issueCredit(...): Mono<CreditBalance> {
    return processTransaction(...)
        .retryWhen(
            Retry.backoff(3, Duration.ofMillis(100))
                .filter { it is OptimisticLockingFailureException }
                .doBeforeRetry {
                    logger.warn("Retrying... attempt:${it.totalRetries() + 1}")
                }
        )
}

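Problems:

  • Backoff schedules beyond what the Retry.backoff builder exposes are hard to customize
  • Limited control over the interval between individual retries
  • A custom jitter range takes extra work (the builder only offers a single jitter factor)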

After: Coroutine

private suspend fun processTransactionWithRetry(...): CreditBalance {
    var lastException: OptimisticLockingFailureException? = null
    var currentDelay = INITIAL_RETRY_DELAY_MS

    for (attempt in 1..MAX_RETRY_ATTEMPTS) {
        try {
            val balance = getBalanceStrategy()
            return processTransaction(...)
        } catch (e: OptimisticLockingFailureException) {
            lastException = e

            if (attempt == MAX_RETRY_ATTEMPTS) {
                logger.error(
                    "Optimistic lock failure after {} attempts. UserId: {}, Amount: {}",
                    MAX_RETRY_ATTEMPTS, userId, amount, e
                )
                throw e
            }

            // Exponential backoff with jitter
            val jitter = Random.nextLong(
                (currentDelay * 0.75).toLong(),
                (currentDelay * 1.25).toLong(),
            )
            val delayWithJitter = jitter.coerceAtMost(MAX_RETRY_DELAY_MS)

            logger.warn(
                "Optimistic lock failure detected, retrying... " +
                    "UserId: {}, Attempt: {}/{}",
                userId, attempt + 1, MAX_RETRY_ATTEMPTS
            )

            delay(delayWithJitter)
            currentDelay *= 2
        }
    }

    throw lastException ?: IllegalStateException(...)
}

Improvements:

  • ✅ Full control (exponential backoff + jitter)
  • ✅ The retry logic is explicit
  • ✅ Logging is straightforward
  • ✅ The maximum delay can be capped
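
The same pattern can be factored into a reusable helper. The sketch below is one possible shape, not the project's actual code: retryWithBackoff and its parameter names and defaults are hypothetical, and IllegalStateException stands in for OptimisticLockingFailureException so the example runs without Spring.

```kotlin
import kotlin.random.Random
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical generic helper: exponential backoff with +/-25% jitter and a delay cap.
suspend fun <T> retryWithBackoff(
    maxAttempts: Int = 3,
    initialDelayMs: Long = 100,
    maxDelayMs: Long = 1_000,
    shouldRetry: (Throwable) -> Boolean = { true },
    block: suspend (attempt: Int) -> T,
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    require(initialDelayMs >= 0) { "initialDelayMs must not be negative" }
    var currentDelay = initialDelayMs
    repeat(maxAttempts - 1) { index ->
        try {
            return block(index + 1)
        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            if (!shouldRetry(e)) throw e
        }
        // Pick a delay in [0.75 * current, 1.25 * current], then apply the cap.
        val low = (currentDelay * 0.75).toLong()
        val high = (currentDelay * 1.25).toLong()
        delay(Random.nextLong(low, high + 1).coerceAtMost(maxDelayMs))
        currentDelay = (currentDelay * 2).coerceAtMost(maxDelayMs)
    }
    return block(maxAttempts) // last attempt: let any failure propagate
}

fun main() = runBlocking {
    val result = retryWithBackoff(
        initialDelayMs = 10,
        shouldRetry = { it is IllegalStateException },
    ) { attempt ->
        // Simulated conflict on the first two attempts.
        if (attempt < 3) throw IllegalStateException("simulated conflict")
        "ok after $attempt attempts"
    }
    println(result) // ok after 3 attempts
}
```

Rethrowing CancellationException before the general catch keeps the helper cooperative with structured concurrency: a cancelled request stops retrying immediately.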

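8. Performance and compatibility

Performance

  • Comparable performance: coroutines are non-blocking too, so no meaningful difference is expected
  • Minimal overhead: awaitSingle() is only a thin adapter over the Mono subscription

Compatibility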
