"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출. PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) stage 를 별 consumer 로 분리. 대형 PDF split 변환(수십 분) 이 단일 consume_queue 코루틴을 점유해 전 파이프라인을 stall 시키던 문제 제거. consume_queue = markdown 제외 전 stage / consume_markdown_queue = markdown 전용. 두 consumer 의 stage 집합은 disjoint 이라 같은 row 경합/중복 reset 없음. """ import os from datetime import datetime, timedelta, timezone from sqlalchemy import select, update, delete, exists from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import aliased from core.config import settings from core.database import async_session from core.utils import setup_logger from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition logger = setup_logger("queue_consumer") # pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만. _hold_logged = False # stage별 배치 크기 # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면 # 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수. # embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이 # LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음. # 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일). BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, "fulltext": 3} STALE_THRESHOLD_MINUTES = 10 # markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. # marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 # stale 임계로 보면 살아있는 변환을 stale 로 오인 → pending 복구해 이중 처리한다. # 따라서 markdown consumer 는 별도의 generous 임계를 쓴다. MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120")) # consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue, # embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint # (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). MAIN_QUEUE_STAGES = [ "extract", "classify", "summarize", "preview", "stt", "thumbnail", "fulltext", ] MARKDOWN_QUEUE_STAGES = ["markdown"] # 2026-06-15: deep_summary(26B, 콜당 70~300s)를 메인 루프에서 분리 (markdown/fast 선례). # 단일 deep 호출이 1분 틱을 초과해 메인 consume_queue 가 영구 coalesce 되고 extract/ # classify 등 경량 stage 까지 굶던 문제 제거. 집합 disjoint(자기 집합만 stale reset). DEEP_QUEUE_STAGES = ["deep_summary"] # 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비. # embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다 # (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴. FAST_QUEUE_STAGES = ["embed", "chunk"] async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES): """processing 상태로 오래 방치된 항목 복구 (지정 stage 한정) 1) 같은 (document_id, stage)에 pending 행이 이미 있으면 stale processing 행은 중복이므로 삭제 2) pending이 없는 stale processing 행만 pending으로 되돌림 stages: 이 reset 의 대상 stage 목록. consume_queue 와 consume_markdown_queue 가 서로 disjoint 한 stage 집합 + 서로 다른 threshold 로 호출한다. """ cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minutes) processing_row = aliased(ProcessingQueue) pending_row = aliased(ProcessingQueue) try: async with async_session() as session: # Step A: pending 중복이 이미 있는 stale processing 삭제 delete_stmt = ( delete(ProcessingQueue) .where( ProcessingQueue.id.in_( select(processing_row.id).where( processing_row.stage.in_(stages), processing_row.status == "processing", processing_row.started_at.is_not(None), processing_row.started_at < cutoff, exists( select(1).where( pending_row.document_id == processing_row.document_id, pending_row.stage == processing_row.stage, pending_row.status == "pending", ) ), ) ) ) ) delete_result = await session.execute(delete_stmt) # Step B: pending 없는 stale processing만 pending으로 복구 recoverable_ids = select(processing_row.id).where( processing_row.stage.in_(stages), processing_row.status == "processing", processing_row.started_at.is_not(None), processing_row.started_at < cutoff, ~exists( select(1).where( pending_row.document_id == processing_row.document_id, pending_row.stage == processing_row.stage, pending_row.status == "pending", ) ), ) update_stmt = ( update(ProcessingQueue) .where(ProcessingQueue.id.in_(recoverable_ids)) .values(status="pending", started_at=None) ) update_result = await session.execute(update_stmt) await session.commit() deleted = delete_result.rowcount or 0 recovered = update_result.rowcount or 0 if deleted > 0: logger.warning( "deleted %s stale processing rows that already had pending duplicates (stages=%s)", deleted, stages, ) if recovered > 0: logger.warning( "recovered %s stale processing rows back to pending (stages=%s)", recovered, stages, ) except IntegrityError: logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle") except SQLAlchemyError: logger.exception("reset_stale_items failed with database error; skipping this cycle") except Exception: logger.exception("reset_stale_items failed unexpectedly; skipping this cycle") async def enqueue_next_stage(document_id: int, current_stage: str): """현재 stage 완료 후 다음 stage를 pending으로 등록. §3 추가: stt → [classify] (audio 는 extract 건너뛰고 stt 가 extracted_text 를 채움) thumbnail → [] (video 는 leaf — classify/embed 없음) Web/Blog ingest (devonagent 트랙) — plan db-snuggly-petal.md: source_channel='devonagent' 인 doc 의 extract 완료 시 classify/preview/markdown 전부 SKIP → [embed, chunk] 만 enqueue. AI 가공 (ai_tldr/ai_bullets 등) 은 별 PR (Mac mini derived-worker). """ # source_channel-aware override (extract stage 만). source_channel 누락 시 _default. extract_override_by_channel = { "devonagent": ["embed", "chunk"], # crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 — # 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지. "crawl": ["classify"], } next_stages = { "extract": ["classify", "preview"], "classify": ["embed", "chunk", "markdown"], "stt": ["classify"], } # extract 의 경우만 doc.source_channel 을 lookup 해서 override 적용 if current_stage == "extract": from models.document import Document async with async_session() as lookup_session: doc = await lookup_session.get(Document, document_id) sc = doc.source_channel if doc else None if sc in extract_override_by_channel: stages = extract_override_by_channel[sc] else: stages = next_stages.get(current_stage, []) else: stages = next_stages.get(current_stage, []) if not stages: return async with async_session() as session: for next_stage in stages: await enqueue_stage(session, document_id, next_stage) await session.commit() def _load_workers(): """stage → worker process 함수 dict (lazy import — 순환 import 회피).""" from workers.classify_worker import process as classify_process from workers.chunk_worker import process as chunk_process from workers.deep_summary_worker import process as deep_summary_process from workers.embed_worker import process as embed_process from workers.extract_worker import process as extract_process from workers.preview_worker import process as preview_process from workers.stt_worker import process as stt_process from workers.summarize_worker import process as summarize_process from workers.thumbnail_worker import process as thumbnail_process from workers.marker_worker import process as marker_process from workers.fulltext_worker import process as fulltext_process return { "extract": extract_process, "classify": classify_process, "summarize": summarize_process, "embed": embed_process, "chunk": chunk_process, "preview": preview_process, "stt": stt_process, "thumbnail": thumbnail_process, # PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성. # next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립). "deep_summary": deep_summary_process, # Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립). # consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록). "markdown": marker_process, # crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은 # 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op). "fulltext": fulltext_process, } async def _process_stage(stage, worker_fn): """단일 stage 의 pending 항목을 batch 만큼 가져와 워커 실행. consume_queue / consume_markdown_queue 가 공유한다. 항목별 독립 세션 + processing→completed/failed 상태 전이 + 재시도 정책은 기존 로직 그대로. """ batch_size = BATCH_SIZE.get(stage, 3) # pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1) async with async_session() as session: result = await session.execute( select(ProcessingQueue.id, ProcessingQueue.document_id) .where( ProcessingQueue.stage == stage, ProcessingQueue.status == "pending", not_deferred_condition(), ) .order_by(ProcessingQueue.created_at) .limit(batch_size) ) pending_items = result.all() # 각 항목을 독립 세션에서 처리 for queue_id, document_id in pending_items: # 상태를 processing으로 변경 async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if not item or item.status != "pending": continue item.status = "processing" item.started_at = datetime.now(timezone.utc) item.attempts += 1 await session.commit() # 워커 실행 (독립 세션) try: # note(메모)는 이미 extracted_text가 있으므로 extract/preview skip if stage in ("extract", "preview"): from models.document import Document async with async_session() as check_session: doc = await check_session.get(Document, document_id) if doc and doc.file_type == "note": async with async_session() as skip_session: item = await skip_session.get(ProcessingQueue, queue_id) if item: item.status = "completed" item.completed_at = datetime.now(timezone.utc) await skip_session.commit() await enqueue_next_stage(document_id, stage) logger.info(f"[{stage}] document_id={document_id} skip (note)") continue async with async_session() as worker_session: await worker_fn(document_id, worker_session) await worker_session.commit() # 완료 처리 async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if not item: logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") continue item.status = "completed" item.completed_at = datetime.now(timezone.utc) await session.commit() await enqueue_next_stage(document_id, stage) logger.info(f"[{stage}] document_id={document_id} 완료") except StageDeferred as defer: # 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님. # attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개. # 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전). async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if not item: logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") continue item.status = "pending" item.started_at = None item.attempts = max(0, item.attempts - 1) until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes) item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()} await session.commit() logger.info( f"[{stage}] document_id={document_id} 보류({defer}) — " f"{defer.retry_after_minutes}분 후 재개" ) except Exception as e: # 실패 처리 async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if not item: logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") continue # 빈 메시지 방어: str → repr → 클래스명 순 fallback err_text = str(e) or repr(e) or type(e).__name__ item.error_message = err_text[:500] if item.attempts >= item.max_attempts: item.status = "failed" logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") # B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다. # 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형 # batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan # (큐는 failed, 문서는 processing). 큐가 영구 failed 가 될 때 doc 상태도 # 동기화한다. 재시도 중(attempts