feat(workers): 맥북 M5 Max 분담 배선 — deep 슬롯 + 보류 시멘틱 + queue_drain CLI
plan ds-macbook-offload-1 P2 (Soft Lock 예외 박제 ds-macbook-offload-exec-20260611.md): - config ai.models.deep optional 슬롯 (라우터 :8890 경유 qwen-macbook, 부재 시 기존 경로) - AIClient.call_deep + is_deferrable_error + call_deep_or_defer (자동 cloud/맥미니 폴백 0) - deep_summary_worker: deep 슬롯 시 맥북 경유 (맥미니 mlx gate 미점유) + 실모델 기록 - StageDeferred 보류 시멘틱: 503/connect/read-timeout(sleep 절단) = attempts 미소모 + payload.deferred_until 30분 백오프, doc 쓰기는 완주+파싱 후 단일 커밋 (부분 쓰기 0) - queue_consumer: claim 에 deferred 필터 + StageDeferred 분기 - workers.queue_drain: 수동 burst-drain CLI (summarize/deep_summary, SKIP LOCKED 단건 claim, per-item 커밋, 보류 시 run 종료, deep 슬롯 필수 가드) - tests 20건 + 라우터 경유 Qwen 실응답 fixture 박제 (13.2s 라이브) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -134,6 +134,36 @@ def _fix_json_string_escapes(s: str) -> str:
|
||||
i += 1
|
||||
return "".join(out)
|
||||
|
||||
def is_deferrable_error(exc: Exception) -> bool:
|
||||
"""deep(맥북 M5 Max) 호출 실패가 '보류(StageDeferred)' 대상인지 분류 (ds-macbook-offload-1).
|
||||
|
||||
보류 = 맥북 일시 불가 신호:
|
||||
- HTTP 503 (라우터 upstream_cold / editor_busy / warming — no-silent-fallback 계약)
|
||||
- httpx.TransportError 전계열 (ConnectError·ReadError·RemoteProtocolError +
|
||||
ConnectTimeout·ReadTimeout 등) — 연결 실패와 생성 도중 sleep 절단을 모두 포함.
|
||||
그 외(400/500, 파싱/검증 오류 등)는 보류가 아니라 호출자의 기존 실패 경로.
|
||||
"""
|
||||
if isinstance(exc, httpx.HTTPStatusError):
|
||||
return exc.response.status_code == 503
|
||||
return isinstance(exc, httpx.TransportError)
|
||||
|
||||
|
||||
async def call_deep_or_defer(client: "AIClient", prompt: str, system: str | None = None) -> str:
|
||||
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
|
||||
|
||||
deep_summary_worker / summarize_worker(drain) 가 공유. StageDeferred 는 queue_consumer/
|
||||
queue_drain 이 attempts 미소모 + deferred_until 백오프로 처리한다 (sleep-안전 불변식).
|
||||
"""
|
||||
from models.queue import StageDeferred
|
||||
|
||||
try:
|
||||
return await client.call_deep(prompt, system=system)
|
||||
except Exception as exc:
|
||||
if is_deferrable_error(exc):
|
||||
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
|
||||
raise
|
||||
|
||||
|
||||
# 프롬프트 로딩
|
||||
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
|
||||
|
||||
@@ -185,6 +215,18 @@ class AIClient:
|
||||
"""triage/primary 실패 시 최후 방어선. Claude Sonnet 4 API (config.yaml ai.models.fallback) — PR #20 이후 swap 완료."""
|
||||
return await self._request(self.ai.fallback, prompt)
|
||||
|
||||
async def call_deep(self, prompt: str, system: str | None = None) -> str:
|
||||
"""심층 전용 — 맥북 M5 Max Qwen3.6-27B (config.yaml ai.models.deep, ds-macbook-offload-1).
|
||||
|
||||
llm-router :8890 경유(model=qwen-macbook alias) — 라우터의 wake preflight(~24s)·
|
||||
editor_busy 가드를 재사용한다. 맥미니 mlx gate 와 무관(게이트는 맥미니 보호 목적)이라
|
||||
gate 없이 호출. 자동 cloud/맥미니 폴백 없음 — 실패는 그대로 전파하고 보류 판단은
|
||||
호출자가 is_deferrable_error() 로 한다. 슬롯 부재 시 primary 로 처리(방어적 —
|
||||
호출자가 보통 슬롯 유무를 먼저 분기).
|
||||
"""
|
||||
cfg = self.ai.deep or self.ai.primary
|
||||
return await self._request(cfg, prompt, system=system)
|
||||
|
||||
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
|
||||
|
||||
async def classify(self, text: str) -> dict:
|
||||
|
||||
@@ -98,6 +98,10 @@ class AIConfig(BaseModel):
|
||||
classifier: AIModelConfig | None = None
|
||||
# Phase 3.5b: semantic verifier (optional — 없으면 grounding-only). PR #20 이후 Mac mini 26B MLX endpoint (initial = exaone3.5).
|
||||
verifier: AIModelConfig | None = None
|
||||
# ds-macbook-offload-1: 심층 전용 슬롯 (optional). 맥북 M5 Max Qwen3.6-27B — llm-router :8890
|
||||
# 경유(model=qwen-macbook alias, wake preflight 재사용). 부재 시 deep_summary 는 기존
|
||||
# primary(맥미니 26B) 경로 그대로 = 기능 미활성. 명시 opt-in — silent fallback 없음.
|
||||
deep: AIModelConfig | None = None
|
||||
# Legacy: vision 슬롯 (현재 사용처 0 — Document Server 는 OCR/STT 별도 서비스).
|
||||
# 제거 진행 중이므로 optional 로 관대한 로딩 유지.
|
||||
vision: AIModelConfig | None = None
|
||||
@@ -218,6 +222,7 @@ def load_settings() -> Settings:
|
||||
verifier=(
|
||||
AIModelConfig(**models["verifier"]) if "verifier" in models else None
|
||||
),
|
||||
deep=(AIModelConfig(**models["deep"]) if "deep" in models else None),
|
||||
deep_summary_backlog=DeepSummaryBacklogConfig(
|
||||
**ai_raw.get("deep_summary_backlog", {})
|
||||
),
|
||||
|
||||
+28
-1
@@ -2,14 +2,41 @@
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
|
||||
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, func, or_, text
|
||||
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from sqlalchemy.types import TIMESTAMP
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class StageDeferred(Exception):
|
||||
"""워커가 '지금은 처리 불가 — 자료 손상 없이 보류' 를 선언하는 신호 (ds-macbook-offload-1).
|
||||
|
||||
맥북(M5 Max) deep 슬롯 경로 전용: 503(upstream_cold/editor_busy/warming) · 연결 실패 ·
|
||||
생성 중 절단(read-timeout, 맥북 sleep) 시 raise. queue_consumer/queue_drain 이 attempts 를
|
||||
소모하지 않고 pending 복귀 + payload.deferred_until 백오프를 기록한다. 결과 쓰기는 호출
|
||||
완주 + 파싱 성공 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0 (sleep-안전 불변식).
|
||||
"""
|
||||
|
||||
def __init__(self, reason: str, retry_after_minutes: int = 30):
|
||||
super().__init__(reason)
|
||||
self.retry_after_minutes = retry_after_minutes
|
||||
|
||||
|
||||
def not_deferred_condition():
|
||||
"""보류 백오프(payload.deferred_until, ISO 문자열) 가 미래인 행을 claim 에서 제외.
|
||||
|
||||
payload 없음 / 키 없음 = 통과. queue_consumer 와 queue_drain 의 claim 이 공유한다.
|
||||
"""
|
||||
deferred = ProcessingQueue.payload["deferred_until"].astext
|
||||
return or_(
|
||||
deferred.is_(None),
|
||||
deferred.cast(TIMESTAMP(timezone=True)) <= func.now(),
|
||||
)
|
||||
|
||||
|
||||
class ProcessingQueue(Base):
|
||||
__tablename__ = "processing_queue"
|
||||
|
||||
|
||||
@@ -20,12 +20,12 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
import json
|
||||
import re
|
||||
from ai.client import AIClient, parse_json_response, strip_thinking
|
||||
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
|
||||
from ai.envelope import EscalationEnvelope
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import ProcessingQueue
|
||||
from models.queue import ProcessingQueue, StageDeferred
|
||||
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
||||
from services.document_telemetry import record_analyze_event
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
@@ -101,17 +101,30 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
)
|
||||
|
||||
client = AIClient()
|
||||
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
|
||||
deep_cfg = client.ai.deep
|
||||
used_cfg = deep_cfg or settings.ai.primary
|
||||
latency_ms = 0
|
||||
parse_error: str | None = None
|
||||
deep_out = DeepSummaryOutput()
|
||||
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
|
||||
raw = await client.call_primary(prompt)
|
||||
if deep_cfg is not None:
|
||||
# 맥북 경유 — 맥미니 mlx gate 미점유(게이트는 맥미니 보호 목적). 맥북 불가
|
||||
# (503/연결/생성 중 sleep 절단)는 StageDeferred = 보류, 맥미니 강등 없음.
|
||||
# doc 쓰기는 완주+파싱 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
|
||||
raw = await call_deep_or_defer(client, prompt)
|
||||
else:
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
|
||||
raw = await client.call_primary(prompt)
|
||||
latency_ms = int((time.perf_counter() - start) * 1000)
|
||||
except StageDeferred:
|
||||
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), consumer 가 백오프 기록.
|
||||
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.warning(f"[deep] 26B 호출 실패 id={document_id}: {exc}")
|
||||
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
|
||||
parse_error = "call_failed"
|
||||
raw = ""
|
||||
finally:
|
||||
@@ -147,12 +160,13 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
doc_id=document_id,
|
||||
user_id=None,
|
||||
mode="summary_deep",
|
||||
text_limit=settings.ai.primary.context_char_limit or 260000,
|
||||
text_limit=used_cfg.context_char_limit or 260000,
|
||||
truncated=False,
|
||||
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
|
||||
cached=False,
|
||||
latency_ms=latency_ms,
|
||||
model_name=settings.ai.primary.model,
|
||||
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
|
||||
model_name=used_cfg.model,
|
||||
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
|
||||
error_code=parse_error,
|
||||
source="document_server",
|
||||
|
||||
@@ -15,7 +15,7 @@ from sqlalchemy.orm import aliased
|
||||
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.queue import ProcessingQueue, enqueue_stage
|
||||
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
|
||||
|
||||
logger = setup_logger("queue_consumer")
|
||||
|
||||
@@ -216,13 +216,14 @@ async def _process_stage(stage, worker_fn):
|
||||
"""
|
||||
batch_size = BATCH_SIZE.get(stage, 3)
|
||||
|
||||
# pending 항목 조회
|
||||
# pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1)
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(ProcessingQueue.id, ProcessingQueue.document_id)
|
||||
.where(
|
||||
ProcessingQueue.stage == stage,
|
||||
ProcessingQueue.status == "pending",
|
||||
not_deferred_condition(),
|
||||
)
|
||||
.order_by(ProcessingQueue.created_at)
|
||||
.limit(batch_size)
|
||||
@@ -276,6 +277,26 @@ async def _process_stage(stage, worker_fn):
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
logger.info(f"[{stage}] document_id={document_id} 완료")
|
||||
|
||||
except StageDeferred as defer:
|
||||
# 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님.
|
||||
# attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개.
|
||||
# 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전).
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if not item:
|
||||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||||
continue
|
||||
item.status = "pending"
|
||||
item.started_at = None
|
||||
item.attempts = max(0, item.attempts - 1)
|
||||
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
|
||||
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"[{stage}] document_id={document_id} 보류({defer}) — "
|
||||
f"{defer.retry_after_minutes}분 후 재개"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# 실패 처리
|
||||
async with async_session() as session:
|
||||
|
||||
@@ -0,0 +1,164 @@
|
||||
"""수동 burst-drain CLI — 맥미니 백로그를 사용자가 의도적으로 맥북(M5 Max)으로 소화.
|
||||
|
||||
ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 와 동일 (컨테이너 내 실행,
|
||||
장기 배치 중 fastapi 재생성 = in-flight 절단이지만 멱등 재실행으로 무손실).
|
||||
|
||||
docker compose exec fastapi python -m workers.queue_drain --stage summarize --limit 200
|
||||
|
||||
설계 원칙:
|
||||
- deep 슬롯(config.yaml ai.models.deep) 필수 — 부재 시 명시 종료 (silent 강등 금지)
|
||||
- claim = FOR UPDATE SKIP LOCKED 단건 전이 → consumer(1분 주기)와 이중처리 0
|
||||
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 중 1건만 stale recovery
|
||||
(10분) 로 pending 복귀. 재실행 멱등 (summarize 는 ai_summary 존재 시 skip)
|
||||
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy): attempts 반환 + deferred_until
|
||||
백오프 기록 후 run 즉시 종료 — 불가 상태의 맥북을 계속 두드리지 않는다
|
||||
- 폴백 0: 맥미니/cloud 강등 없음
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
|
||||
|
||||
logger = setup_logger("queue_drain")
|
||||
|
||||
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유).
|
||||
# classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4).
|
||||
DRAIN_STAGES = ("summarize", "deep_summary")
|
||||
|
||||
|
||||
async def _claim_one(stage: str) -> tuple[int, int] | None:
|
||||
"""pending 1건을 processing 으로 원자 전이 (SKIP LOCKED — consumer 와 경합 안전)."""
|
||||
async with async_session() as session:
|
||||
item = (await session.execute(
|
||||
select(ProcessingQueue)
|
||||
.where(
|
||||
ProcessingQueue.stage == stage,
|
||||
ProcessingQueue.status == "pending",
|
||||
not_deferred_condition(),
|
||||
)
|
||||
.order_by(ProcessingQueue.created_at)
|
||||
.limit(1)
|
||||
.with_for_update(skip_locked=True)
|
||||
)).scalar_one_or_none()
|
||||
if item is None:
|
||||
return None
|
||||
item.status = "processing"
|
||||
item.started_at = datetime.now(timezone.utc)
|
||||
item.attempts += 1
|
||||
claimed = (item.id, item.document_id)
|
||||
await session.commit()
|
||||
return claimed
|
||||
|
||||
|
||||
async def _mark_completed(queue_id: int) -> None:
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if item:
|
||||
item.status = "completed"
|
||||
item.completed_at = datetime.now(timezone.utc)
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _mark_deferred(queue_id: int, defer: StageDeferred) -> None:
|
||||
"""보류: attempts 반환(미소모) + deferred_until 백오프 — consumer 의 처리와 동형."""
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if item:
|
||||
item.status = "pending"
|
||||
item.started_at = None
|
||||
item.attempts = max(0, item.attempts - 1)
|
||||
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
|
||||
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _mark_failed(queue_id: int, exc: Exception) -> None:
|
||||
"""실패: consumer 와 동일 재시도 정책 (attempts >= max → failed, 아니면 pending 복귀)."""
|
||||
async with async_session() as session:
|
||||
item = await session.get(ProcessingQueue, queue_id)
|
||||
if item:
|
||||
err_text = str(exc) or repr(exc) or type(exc).__name__
|
||||
item.error_message = err_text[:500]
|
||||
if item.attempts >= item.max_attempts:
|
||||
item.status = "failed"
|
||||
else:
|
||||
item.status = "pending"
|
||||
item.started_at = None
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def drain(stage: str, limit: int) -> None:
|
||||
if stage not in DRAIN_STAGES:
|
||||
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
|
||||
if settings.ai.deep is None:
|
||||
raise SystemExit(
|
||||
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
|
||||
" (맥미니로의 silent 강등 금지)"
|
||||
)
|
||||
|
||||
from workers.deep_summary_worker import process as deep_summary_process
|
||||
from workers.summarize_worker import process as summarize_process
|
||||
|
||||
done = failed = 0
|
||||
deferred = False
|
||||
while done + failed < limit:
|
||||
claimed = await _claim_one(stage)
|
||||
if claimed is None:
|
||||
logger.info(f"[drain:{stage}] pending 소진 — 종료")
|
||||
break
|
||||
queue_id, document_id = claimed
|
||||
try:
|
||||
async with async_session() as worker_session:
|
||||
if stage == "summarize":
|
||||
await summarize_process(document_id, worker_session, use_deep=True)
|
||||
else:
|
||||
# deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유
|
||||
await deep_summary_process(document_id, worker_session)
|
||||
await worker_session.commit()
|
||||
await _mark_completed(queue_id)
|
||||
done += 1
|
||||
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
|
||||
except StageDeferred as defer:
|
||||
await _mark_deferred(queue_id, defer)
|
||||
deferred = True
|
||||
logger.warning(
|
||||
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 보류 기록 후 run 종료. "
|
||||
f"맥북 깨운 뒤(또는 {defer.retry_after_minutes}분 후) 재실행"
|
||||
)
|
||||
break
|
||||
except Exception as exc:
|
||||
await _mark_failed(queue_id, exc)
|
||||
failed += 1
|
||||
logger.error(f"[drain:{stage}] doc={document_id} 실패: {exc}")
|
||||
|
||||
# 종료 요약 (잔여 = 지금 시점 pending 수)
|
||||
async with async_session() as session:
|
||||
from sqlalchemy import func as sa_func
|
||||
remaining = (await session.execute(
|
||||
select(sa_func.count()).select_from(ProcessingQueue).where(
|
||||
ProcessingQueue.stage == stage, ProcessingQueue.status == "pending",
|
||||
)
|
||||
)).scalar_one()
|
||||
logger.info(
|
||||
f"[drain:{stage}] 요약 — 완료 {done} · 실패 {failed} · "
|
||||
f"보류종료 {'예' if deferred else '아니오'} · 잔여 pending {remaining}"
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
|
||||
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
|
||||
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
|
||||
args = parser.parse_args()
|
||||
asyncio.run(drain(args.stage, args.limit))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -2,27 +2,37 @@
|
||||
|
||||
P3 of family-adaptive-bengio (2026-05-23): 50k 초과 input 은 sliding window
|
||||
(cumulative carry-over) 로 분할 처리. 50k 이하 input 은 기존 동작 유지.
|
||||
|
||||
ds-macbook-offload-1: use_deep=True (queue_drain 전용) 시 맥북 M5 Max deep 슬롯으로
|
||||
호출 — 맥미니 백로그를 사용자가 의도적으로 분담시키는 수동 레버. 기본(consumer) 경로는
|
||||
use_deep=False 로 기존 동작 그대로. 맥북 불가 시 StageDeferred (강등 0, 부분 쓰기 0).
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from ai.client import AIClient, strip_thinking
|
||||
from ai.client import AIClient, call_deep_or_defer, strip_thinking
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
|
||||
logger = setup_logger("summarize_worker")
|
||||
|
||||
CHUNK_SIZE = 50000
|
||||
# client.summarize() 의 단일 프롬프트와 동일 문구 — deep 경로가 같은 과업을 수행하도록 고정
|
||||
SUMMARY_PROMPT_SINGLE = "다음 문서를 500자 이내로 요약해주세요:\n\n{text}"
|
||||
SUMMARY_PROMPT_CONTINUATION = (
|
||||
"이전 부분 요약:\n{prior}\n\n다음 부분:\n{text}\n\n"
|
||||
"위 두 정보를 합쳐 전체 문서를 500자 이내로 요약해주세요."
|
||||
)
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""문서 AI 요약 생성 (분류 없이 요약만)"""
|
||||
async def process(document_id: int, session: AsyncSession, *, use_deep: bool = False) -> None:
|
||||
"""문서 AI 요약 생성 (분류 없이 요약만).
|
||||
|
||||
use_deep: queue_drain 전용 — deep 슬롯(맥북) 경유. 슬롯 미구성 시 명시 에러
|
||||
(silent 강등 금지). consumer 기본 경로는 False (기존 동작 무변경).
|
||||
"""
|
||||
doc = await session.get(Document, document_id)
|
||||
if not doc:
|
||||
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
||||
@@ -35,13 +45,29 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
return
|
||||
|
||||
client = AIClient()
|
||||
if use_deep and client.ai.deep is None:
|
||||
await client.close()
|
||||
raise ValueError("use_deep=True 인데 config.yaml ai.models.deep 슬롯 미구성 — silent 강등 금지")
|
||||
used_cfg = client.ai.deep if use_deep else client.ai.primary
|
||||
|
||||
async def _summarize_first(text_part: str) -> str:
|
||||
if use_deep:
|
||||
return await call_deep_or_defer(client, SUMMARY_PROMPT_SINGLE.format(text=text_part))
|
||||
return await client.summarize(text_part)
|
||||
|
||||
async def _summarize_continuation(prompt: str) -> str:
|
||||
if use_deep:
|
||||
return await call_deep_or_defer(client, prompt)
|
||||
return await client.call_primary(prompt)
|
||||
|
||||
try:
|
||||
text = doc.extracted_text
|
||||
total_chars = len(text)
|
||||
if total_chars <= CHUNK_SIZE:
|
||||
summary = await client.summarize(text)
|
||||
summary = await _summarize_first(text)
|
||||
logger.info(
|
||||
f"[요약] document_id={document_id}: single chunk ({total_chars}자)"
|
||||
+ (" via deep(맥북)" if use_deep else "")
|
||||
)
|
||||
else:
|
||||
chunks = [text[i:i + CHUNK_SIZE] for i in range(0, total_chars, CHUNK_SIZE)]
|
||||
@@ -52,10 +78,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
carry = ""
|
||||
for idx, chunk in enumerate(chunks):
|
||||
if idx == 0:
|
||||
partial = await client.summarize(chunk)
|
||||
partial = await _summarize_first(chunk)
|
||||
else:
|
||||
prompt = SUMMARY_PROMPT_CONTINUATION.format(prior=carry, text=chunk)
|
||||
partial = await client.call_primary(prompt)
|
||||
partial = await _summarize_continuation(prompt)
|
||||
carry = strip_thinking(partial)
|
||||
logger.info(
|
||||
f"[요약] document_id={document_id}: chunk {idx + 1}/{len(chunks)} done "
|
||||
@@ -63,8 +89,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
)
|
||||
summary = carry
|
||||
|
||||
# sleep-안전 불변식: 쓰기는 전체 완주 후에만 — 중간 절단은 StageDeferred 로 빠져
|
||||
# 이 지점에 도달하지 않는다 (carry 는 로컬 변수, doc 무변경).
|
||||
doc.ai_summary = strip_thinking(summary)
|
||||
doc.ai_model_version = client.ai.primary.model
|
||||
doc.ai_model_version = used_cfg.model
|
||||
doc.ai_processed_at = datetime.now(timezone.utc)
|
||||
logger.info(
|
||||
f"[요약] document_id={document_id}: {len(doc.ai_summary)}자 final"
|
||||
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"id": "chatcmpl-80cd8ddc-7788-4605-b40e-3975fe7e1326",
|
||||
"object": "chat.completion",
|
||||
"created": 1781149952,
|
||||
"model": "/Users/hyungi/mlx-models/Qwen3.6-27B-8bit",
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"finish_reason": "stop",
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "\uc81c\uacf5\ub41c \ubb38\uc11c\ub294 \uc555\ub825\uc6a9\uae30 \uac80\uc0ac\uc758 \uae30\uc900\uc774 \ub418\ub294 \uaddc\uc815\uc744 \uba85\uc2dc\ud558\uace0 \uc788\uc2b5\ub2c8\ub2e4. \ud575\uc2ec \ub0b4\uc6a9\uc740 \uc555\ub825\uc6a9\uae30\uc5d0 \ub300\ud55c \ubaa8\ub4e0 \uac80\uc0ac \uc808\ucc28\uc640 \uae30\uc900\uc774 'ASME Section VIII Div 1'\uc774\ub77c\ub294 \uad6d\uc81c\uc801\uc73c\ub85c \uc778\uc815\ubc1b\ub294 \uc555\ub825\uc6a9\uae30 \uc124\uacc4 \ubc0f \uc81c\uc791 \uaddc\uc815\uc5d0 \ub530\ub77c \uc5c4\uaca9\ud558\uac8c \uc218\ud589\ub418\uc5b4\uc57c \ud55c\ub2e4\ub294 \uac83\uc785\ub2c8\ub2e4. \uc774\ub294 \uc548\uc804\uc131\uacfc \uc2e0\ub8b0\uc131\uc744 \ubcf4\uc7a5\ud558\uae30 \uc704\ud55c \ud544\uc218\uc801\uc778 \uc694\uad6c\uc0ac\ud56d\uc73c\ub85c, \ud574\ub2f9 \uaddc\uc815\uc744 \uc900\uc218\ud568\uc73c\ub85c\uc368 \uc555\ub825\uc6a9\uae30\uc758 \uad6c\uc870\uc801 \ubb34\uacb0\uc131\uacfc \uc6b4\uc601 \uc548\uc804\uc131\uc744 \ud655\ubcf4\ud560 \uc218 \uc788\uc2b5\ub2c8\ub2e4. \ub530\ub77c\uc11c \uad00\ub828 \uc5c5\ubb34 \uc218\ud589 \uc2dc \ubc18\ub4dc\uc2dc \uc774 \uaddc\uc815\uc744 \ucc38\uc870\ud558\uc5ec \uac80\uc0ac\ub97c \uc9c4\ud589\ud574\uc57c \ud569\ub2c8\ub2e4.",
|
||||
"reasoning": null,
|
||||
"tool_calls": null,
|
||||
"tool_call_id": null,
|
||||
"name": null
|
||||
},
|
||||
"logprobs": null
|
||||
}
|
||||
],
|
||||
"usage": {
|
||||
"prompt_tokens": 44,
|
||||
"completion_tokens": 118,
|
||||
"total_tokens": 162,
|
||||
"prompt_tokens_details": {
|
||||
"cached_tokens": 0
|
||||
},
|
||||
"prompt_tps": 0.0,
|
||||
"generation_tps": 0.0,
|
||||
"peak_memory": 29.804702642
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
"""ds-macbook-offload-1 P2-4 — deep 슬롯 라우팅 / 보류(StageDeferred) / drain 가드 테스트.
|
||||
|
||||
DB 불요(unit) — AIClient 는 __new__ 로 settings 우회, drain 가드는 settings monkeypatch.
|
||||
통합(보류 백오프 DB 기록, claim 경합)은 P3-2 E2E 게이트에서 라이브 실측.
|
||||
fixture = tests/fixtures/qwen_router_chat_completion.json (2026-06-11 라이브 박제 —
|
||||
라우터 :8890 경유 model=qwen-macbook, production 호출 형상과 동일 body, 13.2s 실측).
|
||||
"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from ai.client import AIClient, call_deep_or_defer, is_deferrable_error
|
||||
from models.queue import StageDeferred
|
||||
|
||||
FIXTURE = Path(__file__).parent / "fixtures" / "qwen_router_chat_completion.json"
|
||||
|
||||
|
||||
def _client(deep_cfg, primary_cfg):
|
||||
"""settings 비의존 AIClient — __init__ 우회 후 ai 슬롯만 주입."""
|
||||
client = AIClient.__new__(AIClient)
|
||||
client.ai = SimpleNamespace(deep=deep_cfg, primary=primary_cfg)
|
||||
return client
|
||||
|
||||
|
||||
def _http_status_error(status: int) -> httpx.HTTPStatusError:
|
||||
req = httpx.Request("POST", "http://router:8890/v1/chat/completions")
|
||||
resp = httpx.Response(status, request=req)
|
||||
return httpx.HTTPStatusError(f"status {status}", request=req, response=resp)
|
||||
|
||||
|
||||
# ─── is_deferrable_error 분류 ──────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.parametrize("exc", [
|
||||
_http_status_error(503), # 라우터 upstream_cold/editor_busy/warming
|
||||
httpx.ConnectError("connection refused"), # 맥북 sleep — 연결 자체 불가
|
||||
httpx.ConnectTimeout("connect timeout"),
|
||||
httpx.ReadTimeout("read timeout"), # 생성 도중 sleep 절단
|
||||
httpx.ReadError("connection reset"),
|
||||
httpx.RemoteProtocolError("server disconnected"),
|
||||
])
|
||||
def test_deferrable_errors(exc):
|
||||
assert is_deferrable_error(exc) is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize("exc", [
|
||||
_http_status_error(400), # unknown alias 등 — 설정 오류는 보류 아님
|
||||
_http_status_error(500),
|
||||
ValueError("parse"),
|
||||
RuntimeError("boom"),
|
||||
])
|
||||
def test_non_deferrable_errors(exc):
|
||||
assert is_deferrable_error(exc) is False
|
||||
|
||||
|
||||
# ─── call_deep 슬롯 선택 ───────────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_uses_deep_slot():
|
||||
deep = SimpleNamespace(model="qwen-macbook")
|
||||
primary = SimpleNamespace(model="gemma-26b")
|
||||
client = _client(deep, primary)
|
||||
captured = {}
|
||||
|
||||
async def fake_request(cfg, prompt, system=None):
|
||||
captured["cfg"] = cfg
|
||||
return "ok"
|
||||
|
||||
client._request = fake_request
|
||||
assert await client.call_deep("p") == "ok"
|
||||
assert captured["cfg"] is deep
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_falls_back_to_primary_when_slot_absent():
|
||||
"""슬롯 부재 = 기능 미활성 (방어적 primary — silent 강등이 아니라 기존 경로 그대로)."""
|
||||
primary = SimpleNamespace(model="gemma-26b")
|
||||
client = _client(None, primary)
|
||||
captured = {}
|
||||
|
||||
async def fake_request(cfg, prompt, system=None):
|
||||
captured["cfg"] = cfg
|
||||
return "ok"
|
||||
|
||||
client._request = fake_request
|
||||
await client.call_deep("p")
|
||||
assert captured["cfg"] is primary
|
||||
|
||||
|
||||
# ─── call_deep_or_defer 보류 변환 ──────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("exc", [
|
||||
_http_status_error(503),
|
||||
httpx.ConnectError("refused"),
|
||||
httpx.ReadTimeout("cut mid-generation"),
|
||||
])
|
||||
async def test_defer_conversion(exc):
|
||||
client = _client(SimpleNamespace(model="qwen-macbook"), None)
|
||||
|
||||
async def fail_request(cfg, prompt, system=None):
|
||||
raise exc
|
||||
|
||||
client._request = fail_request
|
||||
with pytest.raises(StageDeferred):
|
||||
await call_deep_or_defer(client, "p")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_deferrable_propagates():
|
||||
"""400/일반 오류는 StageDeferred 아님 — 호출자 기존 실패 경로로 전파."""
|
||||
client = _client(SimpleNamespace(model="qwen-macbook"), None)
|
||||
|
||||
async def fail_request(cfg, prompt, system=None):
|
||||
raise _http_status_error(400)
|
||||
|
||||
client._request = fail_request
|
||||
with pytest.raises(httpx.HTTPStatusError):
|
||||
await call_deep_or_defer(client, "p")
|
||||
|
||||
|
||||
def test_stage_deferred_carries_backoff():
|
||||
e = StageDeferred("macbook_unavailable:ConnectError")
|
||||
assert e.retry_after_minutes == 30
|
||||
|
||||
|
||||
def test_router_fixture_shape():
|
||||
"""_request 파싱 경로(choices[0].message.content)가 라우터 실응답 형상과 일치하는지 고정."""
|
||||
data = json.loads(FIXTURE.read_text())
|
||||
content = data["choices"][0]["message"]["content"]
|
||||
assert isinstance(content, str) and len(content) > 0
|
||||
assert data["choices"][0]["message"]["role"] == "assistant"
|
||||
# 라우터가 alias 를 upstream 로컬 경로로 치환해 응답 — 실처리 모델 추적 가능
|
||||
assert "Qwen3.6-27B-8bit" in data["model"]
|
||||
|
||||
|
||||
# ─── drain 가드 (silent 강등 금지) ─────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_requires_deep_slot(monkeypatch):
|
||||
import workers.queue_drain as qd
|
||||
|
||||
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=None)))
|
||||
with pytest.raises(SystemExit):
|
||||
await qd.drain("summarize", 1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_rejects_non_drain_stage(monkeypatch):
|
||||
import workers.queue_drain as qd
|
||||
|
||||
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
|
||||
with pytest.raises(SystemExit):
|
||||
await qd.drain("classify", 1)
|
||||
Reference in New Issue
Block a user