From 62f5eccb96172ba0cb32733d61cc0ef3bd0edf0a Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 3 Apr 2026 08:29:14 +0900 Subject: [PATCH] fix: isolate each worker call in independent async session Shared session between queue consumer and workers caused MissingGreenlet errors in APScheduler context. Each worker call now gets its own session with explicit commit/rollback. Co-Authored-By: Claude Opus 4.6 (1M context) --- app/workers/queue_consumer.py | 76 +++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 3856acf..4117b89 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -32,37 +32,37 @@ async def reset_stale_items(): logger.warning(f"stale 항목 {result.rowcount}건 복구") -async def enqueue_next_stage(document_id: int, current_stage: str, session): +async def enqueue_next_stage(document_id: int, current_stage: str): """현재 stage 완료 후 다음 stage를 pending으로 등록""" next_stages = {"extract": "classify", "classify": "embed"} next_stage = next_stages.get(current_stage) if not next_stage: return - # 이미 존재하는지 확인 (중복 방지) - existing = await session.execute( - select(ProcessingQueue).where( - ProcessingQueue.document_id == document_id, - ProcessingQueue.stage == next_stage, - ProcessingQueue.status.in_(["pending", "processing"]), + async with async_session() as session: + existing = await session.execute( + select(ProcessingQueue).where( + ProcessingQueue.document_id == document_id, + ProcessingQueue.stage == next_stage, + ProcessingQueue.status.in_(["pending", "processing"]), + ) ) - ) - if existing.scalar_one_or_none(): - return + if existing.scalar_one_or_none(): + return - session.add(ProcessingQueue( - document_id=document_id, - stage=next_stage, - status="pending", - )) + session.add(ProcessingQueue( + document_id=document_id, + stage=next_stage, + status="pending", + )) + await session.commit() async def consume_queue(): """큐에서 pending 항목을 가져와 stage별 워커 실행""" - # 지연 임포트 (순환 참조 방지) - from workers.extract_worker import process as extract_process from workers.classify_worker import process as classify_process from workers.embed_worker import process as embed_process + from workers.extract_worker import process as extract_process workers = { "extract": extract_process, @@ -70,15 +70,15 @@ async def consume_queue(): "embed": embed_process, } - # stale 항목 복구 await reset_stale_items() for stage, worker_fn in workers.items(): batch_size = BATCH_SIZE.get(stage, 3) + # pending 항목 조회 async with async_session() as session: result = await session.execute( - select(ProcessingQueue) + select(ProcessingQueue.id, ProcessingQueue.document_id) .where( ProcessingQueue.stage == stage, ProcessingQueue.status == "pending", @@ -86,32 +86,46 @@ async def consume_queue(): .order_by(ProcessingQueue.created_at) .limit(batch_size) ) - items = result.scalars().all() + pending_items = result.all() - for item in items: + # 각 항목을 독립 세션에서 처리 + for queue_id, document_id in pending_items: + # 상태를 processing으로 변경 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) + if not item or item.status != "pending": + continue item.status = "processing" item.started_at = datetime.now(timezone.utc) item.attempts += 1 await session.commit() - try: - await worker_fn(item.document_id, session) + # 워커 실행 (독립 세션) + try: + async with async_session() as worker_session: + await worker_fn(document_id, worker_session) + await worker_session.commit() + + # 완료 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) item.status = "completed" item.completed_at = datetime.now(timezone.utc) - await enqueue_next_stage(item.document_id, stage, session) await session.commit() - logger.info(f"[{stage}] document_id={item.document_id} 완료") - except Exception as e: - await session.rollback() - # 세션에서 item 다시 로드 - item = await session.get(ProcessingQueue, item.id) + await enqueue_next_stage(document_id, stage) + logger.info(f"[{stage}] document_id={document_id} 완료") + + except Exception as e: + # 실패 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) item.error_message = str(e)[:500] if item.attempts >= item.max_attempts: item.status = "failed" - logger.error(f"[{stage}] document_id={item.document_id} 영구 실패: {e}") + logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") else: item.status = "pending" item.started_at = None - logger.warning(f"[{stage}] document_id={item.document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") + logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") await session.commit()