fix: isolate each worker call in independent async session

A session shared between the queue consumer and the workers caused
MissingGreenlet errors in the APScheduler context. Each worker
call now gets its own session with explicit commit/rollback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-03 08:29:14 +09:00
parent 87683ca000
commit 62f5eccb96

View File

@@ -32,37 +32,37 @@ async def reset_stale_items():
logger.warning(f"stale 항목 {result.rowcount}건 복구")
async def enqueue_next_stage(document_id: int, current_stage: str) -> None:
    """Register the stage that follows ``current_stage`` as a pending queue item.

    The stage order is extract -> classify -> embed. Nothing is queued when
    ``current_stage`` has no successor, or when the document already has a
    "pending" or "processing" item for the next stage.

    The lookup and the insert run in a session opened here and committed
    before returning, so the call does not rely on a session held by the
    caller.

    Args:
        document_id: ID of the document whose stage just finished.
        current_stage: Name of the stage that just finished.
    """
    next_stages = {"extract": "classify", "classify": "embed"}
    next_stage = next_stages.get(current_stage)
    if not next_stage:
        return

    async with async_session() as session:
        # Duplicate guard: look for an item of the next stage that is still
        # waiting or currently running for this document.
        existing = await session.execute(
            select(ProcessingQueue)
            .where(
                ProcessingQueue.document_id == document_id,
                ProcessingQueue.stage == next_stage,
                ProcessingQueue.status.in_(["pending", "processing"]),
            )
            .limit(1)
        )
        # first() instead of scalar_one_or_none(): if duplicates already
        # exist, the guard should still just skip rather than raise
        # MultipleResultsFound.
        if existing.scalars().first() is not None:
            return

        session.add(ProcessingQueue(
            document_id=document_id,
            stage=next_stage,
            status="pending",
        ))
        await session.commit()
async def consume_queue():
"""큐에서 pending 항목을 가져와 stage별 워커 실행"""
# 지연 임포트 (순환 참조 방지)
from workers.extract_worker import process as extract_process
from workers.classify_worker import process as classify_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
workers = {
"extract": extract_process,
@@ -70,15 +70,15 @@ async def consume_queue():
"embed": embed_process,
}
# stale 항목 복구
await reset_stale_items()
for stage, worker_fn in workers.items():
batch_size = BATCH_SIZE.get(stage, 3)
# pending 항목 조회
async with async_session() as session:
result = await session.execute(
select(ProcessingQueue)
select(ProcessingQueue.id, ProcessingQueue.document_id)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
@@ -86,32 +86,46 @@ async def consume_queue():
.order_by(ProcessingQueue.created_at)
.limit(batch_size)
)
items = result.scalars().all()
pending_items = result.all()
for item in items:
# 각 항목을 독립 세션에서 처리
for queue_id, document_id in pending_items:
# 상태를 processing으로 변경
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item or item.status != "pending":
continue
item.status = "processing"
item.started_at = datetime.now(timezone.utc)
item.attempts += 1
await session.commit()
try:
await worker_fn(item.document_id, session)
# 워커 실행 (독립 세션)
try:
async with async_session() as worker_session:
await worker_fn(document_id, worker_session)
await worker_session.commit()
# 완료 처리
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await enqueue_next_stage(item.document_id, stage, session)
await session.commit()
logger.info(f"[{stage}] document_id={item.document_id} 완료")
except Exception as e:
await session.rollback()
# 세션에서 item 다시 로드
item = await session.get(ProcessingQueue, item.id)
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except Exception as e:
# 실패 처리
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
item.error_message = str(e)[:500]
if item.attempts >= item.max_attempts:
item.status = "failed"
logger.error(f"[{stage}] document_id={item.document_id} 영구 실패: {e}")
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
else:
item.status = "pending"
item.started_at = None
logger.warning(f"[{stage}] document_id={item.document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
await session.commit()