hyungi_document_server/app/workers/queue_consumer.py
Hyungi Ahn 62f5eccb96 fix: isolate each worker call in independent async session
Shared session between queue consumer and workers caused
MissingGreenlet errors in APScheduler context. Each worker
call now gets its own session with explicit commit/rollback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 08:29:14 +09:00
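
The shape of that fix, as a minimal runnable sketch (the engine URL, handle_one, and the worker body are illustrative placeholders rather than this repo's code, and it needs the aiosqlite driver): each call opens, commits or rolls back, and closes its own AsyncSession, so no session object is ever shared across APScheduler invocations.

import asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async_session = async_sessionmaker(engine, expire_on_commit=False)

async def handle_one(document_id: int) -> None:
    # One independent session per call, never shared between callers
    async with async_session() as session:
        try:
            ...  # worker logic would use `session` here
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def main() -> None:
    for doc_id in (1, 2, 3):
        await handle_one(doc_id)

asyncio.run(main())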


"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출"""
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue
logger = setup_logger("queue_consumer")

# Batch size per stage
BATCH_SIZE = {"extract": 5, "classify": 3, "embed": 1}
STALE_THRESHOLD_MINUTES = 10
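

# Items can be stranded in "processing" if the process dies mid-run; anything
# older than STALE_THRESHOLD_MINUTES is assumed orphaned and is returned to the
# queue by reset_stale_items() below.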
async def reset_stale_items():
    """Recover items left in "processing" for more than 10 minutes."""
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
    async with async_session() as session:
        result = await session.execute(
            update(ProcessingQueue)
            .where(
                ProcessingQueue.status == "processing",
                ProcessingQueue.started_at < cutoff,
            )
            .values(status="pending", started_at=None)
        )
        if result.rowcount > 0:
            await session.commit()
            logger.warning(f"recovered {result.rowcount} stale items")


async def enqueue_next_stage(document_id: int, current_stage: str):
    """Register the next stage as pending once the current stage completes."""
    next_stages = {"extract": "classify", "classify": "embed"}
    next_stage = next_stages.get(current_stage)
    if not next_stage:
        # "embed" is the final stage; there is nothing left to enqueue
        return
    async with async_session() as session:
        # Skip if the next stage is already queued or running for this document
        existing = await session.execute(
            select(ProcessingQueue).where(
                ProcessingQueue.document_id == document_id,
                ProcessingQueue.stage == next_stage,
                ProcessingQueue.status.in_(["pending", "processing"]),
            )
        )
        if existing.scalar_one_or_none():
            return
        session.add(ProcessingQueue(
            document_id=document_id,
            stage=next_stage,
            status="pending",
        ))
        await session.commit()


async def consume_queue():
    """Pull pending items off the queue and run the matching worker per stage."""
    # Imported here rather than at module level, presumably to avoid import cycles
    from workers.classify_worker import process as classify_process
    from workers.embed_worker import process as embed_process
    from workers.extract_worker import process as extract_process

    workers = {
        "extract": extract_process,
        "classify": classify_process,
        "embed": embed_process,
    }

    await reset_stale_items()
    for stage, worker_fn in workers.items():
        batch_size = BATCH_SIZE.get(stage, 3)
        # Fetch up to batch_size pending items for this stage, oldest first
        async with async_session() as session:
            result = await session.execute(
                select(ProcessingQueue.id, ProcessingQueue.document_id)
                .where(
                    ProcessingQueue.stage == stage,
                    ProcessingQueue.status == "pending",
                )
                .order_by(ProcessingQueue.created_at)
                .limit(batch_size)
            )
            pending_items = result.all()
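        # Only plain columns were selected above, so these rows are simple tuples
        # and nothing stays bound to the session that has just closed.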
        # Process each item in its own set of short-lived sessions
        for queue_id, document_id in pending_items:
            # Claim the item by flipping it to "processing"
            async with async_session() as session:
                item = await session.get(ProcessingQueue, queue_id)
                # Re-check the status in case a concurrent run already claimed it
                if not item or item.status != "pending":
                    continue
                item.status = "processing"
                item.started_at = datetime.now(timezone.utc)
                item.attempts += 1
                await session.commit()
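            # The row is claimed from here on; if the process dies before it
            # finishes, reset_stale_items() re-queues it on a later run.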
            # Run the worker in its own independent session
            try:
                async with async_session() as worker_session:
                    await worker_fn(document_id, worker_session)
                    await worker_session.commit()
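                # The worker's transaction is settled before the queue row is
                # updated below, so document writes and queue bookkeeping never
                # share a session (the isolation this commit introduces).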
                # Mark the queue item completed
                async with async_session() as session:
                    item = await session.get(ProcessingQueue, queue_id)
                    item.status = "completed"
                    item.completed_at = datetime.now(timezone.utc)
                    await session.commit()
                await enqueue_next_stage(document_id, stage)
                logger.info(f"[{stage}] document_id={document_id} completed")
            except Exception as e:
                # On failure, retry until max_attempts is exhausted, then give up
                async with async_session() as session:
                    item = await session.get(ProcessingQueue, queue_id)
                    item.error_message = str(e)[:500]
                    if item.attempts >= item.max_attempts:
                        item.status = "failed"
                        logger.error(f"[{stage}] document_id={document_id} permanently failed: {e}")
                    else:
                        item.status = "pending"
                        item.started_at = None
                        logger.warning(f"[{stage}] document_id={document_id} will retry ({item.attempts}/{item.max_attempts}): {e}")
                    await session.commit()
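
# The scheduler wiring lives outside this file; per the docstring, APScheduler
# calls consume_queue every minute, which would look roughly like this
# (illustrative, not this repo's code):
#
#     scheduler.add_job(consume_queue, "interval", minutes=1)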