Compare commits

...

6 Commits

Author SHA1 Message Date
hyungi 371ee4ebe6 feat(presegment): 영구 실패 Chat 알람 — ALERT_FAIL_STAGES allowlist(기본 deep_summary,summarize) 2026-07-03 08:14:12 +09:00
hyungi a55bb3453d feat(presegment): PR3 테스트 — 알람·dedupe·override 검증·재개·무회귀 19건
tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/
synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람
발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/
캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성,
units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개,
잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0,
override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀.

기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건
pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 9061f2e25c feat(presegment): PR3 C — 유인 분할 CLI scripts/presegment_attended.py
컨테이너 안(docker exec)에서 실행하는 3-subcommand CLI:
  list   — awaiting_split 보류 큐 행(문서·tier·over%·토큰·초과 섹션·보류/재개 시각)
  export — 문서 통계+hier 개요(overview.md)·자동 pack 유닛 제안(summarize_units
           재사용)·초과 섹션 (start,end) 스팬+본문 덤프(섹션당 파일, 200K자 분할,
           파일명에 절대 오프셋)·boundaries 템플릿 JSON(자동팩 채움+초과=todo 마커)
           +README(유인 클로드 세션 작업 안내)
  apply  — 경계 검증(단조증가·비중첩·본문 범위·커버리지 90%+공백 경고·유닛 캡
           초과 시 유닛 명시 거부·todo 잔존 거부·source_len 드리프트 거부) 통과 시
           payload.presegment.units_override 기록 + awaiting_split=false +
           deferred_until 제거(즉시 재개) + status pending·alerted_at/map_results
           정리. --dry-run 지원.

stdout = 사람이 읽는 요약 + '{' 로 시작하는 기계 파싱용 JSON 라인.
DB 접속 = 기존 scripts/ 패턴(DATABASE_URL env, backfill_tier.py 동형).
마이그레이션 없음 — payload JSONB 만 사용.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00
hyungi b91b05e889 refactor(board): 처리 머신 보드 나스+맥미니 2노드 재구성
2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정).

- 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지),
  맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크)
- summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거
  — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리)
- 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류
  변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체)
- deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존)
- 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존
- background_jobs 머신 귀속 기본값 gpu -> nas
- 단위테스트 2노드 기준 재작성 (27 passed)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 16:51:32 +09:00
hyungi 304a2b9c0f Merge pull request 'Feat/two node endpoints' (#51) from feat/two-node-endpoints into main
Reviewed-on: #51
2026-07-02 14:31:27 +09:00
14 changed files with 1555 additions and 379 deletions
+2 -17
View File
@@ -37,8 +37,8 @@ class CurrentItem(BaseModel):
class MachineCard(BaseModel): class MachineCard(BaseModel):
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state.""" """머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드)."""
key: Literal["gpu", "macmini", "macbook"] key: Literal["nas", "macmini"]
label: str label: str
state: Literal["active", "deferred", "idle"] state: Literal["active", "deferred", "idle"]
stages: list[str] stages: list[str]
@@ -59,20 +59,6 @@ class SummarizeEta(BaseModel):
eta_minutes: int | None eta_minutes: int | None
class MachineDone(BaseModel):
"""머신 1대의 summarize 완료 실적 (분담 표시용)."""
done_1h: int
done_today: int
class SummarizeByMachine(BaseModel):
"""summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북'
오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출
(ds-board-merged A-1, 신규 수집 SQL 0)."""
macmini: MachineDone
macbook: MachineDone
class TrendBucket(BaseModel): class TrendBucket(BaseModel):
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨.""" """summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
hour: str hour: str
@@ -122,7 +108,6 @@ class QueueOverviewResponse(BaseModel):
machines: list[MachineCard] machines: list[MachineCard]
stages: list[StageRow] stages: list[StageRow]
summarize_eta: SummarizeEta summarize_eta: SummarizeEta
summarize_by_machine: SummarizeByMachine
trend_24h: list[TrendBucket] trend_24h: list[TrendBucket]
totals: Totals totals: Totals
background_jobs: list[BackgroundJobItem] = [] background_jobs: list[BackgroundJobItem] = []
+69
View File
@@ -0,0 +1,69 @@
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
환경변수:
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
"""
from __future__ import annotations
import json
import os
import httpx
from core.utils import setup_logger
logger = setup_logger("alerts")
ALERT_TIMEOUT_SECONDS = 5.0
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
_noop_logged = False
async def send_alert(title: str, message: str) -> bool:
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
global _noop_logged
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
if not url:
if not _noop_logged:
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
_noop_logged = True
return False
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
if kind not in ("synochat", "ntfy"):
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
kind = "synochat"
try:
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
if kind == "ntfy":
resp = await client.post(
url,
params={"title": title},
content=message.encode("utf-8"),
)
else: # synochat
text = f"{title}\n{message}" if title else message
resp = await client.post(
url,
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
)
if resp.status_code >= 400:
logger.warning(
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
)
return False
return True
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
return False
+34 -112
View File
@@ -3,19 +3,16 @@
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue / GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약). documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리. 구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리.
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend / 판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능. build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
귀속 규칙 (단일 진실): 귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드):
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/ - stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt · macmini = classify/summarize · macbook = deep_summary fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) ·
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속). macmini = classify/summarize/deep_summary (단일 생성 LLM 허브).
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료 - deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 —
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook' summarize/deep_summary 소속인 macmini 카드 귀속.
이면 macbook 실적, 아니면 macmini 실적.
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
(보류 = 맥북 불가 신호).
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -25,42 +22,33 @@ from zoneinfo import ZoneInfo
from sqlalchemy import bindparam, text from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
KST = ZoneInfo("Asia/Seoul") KST = ZoneInfo("Asia/Seoul")
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서) # stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
_GPU_STAGES = ( _NAS_STAGES = (
"extract", "embed", "chunk", "markdown", "extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt", "preview", "thumbnail", "fulltext", "stt",
) )
_MACMINI_STAGES = ("classify", "summarize") _MACMINI_STAGES = ("classify", "summarize", "deep_summary")
_MACBOOK_STAGES = ("deep_summary",) _STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
_MACHINE_KEYS = ("gpu", "macmini", "macbook") _MACHINE_KEYS = ("nas", "macmini")
_MACHINE_LABELS = { _MACHINE_LABELS = {
"gpu": "GPU 서버", "nas": "나스",
"macmini": "맥미니", "macmini": "맥미니",
"macbook": "맥북 M5 Max",
} }
# 머신 카드당 current 표시 상한 # 머신 카드당 current 표시 상한
_CURRENT_LIMIT = 2 _CURRENT_LIMIT = 2
def stage_machine_map(deep_enabled: bool) -> dict[str, str]: def stage_machine_map() -> dict[str, str]:
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속.""" """stage → machine key 맵 (정적 — 나스/맥미니 2노드)."""
mapping: dict[str, str] = {} mapping: dict[str, str] = {}
for s in _GPU_STAGES: for s in _NAS_STAGES:
mapping[s] = "gpu" mapping[s] = "nas"
for s in _MACMINI_STAGES: for s in _MACMINI_STAGES:
mapping[s] = "macmini" mapping[s] = "macmini"
for s in _MACBOOK_STAGES:
mapping[s] = "macbook" if deep_enabled else "macmini"
return mapping return mapping
@@ -90,23 +78,6 @@ def rows_to_stage_stats(rows) -> dict[str, dict]:
return stats return stats
def rows_to_summarize_split(rows) -> dict[str, dict]:
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
"""
split = {
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
}
for row in rows:
key = "macbook" if row[0] else "macmini"
split[key]["done_1h"] += int(row[1] or 0)
split[key]["done_today"] += int(row[2] or 0)
split[key]["done_15m"] += int(row[3] or 0)
return split
def display_title(row: dict) -> str: def display_title(row: dict) -> str:
"""표시용 제목 — title > original_filename > file_path basename > 문서 id.""" """표시용 제목 — title > original_filename > file_path basename > 문서 id."""
if row.get("title"): if row.get("title"):
@@ -120,13 +91,10 @@ def display_title(row: dict) -> str:
def build_machines( def build_machines(
stage_stats: dict[str, dict], stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
current_rows: list[dict], current_rows: list[dict],
*,
deep_enabled: bool,
) -> list[dict]: ) -> list[dict]:
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부.""" """머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map(deep_enabled) smap = stage_machine_map()
def g(stage: str, field: str) -> int: def g(stage: str, field: str) -> int:
return stage_stats.get(stage, {}).get(field, 0) return stage_stats.get(stage, {}).get(field, 0)
@@ -149,29 +117,23 @@ def build_machines(
pending = sum(g(s, "pending") for s in stages) pending = sum(g(s, "pending") for s in stages)
processing = sum(g(s, "processing") for s in stages) processing = sum(g(s, "processing") for s in stages)
failed = sum(g(s, "failed") for s in stages) failed = sum(g(s, "failed") for s in stages)
done_1h = sum(g(s, "done_1h") for s in stages)
done_today = sum(g(s, "done_today") for s in stages)
done_15m = sum(g(s, "done_15m") for s in stages)
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속 # 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
if key in summarize_split:
done_1h += summarize_split[key]["done_1h"]
done_today += summarize_split[key]["done_today"]
done_15m += summarize_split[key]["done_15m"]
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
deferred_pending = ( deferred_pending = (
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending") g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
if key == "macbook" else 0 if key == "macmini" else 0
) )
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11). # state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" — # 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은 # 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만. # 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만.
if processing > 0 or done_15m > 0: if processing > 0 or done_15m > 0:
state = "active" state = "active"
elif key == "macbook" and deferred_pending > 0: elif deferred_pending > 0:
state = "deferred" state = "deferred"
else: else:
state = "idle" state = "idle"
@@ -213,16 +175,6 @@ def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
} }
def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict:
"""summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의
오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로
투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외)."""
def m(key: str) -> dict:
s = summarize_split.get(key, {})
return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))}
return {"macmini": m("macmini"), "macbook": m("macbook")}
def build_trend( def build_trend(
inflow_buckets: dict[str, int], inflow_buckets: dict[str, int],
done_buckets: dict[str, int], done_buckets: dict[str, int],
@@ -287,28 +239,23 @@ def build_totals(stage_stats: dict[str, dict]) -> dict:
def compose_overview( def compose_overview(
stage_stats: dict[str, dict], stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
inflow_buckets: dict[str, int], inflow_buckets: dict[str, int],
done_buckets: dict[str, int], done_buckets: dict[str, int],
current_rows: list[dict], current_rows: list[dict],
*, *,
deep_enabled: bool,
now_kst: datetime, now_kst: datetime,
) -> dict: ) -> dict:
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요.""" """수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
return { return {
"machines": build_machines( "machines": build_machines(stage_stats, current_rows),
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
),
"stages": build_stages(stage_stats), "stages": build_stages(stage_stats),
"summarize_eta": build_summarize_eta(stage_stats), "summarize_eta": build_summarize_eta(stage_stats),
"summarize_by_machine": build_summarize_by_machine(summarize_split),
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst), "trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
"totals": build_totals(stage_stats), "totals": build_totals(stage_stats),
} }
# ─── SQL 수집부 (총 5쿼리) ──────────────────────────────────────────────────── # ─── SQL 수집부 (총 4쿼리) ────────────────────────────────────────────────────
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방) # 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
_STAGE_STATS_SQL = """ _STAGE_STATS_SQL = """
@@ -333,23 +280,7 @@ _STAGE_STATS_SQL = """
GROUP BY stage GROUP BY stage
""" """
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방) # 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
_SUMMARIZE_SPLIT_SQL = """
SELECT
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
AND q.status = 'completed'
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
_TREND_INFLOW_SQL = """ _TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'), SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket, 'YYYY-MM-DD HH24:00') AS bucket,
@@ -371,7 +302,7 @@ _TREND_DONE_SQL = """
GROUP BY 1 GROUP BY 1
""" """
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서) # 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
_CURRENT_SQL = """ _CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q FROM processing_queue q
@@ -383,20 +314,13 @@ _CURRENT_SQL = """
async def build_overview(session: AsyncSession) -> dict: async def build_overview(session: AsyncSession) -> dict:
"""5쿼리 수집 → compose_overview 판정 → 응답 dict.""" """4쿼리 수집 → compose_overview 판정 → 응답 dict."""
now_kst = datetime.now(KST) now_kst = datetime.now(KST)
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0) kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
deep_enabled = settings.ai is not None and settings.ai.deep is not None
stage_rows = ( stage_rows = (
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight}) await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
).all() ).all()
split_rows = (
await session.execute(
text(_SUMMARIZE_SPLIT_SQL),
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
)
).all()
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all() inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all() done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
current_result = (await session.execute(text(_CURRENT_SQL))).all() current_result = (await session.execute(text(_CURRENT_SQL))).all()
@@ -414,11 +338,9 @@ async def build_overview(session: AsyncSession) -> dict:
result = compose_overview( result = compose_overview(
rows_to_stage_stats(stage_rows), rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows}, {row[0]: int(row[1]) for row in inflow_rows},
{row[0]: int(row[1]) for row in done_rows}, {row[0]: int(row[1]) for row in done_rows},
current_rows, current_rows,
deep_enabled=deep_enabled,
now_kst=now_kst, now_kst=now_kst,
) )
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]). # 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
@@ -426,13 +348,13 @@ async def build_overview(session: AsyncSession) -> dict:
return result return result
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트). # kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트).
_BG_JOB_MACHINE = { _BG_JOB_MACHINE = {
"global_digest": "macmini", "global_digest": "macmini",
"morning_briefing": "macmini", "morning_briefing": "macmini",
"section_summary": "macmini", "section_summary": "macmini",
"hier_backfill": "gpu", "hier_backfill": "nas",
"hier_redecompose": "gpu", "hier_redecompose": "nas",
} }
@@ -466,7 +388,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
"processed": int(r["processed"] or 0), "total": r["total"], "processed": int(r["processed"] or 0), "total": r["total"],
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]), "elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
"error": r["error"], "error": r["error"],
"machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"), "machine": _BG_JOB_MACHINE.get(r["kind"], "nas"),
} }
for r in rows for r in rows
] ]
+193 -3
View File
@@ -66,6 +66,9 @@ class SummarizeUnit:
text: str = "" text: str = ""
est_tokens: int = 0 est_tokens: int = 0
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상) over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
leaf_indexes: list[int] = field(default_factory=list)
@dataclass @dataclass
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
units: list[SummarizeUnit] = [] units: list[SummarizeUnit] = []
cur_titles: list[str | None] = [] cur_titles: list[str | None] = []
cur_texts: list[str] = [] cur_texts: list[str] = []
cur_indexes: list[int] = []
cur_tokens = 0 cur_tokens = 0
def _flush() -> None: def _flush() -> None:
nonlocal cur_titles, cur_texts, cur_tokens nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
if cur_texts: if cur_texts:
units.append(SummarizeUnit( units.append(SummarizeUnit(
index=len(units), index=len(units),
section_titles=cur_titles, section_titles=cur_titles,
text="\n\n".join(cur_texts), text="\n\n".join(cur_texts),
est_tokens=cur_tokens, est_tokens=cur_tokens,
leaf_indexes=cur_indexes,
)) ))
cur_titles, cur_texts, cur_tokens = [], [], 0 cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
for leaf in leaves: for li, leaf in enumerate(leaves):
t = estimate_tokens(leaf.text) t = estimate_tokens(leaf.text)
if t > cap: if t > cap:
_flush() _flush()
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
text=leaf.text, text=leaf.text,
est_tokens=t, est_tokens=t,
over_cap=True, over_cap=True,
leaf_indexes=[li],
)) ))
continue continue
if cur_tokens + t > cap: if cur_tokens + t > cap:
_flush() _flush()
cur_titles.append(leaf.section_title) cur_titles.append(leaf.section_title)
cur_texts.append(leaf.text) cur_texts.append(leaf.text)
cur_indexes.append(li)
cur_tokens += t cur_tokens += t
_flush() _flush()
return units return units
@@ -222,3 +229,186 @@ def build_reduce_units_block(
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))] block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True truncated = True
return block, truncated return block, truncated
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
#
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
@dataclass
class OverrideCheck:
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
ok: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
coverage_pct: float = 0.0
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
unit_tokens: list[int] = field(default_factory=list)
def choose_override_source(
md_content: str | None, extracted_text: str | None
) -> tuple[str, str]:
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
"""
if md_content and md_content.strip():
return "md_content", md_content
return "extracted_text", extracted_text or ""
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
"""
if isinstance(entry, dict):
if entry.get("todo"):
errors.append(
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
)
return None
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
start, end = entry[0], entry[1]
title = entry[2] if len(entry) == 3 else None
else:
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
return None
if (
isinstance(start, bool) or isinstance(end, bool)
or not isinstance(start, int) or not isinstance(end, int)
):
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
return None
if title is not None and not isinstance(title, str):
title = str(title)
return start, end, title
def validate_override_boundaries(
text: str,
raw_boundaries,
*,
cap: int = CAP_TOKENS,
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
) -> OverrideCheck:
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
"""
errors: list[str] = []
warnings: list[str] = []
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
normalized: list[tuple[int, int, str | None]] = []
for i, entry in enumerate(raw_boundaries):
norm = _normalize_boundary(entry, i, errors)
if norm is not None:
normalized.append(norm)
if errors:
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
n = len(text)
prev_end = None
unit_tokens: list[int] = []
covered = 0
for i, (start, end, title) in enumerate(normalized):
label = f"유닛 {i}" + (f" ({title})" if title else "")
if start < 0 or end > n:
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
continue
if start >= end:
errors.append(f"{label}: start >= end ([{start}, {end}))")
continue
if prev_end is not None and start < prev_end:
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
if prev_end is not None and start - prev_end > gap_warn_chars:
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
est = estimate_tokens(text[start:end])
unit_tokens.append(est)
if est > cap:
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
covered += end - start
prev_end = max(prev_end or 0, end)
if not errors:
head_gap = normalized[0][0]
tail_gap = n - normalized[-1][1]
if head_gap > gap_warn_chars:
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
if tail_gap > gap_warn_chars:
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
if not errors and coverage_pct < min_coverage_pct:
errors.append(
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
)
return OverrideCheck(
ok=not errors,
errors=errors,
warnings=warnings,
coverage_pct=coverage_pct,
boundaries=normalized if not errors else [],
unit_tokens=unit_tokens,
)
def units_from_boundaries(
text: str, boundaries: list[tuple[int, int, str | None]]
) -> list[SummarizeUnit]:
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
(map_results 멱등 재개 키와 정합).
"""
units: list[SummarizeUnit] = []
for i, (start, end, title) in enumerate(boundaries):
seg = text[start:end]
units.append(SummarizeUnit(
index=i,
section_titles=[title],
text=seg,
est_tokens=estimate_tokens(seg),
))
return units
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
앞 오프셋을 가져간다).
"""
spans: list[tuple[int, int]] = []
cursor = 0
for leaf in leaves:
pos = text.find(leaf.text, cursor)
if pos < 0:
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
pos = text.find(leaf.text)
if pos < 0:
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
spans.append((pos, pos + len(leaf.text)))
cursor = pos + len(leaf.text)
return spans
+189 -12
View File
@@ -14,7 +14,7 @@ import asyncio
import json import json
import os import os
import time import time
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select from sqlalchemy import desc, select
@@ -29,15 +29,19 @@ from core.utils import setup_logger
from models.document import Document from models.document import Document
from models.queue import ProcessingQueue, StageDeferred from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import ( from services.summarize_units import (
CAP_TOKENS, CAP_TOKENS,
UnitPlan, UnitPlan,
build_reduce_units_block, build_reduce_units_block,
choose_override_source,
estimate_tokens, estimate_tokens,
plan_summarize_units, plan_summarize_units,
render_map_slice, render_map_slice,
units_from_boundaries,
validate_override_boundaries,
) )
logger = setup_logger("deep_summary_worker") logger = setup_logger("deep_summary_worker")
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary" DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce # presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce" REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는 # HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음. # 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440")) HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어). # reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000 REDUCE_BUDGET_FLOOR_TOKENS = 1_000
@@ -111,6 +122,17 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw)) envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ─── # ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way: # TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce) # auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
@@ -123,7 +145,9 @@ async def process(
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce # units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다. # 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units: if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id) await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce( await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session, doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable, defer_on_deep_unavailable=defer_on_deep_unavailable,
@@ -250,43 +274,196 @@ async def process(
) )
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split( async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None: ) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류. """HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러 payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음. (queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복. PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트). 무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
""" """
payload = dict(queue_row.payload or {}) payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {}) preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({ preseg.update({
"awaiting_split": True, "awaiting_split": True,
"tier": plan.tier, "tier": plan.tier,
"over_pct": plan.over_pct, "over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens, "total_est_tokens": plan.total_est_tokens,
"units": len(plan.units), "units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용) # 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": [ "oversized_sections": oversized,
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
}) })
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지 queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit() await session.commit()
logger.info( logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} " f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} " f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)" f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
) )
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred( raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
) )
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b( async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
): ):
+32
View File
@@ -23,6 +23,15 @@ logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만. # pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False _hold_logged = False
# PR3 후속(2026-07-03): 영구 실패 알람 — 사람이 개입해야 풀리는 상태라 Chat 웹훅 발화.
# allowlist 로 소음 제한: embed/chunk 류 대량 배치가 일제히 실패하면 문서 수만큼 알람이
# 쏟아지므로, 건당 가치가 높고 발생률이 낮은 LLM 스테이지만 기본 대상으로 한다.
_ALERT_FAIL_STAGES = {
s.strip()
for s in os.getenv("ALERT_FAIL_STAGES", "deep_summary,summarize").split(",")
if s.strip()
}
# stage별 배치 크기 # stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
@@ -353,6 +362,8 @@ async def _process_stage(stage, worker_fn):
except Exception as e: except Exception as e:
# 실패 처리 # 실패 처리
permanently_failed = False
doc_title = None
async with async_session() as session: async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id) item = await session.get(ProcessingQueue, queue_id)
if not item: if not item:
@@ -363,7 +374,16 @@ async def _process_stage(stage, worker_fn):
item.error_message = err_text[:500] item.error_message = err_text[:500]
if item.attempts >= item.max_attempts: if item.attempts >= item.max_attempts:
item.status = "failed" item.status = "failed"
permanently_failed = True
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
if stage in _ALERT_FAIL_STAGES:
# 알람용 제목 best-effort — 실패해도 알람 자체는 발화한다.
try:
from models.document import Document
_doc = await session.get(Document, document_id)
doc_title = getattr(_doc, "title", None)
except Exception:
doc_title = None
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다. # B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형 # 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan # batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
@@ -385,6 +405,18 @@ async def _process_stage(stage, worker_fn):
item.started_at = None item.started_at = None
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
await session.commit() await session.commit()
if permanently_failed and stage in _ALERT_FAIL_STAGES:
# 영구 실패 = 무인 파이프라인이 스스로 못 푸는 상태 → 유인 전환 알람.
# send_alert 는 절대 raise 하지 않음(no-op/실패 = False 반환뿐).
from services.alerts import send_alert
await send_alert(
f"[DS] {stage} 영구 실패 — doc {document_id}",
(
f"{doc_title or '(제목 미상)'}\n"
f"에러: {err_text[:300]}\n"
f"확인: scripts/presegment_attended.py list (보류/거부 사유) 또는 큐 재큐"
),
)
async def consume_queue(): async def consume_queue():
@@ -1,6 +1,7 @@
<script lang="ts"> <script lang="ts">
// 처리 머신 보드 v3통합안 (plan ds-board-merged: C2 머신레인 + C3 번다운/정직ETA). // 처리 머신 보드 v42026-07-02 컷오버 후 2노드 (나스+맥미니).
// · 머신 3레인(GPU/맥미니/맥북) = "누가 일하나" + 요약 오프로드(맥북 합류) 가시화 // · 머신 2레인(나스/맥미니) = "누가 일하나" — 나스=DS 본체 Docker(추출/마크다운/
// 청크·임베딩 등), 맥미니=단일 생성 LLM 허브(분류/요약/심층분석 + bge-m3/리랭크)
// · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta) // · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta)
// · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용. // · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용.
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어). // 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어).
@@ -193,7 +194,7 @@
const machineByKey = $derived( const machineByKey = $derived(
new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])), new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])),
); );
const LANE_ORDER: FlowMachine[] = ['gpu', 'macmini', 'macbook']; const LANE_ORDER: FlowMachine[] = ['nas', 'macmini'];
const lanes = $derived( const lanes = $derived(
LANE_ORDER.map((key) => ({ LANE_ORDER.map((key) => ({
key, key,
@@ -203,13 +204,6 @@
})), })),
); );
// 요약 오프로드 분담 — 맥미니 vs 맥북 (A-1 summarize_by_machine)
const split = $derived(overview.summarize_by_machine);
const splitTotal1h = $derived(Math.max(1, split.macmini.done_1h + split.macbook.done_1h));
const macbookSharePct = $derived(Math.round((split.macbook.done_1h / splitTotal1h) * 100));
// 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트)
const offloadActive = $derived(split.macbook.done_1h > 0);
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ─── // ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
const bgJobs = $derived(overview.background_jobs ?? []); const bgJobs = $derived(overview.background_jobs ?? []);
const runningBg = $derived(bgJobs.filter((j) => j.state === 'running')); const runningBg = $derived(bgJobs.filter((j) => j.state === 'running'));
@@ -266,7 +260,7 @@
: `갱신 ${Math.round(ageSec / 60)}분 전`, : `갱신 ${Math.round(ageSec / 60)}분 전`,
); );
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 + 맥북 합류 변곡점 마커 ─── // ─── 24h 번다운 (C3) — 요약 유입 vs 소화 ───
const burn = $derived.by(() => { const burn = $derived.by(() => {
const t = overview.trend_24h; const t = overview.trend_24h;
if (!t || t.length === 0) return null; if (!t || t.length === 0) return null;
@@ -279,20 +273,12 @@
t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' '); t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' ');
const doneLine = line((b) => b.done); const doneLine = line((b) => b.done);
const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`; const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`;
// 합류 변곡점 = done 최대 버킷 (맥북 야간 drain 합류 추정)
let mi = 0;
t.forEach((b, i) => {
if (b.done > t[mi].done) mi = i;
});
return { return {
w, w,
h, h,
area, area,
doneLine, doneLine,
inflowLine: line((b) => b.inflow), inflowLine: line((b) => b.inflow),
markX: (mi * step).toFixed(1),
markHour: t[mi].hour,
markDone: t[mi].done,
peak: max, peak: max,
}; };
}); });
@@ -332,7 +318,7 @@
</span> </span>
</div> </div>
<!-- 머신 레인 (누가 일하나 + 요약 오프로드) --> <!-- 머신 레인 (누가 일하나) -->
<div class="grid gap-2 mb-3"> <div class="grid gap-2 mb-3">
{#each lanes as lane (lane.key)} {#each lanes as lane (lane.key)}
<div class="bg-surface border border-default rounded-card px-3.5 py-2.5"> <div class="bg-surface border border-default rounded-card px-3.5 py-2.5">
@@ -342,11 +328,8 @@
<span class="text-[10px] text-faint font-mono">{lane.meta.model}</span> <span class="text-[10px] text-faint font-mono">{lane.meta.model}</span>
<span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span> <span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span>
{#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each} {#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each}
{#if lane.key === 'macbook' && (lane.card?.deferred_pending ?? 0) > 0} {#if (lane.card?.deferred_pending ?? 0) > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {lane.card?.deferred_pending}</span> <span class="text-[10px] font-semibold text-warning tabular-nums" title="LLM 백오프 — 자동 재개 대기">보류 {lane.card?.deferred_pending}</span>
{/if}
{#if lane.card?.state === 'deferred'}
<span class="text-[9px] text-warning">잠듦 — 요약은 맥미니로 복귀</span>
{/if} {/if}
</div> </div>
<div class="flex items-stretch gap-1.5 flex-wrap"> <div class="flex items-stretch gap-1.5 flex-wrap">
@@ -368,26 +351,8 @@
</div> </div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div> <div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div> <div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div>
{#if n.def.key === 'summarize'}
<div class="mt-1 h-1 w-full rounded-full overflow-hidden flex" title="맥미니 {split.macmini.done_1h}/h · 맥북 {split.macbook.done_1h}/h">
<span class="block h-full mtag-macmini-bar" style="width:{100 - macbookSharePct}%"></span>
<span class="block h-full mtag-macbook-bar" style="width:{macbookSharePct}%"></span>
</div>
<div class="text-[9px] text-faint tabular-nums whitespace-nowrap mt-0.5">맥미니 {split.macmini.done_1h} · 맥북 {split.macbook.done_1h}/h</div>
{/if}
</button> </button>
{/each} {/each}
{#if lane.key === 'macbook' && offloadActive}
<button
class="text-left rounded-lg border border-dashed border-warning/50 px-2.5 py-1.5 cursor-pointer hover:bg-surface-hover min-w-[96px]"
onclick={() => toggleNode('summarize')}
title="맥북이 요약을 맥미니에서 가져와 처리 "
>
<div class="flex items-center gap-1 text-[11px] font-semibold text-text whitespace-nowrap">요약 합류 <span class="text-[8px] font-bold text-warning">OFFLOAD</span></div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{split.macbook.done_1h}<span class="text-[9px] text-faint font-normal ml-0.5">/h</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">요약의 {macbookSharePct}% 담당</div>
</button>
{/if}
</div> </div>
</div> </div>
{/each} {/each}
@@ -399,15 +364,11 @@
<div class="flex items-center gap-2 mb-2"> <div class="flex items-center gap-2 mb-2">
<span class="text-[11px] font-bold text-text">요약 백로그 24시간</span> <span class="text-[11px] font-bold text-text">요약 백로그 24시간</span>
<span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span> <span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span>
{#if offloadActive}<span class="text-[9px] text-warning ml-auto">맥북 합류 {burn.markHour} — 소화 급증</span>{/if}
</div> </div>
<svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운"> <svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운">
<polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" /> <polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" />
<polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" /> <polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" />
<polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" /> <polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" />
{#if offloadActive}
<line x1={burn.markX} y1="0" x2={burn.markX} y2={burn.h} stroke="currentColor" stroke-width="1" stroke-dasharray="2 2" class="text-warning" opacity="0.7" />
{/if}
</svg> </svg>
<div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums"> <div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums">
{#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)} {#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)}
@@ -558,13 +519,9 @@
</div> </div>
<style> <style>
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */ /* 머신 색 — 디자인 토큰 외 2색 (nas 청/macmini 보라) — 이 컴포넌트 한정 */
.mtag-gpu { background: #e7eef6; color: #3b6ea5; } .mtag-nas { background: #e7eef6; color: #3b6ea5; }
.mtag-macmini { background: #efe9f7; color: #8a5fbf; } .mtag-macmini { background: #efe9f7; color: #8a5fbf; }
.mtag-macbook { background: #f7eedd; color: #b07a10; }
/* 요약 오프로드 분담 막대 채움 (맥미니 보라 / 맥북 황) */
.mtag-macmini-bar { background: #8a5fbf; }
.mtag-macbook-bar { background: #b07a10; }
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; } .node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
.detail-frame { border-color: #3b6ea5; } .detail-frame { border-color: #3b6ea5; }
.detail-head { background: #e7eef6; } .detail-head { background: #e7eef6; }
@@ -1,6 +1,6 @@
<script lang="ts"> <script lang="ts">
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림. // 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
// 머신 미니카드 3 + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당. // 머신 미니카드 2(나스/맥미니) + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade). // 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단. // 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
import { X } from 'lucide-svelte'; import { X } from 'lucide-svelte';
@@ -51,7 +51,7 @@
<div class="p-4 space-y-3"> <div class="p-4 space-y-3">
{#if data} {#if data}
<!-- 머신 미니카드 3 --> <!-- 머신 미니카드 (나스/맥미니) -->
{#each data.machines as m (m.key)} {#each data.machines as m (m.key)}
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5"> <div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
<div class="flex items-center justify-between gap-2"> <div class="flex items-center justify-between gap-2">
+2 -9
View File
@@ -5,7 +5,7 @@
* . * .
*/ */
export type MachineKey = 'gpu' | 'macmini' | 'macbook'; export type MachineKey = 'nas' | 'macmini';
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */ /** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
export type MachineState = 'active' | 'deferred' | 'idle'; export type MachineState = 'active' | 'deferred' | 'idle';
@@ -29,7 +29,7 @@ export interface MachineOverview {
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */ /** 최근 1시간 완료 건수 (처리율 N/h 표기) */
done_1h: number; done_1h: number;
done_today: number; done_today: number;
/** 보류 건수 — 맥북 sleep 등으로 자동 재개 대기 중 */ /** 보류 건수 — LLM 허브 백오프 등으로 자동 재개 대기 중 */
deferred_pending: number; deferred_pending: number;
current: MachineCurrentItem[]; current: MachineCurrentItem[];
} }
@@ -50,12 +50,6 @@ export interface TrendPoint {
done: number; done: number;
} }
/** summarize 머신별 완료 실적 분담 (오프로드 가시화 — ds-board-merged A-1) */
export interface SummarizeByMachine {
macmini: { done_1h: number; done_today: number };
macbook: { done_1h: number; done_today: number };
}
export interface QueueTotals { export interface QueueTotals {
pending: number; pending: number;
processing: number; processing: number;
@@ -93,7 +87,6 @@ export interface BackgroundJob {
export interface QueueOverview { export interface QueueOverview {
machines: MachineOverview[]; machines: MachineOverview[];
summarize_eta: SummarizeEta; summarize_eta: SummarizeEta;
summarize_by_machine: SummarizeByMachine;
trend_24h: TrendPoint[]; trend_24h: TrendPoint[];
stages: QueueStageRow[]; stages: QueueStageRow[];
totals: QueueTotals; totals: QueueTotals;
+11 -12
View File
@@ -62,7 +62,7 @@ export function formatAgeSec(sec: number): string {
* / 1 (: 맥미니 ). * / 1 (: 맥미니 ).
*/ */
export type FlowMachine = 'gpu' | 'macmini' | 'macbook'; export type FlowMachine = 'nas' | 'macmini';
export interface FlowNodeDef { export interface FlowNodeDef {
key: string; key: string;
@@ -79,26 +79,25 @@ export interface FlowNodeDef {
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */ /** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
export const FLOW_NODES: FlowNodeDef[] = [ export const FLOW_NODES: FlowNodeDef[] = [
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' }, { key: 'extract', label: '추출', stages: ['extract'], machine: 'nas', engine: 'kordoc', sub: 'kordoc' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' }, { key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'nas', engine: 'Marker', sub: 'marker-service' },
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' }, { key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' }, { key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' }, { key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'nas', engine: 'bge-m3 (맥미니 콜)', sub: 'embed worker' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' }, { key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
]; ];
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */ /** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
export const AUX_NODES: FlowNodeDef[] = [ export const AUX_NODES: FlowNodeDef[] = [
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' }, { key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'nas', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' }, { key: 'stt', label: '전사', stages: ['stt'], machine: 'nas', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' }, { key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'nas', engine: '유틸', sub: 'ffmpeg' },
]; ];
/** 머신 스트립 메타 — 모델 표기 단일 지점 */ /** 머신 스트립 메타 — 모델 표기 단일 지점 (2026-07-02 컷오버: 나스+맥미니 2노드) */
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = { export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
gpu: { label: 'GPU 서버', model: '특화 엔진' }, nas: { label: '나스', model: 'DS 본체 Docker · 특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' }, macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · bge-m3 · 24/7' },
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
}; };
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */ /** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
+3 -3
View File
@@ -72,7 +72,7 @@
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시 // 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일). // store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
let queue = $derived($queueOverview); let queue = $derived($queueOverview);
let queueMacbook = $derived(queue?.machines?.find((m) => m.key === 'macbook') ?? null); let queueMacmini = $derived(queue?.machines?.find((m) => m.key === 'macmini') ?? null);
function toggleQueueDrawer() { function toggleQueueDrawer() {
if (ui.isDrawerOpen('queue')) ui.closeDrawer(); if (ui.isDrawerOpen('queue')) ui.closeDrawer();
else ui.openDrawer('queue'); else ui.openDrawer('queue');
@@ -189,8 +189,8 @@
</span> </span>
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span> <span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span> <span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
{#if queueMacbook} {#if queueMacmini}
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacbook.state)}"> {MACHINE_STATE_LABEL[queueMacbook.state]}</span> <span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacmini.state)}">미니 {MACHINE_STATE_LABEL[queueMacmini.state]}</span>
{/if} {/if}
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span> <span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
</button> </button>
+456
View File
@@ -0,0 +1,456 @@
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 보류한
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
사용법 (fastapi 컨테이너 안에서 실행):
docker compose exec fastapi python /app/scripts/presegment_attended.py list
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
워크플로우:
1. list awaiting_split 문서 확인.
2. export 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
템플릿 JSON 생성. 유인 클로드 세션은 파일들만 읽고 TODO 스팬을
CAP 이하 경계들로 분할해 템플릿을 완성한다.
3. apply 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 )
payload.presegment.units_override 기록 + awaiting_split 해제 +
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
stdout 규약: 사람이 읽는 요약 + '{' 시작하는 기계 파싱용 JSON 라인(1 1라인).
사람용 행은 절대 '{' 시작하지 않는다.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
from sqlalchemy import text as sql_text # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
from services.hier_decomp.builder import build_hier_tree # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
OVERRIDE_MIN_COVERAGE_PCT,
TRIGGER_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
validate_override_boundaries,
)
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
DUMP_CHUNK_CHARS = 200_000
def _jsonl(obj: dict) -> None:
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
print(json.dumps(obj, ensure_ascii=False, default=str))
def _session_factory():
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
)
engine = create_async_engine(database_url)
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ─── list ────────────────────────────────────────────────────────────────────
LIST_SQL = """
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
q.payload::text AS payload_text,
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'deep_summary'
AND q.status IN ('pending', 'processing', 'failed')
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
ORDER BY q.id
"""
async def cmd_list() -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
finally:
await engine.dispose()
print(f"awaiting_split 보류 문서 {len(rows)}")
for r in rows:
payload = json.loads(r["payload_text"] or "{}")
preseg = payload.get("presegment") or {}
oversized = preseg.get("oversized_sections") or []
print(
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
if isinstance(preseg.get("total_est_tokens"), int)
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
)
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
print(
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
)
_jsonl({
"cmd": "list",
"queue_id": r["queue_id"],
"doc_id": r["document_id"],
"title": r["title"],
"status": r["status"],
"tier": preseg.get("tier"),
"over_pct": preseg.get("over_pct"),
"total_est_tokens": preseg.get("total_est_tokens"),
"units": preseg.get("units"),
"oversized_sections": oversized,
"alerted_at": preseg.get("alerted_at"),
"deferred_until": payload.get("deferred_until"),
"override_rejected": preseg.get("override_rejected"),
})
return 0
# ─── export ──────────────────────────────────────────────────────────────────
def _safe_name(title: str | None, fallback: str) -> str:
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
return (t or fallback)[:60]
def _build_outline(text: str) -> str:
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
lines = []
for n in nodes:
indent = " " * max(n.level, 0)
title = n.section_title or "(preamble)"
tok = estimate_tokens(n.text)
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
lines.append(f"{indent}- L{n.level} {title}{tok:,} tok{mark}")
return "\n".join(lines)
async def cmd_export(doc_id: int, out_dir: str) -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
row = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
finally:
await engine.dispose()
if not row:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
source, text = choose_override_source(row["md_content"], row["extracted_text"])
if not text.strip():
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
return 1
plan = plan_summarize_units(text)
leaves = extract_leaves(text)
spans = leaf_spans(text, leaves)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
files: list[str] = []
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
# ① 통계 + hier 개요
oversized_units = [u for u in plan.units if u.over_cap]
overview = [
f"# presegment export — doc {doc_id}",
"",
f"- 제목: {row['title'] or '(제목 없음)'}",
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}",
f"- 생성: {now_iso}",
"",
"## 유닛 제안 (summarize_units greedy-pack)",
"",
]
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
overview += ["", "## hier 개요", "", _build_outline(text), ""]
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
files.append("overview.md")
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
boundaries: list[dict] = []
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
title = next((t for t in u.section_titles if t), None)
if not u.over_cap:
# ④ 자동 pack 유닛은 템플릿에 채워둔다
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
continue
boundaries.append({
"start": s, "end": e, "title": title or f"유닛 {u.index}",
"todo": (
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
"경계 여러 개로 교체하고 todo 키를 제거할 것"
),
})
seg = text[s:e]
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
fname = f"{base}.{cs}_{ce}.md"
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
(out / fname).write_text(text[cs:ce], encoding="utf-8")
files.append(fname)
# ④ boundaries 템플릿 JSON
template = {
"doc_id": doc_id,
"source": source,
"source_len": len(text),
"cap_tokens": CAP_TOKENS,
"generated_at": now_iso,
"boundaries": boundaries,
}
(out / "boundaries_template.json").write_text(
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
)
files.append("boundaries_template.json")
# 유인 세션용 작업 안내
readme = f"""# doc {doc_id} 유인 분할 안내
1. overview.md 구조 파악 (유닛 제안 + hier 개요).
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
- 파일명 `..._<cs>_<ce>.md` cs = 파일 문자의 절대 오프셋.
- 절대 오프셋 = cs + 파일 로컬 오프셋.
3. boundaries_template.json todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
- 토큰 추정: 한글 0.529 tok/ · 기타 0.217 tok/ (services/summarize_units.py).
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
4. 검증/적용:
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
"""
(out / "README.md").write_text(readme, encoding="utf-8")
files.append("README.md")
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
print(
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
f"tier={plan.tier} over%={plan.over_pct}"
)
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}")
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
_jsonl({
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
"source": source, "source_len": len(text),
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
"over_pct": plan.over_pct,
"units_auto": len(plan.units) - len(oversized_units),
"units_todo": len(oversized_units),
"files": files,
})
return 0
# ─── apply ───────────────────────────────────────────────────────────────────
QUEUE_ROW_SQL = """
SELECT id, status, attempts, payload::text AS payload_text
FROM processing_queue
WHERE document_id = :d AND stage = 'deep_summary'
AND status IN ('pending', 'processing', 'failed')
ORDER BY id DESC
LIMIT 1
"""
APPLY_UPDATE_SQL = """
UPDATE processing_queue
SET payload = CAST(:payload AS JSONB),
status = 'pending',
attempts = 0,
error_message = NULL
WHERE id = :qid
"""
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
if isinstance(raw, dict):
boundaries = raw.get("boundaries") or []
declared_source = raw.get("source")
declared_len = raw.get("source_len")
if raw.get("doc_id") not in (None, doc_id):
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
return 1
else:
boundaries, declared_source, declared_len = raw, None, None
engine, factory = _session_factory()
try:
async with factory() as session:
doc = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
if not doc:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
if not qrow:
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
return 1
if qrow["status"] == "processing":
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
return 1
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
if declared_source in ("md_content", "extracted_text"):
source = declared_source
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
else:
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
if declared_len is not None and declared_len != len(text):
print(
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
" (본문 재생성됨 — export 부터 재실행)"
)
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
return 1
check = validate_override_boundaries(text, boundaries)
for w in check.warnings:
print(f" [warn] {w}")
if not check.ok:
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
for e in check.errors:
print(f" - {e}")
_jsonl({
"cmd": "apply", "ok": False, "doc_id": doc_id,
"error": "validation_failed", "errors": check.errors,
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
})
return 1
print(
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
)
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
payload = json.loads(qrow["payload_text"] or "{}")
preseg = dict(payload.get("presegment") or {})
preseg["units_override"] = {
"source": source,
"source_len": len(text),
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
}
preseg["awaiting_split"] = False
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
preseg.pop(k, None)
payload["presegment"] = preseg
payload.pop("deferred_until", None) # 즉시 재개
if dry_run:
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
_jsonl({
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
await session.execute(
sql_text(APPLY_UPDATE_SQL),
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
)
await session.commit()
print(
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
f"(다음 queue_consumer 사이클에 재개)"
)
_jsonl({
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
finally:
await engine.dispose()
# ─── main ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
p_export.add_argument("--doc", type=int, required=True)
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
p_apply.add_argument("--doc", type=int, required=True)
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
args = parser.parse_args()
if args.cmd == "list":
return asyncio.run(cmd_list())
if args.cmd == "export":
out = args.out or f"./preseg_export_{args.doc}"
return asyncio.run(cmd_export(args.doc, out))
if args.cmd == "apply":
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
return 2
if __name__ == "__main__":
sys.exit(main())
+486
View File
@@ -0,0 +1,486 @@
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1 로그)·synochat/ntfy
포맷·실패 절대 raise 금지 (webhook 전부 fake 실호출 0).
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7).
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 .
- process(): units_override 존재 tier 판정(plan_summarize_units) 우회
map-reduce 재개 / 잘못된 override -HOLD + 알람 / override 없는 문서 무회귀.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import httpx # noqa: E402
import services.alerts as alerts # noqa: E402
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
units_from_boundaries,
validate_override_boundaries,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
SMALL_MD = "# 소형\n" + ("" * 2_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [], "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self, row=None):
self.commits = 0
self._row = row
async def commit(self):
self.commits += 1
class FakeProcSession(FakeSession):
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
def __init__(self, doc, row):
super().__init__(row)
self._doc = doc
async def get(self, model, pk):
return self._doc
async def execute(self, stmt):
row = self._row
return SimpleNamespace(scalar_one_or_none=lambda: row)
class FakeClient:
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
def __init__(self):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
async def call_deep(self, prompt: str, system=None) -> str:
self.prompts.append(prompt)
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
return SimpleNamespace(
id=999,
title="테스트 문서",
extracted_text=text,
md_content=md_content,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope_raw():
return {
"from_stage": "classify",
"escalation_reasons": ["long_context"],
"risk_flags": [],
"distilled_context": "4B 요지",
"original_pointers": {"doc_ids": [999]},
}
def _envelope():
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
@pytest.fixture
def _alert_recorder(monkeypatch):
calls: list[tuple[str, str]] = []
async def fake_alert(title: str, message: str) -> bool:
calls.append((title, message))
return True
monkeypatch.setattr(dsw, "send_alert", fake_alert)
return calls
# ─── A. services.alerts ──────────────────────────────────────────────────────
class _FakeHttpxClient:
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
captured: list[dict] = []
fail_with: Exception | None = None
status_code = 200
def __init__(self, timeout=None):
self.timeout = timeout
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def post(self, url, **kwargs):
if _FakeHttpxClient.fail_with is not None:
raise _FakeHttpxClient.fail_with
_FakeHttpxClient.captured.append({"url": url, **kwargs})
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
@pytest.fixture
def _fake_httpx(monkeypatch):
_FakeHttpxClient.captured = []
_FakeHttpxClient.fail_with = None
_FakeHttpxClient.status_code = 200
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
return _FakeHttpxClient
@pytest.mark.asyncio
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
monkeypatch.setattr(alerts, "_noop_logged", False)
infos: list[str] = []
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
assert await alerts.send_alert("t", "m") is False
assert await alerts.send_alert("t", "m") is False
assert len(infos) == 1 # 프로세스당 1회만 로그
assert _fake_httpx.captured == [] # webhook 미호출
@pytest.mark.asyncio
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
assert len(_fake_httpx.captured) == 1
call = _fake_httpx.captured[0]
assert call["url"] == "http://chat.local/webhook"
payload = json.loads(call["data"]["payload"])
assert payload == {"text": "제목\n본문 줄1\n줄2"}
@pytest.mark.asyncio
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
assert await alerts.send_alert("한글 제목", "알람 본문") is True
call = _fake_httpx.captured[0]
# 한글 제목은 헤더(latin-1 한정) 대신 query param
assert call["params"] == {"title": "한글 제목"}
assert call["content"] == "알람 본문".encode("utf-8")
@pytest.mark.asyncio
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
_fake_httpx.fail_with = httpx.ConnectError("down")
assert await alerts.send_alert("t", "m") is False # raise 없이 False
_fake_httpx.fail_with = None
_fake_httpx.status_code = 500
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.tier == "whole"
row = SimpleNamespace(payload={"envelope": {"x": 1}})
session = FakeSession()
# 1차 HOLD — 알람 발화 + alerted_at 기록
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
title, message = _alert_recorder[0]
assert "999" in title
assert "테스트 문서" in message and "whole" in message
assert f"export --doc 999" in message # 유인 분할 힌트
alerted_at = row.payload["presegment"]["alerted_at"]
assert datetime.fromisoformat(alerted_at)
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
payload = dict(row.payload)
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
row.payload = payload
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 2
assert row.payload["presegment"]["alerted_at"] != stale
# ─── C. validate_override_boundaries ────────────────────────────────────────
def test_validate_ok_full_coverage():
text = "" * 40_000 # ≈ 21,160 tok
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
check = validate_override_boundaries(text, bounds)
assert check.ok and check.errors == []
assert check.coverage_pct == 100.0
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
def test_validate_dict_form_and_units_from_boundaries():
text = "" * 10_000
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
check = validate_override_boundaries(text, bounds)
assert check.ok
units = units_from_boundaries(text, check.boundaries)
assert [u.index for u in units] == [0, 1]
assert units[0].text == text[0:5_000]
assert units[0].section_titles == ["a"]
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
def test_validate_rejects_overlap():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
assert not check.ok
assert any("중첩" in e for e in check.errors)
def test_validate_rejects_cap_exceed():
text = "" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
assert not check.ok
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
def test_validate_rejects_low_coverage():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
assert not check.ok
assert any("커버리지" in e for e in check.errors)
def test_validate_rejects_out_of_range_and_todo():
text = "" * 1_000
check = validate_override_boundaries(text, [[0, 2_000, ""]])
assert not check.ok and any("범위 밖" in e for e in check.errors)
check2 = validate_override_boundaries(
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
)
assert not check2.ok and any("TODO" in e for e in check2.errors)
def test_validate_warns_on_gap_but_passes_coverage():
text = "" * 100_000
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
check = validate_override_boundaries(text, bounds, cap=50_000)
assert check.ok
assert any("공백 구간" in w for w in check.warnings)
def test_choose_override_source_prefers_md_content():
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
assert choose_override_source(None, None) == ("extracted_text", "")
def test_leaf_spans_reconstruct_source():
md = "# 절 1\n" + "" * 100 + "\n# 절 2\n" + "" * 100
leaves = extract_leaves(md)
spans = leaf_spans(md, leaves)
assert len(spans) == len(leaves)
for (s, e), leaf in zip(spans, leaves):
assert md[s:e] == leaf.text
# 인접 파티션 — 이어붙이면 원문 전체
assert spans[0][0] == 0 and spans[-1][1] == len(md)
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
step = len(text) // n_units
bounds = []
for i in range(n_units):
s = i * step
e = len(text) if i == n_units - 1 else (i + 1) * step
bounds.append([s, e, f"파트 {i + 1}"])
return {
"envelope": _envelope_raw(),
"subject_domain": "generic",
"presegment": {
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
"units_override": {
"source": source, "source_len": len(text), "boundaries": bounds,
},
},
}
@pytest.mark.asyncio
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
def _boom(*a, **k):
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
await dsw.process(999, session)
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
assert len(client.prompts) == 4
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
preseg = row.payload["presegment"]
assert preseg["tier"] == "override"
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
assert len(preseg["map_results"]) == 3
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
assert _alert_recorder == [] # 정상 재개 — 알람 없음
assert len(_patch_telemetry) == 1
@pytest.mark.asyncio
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
def _no_llm():
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
monkeypatch.setattr(dsw, "AIClient", _no_llm)
with pytest.raises(StageDeferred) as ei:
await dsw.process(999, session)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True # 재-HOLD
assert "cap" in preseg["override_rejected"]
assert "units_override" in preseg # 원인 조사용 보존
assert len(_alert_recorder) == 1 # 거부 알람
assert "거부" in _alert_recorder[0][0]
assert doc.ai_detail_summary is None
assert _patch_telemetry == []
@pytest.mark.asyncio
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
assert row.payload["presegment"]["awaiting_split"] is True
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
assert len(_alert_recorder) == 1
@pytest.mark.asyncio
async def test_no_override_small_doc_keeps_single_call_path(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
doc = _doc(SMALL_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
await dsw.process(999, session)
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
assert doc.ai_detail_summary == "유닛 상세."
assert "presegment" not in row.payload # payload 무변경
assert _alert_recorder == []
@pytest.mark.asyncio
async def test_no_override_whole_doc_still_holds_with_alert(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
doc = _doc(GIANT_WHOLE_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
assert len(_alert_recorder) == 1
assert "HOLD" in _alert_recorder[0][0]
+66 -156
View File
@@ -4,6 +4,8 @@ services/queue_overview 의 SQL 수집부와 분리된 순수 판정 함수
(stage_machine_map / build_machines / build_summarize_eta / build_trend / (stage_machine_map / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes / rows_to_* / display_title) build_totals / compute_eta_minutes / rows_to_* / display_title)
mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인. mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인.
2026-07-02 컷오버 2노드(나스+맥미니) 기준 3노드 레인은 제거됨.
""" """
from datetime import datetime from datetime import datetime
@@ -18,7 +20,6 @@ from services.queue_overview import (
compute_eta_minutes, compute_eta_minutes,
display_title, display_title,
rows_to_stage_stats, rows_to_stage_stats,
rows_to_summarize_split,
stage_machine_map, stage_machine_map,
) )
@@ -36,186 +37,115 @@ def _stage(**kw) -> dict:
return base return base
def _split(macbook: dict | None = None, macmini: dict | None = None) -> dict:
"""summarize 풀 완료 실적 split — 미지정 0."""
zero = {"done_1h": 0, "done_today": 0, "done_15m": 0}
return {
"macbook": {**zero, **(macbook or {})},
"macmini": {**zero, **(macmini or {})},
}
def _machine(machines: list[dict], key: str) -> dict: def _machine(machines: list[dict], key: str) -> dict:
return next(m for m in machines if m["key"] == key) return next(m for m in machines if m["key"] == key)
# ─── stage→machine 귀속 맵 ──────────────────────────────────────────────────── # ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
def test_stage_machine_map_deep_enabled(): def test_stage_machine_map_two_nodes():
smap = stage_machine_map(deep_enabled=True) smap = stage_machine_map()
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"): for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
assert smap[s] == "gpu" assert smap[s] == "nas"
assert smap["classify"] == "macmini" assert smap["classify"] == "macmini"
assert smap["summarize"] == "macmini" assert smap["summarize"] == "macmini"
assert smap["deep_summary"] == "macbook"
def test_stage_machine_map_deep_disabled():
"""deep 슬롯 부재 시 deep_summary 도 macmini 귀속."""
smap = stage_machine_map(deep_enabled=False)
assert smap["deep_summary"] == "macmini" assert smap["deep_summary"] == "macmini"
# ─── 머신 카드 귀속 합산 ────────────────────────────────────────────────────── # ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
def test_gpu_stage_counts_attribution(): def test_nas_stage_counts_attribution():
stats = { stats = {
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1), "extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
"stt": _stage(failed=2, done_1h=1, done_today=2), "stt": _stage(failed=2, done_1h=1, done_today=2),
} }
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
gpu = _machine(machines, "gpu") nas = _machine(machines, "nas")
assert (gpu["pending"], gpu["processing"], gpu["failed"]) == (3, 1, 2) assert (nas["pending"], nas["processing"], nas["failed"]) == (3, 1, 2)
assert (gpu["done_1h"], gpu["done_today"]) == (6, 11) assert (nas["done_1h"], nas["done_today"]) == (6, 11)
# gpu 의 stages 는 정적 8종 전부 (집계 0 이어도 표시) # nas 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
assert gpu["stages"] == [ assert nas["stages"] == [
"extract", "embed", "chunk", "markdown", "extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt", "preview", "thumbnail", "fulltext", "stt",
] ]
def test_summarize_pool_split_attribution(): def test_macmini_llm_stages_attribution():
"""summarize pending/failed = macmini 귀속, 완료 실적은 split 로 분리 — """classify/summarize/deep_summary 전부 macmini 귀속 (단일 생성 LLM 허브)."""
stage-level summarize done 수치는 카드에 이중 합산되지 않는다."""
stats = { stats = {
"classify": _stage(done_1h=2, done_today=3), "classify": _stage(done_1h=2, done_today=3),
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20), "summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4),
} }
split = _split(macbook={"done_1h": 4, "done_today": 8}, macmini={"done_1h": 6, "done_today": 12}) machines = build_machines(stats, [])
machines = build_machines(stats, split, [], deep_enabled=True)
macmini = _machine(machines, "macmini") macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook") assert macmini["pending"] == 9 and macmini["failed"] == 1
assert macmini["processing"] == 1
assert macmini["pending"] == 7 and macmini["failed"] == 1 assert macmini["done_1h"] == 2 + 10 + 3
assert macmini["done_1h"] == 2 + 6 # classify + macmini 몫 (10 아님) assert macmini["done_today"] == 3 + 20 + 4
assert macmini["done_today"] == 3 + 12 assert macmini["stages"] == ["classify", "summarize", "deep_summary"]
assert macbook["done_1h"] == 4 and macbook["done_today"] == 8 assert _machine(machines, "nas")["pending"] == 0
assert macbook["pending"] == 0 # 풀 pending 은 macmini 만
def test_summarize_by_machine_projection(): def test_deferred_pending_on_macmini_card():
"""build_summarize_by_machine = split 의 done_1h/done_today 를 머신별로 투영 """보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macmini 카드 귀속
(done_15m 제외 내부 state 판정 전용).""" (보류 = LLM 백오프 신호)."""
from services.queue_overview import build_summarize_by_machine
split = _split(
macbook={"done_1h": 226, "done_today": 312, "done_15m": 60},
macmini={"done_1h": 37, "done_today": 94, "done_15m": 9},
)
sbm = build_summarize_by_machine(split)
assert sbm == {
"macmini": {"done_1h": 37, "done_today": 94},
"macbook": {"done_1h": 226, "done_today": 312},
}
assert "done_15m" not in sbm["macbook"]
def test_compose_overview_includes_summarize_by_machine():
"""compose_overview 응답 계약에 summarize_by_machine 포함 (FE 레인 분담 재료)."""
now_kst = datetime(2026, 6, 13, 13, 0, tzinfo=KST)
stats = {"summarize": _stage(pending=1317, done_1h=264)}
split = _split(macbook={"done_1h": 226, "done_today": 312}, macmini={"done_1h": 37, "done_today": 94})
ov = compose_overview(stats, split, {}, {}, [], deep_enabled=True, now_kst=now_kst)
assert ov["summarize_by_machine"]["macbook"]["done_1h"] == 226
assert ov["summarize_by_machine"]["macmini"]["done_today"] == 94
def test_deep_disabled_deep_summary_counts_to_macmini():
stats = {"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4)}
machines = build_machines(stats, _split(), [], deep_enabled=False)
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 2 and macmini["processing"] == 1
assert macmini["done_1h"] == 3 and macmini["done_today"] == 4
assert macbook["stages"] == [] and macbook["pending"] == 0
assert _machine(machines, "macmini")["stages"] == ["classify", "summarize", "deep_summary"]
def test_deferred_pending_always_on_macbook_card():
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macbook 카드 귀속.
deep 슬롯 유무와 무관 (보류 = 맥북 불가 신호)."""
stats = { stats = {
"summarize": _stage(pending=5, deferred_pending=2), "summarize": _stage(pending=5, deferred_pending=2),
"deep_summary": _stage(pending=1, deferred_pending=1), "deep_summary": _stage(pending=1, deferred_pending=1),
} }
for deep_enabled in (True, False): machines = build_machines(stats, [])
machines = build_machines(stats, _split(), [], deep_enabled=deep_enabled) assert _machine(machines, "macmini")["deferred_pending"] == 3
assert _machine(machines, "macbook")["deferred_pending"] == 3 assert _machine(machines, "nas")["deferred_pending"] == 0
assert _machine(machines, "gpu")["deferred_pending"] == 0
assert _machine(machines, "macmini")["deferred_pending"] == 0
# ─── state 판정 ─────────────────────────────────────────────────────────────── # ─── state 판정 ───────────────────────────────────────────────────────────────
def test_macbook_state_active_wins_over_deferred_while_working(): def test_macmini_state_active_wins_over_deferred_while_working():
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'. """가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시. 보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시.
""" """
stats = {"summarize": _stage(pending=1, deferred_pending=1)} stats = {"summarize": _stage(pending=1, deferred_pending=1, done_15m=3)}
split = _split(macbook={"done_15m": 3}) machines = build_machines(stats, [])
machines = build_machines(stats, split, [], deep_enabled=True) mm = _machine(machines, "macmini")
mb = _machine(machines, "macbook") assert mm["state"] == "active"
assert mb["state"] == "active" assert mm["deferred_pending"] == 1
assert mb["deferred_pending"] == 1
def test_macbook_state_deferred_only_when_not_working(): def test_macmini_state_deferred_only_when_not_working():
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'.""" """일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
stats = {"summarize": _stage(pending=1, deferred_pending=1)} stats = {"summarize": _stage(pending=1, deferred_pending=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
assert _machine(machines, "macbook")["state"] == "deferred" assert _machine(machines, "macmini")["state"] == "deferred"
def test_macbook_state_active_on_recent_qwen_done(): def test_macmini_state_idle():
split = _split(macbook={"done_15m": 1}) machines = build_machines({}, [])
machines = build_machines({}, split, [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "active"
def test_macbook_state_idle():
machines = build_machines({}, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "idle"
def test_gpu_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "idle"
def test_macmini_state_not_active_on_macbook_pool_done():
"""summarize 풀 완료가 전부 macbook 몫이면 macmini 는 active 아님 (귀속 기준)."""
stats = {"summarize": _stage(done_15m=1)}
split = _split(macbook={"done_15m": 1})
machines = build_machines(stats, split, [], deep_enabled=True)
assert _machine(machines, "macmini")["state"] == "idle" assert _machine(machines, "macmini")["state"] == "idle"
def test_nas_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "idle"
def test_macmini_state_active_on_summarize_processing(): def test_macmini_state_active_on_summarize_processing():
stats = {"summarize": _stage(processing=1)} stats = {"summarize": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
assert _machine(machines, "macmini")["state"] == "active" assert _machine(machines, "macmini")["state"] == "active"
@@ -228,21 +158,18 @@ def test_current_summarize_to_macmini_max_two():
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None}, {"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None}, {"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
] ]
machines = build_machines({}, _split(), rows, deep_enabled=True) machines = build_machines({}, rows)
macmini = _machine(machines, "macmini") macmini = _machine(machines, "macmini")
gpu = _machine(machines, "gpu") nas = _machine(machines, "nas")
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건 assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"} assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
assert [c["document_id"] for c in gpu["current"]] == [4] assert [c["document_id"] for c in nas["current"]] == [4]
assert _machine(machines, "macbook")["current"] == []
def test_current_deep_summary_follows_deep_slot(): def test_current_deep_summary_to_macmini():
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}] rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
enabled = build_machines({}, _split(), rows, deep_enabled=True) machines = build_machines({}, rows)
disabled = build_machines({}, _split(), rows, deep_enabled=False) assert _machine(machines, "macmini")["current"][0]["document_id"] == 9
assert _machine(enabled, "macbook")["current"][0]["document_id"] == 9
assert _machine(disabled, "macmini")["current"][0]["document_id"] == 9
def test_display_title_fallback_chain(): def test_display_title_fallback_chain():
@@ -344,32 +271,15 @@ def test_rows_to_stage_stats_conversion():
assert stats["summarize"]["deferred_pending"] == 2 assert stats["summarize"]["deferred_pending"] == 2
def test_rows_to_summarize_split_conversion():
rows = [
(True, 4, 8, 1), # is_macbook
(False, 6, 12, 0),
]
split = rows_to_summarize_split(rows)
assert split["macbook"] == {"done_1h": 4, "done_today": 8, "done_15m": 1}
assert split["macmini"] == {"done_1h": 6, "done_today": 12, "done_15m": 0}
def test_rows_to_summarize_split_empty():
split = rows_to_summarize_split([])
assert split["macbook"]["done_1h"] == 0 and split["macmini"]["done_today"] == 0
def test_compose_overview_contract_shape(): def test_compose_overview_contract_shape():
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정.""" """응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
out = compose_overview( out = compose_overview(
{"summarize": _stage(pending=1)}, {"summarize": _stage(pending=1)},
_split(),
{}, {}, [], {}, {}, [],
deep_enabled=True,
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST), now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
) )
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"} assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
assert [m["key"] for m in out["machines"]] == ["gpu", "macmini", "macbook"] assert [m["key"] for m in out["machines"]] == ["nas", "macmini"]
for m in out["machines"]: for m in out["machines"]:
assert set(m.keys()) == { assert set(m.keys()) == {
"key", "label", "state", "stages", "pending", "processing", "failed", "key", "label", "state", "stages", "pending", "processing", "failed",
@@ -381,7 +291,7 @@ def test_compose_overview_contract_shape():
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"} assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
assert set(out["totals"].keys()) == {"pending", "processing", "failed"} assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
# 머신 label 고정 (raw 모델명 노출 금지 — label 만) # 머신 label 고정 (raw 모델명 노출 금지 — label 만)
assert [m["label"] for m in out["machines"]] == ["GPU 서버", "맥미니", "맥북 M5 Max"] assert [m["label"] for m in out["machines"]] == ["나스", "맥미니"]
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ────── # ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────