Compare commits

...

5 Commits

Author SHA1 Message Date
hyungi a55bb3453d feat(presegment): PR3 테스트 — 알람·dedupe·override 검증·재개·무회귀 19건
tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/
synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람
발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/
캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성,
units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개,
잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0,
override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀.

기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건
pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 9061f2e25c feat(presegment): PR3 C — 유인 분할 CLI scripts/presegment_attended.py
컨테이너 안(docker exec)에서 실행하는 3-subcommand CLI:
  list   — awaiting_split 보류 큐 행(문서·tier·over%·토큰·초과 섹션·보류/재개 시각)
  export — 문서 통계+hier 개요(overview.md)·자동 pack 유닛 제안(summarize_units
           재사용)·초과 섹션 (start,end) 스팬+본문 덤프(섹션당 파일, 200K자 분할,
           파일명에 절대 오프셋)·boundaries 템플릿 JSON(자동팩 채움+초과=todo 마커)
           +README(유인 클로드 세션 작업 안내)
  apply  — 경계 검증(단조증가·비중첩·본문 범위·커버리지 90%+공백 경고·유닛 캡
           초과 시 유닛 명시 거부·todo 잔존 거부·source_len 드리프트 거부) 통과 시
           payload.presegment.units_override 기록 + awaiting_split=false +
           deferred_until 제거(즉시 재개) + status pending·alerted_at/map_results
           정리. --dry-run 지원.

stdout = 사람이 읽는 요약 + '{' 로 시작하는 기계 파싱용 JSON 라인.
DB 접속 = 기존 scripts/ 패턴(DATABASE_URL env, backfill_tier.py 동형).
마이그레이션 없음 — payload JSONB 만 사용.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00
hyungi b91b05e889 refactor(board): 처리 머신 보드 나스+맥미니 2노드 재구성
2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정).

- 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지),
  맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크)
- summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거
  — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리)
- 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류
  변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체)
- deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존)
- 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존
- background_jobs 머신 귀속 기본값 gpu -> nas
- 단위테스트 2노드 기준 재작성 (27 passed)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 16:51:32 +09:00
hyungi 304a2b9c0f Merge pull request 'Feat/two node endpoints' (#51) from feat/two-node-endpoints into main
Reviewed-on: #51
2026-07-02 14:31:27 +09:00
13 changed files with 1523 additions and 379 deletions
+2 -17
View File
@@ -37,8 +37,8 @@ class CurrentItem(BaseModel):
class MachineCard(BaseModel):
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state."""
key: Literal["gpu", "macmini", "macbook"]
"""머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드)."""
key: Literal["nas", "macmini"]
label: str
state: Literal["active", "deferred", "idle"]
stages: list[str]
@@ -59,20 +59,6 @@ class SummarizeEta(BaseModel):
eta_minutes: int | None
class MachineDone(BaseModel):
"""머신 1대의 summarize 완료 실적 (분담 표시용)."""
done_1h: int
done_today: int
class SummarizeByMachine(BaseModel):
"""summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북'
오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출
(ds-board-merged A-1, 신규 수집 SQL 0)."""
macmini: MachineDone
macbook: MachineDone
class TrendBucket(BaseModel):
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
hour: str
@@ -122,7 +108,6 @@ class QueueOverviewResponse(BaseModel):
machines: list[MachineCard]
stages: list[StageRow]
summarize_eta: SummarizeEta
summarize_by_machine: SummarizeByMachine
trend_24h: list[TrendBucket]
totals: Totals
background_jobs: list[BackgroundJobItem] = []
+69
View File
@@ -0,0 +1,69 @@
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
환경변수:
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
"""
from __future__ import annotations
import json
import os
import httpx
from core.utils import setup_logger
logger = setup_logger("alerts")
ALERT_TIMEOUT_SECONDS = 5.0
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
_noop_logged = False
async def send_alert(title: str, message: str) -> bool:
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
global _noop_logged
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
if not url:
if not _noop_logged:
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
_noop_logged = True
return False
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
if kind not in ("synochat", "ntfy"):
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
kind = "synochat"
try:
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
if kind == "ntfy":
resp = await client.post(
url,
params={"title": title},
content=message.encode("utf-8"),
)
else: # synochat
text = f"{title}\n{message}" if title else message
resp = await client.post(
url,
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
)
if resp.status_code >= 400:
logger.warning(
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
)
return False
return True
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
return False
+34 -112
View File
@@ -3,19 +3,16 @@
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리.
구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리.
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
귀속 규칙 (단일 진실):
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt · macmini = classify/summarize · macbook = deep_summary
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속).
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook'
이면 macbook 실적, 아니면 macmini 실적.
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
(보류 = 맥북 불가 신호).
귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드):
- stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) ·
macmini = classify/summarize/deep_summary (단일 생성 LLM 허브).
- deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 —
summarize/deep_summary 소속인 macmini 카드 귀속.
"""
from datetime import datetime, timedelta
@@ -25,42 +22,33 @@ from zoneinfo import ZoneInfo
from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
KST = ZoneInfo("Asia/Seoul")
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
_GPU_STAGES = (
_NAS_STAGES = (
"extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt",
)
_MACMINI_STAGES = ("classify", "summarize")
_MACBOOK_STAGES = ("deep_summary",)
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
_MACMINI_STAGES = ("classify", "summarize", "deep_summary")
_STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES
_MACHINE_KEYS = ("gpu", "macmini", "macbook")
_MACHINE_KEYS = ("nas", "macmini")
_MACHINE_LABELS = {
"gpu": "GPU 서버",
"nas": "나스",
"macmini": "맥미니",
"macbook": "맥북 M5 Max",
}
# 머신 카드당 current 표시 상한
_CURRENT_LIMIT = 2
def stage_machine_map(deep_enabled: bool) -> dict[str, str]:
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속."""
def stage_machine_map() -> dict[str, str]:
"""stage → machine key 맵 (정적 — 나스/맥미니 2노드)."""
mapping: dict[str, str] = {}
for s in _GPU_STAGES:
mapping[s] = "gpu"
for s in _NAS_STAGES:
mapping[s] = "nas"
for s in _MACMINI_STAGES:
mapping[s] = "macmini"
for s in _MACBOOK_STAGES:
mapping[s] = "macbook" if deep_enabled else "macmini"
return mapping
@@ -90,23 +78,6 @@ def rows_to_stage_stats(rows) -> dict[str, dict]:
return stats
def rows_to_summarize_split(rows) -> dict[str, dict]:
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
"""
split = {
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
}
for row in rows:
key = "macbook" if row[0] else "macmini"
split[key]["done_1h"] += int(row[1] or 0)
split[key]["done_today"] += int(row[2] or 0)
split[key]["done_15m"] += int(row[3] or 0)
return split
def display_title(row: dict) -> str:
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
if row.get("title"):
@@ -120,13 +91,10 @@ def display_title(row: dict) -> str:
def build_machines(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
current_rows: list[dict],
*,
deep_enabled: bool,
) -> list[dict]:
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map(deep_enabled)
"""머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map()
def g(stage: str, field: str) -> int:
return stage_stats.get(stage, {}).get(field, 0)
@@ -149,29 +117,23 @@ def build_machines(
pending = sum(g(s, "pending") for s in stages)
processing = sum(g(s, "processing") for s in stages)
failed = sum(g(s, "failed") for s in stages)
done_1h = sum(g(s, "done_1h") for s in stages)
done_today = sum(g(s, "done_today") for s in stages)
done_15m = sum(g(s, "done_15m") for s in stages)
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
if key in summarize_split:
done_1h += summarize_split[key]["done_1h"]
done_today += summarize_split[key]["done_today"]
done_15m += summarize_split[key]["done_15m"]
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
# 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속
deferred_pending = (
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
if key == "macbook" else 0
if key == "macmini" else 0
)
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만.
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만.
if processing > 0 or done_15m > 0:
state = "active"
elif key == "macbook" and deferred_pending > 0:
elif deferred_pending > 0:
state = "deferred"
else:
state = "idle"
@@ -213,16 +175,6 @@ def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
}
def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict:
"""summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의
오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로
투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외)."""
def m(key: str) -> dict:
s = summarize_split.get(key, {})
return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))}
return {"macmini": m("macmini"), "macbook": m("macbook")}
def build_trend(
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
@@ -287,28 +239,23 @@ def build_totals(stage_stats: dict[str, dict]) -> dict:
def compose_overview(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
current_rows: list[dict],
*,
deep_enabled: bool,
now_kst: datetime,
) -> dict:
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
return {
"machines": build_machines(
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
),
"machines": build_machines(stage_stats, current_rows),
"stages": build_stages(stage_stats),
"summarize_eta": build_summarize_eta(stage_stats),
"summarize_by_machine": build_summarize_by_machine(summarize_split),
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
"totals": build_totals(stage_stats),
}
# ─── SQL 수집부 (총 5쿼리) ────────────────────────────────────────────────────
# ─── SQL 수집부 (총 4쿼리) ────────────────────────────────────────────────────
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
_STAGE_STATS_SQL = """
@@ -333,23 +280,7 @@ _STAGE_STATS_SQL = """
GROUP BY stage
"""
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방)
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
_SUMMARIZE_SPLIT_SQL = """
SELECT
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
AND q.status = 'completed'
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
# 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
_TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket,
@@ -371,7 +302,7 @@ _TREND_DONE_SQL = """
GROUP BY 1
"""
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
# 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
_CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q
@@ -383,20 +314,13 @@ _CURRENT_SQL = """
async def build_overview(session: AsyncSession) -> dict:
"""5쿼리 수집 → compose_overview 판정 → 응답 dict."""
"""4쿼리 수집 → compose_overview 판정 → 응답 dict."""
now_kst = datetime.now(KST)
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
deep_enabled = settings.ai is not None and settings.ai.deep is not None
stage_rows = (
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
).all()
split_rows = (
await session.execute(
text(_SUMMARIZE_SPLIT_SQL),
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
)
).all()
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
current_result = (await session.execute(text(_CURRENT_SQL))).all()
@@ -414,11 +338,9 @@ async def build_overview(session: AsyncSession) -> dict:
result = compose_overview(
rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows},
{row[0]: int(row[1]) for row in done_rows},
current_rows,
deep_enabled=deep_enabled,
now_kst=now_kst,
)
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
@@ -426,13 +348,13 @@ async def build_overview(session: AsyncSession) -> dict:
return result
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트).
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트).
_BG_JOB_MACHINE = {
"global_digest": "macmini",
"morning_briefing": "macmini",
"section_summary": "macmini",
"hier_backfill": "gpu",
"hier_redecompose": "gpu",
"hier_backfill": "nas",
"hier_redecompose": "nas",
}
@@ -466,7 +388,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
"processed": int(r["processed"] or 0), "total": r["total"],
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
"error": r["error"],
"machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"),
"machine": _BG_JOB_MACHINE.get(r["kind"], "nas"),
}
for r in rows
]
+193 -3
View File
@@ -66,6 +66,9 @@ class SummarizeUnit:
text: str = ""
est_tokens: int = 0
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
leaf_indexes: list[int] = field(default_factory=list)
@dataclass
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
units: list[SummarizeUnit] = []
cur_titles: list[str | None] = []
cur_texts: list[str] = []
cur_indexes: list[int] = []
cur_tokens = 0
def _flush() -> None:
nonlocal cur_titles, cur_texts, cur_tokens
nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
if cur_texts:
units.append(SummarizeUnit(
index=len(units),
section_titles=cur_titles,
text="\n\n".join(cur_texts),
est_tokens=cur_tokens,
leaf_indexes=cur_indexes,
))
cur_titles, cur_texts, cur_tokens = [], [], 0
cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
for leaf in leaves:
for li, leaf in enumerate(leaves):
t = estimate_tokens(leaf.text)
if t > cap:
_flush()
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
text=leaf.text,
est_tokens=t,
over_cap=True,
leaf_indexes=[li],
))
continue
if cur_tokens + t > cap:
_flush()
cur_titles.append(leaf.section_title)
cur_texts.append(leaf.text)
cur_indexes.append(li)
cur_tokens += t
_flush()
return units
@@ -222,3 +229,186 @@ def build_reduce_units_block(
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True
return block, truncated
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
#
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
@dataclass
class OverrideCheck:
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
ok: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
coverage_pct: float = 0.0
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
unit_tokens: list[int] = field(default_factory=list)
def choose_override_source(
md_content: str | None, extracted_text: str | None
) -> tuple[str, str]:
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
"""
if md_content and md_content.strip():
return "md_content", md_content
return "extracted_text", extracted_text or ""
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
"""
if isinstance(entry, dict):
if entry.get("todo"):
errors.append(
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
)
return None
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
start, end = entry[0], entry[1]
title = entry[2] if len(entry) == 3 else None
else:
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
return None
if (
isinstance(start, bool) or isinstance(end, bool)
or not isinstance(start, int) or not isinstance(end, int)
):
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
return None
if title is not None and not isinstance(title, str):
title = str(title)
return start, end, title
def validate_override_boundaries(
text: str,
raw_boundaries,
*,
cap: int = CAP_TOKENS,
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
) -> OverrideCheck:
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
"""
errors: list[str] = []
warnings: list[str] = []
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
normalized: list[tuple[int, int, str | None]] = []
for i, entry in enumerate(raw_boundaries):
norm = _normalize_boundary(entry, i, errors)
if norm is not None:
normalized.append(norm)
if errors:
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
n = len(text)
prev_end = None
unit_tokens: list[int] = []
covered = 0
for i, (start, end, title) in enumerate(normalized):
label = f"유닛 {i}" + (f" ({title})" if title else "")
if start < 0 or end > n:
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
continue
if start >= end:
errors.append(f"{label}: start >= end ([{start}, {end}))")
continue
if prev_end is not None and start < prev_end:
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
if prev_end is not None and start - prev_end > gap_warn_chars:
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
est = estimate_tokens(text[start:end])
unit_tokens.append(est)
if est > cap:
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
covered += end - start
prev_end = max(prev_end or 0, end)
if not errors:
head_gap = normalized[0][0]
tail_gap = n - normalized[-1][1]
if head_gap > gap_warn_chars:
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
if tail_gap > gap_warn_chars:
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
if not errors and coverage_pct < min_coverage_pct:
errors.append(
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
)
return OverrideCheck(
ok=not errors,
errors=errors,
warnings=warnings,
coverage_pct=coverage_pct,
boundaries=normalized if not errors else [],
unit_tokens=unit_tokens,
)
def units_from_boundaries(
text: str, boundaries: list[tuple[int, int, str | None]]
) -> list[SummarizeUnit]:
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
(map_results 멱등 재개 키와 정합).
"""
units: list[SummarizeUnit] = []
for i, (start, end, title) in enumerate(boundaries):
seg = text[start:end]
units.append(SummarizeUnit(
index=i,
section_titles=[title],
text=seg,
est_tokens=estimate_tokens(seg),
))
return units
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
앞 오프셋을 가져간다).
"""
spans: list[tuple[int, int]] = []
cursor = 0
for leaf in leaves:
pos = text.find(leaf.text, cursor)
if pos < 0:
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
pos = text.find(leaf.text)
if pos < 0:
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
spans.append((pos, pos + len(leaf.text)))
cursor = pos + len(leaf.text)
return spans
+189 -12
View File
@@ -14,7 +14,7 @@ import asyncio
import json
import os
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
@@ -29,15 +29,19 @@ from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import (
CAP_TOKENS,
UnitPlan,
build_reduce_units_block,
choose_override_source,
estimate_tokens,
plan_summarize_units,
render_map_slice,
units_from_boundaries,
validate_override_boundaries,
)
logger = setup_logger("deep_summary_worker")
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
@@ -111,6 +122,17 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
@@ -123,7 +145,9 @@ async def process(
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
@@ -250,43 +274,196 @@ async def process(
)
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
"""
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({
"awaiting_split": True,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
"oversized_sections": [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": oversized,
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
)
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
):
@@ -1,6 +1,7 @@
<script lang="ts">
// 처리 머신 보드 v3통합안 (plan ds-board-merged: C2 머신레인 + C3 번다운/정직ETA).
// · 머신 3레인(GPU/맥미니/맥북) = "누가 일하나" + 요약 오프로드(맥북 합류) 가시화
// 처리 머신 보드 v42026-07-02 컷오버 후 2노드 (나스+맥미니).
// · 머신 2레인(나스/맥미니) = "누가 일하나" — 나스=DS 본체 Docker(추출/마크다운/
// 청크·임베딩 등), 맥미니=단일 생성 LLM 허브(분류/요약/심층분석 + bge-m3/리랭크)
// · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta)
// · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용.
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어).
@@ -193,7 +194,7 @@
const machineByKey = $derived(
new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])),
);
const LANE_ORDER: FlowMachine[] = ['gpu', 'macmini', 'macbook'];
const LANE_ORDER: FlowMachine[] = ['nas', 'macmini'];
const lanes = $derived(
LANE_ORDER.map((key) => ({
key,
@@ -203,13 +204,6 @@
})),
);
// 요약 오프로드 분담 — 맥미니 vs 맥북 (A-1 summarize_by_machine)
const split = $derived(overview.summarize_by_machine);
const splitTotal1h = $derived(Math.max(1, split.macmini.done_1h + split.macbook.done_1h));
const macbookSharePct = $derived(Math.round((split.macbook.done_1h / splitTotal1h) * 100));
// 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트)
const offloadActive = $derived(split.macbook.done_1h > 0);
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
const bgJobs = $derived(overview.background_jobs ?? []);
const runningBg = $derived(bgJobs.filter((j) => j.state === 'running'));
@@ -266,7 +260,7 @@
: `갱신 ${Math.round(ageSec / 60)}분 전`,
);
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 + 맥북 합류 변곡점 마커 ───
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 ───
const burn = $derived.by(() => {
const t = overview.trend_24h;
if (!t || t.length === 0) return null;
@@ -279,20 +273,12 @@
t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' ');
const doneLine = line((b) => b.done);
const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`;
// 합류 변곡점 = done 최대 버킷 (맥북 야간 drain 합류 추정)
let mi = 0;
t.forEach((b, i) => {
if (b.done > t[mi].done) mi = i;
});
return {
w,
h,
area,
doneLine,
inflowLine: line((b) => b.inflow),
markX: (mi * step).toFixed(1),
markHour: t[mi].hour,
markDone: t[mi].done,
peak: max,
};
});
@@ -332,7 +318,7 @@
</span>
</div>
<!-- 머신 레인 (누가 일하나 + 요약 오프로드) -->
<!-- 머신 레인 (누가 일하나) -->
<div class="grid gap-2 mb-3">
{#each lanes as lane (lane.key)}
<div class="bg-surface border border-default rounded-card px-3.5 py-2.5">
@@ -342,11 +328,8 @@
<span class="text-[10px] text-faint font-mono">{lane.meta.model}</span>
<span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span>
{#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each}
{#if lane.key === 'macbook' && (lane.card?.deferred_pending ?? 0) > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {lane.card?.deferred_pending}</span>
{/if}
{#if lane.card?.state === 'deferred'}
<span class="text-[9px] text-warning">잠듦 — 요약은 맥미니로 복귀</span>
{#if (lane.card?.deferred_pending ?? 0) > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums" title="LLM 백오프 — 자동 재개 대기">보류 {lane.card?.deferred_pending}</span>
{/if}
</div>
<div class="flex items-stretch gap-1.5 flex-wrap">
@@ -368,26 +351,8 @@
</div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div>
{#if n.def.key === 'summarize'}
<div class="mt-1 h-1 w-full rounded-full overflow-hidden flex" title="맥미니 {split.macmini.done_1h}/h · 맥북 {split.macbook.done_1h}/h">
<span class="block h-full mtag-macmini-bar" style="width:{100 - macbookSharePct}%"></span>
<span class="block h-full mtag-macbook-bar" style="width:{macbookSharePct}%"></span>
</div>
<div class="text-[9px] text-faint tabular-nums whitespace-nowrap mt-0.5">맥미니 {split.macmini.done_1h} · 맥북 {split.macbook.done_1h}/h</div>
{/if}
</button>
{/each}
{#if lane.key === 'macbook' && offloadActive}
<button
class="text-left rounded-lg border border-dashed border-warning/50 px-2.5 py-1.5 cursor-pointer hover:bg-surface-hover min-w-[96px]"
onclick={() => toggleNode('summarize')}
title="맥북이 요약을 맥미니에서 가져와 처리 "
>
<div class="flex items-center gap-1 text-[11px] font-semibold text-text whitespace-nowrap">요약 합류 <span class="text-[8px] font-bold text-warning">OFFLOAD</span></div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{split.macbook.done_1h}<span class="text-[9px] text-faint font-normal ml-0.5">/h</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">요약의 {macbookSharePct}% 담당</div>
</button>
{/if}
</div>
</div>
{/each}
@@ -399,15 +364,11 @@
<div class="flex items-center gap-2 mb-2">
<span class="text-[11px] font-bold text-text">요약 백로그 24시간</span>
<span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span>
{#if offloadActive}<span class="text-[9px] text-warning ml-auto">맥북 합류 {burn.markHour} — 소화 급증</span>{/if}
</div>
<svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운">
<polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" />
<polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" />
<polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" />
{#if offloadActive}
<line x1={burn.markX} y1="0" x2={burn.markX} y2={burn.h} stroke="currentColor" stroke-width="1" stroke-dasharray="2 2" class="text-warning" opacity="0.7" />
{/if}
</svg>
<div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums">
{#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)}
@@ -558,13 +519,9 @@
</div>
<style>
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */
.mtag-gpu { background: #e7eef6; color: #3b6ea5; }
/* 머신 색 — 디자인 토큰 외 2색 (nas 청/macmini 보라) — 이 컴포넌트 한정 */
.mtag-nas { background: #e7eef6; color: #3b6ea5; }
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
.mtag-macbook { background: #f7eedd; color: #b07a10; }
/* 요약 오프로드 분담 막대 채움 (맥미니 보라 / 맥북 황) */
.mtag-macmini-bar { background: #8a5fbf; }
.mtag-macbook-bar { background: #b07a10; }
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
.detail-frame { border-color: #3b6ea5; }
.detail-head { background: #e7eef6; }
@@ -1,6 +1,6 @@
<script lang="ts">
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
// 머신 미니카드 3 + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
// 머신 미니카드 2(나스/맥미니) + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
import { X } from 'lucide-svelte';
@@ -51,7 +51,7 @@
<div class="p-4 space-y-3">
{#if data}
<!-- 머신 미니카드 3 -->
<!-- 머신 미니카드 (나스/맥미니) -->
{#each data.machines as m (m.key)}
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
<div class="flex items-center justify-between gap-2">
+2 -9
View File
@@ -5,7 +5,7 @@
* .
*/
export type MachineKey = 'gpu' | 'macmini' | 'macbook';
export type MachineKey = 'nas' | 'macmini';
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
export type MachineState = 'active' | 'deferred' | 'idle';
@@ -29,7 +29,7 @@ export interface MachineOverview {
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */
done_1h: number;
done_today: number;
/** 보류 건수 — 맥북 sleep 등으로 자동 재개 대기 중 */
/** 보류 건수 — LLM 허브 백오프 등으로 자동 재개 대기 중 */
deferred_pending: number;
current: MachineCurrentItem[];
}
@@ -50,12 +50,6 @@ export interface TrendPoint {
done: number;
}
/** summarize 머신별 완료 실적 분담 (오프로드 가시화 — ds-board-merged A-1) */
export interface SummarizeByMachine {
macmini: { done_1h: number; done_today: number };
macbook: { done_1h: number; done_today: number };
}
export interface QueueTotals {
pending: number;
processing: number;
@@ -93,7 +87,6 @@ export interface BackgroundJob {
export interface QueueOverview {
machines: MachineOverview[];
summarize_eta: SummarizeEta;
summarize_by_machine: SummarizeByMachine;
trend_24h: TrendPoint[];
stages: QueueStageRow[];
totals: QueueTotals;
+11 -12
View File
@@ -62,7 +62,7 @@ export function formatAgeSec(sec: number): string {
* / 1 (: 맥미니 ).
*/
export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
export type FlowMachine = 'nas' | 'macmini';
export interface FlowNodeDef {
key: string;
@@ -79,26 +79,25 @@ export interface FlowNodeDef {
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
export const FLOW_NODES: FlowNodeDef[] = [
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'nas', engine: 'kordoc', sub: 'kordoc' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'nas', engine: 'Marker', sub: 'marker-service' },
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'nas', engine: 'bge-m3 (맥미니 콜)', sub: 'embed worker' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
];
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
export const AUX_NODES: FlowNodeDef[] = [
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'nas', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'nas', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'nas', engine: '유틸', sub: 'ffmpeg' },
];
/** 머신 스트립 메타 — 모델 표기 단일 지점 */
/** 머신 스트립 메타 — 모델 표기 단일 지점 (2026-07-02 컷오버: 나스+맥미니 2노드) */
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
gpu: { label: 'GPU 서버', model: '특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
nas: { label: '나스', model: 'DS 본체 Docker · 특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · bge-m3 · 24/7' },
};
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
+3 -3
View File
@@ -72,7 +72,7 @@
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
let queue = $derived($queueOverview);
let queueMacbook = $derived(queue?.machines?.find((m) => m.key === 'macbook') ?? null);
let queueMacmini = $derived(queue?.machines?.find((m) => m.key === 'macmini') ?? null);
function toggleQueueDrawer() {
if (ui.isDrawerOpen('queue')) ui.closeDrawer();
else ui.openDrawer('queue');
@@ -189,8 +189,8 @@
</span>
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
{#if queueMacbook}
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacbook.state)}"> {MACHINE_STATE_LABEL[queueMacbook.state]}</span>
{#if queueMacmini}
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacmini.state)}">미니 {MACHINE_STATE_LABEL[queueMacmini.state]}</span>
{/if}
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
</button>
+456
View File
@@ -0,0 +1,456 @@
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 보류한
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
사용법 (fastapi 컨테이너 안에서 실행):
docker compose exec fastapi python /app/scripts/presegment_attended.py list
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
워크플로우:
1. list awaiting_split 문서 확인.
2. export 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
템플릿 JSON 생성. 유인 클로드 세션은 파일들만 읽고 TODO 스팬을
CAP 이하 경계들로 분할해 템플릿을 완성한다.
3. apply 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 )
payload.presegment.units_override 기록 + awaiting_split 해제 +
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
stdout 규약: 사람이 읽는 요약 + '{' 시작하는 기계 파싱용 JSON 라인(1 1라인).
사람용 행은 절대 '{' 시작하지 않는다.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
from sqlalchemy import text as sql_text # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
from services.hier_decomp.builder import build_hier_tree # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
OVERRIDE_MIN_COVERAGE_PCT,
TRIGGER_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
validate_override_boundaries,
)
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
DUMP_CHUNK_CHARS = 200_000
def _jsonl(obj: dict) -> None:
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
print(json.dumps(obj, ensure_ascii=False, default=str))
def _session_factory():
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
)
engine = create_async_engine(database_url)
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ─── list ────────────────────────────────────────────────────────────────────
LIST_SQL = """
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
q.payload::text AS payload_text,
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'deep_summary'
AND q.status IN ('pending', 'processing', 'failed')
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
ORDER BY q.id
"""
async def cmd_list() -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
finally:
await engine.dispose()
print(f"awaiting_split 보류 문서 {len(rows)}")
for r in rows:
payload = json.loads(r["payload_text"] or "{}")
preseg = payload.get("presegment") or {}
oversized = preseg.get("oversized_sections") or []
print(
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
if isinstance(preseg.get("total_est_tokens"), int)
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
)
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
print(
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
)
_jsonl({
"cmd": "list",
"queue_id": r["queue_id"],
"doc_id": r["document_id"],
"title": r["title"],
"status": r["status"],
"tier": preseg.get("tier"),
"over_pct": preseg.get("over_pct"),
"total_est_tokens": preseg.get("total_est_tokens"),
"units": preseg.get("units"),
"oversized_sections": oversized,
"alerted_at": preseg.get("alerted_at"),
"deferred_until": payload.get("deferred_until"),
"override_rejected": preseg.get("override_rejected"),
})
return 0
# ─── export ──────────────────────────────────────────────────────────────────
def _safe_name(title: str | None, fallback: str) -> str:
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
return (t or fallback)[:60]
def _build_outline(text: str) -> str:
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
lines = []
for n in nodes:
indent = " " * max(n.level, 0)
title = n.section_title or "(preamble)"
tok = estimate_tokens(n.text)
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
lines.append(f"{indent}- L{n.level} {title}{tok:,} tok{mark}")
return "\n".join(lines)
async def cmd_export(doc_id: int, out_dir: str) -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
row = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
finally:
await engine.dispose()
if not row:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
source, text = choose_override_source(row["md_content"], row["extracted_text"])
if not text.strip():
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
return 1
plan = plan_summarize_units(text)
leaves = extract_leaves(text)
spans = leaf_spans(text, leaves)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
files: list[str] = []
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
# ① 통계 + hier 개요
oversized_units = [u for u in plan.units if u.over_cap]
overview = [
f"# presegment export — doc {doc_id}",
"",
f"- 제목: {row['title'] or '(제목 없음)'}",
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}",
f"- 생성: {now_iso}",
"",
"## 유닛 제안 (summarize_units greedy-pack)",
"",
]
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
overview += ["", "## hier 개요", "", _build_outline(text), ""]
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
files.append("overview.md")
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
boundaries: list[dict] = []
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
title = next((t for t in u.section_titles if t), None)
if not u.over_cap:
# ④ 자동 pack 유닛은 템플릿에 채워둔다
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
continue
boundaries.append({
"start": s, "end": e, "title": title or f"유닛 {u.index}",
"todo": (
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
"경계 여러 개로 교체하고 todo 키를 제거할 것"
),
})
seg = text[s:e]
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
fname = f"{base}.{cs}_{ce}.md"
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
(out / fname).write_text(text[cs:ce], encoding="utf-8")
files.append(fname)
# ④ boundaries 템플릿 JSON
template = {
"doc_id": doc_id,
"source": source,
"source_len": len(text),
"cap_tokens": CAP_TOKENS,
"generated_at": now_iso,
"boundaries": boundaries,
}
(out / "boundaries_template.json").write_text(
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
)
files.append("boundaries_template.json")
# 유인 세션용 작업 안내
readme = f"""# doc {doc_id} 유인 분할 안내
1. overview.md 구조 파악 (유닛 제안 + hier 개요).
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
- 파일명 `..._<cs>_<ce>.md` cs = 파일 문자의 절대 오프셋.
- 절대 오프셋 = cs + 파일 로컬 오프셋.
3. boundaries_template.json todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
- 토큰 추정: 한글 0.529 tok/ · 기타 0.217 tok/ (services/summarize_units.py).
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
4. 검증/적용:
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
"""
(out / "README.md").write_text(readme, encoding="utf-8")
files.append("README.md")
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
print(
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
f"tier={plan.tier} over%={plan.over_pct}"
)
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}")
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
_jsonl({
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
"source": source, "source_len": len(text),
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
"over_pct": plan.over_pct,
"units_auto": len(plan.units) - len(oversized_units),
"units_todo": len(oversized_units),
"files": files,
})
return 0
# ─── apply ───────────────────────────────────────────────────────────────────
QUEUE_ROW_SQL = """
SELECT id, status, attempts, payload::text AS payload_text
FROM processing_queue
WHERE document_id = :d AND stage = 'deep_summary'
AND status IN ('pending', 'processing', 'failed')
ORDER BY id DESC
LIMIT 1
"""
APPLY_UPDATE_SQL = """
UPDATE processing_queue
SET payload = CAST(:payload AS JSONB),
status = 'pending',
attempts = 0,
error_message = NULL
WHERE id = :qid
"""
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
if isinstance(raw, dict):
boundaries = raw.get("boundaries") or []
declared_source = raw.get("source")
declared_len = raw.get("source_len")
if raw.get("doc_id") not in (None, doc_id):
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
return 1
else:
boundaries, declared_source, declared_len = raw, None, None
engine, factory = _session_factory()
try:
async with factory() as session:
doc = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
if not doc:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
if not qrow:
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
return 1
if qrow["status"] == "processing":
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
return 1
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
if declared_source in ("md_content", "extracted_text"):
source = declared_source
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
else:
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
if declared_len is not None and declared_len != len(text):
print(
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
" (본문 재생성됨 — export 부터 재실행)"
)
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
return 1
check = validate_override_boundaries(text, boundaries)
for w in check.warnings:
print(f" [warn] {w}")
if not check.ok:
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
for e in check.errors:
print(f" - {e}")
_jsonl({
"cmd": "apply", "ok": False, "doc_id": doc_id,
"error": "validation_failed", "errors": check.errors,
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
})
return 1
print(
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
)
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
payload = json.loads(qrow["payload_text"] or "{}")
preseg = dict(payload.get("presegment") or {})
preseg["units_override"] = {
"source": source,
"source_len": len(text),
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
}
preseg["awaiting_split"] = False
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
preseg.pop(k, None)
payload["presegment"] = preseg
payload.pop("deferred_until", None) # 즉시 재개
if dry_run:
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
_jsonl({
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
await session.execute(
sql_text(APPLY_UPDATE_SQL),
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
)
await session.commit()
print(
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
f"(다음 queue_consumer 사이클에 재개)"
)
_jsonl({
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
finally:
await engine.dispose()
# ─── main ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
p_export.add_argument("--doc", type=int, required=True)
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
p_apply.add_argument("--doc", type=int, required=True)
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
args = parser.parse_args()
if args.cmd == "list":
return asyncio.run(cmd_list())
if args.cmd == "export":
out = args.out or f"./preseg_export_{args.doc}"
return asyncio.run(cmd_export(args.doc, out))
if args.cmd == "apply":
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
return 2
if __name__ == "__main__":
sys.exit(main())
+486
View File
@@ -0,0 +1,486 @@
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1 로그)·synochat/ntfy
포맷·실패 절대 raise 금지 (webhook 전부 fake 실호출 0).
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7).
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 .
- process(): units_override 존재 tier 판정(plan_summarize_units) 우회
map-reduce 재개 / 잘못된 override -HOLD + 알람 / override 없는 문서 무회귀.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import httpx # noqa: E402
import services.alerts as alerts # noqa: E402
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
units_from_boundaries,
validate_override_boundaries,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
SMALL_MD = "# 소형\n" + ("" * 2_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [], "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self, row=None):
self.commits = 0
self._row = row
async def commit(self):
self.commits += 1
class FakeProcSession(FakeSession):
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
def __init__(self, doc, row):
super().__init__(row)
self._doc = doc
async def get(self, model, pk):
return self._doc
async def execute(self, stmt):
row = self._row
return SimpleNamespace(scalar_one_or_none=lambda: row)
class FakeClient:
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
def __init__(self):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
async def call_deep(self, prompt: str, system=None) -> str:
self.prompts.append(prompt)
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
return SimpleNamespace(
id=999,
title="테스트 문서",
extracted_text=text,
md_content=md_content,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope_raw():
return {
"from_stage": "classify",
"escalation_reasons": ["long_context"],
"risk_flags": [],
"distilled_context": "4B 요지",
"original_pointers": {"doc_ids": [999]},
}
def _envelope():
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
@pytest.fixture
def _alert_recorder(monkeypatch):
calls: list[tuple[str, str]] = []
async def fake_alert(title: str, message: str) -> bool:
calls.append((title, message))
return True
monkeypatch.setattr(dsw, "send_alert", fake_alert)
return calls
# ─── A. services.alerts ──────────────────────────────────────────────────────
class _FakeHttpxClient:
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
captured: list[dict] = []
fail_with: Exception | None = None
status_code = 200
def __init__(self, timeout=None):
self.timeout = timeout
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def post(self, url, **kwargs):
if _FakeHttpxClient.fail_with is not None:
raise _FakeHttpxClient.fail_with
_FakeHttpxClient.captured.append({"url": url, **kwargs})
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
@pytest.fixture
def _fake_httpx(monkeypatch):
_FakeHttpxClient.captured = []
_FakeHttpxClient.fail_with = None
_FakeHttpxClient.status_code = 200
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
return _FakeHttpxClient
@pytest.mark.asyncio
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
monkeypatch.setattr(alerts, "_noop_logged", False)
infos: list[str] = []
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
assert await alerts.send_alert("t", "m") is False
assert await alerts.send_alert("t", "m") is False
assert len(infos) == 1 # 프로세스당 1회만 로그
assert _fake_httpx.captured == [] # webhook 미호출
@pytest.mark.asyncio
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
assert len(_fake_httpx.captured) == 1
call = _fake_httpx.captured[0]
assert call["url"] == "http://chat.local/webhook"
payload = json.loads(call["data"]["payload"])
assert payload == {"text": "제목\n본문 줄1\n줄2"}
@pytest.mark.asyncio
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
assert await alerts.send_alert("한글 제목", "알람 본문") is True
call = _fake_httpx.captured[0]
# 한글 제목은 헤더(latin-1 한정) 대신 query param
assert call["params"] == {"title": "한글 제목"}
assert call["content"] == "알람 본문".encode("utf-8")
@pytest.mark.asyncio
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
_fake_httpx.fail_with = httpx.ConnectError("down")
assert await alerts.send_alert("t", "m") is False # raise 없이 False
_fake_httpx.fail_with = None
_fake_httpx.status_code = 500
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.tier == "whole"
row = SimpleNamespace(payload={"envelope": {"x": 1}})
session = FakeSession()
# 1차 HOLD — 알람 발화 + alerted_at 기록
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
title, message = _alert_recorder[0]
assert "999" in title
assert "테스트 문서" in message and "whole" in message
assert f"export --doc 999" in message # 유인 분할 힌트
alerted_at = row.payload["presegment"]["alerted_at"]
assert datetime.fromisoformat(alerted_at)
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
payload = dict(row.payload)
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
row.payload = payload
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 2
assert row.payload["presegment"]["alerted_at"] != stale
# ─── C. validate_override_boundaries ────────────────────────────────────────
def test_validate_ok_full_coverage():
text = "" * 40_000 # ≈ 21,160 tok
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
check = validate_override_boundaries(text, bounds)
assert check.ok and check.errors == []
assert check.coverage_pct == 100.0
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
def test_validate_dict_form_and_units_from_boundaries():
text = "" * 10_000
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
check = validate_override_boundaries(text, bounds)
assert check.ok
units = units_from_boundaries(text, check.boundaries)
assert [u.index for u in units] == [0, 1]
assert units[0].text == text[0:5_000]
assert units[0].section_titles == ["a"]
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
def test_validate_rejects_overlap():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
assert not check.ok
assert any("중첩" in e for e in check.errors)
def test_validate_rejects_cap_exceed():
text = "" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
assert not check.ok
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
def test_validate_rejects_low_coverage():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
assert not check.ok
assert any("커버리지" in e for e in check.errors)
def test_validate_rejects_out_of_range_and_todo():
text = "" * 1_000
check = validate_override_boundaries(text, [[0, 2_000, ""]])
assert not check.ok and any("범위 밖" in e for e in check.errors)
check2 = validate_override_boundaries(
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
)
assert not check2.ok and any("TODO" in e for e in check2.errors)
def test_validate_warns_on_gap_but_passes_coverage():
text = "" * 100_000
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
check = validate_override_boundaries(text, bounds, cap=50_000)
assert check.ok
assert any("공백 구간" in w for w in check.warnings)
def test_choose_override_source_prefers_md_content():
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
assert choose_override_source(None, None) == ("extracted_text", "")
def test_leaf_spans_reconstruct_source():
md = "# 절 1\n" + "" * 100 + "\n# 절 2\n" + "" * 100
leaves = extract_leaves(md)
spans = leaf_spans(md, leaves)
assert len(spans) == len(leaves)
for (s, e), leaf in zip(spans, leaves):
assert md[s:e] == leaf.text
# 인접 파티션 — 이어붙이면 원문 전체
assert spans[0][0] == 0 and spans[-1][1] == len(md)
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
step = len(text) // n_units
bounds = []
for i in range(n_units):
s = i * step
e = len(text) if i == n_units - 1 else (i + 1) * step
bounds.append([s, e, f"파트 {i + 1}"])
return {
"envelope": _envelope_raw(),
"subject_domain": "generic",
"presegment": {
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
"units_override": {
"source": source, "source_len": len(text), "boundaries": bounds,
},
},
}
@pytest.mark.asyncio
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
def _boom(*a, **k):
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
await dsw.process(999, session)
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
assert len(client.prompts) == 4
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
preseg = row.payload["presegment"]
assert preseg["tier"] == "override"
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
assert len(preseg["map_results"]) == 3
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
assert _alert_recorder == [] # 정상 재개 — 알람 없음
assert len(_patch_telemetry) == 1
@pytest.mark.asyncio
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
def _no_llm():
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
monkeypatch.setattr(dsw, "AIClient", _no_llm)
with pytest.raises(StageDeferred) as ei:
await dsw.process(999, session)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True # 재-HOLD
assert "cap" in preseg["override_rejected"]
assert "units_override" in preseg # 원인 조사용 보존
assert len(_alert_recorder) == 1 # 거부 알람
assert "거부" in _alert_recorder[0][0]
assert doc.ai_detail_summary is None
assert _patch_telemetry == []
@pytest.mark.asyncio
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
assert row.payload["presegment"]["awaiting_split"] is True
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
assert len(_alert_recorder) == 1
@pytest.mark.asyncio
async def test_no_override_small_doc_keeps_single_call_path(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
doc = _doc(SMALL_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
await dsw.process(999, session)
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
assert doc.ai_detail_summary == "유닛 상세."
assert "presegment" not in row.payload # payload 무변경
assert _alert_recorder == []
@pytest.mark.asyncio
async def test_no_override_whole_doc_still_holds_with_alert(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
doc = _doc(GIANT_WHOLE_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
assert len(_alert_recorder) == 1
assert "HOLD" in _alert_recorder[0][0]
+66 -156
View File
@@ -4,6 +4,8 @@ services/queue_overview 의 SQL 수집부와 분리된 순수 판정 함수
(stage_machine_map / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes / rows_to_* / display_title)
mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인.
2026-07-02 컷오버 2노드(나스+맥미니) 기준 3노드 레인은 제거됨.
"""
from datetime import datetime
@@ -18,7 +20,6 @@ from services.queue_overview import (
compute_eta_minutes,
display_title,
rows_to_stage_stats,
rows_to_summarize_split,
stage_machine_map,
)
@@ -36,186 +37,115 @@ def _stage(**kw) -> dict:
return base
def _split(macbook: dict | None = None, macmini: dict | None = None) -> dict:
"""summarize 풀 완료 실적 split — 미지정 0."""
zero = {"done_1h": 0, "done_today": 0, "done_15m": 0}
return {
"macbook": {**zero, **(macbook or {})},
"macmini": {**zero, **(macmini or {})},
}
def _machine(machines: list[dict], key: str) -> dict:
return next(m for m in machines if m["key"] == key)
# ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
def test_stage_machine_map_deep_enabled():
smap = stage_machine_map(deep_enabled=True)
def test_stage_machine_map_two_nodes():
smap = stage_machine_map()
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
assert smap[s] == "gpu"
assert smap[s] == "nas"
assert smap["classify"] == "macmini"
assert smap["summarize"] == "macmini"
assert smap["deep_summary"] == "macbook"
def test_stage_machine_map_deep_disabled():
"""deep 슬롯 부재 시 deep_summary 도 macmini 귀속."""
smap = stage_machine_map(deep_enabled=False)
assert smap["deep_summary"] == "macmini"
# ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
def test_gpu_stage_counts_attribution():
def test_nas_stage_counts_attribution():
stats = {
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
"stt": _stage(failed=2, done_1h=1, done_today=2),
}
machines = build_machines(stats, _split(), [], deep_enabled=True)
gpu = _machine(machines, "gpu")
assert (gpu["pending"], gpu["processing"], gpu["failed"]) == (3, 1, 2)
assert (gpu["done_1h"], gpu["done_today"]) == (6, 11)
# gpu 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
assert gpu["stages"] == [
machines = build_machines(stats, [])
nas = _machine(machines, "nas")
assert (nas["pending"], nas["processing"], nas["failed"]) == (3, 1, 2)
assert (nas["done_1h"], nas["done_today"]) == (6, 11)
# nas 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
assert nas["stages"] == [
"extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt",
]
def test_summarize_pool_split_attribution():
"""summarize pending/failed = macmini 귀속, 완료 실적은 split 로 분리 —
stage-level summarize done 수치는 카드에 이중 합산되지 않는다."""
def test_macmini_llm_stages_attribution():
"""classify/summarize/deep_summary 전부 macmini 귀속 (단일 생성 LLM 허브)."""
stats = {
"classify": _stage(done_1h=2, done_today=3),
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4),
}
split = _split(macbook={"done_1h": 4, "done_today": 8}, macmini={"done_1h": 6, "done_today": 12})
machines = build_machines(stats, split, [], deep_enabled=True)
machines = build_machines(stats, [])
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 7 and macmini["failed"] == 1
assert macmini["done_1h"] == 2 + 6 # classify + macmini 몫 (10 아님)
assert macmini["done_today"] == 3 + 12
assert macbook["done_1h"] == 4 and macbook["done_today"] == 8
assert macbook["pending"] == 0 # 풀 pending 은 macmini 만
assert macmini["pending"] == 9 and macmini["failed"] == 1
assert macmini["processing"] == 1
assert macmini["done_1h"] == 2 + 10 + 3
assert macmini["done_today"] == 3 + 20 + 4
assert macmini["stages"] == ["classify", "summarize", "deep_summary"]
assert _machine(machines, "nas")["pending"] == 0
def test_summarize_by_machine_projection():
"""build_summarize_by_machine = split 의 done_1h/done_today 를 머신별로 투영
(done_15m 제외 내부 state 판정 전용)."""
from services.queue_overview import build_summarize_by_machine
split = _split(
macbook={"done_1h": 226, "done_today": 312, "done_15m": 60},
macmini={"done_1h": 37, "done_today": 94, "done_15m": 9},
)
sbm = build_summarize_by_machine(split)
assert sbm == {
"macmini": {"done_1h": 37, "done_today": 94},
"macbook": {"done_1h": 226, "done_today": 312},
}
assert "done_15m" not in sbm["macbook"]
def test_compose_overview_includes_summarize_by_machine():
"""compose_overview 응답 계약에 summarize_by_machine 포함 (FE 레인 분담 재료)."""
now_kst = datetime(2026, 6, 13, 13, 0, tzinfo=KST)
stats = {"summarize": _stage(pending=1317, done_1h=264)}
split = _split(macbook={"done_1h": 226, "done_today": 312}, macmini={"done_1h": 37, "done_today": 94})
ov = compose_overview(stats, split, {}, {}, [], deep_enabled=True, now_kst=now_kst)
assert ov["summarize_by_machine"]["macbook"]["done_1h"] == 226
assert ov["summarize_by_machine"]["macmini"]["done_today"] == 94
def test_deep_disabled_deep_summary_counts_to_macmini():
stats = {"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4)}
machines = build_machines(stats, _split(), [], deep_enabled=False)
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 2 and macmini["processing"] == 1
assert macmini["done_1h"] == 3 and macmini["done_today"] == 4
assert macbook["stages"] == [] and macbook["pending"] == 0
assert _machine(machines, "macmini")["stages"] == ["classify", "summarize", "deep_summary"]
def test_deferred_pending_always_on_macbook_card():
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macbook 카드 귀속.
deep 슬롯 유무와 무관 (보류 = 맥북 불가 신호)."""
def test_deferred_pending_on_macmini_card():
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macmini 카드 귀속
(보류 = LLM 백오프 신호)."""
stats = {
"summarize": _stage(pending=5, deferred_pending=2),
"deep_summary": _stage(pending=1, deferred_pending=1),
}
for deep_enabled in (True, False):
machines = build_machines(stats, _split(), [], deep_enabled=deep_enabled)
assert _machine(machines, "macbook")["deferred_pending"] == 3
assert _machine(machines, "gpu")["deferred_pending"] == 0
assert _machine(machines, "macmini")["deferred_pending"] == 0
machines = build_machines(stats, [])
assert _machine(machines, "macmini")["deferred_pending"] == 3
assert _machine(machines, "nas")["deferred_pending"] == 0
# ─── state 판정 ───────────────────────────────────────────────────────────────
def test_macbook_state_active_wins_over_deferred_while_working():
def test_macmini_state_active_wins_over_deferred_while_working():
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시.
"""
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
split = _split(macbook={"done_15m": 3})
machines = build_machines(stats, split, [], deep_enabled=True)
mb = _machine(machines, "macbook")
assert mb["state"] == "active"
assert mb["deferred_pending"] == 1
stats = {"summarize": _stage(pending=1, deferred_pending=1, done_15m=3)}
machines = build_machines(stats, [])
mm = _machine(machines, "macmini")
assert mm["state"] == "active"
assert mm["deferred_pending"] == 1
def test_macbook_state_deferred_only_when_not_working():
def test_macmini_state_deferred_only_when_not_working():
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "deferred"
machines = build_machines(stats, [])
assert _machine(machines, "macmini")["state"] == "deferred"
def test_macbook_state_active_on_recent_qwen_done():
split = _split(macbook={"done_15m": 1})
machines = build_machines({}, split, [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "active"
def test_macbook_state_idle():
machines = build_machines({}, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "idle"
def test_gpu_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "idle"
def test_macmini_state_not_active_on_macbook_pool_done():
"""summarize 풀 완료가 전부 macbook 몫이면 macmini 는 active 아님 (귀속 기준)."""
stats = {"summarize": _stage(done_15m=1)}
split = _split(macbook={"done_15m": 1})
machines = build_machines(stats, split, [], deep_enabled=True)
def test_macmini_state_idle():
machines = build_machines({}, [])
assert _machine(machines, "macmini")["state"] == "idle"
def test_nas_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "idle"
def test_macmini_state_active_on_summarize_processing():
stats = {"summarize": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
machines = build_machines(stats, [])
assert _machine(machines, "macmini")["state"] == "active"
@@ -228,21 +158,18 @@ def test_current_summarize_to_macmini_max_two():
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
]
machines = build_machines({}, _split(), rows, deep_enabled=True)
machines = build_machines({}, rows)
macmini = _machine(machines, "macmini")
gpu = _machine(machines, "gpu")
nas = _machine(machines, "nas")
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
assert [c["document_id"] for c in gpu["current"]] == [4]
assert _machine(machines, "macbook")["current"] == []
assert [c["document_id"] for c in nas["current"]] == [4]
def test_current_deep_summary_follows_deep_slot():
def test_current_deep_summary_to_macmini():
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
enabled = build_machines({}, _split(), rows, deep_enabled=True)
disabled = build_machines({}, _split(), rows, deep_enabled=False)
assert _machine(enabled, "macbook")["current"][0]["document_id"] == 9
assert _machine(disabled, "macmini")["current"][0]["document_id"] == 9
machines = build_machines({}, rows)
assert _machine(machines, "macmini")["current"][0]["document_id"] == 9
def test_display_title_fallback_chain():
@@ -344,32 +271,15 @@ def test_rows_to_stage_stats_conversion():
assert stats["summarize"]["deferred_pending"] == 2
def test_rows_to_summarize_split_conversion():
rows = [
(True, 4, 8, 1), # is_macbook
(False, 6, 12, 0),
]
split = rows_to_summarize_split(rows)
assert split["macbook"] == {"done_1h": 4, "done_today": 8, "done_15m": 1}
assert split["macmini"] == {"done_1h": 6, "done_today": 12, "done_15m": 0}
def test_rows_to_summarize_split_empty():
split = rows_to_summarize_split([])
assert split["macbook"]["done_1h"] == 0 and split["macmini"]["done_today"] == 0
def test_compose_overview_contract_shape():
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
out = compose_overview(
{"summarize": _stage(pending=1)},
_split(),
{}, {}, [],
deep_enabled=True,
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
)
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
assert [m["key"] for m in out["machines"]] == ["gpu", "macmini", "macbook"]
assert [m["key"] for m in out["machines"]] == ["nas", "macmini"]
for m in out["machines"]:
assert set(m.keys()) == {
"key", "label", "state", "stages", "pending", "processing", "failed",
@@ -381,7 +291,7 @@ def test_compose_overview_contract_shape():
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
# 머신 label 고정 (raw 모델명 노출 금지 — label 만)
assert [m["label"] for m in out["machines"]] == ["GPU 서버", "맥미니", "맥북 M5 Max"]
assert [m["label"] for m in out["machines"]] == ["나스", "맥미니"]
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────