diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 3856acf..4117b89 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -32,37 +32,37 @@ async def reset_stale_items(): logger.warning(f"stale 항목 {result.rowcount}건 복구") -async def enqueue_next_stage(document_id: int, current_stage: str, session): +async def enqueue_next_stage(document_id: int, current_stage: str): """현재 stage 완료 후 다음 stage를 pending으로 등록""" next_stages = {"extract": "classify", "classify": "embed"} next_stage = next_stages.get(current_stage) if not next_stage: return - # 이미 존재하는지 확인 (중복 방지) - existing = await session.execute( - select(ProcessingQueue).where( - ProcessingQueue.document_id == document_id, - ProcessingQueue.stage == next_stage, - ProcessingQueue.status.in_(["pending", "processing"]), + async with async_session() as session: + existing = await session.execute( + select(ProcessingQueue).where( + ProcessingQueue.document_id == document_id, + ProcessingQueue.stage == next_stage, + ProcessingQueue.status.in_(["pending", "processing"]), + ) ) - ) - if existing.scalar_one_or_none(): - return + if existing.scalar_one_or_none(): + return - session.add(ProcessingQueue( - document_id=document_id, - stage=next_stage, - status="pending", - )) + session.add(ProcessingQueue( + document_id=document_id, + stage=next_stage, + status="pending", + )) + await session.commit() async def consume_queue(): """큐에서 pending 항목을 가져와 stage별 워커 실행""" - # 지연 임포트 (순환 참조 방지) - from workers.extract_worker import process as extract_process from workers.classify_worker import process as classify_process from workers.embed_worker import process as embed_process + from workers.extract_worker import process as extract_process workers = { "extract": extract_process, @@ -70,15 +70,15 @@ async def consume_queue(): "embed": embed_process, } - # stale 항목 복구 await reset_stale_items() for stage, worker_fn in workers.items(): batch_size = BATCH_SIZE.get(stage, 3) + # pending 항목 조회 async with async_session() as session: result = await session.execute( - select(ProcessingQueue) + select(ProcessingQueue.id, ProcessingQueue.document_id) .where( ProcessingQueue.stage == stage, ProcessingQueue.status == "pending", @@ -86,32 +86,46 @@ async def consume_queue(): .order_by(ProcessingQueue.created_at) .limit(batch_size) ) - items = result.scalars().all() + pending_items = result.all() - for item in items: + # 각 항목을 독립 세션에서 처리 + for queue_id, document_id in pending_items: + # 상태를 processing으로 변경 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) + if not item or item.status != "pending": + continue item.status = "processing" item.started_at = datetime.now(timezone.utc) item.attempts += 1 await session.commit() - try: - await worker_fn(item.document_id, session) + # 워커 실행 (독립 세션) + try: + async with async_session() as worker_session: + await worker_fn(document_id, worker_session) + await worker_session.commit() + + # 완료 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) item.status = "completed" item.completed_at = datetime.now(timezone.utc) - await enqueue_next_stage(item.document_id, stage, session) await session.commit() - logger.info(f"[{stage}] document_id={item.document_id} 완료") - except Exception as e: - await session.rollback() - # 세션에서 item 다시 로드 - item = await session.get(ProcessingQueue, item.id) + await enqueue_next_stage(document_id, stage) + logger.info(f"[{stage}] document_id={document_id} 완료") + + except Exception as e: + # 실패 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) item.error_message = str(e)[:500] if item.attempts >= item.max_attempts: item.status = "failed" - logger.error(f"[{stage}] document_id={item.document_id} 영구 실패: {e}") + logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") else: item.status = "pending" item.started_at = None - logger.warning(f"[{stage}] document_id={item.document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") + logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") await session.commit()