Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 371ee4ebe6 | |||
| a55bb3453d | |||
| 9061f2e25c | |||
| 33427d4a42 | |||
| b91b05e889 | |||
| 304a2b9c0f |
@@ -37,8 +37,8 @@ class CurrentItem(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class MachineCard(BaseModel):
|
class MachineCard(BaseModel):
|
||||||
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state."""
|
"""머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드)."""
|
||||||
key: Literal["gpu", "macmini", "macbook"]
|
key: Literal["nas", "macmini"]
|
||||||
label: str
|
label: str
|
||||||
state: Literal["active", "deferred", "idle"]
|
state: Literal["active", "deferred", "idle"]
|
||||||
stages: list[str]
|
stages: list[str]
|
||||||
@@ -59,20 +59,6 @@ class SummarizeEta(BaseModel):
|
|||||||
eta_minutes: int | None
|
eta_minutes: int | None
|
||||||
|
|
||||||
|
|
||||||
class MachineDone(BaseModel):
|
|
||||||
"""머신 1대의 summarize 완료 실적 (분담 표시용)."""
|
|
||||||
done_1h: int
|
|
||||||
done_today: int
|
|
||||||
|
|
||||||
|
|
||||||
class SummarizeByMachine(BaseModel):
|
|
||||||
"""summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북'
|
|
||||||
오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출
|
|
||||||
(ds-board-merged A-1, 신규 수집 SQL 0)."""
|
|
||||||
macmini: MachineDone
|
|
||||||
macbook: MachineDone
|
|
||||||
|
|
||||||
|
|
||||||
class TrendBucket(BaseModel):
|
class TrendBucket(BaseModel):
|
||||||
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
|
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
|
||||||
hour: str
|
hour: str
|
||||||
@@ -122,7 +108,6 @@ class QueueOverviewResponse(BaseModel):
|
|||||||
machines: list[MachineCard]
|
machines: list[MachineCard]
|
||||||
stages: list[StageRow]
|
stages: list[StageRow]
|
||||||
summarize_eta: SummarizeEta
|
summarize_eta: SummarizeEta
|
||||||
summarize_by_machine: SummarizeByMachine
|
|
||||||
trend_24h: list[TrendBucket]
|
trend_24h: list[TrendBucket]
|
||||||
totals: Totals
|
totals: Totals
|
||||||
background_jobs: list[BackgroundJobItem] = []
|
background_jobs: list[BackgroundJobItem] = []
|
||||||
|
|||||||
@@ -0,0 +1,69 @@
|
|||||||
|
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
|
||||||
|
|
||||||
|
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
|
||||||
|
환경변수:
|
||||||
|
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
|
||||||
|
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
|
||||||
|
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
|
||||||
|
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
|
||||||
|
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
|
||||||
|
|
||||||
|
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
|
||||||
|
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from core.utils import setup_logger
|
||||||
|
|
||||||
|
logger = setup_logger("alerts")
|
||||||
|
|
||||||
|
ALERT_TIMEOUT_SECONDS = 5.0
|
||||||
|
|
||||||
|
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
|
||||||
|
_noop_logged = False
|
||||||
|
|
||||||
|
|
||||||
|
async def send_alert(title: str, message: str) -> bool:
|
||||||
|
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
|
||||||
|
global _noop_logged
|
||||||
|
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
|
||||||
|
if not url:
|
||||||
|
if not _noop_logged:
|
||||||
|
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
|
||||||
|
_noop_logged = True
|
||||||
|
return False
|
||||||
|
|
||||||
|
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
|
||||||
|
if kind not in ("synochat", "ntfy"):
|
||||||
|
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
|
||||||
|
kind = "synochat"
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
|
||||||
|
if kind == "ntfy":
|
||||||
|
resp = await client.post(
|
||||||
|
url,
|
||||||
|
params={"title": title},
|
||||||
|
content=message.encode("utf-8"),
|
||||||
|
)
|
||||||
|
else: # synochat
|
||||||
|
text = f"{title}\n{message}" if title else message
|
||||||
|
resp = await client.post(
|
||||||
|
url,
|
||||||
|
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
|
||||||
|
)
|
||||||
|
if resp.status_code >= 400:
|
||||||
|
logger.warning(
|
||||||
|
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
|
||||||
|
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
|
||||||
|
return False
|
||||||
+34
-112
@@ -3,19 +3,16 @@
|
|||||||
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
|
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
|
||||||
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
|
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
|
||||||
|
|
||||||
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리.
|
구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리.
|
||||||
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
|
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
|
||||||
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
|
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
|
||||||
|
|
||||||
귀속 규칙 (단일 진실):
|
귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드):
|
||||||
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/
|
- stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/
|
||||||
fulltext/stt · macmini = classify/summarize · macbook = deep_summary
|
fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) ·
|
||||||
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속).
|
macmini = classify/summarize/deep_summary (단일 생성 LLM 허브).
|
||||||
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료
|
- deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 —
|
||||||
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook'
|
summarize/deep_summary 소속인 macmini 카드 귀속.
|
||||||
이면 macbook 실적, 아니면 macmini 실적.
|
|
||||||
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
|
|
||||||
(보류 = 맥북 불가 신호).
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
@@ -25,42 +22,33 @@ from zoneinfo import ZoneInfo
|
|||||||
from sqlalchemy import bindparam, text
|
from sqlalchemy import bindparam, text
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from core.config import settings
|
|
||||||
|
|
||||||
KST = ZoneInfo("Asia/Seoul")
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
|
|
||||||
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
|
|
||||||
|
|
||||||
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
|
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
|
||||||
_GPU_STAGES = (
|
_NAS_STAGES = (
|
||||||
"extract", "embed", "chunk", "markdown",
|
"extract", "embed", "chunk", "markdown",
|
||||||
"preview", "thumbnail", "fulltext", "stt",
|
"preview", "thumbnail", "fulltext", "stt",
|
||||||
)
|
)
|
||||||
_MACMINI_STAGES = ("classify", "summarize")
|
_MACMINI_STAGES = ("classify", "summarize", "deep_summary")
|
||||||
_MACBOOK_STAGES = ("deep_summary",)
|
_STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES
|
||||||
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
|
|
||||||
|
|
||||||
_MACHINE_KEYS = ("gpu", "macmini", "macbook")
|
_MACHINE_KEYS = ("nas", "macmini")
|
||||||
_MACHINE_LABELS = {
|
_MACHINE_LABELS = {
|
||||||
"gpu": "GPU 서버",
|
"nas": "나스",
|
||||||
"macmini": "맥미니",
|
"macmini": "맥미니",
|
||||||
"macbook": "맥북 M5 Max",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# 머신 카드당 current 표시 상한
|
# 머신 카드당 current 표시 상한
|
||||||
_CURRENT_LIMIT = 2
|
_CURRENT_LIMIT = 2
|
||||||
|
|
||||||
|
|
||||||
def stage_machine_map(deep_enabled: bool) -> dict[str, str]:
|
def stage_machine_map() -> dict[str, str]:
|
||||||
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속."""
|
"""stage → machine key 맵 (정적 — 나스/맥미니 2노드)."""
|
||||||
mapping: dict[str, str] = {}
|
mapping: dict[str, str] = {}
|
||||||
for s in _GPU_STAGES:
|
for s in _NAS_STAGES:
|
||||||
mapping[s] = "gpu"
|
mapping[s] = "nas"
|
||||||
for s in _MACMINI_STAGES:
|
for s in _MACMINI_STAGES:
|
||||||
mapping[s] = "macmini"
|
mapping[s] = "macmini"
|
||||||
for s in _MACBOOK_STAGES:
|
|
||||||
mapping[s] = "macbook" if deep_enabled else "macmini"
|
|
||||||
return mapping
|
return mapping
|
||||||
|
|
||||||
|
|
||||||
@@ -90,23 +78,6 @@ def rows_to_stage_stats(rows) -> dict[str, dict]:
|
|||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
|
||||||
def rows_to_summarize_split(rows) -> dict[str, dict]:
|
|
||||||
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
|
|
||||||
|
|
||||||
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
|
|
||||||
"""
|
|
||||||
split = {
|
|
||||||
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
|
|
||||||
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
|
|
||||||
}
|
|
||||||
for row in rows:
|
|
||||||
key = "macbook" if row[0] else "macmini"
|
|
||||||
split[key]["done_1h"] += int(row[1] or 0)
|
|
||||||
split[key]["done_today"] += int(row[2] or 0)
|
|
||||||
split[key]["done_15m"] += int(row[3] or 0)
|
|
||||||
return split
|
|
||||||
|
|
||||||
|
|
||||||
def display_title(row: dict) -> str:
|
def display_title(row: dict) -> str:
|
||||||
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
|
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
|
||||||
if row.get("title"):
|
if row.get("title"):
|
||||||
@@ -120,13 +91,10 @@ def display_title(row: dict) -> str:
|
|||||||
|
|
||||||
def build_machines(
|
def build_machines(
|
||||||
stage_stats: dict[str, dict],
|
stage_stats: dict[str, dict],
|
||||||
summarize_split: dict[str, dict],
|
|
||||||
current_rows: list[dict],
|
current_rows: list[dict],
|
||||||
*,
|
|
||||||
deep_enabled: bool,
|
|
||||||
) -> list[dict]:
|
) -> list[dict]:
|
||||||
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부."""
|
"""머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부."""
|
||||||
smap = stage_machine_map(deep_enabled)
|
smap = stage_machine_map()
|
||||||
|
|
||||||
def g(stage: str, field: str) -> int:
|
def g(stage: str, field: str) -> int:
|
||||||
return stage_stats.get(stage, {}).get(field, 0)
|
return stage_stats.get(stage, {}).get(field, 0)
|
||||||
@@ -149,29 +117,23 @@ def build_machines(
|
|||||||
pending = sum(g(s, "pending") for s in stages)
|
pending = sum(g(s, "pending") for s in stages)
|
||||||
processing = sum(g(s, "processing") for s in stages)
|
processing = sum(g(s, "processing") for s in stages)
|
||||||
failed = sum(g(s, "failed") for s in stages)
|
failed = sum(g(s, "failed") for s in stages)
|
||||||
|
done_1h = sum(g(s, "done_1h") for s in stages)
|
||||||
|
done_today = sum(g(s, "done_today") for s in stages)
|
||||||
|
done_15m = sum(g(s, "done_15m") for s in stages)
|
||||||
|
|
||||||
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속
|
# 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속
|
||||||
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
|
|
||||||
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
|
|
||||||
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
|
|
||||||
if key in summarize_split:
|
|
||||||
done_1h += summarize_split[key]["done_1h"]
|
|
||||||
done_today += summarize_split[key]["done_today"]
|
|
||||||
done_15m += summarize_split[key]["done_15m"]
|
|
||||||
|
|
||||||
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
|
|
||||||
deferred_pending = (
|
deferred_pending = (
|
||||||
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
|
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
|
||||||
if key == "macbook" else 0
|
if key == "macmini" else 0
|
||||||
)
|
)
|
||||||
|
|
||||||
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
|
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
|
||||||
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
|
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
|
||||||
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
|
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
|
||||||
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만.
|
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만.
|
||||||
if processing > 0 or done_15m > 0:
|
if processing > 0 or done_15m > 0:
|
||||||
state = "active"
|
state = "active"
|
||||||
elif key == "macbook" and deferred_pending > 0:
|
elif deferred_pending > 0:
|
||||||
state = "deferred"
|
state = "deferred"
|
||||||
else:
|
else:
|
||||||
state = "idle"
|
state = "idle"
|
||||||
@@ -213,16 +175,6 @@ def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict:
|
|
||||||
"""summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의
|
|
||||||
오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로
|
|
||||||
투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외)."""
|
|
||||||
def m(key: str) -> dict:
|
|
||||||
s = summarize_split.get(key, {})
|
|
||||||
return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))}
|
|
||||||
return {"macmini": m("macmini"), "macbook": m("macbook")}
|
|
||||||
|
|
||||||
|
|
||||||
def build_trend(
|
def build_trend(
|
||||||
inflow_buckets: dict[str, int],
|
inflow_buckets: dict[str, int],
|
||||||
done_buckets: dict[str, int],
|
done_buckets: dict[str, int],
|
||||||
@@ -287,28 +239,23 @@ def build_totals(stage_stats: dict[str, dict]) -> dict:
|
|||||||
|
|
||||||
def compose_overview(
|
def compose_overview(
|
||||||
stage_stats: dict[str, dict],
|
stage_stats: dict[str, dict],
|
||||||
summarize_split: dict[str, dict],
|
|
||||||
inflow_buckets: dict[str, int],
|
inflow_buckets: dict[str, int],
|
||||||
done_buckets: dict[str, int],
|
done_buckets: dict[str, int],
|
||||||
current_rows: list[dict],
|
current_rows: list[dict],
|
||||||
*,
|
*,
|
||||||
deep_enabled: bool,
|
|
||||||
now_kst: datetime,
|
now_kst: datetime,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
|
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
|
||||||
return {
|
return {
|
||||||
"machines": build_machines(
|
"machines": build_machines(stage_stats, current_rows),
|
||||||
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
|
|
||||||
),
|
|
||||||
"stages": build_stages(stage_stats),
|
"stages": build_stages(stage_stats),
|
||||||
"summarize_eta": build_summarize_eta(stage_stats),
|
"summarize_eta": build_summarize_eta(stage_stats),
|
||||||
"summarize_by_machine": build_summarize_by_machine(summarize_split),
|
|
||||||
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
|
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
|
||||||
"totals": build_totals(stage_stats),
|
"totals": build_totals(stage_stats),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# ─── SQL 수집부 (총 5쿼리) ────────────────────────────────────────────────────
|
# ─── SQL 수집부 (총 4쿼리) ────────────────────────────────────────────────────
|
||||||
|
|
||||||
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
|
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
|
||||||
_STAGE_STATS_SQL = """
|
_STAGE_STATS_SQL = """
|
||||||
@@ -333,23 +280,7 @@ _STAGE_STATS_SQL = """
|
|||||||
GROUP BY stage
|
GROUP BY stage
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방)
|
# 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
|
||||||
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
|
|
||||||
_SUMMARIZE_SPLIT_SQL = """
|
|
||||||
SELECT
|
|
||||||
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
|
|
||||||
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
|
|
||||||
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
|
|
||||||
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
|
|
||||||
FROM processing_queue q
|
|
||||||
JOIN documents d ON d.id = q.document_id
|
|
||||||
WHERE q.stage = 'summarize'
|
|
||||||
AND q.status = 'completed'
|
|
||||||
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
|
|
||||||
GROUP BY 1
|
|
||||||
"""
|
|
||||||
|
|
||||||
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
|
|
||||||
_TREND_INFLOW_SQL = """
|
_TREND_INFLOW_SQL = """
|
||||||
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
|
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
|
||||||
'YYYY-MM-DD HH24:00') AS bucket,
|
'YYYY-MM-DD HH24:00') AS bucket,
|
||||||
@@ -371,7 +302,7 @@ _TREND_DONE_SQL = """
|
|||||||
GROUP BY 1
|
GROUP BY 1
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
|
# 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
|
||||||
_CURRENT_SQL = """
|
_CURRENT_SQL = """
|
||||||
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
|
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
|
||||||
FROM processing_queue q
|
FROM processing_queue q
|
||||||
@@ -383,20 +314,13 @@ _CURRENT_SQL = """
|
|||||||
|
|
||||||
|
|
||||||
async def build_overview(session: AsyncSession) -> dict:
|
async def build_overview(session: AsyncSession) -> dict:
|
||||||
"""5쿼리 수집 → compose_overview 판정 → 응답 dict."""
|
"""4쿼리 수집 → compose_overview 판정 → 응답 dict."""
|
||||||
now_kst = datetime.now(KST)
|
now_kst = datetime.now(KST)
|
||||||
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
|
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
deep_enabled = settings.ai is not None and settings.ai.deep is not None
|
|
||||||
|
|
||||||
stage_rows = (
|
stage_rows = (
|
||||||
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
|
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
|
||||||
).all()
|
).all()
|
||||||
split_rows = (
|
|
||||||
await session.execute(
|
|
||||||
text(_SUMMARIZE_SPLIT_SQL),
|
|
||||||
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
|
|
||||||
)
|
|
||||||
).all()
|
|
||||||
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
|
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
|
||||||
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
|
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
|
||||||
current_result = (await session.execute(text(_CURRENT_SQL))).all()
|
current_result = (await session.execute(text(_CURRENT_SQL))).all()
|
||||||
@@ -414,11 +338,9 @@ async def build_overview(session: AsyncSession) -> dict:
|
|||||||
|
|
||||||
result = compose_overview(
|
result = compose_overview(
|
||||||
rows_to_stage_stats(stage_rows),
|
rows_to_stage_stats(stage_rows),
|
||||||
rows_to_summarize_split(split_rows),
|
|
||||||
{row[0]: int(row[1]) for row in inflow_rows},
|
{row[0]: int(row[1]) for row in inflow_rows},
|
||||||
{row[0]: int(row[1]) for row in done_rows},
|
{row[0]: int(row[1]) for row in done_rows},
|
||||||
current_rows,
|
current_rows,
|
||||||
deep_enabled=deep_enabled,
|
|
||||||
now_kst=now_kst,
|
now_kst=now_kst,
|
||||||
)
|
)
|
||||||
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
|
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
|
||||||
@@ -426,13 +348,13 @@ async def build_overview(session: AsyncSession) -> dict:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트).
|
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트).
|
||||||
_BG_JOB_MACHINE = {
|
_BG_JOB_MACHINE = {
|
||||||
"global_digest": "macmini",
|
"global_digest": "macmini",
|
||||||
"morning_briefing": "macmini",
|
"morning_briefing": "macmini",
|
||||||
"section_summary": "macmini",
|
"section_summary": "macmini",
|
||||||
"hier_backfill": "gpu",
|
"hier_backfill": "nas",
|
||||||
"hier_redecompose": "gpu",
|
"hier_redecompose": "nas",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -466,7 +388,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
|
|||||||
"processed": int(r["processed"] or 0), "total": r["total"],
|
"processed": int(r["processed"] or 0), "total": r["total"],
|
||||||
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
|
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
|
||||||
"error": r["error"],
|
"error": r["error"],
|
||||||
"machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"),
|
"machine": _BG_JOB_MACHINE.get(r["kind"], "nas"),
|
||||||
}
|
}
|
||||||
for r in rows
|
for r in rows
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -66,6 +66,9 @@ class SummarizeUnit:
|
|||||||
text: str = ""
|
text: str = ""
|
||||||
est_tokens: int = 0
|
est_tokens: int = 0
|
||||||
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
|
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
|
||||||
|
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
|
||||||
|
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
|
||||||
|
leaf_indexes: list[int] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
|
|||||||
units: list[SummarizeUnit] = []
|
units: list[SummarizeUnit] = []
|
||||||
cur_titles: list[str | None] = []
|
cur_titles: list[str | None] = []
|
||||||
cur_texts: list[str] = []
|
cur_texts: list[str] = []
|
||||||
|
cur_indexes: list[int] = []
|
||||||
cur_tokens = 0
|
cur_tokens = 0
|
||||||
|
|
||||||
def _flush() -> None:
|
def _flush() -> None:
|
||||||
nonlocal cur_titles, cur_texts, cur_tokens
|
nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
|
||||||
if cur_texts:
|
if cur_texts:
|
||||||
units.append(SummarizeUnit(
|
units.append(SummarizeUnit(
|
||||||
index=len(units),
|
index=len(units),
|
||||||
section_titles=cur_titles,
|
section_titles=cur_titles,
|
||||||
text="\n\n".join(cur_texts),
|
text="\n\n".join(cur_texts),
|
||||||
est_tokens=cur_tokens,
|
est_tokens=cur_tokens,
|
||||||
|
leaf_indexes=cur_indexes,
|
||||||
))
|
))
|
||||||
cur_titles, cur_texts, cur_tokens = [], [], 0
|
cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
|
||||||
|
|
||||||
for leaf in leaves:
|
for li, leaf in enumerate(leaves):
|
||||||
t = estimate_tokens(leaf.text)
|
t = estimate_tokens(leaf.text)
|
||||||
if t > cap:
|
if t > cap:
|
||||||
_flush()
|
_flush()
|
||||||
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
|
|||||||
text=leaf.text,
|
text=leaf.text,
|
||||||
est_tokens=t,
|
est_tokens=t,
|
||||||
over_cap=True,
|
over_cap=True,
|
||||||
|
leaf_indexes=[li],
|
||||||
))
|
))
|
||||||
continue
|
continue
|
||||||
if cur_tokens + t > cap:
|
if cur_tokens + t > cap:
|
||||||
_flush()
|
_flush()
|
||||||
cur_titles.append(leaf.section_title)
|
cur_titles.append(leaf.section_title)
|
||||||
cur_texts.append(leaf.text)
|
cur_texts.append(leaf.text)
|
||||||
|
cur_indexes.append(li)
|
||||||
cur_tokens += t
|
cur_tokens += t
|
||||||
_flush()
|
_flush()
|
||||||
return units
|
return units
|
||||||
@@ -222,3 +229,186 @@ def build_reduce_units_block(
|
|||||||
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
|
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
|
||||||
truncated = True
|
truncated = True
|
||||||
return block, truncated
|
return block, truncated
|
||||||
|
|
||||||
|
|
||||||
|
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
|
||||||
|
#
|
||||||
|
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
|
||||||
|
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
|
||||||
|
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
|
||||||
|
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
|
||||||
|
|
||||||
|
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
|
||||||
|
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class OverrideCheck:
|
||||||
|
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
|
||||||
|
ok: bool
|
||||||
|
errors: list[str] = field(default_factory=list)
|
||||||
|
warnings: list[str] = field(default_factory=list)
|
||||||
|
coverage_pct: float = 0.0
|
||||||
|
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
|
||||||
|
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
|
||||||
|
unit_tokens: list[int] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
def choose_override_source(
|
||||||
|
md_content: str | None, extracted_text: str | None
|
||||||
|
) -> tuple[str, str]:
|
||||||
|
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
|
||||||
|
|
||||||
|
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
|
||||||
|
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
|
||||||
|
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
|
||||||
|
"""
|
||||||
|
if md_content and md_content.strip():
|
||||||
|
return "md_content", md_content
|
||||||
|
return "extracted_text", extracted_text or ""
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
|
||||||
|
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
|
||||||
|
|
||||||
|
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
|
||||||
|
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
|
||||||
|
"""
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
if entry.get("todo"):
|
||||||
|
errors.append(
|
||||||
|
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
|
||||||
|
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
|
||||||
|
start, end = entry[0], entry[1]
|
||||||
|
title = entry[2] if len(entry) == 3 else None
|
||||||
|
else:
|
||||||
|
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
|
||||||
|
return None
|
||||||
|
if (
|
||||||
|
isinstance(start, bool) or isinstance(end, bool)
|
||||||
|
or not isinstance(start, int) or not isinstance(end, int)
|
||||||
|
):
|
||||||
|
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
|
||||||
|
return None
|
||||||
|
if title is not None and not isinstance(title, str):
|
||||||
|
title = str(title)
|
||||||
|
return start, end, title
|
||||||
|
|
||||||
|
|
||||||
|
def validate_override_boundaries(
|
||||||
|
text: str,
|
||||||
|
raw_boundaries,
|
||||||
|
*,
|
||||||
|
cap: int = CAP_TOKENS,
|
||||||
|
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
|
||||||
|
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
|
||||||
|
) -> OverrideCheck:
|
||||||
|
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
|
||||||
|
|
||||||
|
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
|
||||||
|
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
|
||||||
|
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
|
||||||
|
"""
|
||||||
|
errors: list[str] = []
|
||||||
|
warnings: list[str] = []
|
||||||
|
|
||||||
|
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
|
||||||
|
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
|
||||||
|
|
||||||
|
normalized: list[tuple[int, int, str | None]] = []
|
||||||
|
for i, entry in enumerate(raw_boundaries):
|
||||||
|
norm = _normalize_boundary(entry, i, errors)
|
||||||
|
if norm is not None:
|
||||||
|
normalized.append(norm)
|
||||||
|
if errors:
|
||||||
|
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
|
||||||
|
|
||||||
|
n = len(text)
|
||||||
|
prev_end = None
|
||||||
|
unit_tokens: list[int] = []
|
||||||
|
covered = 0
|
||||||
|
for i, (start, end, title) in enumerate(normalized):
|
||||||
|
label = f"유닛 {i}" + (f" ({title})" if title else "")
|
||||||
|
if start < 0 or end > n:
|
||||||
|
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
|
||||||
|
continue
|
||||||
|
if start >= end:
|
||||||
|
errors.append(f"{label}: start >= end ([{start}, {end}))")
|
||||||
|
continue
|
||||||
|
if prev_end is not None and start < prev_end:
|
||||||
|
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
|
||||||
|
if prev_end is not None and start - prev_end > gap_warn_chars:
|
||||||
|
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
|
||||||
|
est = estimate_tokens(text[start:end])
|
||||||
|
unit_tokens.append(est)
|
||||||
|
if est > cap:
|
||||||
|
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
|
||||||
|
covered += end - start
|
||||||
|
prev_end = max(prev_end or 0, end)
|
||||||
|
|
||||||
|
if not errors:
|
||||||
|
head_gap = normalized[0][0]
|
||||||
|
tail_gap = n - normalized[-1][1]
|
||||||
|
if head_gap > gap_warn_chars:
|
||||||
|
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
|
||||||
|
if tail_gap > gap_warn_chars:
|
||||||
|
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
|
||||||
|
|
||||||
|
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
|
||||||
|
if not errors and coverage_pct < min_coverage_pct:
|
||||||
|
errors.append(
|
||||||
|
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
|
||||||
|
)
|
||||||
|
|
||||||
|
return OverrideCheck(
|
||||||
|
ok=not errors,
|
||||||
|
errors=errors,
|
||||||
|
warnings=warnings,
|
||||||
|
coverage_pct=coverage_pct,
|
||||||
|
boundaries=normalized if not errors else [],
|
||||||
|
unit_tokens=unit_tokens,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def units_from_boundaries(
|
||||||
|
text: str, boundaries: list[tuple[int, int, str | None]]
|
||||||
|
) -> list[SummarizeUnit]:
|
||||||
|
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
|
||||||
|
|
||||||
|
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
|
||||||
|
(map_results 멱등 재개 키와 정합).
|
||||||
|
"""
|
||||||
|
units: list[SummarizeUnit] = []
|
||||||
|
for i, (start, end, title) in enumerate(boundaries):
|
||||||
|
seg = text[start:end]
|
||||||
|
units.append(SummarizeUnit(
|
||||||
|
index=i,
|
||||||
|
section_titles=[title],
|
||||||
|
text=seg,
|
||||||
|
est_tokens=estimate_tokens(seg),
|
||||||
|
))
|
||||||
|
return units
|
||||||
|
|
||||||
|
|
||||||
|
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
|
||||||
|
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
|
||||||
|
|
||||||
|
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
|
||||||
|
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
|
||||||
|
앞 오프셋을 가져간다).
|
||||||
|
"""
|
||||||
|
spans: list[tuple[int, int]] = []
|
||||||
|
cursor = 0
|
||||||
|
for leaf in leaves:
|
||||||
|
pos = text.find(leaf.text, cursor)
|
||||||
|
if pos < 0:
|
||||||
|
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
|
||||||
|
pos = text.find(leaf.text)
|
||||||
|
if pos < 0:
|
||||||
|
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
|
||||||
|
spans.append((pos, pos + len(leaf.text)))
|
||||||
|
cursor = pos + len(leaf.text)
|
||||||
|
return spans
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
from pydantic import BaseModel, Field, ValidationError
|
from pydantic import BaseModel, Field, ValidationError
|
||||||
from sqlalchemy import desc, select
|
from sqlalchemy import desc, select
|
||||||
@@ -29,15 +29,19 @@ from core.utils import setup_logger
|
|||||||
from models.document import Document
|
from models.document import Document
|
||||||
from models.queue import ProcessingQueue, StageDeferred
|
from models.queue import ProcessingQueue, StageDeferred
|
||||||
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
||||||
|
from services.alerts import send_alert
|
||||||
from services.document_telemetry import record_analyze_event
|
from services.document_telemetry import record_analyze_event
|
||||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||||
from services.summarize_units import (
|
from services.summarize_units import (
|
||||||
CAP_TOKENS,
|
CAP_TOKENS,
|
||||||
UnitPlan,
|
UnitPlan,
|
||||||
build_reduce_units_block,
|
build_reduce_units_block,
|
||||||
|
choose_override_source,
|
||||||
estimate_tokens,
|
estimate_tokens,
|
||||||
plan_summarize_units,
|
plan_summarize_units,
|
||||||
render_map_slice,
|
render_map_slice,
|
||||||
|
units_from_boundaries,
|
||||||
|
validate_override_boundaries,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = setup_logger("deep_summary_worker")
|
logger = setup_logger("deep_summary_worker")
|
||||||
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
|
|||||||
DEEP_SUMMARY_TASK = "p3c_deep_summary"
|
DEEP_SUMMARY_TASK = "p3c_deep_summary"
|
||||||
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
|
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
|
||||||
REDUCE_TASK = "p3c_deep_summary_reduce"
|
REDUCE_TASK = "p3c_deep_summary_reduce"
|
||||||
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
|
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
|
||||||
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
|
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
|
||||||
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
|
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
|
||||||
|
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
|
||||||
|
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
|
||||||
|
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
|
||||||
|
ALERT_DEDUPE_DAYS = 7
|
||||||
|
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
|
||||||
|
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
|
||||||
|
OVERRIDE_CAP_SLACK = 1.1
|
||||||
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
|
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
|
||||||
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
|
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
|
||||||
|
|
||||||
@@ -111,6 +122,17 @@ async def process(
|
|||||||
|
|
||||||
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
|
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
|
||||||
|
|
||||||
|
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
|
||||||
|
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
|
||||||
|
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
|
||||||
|
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
|
||||||
|
if (payload.get("presegment") or {}).get("units_override"):
|
||||||
|
await _process_units_override(
|
||||||
|
doc, queue_row, envelope, subject_domain, session,
|
||||||
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
|
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
|
||||||
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
|
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
|
||||||
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
|
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
|
||||||
@@ -123,7 +145,9 @@ async def process(
|
|||||||
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
|
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
|
||||||
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
|
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
|
||||||
if unit_plan.tier != "auto" or not unit_plan.units:
|
if unit_plan.tier != "auto" or not unit_plan.units:
|
||||||
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
|
await _hold_awaiting_split(
|
||||||
|
session, queue_row, unit_plan, document_id, doc_title=doc.title
|
||||||
|
)
|
||||||
await _process_map_reduce(
|
await _process_map_reduce(
|
||||||
doc, queue_row, envelope, subject_domain, unit_plan, session,
|
doc, queue_row, envelope, subject_domain, unit_plan, session,
|
||||||
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||||
@@ -250,43 +274,196 @@ async def process(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
|
||||||
|
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
|
||||||
|
ts = preseg.get("alerted_at")
|
||||||
|
if not ts:
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
prev = datetime.fromisoformat(str(ts))
|
||||||
|
except ValueError:
|
||||||
|
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
|
||||||
|
if prev.tzinfo is None:
|
||||||
|
prev = prev.replace(tzinfo=timezone.utc)
|
||||||
|
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
|
||||||
|
|
||||||
|
|
||||||
async def _hold_awaiting_split(
|
async def _hold_awaiting_split(
|
||||||
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
|
session: AsyncSession,
|
||||||
|
queue_row: ProcessingQueue,
|
||||||
|
plan: UnitPlan,
|
||||||
|
document_id: int,
|
||||||
|
doc_title: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
|
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
|
||||||
|
|
||||||
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
|
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
|
||||||
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
|
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
|
||||||
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
|
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
|
||||||
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
|
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
|
||||||
"""
|
"""
|
||||||
payload = dict(queue_row.payload or {})
|
payload = dict(queue_row.payload or {})
|
||||||
preseg = dict(payload.get("presegment") or {})
|
preseg = dict(payload.get("presegment") or {})
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
alert_due = _hold_alert_due(preseg, now)
|
||||||
|
oversized = [
|
||||||
|
(u.section_titles[0] if u.section_titles else None)
|
||||||
|
for u in plan.units if u.over_cap
|
||||||
|
][:20]
|
||||||
preseg.update({
|
preseg.update({
|
||||||
"awaiting_split": True,
|
"awaiting_split": True,
|
||||||
"tier": plan.tier,
|
"tier": plan.tier,
|
||||||
"over_pct": plan.over_pct,
|
"over_pct": plan.over_pct,
|
||||||
"total_est_tokens": plan.total_est_tokens,
|
"total_est_tokens": plan.total_est_tokens,
|
||||||
"units": len(plan.units),
|
"units": len(plan.units),
|
||||||
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
|
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
|
||||||
"oversized_sections": [
|
"oversized_sections": oversized,
|
||||||
(u.section_titles[0] if u.section_titles else None)
|
|
||||||
for u in plan.units if u.over_cap
|
|
||||||
][:20],
|
|
||||||
})
|
})
|
||||||
|
if alert_due:
|
||||||
|
preseg["alerted_at"] = now.isoformat()
|
||||||
payload["presegment"] = preseg
|
payload["presegment"] = preseg
|
||||||
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
||||||
await session.commit()
|
await session.commit()
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
|
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
|
||||||
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
|
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
|
||||||
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
|
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
|
||||||
)
|
)
|
||||||
|
if alert_due:
|
||||||
|
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
|
||||||
|
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
||||||
|
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
|
||||||
|
await send_alert(
|
||||||
|
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
|
||||||
|
(
|
||||||
|
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
|
||||||
|
f"tier={plan.tier} / over%={plan.over_pct} / "
|
||||||
|
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
|
||||||
|
f"초과 섹션(상위 3): {top3}\n"
|
||||||
|
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
||||||
|
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
|
||||||
|
),
|
||||||
|
)
|
||||||
raise StageDeferred(
|
raise StageDeferred(
|
||||||
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
|
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _rehold_bad_override(
|
||||||
|
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
|
||||||
|
) -> None:
|
||||||
|
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
|
||||||
|
|
||||||
|
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
|
||||||
|
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
|
||||||
|
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
|
||||||
|
"""
|
||||||
|
document_id = doc.id
|
||||||
|
payload = dict(queue_row.payload or {})
|
||||||
|
preseg = dict(payload.get("presegment") or {})
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
alert_due = _hold_alert_due(preseg, now)
|
||||||
|
preseg.update({
|
||||||
|
"awaiting_split": True,
|
||||||
|
"override_rejected": reason,
|
||||||
|
"override_rejected_at": now.isoformat(),
|
||||||
|
})
|
||||||
|
if alert_due:
|
||||||
|
preseg["alerted_at"] = now.isoformat()
|
||||||
|
payload["presegment"] = preseg
|
||||||
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
||||||
|
await session.commit()
|
||||||
|
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
|
||||||
|
if alert_due:
|
||||||
|
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
||||||
|
await send_alert(
|
||||||
|
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
|
||||||
|
(
|
||||||
|
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
|
||||||
|
f"사유: {reason}\n"
|
||||||
|
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
||||||
|
f"수정: scripts/presegment_attended.py export --doc {document_id} "
|
||||||
|
f"→ apply --doc {document_id} --boundaries FILE"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise StageDeferred(
|
||||||
|
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_units_override(
|
||||||
|
doc: Document,
|
||||||
|
queue_row: ProcessingQueue,
|
||||||
|
envelope: EscalationEnvelope,
|
||||||
|
subject_domain: str,
|
||||||
|
session: AsyncSession,
|
||||||
|
*,
|
||||||
|
defer_on_deep_unavailable: bool,
|
||||||
|
) -> None:
|
||||||
|
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
|
||||||
|
|
||||||
|
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
|
||||||
|
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
|
||||||
|
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
|
||||||
|
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
|
||||||
|
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
|
||||||
|
"""
|
||||||
|
document_id = doc.id
|
||||||
|
preseg = dict((queue_row.payload or {}).get("presegment") or {})
|
||||||
|
override = preseg.get("units_override")
|
||||||
|
if isinstance(override, (list, tuple)):
|
||||||
|
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
|
||||||
|
override = {"boundaries": list(override)}
|
||||||
|
if not isinstance(override, dict):
|
||||||
|
await _rehold_bad_override(
|
||||||
|
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
|
||||||
|
)
|
||||||
|
|
||||||
|
source = override.get("source")
|
||||||
|
if source is None:
|
||||||
|
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
|
||||||
|
elif source in ("md_content", "extracted_text"):
|
||||||
|
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
|
||||||
|
else:
|
||||||
|
await _rehold_bad_override(
|
||||||
|
session, queue_row, doc, f"units_override.source={source!r} 미지원"
|
||||||
|
)
|
||||||
|
|
||||||
|
expected_len = override.get("source_len")
|
||||||
|
if expected_len is not None and expected_len != len(text_src):
|
||||||
|
await _rehold_bad_override(
|
||||||
|
session, queue_row, doc,
|
||||||
|
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
|
||||||
|
" (본문 재생성됨 — export 부터 재실행)",
|
||||||
|
)
|
||||||
|
|
||||||
|
check = validate_override_boundaries(
|
||||||
|
text_src,
|
||||||
|
override.get("boundaries") or [],
|
||||||
|
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
|
||||||
|
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
|
||||||
|
)
|
||||||
|
if not check.ok:
|
||||||
|
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
|
||||||
|
|
||||||
|
units = units_from_boundaries(text_src, check.boundaries)
|
||||||
|
plan = UnitPlan(
|
||||||
|
mode="map_reduce",
|
||||||
|
tier="override",
|
||||||
|
total_est_tokens=estimate_tokens(text_src),
|
||||||
|
over_pct=float(preseg.get("over_pct") or 0.0),
|
||||||
|
units=units,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
|
||||||
|
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
|
||||||
|
)
|
||||||
|
await _process_map_reduce(
|
||||||
|
doc, queue_row, envelope, subject_domain, plan, session,
|
||||||
|
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _call_26b(
|
async def _call_26b(
|
||||||
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
|
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
|
||||||
):
|
):
|
||||||
|
|||||||
@@ -23,6 +23,15 @@ logger = setup_logger("queue_consumer")
|
|||||||
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
|
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
|
||||||
_hold_logged = False
|
_hold_logged = False
|
||||||
|
|
||||||
|
# PR3 후속(2026-07-03): 영구 실패 알람 — 사람이 개입해야 풀리는 상태라 Chat 웹훅 발화.
|
||||||
|
# allowlist 로 소음 제한: embed/chunk 류 대량 배치가 일제히 실패하면 문서 수만큼 알람이
|
||||||
|
# 쏟아지므로, 건당 가치가 높고 발생률이 낮은 LLM 스테이지만 기본 대상으로 한다.
|
||||||
|
_ALERT_FAIL_STAGES = {
|
||||||
|
s.strip()
|
||||||
|
for s in os.getenv("ALERT_FAIL_STAGES", "deep_summary,summarize").split(",")
|
||||||
|
if s.strip()
|
||||||
|
}
|
||||||
|
|
||||||
# stage별 배치 크기
|
# stage별 배치 크기
|
||||||
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
||||||
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
||||||
@@ -353,6 +362,8 @@ async def _process_stage(stage, worker_fn):
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 실패 처리
|
# 실패 처리
|
||||||
|
permanently_failed = False
|
||||||
|
doc_title = None
|
||||||
async with async_session() as session:
|
async with async_session() as session:
|
||||||
item = await session.get(ProcessingQueue, queue_id)
|
item = await session.get(ProcessingQueue, queue_id)
|
||||||
if not item:
|
if not item:
|
||||||
@@ -363,7 +374,16 @@ async def _process_stage(stage, worker_fn):
|
|||||||
item.error_message = err_text[:500]
|
item.error_message = err_text[:500]
|
||||||
if item.attempts >= item.max_attempts:
|
if item.attempts >= item.max_attempts:
|
||||||
item.status = "failed"
|
item.status = "failed"
|
||||||
|
permanently_failed = True
|
||||||
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
||||||
|
if stage in _ALERT_FAIL_STAGES:
|
||||||
|
# 알람용 제목 best-effort — 실패해도 알람 자체는 발화한다.
|
||||||
|
try:
|
||||||
|
from models.document import Document
|
||||||
|
_doc = await session.get(Document, document_id)
|
||||||
|
doc_title = getattr(_doc, "title", None)
|
||||||
|
except Exception:
|
||||||
|
doc_title = None
|
||||||
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
|
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
|
||||||
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
|
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
|
||||||
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
|
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
|
||||||
@@ -385,6 +405,18 @@ async def _process_stage(stage, worker_fn):
|
|||||||
item.started_at = None
|
item.started_at = None
|
||||||
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
if permanently_failed and stage in _ALERT_FAIL_STAGES:
|
||||||
|
# 영구 실패 = 무인 파이프라인이 스스로 못 푸는 상태 → 유인 전환 알람.
|
||||||
|
# send_alert 는 절대 raise 하지 않음(no-op/실패 = False 반환뿐).
|
||||||
|
from services.alerts import send_alert
|
||||||
|
await send_alert(
|
||||||
|
f"[DS] {stage} 영구 실패 — doc {document_id}",
|
||||||
|
(
|
||||||
|
f"{doc_title or '(제목 미상)'}\n"
|
||||||
|
f"에러: {err_text[:300]}\n"
|
||||||
|
f"확인: scripts/presegment_attended.py list (보류/거부 사유) 또는 큐 재큐"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def consume_queue():
|
async def consume_queue():
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
<script lang="ts">
|
<script lang="ts">
|
||||||
// 처리 머신 보드 v3 — 통합안 (plan ds-board-merged: C2 머신레인 + C3 번다운/정직ETA).
|
// 처리 머신 보드 v4 — 2026-07-02 컷오버 후 2노드 (나스+맥미니).
|
||||||
// · 머신 3레인(GPU/맥미니/맥북) = "누가 일하나" + 요약 오프로드(맥북 합류) 가시화
|
// · 머신 2레인(나스/맥미니) = "누가 일하나" — 나스=DS 본체 Docker(추출/마크다운/
|
||||||
|
// 청크·임베딩 등), 맥미니=단일 생성 LLM 허브(분류/요약/심층분석 + bge-m3/리랭크)
|
||||||
// · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta)
|
// · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta)
|
||||||
// · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용.
|
// · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용.
|
||||||
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어).
|
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어).
|
||||||
@@ -193,7 +194,7 @@
|
|||||||
const machineByKey = $derived(
|
const machineByKey = $derived(
|
||||||
new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])),
|
new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])),
|
||||||
);
|
);
|
||||||
const LANE_ORDER: FlowMachine[] = ['gpu', 'macmini', 'macbook'];
|
const LANE_ORDER: FlowMachine[] = ['nas', 'macmini'];
|
||||||
const lanes = $derived(
|
const lanes = $derived(
|
||||||
LANE_ORDER.map((key) => ({
|
LANE_ORDER.map((key) => ({
|
||||||
key,
|
key,
|
||||||
@@ -203,13 +204,6 @@
|
|||||||
})),
|
})),
|
||||||
);
|
);
|
||||||
|
|
||||||
// 요약 오프로드 분담 — 맥미니 vs 맥북 (A-1 summarize_by_machine)
|
|
||||||
const split = $derived(overview.summarize_by_machine);
|
|
||||||
const splitTotal1h = $derived(Math.max(1, split.macmini.done_1h + split.macbook.done_1h));
|
|
||||||
const macbookSharePct = $derived(Math.round((split.macbook.done_1h / splitTotal1h) * 100));
|
|
||||||
// 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트)
|
|
||||||
const offloadActive = $derived(split.macbook.done_1h > 0);
|
|
||||||
|
|
||||||
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
|
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
|
||||||
const bgJobs = $derived(overview.background_jobs ?? []);
|
const bgJobs = $derived(overview.background_jobs ?? []);
|
||||||
const runningBg = $derived(bgJobs.filter((j) => j.state === 'running'));
|
const runningBg = $derived(bgJobs.filter((j) => j.state === 'running'));
|
||||||
@@ -266,7 +260,7 @@
|
|||||||
: `갱신 ${Math.round(ageSec / 60)}분 전`,
|
: `갱신 ${Math.round(ageSec / 60)}분 전`,
|
||||||
);
|
);
|
||||||
|
|
||||||
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 + 맥북 합류 변곡점 마커 ───
|
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 ───
|
||||||
const burn = $derived.by(() => {
|
const burn = $derived.by(() => {
|
||||||
const t = overview.trend_24h;
|
const t = overview.trend_24h;
|
||||||
if (!t || t.length === 0) return null;
|
if (!t || t.length === 0) return null;
|
||||||
@@ -279,20 +273,12 @@
|
|||||||
t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' ');
|
t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' ');
|
||||||
const doneLine = line((b) => b.done);
|
const doneLine = line((b) => b.done);
|
||||||
const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`;
|
const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`;
|
||||||
// 합류 변곡점 = done 최대 버킷 (맥북 야간 drain 합류 추정)
|
|
||||||
let mi = 0;
|
|
||||||
t.forEach((b, i) => {
|
|
||||||
if (b.done > t[mi].done) mi = i;
|
|
||||||
});
|
|
||||||
return {
|
return {
|
||||||
w,
|
w,
|
||||||
h,
|
h,
|
||||||
area,
|
area,
|
||||||
doneLine,
|
doneLine,
|
||||||
inflowLine: line((b) => b.inflow),
|
inflowLine: line((b) => b.inflow),
|
||||||
markX: (mi * step).toFixed(1),
|
|
||||||
markHour: t[mi].hour,
|
|
||||||
markDone: t[mi].done,
|
|
||||||
peak: max,
|
peak: max,
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
@@ -332,7 +318,7 @@
|
|||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<!-- 머신 레인 (누가 일하나 + 요약 오프로드) -->
|
<!-- 머신 레인 (누가 일하나) -->
|
||||||
<div class="grid gap-2 mb-3">
|
<div class="grid gap-2 mb-3">
|
||||||
{#each lanes as lane (lane.key)}
|
{#each lanes as lane (lane.key)}
|
||||||
<div class="bg-surface border border-default rounded-card px-3.5 py-2.5">
|
<div class="bg-surface border border-default rounded-card px-3.5 py-2.5">
|
||||||
@@ -342,11 +328,8 @@
|
|||||||
<span class="text-[10px] text-faint font-mono">{lane.meta.model}</span>
|
<span class="text-[10px] text-faint font-mono">{lane.meta.model}</span>
|
||||||
<span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span>
|
<span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span>
|
||||||
{#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each}
|
{#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each}
|
||||||
{#if lane.key === 'macbook' && (lane.card?.deferred_pending ?? 0) > 0}
|
{#if (lane.card?.deferred_pending ?? 0) > 0}
|
||||||
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {lane.card?.deferred_pending}</span>
|
<span class="text-[10px] font-semibold text-warning tabular-nums" title="LLM 백오프 — 자동 재개 대기">보류 {lane.card?.deferred_pending}</span>
|
||||||
{/if}
|
|
||||||
{#if lane.card?.state === 'deferred'}
|
|
||||||
<span class="text-[9px] text-warning">잠듦 — 요약은 맥미니로 복귀</span>
|
|
||||||
{/if}
|
{/if}
|
||||||
</div>
|
</div>
|
||||||
<div class="flex items-stretch gap-1.5 flex-wrap">
|
<div class="flex items-stretch gap-1.5 flex-wrap">
|
||||||
@@ -368,26 +351,8 @@
|
|||||||
</div>
|
</div>
|
||||||
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div>
|
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div>
|
||||||
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div>
|
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div>
|
||||||
{#if n.def.key === 'summarize'}
|
|
||||||
<div class="mt-1 h-1 w-full rounded-full overflow-hidden flex" title="맥미니 {split.macmini.done_1h}/h · 맥북 {split.macbook.done_1h}/h">
|
|
||||||
<span class="block h-full mtag-macmini-bar" style="width:{100 - macbookSharePct}%"></span>
|
|
||||||
<span class="block h-full mtag-macbook-bar" style="width:{macbookSharePct}%"></span>
|
|
||||||
</div>
|
|
||||||
<div class="text-[9px] text-faint tabular-nums whitespace-nowrap mt-0.5">맥미니 {split.macmini.done_1h} · 맥북 {split.macbook.done_1h}/h</div>
|
|
||||||
{/if}
|
|
||||||
</button>
|
</button>
|
||||||
{/each}
|
{/each}
|
||||||
{#if lane.key === 'macbook' && offloadActive}
|
|
||||||
<button
|
|
||||||
class="text-left rounded-lg border border-dashed border-warning/50 px-2.5 py-1.5 cursor-pointer hover:bg-surface-hover min-w-[96px]"
|
|
||||||
onclick={() => toggleNode('summarize')}
|
|
||||||
title="맥북이 요약을 맥미니에서 가져와 처리 중"
|
|
||||||
>
|
|
||||||
<div class="flex items-center gap-1 text-[11px] font-semibold text-text whitespace-nowrap">요약 합류 <span class="text-[8px] font-bold text-warning">OFFLOAD</span></div>
|
|
||||||
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{split.macbook.done_1h}<span class="text-[9px] text-faint font-normal ml-0.5">/h</span></div>
|
|
||||||
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">요약의 {macbookSharePct}% 담당</div>
|
|
||||||
</button>
|
|
||||||
{/if}
|
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
{/each}
|
{/each}
|
||||||
@@ -399,15 +364,11 @@
|
|||||||
<div class="flex items-center gap-2 mb-2">
|
<div class="flex items-center gap-2 mb-2">
|
||||||
<span class="text-[11px] font-bold text-text">요약 백로그 24시간</span>
|
<span class="text-[11px] font-bold text-text">요약 백로그 24시간</span>
|
||||||
<span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span>
|
<span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span>
|
||||||
{#if offloadActive}<span class="text-[9px] text-warning ml-auto">맥북 합류 {burn.markHour} — 소화 급증</span>{/if}
|
|
||||||
</div>
|
</div>
|
||||||
<svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운">
|
<svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운">
|
||||||
<polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" />
|
<polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" />
|
||||||
<polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" />
|
<polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" />
|
||||||
<polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" />
|
<polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" />
|
||||||
{#if offloadActive}
|
|
||||||
<line x1={burn.markX} y1="0" x2={burn.markX} y2={burn.h} stroke="currentColor" stroke-width="1" stroke-dasharray="2 2" class="text-warning" opacity="0.7" />
|
|
||||||
{/if}
|
|
||||||
</svg>
|
</svg>
|
||||||
<div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums">
|
<div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums">
|
||||||
{#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)}
|
{#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)}
|
||||||
@@ -558,13 +519,9 @@
|
|||||||
</div>
|
</div>
|
||||||
|
|
||||||
<style>
|
<style>
|
||||||
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */
|
/* 머신 색 — 디자인 토큰 외 2색 (nas 청/macmini 보라) — 이 컴포넌트 한정 */
|
||||||
.mtag-gpu { background: #e7eef6; color: #3b6ea5; }
|
.mtag-nas { background: #e7eef6; color: #3b6ea5; }
|
||||||
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
|
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
|
||||||
.mtag-macbook { background: #f7eedd; color: #b07a10; }
|
|
||||||
/* 요약 오프로드 분담 막대 채움 (맥미니 보라 / 맥북 황) */
|
|
||||||
.mtag-macmini-bar { background: #8a5fbf; }
|
|
||||||
.mtag-macbook-bar { background: #b07a10; }
|
|
||||||
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
|
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
|
||||||
.detail-frame { border-color: #3b6ea5; }
|
.detail-frame { border-color: #3b6ea5; }
|
||||||
.detail-head { background: #e7eef6; }
|
.detail-head { background: #e7eef6; }
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
<script lang="ts">
|
<script lang="ts">
|
||||||
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
|
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
|
||||||
// 머신 미니카드 3 + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
|
// 머신 미니카드 2(나스/맥미니) + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
|
||||||
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
|
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
|
||||||
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
|
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
|
||||||
import { X } from 'lucide-svelte';
|
import { X } from 'lucide-svelte';
|
||||||
@@ -51,7 +51,7 @@
|
|||||||
|
|
||||||
<div class="p-4 space-y-3">
|
<div class="p-4 space-y-3">
|
||||||
{#if data}
|
{#if data}
|
||||||
<!-- 머신 미니카드 3 -->
|
<!-- 머신 미니카드 (나스/맥미니) -->
|
||||||
{#each data.machines as m (m.key)}
|
{#each data.machines as m (m.key)}
|
||||||
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
|
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
|
||||||
<div class="flex items-center justify-between gap-2">
|
<div class="flex items-center justify-between gap-2">
|
||||||
|
|||||||
@@ -5,7 +5,7 @@
|
|||||||
* 필드 변경 시 양쪽 동시 수정 필수.
|
* 필드 변경 시 양쪽 동시 수정 필수.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
export type MachineKey = 'gpu' | 'macmini' | 'macbook';
|
export type MachineKey = 'nas' | 'macmini';
|
||||||
|
|
||||||
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
|
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
|
||||||
export type MachineState = 'active' | 'deferred' | 'idle';
|
export type MachineState = 'active' | 'deferred' | 'idle';
|
||||||
@@ -29,7 +29,7 @@ export interface MachineOverview {
|
|||||||
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */
|
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */
|
||||||
done_1h: number;
|
done_1h: number;
|
||||||
done_today: number;
|
done_today: number;
|
||||||
/** 보류 건수 — 맥북 sleep 등으로 자동 재개 대기 중 */
|
/** 보류 건수 — LLM 허브 백오프 등으로 자동 재개 대기 중 */
|
||||||
deferred_pending: number;
|
deferred_pending: number;
|
||||||
current: MachineCurrentItem[];
|
current: MachineCurrentItem[];
|
||||||
}
|
}
|
||||||
@@ -50,12 +50,6 @@ export interface TrendPoint {
|
|||||||
done: number;
|
done: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** summarize 머신별 완료 실적 분담 (오프로드 가시화 — ds-board-merged A-1) */
|
|
||||||
export interface SummarizeByMachine {
|
|
||||||
macmini: { done_1h: number; done_today: number };
|
|
||||||
macbook: { done_1h: number; done_today: number };
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface QueueTotals {
|
export interface QueueTotals {
|
||||||
pending: number;
|
pending: number;
|
||||||
processing: number;
|
processing: number;
|
||||||
@@ -93,7 +87,6 @@ export interface BackgroundJob {
|
|||||||
export interface QueueOverview {
|
export interface QueueOverview {
|
||||||
machines: MachineOverview[];
|
machines: MachineOverview[];
|
||||||
summarize_eta: SummarizeEta;
|
summarize_eta: SummarizeEta;
|
||||||
summarize_by_machine: SummarizeByMachine;
|
|
||||||
trend_24h: TrendPoint[];
|
trend_24h: TrendPoint[];
|
||||||
stages: QueueStageRow[];
|
stages: QueueStageRow[];
|
||||||
totals: QueueTotals;
|
totals: QueueTotals;
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ export function formatAgeSec(sec: number): string {
|
|||||||
* ★ 모델/엔진 교체 시 이 블록 1곳만 수정 (예: 맥미니 모델 스왑).
|
* ★ 모델/엔진 교체 시 이 블록 1곳만 수정 (예: 맥미니 모델 스왑).
|
||||||
*/
|
*/
|
||||||
|
|
||||||
export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
|
export type FlowMachine = 'nas' | 'macmini';
|
||||||
|
|
||||||
export interface FlowNodeDef {
|
export interface FlowNodeDef {
|
||||||
key: string;
|
key: string;
|
||||||
@@ -79,26 +79,25 @@ export interface FlowNodeDef {
|
|||||||
|
|
||||||
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
|
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
|
||||||
export const FLOW_NODES: FlowNodeDef[] = [
|
export const FLOW_NODES: FlowNodeDef[] = [
|
||||||
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
|
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'nas', engine: 'kordoc', sub: 'kordoc' },
|
||||||
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
|
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'nas', engine: 'Marker', sub: 'marker-service' },
|
||||||
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
|
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
|
||||||
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
|
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
|
||||||
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
|
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'nas', engine: 'bge-m3 (맥미니 콜)', sub: 'embed worker' },
|
||||||
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
|
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
|
||||||
];
|
];
|
||||||
|
|
||||||
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
|
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
|
||||||
export const AUX_NODES: FlowNodeDef[] = [
|
export const AUX_NODES: FlowNodeDef[] = [
|
||||||
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
|
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'nas', engine: 'Playwright', sub: 'playwright-fetcher' },
|
||||||
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
|
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'nas', engine: 'Whisper', sub: 'stt-service' },
|
||||||
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
|
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'nas', engine: '유틸', sub: 'ffmpeg' },
|
||||||
];
|
];
|
||||||
|
|
||||||
/** 머신 스트립 메타 — 모델 표기 단일 지점 */
|
/** 머신 스트립 메타 — 모델 표기 단일 지점 (2026-07-02 컷오버: 나스+맥미니 2노드) */
|
||||||
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
|
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
|
||||||
gpu: { label: 'GPU 서버', model: '특화 엔진' },
|
nas: { label: '나스', model: 'DS 본체 Docker · 특화 엔진' },
|
||||||
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
|
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · bge-m3 · 24/7' },
|
||||||
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
|
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
|
||||||
|
|||||||
@@ -72,7 +72,7 @@
|
|||||||
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
|
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
|
||||||
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
|
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
|
||||||
let queue = $derived($queueOverview);
|
let queue = $derived($queueOverview);
|
||||||
let queueMacbook = $derived(queue?.machines?.find((m) => m.key === 'macbook') ?? null);
|
let queueMacmini = $derived(queue?.machines?.find((m) => m.key === 'macmini') ?? null);
|
||||||
function toggleQueueDrawer() {
|
function toggleQueueDrawer() {
|
||||||
if (ui.isDrawerOpen('queue')) ui.closeDrawer();
|
if (ui.isDrawerOpen('queue')) ui.closeDrawer();
|
||||||
else ui.openDrawer('queue');
|
else ui.openDrawer('queue');
|
||||||
@@ -189,8 +189,8 @@
|
|||||||
</span>
|
</span>
|
||||||
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
|
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
|
||||||
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
|
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
|
||||||
{#if queueMacbook}
|
{#if queueMacmini}
|
||||||
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacbook.state)}">맥북 {MACHINE_STATE_LABEL[queueMacbook.state]}</span>
|
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacmini.state)}">맥미니 {MACHINE_STATE_LABEL[queueMacmini.state]}</span>
|
||||||
{/if}
|
{/if}
|
||||||
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
|
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
|
||||||
</button>
|
</button>
|
||||||
|
|||||||
@@ -0,0 +1,456 @@
|
|||||||
|
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
|
||||||
|
|
||||||
|
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 로 보류한
|
||||||
|
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
|
||||||
|
|
||||||
|
사용법 (fastapi 컨테이너 안에서 실행):
|
||||||
|
docker compose exec fastapi python /app/scripts/presegment_attended.py list
|
||||||
|
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
|
||||||
|
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
|
||||||
|
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
|
||||||
|
|
||||||
|
워크플로우:
|
||||||
|
1. list — awaiting_split 문서 확인.
|
||||||
|
2. export — 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
|
||||||
|
템플릿 JSON 생성. 유인 클로드 세션은 이 파일들만 읽고 TODO 스팬을
|
||||||
|
CAP 이하 경계들로 분할해 템플릿을 완성한다.
|
||||||
|
3. apply — 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 캡) 후
|
||||||
|
payload.presegment.units_override 기록 + awaiting_split 해제 +
|
||||||
|
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
|
||||||
|
|
||||||
|
stdout 규약: 사람이 읽는 요약 행 + '{' 로 시작하는 기계 파싱용 JSON 라인(1건 1라인).
|
||||||
|
사람용 행은 절대 '{' 로 시작하지 않는다.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
|
||||||
|
|
||||||
|
from sqlalchemy import text as sql_text # noqa: E402
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
|
||||||
|
|
||||||
|
from services.hier_decomp.builder import build_hier_tree # noqa: E402
|
||||||
|
from services.summarize_units import ( # noqa: E402
|
||||||
|
CAP_TOKENS,
|
||||||
|
OVERRIDE_MIN_COVERAGE_PCT,
|
||||||
|
TRIGGER_TOKENS,
|
||||||
|
choose_override_source,
|
||||||
|
estimate_tokens,
|
||||||
|
extract_leaves,
|
||||||
|
leaf_spans,
|
||||||
|
plan_summarize_units,
|
||||||
|
validate_override_boundaries,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
|
||||||
|
DUMP_CHUNK_CHARS = 200_000
|
||||||
|
|
||||||
|
|
||||||
|
def _jsonl(obj: dict) -> None:
|
||||||
|
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
|
||||||
|
print(json.dumps(obj, ensure_ascii=False, default=str))
|
||||||
|
|
||||||
|
|
||||||
|
def _session_factory():
|
||||||
|
database_url = os.getenv(
|
||||||
|
"DATABASE_URL",
|
||||||
|
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
|
||||||
|
)
|
||||||
|
engine = create_async_engine(database_url)
|
||||||
|
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── list ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
LIST_SQL = """
|
||||||
|
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
|
||||||
|
q.payload::text AS payload_text,
|
||||||
|
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
|
||||||
|
FROM processing_queue q
|
||||||
|
JOIN documents d ON d.id = q.document_id
|
||||||
|
WHERE q.stage = 'deep_summary'
|
||||||
|
AND q.status IN ('pending', 'processing', 'failed')
|
||||||
|
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
|
||||||
|
ORDER BY q.id
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_list() -> int:
|
||||||
|
engine, factory = _session_factory()
|
||||||
|
try:
|
||||||
|
async with factory() as session:
|
||||||
|
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
|
||||||
|
finally:
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
|
print(f"awaiting_split 보류 문서 {len(rows)}건")
|
||||||
|
for r in rows:
|
||||||
|
payload = json.loads(r["payload_text"] or "{}")
|
||||||
|
preseg = payload.get("presegment") or {}
|
||||||
|
oversized = preseg.get("oversized_sections") or []
|
||||||
|
print(
|
||||||
|
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
|
||||||
|
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
|
||||||
|
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
|
||||||
|
if isinstance(preseg.get("total_est_tokens"), int)
|
||||||
|
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
|
||||||
|
)
|
||||||
|
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
|
||||||
|
print(
|
||||||
|
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
|
||||||
|
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
|
||||||
|
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
|
||||||
|
)
|
||||||
|
_jsonl({
|
||||||
|
"cmd": "list",
|
||||||
|
"queue_id": r["queue_id"],
|
||||||
|
"doc_id": r["document_id"],
|
||||||
|
"title": r["title"],
|
||||||
|
"status": r["status"],
|
||||||
|
"tier": preseg.get("tier"),
|
||||||
|
"over_pct": preseg.get("over_pct"),
|
||||||
|
"total_est_tokens": preseg.get("total_est_tokens"),
|
||||||
|
"units": preseg.get("units"),
|
||||||
|
"oversized_sections": oversized,
|
||||||
|
"alerted_at": preseg.get("alerted_at"),
|
||||||
|
"deferred_until": payload.get("deferred_until"),
|
||||||
|
"override_rejected": preseg.get("override_rejected"),
|
||||||
|
})
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
# ─── export ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _safe_name(title: str | None, fallback: str) -> str:
|
||||||
|
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
|
||||||
|
return (t or fallback)[:60]
|
||||||
|
|
||||||
|
|
||||||
|
def _build_outline(text: str) -> str:
|
||||||
|
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
|
||||||
|
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
|
||||||
|
lines = []
|
||||||
|
for n in nodes:
|
||||||
|
indent = " " * max(n.level, 0)
|
||||||
|
title = n.section_title or "(preamble)"
|
||||||
|
tok = estimate_tokens(n.text)
|
||||||
|
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
|
||||||
|
lines.append(f"{indent}- L{n.level} {title} — {tok:,} tok{mark}")
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_export(doc_id: int, out_dir: str) -> int:
|
||||||
|
engine, factory = _session_factory()
|
||||||
|
try:
|
||||||
|
async with factory() as session:
|
||||||
|
row = (await session.execute(
|
||||||
|
sql_text(
|
||||||
|
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
|
||||||
|
),
|
||||||
|
{"d": doc_id},
|
||||||
|
)).mappings().first()
|
||||||
|
finally:
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
|
if not row:
|
||||||
|
print(f"[error] 문서 id={doc_id} 없음")
|
||||||
|
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
source, text = choose_override_source(row["md_content"], row["extracted_text"])
|
||||||
|
if not text.strip():
|
||||||
|
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
|
||||||
|
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
plan = plan_summarize_units(text)
|
||||||
|
leaves = extract_leaves(text)
|
||||||
|
spans = leaf_spans(text, leaves)
|
||||||
|
|
||||||
|
out = Path(out_dir)
|
||||||
|
out.mkdir(parents=True, exist_ok=True)
|
||||||
|
files: list[str] = []
|
||||||
|
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||||
|
|
||||||
|
# ① 통계 + hier 개요
|
||||||
|
oversized_units = [u for u in plan.units if u.over_cap]
|
||||||
|
overview = [
|
||||||
|
f"# presegment export — doc {doc_id}",
|
||||||
|
"",
|
||||||
|
f"- 제목: {row['title'] or '(제목 없음)'}",
|
||||||
|
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
|
||||||
|
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
|
||||||
|
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
|
||||||
|
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}개",
|
||||||
|
f"- 생성: {now_iso}",
|
||||||
|
"",
|
||||||
|
"## 유닛 제안 (summarize_units greedy-pack)",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
for u in plan.units:
|
||||||
|
if not u.leaf_indexes:
|
||||||
|
continue
|
||||||
|
s = spans[u.leaf_indexes[0]][0]
|
||||||
|
e = spans[u.leaf_indexes[-1]][1]
|
||||||
|
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
|
||||||
|
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
|
||||||
|
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
|
||||||
|
overview += ["", "## hier 개요", "", _build_outline(text), ""]
|
||||||
|
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
|
||||||
|
files.append("overview.md")
|
||||||
|
|
||||||
|
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
|
||||||
|
boundaries: list[dict] = []
|
||||||
|
for u in plan.units:
|
||||||
|
if not u.leaf_indexes:
|
||||||
|
continue
|
||||||
|
s = spans[u.leaf_indexes[0]][0]
|
||||||
|
e = spans[u.leaf_indexes[-1]][1]
|
||||||
|
title = next((t for t in u.section_titles if t), None)
|
||||||
|
if not u.over_cap:
|
||||||
|
# ④ 자동 pack 유닛은 템플릿에 채워둔다
|
||||||
|
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
|
||||||
|
continue
|
||||||
|
boundaries.append({
|
||||||
|
"start": s, "end": e, "title": title or f"유닛 {u.index}",
|
||||||
|
"todo": (
|
||||||
|
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
|
||||||
|
"경계 여러 개로 교체하고 todo 키를 제거할 것"
|
||||||
|
),
|
||||||
|
})
|
||||||
|
seg = text[s:e]
|
||||||
|
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
|
||||||
|
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
|
||||||
|
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
|
||||||
|
fname = f"{base}.{cs}_{ce}.md"
|
||||||
|
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
|
||||||
|
(out / fname).write_text(text[cs:ce], encoding="utf-8")
|
||||||
|
files.append(fname)
|
||||||
|
|
||||||
|
# ④ boundaries 템플릿 JSON
|
||||||
|
template = {
|
||||||
|
"doc_id": doc_id,
|
||||||
|
"source": source,
|
||||||
|
"source_len": len(text),
|
||||||
|
"cap_tokens": CAP_TOKENS,
|
||||||
|
"generated_at": now_iso,
|
||||||
|
"boundaries": boundaries,
|
||||||
|
}
|
||||||
|
(out / "boundaries_template.json").write_text(
|
||||||
|
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
|
||||||
|
)
|
||||||
|
files.append("boundaries_template.json")
|
||||||
|
|
||||||
|
# 유인 세션용 작업 안내
|
||||||
|
readme = f"""# doc {doc_id} 유인 분할 안내
|
||||||
|
|
||||||
|
1. overview.md 로 구조 파악 (유닛 제안 + hier 개요).
|
||||||
|
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
|
||||||
|
- 파일명 `..._<cs>_<ce>.md` 의 cs = 파일 첫 문자의 절대 오프셋.
|
||||||
|
- 절대 오프셋 = cs + 파일 내 로컬 오프셋.
|
||||||
|
3. boundaries_template.json 의 todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
|
||||||
|
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
|
||||||
|
- 토큰 추정: 한글 0.529 tok/자 · 기타 0.217 tok/자 (services/summarize_units.py).
|
||||||
|
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
|
||||||
|
4. 검증/적용:
|
||||||
|
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
|
||||||
|
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
|
||||||
|
"""
|
||||||
|
(out / "README.md").write_text(readme, encoding="utf-8")
|
||||||
|
files.append("README.md")
|
||||||
|
|
||||||
|
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
|
||||||
|
print(
|
||||||
|
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
|
||||||
|
f"tier={plan.tier} over%={plan.over_pct}"
|
||||||
|
)
|
||||||
|
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}개")
|
||||||
|
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
|
||||||
|
_jsonl({
|
||||||
|
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
|
||||||
|
"source": source, "source_len": len(text),
|
||||||
|
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
|
||||||
|
"over_pct": plan.over_pct,
|
||||||
|
"units_auto": len(plan.units) - len(oversized_units),
|
||||||
|
"units_todo": len(oversized_units),
|
||||||
|
"files": files,
|
||||||
|
})
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
# ─── apply ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
QUEUE_ROW_SQL = """
|
||||||
|
SELECT id, status, attempts, payload::text AS payload_text
|
||||||
|
FROM processing_queue
|
||||||
|
WHERE document_id = :d AND stage = 'deep_summary'
|
||||||
|
AND status IN ('pending', 'processing', 'failed')
|
||||||
|
ORDER BY id DESC
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
APPLY_UPDATE_SQL = """
|
||||||
|
UPDATE processing_queue
|
||||||
|
SET payload = CAST(:payload AS JSONB),
|
||||||
|
status = 'pending',
|
||||||
|
attempts = 0,
|
||||||
|
error_message = NULL
|
||||||
|
WHERE id = :qid
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
|
||||||
|
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
|
||||||
|
if isinstance(raw, dict):
|
||||||
|
boundaries = raw.get("boundaries") or []
|
||||||
|
declared_source = raw.get("source")
|
||||||
|
declared_len = raw.get("source_len")
|
||||||
|
if raw.get("doc_id") not in (None, doc_id):
|
||||||
|
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
|
||||||
|
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
boundaries, declared_source, declared_len = raw, None, None
|
||||||
|
|
||||||
|
engine, factory = _session_factory()
|
||||||
|
try:
|
||||||
|
async with factory() as session:
|
||||||
|
doc = (await session.execute(
|
||||||
|
sql_text(
|
||||||
|
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
|
||||||
|
),
|
||||||
|
{"d": doc_id},
|
||||||
|
)).mappings().first()
|
||||||
|
if not doc:
|
||||||
|
print(f"[error] 문서 id={doc_id} 없음")
|
||||||
|
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
|
||||||
|
if not qrow:
|
||||||
|
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
|
||||||
|
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
|
||||||
|
return 1
|
||||||
|
if qrow["status"] == "processing":
|
||||||
|
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
|
||||||
|
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
|
||||||
|
if declared_source in ("md_content", "extracted_text"):
|
||||||
|
source = declared_source
|
||||||
|
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
|
||||||
|
else:
|
||||||
|
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
|
||||||
|
if declared_len is not None and declared_len != len(text):
|
||||||
|
print(
|
||||||
|
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
|
||||||
|
" (본문 재생성됨 — export 부터 재실행)"
|
||||||
|
)
|
||||||
|
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
check = validate_override_boundaries(text, boundaries)
|
||||||
|
for w in check.warnings:
|
||||||
|
print(f" [warn] {w}")
|
||||||
|
if not check.ok:
|
||||||
|
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
|
||||||
|
for e in check.errors:
|
||||||
|
print(f" - {e}")
|
||||||
|
_jsonl({
|
||||||
|
"cmd": "apply", "ok": False, "doc_id": doc_id,
|
||||||
|
"error": "validation_failed", "errors": check.errors,
|
||||||
|
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
|
||||||
|
})
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
|
||||||
|
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
|
||||||
|
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
|
||||||
|
)
|
||||||
|
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
|
||||||
|
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
|
||||||
|
|
||||||
|
payload = json.loads(qrow["payload_text"] or "{}")
|
||||||
|
preseg = dict(payload.get("presegment") or {})
|
||||||
|
preseg["units_override"] = {
|
||||||
|
"source": source,
|
||||||
|
"source_len": len(text),
|
||||||
|
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
|
||||||
|
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
|
||||||
|
}
|
||||||
|
preseg["awaiting_split"] = False
|
||||||
|
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
|
||||||
|
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
|
||||||
|
preseg.pop(k, None)
|
||||||
|
payload["presegment"] = preseg
|
||||||
|
payload.pop("deferred_until", None) # 즉시 재개
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
|
||||||
|
_jsonl({
|
||||||
|
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
|
||||||
|
"queue_id": qrow["id"], "units": len(check.boundaries),
|
||||||
|
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
|
||||||
|
})
|
||||||
|
return 0
|
||||||
|
|
||||||
|
await session.execute(
|
||||||
|
sql_text(APPLY_UPDATE_SQL),
|
||||||
|
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
print(
|
||||||
|
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
|
||||||
|
f"(다음 queue_consumer 사이클에 재개)"
|
||||||
|
)
|
||||||
|
_jsonl({
|
||||||
|
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
|
||||||
|
"queue_id": qrow["id"], "units": len(check.boundaries),
|
||||||
|
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
|
||||||
|
})
|
||||||
|
return 0
|
||||||
|
finally:
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── main ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
|
||||||
|
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||||
|
|
||||||
|
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
|
||||||
|
|
||||||
|
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
|
||||||
|
p_export.add_argument("--doc", type=int, required=True)
|
||||||
|
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
|
||||||
|
|
||||||
|
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
|
||||||
|
p_apply.add_argument("--doc", type=int, required=True)
|
||||||
|
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
|
||||||
|
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.cmd == "list":
|
||||||
|
return asyncio.run(cmd_list())
|
||||||
|
if args.cmd == "export":
|
||||||
|
out = args.out or f"./preseg_export_{args.doc}"
|
||||||
|
return asyncio.run(cmd_export(args.doc, out))
|
||||||
|
if args.cmd == "apply":
|
||||||
|
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
|
||||||
|
return 2
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
@@ -0,0 +1,486 @@
|
|||||||
|
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
|
||||||
|
|
||||||
|
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
|
||||||
|
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1회 로그)·synochat/ntfy
|
||||||
|
포맷·실패 시 절대 raise 금지 (webhook 은 전부 fake — 실호출 0).
|
||||||
|
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7일).
|
||||||
|
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 밖.
|
||||||
|
- process(): units_override 존재 시 tier 판정(plan_summarize_units) 우회 →
|
||||||
|
map-reduce 재개 / 잘못된 override 는 재-HOLD + 알람 / override 없는 문서 무회귀.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||||
|
|
||||||
|
import httpx # noqa: E402
|
||||||
|
|
||||||
|
import services.alerts as alerts # noqa: E402
|
||||||
|
from ai.envelope import EscalationEnvelope # noqa: E402
|
||||||
|
from models.queue import StageDeferred # noqa: E402
|
||||||
|
from services.summarize_units import ( # noqa: E402
|
||||||
|
CAP_TOKENS,
|
||||||
|
choose_override_source,
|
||||||
|
estimate_tokens,
|
||||||
|
extract_leaves,
|
||||||
|
leaf_spans,
|
||||||
|
plan_summarize_units,
|
||||||
|
units_from_boundaries,
|
||||||
|
validate_override_boundaries,
|
||||||
|
)
|
||||||
|
import workers.deep_summary_worker as dsw # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
|
||||||
|
|
||||||
|
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
|
||||||
|
GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000)
|
||||||
|
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
|
||||||
|
SMALL_MD = "# 소형\n" + ("가" * 2_000)
|
||||||
|
|
||||||
|
MAP_JSON = (
|
||||||
|
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
|
||||||
|
' "inconsistencies": [], "confidence": 0.9}'
|
||||||
|
)
|
||||||
|
REDUCE_JSON = (
|
||||||
|
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
|
||||||
|
' "inconsistencies": [], "confidence": 0.8}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeSession:
|
||||||
|
def __init__(self, row=None):
|
||||||
|
self.commits = 0
|
||||||
|
self._row = row
|
||||||
|
|
||||||
|
async def commit(self):
|
||||||
|
self.commits += 1
|
||||||
|
|
||||||
|
|
||||||
|
class FakeProcSession(FakeSession):
|
||||||
|
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
|
||||||
|
|
||||||
|
def __init__(self, doc, row):
|
||||||
|
super().__init__(row)
|
||||||
|
self._doc = doc
|
||||||
|
|
||||||
|
async def get(self, model, pk):
|
||||||
|
return self._doc
|
||||||
|
|
||||||
|
async def execute(self, stmt):
|
||||||
|
row = self._row
|
||||||
|
return SimpleNamespace(scalar_one_or_none=lambda: row)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeClient:
|
||||||
|
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.ai = SimpleNamespace(
|
||||||
|
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
|
||||||
|
)
|
||||||
|
self.prompts: list[str] = []
|
||||||
|
|
||||||
|
async def call_deep(self, prompt: str, system=None) -> str:
|
||||||
|
self.prompts.append(prompt)
|
||||||
|
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
|
||||||
|
return REDUCE_JSON
|
||||||
|
return MAP_JSON
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
|
||||||
|
return SimpleNamespace(
|
||||||
|
id=999,
|
||||||
|
title="테스트 문서",
|
||||||
|
extracted_text=text,
|
||||||
|
md_content=md_content,
|
||||||
|
ai_detail_summary=None,
|
||||||
|
ai_inconsistencies=None,
|
||||||
|
ai_analysis_tier="triage",
|
||||||
|
ai_processed_at=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _envelope_raw():
|
||||||
|
return {
|
||||||
|
"from_stage": "classify",
|
||||||
|
"escalation_reasons": ["long_context"],
|
||||||
|
"risk_flags": [],
|
||||||
|
"distilled_context": "4B 요지",
|
||||||
|
"original_pointers": {"doc_ids": [999]},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _envelope():
|
||||||
|
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def _patch_telemetry(monkeypatch):
|
||||||
|
events: list[dict] = []
|
||||||
|
|
||||||
|
async def fake_record(**kwargs):
|
||||||
|
events.append(kwargs)
|
||||||
|
|
||||||
|
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def _alert_recorder(monkeypatch):
|
||||||
|
calls: list[tuple[str, str]] = []
|
||||||
|
|
||||||
|
async def fake_alert(title: str, message: str) -> bool:
|
||||||
|
calls.append((title, message))
|
||||||
|
return True
|
||||||
|
|
||||||
|
monkeypatch.setattr(dsw, "send_alert", fake_alert)
|
||||||
|
return calls
|
||||||
|
|
||||||
|
|
||||||
|
# ─── A. services.alerts ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class _FakeHttpxClient:
|
||||||
|
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
|
||||||
|
|
||||||
|
captured: list[dict] = []
|
||||||
|
fail_with: Exception | None = None
|
||||||
|
status_code = 200
|
||||||
|
|
||||||
|
def __init__(self, timeout=None):
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *exc):
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def post(self, url, **kwargs):
|
||||||
|
if _FakeHttpxClient.fail_with is not None:
|
||||||
|
raise _FakeHttpxClient.fail_with
|
||||||
|
_FakeHttpxClient.captured.append({"url": url, **kwargs})
|
||||||
|
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def _fake_httpx(monkeypatch):
|
||||||
|
_FakeHttpxClient.captured = []
|
||||||
|
_FakeHttpxClient.fail_with = None
|
||||||
|
_FakeHttpxClient.status_code = 200
|
||||||
|
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
|
||||||
|
return _FakeHttpxClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
|
||||||
|
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
|
||||||
|
monkeypatch.setattr(alerts, "_noop_logged", False)
|
||||||
|
infos: list[str] = []
|
||||||
|
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
|
||||||
|
|
||||||
|
assert await alerts.send_alert("t", "m") is False
|
||||||
|
assert await alerts.send_alert("t", "m") is False
|
||||||
|
assert len(infos) == 1 # 프로세스당 1회만 로그
|
||||||
|
assert _fake_httpx.captured == [] # webhook 미호출
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
|
||||||
|
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
|
||||||
|
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
|
||||||
|
|
||||||
|
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
|
||||||
|
assert len(_fake_httpx.captured) == 1
|
||||||
|
call = _fake_httpx.captured[0]
|
||||||
|
assert call["url"] == "http://chat.local/webhook"
|
||||||
|
payload = json.loads(call["data"]["payload"])
|
||||||
|
assert payload == {"text": "제목\n본문 줄1\n줄2"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
|
||||||
|
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
|
||||||
|
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
|
||||||
|
|
||||||
|
assert await alerts.send_alert("한글 제목", "알람 본문") is True
|
||||||
|
call = _fake_httpx.captured[0]
|
||||||
|
# 한글 제목은 헤더(latin-1 한정) 대신 query param
|
||||||
|
assert call["params"] == {"title": "한글 제목"}
|
||||||
|
assert call["content"] == "알람 본문".encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
|
||||||
|
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
|
||||||
|
_fake_httpx.fail_with = httpx.ConnectError("down")
|
||||||
|
assert await alerts.send_alert("t", "m") is False # raise 없이 False
|
||||||
|
|
||||||
|
_fake_httpx.fail_with = None
|
||||||
|
_fake_httpx.status_code = 500
|
||||||
|
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
|
||||||
|
|
||||||
|
|
||||||
|
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
|
||||||
|
plan = plan_summarize_units(GIANT_WHOLE_MD)
|
||||||
|
assert plan.tier == "whole"
|
||||||
|
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||||||
|
session = FakeSession()
|
||||||
|
|
||||||
|
# 1차 HOLD — 알람 발화 + alerted_at 기록
|
||||||
|
with pytest.raises(StageDeferred):
|
||||||
|
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||||
|
assert len(_alert_recorder) == 1
|
||||||
|
title, message = _alert_recorder[0]
|
||||||
|
assert "999" in title
|
||||||
|
assert "테스트 문서" in message and "whole" in message
|
||||||
|
assert f"export --doc 999" in message # 유인 분할 힌트
|
||||||
|
alerted_at = row.payload["presegment"]["alerted_at"]
|
||||||
|
assert datetime.fromisoformat(alerted_at)
|
||||||
|
|
||||||
|
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
|
||||||
|
with pytest.raises(StageDeferred):
|
||||||
|
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||||
|
assert len(_alert_recorder) == 1
|
||||||
|
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
|
||||||
|
|
||||||
|
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
|
||||||
|
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
|
||||||
|
payload = dict(row.payload)
|
||||||
|
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
|
||||||
|
row.payload = payload
|
||||||
|
with pytest.raises(StageDeferred):
|
||||||
|
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||||
|
assert len(_alert_recorder) == 2
|
||||||
|
assert row.payload["presegment"]["alerted_at"] != stale
|
||||||
|
|
||||||
|
|
||||||
|
# ─── C. validate_override_boundaries ────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_validate_ok_full_coverage():
|
||||||
|
text = "가" * 40_000 # ≈ 21,160 tok
|
||||||
|
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
|
||||||
|
check = validate_override_boundaries(text, bounds)
|
||||||
|
assert check.ok and check.errors == []
|
||||||
|
assert check.coverage_pct == 100.0
|
||||||
|
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
|
||||||
|
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_dict_form_and_units_from_boundaries():
|
||||||
|
text = "가" * 10_000
|
||||||
|
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
|
||||||
|
check = validate_override_boundaries(text, bounds)
|
||||||
|
assert check.ok
|
||||||
|
units = units_from_boundaries(text, check.boundaries)
|
||||||
|
assert [u.index for u in units] == [0, 1]
|
||||||
|
assert units[0].text == text[0:5_000]
|
||||||
|
assert units[0].section_titles == ["a"]
|
||||||
|
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_rejects_overlap():
|
||||||
|
text = "가" * 10_000
|
||||||
|
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
|
||||||
|
assert not check.ok
|
||||||
|
assert any("중첩" in e for e in check.errors)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_rejects_cap_exceed():
|
||||||
|
text = "가" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
|
||||||
|
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
|
||||||
|
assert not check.ok
|
||||||
|
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_rejects_low_coverage():
|
||||||
|
text = "가" * 10_000
|
||||||
|
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
|
||||||
|
assert not check.ok
|
||||||
|
assert any("커버리지" in e for e in check.errors)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_rejects_out_of_range_and_todo():
|
||||||
|
text = "가" * 1_000
|
||||||
|
check = validate_override_boundaries(text, [[0, 2_000, "밖"]])
|
||||||
|
assert not check.ok and any("범위 밖" in e for e in check.errors)
|
||||||
|
|
||||||
|
check2 = validate_override_boundaries(
|
||||||
|
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
|
||||||
|
)
|
||||||
|
assert not check2.ok and any("TODO" in e for e in check2.errors)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_warns_on_gap_but_passes_coverage():
|
||||||
|
text = "가" * 100_000
|
||||||
|
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
|
||||||
|
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
|
||||||
|
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
|
||||||
|
check = validate_override_boundaries(text, bounds, cap=50_000)
|
||||||
|
assert check.ok
|
||||||
|
assert any("공백 구간" in w for w in check.warnings)
|
||||||
|
|
||||||
|
|
||||||
|
def test_choose_override_source_prefers_md_content():
|
||||||
|
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
|
||||||
|
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
|
||||||
|
assert choose_override_source(None, None) == ("extracted_text", "")
|
||||||
|
|
||||||
|
|
||||||
|
def test_leaf_spans_reconstruct_source():
|
||||||
|
md = "# 절 1\n" + "가" * 100 + "\n# 절 2\n" + "나" * 100
|
||||||
|
leaves = extract_leaves(md)
|
||||||
|
spans = leaf_spans(md, leaves)
|
||||||
|
assert len(spans) == len(leaves)
|
||||||
|
for (s, e), leaf in zip(spans, leaves):
|
||||||
|
assert md[s:e] == leaf.text
|
||||||
|
# 인접 파티션 — 이어붙이면 원문 전체
|
||||||
|
assert spans[0][0] == 0 and spans[-1][1] == len(md)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
|
||||||
|
|
||||||
|
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
|
||||||
|
step = len(text) // n_units
|
||||||
|
bounds = []
|
||||||
|
for i in range(n_units):
|
||||||
|
s = i * step
|
||||||
|
e = len(text) if i == n_units - 1 else (i + 1) * step
|
||||||
|
bounds.append([s, e, f"파트 {i + 1}"])
|
||||||
|
return {
|
||||||
|
"envelope": _envelope_raw(),
|
||||||
|
"subject_domain": "generic",
|
||||||
|
"presegment": {
|
||||||
|
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
|
||||||
|
"units_override": {
|
||||||
|
"source": source, "source_len": len(text), "boundaries": bounds,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
|
||||||
|
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
|
||||||
|
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
|
||||||
|
session = FakeProcSession(doc, row)
|
||||||
|
|
||||||
|
client = FakeClient()
|
||||||
|
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||||||
|
|
||||||
|
def _boom(*a, **k):
|
||||||
|
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
|
||||||
|
|
||||||
|
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
|
||||||
|
|
||||||
|
await dsw.process(999, session)
|
||||||
|
|
||||||
|
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
|
||||||
|
assert len(client.prompts) == 4
|
||||||
|
for p in client.prompts:
|
||||||
|
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
|
||||||
|
assert doc.ai_detail_summary == "최종 상세."
|
||||||
|
assert doc.ai_analysis_tier == "deep"
|
||||||
|
preseg = row.payload["presegment"]
|
||||||
|
assert preseg["tier"] == "override"
|
||||||
|
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
|
||||||
|
assert len(preseg["map_results"]) == 3
|
||||||
|
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
|
||||||
|
assert _alert_recorder == [] # 정상 재개 — 알람 없음
|
||||||
|
assert len(_patch_telemetry) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
|
||||||
|
doc = _doc(GIANT_WHOLE_MD)
|
||||||
|
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
|
||||||
|
row = SimpleNamespace(payload=payload)
|
||||||
|
session = FakeProcSession(doc, row)
|
||||||
|
|
||||||
|
def _no_llm():
|
||||||
|
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
|
||||||
|
|
||||||
|
monkeypatch.setattr(dsw, "AIClient", _no_llm)
|
||||||
|
|
||||||
|
with pytest.raises(StageDeferred) as ei:
|
||||||
|
await dsw.process(999, session)
|
||||||
|
|
||||||
|
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
|
||||||
|
preseg = row.payload["presegment"]
|
||||||
|
assert preseg["awaiting_split"] is True # 재-HOLD
|
||||||
|
assert "cap" in preseg["override_rejected"]
|
||||||
|
assert "units_override" in preseg # 원인 조사용 보존
|
||||||
|
assert len(_alert_recorder) == 1 # 거부 알람
|
||||||
|
assert "거부" in _alert_recorder[0][0]
|
||||||
|
assert doc.ai_detail_summary is None
|
||||||
|
assert _patch_telemetry == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
|
||||||
|
doc = _doc(GIANT_WHOLE_MD)
|
||||||
|
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
|
||||||
|
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
|
||||||
|
row = SimpleNamespace(payload=payload)
|
||||||
|
session = FakeProcSession(doc, row)
|
||||||
|
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
|
||||||
|
|
||||||
|
with pytest.raises(StageDeferred):
|
||||||
|
await dsw.process(999, session)
|
||||||
|
|
||||||
|
assert row.payload["presegment"]["awaiting_split"] is True
|
||||||
|
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
|
||||||
|
assert len(_alert_recorder) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_override_small_doc_keeps_single_call_path(
|
||||||
|
monkeypatch, _patch_telemetry, _alert_recorder
|
||||||
|
):
|
||||||
|
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
|
||||||
|
doc = _doc(SMALL_MD)
|
||||||
|
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
|
||||||
|
session = FakeProcSession(doc, row)
|
||||||
|
client = FakeClient()
|
||||||
|
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||||||
|
|
||||||
|
await dsw.process(999, session)
|
||||||
|
|
||||||
|
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
|
||||||
|
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
|
||||||
|
assert doc.ai_detail_summary == "유닛 상세."
|
||||||
|
assert "presegment" not in row.payload # payload 무변경
|
||||||
|
assert _alert_recorder == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_override_whole_doc_still_holds_with_alert(
|
||||||
|
monkeypatch, _patch_telemetry, _alert_recorder
|
||||||
|
):
|
||||||
|
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
|
||||||
|
doc = _doc(GIANT_WHOLE_MD)
|
||||||
|
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
|
||||||
|
session = FakeProcSession(doc, row)
|
||||||
|
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
|
||||||
|
|
||||||
|
with pytest.raises(StageDeferred):
|
||||||
|
await dsw.process(999, session)
|
||||||
|
|
||||||
|
preseg = row.payload["presegment"]
|
||||||
|
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
|
||||||
|
assert len(_alert_recorder) == 1
|
||||||
|
assert "HOLD" in _alert_recorder[0][0]
|
||||||
+66
-156
@@ -4,6 +4,8 @@ services/queue_overview 의 SQL 수집부와 분리된 순수 판정 함수
|
|||||||
(stage_machine_map / build_machines / build_summarize_eta / build_trend /
|
(stage_machine_map / build_machines / build_summarize_eta / build_trend /
|
||||||
build_totals / compute_eta_minutes / rows_to_* / display_title) 를
|
build_totals / compute_eta_minutes / rows_to_* / display_title) 를
|
||||||
mock 행으로 검증한다. 통합(실 SQL)은 배포 후 라이브 smoke 로 확인.
|
mock 행으로 검증한다. 통합(실 SQL)은 배포 후 라이브 smoke 로 확인.
|
||||||
|
|
||||||
|
2026-07-02 컷오버 후 2노드(나스+맥미니) 기준 — 구 3노드 레인은 제거됨.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -18,7 +20,6 @@ from services.queue_overview import (
|
|||||||
compute_eta_minutes,
|
compute_eta_minutes,
|
||||||
display_title,
|
display_title,
|
||||||
rows_to_stage_stats,
|
rows_to_stage_stats,
|
||||||
rows_to_summarize_split,
|
|
||||||
stage_machine_map,
|
stage_machine_map,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -36,186 +37,115 @@ def _stage(**kw) -> dict:
|
|||||||
return base
|
return base
|
||||||
|
|
||||||
|
|
||||||
def _split(macbook: dict | None = None, macmini: dict | None = None) -> dict:
|
|
||||||
"""summarize 풀 완료 실적 split — 미지정 0."""
|
|
||||||
zero = {"done_1h": 0, "done_today": 0, "done_15m": 0}
|
|
||||||
return {
|
|
||||||
"macbook": {**zero, **(macbook or {})},
|
|
||||||
"macmini": {**zero, **(macmini or {})},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _machine(machines: list[dict], key: str) -> dict:
|
def _machine(machines: list[dict], key: str) -> dict:
|
||||||
return next(m for m in machines if m["key"] == key)
|
return next(m for m in machines if m["key"] == key)
|
||||||
|
|
||||||
|
|
||||||
# ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
|
# ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
|
||||||
|
|
||||||
def test_stage_machine_map_deep_enabled():
|
def test_stage_machine_map_two_nodes():
|
||||||
smap = stage_machine_map(deep_enabled=True)
|
smap = stage_machine_map()
|
||||||
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
|
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
|
||||||
assert smap[s] == "gpu"
|
assert smap[s] == "nas"
|
||||||
assert smap["classify"] == "macmini"
|
assert smap["classify"] == "macmini"
|
||||||
assert smap["summarize"] == "macmini"
|
assert smap["summarize"] == "macmini"
|
||||||
assert smap["deep_summary"] == "macbook"
|
|
||||||
|
|
||||||
|
|
||||||
def test_stage_machine_map_deep_disabled():
|
|
||||||
"""deep 슬롯 부재 시 deep_summary 도 macmini 귀속."""
|
|
||||||
smap = stage_machine_map(deep_enabled=False)
|
|
||||||
assert smap["deep_summary"] == "macmini"
|
assert smap["deep_summary"] == "macmini"
|
||||||
|
|
||||||
|
|
||||||
# ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
|
# ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
def test_gpu_stage_counts_attribution():
|
def test_nas_stage_counts_attribution():
|
||||||
stats = {
|
stats = {
|
||||||
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
|
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
|
||||||
"stt": _stage(failed=2, done_1h=1, done_today=2),
|
"stt": _stage(failed=2, done_1h=1, done_today=2),
|
||||||
}
|
}
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
machines = build_machines(stats, [])
|
||||||
gpu = _machine(machines, "gpu")
|
nas = _machine(machines, "nas")
|
||||||
assert (gpu["pending"], gpu["processing"], gpu["failed"]) == (3, 1, 2)
|
assert (nas["pending"], nas["processing"], nas["failed"]) == (3, 1, 2)
|
||||||
assert (gpu["done_1h"], gpu["done_today"]) == (6, 11)
|
assert (nas["done_1h"], nas["done_today"]) == (6, 11)
|
||||||
# gpu 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
|
# nas 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
|
||||||
assert gpu["stages"] == [
|
assert nas["stages"] == [
|
||||||
"extract", "embed", "chunk", "markdown",
|
"extract", "embed", "chunk", "markdown",
|
||||||
"preview", "thumbnail", "fulltext", "stt",
|
"preview", "thumbnail", "fulltext", "stt",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_summarize_pool_split_attribution():
|
def test_macmini_llm_stages_attribution():
|
||||||
"""summarize pending/failed = macmini 귀속, 완료 실적은 split 로 분리 —
|
"""classify/summarize/deep_summary 전부 macmini 귀속 (단일 생성 LLM 허브)."""
|
||||||
stage-level summarize done 수치는 카드에 이중 합산되지 않는다."""
|
|
||||||
stats = {
|
stats = {
|
||||||
"classify": _stage(done_1h=2, done_today=3),
|
"classify": _stage(done_1h=2, done_today=3),
|
||||||
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
|
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
|
||||||
|
"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4),
|
||||||
}
|
}
|
||||||
split = _split(macbook={"done_1h": 4, "done_today": 8}, macmini={"done_1h": 6, "done_today": 12})
|
machines = build_machines(stats, [])
|
||||||
machines = build_machines(stats, split, [], deep_enabled=True)
|
|
||||||
macmini = _machine(machines, "macmini")
|
macmini = _machine(machines, "macmini")
|
||||||
macbook = _machine(machines, "macbook")
|
assert macmini["pending"] == 9 and macmini["failed"] == 1
|
||||||
|
assert macmini["processing"] == 1
|
||||||
assert macmini["pending"] == 7 and macmini["failed"] == 1
|
assert macmini["done_1h"] == 2 + 10 + 3
|
||||||
assert macmini["done_1h"] == 2 + 6 # classify + macmini 몫 (10 아님)
|
assert macmini["done_today"] == 3 + 20 + 4
|
||||||
assert macmini["done_today"] == 3 + 12
|
assert macmini["stages"] == ["classify", "summarize", "deep_summary"]
|
||||||
assert macbook["done_1h"] == 4 and macbook["done_today"] == 8
|
assert _machine(machines, "nas")["pending"] == 0
|
||||||
assert macbook["pending"] == 0 # 풀 pending 은 macmini 만
|
|
||||||
|
|
||||||
|
|
||||||
def test_summarize_by_machine_projection():
|
def test_deferred_pending_on_macmini_card():
|
||||||
"""build_summarize_by_machine = split 의 done_1h/done_today 를 머신별로 투영
|
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macmini 카드 귀속
|
||||||
(done_15m 은 제외 — 내부 state 판정 전용)."""
|
(보류 = LLM 백오프 신호)."""
|
||||||
from services.queue_overview import build_summarize_by_machine
|
|
||||||
split = _split(
|
|
||||||
macbook={"done_1h": 226, "done_today": 312, "done_15m": 60},
|
|
||||||
macmini={"done_1h": 37, "done_today": 94, "done_15m": 9},
|
|
||||||
)
|
|
||||||
sbm = build_summarize_by_machine(split)
|
|
||||||
assert sbm == {
|
|
||||||
"macmini": {"done_1h": 37, "done_today": 94},
|
|
||||||
"macbook": {"done_1h": 226, "done_today": 312},
|
|
||||||
}
|
|
||||||
assert "done_15m" not in sbm["macbook"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_compose_overview_includes_summarize_by_machine():
|
|
||||||
"""compose_overview 응답 계약에 summarize_by_machine 포함 (FE 레인 분담 재료)."""
|
|
||||||
now_kst = datetime(2026, 6, 13, 13, 0, tzinfo=KST)
|
|
||||||
stats = {"summarize": _stage(pending=1317, done_1h=264)}
|
|
||||||
split = _split(macbook={"done_1h": 226, "done_today": 312}, macmini={"done_1h": 37, "done_today": 94})
|
|
||||||
ov = compose_overview(stats, split, {}, {}, [], deep_enabled=True, now_kst=now_kst)
|
|
||||||
assert ov["summarize_by_machine"]["macbook"]["done_1h"] == 226
|
|
||||||
assert ov["summarize_by_machine"]["macmini"]["done_today"] == 94
|
|
||||||
|
|
||||||
|
|
||||||
def test_deep_disabled_deep_summary_counts_to_macmini():
|
|
||||||
stats = {"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4)}
|
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=False)
|
|
||||||
macmini = _machine(machines, "macmini")
|
|
||||||
macbook = _machine(machines, "macbook")
|
|
||||||
assert macmini["pending"] == 2 and macmini["processing"] == 1
|
|
||||||
assert macmini["done_1h"] == 3 and macmini["done_today"] == 4
|
|
||||||
assert macbook["stages"] == [] and macbook["pending"] == 0
|
|
||||||
assert _machine(machines, "macmini")["stages"] == ["classify", "summarize", "deep_summary"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_deferred_pending_always_on_macbook_card():
|
|
||||||
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macbook 카드 귀속.
|
|
||||||
deep 슬롯 유무와 무관 (보류 = 맥북 불가 신호)."""
|
|
||||||
stats = {
|
stats = {
|
||||||
"summarize": _stage(pending=5, deferred_pending=2),
|
"summarize": _stage(pending=5, deferred_pending=2),
|
||||||
"deep_summary": _stage(pending=1, deferred_pending=1),
|
"deep_summary": _stage(pending=1, deferred_pending=1),
|
||||||
}
|
}
|
||||||
for deep_enabled in (True, False):
|
machines = build_machines(stats, [])
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=deep_enabled)
|
assert _machine(machines, "macmini")["deferred_pending"] == 3
|
||||||
assert _machine(machines, "macbook")["deferred_pending"] == 3
|
assert _machine(machines, "nas")["deferred_pending"] == 0
|
||||||
assert _machine(machines, "gpu")["deferred_pending"] == 0
|
|
||||||
assert _machine(machines, "macmini")["deferred_pending"] == 0
|
|
||||||
|
|
||||||
|
|
||||||
# ─── state 판정 ───────────────────────────────────────────────────────────────
|
# ─── state 판정 ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def test_macbook_state_active_wins_over_deferred_while_working():
|
def test_macmini_state_active_wins_over_deferred_while_working():
|
||||||
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
|
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
|
||||||
|
|
||||||
보류 건수는 deferred_pending 필드가 별도로 전달 — 카드 라인이 표시.
|
보류 건수는 deferred_pending 필드가 별도로 전달 — 카드 라인이 표시.
|
||||||
"""
|
"""
|
||||||
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
|
stats = {"summarize": _stage(pending=1, deferred_pending=1, done_15m=3)}
|
||||||
split = _split(macbook={"done_15m": 3})
|
machines = build_machines(stats, [])
|
||||||
machines = build_machines(stats, split, [], deep_enabled=True)
|
mm = _machine(machines, "macmini")
|
||||||
mb = _machine(machines, "macbook")
|
assert mm["state"] == "active"
|
||||||
assert mb["state"] == "active"
|
assert mm["deferred_pending"] == 1
|
||||||
assert mb["deferred_pending"] == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_macbook_state_deferred_only_when_not_working():
|
def test_macmini_state_deferred_only_when_not_working():
|
||||||
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
|
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
|
||||||
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
|
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
machines = build_machines(stats, [])
|
||||||
assert _machine(machines, "macbook")["state"] == "deferred"
|
assert _machine(machines, "macmini")["state"] == "deferred"
|
||||||
|
|
||||||
|
|
||||||
def test_macbook_state_active_on_recent_qwen_done():
|
def test_macmini_state_idle():
|
||||||
split = _split(macbook={"done_15m": 1})
|
machines = build_machines({}, [])
|
||||||
machines = build_machines({}, split, [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "macbook")["state"] == "active"
|
|
||||||
|
|
||||||
|
|
||||||
def test_macbook_state_idle():
|
|
||||||
machines = build_machines({}, _split(), [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "macbook")["state"] == "idle"
|
|
||||||
|
|
||||||
|
|
||||||
def test_gpu_state_active_on_processing():
|
|
||||||
stats = {"extract": _stage(processing=1)}
|
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "gpu")["state"] == "active"
|
|
||||||
|
|
||||||
|
|
||||||
def test_gpu_state_active_on_recent_done():
|
|
||||||
stats = {"embed": _stage(done_15m=2)}
|
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "gpu")["state"] == "active"
|
|
||||||
|
|
||||||
|
|
||||||
def test_gpu_state_idle_when_old_done_only():
|
|
||||||
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
|
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "gpu")["state"] == "idle"
|
|
||||||
|
|
||||||
|
|
||||||
def test_macmini_state_not_active_on_macbook_pool_done():
|
|
||||||
"""summarize 풀 완료가 전부 macbook 몫이면 macmini 는 active 아님 (귀속 기준)."""
|
|
||||||
stats = {"summarize": _stage(done_15m=1)}
|
|
||||||
split = _split(macbook={"done_15m": 1})
|
|
||||||
machines = build_machines(stats, split, [], deep_enabled=True)
|
|
||||||
assert _machine(machines, "macmini")["state"] == "idle"
|
assert _machine(machines, "macmini")["state"] == "idle"
|
||||||
|
|
||||||
|
|
||||||
|
def test_nas_state_active_on_processing():
|
||||||
|
stats = {"extract": _stage(processing=1)}
|
||||||
|
machines = build_machines(stats, [])
|
||||||
|
assert _machine(machines, "nas")["state"] == "active"
|
||||||
|
|
||||||
|
|
||||||
|
def test_nas_state_active_on_recent_done():
|
||||||
|
stats = {"embed": _stage(done_15m=2)}
|
||||||
|
machines = build_machines(stats, [])
|
||||||
|
assert _machine(machines, "nas")["state"] == "active"
|
||||||
|
|
||||||
|
|
||||||
|
def test_nas_state_idle_when_old_done_only():
|
||||||
|
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
|
||||||
|
machines = build_machines(stats, [])
|
||||||
|
assert _machine(machines, "nas")["state"] == "idle"
|
||||||
|
|
||||||
|
|
||||||
def test_macmini_state_active_on_summarize_processing():
|
def test_macmini_state_active_on_summarize_processing():
|
||||||
stats = {"summarize": _stage(processing=1)}
|
stats = {"summarize": _stage(processing=1)}
|
||||||
machines = build_machines(stats, _split(), [], deep_enabled=True)
|
machines = build_machines(stats, [])
|
||||||
assert _machine(machines, "macmini")["state"] == "active"
|
assert _machine(machines, "macmini")["state"] == "active"
|
||||||
|
|
||||||
|
|
||||||
@@ -228,21 +158,18 @@ def test_current_summarize_to_macmini_max_two():
|
|||||||
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
|
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
|
||||||
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
|
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
|
||||||
]
|
]
|
||||||
machines = build_machines({}, _split(), rows, deep_enabled=True)
|
machines = build_machines({}, rows)
|
||||||
macmini = _machine(machines, "macmini")
|
macmini = _machine(machines, "macmini")
|
||||||
gpu = _machine(machines, "gpu")
|
nas = _machine(machines, "nas")
|
||||||
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
|
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
|
||||||
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
|
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
|
||||||
assert [c["document_id"] for c in gpu["current"]] == [4]
|
assert [c["document_id"] for c in nas["current"]] == [4]
|
||||||
assert _machine(machines, "macbook")["current"] == []
|
|
||||||
|
|
||||||
|
|
||||||
def test_current_deep_summary_follows_deep_slot():
|
def test_current_deep_summary_to_macmini():
|
||||||
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
|
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
|
||||||
enabled = build_machines({}, _split(), rows, deep_enabled=True)
|
machines = build_machines({}, rows)
|
||||||
disabled = build_machines({}, _split(), rows, deep_enabled=False)
|
assert _machine(machines, "macmini")["current"][0]["document_id"] == 9
|
||||||
assert _machine(enabled, "macbook")["current"][0]["document_id"] == 9
|
|
||||||
assert _machine(disabled, "macmini")["current"][0]["document_id"] == 9
|
|
||||||
|
|
||||||
|
|
||||||
def test_display_title_fallback_chain():
|
def test_display_title_fallback_chain():
|
||||||
@@ -344,32 +271,15 @@ def test_rows_to_stage_stats_conversion():
|
|||||||
assert stats["summarize"]["deferred_pending"] == 2
|
assert stats["summarize"]["deferred_pending"] == 2
|
||||||
|
|
||||||
|
|
||||||
def test_rows_to_summarize_split_conversion():
|
|
||||||
rows = [
|
|
||||||
(True, 4, 8, 1), # is_macbook
|
|
||||||
(False, 6, 12, 0),
|
|
||||||
]
|
|
||||||
split = rows_to_summarize_split(rows)
|
|
||||||
assert split["macbook"] == {"done_1h": 4, "done_today": 8, "done_15m": 1}
|
|
||||||
assert split["macmini"] == {"done_1h": 6, "done_today": 12, "done_15m": 0}
|
|
||||||
|
|
||||||
|
|
||||||
def test_rows_to_summarize_split_empty():
|
|
||||||
split = rows_to_summarize_split([])
|
|
||||||
assert split["macbook"]["done_1h"] == 0 and split["macmini"]["done_today"] == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_compose_overview_contract_shape():
|
def test_compose_overview_contract_shape():
|
||||||
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
|
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
|
||||||
out = compose_overview(
|
out = compose_overview(
|
||||||
{"summarize": _stage(pending=1)},
|
{"summarize": _stage(pending=1)},
|
||||||
_split(),
|
|
||||||
{}, {}, [],
|
{}, {}, [],
|
||||||
deep_enabled=True,
|
|
||||||
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
|
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
|
||||||
)
|
)
|
||||||
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
|
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
|
||||||
assert [m["key"] for m in out["machines"]] == ["gpu", "macmini", "macbook"]
|
assert [m["key"] for m in out["machines"]] == ["nas", "macmini"]
|
||||||
for m in out["machines"]:
|
for m in out["machines"]:
|
||||||
assert set(m.keys()) == {
|
assert set(m.keys()) == {
|
||||||
"key", "label", "state", "stages", "pending", "processing", "failed",
|
"key", "label", "state", "stages", "pending", "processing", "failed",
|
||||||
@@ -381,7 +291,7 @@ def test_compose_overview_contract_shape():
|
|||||||
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
|
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
|
||||||
assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
|
assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
|
||||||
# 머신 label 고정 (raw 모델명 노출 금지 — label 만)
|
# 머신 label 고정 (raw 모델명 노출 금지 — label 만)
|
||||||
assert [m["label"] for m in out["machines"]] == ["GPU 서버", "맥미니", "맥북 M5 Max"]
|
assert [m["label"] for m in out["machines"]] == ["나스", "맥미니"]
|
||||||
|
|
||||||
|
|
||||||
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────
|
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────
|
||||||
|
|||||||
Reference in New Issue
Block a user