백그라운드 작업과 스케줄링
1. 백그라운드 작업과 스케줄링 소개
웹 애플리케이션에서는 사용자의 요청에 즉시 응답하지 않아도 되는 작업들이 있습니다. 예를 들어, 이메일 전송, 대용량 파일 처리, 데이터 분석 등이 그런 작업들입니다. 이러한 작업들은 백그라운드에서 처리하거나 주기적으로 실행되도록 스케줄링할 수 있습니다. FastAPI는 이러한 기능을 구현하는데 도움이 되는 도구들을 제공합니다.
2. BackgroundTasks 사용하기
FastAPI의 BackgroundTasks
를 사용하면 HTTP 응답을 반환한 후에 작업을 실행할 수 있습니다.
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
이 예제에서는 알림을 보내는 작업을 백그라운드에서 실행합니다.
3. 간단한 작업 큐 구현하기
더 복잡한 백그라운드 작업을 위해 간단한 작업 큐를 구현할 수 있습니다.
from fastapi import FastAPI
import asyncio
from collections import deque
app = FastAPI()
task_queue = deque()
async def process_tasks():
while True:
if task_queue:
task = task_queue.popleft()
await task()
else:
await asyncio.sleep(1)
@app.on_event("startup")
async def startup_event():
asyncio.create_task(process_tasks())
async def example_task(item: str):
await asyncio.sleep(5) # 시간이 걸리는 작업 시뮬레이션
print(f"Processed item: {item}")
@app.post("/add-task/{item}")
async def add_task(item: str):
task_queue.append(lambda: example_task(item))
return {"message": f"Task added for item: {item}"}
from fastapi import FastAPI
import asyncio
from collections import deque
app = FastAPI()
task_queue = deque()
async def process_tasks():
while True:
if task_queue:
task = task_queue.popleft()
await task()
else:
await asyncio.sleep(1)
@app.on_event("startup")
async def startup_event():
asyncio.create_task(process_tasks())
async def example_task(item: str):
await asyncio.sleep(5) # 시간이 걸리는 작업 시뮬레이션
print(f"Processed item: {item}")
@app.post("/add-task/{item}")
async def add_task(item: str):
task_queue.append(lambda: example_task(item))
return {"message": f"Task added for item: {item}"}
이 예제에서는 비동기 큐를 사용하여 백그라운드 작업을 관리합니다.
4. APScheduler를 이용한 작업 스케줄링
주기적인 작업을 실행하기 위해 APScheduler 라이브러리를 사용할 수 있습니다.
먼저, APScheduler를 설치해야 합니다.
pip install apscheduler
pip install apscheduler
그리고 다음과 같이 사용할 수 있습니다.
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
app = FastAPI()
scheduler = BackgroundScheduler()
def scheduled_task():
print(f"This task runs every minute - {time.strftime('%Y-%m-%d %H:%M:%S')}")
scheduler.add_job(scheduled_task, CronTrigger(minute="*"))
@app.on_event("startup")
def startup_event():
scheduler.start()
@app.on_event("shutdown")
def shutdown_event():
scheduler.shutdown()
@app.get("/")
def read_root():
return {"Hello": "World"}
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
app = FastAPI()
scheduler = BackgroundScheduler()
def scheduled_task():
print(f"This task runs every minute - {time.strftime('%Y-%m-%d %H:%M:%S')}")
scheduler.add_job(scheduled_task, CronTrigger(minute="*"))
@app.on_event("startup")
def startup_event():
scheduler.start()
@app.on_event("shutdown")
def shutdown_event():
scheduler.shutdown()
@app.get("/")
def read_root():
return {"Hello": "World"}
이 예제에서는 매 분마다 scheduled_task
함수를 실행하는 작업을 스케줄링합니다.
5. 장기 실행 작업 처리하기
장시간 실행되는 작업의 경우, 작업 상태를 추적하고 결과를 나중에 조회할 수 있도록 구현할 수 있습니다.
from fastapi import FastAPI, BackgroundTasks
import time
import uuid
app = FastAPI()
# 작업 상태를 저장할 딕셔너리
tasks = {}
def long_running_task(task_id: str):
time.sleep(10) # 장기 실행 작업 시뮬레이션
tasks[task_id] = "completed"
@app.post("/start-task/")
async def start_task(background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
tasks[task_id] = "running"
background_tasks.add_task(long_running_task, task_id)
return {"task_id": task_id}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
return {"status": tasks.get(task_id, "not found")}
from fastapi import FastAPI, BackgroundTasks
import time
import uuid
app = FastAPI()
# 작업 상태를 저장할 딕셔너리
tasks = {}
def long_running_task(task_id: str):
time.sleep(10) # 장기 실행 작업 시뮬레이션
tasks[task_id] = "completed"
@app.post("/start-task/")
async def start_task(background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
tasks[task_id] = "running"
background_tasks.add_task(long_running_task, task_id)
return {"task_id": task_id}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
return {"status": tasks.get(task_id, "not found")}
이 예제에서는 장기 실행 작업을 시작하고 나중에 그 상태를 조회할 수 있습니다.
연습문제
-
이메일 전송을 시뮬레이션하는 백그라운드 작업을 구현해보세요. 이메일 주소와 메시지를 받아 백그라운드에서 "전송"하고, 로그 파일에 기록하는 기능을 만들어보세요.
-
매일 자정에 실행되는 데이터베이스 정리 작업을 APScheduler를 사용하여 구현해보세요. (실제 데이터베이스 연결 없이 로그를 출력하는 것으로 대체해도 됩니다)
-
파일 처리를 시뮬레이션하는 장기 실행 작업을 구현해보세요. 작업 진행 상황을 백분율로 추적하고, 상태 조회 엔드포인트에서 이 진행 상황을 반환하도록 만들어보세요.
-
간단한 작업 큐를 구현하고, 우선순위에 따라 작업을 처리하는 기능을 추가해보세요. 높은 우선순위의 작업이 먼저 처리되어야 합니다.