fix(search): split markdown into dedicated queue consumer to prevent pipeline stall
대형 PDF split 변환(5210 ≈ 40분 실측)이 단일 consume_queue 코루틴을 점유해 extract/classify/embed/chunk 등 전 파이프라인을 stall 시키던 문제 제거. - consume_markdown_queue 신규 — markdown 전용 scheduler job (id=markdown_consumer) - consume_queue 는 MAIN_QUEUE_STAGES (markdown 제외) 만 처리 - _process_stage / _load_workers 헬퍼로 per-stage 로직 공유 - reset_stale_items(stages, threshold_minutes) 파라미터화: main=10min(markdown 제외), markdown=MARKDOWN_STALE_MINUTES(기본 120). marker_worker 는 heartbeat 미기록이라 40분 변환을 10분 stale 로 오인하던 함정 차단 - enqueue flow (classify -> embed,chunk,markdown) 불변 STT/deep_summary 분리 + GPU 동시성 튜닝은 out of scope (follow-up). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+5
-1
@@ -51,7 +51,7 @@ async def lifespan(app: FastAPI):
|
||||
from workers.law_monitor import run as law_monitor_run
|
||||
from workers.mailplus_archive import run as mailplus_run
|
||||
from workers.news_collector import run as news_collector_run
|
||||
from workers.queue_consumer import consume_queue
|
||||
from workers.queue_consumer import consume_queue, consume_markdown_queue
|
||||
from workers.study_queue_consumer import consume_study_queue
|
||||
from workers.study_session_queue_consumer import consume_study_session_queue
|
||||
from workers.study_question_embed_worker import (
|
||||
@@ -77,6 +77,10 @@ async def lifespan(app: FastAPI):
|
||||
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
|
||||
# 상시 실행
|
||||
scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer")
|
||||
# PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) 전용 consumer.
|
||||
# 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을
|
||||
# stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지.
|
||||
scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer")
|
||||
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
|
||||
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
|
||||
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
|
||||
|
||||
+146
-88
@@ -1,5 +1,12 @@
|
||||
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출"""
|
||||
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출.
|
||||
|
||||
PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) stage 를 별 consumer 로 분리.
|
||||
대형 PDF split 변환(수십 분) 이 단일 consume_queue 코루틴을 점유해 전 파이프라인을
|
||||
stall 시키던 문제 제거. consume_queue = markdown 제외 전 stage / consume_markdown_queue =
|
||||
markdown 전용. 두 consumer 의 stage 집합은 disjoint 이라 같은 row 경합/중복 reset 없음.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import select, update, delete, exists
|
||||
@@ -18,16 +25,32 @@ logger = setup_logger("queue_consumer")
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
|
||||
STALE_THRESHOLD_MINUTES = 10
|
||||
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
|
||||
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
|
||||
# stale 임계로 보면 살아있는 변환을 stale 로 오인 → pending 복구해 이중 처리한다.
|
||||
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
|
||||
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
|
||||
|
||||
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리.
|
||||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||||
MAIN_QUEUE_STAGES = [
|
||||
"extract", "classify", "summarize", "embed", "chunk",
|
||||
"preview", "stt", "thumbnail", "deep_summary",
|
||||
]
|
||||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||||
|
||||
|
||||
async def reset_stale_items():
|
||||
"""processing 상태로 오래 방치된 항목 복구
|
||||
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
|
||||
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
|
||||
|
||||
1) 같은 (document_id, stage)에 pending 행이 이미 있으면
|
||||
stale processing 행은 중복이므로 삭제
|
||||
2) pending이 없는 stale processing 행만 pending으로 되돌림
|
||||
|
||||
stages: 이 reset 의 대상 stage 목록. consume_queue 와 consume_markdown_queue 가
|
||||
서로 disjoint 한 stage 집합 + 서로 다른 threshold 로 호출한다.
|
||||
"""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minutes)
|
||||
processing_row = aliased(ProcessingQueue)
|
||||
pending_row = aliased(ProcessingQueue)
|
||||
|
||||
@@ -39,6 +62,7 @@ async def reset_stale_items():
|
||||
.where(
|
||||
ProcessingQueue.id.in_(
|
||||
select(processing_row.id).where(
|
||||
processing_row.stage.in_(stages),
|
||||
processing_row.status == "processing",
|
||||
processing_row.started_at.is_not(None),
|
||||
processing_row.started_at < cutoff,
|
||||
@@ -57,6 +81,7 @@ async def reset_stale_items():
|
||||
|
||||
# Step B: pending 없는 stale processing만 pending으로 복구
|
||||
recoverable_ids = select(processing_row.id).where(
|
||||
processing_row.stage.in_(stages),
|
||||
processing_row.status == "processing",
|
||||
processing_row.started_at.is_not(None),
|
||||
processing_row.started_at < cutoff,
|
||||
@@ -81,13 +106,13 @@ async def reset_stale_items():
|
||||
recovered = update_result.rowcount or 0
|
||||
if deleted > 0:
|
||||
logger.warning(
|
||||
"deleted %s stale processing rows that already had pending duplicates",
|
||||
deleted,
|
||||
"deleted %s stale processing rows that already had pending duplicates (stages=%s)",
|
||||
deleted, stages,
|
||||
)
|
||||
if recovered > 0:
|
||||
logger.warning(
|
||||
"recovered %s stale processing rows back to pending",
|
||||
recovered,
|
||||
"recovered %s stale processing rows back to pending (stages=%s)",
|
||||
recovered, stages,
|
||||
)
|
||||
except IntegrityError:
|
||||
logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
|
||||
@@ -142,8 +167,8 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def consume_queue():
|
||||
"""큐에서 pending 항목을 가져와 stage별 워커 실행"""
|
||||
def _load_workers():
|
||||
"""stage → worker process 함수 dict (lazy import — 순환 import 회피)."""
|
||||
from workers.classify_worker import process as classify_process
|
||||
from workers.chunk_worker import process as chunk_process
|
||||
from workers.deep_summary_worker import process as deep_summary_process
|
||||
@@ -155,7 +180,7 @@ async def consume_queue():
|
||||
from workers.thumbnail_worker import process as thumbnail_process
|
||||
from workers.marker_worker import process as marker_process
|
||||
|
||||
workers = {
|
||||
return {
|
||||
"extract": extract_process,
|
||||
"classify": classify_process,
|
||||
"summarize": summarize_process,
|
||||
@@ -168,92 +193,125 @@ async def consume_queue():
|
||||
# next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립).
|
||||
"deep_summary": deep_summary_process,
|
||||
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
|
||||
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
|
||||
"markdown": marker_process,
|
||||
}
|
||||
|
||||
|
||||
async def _process_stage(stage, worker_fn):
|
||||
"""단일 stage 의 pending 항목을 batch 만큼 가져와 워커 실행.
|
||||
|
||||
consume_queue / consume_markdown_queue 가 공유한다. 항목별 독립 세션 +
|
||||
processing→completed/failed 상태 전이 + 재시도 정책은 기존 로직 그대로.
|
||||
"""
|
||||
batch_size = BATCH_SIZE.get(stage, 3)
|
||||
|
||||
# pending 항목 조회
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(ProcessingQueue.id, ProcessingQueue.document_id)
|
||||
.where(
|
||||
ProcessingQueue.stage == stage,
|
||||
ProcessingQueue.status == "pending",
|
||||
)
|
||||
.order_by(ProcessingQueue.created_at)
|
||||
.limit(batch_size)
|
||||
)
|
||||
pending_items = result.all()
|
||||
|
||||
# 각 항목을 독립 세션에서 처리
|
||||
for queue_id, document_id in pending_items:
|
||||
# 상태를 processing으로 변경
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item or item.status != "pending":
|
||||
continue
|
||||
item.status = "processing"
|
||||
item.started_at = datetime.now(timezone.utc)
|
||||
item.attempts += 1
|
||||
await session.commit()
|
||||
|
||||
# 워커 실행 (독립 세션)
|
||||
try:
|
||||
# note(메모)는 이미 extracted_text가 있으므로 extract/preview skip
|
||||
if stage in ("extract", "preview"):
|
||||
from models.document import Document
|
||||
async with async_session() as check_session:
|
||||
doc = await check_session.get(Document, document_id)
|
||||
if doc and doc.file_type == "note":
|
||||
async with async_session() as skip_session:
|
||||
item = await skip_session.get(ProcessingQueue, queue_id)
|
||||
if item:
|
||||
item.status = "completed"
|
||||
item.completed_at = datetime.now(timezone.utc)
|
||||
await skip_session.commit()
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
logger.info(f"[{stage}] document_id={document_id} skip (note)")
|
||||
continue
|
||||
|
||||
async with async_session() as worker_session:
|
||||
await worker_fn(document_id, worker_session)
|
||||
await worker_session.commit()
|
||||
|
||||
# 완료 처리
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
item.status = "completed"
|
||||
item.completed_at = datetime.now(timezone.utc)
|
||||
await session.commit()
|
||||
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
logger.info(f"[{stage}] document_id={document_id} 완료")
|
||||
|
||||
except Exception as e:
|
||||
# 실패 처리
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
# 빈 메시지 방어: str → repr → 클래스명 순 fallback
|
||||
err_text = str(e) or repr(e) or type(e).__name__
|
||||
item.error_message = err_text[:500]
|
||||
if item.attempts >= item.max_attempts:
|
||||
item.status = "failed"
|
||||
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
||||
else:
|
||||
item.status = "pending"
|
||||
item.started_at = None
|
||||
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def consume_queue():
|
||||
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
|
||||
workers = _load_workers()
|
||||
|
||||
try:
|
||||
await reset_stale_items()
|
||||
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||||
except Exception:
|
||||
logger.exception("stale reset failed, but continuing queue consumption")
|
||||
|
||||
for stage, worker_fn in workers.items():
|
||||
batch_size = BATCH_SIZE.get(stage, 3)
|
||||
for stage in MAIN_QUEUE_STAGES:
|
||||
await _process_stage(stage, workers[stage])
|
||||
|
||||
# pending 항목 조회
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(ProcessingQueue.id, ProcessingQueue.document_id)
|
||||
.where(
|
||||
ProcessingQueue.stage == stage,
|
||||
ProcessingQueue.status == "pending",
|
||||
)
|
||||
.order_by(ProcessingQueue.created_at)
|
||||
.limit(batch_size)
|
||||
)
|
||||
pending_items = result.all()
|
||||
|
||||
# 각 항목을 독립 세션에서 처리
|
||||
for queue_id, document_id in pending_items:
|
||||
# 상태를 processing으로 변경
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item or item.status != "pending":
|
||||
continue
|
||||
item.status = "processing"
|
||||
item.started_at = datetime.now(timezone.utc)
|
||||
item.attempts += 1
|
||||
await session.commit()
|
||||
async def consume_markdown_queue():
|
||||
"""markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리.
|
||||
|
||||
# 워커 실행 (독립 세션)
|
||||
try:
|
||||
# note(메모)는 이미 extracted_text가 있으므로 extract/preview skip
|
||||
if stage in ("extract", "preview"):
|
||||
from models.document import Document
|
||||
async with async_session() as check_session:
|
||||
doc = await check_session.get(Document, document_id)
|
||||
if doc and doc.file_type == "note":
|
||||
async with async_session() as skip_session:
|
||||
item = await skip_session.get(ProcessingQueue, queue_id)
|
||||
if item:
|
||||
item.status = "completed"
|
||||
item.completed_at = datetime.now(timezone.utc)
|
||||
await skip_session.commit()
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
logger.info(f"[{stage}] document_id={document_id} skip (note)")
|
||||
continue
|
||||
한 doc 변환이 수십 분 걸려도 메인 consume_queue 는 영향받지 않는다.
|
||||
APScheduler max_instances=1(기본) 이므로 변환 진행 중엔 이 consumer 의
|
||||
다음 fire 만 coalesce 된다(동시 marker 변환 2건 방지 — 의도된 직렬화).
|
||||
"""
|
||||
workers = _load_workers()
|
||||
|
||||
async with async_session() as worker_session:
|
||||
await worker_fn(document_id, worker_session)
|
||||
await worker_session.commit()
|
||||
try:
|
||||
await reset_stale_items(MARKDOWN_QUEUE_STAGES, MARKDOWN_STALE_THRESHOLD_MINUTES)
|
||||
except Exception:
|
||||
logger.exception("markdown stale reset failed, but continuing queue consumption")
|
||||
|
||||
# 완료 처리
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
item.status = "completed"
|
||||
item.completed_at = datetime.now(timezone.utc)
|
||||
await session.commit()
|
||||
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
logger.info(f"[{stage}] document_id={document_id} 완료")
|
||||
|
||||
except Exception as e:
|
||||
# 실패 처리
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
# 빈 메시지 방어: str → repr → 클래스명 순 fallback
|
||||
err_text = str(e) or repr(e) or type(e).__name__
|
||||
item.error_message = err_text[:500]
|
||||
if item.attempts >= item.max_attempts:
|
||||
item.status = "failed"
|
||||
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
||||
else:
|
||||
item.status = "pending"
|
||||
item.started_at = None
|
||||
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
||||
await session.commit()
|
||||
for stage in MARKDOWN_QUEUE_STAGES:
|
||||
await _process_stage(stage, workers[stage])
|
||||
|
||||
Reference in New Issue
Block a user