Files
hyungi_document_server/app/workers/queue_consumer.py
T
hyungi a82b0724df fix(news): digest/briefing 생성 LLM 타임아웃 게이트 단일소스화 + deep_summary 컨슈머 분리
2026-06-11 맥미니 모델 교체(Gemma4 26B→Qwen3.6-27B-6bit, 콜당 ~90~300s)의
타임아웃 상향 sweep 이 config.yaml/synthesis 만 갱신하고 digest/briefing 코드의
하드코딩 LLM_CALL_TIMEOUT=25(빠른 Gemma 기준)를 누락 → digest 600s 하드캡 초과로
06-10 이후 미생성, briefing 4/4 LLM 폴백(status=failed). (적대 리뷰로 블로커 정정:
concurrency=1 사설 세마포로는 digest 44~68 클러스터가 하드캡에 여전히 걸림 + llm_gate
영구 룰 위반.)

- 타임아웃·재시도·하드캡을 config.pipeline 단일소스로 이관(digest_llm_timeout_s=300,
  attempts=2, pipeline_hard_cap_s=3000). 다음 모델 교체 때 재발 차단.
- digest/briefing LLM 호출을 사설 Semaphore 제거하고 전역 MLX gate(BACKGROUND)
  경유로 변경 — llm_gate 영구 룰(같은 endpoint 단일 게이트, 새 Semaphore 금지) 준수 +
  ask/eid(FOREGROUND)와 조율. 동시성 lever = 기존 mlx_gate_concurrency 2→4
  (continuous batching 실측 — 3동시콜 wall 121s ≈ 단일콜, 직렬 대비 ~3배).
- digest/briefing pipeline cluster 루프를 asyncio.gather 동시 실행으로 전환
  (실동시성은 게이트가 제한, rank/순서 보존).
- deep_summary(70~300s)를 메인 consume_queue 에서 분리해 consume_deep_queue 신설
  (markdown/fast split 선례) — 단일 deep 호출이 1분 틱 초과로 메인 큐를 영구 coalesce
  시키던 문제 제거.
- 죽은 PIPELINE_HARD_CAP=600(briefing/pipeline.py) 제거, summarizer docstring 갱신,
  deep 컨슈머 disjoint/hold 테스트 추가.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 23:29:56 +00:00

434 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출.
PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) stage 를 별 consumer 로 분리.
대형 PDF split 변환(수십 분) 이 단일 consume_queue 코루틴을 점유해 전 파이프라인을
stall 시키던 문제 제거. consume_queue = markdown 제외 전 stage / consume_markdown_queue =
markdown 전용. 두 consumer 의 stage 집합은 disjoint 이라 같은 row 경합/중복 reset 없음.
"""
import os
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
# stale 임계로 보면 살아있는 변환을 stale 로 오인 → pending 복구해 이중 처리한다.
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize",
"preview", "stt", "thumbnail", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
# 2026-06-15: deep_summary(26B, 콜당 70~300s)를 메인 루프에서 분리 (markdown/fast 선례).
# 단일 deep 호출이 1분 틱을 초과해 메인 consume_queue 가 영구 coalesce 되고 extract/
# classify 등 경량 stage 까지 굶던 문제 제거. 집합 disjoint(자기 집합만 stale reset).
DEEP_QUEUE_STAGES = ["deep_summary"]
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
FAST_QUEUE_STAGES = ["embed", "chunk"]
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
1) 같은 (document_id, stage)에 pending 행이 이미 있으면
stale processing 행은 중복이므로 삭제
2) pending이 없는 stale processing 행만 pending으로 되돌림
stages: 이 reset 의 대상 stage 목록. consume_queue 와 consume_markdown_queue 가
서로 disjoint 한 stage 집합 + 서로 다른 threshold 로 호출한다.
"""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minutes)
processing_row = aliased(ProcessingQueue)
pending_row = aliased(ProcessingQueue)
try:
async with async_session() as session:
# Step A: pending 중복이 이미 있는 stale processing 삭제
delete_stmt = (
delete(ProcessingQueue)
.where(
ProcessingQueue.id.in_(
select(processing_row.id).where(
processing_row.stage.in_(stages),
processing_row.status == "processing",
processing_row.started_at.is_not(None),
processing_row.started_at < cutoff,
exists(
select(1).where(
pending_row.document_id == processing_row.document_id,
pending_row.stage == processing_row.stage,
pending_row.status == "pending",
)
),
)
)
)
)
delete_result = await session.execute(delete_stmt)
# Step B: pending 없는 stale processing만 pending으로 복구
recoverable_ids = select(processing_row.id).where(
processing_row.stage.in_(stages),
processing_row.status == "processing",
processing_row.started_at.is_not(None),
processing_row.started_at < cutoff,
~exists(
select(1).where(
pending_row.document_id == processing_row.document_id,
pending_row.stage == processing_row.stage,
pending_row.status == "pending",
)
),
)
update_stmt = (
update(ProcessingQueue)
.where(ProcessingQueue.id.in_(recoverable_ids))
.values(status="pending", started_at=None)
)
update_result = await session.execute(update_stmt)
await session.commit()
deleted = delete_result.rowcount or 0
recovered = update_result.rowcount or 0
if deleted > 0:
logger.warning(
"deleted %s stale processing rows that already had pending duplicates (stages=%s)",
deleted, stages,
)
if recovered > 0:
logger.warning(
"recovered %s stale processing rows back to pending (stages=%s)",
recovered, stages,
)
except IntegrityError:
logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
except SQLAlchemyError:
logger.exception("reset_stale_items failed with database error; skipping this cycle")
except Exception:
logger.exception("reset_stale_items failed unexpectedly; skipping this cycle")
async def enqueue_next_stage(document_id: int, current_stage: str):
"""현재 stage 완료 후 다음 stage를 pending으로 등록.
§3 추가:
stt → [classify] (audio 는 extract 건너뛰고 stt 가 extracted_text 를 채움)
thumbnail → [] (video 는 leaf — classify/embed 없음)
Web/Blog ingest (devonagent 트랙) — plan db-snuggly-petal.md:
source_channel='devonagent' 인 doc 의 extract 완료 시
classify/preview/markdown 전부 SKIP → [embed, chunk] 만 enqueue.
AI 가공 (ai_tldr/ai_bullets 등) 은 별 PR (Mac mini derived-worker).
"""
# source_channel-aware override (extract stage 만). source_channel 누락 시 _default.
extract_override_by_channel = {
"devonagent": ["embed", "chunk"],
# crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 —
# 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지.
"crawl": ["classify"],
}
next_stages = {
"extract": ["classify", "preview"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
}
# extract 의 경우만 doc.source_channel 을 lookup 해서 override 적용
if current_stage == "extract":
from models.document import Document
async with async_session() as lookup_session:
doc = await lookup_session.get(Document, document_id)
sc = doc.source_channel if doc else None
if sc in extract_override_by_channel:
stages = extract_override_by_channel[sc]
else:
stages = next_stages.get(current_stage, [])
else:
stages = next_stages.get(current_stage, [])
if not stages:
return
async with async_session() as session:
for next_stage in stages:
await enqueue_stage(session, document_id, next_stage)
await session.commit()
def _load_workers():
"""stage → worker process 함수 dict (lazy import — 순환 import 회피)."""
from workers.classify_worker import process as classify_process
from workers.chunk_worker import process as chunk_process
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.preview_worker import process as preview_process
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process
from workers.fulltext_worker import process as fulltext_process
return {
"extract": extract_process,
"classify": classify_process,
"summarize": summarize_process,
"embed": embed_process,
"chunk": chunk_process,
"preview": preview_process,
"stt": stt_process,
"thumbnail": thumbnail_process,
# PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성.
# next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립).
"deep_summary": deep_summary_process,
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
"markdown": marker_process,
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
"fulltext": fulltext_process,
}
async def _process_stage(stage, worker_fn):
"""단일 stage 의 pending 항목을 batch 만큼 가져와 워커 실행.
consume_queue / consume_markdown_queue 가 공유한다. 항목별 독립 세션 +
processing→completed/failed 상태 전이 + 재시도 정책은 기존 로직 그대로.
"""
batch_size = BATCH_SIZE.get(stage, 3)
# pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1)
async with async_session() as session:
result = await session.execute(
select(ProcessingQueue.id, ProcessingQueue.document_id)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
not_deferred_condition(),
)
.order_by(ProcessingQueue.created_at)
.limit(batch_size)
)
pending_items = result.all()
# 각 항목을 독립 세션에서 처리
for queue_id, document_id in pending_items:
# 상태를 processing으로 변경
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item or item.status != "pending":
continue
item.status = "processing"
item.started_at = datetime.now(timezone.utc)
item.attempts += 1
await session.commit()
# 워커 실행 (독립 세션)
try:
# note(메모)는 이미 extracted_text가 있으므로 extract/preview skip
if stage in ("extract", "preview"):
from models.document import Document
async with async_session() as check_session:
doc = await check_session.get(Document, document_id)
if doc and doc.file_type == "note":
async with async_session() as skip_session:
item = await skip_session.get(ProcessingQueue, queue_id)
if item:
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await skip_session.commit()
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} skip (note)")
continue
async with async_session() as worker_session:
await worker_fn(document_id, worker_session)
await worker_session.commit()
# 완료 처리
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
continue
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await session.commit()
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
# 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님.
# attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개.
# 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전).
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
continue
item.status = "pending"
item.started_at = None
item.attempts = max(0, item.attempts - 1)
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
await session.commit()
logger.info(
f"[{stage}] document_id={document_id} 보류({defer}) — "
f"{defer.retry_after_minutes}분 후 재개"
)
except Exception as e:
# 실패 처리
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
continue
# 빈 메시지 방어: str → repr → 클래스명 순 fallback
err_text = str(e) or repr(e) or type(e).__name__
item.error_message = err_text[:500]
if item.attempts >= item.max_attempts:
item.status = "failed"
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
# (큐는 failed, 문서는 processing). 큐가 영구 failed 가 될 때 doc 상태도
# 동기화한다. 재시도 중(attempts<max)엔 pending 큐 행이 남아 orphan 아님.
if stage == "markdown":
from models.document import Document
doc = await session.get(Document, document_id)
if doc is not None and doc.md_status == "processing":
doc.md_status = "failed"
if not doc.md_extraction_error:
doc.md_extraction_error = err_text[:500]
logger.warning(
f"[markdown] document_id={document_id} "
f"md_status processing→failed 동기화 (B3 orphan 방지)"
)
else:
item.status = "pending"
item.started_at = None
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
await session.commit()
async def consume_queue():
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
global _hold_logged
workers = _load_workers()
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
if held and not _hold_logged:
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
_hold_logged = True
try:
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("stale reset failed, but continuing queue consumption")
for stage in MAIN_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
async def consume_fast_queue():
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
main 루프는 classify/summarize/deep 가 사이클을 분 단위로 점유해 건당 <1s 짜리
embed/chunk 가 사이클당 1번씩만 기회를 얻었다 (실효 ~60건/시 = 적체 원인).
분리 후 = 1분 잡 × 배치 10 → 캡 ~600건/시. APScheduler max_instances=1 이라
배치가 1분을 넘으면 다음 fire 는 coalesce (폭주 방지).
"""
workers = _load_workers()
try:
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("fast stale reset failed, but continuing queue consumption")
for stage in FAST_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
async def consume_markdown_queue():
"""markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리.
한 doc 변환이 수십 분 걸려도 메인 consume_queue 는 영향받지 않는다.
APScheduler max_instances=1(기본) 이므로 변환 진행 중엔 이 consumer 의
다음 fire 만 coalesce 된다(동시 marker 변환 2건 방지 — 의도된 직렬화).
"""
workers = _load_workers()
try:
await reset_stale_items(MARKDOWN_QUEUE_STAGES, MARKDOWN_STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("markdown stale reset failed, but continuing queue consumption")
for stage in MARKDOWN_QUEUE_STAGES:
await _process_stage(stage, workers[stage])
async def consume_deep_queue():
"""deep_summary 전용 큐 소비자 (2026-06-15) — 26B 심층요약을 메인 파이프라인과 분리.
deep_summary 1콜이 70~300s(맥미니 Qwen 27B 폴백)라 메인 consume_queue(1분 틱) 안에
있으면 매 틱이 interval 을 초과해 영구 "maximum running instances" coalesce 되고
extract/classify 등 경량 stage 까지 함께 굶었다. 분리 후 = deep 만 자기 1분 잡에서
coalesce, 나머지 메인 루프는 틱 내 완료. max_instances=1 로 동시 deep 2건은 방지.
"""
workers = _load_workers()
try:
await reset_stale_items(DEEP_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("deep stale reset failed, but continuing queue consumption")
for stage in DEEP_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])