"""Processing-queue consumer, invoked by APScheduler at one-minute intervals."""

from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased

from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, enqueue_stage

logger = setup_logger("queue_consumer")

# Per-stage batch size.
# stt occupies the single GPU exclusively and the input can be a 30-minute
# meeting -> batch 1. thumbnail is a lightweight ffmpeg subprocess.
# deep_summary (PR-B B-1) goes through the single MLX 26B Semaphore(1) -> batch 1.
BATCH_SIZE = {
    "extract": 5,
    "classify": 3,
    "summarize": 3,
    "embed": 1,
    "chunk": 1,
    "preview": 2,
    "stt": 1,
    "thumbnail": 3,
    "deep_summary": 1,
    "markdown": 1,
}

# Batch size used for a stage that has no entry in BATCH_SIZE.
_DEFAULT_BATCH_SIZE = 3

STALE_THRESHOLD_MINUTES = 10

# Stages enqueued as ``pending`` once the key stage has completed.
# Section 3 additions:
#   stt       -> classify  (audio skips extract; stt fills extracted_text)
#   thumbnail -> nothing   (video is a leaf: no classify/embed)
# Stages that are absent here (deep_summary, markdown, ...) are leaves.
_NEXT_STAGES = {
    "extract": ("classify", "preview"),
    "classify": ("embed", "chunk", "markdown"),
    "stt": ("classify",),
}

# Notes already carry extracted_text, so these stages are skipped for them.
_NOTE_SKIP_STAGES = ("extract", "preview")


async def reset_stale_items():
    """Recover queue rows that have been left in ``processing`` for too long.

    A row is stale when its status is ``processing`` and its ``started_at`` is
    set and older than ``STALE_THRESHOLD_MINUTES``. Two statements run in a
    single transaction:

    1. Stale rows whose ``(document_id, stage)`` already has a ``pending`` row
       are duplicates and are deleted.
    2. The remaining stale rows are put back to ``pending`` with
       ``started_at`` cleared.

    Any exception is logged and swallowed, so a failed cleanup only skips
    this cycle.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
    processing_row = aliased(ProcessingQueue)
    pending_row = aliased(ProcessingQueue)

    try:
        # Both statements share these predicates; building them once keeps
        # the DELETE and the UPDATE from drifting apart.
        stale_conditions = (
            processing_row.status == "processing",
            processing_row.started_at.is_not(None),
            processing_row.started_at < cutoff,
        )
        has_pending_duplicate = exists(
            select(1).where(
                pending_row.document_id == processing_row.document_id,
                pending_row.stage == processing_row.stage,
                pending_row.status == "pending",
            )
        )

        async with async_session() as session:
            # Step A: delete stale rows that already have a pending duplicate.
            duplicate_ids = select(processing_row.id).where(
                *stale_conditions,
                has_pending_duplicate,
            )
            delete_result = await session.execute(
                delete(ProcessingQueue).where(ProcessingQueue.id.in_(duplicate_ids))
            )

            # Step B: only stale rows without a pending duplicate go back to
            # pending.
            recoverable_ids = select(processing_row.id).where(
                *stale_conditions,
                ~has_pending_duplicate,
            )
            update_result = await session.execute(
                update(ProcessingQueue)
                .where(ProcessingQueue.id.in_(recoverable_ids))
                .values(status="pending", started_at=None)
            )
            await session.commit()

        deleted = delete_result.rowcount or 0
        recovered = update_result.rowcount or 0
        if deleted > 0:
            logger.warning(
                "deleted %s stale processing rows that already had pending duplicates",
                deleted,
            )
        if recovered > 0:
            logger.warning(
                "recovered %s stale processing rows back to pending",
                recovered,
            )
    except IntegrityError:
        logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
    except SQLAlchemyError:
        logger.exception("reset_stale_items failed with database error; skipping this cycle")
    except Exception:
        logger.exception("reset_stale_items failed unexpectedly; skipping this cycle")


async def enqueue_next_stage(document_id: int, current_stage: str):
    """Enqueue the follow-up stages of ``current_stage`` as pending.

    Args:
        document_id: Document whose pipeline is advancing.
        current_stage: Stage that has just completed. Stages without an entry
            in ``_NEXT_STAGES`` are leaves and enqueue nothing.
    """
    stages = _NEXT_STAGES.get(current_stage, ())
    if not stages:
        return

    async with async_session() as session:
        for next_stage in stages:
            await enqueue_stage(session, document_id, next_stage)
        await session.commit()


def _load_workers():
    """Return the stage -> worker coroutine mapping, in processing order.

    The worker modules are imported here rather than at module import time.
    """
    from workers.classify_worker import process as classify_process
    from workers.chunk_worker import process as chunk_process
    from workers.deep_summary_worker import process as deep_summary_process
    from workers.embed_worker import process as embed_process
    from workers.extract_worker import process as extract_process
    from workers.preview_worker import process as preview_process
    from workers.stt_worker import process as stt_process
    from workers.summarize_worker import process as summarize_process
    from workers.thumbnail_worker import process as thumbnail_process
    from workers.marker_worker import process as marker_process

    return {
        "extract": extract_process,
        "classify": classify_process,
        "summarize": summarize_process,
        "embed": embed_process,
        "chunk": chunk_process,
        "preview": preview_process,
        "stt": stt_process,
        "thumbnail": thumbnail_process,
        # PR-B B-1: classify enqueues this when it decides to escalate; the
        # 26B model then writes detail_summary. Deliberately absent from the
        # next-stage table: deep_summary is a leaf, independent of the
        # classify -> embed/chunk flow.
        "deep_summary": deep_summary_process,
        # Phase 1B: enqueued after classify completes. PDF -> markdown
        # conversion (leaf, independent of embed/chunk).
        "markdown": marker_process,
    }


async def _claim_item(queue_id):
    """Move a pending queue row to ``processing``; return False if not claimable."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if item is None or item.status != "pending":
            return False
        item.status = "processing"
        item.started_at = datetime.now(timezone.utc)
        item.attempts += 1
        await session.commit()
    return True


async def _mark_completed(queue_id):
    """Mark a queue row ``completed``; return False if the row no longer exists."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if item is None:
            return False
        item.status = "completed"
        item.completed_at = datetime.now(timezone.utc)
        await session.commit()
    return True


async def _is_note_document(document_id):
    """Return True when the document exists and its ``file_type`` is ``"note"``."""
    from models.document import Document

    # The lookup session is closed before the caller opens any other session.
    async with async_session() as session:
        doc = await session.get(Document, document_id)
        return doc is not None and doc.file_type == "note"


async def _record_failure(stage, queue_id, document_id, exc):
    """Store the error on the queue row and schedule a retry or fail it for good."""
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if item is None:
            logger.warning("[%s] queue_id=%s 없음 (삭제됨?), skip", stage, queue_id)
            return

        # Guard against empty messages: fall back str -> repr -> class name.
        # The same text is logged, so an exception with an empty str() no
        # longer produces a blank log line.
        err_text = str(exc) or repr(exc) or type(exc).__name__
        item.error_message = err_text[:500]

        if item.attempts >= item.max_attempts:
            item.status = "failed"
            logger.error(
                "[%s] document_id=%s 영구 실패: %s", stage, document_id, err_text
            )
        else:
            item.status = "pending"
            item.started_at = None
            logger.warning(
                "[%s] document_id=%s 재시도 예정 (%s/%s): %s",
                stage,
                document_id,
                item.attempts,
                item.max_attempts,
                err_text,
            )
        await session.commit()


async def _process_item(stage, worker_fn, queue_id, document_id):
    """Claim one queue row, run its worker, and record the outcome.

    Every step uses its own session. Any exception raised after the claim is
    recorded on the queue row instead of propagating, so one bad item does not
    stop the rest of the batch.
    """
    if not await _claim_item(queue_id):
        return

    try:
        # Notes already have extracted_text, so extract/preview are skipped
        # and the row is completed without running the worker.
        if stage in _NOTE_SKIP_STAGES and await _is_note_document(document_id):
            # The result is ignored here: follow-up stages are enqueued
            # whether or not the queue row still exists.
            await _mark_completed(queue_id)
            await enqueue_next_stage(document_id, stage)
            logger.info("[%s] document_id=%s skip (note)", stage, document_id)
            return

        # Run the worker in its own session.
        async with async_session() as worker_session:
            await worker_fn(document_id, worker_session)
            await worker_session.commit()

        if not await _mark_completed(queue_id):
            logger.warning("[%s] queue_id=%s 없음 (삭제됨?), skip", stage, queue_id)
            return

        await enqueue_next_stage(document_id, stage)
        logger.info("[%s] document_id=%s 완료", stage, document_id)
    except Exception as exc:
        await _record_failure(stage, queue_id, document_id, exc)


async def consume_queue():
    """Fetch pending queue rows and run the matching worker for each stage.

    Stale ``processing`` rows are reset first; a failure there is logged and
    does not stop consumption. Stages are then visited in the order returned
    by ``_load_workers``. For each stage the oldest pending rows (by
    ``created_at``, up to that stage's batch size) are selected and processed
    one at a time.
    """
    workers = _load_workers()

    try:
        await reset_stale_items()
    except Exception:
        logger.exception("stale reset failed, but continuing queue consumption")

    for stage, worker_fn in workers.items():
        batch_size = BATCH_SIZE.get(stage, _DEFAULT_BATCH_SIZE)

        # Only ids are read here; each item is handled in separate sessions.
        async with async_session() as session:
            result = await session.execute(
                select(ProcessingQueue.id, ProcessingQueue.document_id)
                .where(
                    ProcessingQueue.stage == stage,
                    ProcessingQueue.status == "pending",
                )
                .order_by(ProcessingQueue.created_at)
                .limit(batch_size)
            )
            pending_items = result.all()

        for queue_id, document_id in pending_items:
            await _process_item(stage, worker_fn, queue_id, document_id)