Celery란?

Celery는 Python 기반의 분산 태스크 큐 프레임워크다. 시간이 오래 걸리거나 즉시 응답할 필요가 없는 작업(이메일 발송, 이미지 처리, 보고서 생성, webhook 전송 등)을 메인 프로세스에서 분리하여 비동기로 처리한다.


아키텍처

┌────────────┐     ┌──────────┐     ┌────────────┐
│ Producer   │────▶│ Broker   │────▶│ Worker(s)  │
│ (App)      │     │          │     │            │
└────────────┘     └──────────┘     └─────┬──────┘
                                          │
                                    ┌─────▼──────┐
                                    │ Result     │
                                    │ Backend    │
                                    └────────────┘
컴포넌트역할구현체
Producer태스크를 정의하고 브로커에 전송하는 애플리케이션Django, Flask 등
Broker태스크 메시지를 저장하고 Worker에 전달하는 메시지 큐RabbitMQ, Valkey/Redis
Worker브로커에서 태스크를 가져와 실행하는 프로세스Celery Worker
Result Backend태스크 실행 결과를 저장 (선택적)Redis, DB, S3 등

핵심 개념

Task

Celery의 기본 작업 단위다. Python 함수에 데코레이터를 붙여 정의한다.

@app.task
def send_order_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    send_email(order.customer.email, ...)

Task 호출 방식

메서드동작
task.delay(args)비동기 호출 (가장 간단)
task.apply_async(args, kwargs, ...)상세 옵션 지정 가능 (countdown, eta, queue 등)
task.apply(args)동기 호출 (테스트용)

Worker

태스크를 실제로 실행하는 프로세스다. 여러 Worker를 병렬로 띄워 처리량을 확장할 수 있다.

celery -A myapp worker --loglevel=info --concurrency=4
  • Prefork (기본): 멀티프로세스. CPU 바운드 작업에 적합
  • Eventlet/Gevent: 그린스레드. I/O 바운드 작업에 적합
  • Solo: 단일 스레드. 디버깅용

브로커 선택

브로커장점단점
RabbitMQAMQP 표준, 메시지 보장 강력, 라우팅 유연운영 복잡도 높음
Valkey/Redis빠른 속도, 설정 간단, 캐시와 겸용 가능메시지 내구성 상대적으로 약함

소규모~중규모 환경에서는 Valkey/Redis가 간편하고, 메시지 유실이 치명적인 환경에서는 RabbitMQ가 안전하다.


주기적 태스크 (Celery Beat)

cron과 유사하게 주기적으로 태스크를 실행하는 스케줄러다. 별도 프로세스로 구동된다.

app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'tasks.cleanup_expired_sessions',
        'schedule': 3600.0,  # 매 1시간
    },
}

태스크 라우팅

태스크를 특정 큐로 보내고, 특정 Worker가 해당 큐만 소비하도록 분리할 수 있다.

# 태스크 전송 시 큐 지정
send_email.apply_async(args=[order_id], queue='email')
 
# Worker 실행 시 큐 지정
# celery -A myapp worker -Q email

이를 통해 이메일 발송, 이미지 처리, webhook 전송 등을 독립적으로 스케일링할 수 있다.


재시도와 에러 처리

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_external_api(self, url):
    try:
        response = requests.post(url)
        response.raise_for_status()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)
  • max_retries: 최대 재시도 횟수
  • default_retry_delay: 재시도 간격 (초)
  • Exponential backoff: retry_backoff=True로 지수적 대기 가능

모니터링

도구설명
Flower웹 기반 실시간 모니터링. 태스크 상태, Worker 현황, 큐 길이 시각화
celery inspectCLI로 Worker 상태, 활성 태스크, 예약 태스크 조회
celery events이벤트 스트림을 구독하여 외부 모니터링 시스템에 연동

관련 문서

  • Valkey — Celery의 메시지 브로커로 사용되는 인메모리 데이터스토어
  • Mailpit — 개발 환경에서 Celery Worker가 발송한 이메일을 수신하는 테스트 도구