Files
hyungi_document_server/app/workers/summarize_worker.py
T
hyungi 88e5893041 feat(workers): 맥북 M5 Max 분담 배선 — deep 슬롯 + 보류 시멘틱 + queue_drain CLI
plan ds-macbook-offload-1 P2 (Soft Lock 예외 박제 ds-macbook-offload-exec-20260611.md):
- config ai.models.deep optional 슬롯 (라우터 :8890 경유 qwen-macbook, 부재 시 기존 경로)
- AIClient.call_deep + is_deferrable_error + call_deep_or_defer (자동 cloud/맥미니 폴백 0)
- deep_summary_worker: deep 슬롯 시 맥북 경유 (맥미니 mlx gate 미점유) + 실모델 기록
- StageDeferred 보류 시멘틱: 503/connect/read-timeout(sleep 절단) = attempts 미소모 +
  payload.deferred_until 30분 백오프, doc 쓰기는 완주+파싱 후 단일 커밋 (부분 쓰기 0)
- queue_consumer: claim 에 deferred 필터 + StageDeferred 분기
- workers.queue_drain: 수동 burst-drain CLI (summarize/deep_summary, SKIP LOCKED 단건
  claim, per-item 커밋, 보류 시 run 종료, deep 슬롯 필수 가드)
- tests 20건 + 라우터 경유 Qwen 실응답 fixture 박제 (13.2s 라이브)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 12:55:16 +09:00

102 lines
4.4 KiB
Python

"""요약 전용 워커 — 뉴스 등 classify 불필요한 문서의 AI 요약만 생성.
P3 of family-adaptive-bengio (2026-05-23): 50k 초과 input 은 sliding window
(cumulative carry-over) 로 분할 처리. 50k 이하 input 은 기존 동작 유지.
ds-macbook-offload-1: use_deep=True (queue_drain 전용) 시 맥북 M5 Max deep 슬롯으로
호출 — 맥미니 백로그를 사용자가 의도적으로 분담시키는 수동 레버. 기본(consumer) 경로는
use_deep=False 로 기존 동작 그대로. 맥북 불가 시 StageDeferred (강등 0, 부분 쓰기 0).
"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, strip_thinking
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("summarize_worker")
CHUNK_SIZE = 50000
# client.summarize() 의 단일 프롬프트와 동일 문구 — deep 경로가 같은 과업을 수행하도록 고정
SUMMARY_PROMPT_SINGLE = "다음 문서를 500자 이내로 요약해주세요:\n\n{text}"
SUMMARY_PROMPT_CONTINUATION = (
"이전 부분 요약:\n{prior}\n\n다음 부분:\n{text}\n\n"
"위 두 정보를 합쳐 전체 문서를 500자 이내로 요약해주세요."
)
async def process(document_id: int, session: AsyncSession, *, use_deep: bool = False) -> None:
"""문서 AI 요약 생성 (분류 없이 요약만).
use_deep: queue_drain 전용 — deep 슬롯(맥북) 경유. 슬롯 미구성 시 명시 에러
(silent 강등 금지). consumer 기본 경로는 False (기존 동작 무변경).
"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.extracted_text:
raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
if doc.ai_summary:
logger.info(f"[요약] document_id={document_id}: 이미 요약 있음, skip")
return
client = AIClient()
if use_deep and client.ai.deep is None:
await client.close()
raise ValueError("use_deep=True 인데 config.yaml ai.models.deep 슬롯 미구성 — silent 강등 금지")
used_cfg = client.ai.deep if use_deep else client.ai.primary
async def _summarize_first(text_part: str) -> str:
if use_deep:
return await call_deep_or_defer(client, SUMMARY_PROMPT_SINGLE.format(text=text_part))
return await client.summarize(text_part)
async def _summarize_continuation(prompt: str) -> str:
if use_deep:
return await call_deep_or_defer(client, prompt)
return await client.call_primary(prompt)
try:
text = doc.extracted_text
total_chars = len(text)
if total_chars <= CHUNK_SIZE:
summary = await _summarize_first(text)
logger.info(
f"[요약] document_id={document_id}: single chunk ({total_chars}자)"
+ (" via deep(맥북)" if use_deep else "")
)
else:
chunks = [text[i:i + CHUNK_SIZE] for i in range(0, total_chars, CHUNK_SIZE)]
logger.info(
f"[요약] document_id={document_id}: sliding window {len(chunks)} chunk "
f"(total {total_chars}자, chunk_size={CHUNK_SIZE})"
)
carry = ""
for idx, chunk in enumerate(chunks):
if idx == 0:
partial = await _summarize_first(chunk)
else:
prompt = SUMMARY_PROMPT_CONTINUATION.format(prior=carry, text=chunk)
partial = await _summarize_continuation(prompt)
carry = strip_thinking(partial)
logger.info(
f"[요약] document_id={document_id}: chunk {idx + 1}/{len(chunks)} done "
f"(in={len(chunk)}자, carry={len(carry)}자)"
)
summary = carry
# sleep-안전 불변식: 쓰기는 전체 완주 후에만 — 중간 절단은 StageDeferred 로 빠져
# 이 지점에 도달하지 않는다 (carry 는 로컬 변수, doc 무변경).
doc.ai_summary = strip_thinking(summary)
doc.ai_model_version = used_cfg.model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
f"[요약] document_id={document_id}: {len(doc.ai_summary)}자 final"
)
finally:
await client.close()