Pingu
영차영차! Backend

BULLMQ Delayed Job 톺아보기

2026년 2월 6일
9개 태그
BullMQ
Delayed Job
Redis
Event-driven
BZPOPMIN
Lua Script
TypeScript
Node.js
Job Queue

BULLMQ Delayed Job 톺아보기

들어가며

평소에 BullMQ를 사용하면서 delayed job을 자주 활용했습니다. queue.add('job', data, { delay: 5000 })처럼 간단하게 5초 후에 실행되는 job을 만들 수 있어서 정말 편리했죠.

그런데 문득 궁금해졌습니다.

"BullMQ의 delayed job이 실제로 어떻게 동작하는지 궁금한데, 진짜 workerHost에서 polling 방식으로 bullMQ를 사용하는 서버 내부에서 polling이 일어나는 방식으로 동작하는거야??"

많은 개발자들이 delayed job이 주기적으로 Redis를 체크하는 polling 방식으로 동작할 것이라고 생각합니다. 예를 들어, 1초마다 Redis에 "지금 처리할 수 있는 delayed job이 있나요?"라고 물어보는 방식 말이죠.

하지만 실제로는 완전히 다른 방식으로 동작합니다.

"Polling은 아닐 것 같은데, 그럼 도대체 Redis의 어떤 구조를 이용하는 거지?"

이런 궁금증이 생겨서 BullMQ의 소스코드를 직접 파헤쳐보게 되었습니다. Redis의 Sorted Set, blocking primitive, Lua 스크립트 등이 어떻게 조합되어서 이런 효율적인 delayed job 처리가 가능한지 확인해봤습니다.

이 글에서는 BullMQ의 실제 소스코드를 분석하여:

  1. Delayed job이 어떻게 저장되고 관리되는지
  2. Worker가 어떻게 delayed job을 감지하고 처리하는지
  3. Polling이 아닌 event-driven 방식의 동작 원리
  4. TypeScript 코드가 실제로 어떻게 실행되는지

까지 깊이 파고 들어봅니다.

핵심 답변: Polling이 아닙니다

결론부터 말하면, BullMQ의 delayed job은 polling 방식이 아닙니다.

대신 Redis의 blocking primitive인 BZPOPMIN을 사용하는 event-driven 방식으로 동작합니다. 이는 훨씬 더 효율적이고 정확한 방식입니다.

Polling vs Event-driven 비교

Polling vs Event-driven 비교

위 다이어그램은 Polling 방식과 Event-driven 방식의 차이를 보여줍니다. Polling은 주기적으로 체크하는 반면, Event-driven은 이벤트가 발생할 때만 반응합니다.

Delayed Job의 생명주기

Delayed job은 다음과 같은 생명주기를 거칩니다:

  1. Delayed: Redis Sorted Set에 저장 (timestamp 기반 정렬)
  2. Waiting: 시간이 되면 wait queue로 이동
  3. Active: Worker가 job을 가져와 처리 중
  4. Completed/Failed: 처리 완료 또는 실패

각 단계를 자세히 살펴보겠습니다.

1. Delayed Job 추가

await queue.add('job', { data: 'value' }, { delay: 5000 });

이 코드가 실행되면 내부적으로 다음과 같은 일이 발생합니다:

local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
  maxEvents, markerKey, delay)

  local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

  rcall("ZADD", delayedKey, score, jobId)
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
    "jobId", jobId, "delay", delayedTimestamp)

  -- mark that a delayed job is available
  addDelayMarkerIfNeeded(markerKey, delayedKey)
end

핵심 포인트:

  • Delayed job은 Redis의 **Sorted Set (ZSET)**에 저장됩니다
  • Score는 timestamp * 0x1000 + jobId 일부 비트로 계산됩니다
  • Marker key에 다음 delayed job의 timestamp가 추가됩니다

2. Worker가 Job을 가져오려고 시도

Worker의 mainLoop는 지속적으로 job을 가져오려고 시도합니다:

  private async mainLoop(client: RedisClient, bclient: RedisClient) {
    const asyncFifoQueue = new AsyncFifoQueue<void | Job<
      DataType,
      ResultType,
      NameType
    >>();

    let tokenPostfix = 0;

    while ((!this.closing && !this.paused) || asyncFifoQueue.numTotal() > 0) {
      /**
       * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job
       * to arrive at the queue we should not try to fetch more jobs (as it would be pointless)
       */
      while (
        !this.closing &&
        !this.paused &&
        !this.waiting &&
        asyncFifoQueue.numTotal() < this._concurrency &&
        !this.isRateLimited()
      ) {
        const token = `${this.id}:${tokenPostfix++}`;

        const fetchedJob = this.retryIfFailed<void | Job<
          DataType,
          ResultType,
          NameType
        >>(() => this._getNextJob(client, bclient, token, { block: true }), {
          delayInMs: this.opts.runRetryDelay,
          onlyEmitError: true,
        });
        asyncFifoQueue.add(fetchedJob);

        if (this.waiting && asyncFifoQueue.numTotal() > 1) {
          // We are waiting for jobs but we have others that we could start processing already
          break;
        }

        // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
        // to Redis in high concurrency scenarios.
        const job = await fetchedJob;

        // No more jobs waiting but we have others that could start processing already
        if (!job && asyncFifoQueue.numTotal() > 1) {
          break;
        }

        // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
        // for processing this job.
        if (this.blockUntil) {
          break;
        }
      }

_getNextJob 메서드는 moveToActive Lua 스크립트를 호출합니다:

  protected async moveToActive(
    client: RedisClient,
    token: string,
    name?: string,
  ): Promise<Job<DataType, ResultType, NameType>> {
    const [jobData, id, rateLimitDelay, delayUntil] =
      await this.scripts.moveToActive(client, token, name);
    this.updateDelays(rateLimitDelay, delayUntil);

    return this.nextJobFromJobData(jobData, id, token);
  }

3. promoteDelayedJobs: 핵심 메커니즘

moveToActive Lua 스크립트는 가장 먼저 promoteDelayedJobs를 실행합니다:

-- Check if there are delayed jobs that we can move to wait.
local markerKey = KEYS[11]
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
                   ARGV[2], KEYS[10], isPausedOrMaxed)

promoteDelayedJobs 함수의 구현:

-- Try to get as much as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
                                  eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
    local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)

    if (#jobs > 0) then
        rcall("ZREM", delayedKey, unpack(jobs))

        for _, jobId in ipairs(jobs) do
            local jobKey = prefix .. jobId
            local priority =
                tonumber(rcall("HGET", jobKey, "priority")) or 0

            if priority == 0 then
                -- LIFO or FIFO
                rcall("LPUSH", targetKey, jobId)
            else
                local score = getPriorityScore(priority, priorityCounterKey)
                rcall("ZADD", prioritizedKey, score, jobId)
            end

            -- Emit waiting event
            rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
                  jobId, "prev", "delayed")
            rcall("HSET", jobKey, "delay", 0)
        end

        addBaseMarkerIfNeeded(markerKey, isPaused)
    end
end

동작 방식:

  1. 현재 timestamp 이하의 delayed job들을 ZSET에서 조회
  2. 해당 job들을 delayed ZSET에서 제거
  3. Priority에 따라 wait queue 또는 prioritized queue로 이동
  4. Marker를 업데이트하여 worker에게 알림

4. Blocking Wait: BZPOPMIN

Job이 없을 때 Worker는 어떻게 동작할까요? Polling을 할까요?

아니요. Redis의 BZPOPMIN blocking primitive를 사용합니다:

  private async waitForJob(
    bclient: RedisClient,
    blockUntil: number,
  ): Promise<number> {
    if (this.paused) {
      return Infinity;
    }

    let timeout: NodeJS.Timeout;
    try {
      if (!this.closing && !this.isRateLimited()) {
        let blockTimeout = this.getBlockTimeout(blockUntil);

        if (blockTimeout > 0) {
          blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
            ? blockTimeout
            : Math.ceil(blockTimeout);

          // We cannot trust that the blocking connection stays blocking forever
          // due to issues in Redis and IORedis, so we will reconnect if we
          // don't get a response in the expected time.
          timeout = setTimeout(
            async () => {
              bclient.disconnect(!this.closing);
            },
            blockTimeout * 1000 + 1000,
          );

          this.updateDelays(); // reset delays to avoid reusing same values in next iteration

          // Markers should only be used for un-blocking, so we will handle them in this
          // function only.
          const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
          if (result) {
            const [_key, member, score] = result;

            if (member) {
              const newBlockUntil = parseInt(score);
              // Use by pro version as rate limited groups could generate lower blockUntil values
              // markers only return delays for delayed jobs
              if (blockUntil && newBlockUntil > blockUntil) {
                return blockUntil;
              }
              return newBlockUntil;
            }
          }
        }

        return 0;
      }
    } catch (error) {
      if (isNotConnectionError(<Error>error)) {
        this.emit('error', <Error>error);
      }
      if (!this.closing) {
        await this.delay();
      }
    } finally {
      clearTimeout(timeout);
    }
    return Infinity;
  }

핵심 코드: 808번 라인

const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);

이것이 바로 polling이 아닌 event-driven 방식의 핵심입니다.

BZPOPMIN이란 무엇인가?

BZPOPMIN은 Redis의 blocking primitive 명령어입니다. 이름에서 알 수 있듯이:

  • B = Blocking (블로킹)
  • Z = Sorted Set (정렬된 집합)
  • POP = Pop (꺼내기)
  • MIN = Minimum (최소값)

BZPOPMIN의 정확한 문법

BZPOPMIN key [key ...] timeout

파라미터:

  • key: Sorted Set의 키 이름 (하나 이상 지정 가능)
  • timeout: Blocking wait 시간 (초 단위, 0이면 무한 대기)

반환값:

  • 요소가 있는 경우: [key, member, score] 배열
    • key: 요소를 pop한 Sorted Set의 키 이름
    • member: pop된 요소의 값 (member)
    • score: pop된 요소의 score 값
  • 요소가 없고 timeout이 지난 경우: nil

동작 단계:

  1. 지정된 Sorted Set들 중 가장 작은 score를 가진 요소를 찾습니다
  2. 요소가 있으면 즉시 pop하여 반환합니다
  3. 요소가 없으면 timeout 초 동안 blocking wait합니다
  4. Blocking 중 다른 클라이언트가 ZADD로 요소를 추가하면 즉시 반환합니다
  5. Timeout이 지나도 요소가 없으면 nil을 반환합니다

여러 키를 동시에 대기하기

BZPOPMIN은 여러 키를 동시에 지정할 수 있습니다:

# 여러 queue를 동시에 대기
127.0.0.1:6379> BZPOPMIN queue1 queue2 queue3 10

이 경우, 어느 키든 요소가 추가되면 즉시 반환됩니다. BullMQ는 주로 단일 marker key를 사용하지만, 이 기능을 활용하면 여러 queue를 효율적으로 처리할 수 있습니다.

반환값의 구조 이해하기

BullMQ에서 BZPOPMIN의 반환값을 어떻게 처리하는지 보겠습니다:

          const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
          if (result) {
            const [_key, member, score] = result;

            if (member) {
              const newBlockUntil = parseInt(score);
              // Use by pro version as rate limited groups could generate lower blockUntil values
              // markers only return delays for delayed jobs
              if (blockUntil && newBlockUntil > blockUntil) {
                return blockUntil;
              }
              return newBlockUntil;
            }
          }

반환값 처리:

  • result[0] (_key): 어떤 키에서 pop되었는지 (BullMQ에서는 marker key)
  • result[1] (member): Marker의 값 (보통 "1")
  • result[2] (score): 다음 delayed job의 timestamp (밀리초 단위)

핵심: score 값이 바로 다음에 처리할 수 있는 delayed job의 시간입니다. Worker는 이 값을 newBlockUntil로 사용하여 정확한 시간에 다시 깨어납니다.

BullMQ에서의 사용

BullMQ는 BZPOPMIN을 marker key에 사용합니다:

          const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
          if (result) {
            const [_key, member, score] = result;

            if (member) {
              const newBlockUntil = parseInt(score);
              // Use by pro version as rate limited groups could generate lower blockUntil values
              // markers only return delays for delayed jobs
              if (blockUntil && newBlockUntil > blockUntil) {
                return blockUntil;
              }
              return newBlockUntil;
            }
          }

동작 흐름:

다이어그램 로딩 중...

BZPOPMIN vs ZPOPMIN

명령어동작Blocking
ZPOPMIN가장 작은 score 요소 pop❌ 없음 (즉시 반환)
BZPOPMIN가장 작은 score 요소 pop✅ 있음 (요소 없으면 대기)

비교 예시:

# ZPOPMIN: 요소가 없으면 즉시 nil 반환
127.0.0.1:6379> ZPOPMIN myqueue
(nil)

# BZPOPMIN: 요소가 없으면 5초 동안 대기
127.0.0.1:6379> BZPOPMIN myqueue 5
(nil)  # 5초 후에도 요소가 없으면 nil 반환

# 다른 터미널에서 요소 추가
127.0.0.1:6379> ZADD myqueue 100 "job1"
(integer) 1

# BZPOPMIN이 즉시 반환됨
127.0.0.1:6379> BZPOPMIN myqueue 5
1) "myqueue"
2) "job1"
3) "100"

왜 BZPOPMIN을 사용하는가?

1. Polling의 문제점

만약 ZPOPMIN을 사용한다면 (polling 방식):

// 가상의 polling 방식 (비효율적)
while (true) {
  const result = await redis.zpopmin('marker:queue');
  if (result) {
    // job 처리
    break;
  }
  await sleep(1000); // 1초마다 체크
}

문제점:

  • CPU 자원 낭비 (지속적인 루프 실행)
  • 네트워크 트래픽 증가 (주기적 요청)
  • 정확한 타이밍 보장 어려움
  • Redis 부하 증가

2. BZPOPMIN의 장점

// 실제 BullMQ 방식 (효율적)
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
// Redis 서버에서 blocking wait
// Marker가 추가되면 즉시 반환

장점:

  • CPU 절약: 대기 중에는 리소스 사용 없음
  • 즉각적인 반응: Marker가 추가되면 즉시 깨어남
  • 네트워크 효율: 필요할 때만 통신
  • 정확한 타이밍: Redis 서버에서 직접 관리

Redis 서버 내부 동작 메커니즘

Redis 서버는 BZPOPMIN 요청을 받으면 다음과 같이 처리합니다:

1. 즉시 체크 단계

// Redis 내부 의사 코드
if (sortedSetHasElements(key)) {
    // 요소가 있으면 즉시 pop하여 반환
    return popMinElement(key);
}

2. Blocking Wait 단계

요소가 없을 때:

  1. 클라이언트를 blocking 리스트에 등록

    • Redis는 각 키별로 blocking 클라이언트 리스트를 유지합니다
    • 클라이언트의 소켓과 timeout 정보를 저장합니다
  2. 이벤트 루프에서 대기

    • Redis의 이벤트 루프는 다른 클라이언트의 명령어를 계속 처리합니다
    • Blocking 클라이언트는 CPU를 사용하지 않습니다
  3. 요소 추가 시 깨우기

    // 다른 클라이언트가 ZADD 실행 시
    when (ZADD key score member) {
        if (hasBlockingClients(key)) {
            // Blocking 클라이언트 중 하나를 깨움
            wakeUpBlockingClient(key);
            // 즉시 BZPOPMIN 반환
        }
    }

3. Timeout 처리

  • Redis는 각 blocking 클라이언트의 timeout을 추적합니다
  • Timeout이 지나면 nil을 반환하고 blocking 리스트에서 제거합니다

핵심 포인트:

  • CPU 효율성: Blocking 중에는 Redis 서버도 클라이언트도 CPU를 사용하지 않습니다
  • 즉각적인 반응: 요소가 추가되면 마이크로초 단위로 즉시 반환됩니다
  • 확장성: 수천 개의 Worker가 동시에 blocking wait해도 Redis 부하는 거의 없습니다

이것이 event-driven 방식의 핵심입니다. Redis 서버가 직접 이벤트를 감지하고 클라이언트를 깨웁니다.

실제 예시

Redis CLI에서 직접 확인해볼 수 있습니다:

# 터미널 1: BZPOPMIN으로 대기
127.0.0.1:6379> BZPOPMIN marker:queue 10
(nil)  # 10초 후에도 요소가 없으면 nil 반환

# 터미널 2: 다른 곳에서 marker 추가
127.0.0.1:6379> ZADD marker:queue 1000 "1"
(integer) 1

# 터미널 1: 즉시 반환됨!
127.0.0.1:6379> BZPOPMIN marker:queue 10
1) "marker:queue"
2) "1"
3) "1000"

위 예시에서 볼 수 있듯이, BZPOPMIN은 요소가 없을 때는 blocking wait하고, 요소가 추가되면 즉시 반환됩니다.

// Worker 1: Job이 없어서 대기 중
const worker1 = new Worker('my-queue', async (job) => {
  return processJob(job);
});
// 내부적으로 BZPOPMIN marker:queue 10 실행
// Redis 서버에서 blocking wait 중...

// Worker 2: Delayed job 추가
await queue.add('job', { data: 'value' }, { delay: 5000 });
// 내부적으로 ZADD marker:queue (timestamp, "1") 실행

// Worker 1: 즉시 깨어남!
// BZPOPMIN이 marker를 감지하고 반환
// Promise resolve → moveToActive 호출 → job 처리

타임아웃 처리의 세부사항

BullMQ는 delayed job의 다음 timestamp를 기반으로 정확한 timeout을 계산합니다:

  protected getBlockTimeout(blockUntil: number): number {
    const opts: WorkerOptions = <WorkerOptions>this.opts;

    // when there are delayed jobs
    if (blockUntil) {
      const blockDelay = blockUntil - Date.now();
      // when we reach the time to get new jobs
      if (blockDelay <= 0) {
        return blockDelay;
      } else if (blockDelay < this.minimumBlockTimeout * 1000) {
        return this.minimumBlockTimeout;
      } else {
        // We restrict the maximum block timeout to 10 second to avoid
        // blocking the connection for too long in the case of reconnections
        // reference: https://github.com/taskforcesh/bullmq/issues/1658
        return Math.min(blockDelay / 1000, maximumBlockTimeout);
      }
    } else {
      return Math.max(opts.drainDelay, this.minimumBlockTimeout);
    }
  }

타임아웃 계산 로직:

  1. Delayed job이 있는 경우:

    • blockDelay = nextTimestamp - 현재시간 (밀리초)
    • blockDelay <= 0: 이미 시간이 지났으므로 즉시 처리 (음수 반환)
    • blockDelay < minimumBlockTimeout: 최소 대기 시간 보장 (기본 1ms)
    • 그 외: blockDelay / 1000 (초 단위로 변환), 최대 10초로 제한
  2. Delayed job이 없는 경우:

    • drainDelay (기본 5초) 또는 minimumBlockTimeout 중 큰 값

왜 최대 10초로 제한하는가?

코드 주석에 나와있듯이, Redis 연결이 너무 오래 blocking되면 재연결 문제가 발생할 수 있습니다. 10초마다 한 번씩 연결 상태를 확인하여 안정성을 보장합니다.

실제 예시:

// 현재 시간: 1000000ms
// 다음 delayed job: 1005000ms (5초 후)
// blockDelay = 5000ms
// blockTimeout = 5초

// 현재 시간: 1000000ms  
// 다음 delayed job: 10015000ms (15초 후)
// blockDelay = 15000ms
// blockTimeout = 10초 (최대값으로 제한)

이렇게 하면 delayed job이 처리 가능해지는 정확한 시간에 Worker가 깨어나며, 동시에 연결 안정성도 보장됩니다.

Event-driven vs Polling

Polling 방식 (사용하지 않음)

// 가상의 polling 방식 (BullMQ는 이렇게 하지 않음)
while (true) {
  const jobs = await redis.zrange('delayed:queue', 0, 0);
  if (jobs.length > 0 && jobs[0].timestamp <= Date.now()) {
    // process job
  }
  await sleep(1000); // 1초마다 체크
}

문제점:

  • CPU 자원 낭비
  • 네트워크 트래픽 증가
  • 정확한 타이밍 보장 어려움
  • 불필요한 Redis 부하

Event-driven 방식 (BullMQ가 사용하는 방식)

// 실제 BullMQ 방식
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
// Redis 서버에서 blocking wait
// Marker가 추가되면 즉시 반환

장점:

  • CPU 자원 절약 (대기 중에는 리소스 사용 없음)
  • 네트워크 트래픽 최소화
  • 정확한 타이밍 보장
  • Redis 부하 최소화

전체 동작 흐름

다이어그램 로딩 중...

Worker 클래스 구조

Worker 클래스 상속 구조

Worker 클래스 상속 구조

상속 구조

export class Worker<
  DataType = any,
  ResultType = any,
  NameType extends string = string,
> extends QueueBase {
EventEmitter (Node.js 내장)
    ↓ extends
QueueBase (Redis 연결, Scripts 관리)
    ↓ extends  
Worker (Job 처리 로직)

중요한 오해 해소: 상속 구조 ≠ Polling

많은 개발자들이 이 상속 구조를 보고 "EventEmitter를 상속받으니 polling 방식이 아닌가?"라고 생각할 수 있습니다. 하지만 이것은 완전히 다른 개념입니다.

EventEmitter의 역할:

  • EventEmitter이벤트를 emit하고 listen하는 기능만 제공합니다
  • Job이 완료되거나 실패했을 때 emit('completed', job) 같은 이벤트를 발생시킵니다
  • Job을 가져오는 방식과는 전혀 무관합니다

실제 Job 가져오기 방식:

  • 상속 구조와 무관하게, 실제로는 BZPOPMIN blocking primitive를 사용합니다
  • mainLoop에서 waitForJob()bclient.bzpopmin() 호출
  • 이것이 polling이 아닌 event-driven 방식의 핵심입니다

비유로 이해하기:

상속 구조 = 건물의 구조 (어떤 층에 무엇이 있는지)
Polling vs Event-driven = 건물에서 사람을 찾는 방법 (주기적으로 방문 vs 벨을 누르면 나옴)

EventEmitter는 단지 "벨이 울렸을 때 알림을 받는 시스템"일 뿐, "사람을 찾는 방법"과는 무관합니다.

주요 컴포넌트

  1. QueueBase: Redis 연결, Scripts (Lua 스크립트 실행), QueueKeys 관리
  2. LockManager: Job lock 관리 및 갱신
  3. blockingConnection: BZPOPMIN을 위한 별도 Redis 연결
  4. mainLoop: Job을 가져와 처리하는 메인 루프
  5. Scripts: Redis Lua 스크립트 실행 (moveToActive, promoteDelayedJobs 등)

생성자에서 초기화

  constructor(
    name: string,
    processor?: string | URL | null | Processor<DataType, ResultType, NameType>,
    opts?: WorkerOptions,
    Connection?: typeof RedisConnection,
  ) {
    super(
      name,
      {
        drainDelay: 5,
        concurrency: 1,
        lockDuration: 30000,
        maximumRateLimitDelay: 30000,
        maxStalledCount: 1,
        stalledInterval: 30000,
        autorun: true,
        runRetryDelay: 15000,
        ...opts,
        blockingConnection: true,
      },
      Connection,
    );

    if (!opts || !opts.connection) {
      throw new Error('Worker requires a connection');
    }

    // ... validation code ...

    this.concurrency = this.opts.concurrency;

    this.opts.lockRenewTime =
      this.opts.lockRenewTime || this.opts.lockDuration / 2;

    this.id = v4();

    this.createLockManager();

    if (processor) {
      if (typeof processor === 'function') {
        this.processFn = processor;
        // Check if processor accepts signal parameter (3rd parameter)
        this.processorAcceptsSignal = processor.length >= 3;
      } else {
        // SANDBOXED - worker threads or child process
        // ...
      }

      if (this.opts.autorun) {
        this.run().catch(error => this.emit('error', error));
      }
    }

    // Blocking connection for BZPOPMIN
    const connectionName =
      this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : '');
    this.blockingConnection = new RedisConnection(
      isRedisInstance(opts.connection)
        ? (<Redis>opts.connection).isCluster
          ? (<Cluster>opts.connection).duplicate(undefined, {
              redisOptions: {
                ...((<Cluster>opts.connection).options?.redisOptions || {}),
                connectionName,
              },
            })
          : (<Redis>opts.connection).duplicate({ connectionName })
        : { ...opts.connection, connectionName },
      {
        shared: false,
        blocking: true,
        skipVersionCheck: opts.skipVersionCheck,
      },
    );
    this.blockingConnection.on('error', error => this.emit('error', error));
    this.blockingConnection.on('ready', () =>
      setTimeout(() => this.emit('ready'), 0),
    );
  }

핵심 포인트:

  • blockingConnection: true 옵션으로 별도의 blocking 연결 생성
  • 이 연결은 BZPOPMIN 같은 blocking 명령을 위해 사용됨
  • 일반 연결과 분리하여 blocking이 다른 작업에 영향을 주지 않도록 함

TypeScript → 실제 실행까지

1. 컴파일 단계

{
  "compilerOptions": {
    "types": ["node"],
    "target": "ES2017",
    "module": "ES2020",
    "incremental": true,
    "declaration": true,
    "outDir": "dist/esm",
    "sourceMap": true,
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true,
    "strict": true,
    "jsx": "preserve",
    "importHelpers": true,
    "moduleResolution": "node",
    "esModuleInterop": false,
    "allowSyntheticDefaultImports": false,
    "strictNullChecks": false,
    "baseUrl": ".",
    "lib": ["esnext", "DOM"]
  },
  "include": ["src"],
  "exclude": ["node_modules", "dist", "tests/*", "src/commands/*.ts"]
}

TypeScript 소스코드가 ES2017 JavaScript로 컴파일되어 dist/esm/ 디렉토리에 출력됩니다.

2. Redis 연결

  private async init() {
    if (!this._client) {
      const { url, ...rest } = this.opts;
      this._client = url ? new IORedis(url, rest) : new IORedis(rest);
    }

    increaseMaxListeners(this._client, 3);

    this._client.on('error', this.handleClientError);
    // ioredis treats connection errors as a different event ('close')
    this._client.on('close', this.handleClientClose);

    this._client.on('ready', this.handleClientReady);

    if (!this.extraOptions.skipWaitingForReady) {
      await RedisConnection.waitUntilReady(this._client);
    }

    this.loadCommands(this.packageVersion);

ioredis 라이브러리를 사용하여 Redis에 TCP/IP 연결을 생성합니다.

3. Lua 스크립트 로딩

  protected loadCommands(
    packageVersion: string,
    providedScripts?: Record<string, RawCommand>,
  ): void {
    const finalScripts =
      providedScripts || (scripts as Record<string, RawCommand>);
    for (const property in finalScripts as Record<string, RawCommand>) {
      // Only define the command if not already defined
      const commandName = `${finalScripts[property].name}:${packageVersion}`;
      if (!(<any>this._client)[commandName]) {
        (<any>this._client).defineCommand(commandName, {
          numberOfKeys: finalScripts[property].keys,
          lua: finalScripts[property].content,
        });
      }
    }
  }

Lua 스크립트 파일을 읽어서 ioredis.defineCommand()로 커스텀 명령으로 등록합니다. 이후 호출 시 Redis에서 EVAL 또는 EVALSHA로 실행됩니다.

4. 네트워크 레벨 동작

다이어그램 로딩 중...

핵심:

  • await bzpopmin()은 Promise를 pending 상태로 만듭니다
  • libuv가 TCP 소켓을 모니터링합니다
  • Node.js 이벤트 루프는 다른 작업을 계속 처리할 수 있습니다
  • Redis에서 marker가 추가되면 TCP 소켓을 통해 응답이 전달됩니다
  • Promise가 resolve되어 코드가 계속 실행됩니다

Node.js 이벤트 루프와의 관계

다이어그램 로딩 중...

핵심 포인트:

  • Blocking wait는 Node.js 프로세스를 블록하지 않습니다
  • 이벤트 루프는 다른 작업을 계속 처리할 수 있습니다
  • 이것이 Node.js의 비동기 I/O 모델의 핵심입니다

NestJS와의 통합

NestJS에서 WorkerHost를 사용하는 경우도 내부적으로는 동일한 Worker 클래스를 사용합니다:

// @nestjs/bullmq 패키지 내부 (의사 코드)
class WorkerHost {
  protected worker: Worker; // BullMQ의 Worker 인스턴스
  
  constructor() {
    // NestJS가 자동으로 Worker를 생성
    this.worker = new Worker(queueName, this.process.bind(this), options);
  }
  
  async process(job: Job): Promise<any> {
    // 하위 클래스에서 구현
    throw new Error('process method must be implemented');
  }
}

WorkerHost는 단순히 NestJS의 DI 시스템과 생명주기 관리와 통합하기 위한 래퍼일 뿐입니다. 실제 동작은 동일한 BullMQ Worker 클래스가 담당합니다.

핵심 코드 위치 정리

기능파일 위치핵심 코드
Worker 클래스src/classes/worker.tsmainLoop(), waitForJob()
Blocking Waitsrc/classes/worker.ts:808await bclient.bzpopmin()
Delayed Job 추가src/commands/includes/addDelayedJob.luaZADD delayed:queue
Delayed Job Promotionsrc/commands/includes/promoteDelayedJobs.luaZRANGEBYSCORE
moveToActivesrc/commands/moveToActive-11.luapromoteDelayedJobs() 호출
Redis 연결src/classes/redis-connection.tsnew IORedis()

왜 이 방식인가?

장점

  1. 효율성: 불필요한 polling이 없어 CPU와 네트워크 리소스를 절약합니다
  2. 정확성: Redis 서버에서 직접 시간을 체크하므로 정확한 타이밍을 보장합니다
  3. 확장성: 여러 Worker가 동시에 동작해도 문제없이 작동합니다
  4. 리소스 절약: Blocking wait 중에는 리소스를 거의 사용하지 않습니다

Polling 방식과의 비교

항목Polling 방식Event-driven 방식 (BullMQ)
CPU 사용지속적 사용대기 중 거의 사용 안 함
네트워크 트래픽주기적 요청필요할 때만 통신
정확도주기 간격에 의존Redis 서버에서 직접 체크
확장성Worker 수에 비례하여 부하 증가Worker 수와 무관하게 효율적

결론

BullMQ의 delayed job은:

  1. Polling 방식이 아닙니다 - Redis의 blocking primitive (BZPOPMIN)를 사용합니다
  2. Event-driven 방식 - Marker가 추가되면 Worker가 즉시 깨어납니다
  3. 효율적 - 불필요한 리소스 사용이 없습니다
  4. 정확함 - Redis 서버에서 직접 시간을 체크합니다

이러한 설계 덕분에 BullMQ는 수천 개의 delayed job을 효율적으로 처리할 수 있으며, 여러 Worker가 동시에 동작해도 문제없이 작동합니다.

Redis의 무한한 가능성

이번 탐구를 통해 Redis의 강력함을 다시 한번 느꼈습니다. 알면 알수록 재밌는 Redis입니다.

Redis를 비동기 Queue로 활용하기

Redis는 단순한 key-value 저장소가 아닙니다. Sorted Set, List, Blocking Primitives, Lua Scripts 등을 조합하면:

  • Delayed Job 처리: BullMQ처럼 정확한 타이밍의 delayed job
  • Priority Queue: Score 기반 우선순위 처리
  • Rate Limiting: 시간 기반 요청 제한
  • Distributed Lock: 분산 환경에서의 동시성 제어
  • Pub/Sub: 실시간 이벤트 스트리밍

등 다양한 비동기 패턴을 구현할 수 있습니다.

Kafka vs Redis: 정답은 없다

많은 개발자들이 "큐가 필요하면 Kafka를 써야 한다"고 생각합니다. 하지만 항상 그런 것은 아닙니다.

Redis가 적합한 경우:

  • 처리량이 초당 수만~수십만 건 수준
  • 지연시간이 중요 (마이크로초 단위)
  • 단순한 큐 구조로 충분
  • 이미 Redis 인프라가 있음
  • 개발 및 운영 복잡도를 낮추고 싶음

Kafka가 적합한 경우:

  • 초당 수백만 건 이상의 처리량 필요
  • 장기간 메시지 보관 필요 (일주일 이상)
  • 복잡한 스트림 처리 (Kafka Streams)
  • 여러 Consumer Group이 서로 다른 속도로 소비
  • 대규모 분산 시스템

핵심은 상황에 맞는 선택입니다.

BullMQ처럼 Redis를 활용하면, Kafka 없이도 충분히 강력한 비동기 큐 시스템을 구축할 수 있습니다. 특히 delayed job, priority queue, rate limiting 같은 기능은 Redis의 Sorted Set과 Blocking Primitives로 매우 효율적으로 구현할 수 있습니다.

이번 탐구를 통해 Redis의 내부 동작 원리를 이해하게 되었고, 앞으로도 Redis를 더 다양하게 활용해볼 수 있을 것 같습니다.

참고 자료

댓글

?