WeniVooks

검색

FastAPI 베이스캠프

백그라운드 작업과 스케줄링

1. 백그라운드 작업과 스케줄링 소개

웹 애플리케이션에서는 사용자의 요청에 즉시 응답하지 않아도 되는 작업들이 있습니다. 예를 들어, 이메일 전송, 대용량 파일 처리, 데이터 분석 등이 그런 작업들입니다. 이러한 작업들은 백그라운드에서 처리하거나 주기적으로 실행되도록 스케줄링할 수 있습니다. FastAPI는 이러한 기능을 구현하는데 도움이 되는 도구들을 제공합니다.

2. BackgroundTasks 사용하기

FastAPI의 BackgroundTasks를 사용하면 HTTP 응답을 반환한 후에 작업을 실행할 수 있습니다.

from fastapi import FastAPI, BackgroundTasks
 
app = FastAPI()
 
def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)
 
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
from fastapi import FastAPI, BackgroundTasks
 
app = FastAPI()
 
def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)
 
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

이 예제에서는 알림을 보내는 작업을 백그라운드에서 실행합니다.

3. 간단한 작업 큐 구현하기

더 복잡한 백그라운드 작업을 위해 간단한 작업 큐를 구현할 수 있습니다.

from fastapi import FastAPI
import asyncio
from collections import deque
 
app = FastAPI()
task_queue = deque()
 
async def process_tasks():
    while True:
        if task_queue:
            task = task_queue.popleft()
            await task()
        else:
            await asyncio.sleep(1)
 
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_tasks())
 
async def example_task(item: str):
    await asyncio.sleep(5)  # 시간이 걸리는 작업 시뮬레이션
    print(f"Processed item: {item}")
 
@app.post("/add-task/{item}")
async def add_task(item: str):
    task_queue.append(lambda: example_task(item))
    return {"message": f"Task added for item: {item}"}
from fastapi import FastAPI
import asyncio
from collections import deque
 
app = FastAPI()
task_queue = deque()
 
async def process_tasks():
    while True:
        if task_queue:
            task = task_queue.popleft()
            await task()
        else:
            await asyncio.sleep(1)
 
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_tasks())
 
async def example_task(item: str):
    await asyncio.sleep(5)  # 시간이 걸리는 작업 시뮬레이션
    print(f"Processed item: {item}")
 
@app.post("/add-task/{item}")
async def add_task(item: str):
    task_queue.append(lambda: example_task(item))
    return {"message": f"Task added for item: {item}"}

이 예제에서는 비동기 큐를 사용하여 백그라운드 작업을 관리합니다.

4. APScheduler를 이용한 작업 스케줄링

주기적인 작업을 실행하기 위해 APScheduler 라이브러리를 사용할 수 있습니다.

먼저, APScheduler를 설치해야 합니다.

pip install apscheduler
pip install apscheduler

그리고 다음과 같이 사용할 수 있습니다.

from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
 
app = FastAPI()
scheduler = BackgroundScheduler()
 
def scheduled_task():
    print(f"This task runs every minute - {time.strftime('%Y-%m-%d %H:%M:%S')}")
 
scheduler.add_job(scheduled_task, CronTrigger(minute="*"))
 
@app.on_event("startup")
def startup_event():
    scheduler.start()
 
@app.on_event("shutdown")
def shutdown_event():
    scheduler.shutdown()
 
@app.get("/")
def read_root():
    return {"Hello": "World"}
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
 
app = FastAPI()
scheduler = BackgroundScheduler()
 
def scheduled_task():
    print(f"This task runs every minute - {time.strftime('%Y-%m-%d %H:%M:%S')}")
 
scheduler.add_job(scheduled_task, CronTrigger(minute="*"))
 
@app.on_event("startup")
def startup_event():
    scheduler.start()
 
@app.on_event("shutdown")
def shutdown_event():
    scheduler.shutdown()
 
@app.get("/")
def read_root():
    return {"Hello": "World"}

이 예제에서는 매 분마다 scheduled_task 함수를 실행하는 작업을 스케줄링합니다.

5. 장기 실행 작업 처리하기

장시간 실행되는 작업의 경우, 작업 상태를 추적하고 결과를 나중에 조회할 수 있도록 구현할 수 있습니다.

from fastapi import FastAPI, BackgroundTasks
import time
import uuid
 
app = FastAPI()
 
# 작업 상태를 저장할 딕셔너리
tasks = {}
 
def long_running_task(task_id: str):
    time.sleep(10)  # 장기 실행 작업 시뮬레이션
    tasks[task_id] = "completed"
 
@app.post("/start-task/")
async def start_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    tasks[task_id] = "running"
    background_tasks.add_task(long_running_task, task_id)
    return {"task_id": task_id}
 
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    return {"status": tasks.get(task_id, "not found")}
from fastapi import FastAPI, BackgroundTasks
import time
import uuid
 
app = FastAPI()
 
# 작업 상태를 저장할 딕셔너리
tasks = {}
 
def long_running_task(task_id: str):
    time.sleep(10)  # 장기 실행 작업 시뮬레이션
    tasks[task_id] = "completed"
 
@app.post("/start-task/")
async def start_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    tasks[task_id] = "running"
    background_tasks.add_task(long_running_task, task_id)
    return {"task_id": task_id}
 
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    return {"status": tasks.get(task_id, "not found")}

이 예제에서는 장기 실행 작업을 시작하고 나중에 그 상태를 조회할 수 있습니다.

연습문제

  1. 이메일 전송을 시뮬레이션하는 백그라운드 작업을 구현해보세요. 이메일 주소와 메시지를 받아 백그라운드에서 "전송"하고, 로그 파일에 기록하는 기능을 만들어보세요.

  2. 매일 자정에 실행되는 데이터베이스 정리 작업을 APScheduler를 사용하여 구현해보세요. (실제 데이터베이스 연결 없이 로그를 출력하는 것으로 대체해도 됩니다)

  3. 파일 처리를 시뮬레이션하는 장기 실행 작업을 구현해보세요. 작업 진행 상황을 백분율로 추적하고, 상태 조회 엔드포인트에서 이 진행 상황을 반환하도록 만들어보세요.

  4. 간단한 작업 큐를 구현하고, 우선순위에 따라 작업을 처리하는 기능을 추가해보세요. 높은 우선순위의 작업이 먼저 처리되어야 합니다.

5.3 파일 업로드 및 처리