"""Processing-queue consumer — invoked by APScheduler at 1-minute intervals."""

from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update

from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue

logger = setup_logger("queue_consumer")

# Batch size per stage (max number of pending rows picked up per run).
BATCH_SIZE = {"extract": 5, "classify": 3, "embed": 1, "preview": 2}
STALE_THRESHOLD_MINUTES = 10

# Stages to enqueue once a given stage has completed.
_NEXT_STAGES = {"extract": ["classify", "preview"], "classify": ["embed"]}

# Stored in error_message when a stale row has no attempts left.
_STALE_ERROR_MESSAGE = "stale: processing timed out and max attempts reached"


async def reset_stale_items() -> None:
    """Recover rows stuck in ``processing`` longer than the stale threshold.

    A row is stale when its status is ``processing`` and ``started_at`` is
    older than ``STALE_THRESHOLD_MINUTES``. Stale rows that have already
    used up their attempts (``attempts >= max_attempts``) are marked
    ``failed``; all other stale rows go back to ``pending`` with
    ``started_at`` cleared so the next run picks them up again.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
    stale = (
        ProcessingQueue.status == "processing",
        ProcessingQueue.started_at < cutoff,
    )

    async with async_session() as session:
        # Apply the same retry limit as the failure handler in consume_queue.
        # Without this, a row whose worker never reaches that handler (the
        # row just goes stale every time) would be re-queued indefinitely.
        exhausted = await session.execute(
            update(ProcessingQueue)
            .where(
                *stale,
                ProcessingQueue.attempts >= ProcessingQueue.max_attempts,
            )
            .values(status="failed", error_message=_STALE_ERROR_MESSAGE)
        )
        # Rows failed above are no longer "processing", so this statement
        # only touches stale rows that still have attempts left.
        requeued = await session.execute(
            update(ProcessingQueue)
            .where(*stale)
            .values(status="pending", started_at=None)
        )

        if exhausted.rowcount > 0 or requeued.rowcount > 0:
            await session.commit()
        if exhausted.rowcount > 0:
            logger.error(f"stale 항목 {exhausted.rowcount}건 최대 재시도 횟수 초과로 실패 처리")
        if requeued.rowcount > 0:
            logger.warning(f"stale 항목 {requeued.rowcount}건 복구")


async def enqueue_next_stage(document_id: int, current_stage: str) -> None:
    """Register the follow-up stage(s) of ``current_stage`` as ``pending``.

    A follow-up stage is skipped when the document already has a row for it
    in ``pending`` or ``processing`` state. Stages without follow-ups are a
    no-op.

    Args:
        document_id: Document whose next stage(s) should be queued.
        current_stage: Name of the stage that has just completed.
    """
    stages = _NEXT_STAGES.get(current_stage, [])
    if not stages:
        return

    async with async_session() as session:
        for next_stage in stages:
            # Existence probe only: LIMIT 1 + first() stays correct even if
            # several active rows already exist, where scalar_one_or_none()
            # would raise MultipleResultsFound.
            existing = await session.execute(
                select(ProcessingQueue.id)
                .where(
                    ProcessingQueue.document_id == document_id,
                    ProcessingQueue.stage == next_stage,
                    ProcessingQueue.status.in_(["pending", "processing"]),
                )
                .limit(1)
            )
            if existing.first() is not None:
                continue

            session.add(
                ProcessingQueue(
                    document_id=document_id,
                    stage=next_stage,
                    status="pending",
                )
            )
        await session.commit()


async def _claim_item(queue_id: int) -> bool:
    """Move a pending row to ``processing``; return False if it can't be claimed."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        # The row may have been deleted or picked up since it was selected.
        if not item or item.status != "pending":
            return False
        item.status = "processing"
        item.started_at = datetime.now(timezone.utc)
        item.attempts += 1
        await session.commit()
    return True


async def _mark_completed(queue_id: int, stage: str) -> bool:
    """Mark a row ``completed``; return False if the row no longer exists."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if not item:
            logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
            return False
        item.status = "completed"
        item.completed_at = datetime.now(timezone.utc)
        await session.commit()
    return True


async def _record_failure(
    queue_id: int, document_id: int, stage: str, exc: Exception
) -> None:
    """Store the error on the row and set it to ``failed`` or back to ``pending``."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if not item:
            logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
            return

        item.error_message = str(exc)[:500]  # keep at most 500 characters
        if item.attempts >= item.max_attempts:
            item.status = "failed"
            logger.error(f"[{stage}] document_id={document_id} 영구 실패: {exc}")
        else:
            # Attempts remain: release the row for a later run.
            item.status = "pending"
            item.started_at = None
            logger.warning(
                f"[{stage}] document_id={document_id} 재시도 예정 "
                f"({item.attempts}/{item.max_attempts}): {exc}"
            )
        await session.commit()


async def consume_queue() -> None:
    """Fetch pending rows per stage and run the matching worker on each.

    Each run first recovers stale rows, then for every stage (in the order
    extract, classify, embed, preview) loads up to ``BATCH_SIZE[stage]``
    pending rows ordered by ``created_at`` and processes them one at a time.
    Claiming, the worker call, and the completion/failure bookkeeping each
    use their own session.

    Any exception raised after a row was claimed — by the worker itself, by
    marking the row completed, or by ``enqueue_next_stage`` — is recorded on
    the row and leads to a retry or a permanent failure.
    """
    # Workers are imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from workers.classify_worker import process as classify_process
    from workers.embed_worker import process as embed_process
    from workers.extract_worker import process as extract_process
    from workers.preview_worker import process as preview_process

    workers = {
        "extract": extract_process,
        "classify": classify_process,
        "embed": embed_process,
        "preview": preview_process,
    }

    await reset_stale_items()

    for stage, worker_fn in workers.items():
        batch_size = BATCH_SIZE.get(stage, 3)

        # Load the oldest pending rows for this stage.
        async with async_session() as session:
            result = await session.execute(
                select(ProcessingQueue.id, ProcessingQueue.document_id)
                .where(
                    ProcessingQueue.stage == stage,
                    ProcessingQueue.status == "pending",
                )
                .order_by(ProcessingQueue.created_at)
                .limit(batch_size)
            )
            pending_items = result.all()

        # Process each row independently so one failure does not affect others.
        for queue_id, document_id in pending_items:
            if not await _claim_item(queue_id):
                continue

            try:
                # The worker gets its own session; commit its changes here.
                async with async_session() as worker_session:
                    await worker_fn(document_id, worker_session)
                    await worker_session.commit()

                if not await _mark_completed(queue_id, stage):
                    continue

                await enqueue_next_stage(document_id, stage)
                logger.info(f"[{stage}] document_id={document_id} 완료")
            except Exception as e:
                await _record_failure(queue_id, document_id, stage, e)