Compare commits

...

10 Commits

Author SHA1 Message Date
hyungi bb9e0905f2 feat(study): 습관 루프 — 오늘의 몫·스트릭/잔디·아침 리마인더 webhook
organic 사용 0 진단(트리거 없음·due 산더미·반복 감각 부재) 대응 3조각:
- 오늘의 몫: /api/study/daily (문제5·카드5·개념1·검수3, 가용량 보정) + /study 홈 최상단
  체크리스트/완료 상태. 진도·복습 백로그는 접이식 강등(무변경+cap, D2).
- 스트릭/잔디: KST 일 단위 활동 집계(attempts+document_reads+카드평가 근사).
- 아침 리마인더: study_reminder 09 KST 슬롯 한정 Synology Chat incoming webhook
  (STUDY_REMINDER_WEBHOOK_URL, 빈 값=off, 몫 완료 시 skip). 07-01 push 폐기 결정의
  flip 조건(홈 pull 실패) 충족에 따른 사용자 합의 D1.
- 마이그 383: study_memo_cards.reviewed_at (검수 처리 집계용, 승인/수정확정/폐기 시 박힘).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 11:18:02 +09:00
hyungi 371ee4ebe6 feat(presegment): 영구 실패 Chat 알람 — ALERT_FAIL_STAGES allowlist(기본 deep_summary,summarize) 2026-07-03 08:14:12 +09:00
hyungi a55bb3453d feat(presegment): PR3 테스트 — 알람·dedupe·override 검증·재개·무회귀 19건
tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/
synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람
발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/
캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성,
units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개,
잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0,
override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀.

기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건
pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 9061f2e25c feat(presegment): PR3 C — 유인 분할 CLI scripts/presegment_attended.py
컨테이너 안(docker exec)에서 실행하는 3-subcommand CLI:
  list   — awaiting_split 보류 큐 행(문서·tier·over%·토큰·초과 섹션·보류/재개 시각)
  export — 문서 통계+hier 개요(overview.md)·자동 pack 유닛 제안(summarize_units
           재사용)·초과 섹션 (start,end) 스팬+본문 덤프(섹션당 파일, 200K자 분할,
           파일명에 절대 오프셋)·boundaries 템플릿 JSON(자동팩 채움+초과=todo 마커)
           +README(유인 클로드 세션 작업 안내)
  apply  — 경계 검증(단조증가·비중첩·본문 범위·커버리지 90%+공백 경고·유닛 캡
           초과 시 유닛 명시 거부·todo 잔존 거부·source_len 드리프트 거부) 통과 시
           payload.presegment.units_override 기록 + awaiting_split=false +
           deferred_until 제거(즉시 재개) + status pending·alerted_at/map_results
           정리. --dry-run 지원.

stdout = 사람이 읽는 요약 + '{' 로 시작하는 기계 파싱용 JSON 라인.
DB 접속 = 기존 scripts/ 패턴(DATABASE_URL env, backfill_tier.py 동형).
마이그레이션 없음 — payload JSONB 만 사용.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00
hyungi b91b05e889 refactor(board): 처리 머신 보드 나스+맥미니 2노드 재구성
2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정).

- 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지),
  맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크)
- summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거
  — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리)
- 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류
  변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체)
- deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존)
- 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존
- background_jobs 머신 귀속 기본값 gpu -> nas
- 단위테스트 2노드 기준 재작성 (27 passed)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 16:51:32 +09:00
hyungi 304a2b9c0f Merge pull request 'Feat/two node endpoints' (#51) from feat/two-node-endpoints into main
Reviewed-on: #51
2026-07-02 14:31:27 +09:00
hyungi d53fcc2b36 feat(search): MAX_RERANK_INPUT env 조정 가능화 — 2노드 리랭크 지연 대응
맥미니 llama.cpp 리랭크는 후보 수 선형(실측 50=0.60s/200=1.89s) — NAS 배포에서
MAX_RERANK_INPUT=50 으로 tail 지연 축소. 기본 200 = 현행 무회귀.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 13:30:04 +09:00
hyungi 43594620b1 fix(tests): rerank fixture 경로 정정 — captured_responses.*.raw 가 실응답 리스트
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 13:11:33 +09:00
hyungi b73a5cc601 feat(infra): 2노드 이관 P1-4 — rerank 프로토콜 스위치(tei|llamacpp)·OCR/STT 명시 게이트·413 재홈
- AIModelConfig.protocol 판별자 신설(기본 tei = 무회귀), llamacpp = /v1/rerank
  요청·응답 스키마 정규화(ai/rerank_protocol.py 순수함수 + 단위테스트 4)
- OCR_ENABLED/STT_ENABLED 명시 게이트 — GPU CUDA 서비스(Surya/faster-whisper)
  폐기 대응, silent 아님(경고 로그 + extract_meta 터미널 기록)
- DS Caddyfile request_body 100MB — 413 정책을 edge(home-caddy)에서 내부로 재홈
  (DSM 리버스 프록시 전환 대비, upload.max_bytes 정합)
- SSE X-Accel-Buffering는 기점검 결과 기구현(eid_chat)이라 무변경

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 13:11:06 +09:00
30 changed files with 2136 additions and 400 deletions
+8
View File
@@ -19,6 +19,14 @@ http://document.hyungi.net {
Referrer-Policy strict-origin-when-cross-origin Referrer-Policy strict-origin-when-cross-origin
-Server -Server
} }
# 2노드 이관(2026-07-02): 업로드 100MB 한도 집행을 edge(home-caddy)에서 DS 내부로 재홈.
# 인그레스가 DSM 리버스 프록시(한도 GUI 미노출)로 바뀌어도 413 단일 소스 유지.
# config.yaml upload.max_bytes(100000000)와 정합.
request_body {
max_size 100MB
}
encode { encode {
gzip gzip
match { match {
+29 -9
View File
@@ -290,23 +290,43 @@ class AIClient:
return response.json()["embedding"] return response.json()["embedding"]
async def rerank(self, query: str, texts: list[str]) -> list[dict]: async def rerank(self, query: str, texts: list[str]) -> list[dict]:
"""TEI bge-reranker-v2-m3 호출 (Phase 1.3). """리랭커 호출 — ai.models.rerank.protocol 로 백엔드 분기 (2노드 이관 2026-07-02).
TEI POST /rerank API: 공통 반환 계약: [{"index": int, "score": float}, ...] (score 내림차순)
"tei" (기본, 무회귀) — TEI POST /rerank:
request: {"query": str, "texts": [str, ...]} request: {"query": str, "texts": [str, ...]}
response: [{"index": int, "score": float}, ...] (정렬됨) response: [{"index": int, "score": float}, ...] (정렬됨)
"llamacpp" — llama.cpp POST /v1/rerank (bge-reranker GGUF, 맥미니 :8807):
request: {"model": str, "query": str, "documents": [str, ...]}
response: {"results": [{"index": int, "relevance_score": float}, ...]}
→ normalize_llamacpp_rerank 로 TEI 형태 정규화.
미지원 protocol = ValueError (명시 실패 — silent fallback 금지).
timeout은 self.ai.rerank.timeout (config.yaml). timeout은 self.ai.rerank.timeout (config.yaml).
호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈. 호출자(rerank_service)가 asyncio.Semaphore + try/except로 감쌈.
""" """
protocol = getattr(self.ai.rerank, "protocol", "tei") or "tei"
timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0 timeout = float(self.ai.rerank.timeout) if self.ai.rerank.timeout else 5.0
response = await self._http.post( if protocol == "tei":
self.ai.rerank.endpoint, response = await self._http.post(
json={"query": query, "texts": texts}, self.ai.rerank.endpoint,
timeout=timeout, json={"query": query, "texts": texts},
) timeout=timeout,
response.raise_for_status() )
return response.json() response.raise_for_status()
return response.json()
if protocol == "llamacpp":
from ai.rerank_protocol import normalize_llamacpp_rerank
response = await self._http.post(
self.ai.rerank.endpoint,
json={"model": self.ai.rerank.model, "query": query, "documents": texts},
timeout=timeout,
)
response.raise_for_status()
return normalize_llamacpp_rerank(response.json())
raise ValueError(f"unknown rerank protocol: {protocol}")
async def _call_chat(self, model_config, prompt: str) -> str: async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거). """OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거).
+24
View File
@@ -0,0 +1,24 @@
"""rerank 백엔드 응답 정규화 — 2노드 이관 (2026-07-02, main-server-retirement-1 P1-4).
TEI(/rerank)와 llama.cpp(/v1/rerank)는 요청/응답 스키마가 다르다.
소비자(rerank_service)는 TEI 형태 [{"index": int, "score": float}]를 기대하므로
llama.cpp 응답을 여기서 정규화한다. 순수 함수(stdlib only) — 단위 테스트 대상.
"""
def normalize_llamacpp_rerank(payload: dict) -> list[dict]:
"""llama.cpp /v1/rerank 응답을 TEI 형태로 정규화.
입력: {"results": [{"index": int, "relevance_score": float}, ...], ...}
반환: [{"index": int, "score": float}, ...] (score 내림차순 — TEI '정렬됨' 계약 유지)
index/relevance_score 가 없는 항목은 버린다 (소비자 측 idx/sc None 가드와 동일 방어).
"""
results = payload.get("results") or []
normalized = [
{"index": r["index"], "score": float(r["relevance_score"])}
for r in results
if r.get("index") is not None and r.get("relevance_score") is not None
]
normalized.sort(key=lambda r: -r["score"])
return normalized
+2 -17
View File
@@ -37,8 +37,8 @@ class CurrentItem(BaseModel):
class MachineCard(BaseModel): class MachineCard(BaseModel):
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state.""" """머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드)."""
key: Literal["gpu", "macmini", "macbook"] key: Literal["nas", "macmini"]
label: str label: str
state: Literal["active", "deferred", "idle"] state: Literal["active", "deferred", "idle"]
stages: list[str] stages: list[str]
@@ -59,20 +59,6 @@ class SummarizeEta(BaseModel):
eta_minutes: int | None eta_minutes: int | None
class MachineDone(BaseModel):
"""머신 1대의 summarize 완료 실적 (분담 표시용)."""
done_1h: int
done_today: int
class SummarizeByMachine(BaseModel):
"""summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북'
오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출
(ds-board-merged A-1, 신규 수집 SQL 0)."""
macmini: MachineDone
macbook: MachineDone
class TrendBucket(BaseModel): class TrendBucket(BaseModel):
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨.""" """summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
hour: str hour: str
@@ -122,7 +108,6 @@ class QueueOverviewResponse(BaseModel):
machines: list[MachineCard] machines: list[MachineCard]
stages: list[StageRow] stages: list[StageRow]
summarize_eta: SummarizeEta summarize_eta: SummarizeEta
summarize_by_machine: SummarizeByMachine
trend_24h: list[TrendBucket] trend_24h: list[TrendBucket]
totals: Totals totals: Totals
background_jobs: list[BackgroundJobItem] = [] background_jobs: list[BackgroundJobItem] = []
+11 -1
View File
@@ -249,7 +249,12 @@ async def approve_batch(
StudyMemoCard.deleted_at.is_(None), StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review, StudyMemoCard.needs_review,
) )
.values(needs_review=False, flagged_by=None, flagged_at=None) .values(
needs_review=False,
flagged_by=None,
flagged_at=None,
reviewed_at=datetime.now(timezone.utc),
)
.returning(StudyMemoCard.id) .returning(StudyMemoCard.id)
) )
approved_ids = list(result.scalars().all()) approved_ids = list(result.scalars().all())
@@ -397,6 +402,7 @@ async def update_card(
card.needs_review = False card.needs_review = False
card.flagged_by = None card.flagged_by = None
card.flagged_at = None card.flagged_at = None
card.reviewed_at = datetime.now(timezone.utc)
elif "needs_review" in fields_set: elif "needs_review" in fields_set:
card.needs_review = bool(body.needs_review) card.needs_review = bool(body.needs_review)
if card.needs_review: if card.needs_review:
@@ -405,6 +411,7 @@ async def update_card(
else: else:
card.flagged_by = None card.flagged_by = None
card.flagged_at = None card.flagged_at = None
card.reviewed_at = datetime.now(timezone.utc)
# 발행 재투영/tombstone(같은 tx) — 검수완료=발행·검수대기복귀=tombstone(상태 기반). S-2. # 발행 재투영/tombstone(같은 tx) — 검수완료=발행·검수대기복귀=tombstone(상태 기반). S-2.
if settings.study_publish_enabled: if settings.study_publish_enabled:
@@ -431,6 +438,9 @@ async def delete_card(
card = await session.get(StudyMemoCard, card_id) card = await session.get(StudyMemoCard, card_id)
card = _verify_card(card, user) card = _verify_card(card, user)
card.deleted_at = datetime.now(timezone.utc) card.deleted_at = datetime.now(timezone.utc)
if card.needs_review:
# 검수 대기분의 폐기 = 검수 처리의 한 형태 ('오늘의 몫' 검수 집계 포함).
card.reviewed_at = card.deleted_at
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트. S-2. # 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트. S-2.
if settings.study_publish_enabled: if settings.study_publish_enabled:
await enqueue_card_publish(session, card) await enqueue_card_publish(session, card)
+13
View File
@@ -16,6 +16,7 @@ from core.database import get_session
from models.user import User from models.user import User
from services.study import concept_curriculum as cc from services.study import concept_curriculum as cc
from services.study import concept_links as cl from services.study import concept_links as cl
from services.study import daily_unit as du
router = APIRouter() router = APIRouter()
@@ -23,6 +24,18 @@ router = APIRouter()
DEFAULT_TOPIC_ID = 4 DEFAULT_TOPIC_ID = 4
@router.get("/daily")
async def get_daily(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
topic_id: int = DEFAULT_TOPIC_ID,
):
"""오늘의 몫(유한한 데일리 단위) + 스트릭/잔디 — 습관 루프 홈 재료. read-only."""
state = await du.daily_state(session, user.id, topic_id)
sg = await du.streak_and_grass(session, user.id)
return {**state, **sg}
@router.get("/curriculum") @router.get("/curriculum")
async def get_curriculum( async def get_curriculum(
user: Annotated[User, Depends(get_current_user)], user: Annotated[User, Depends(get_current_user)],
+21
View File
@@ -35,6 +35,12 @@ class AIModelConfig(BaseModel):
# OpenAI 호환 분기(mlx)만 적용 — Anthropic 분기는 미적용(별 범위). # OpenAI 호환 분기(mlx)만 적용 — Anthropic 분기는 미적용(별 범위).
repetition_penalty: float | None = None repetition_penalty: float | None = None
top_k: int | None = None top_k: int | None = None
# 2노드 이관 (2026-07-02): rerank 백엔드 프로토콜 판별자.
# "tei" = TEI POST /rerank {"query","texts"} → [{"index","score"}] (기본, 무회귀)
# "llamacpp" = llama.cpp POST /v1/rerank {"model","query","documents"}
# → {"results":[{"index","relevance_score"}]} (맥미니 :8807)
# 미지원 값 = client.rerank 가 ValueError (silent fallback 금지). rerank 블록 외 무시.
protocol: str = "tei"
class DeepSummaryBacklogConfig(BaseModel): class DeepSummaryBacklogConfig(BaseModel):
@@ -145,6 +151,12 @@ class Settings(BaseModel):
# STT (faster-whisper, §3) # STT (faster-whisper, §3)
stt_endpoint: str = "http://stt-service:3300" stt_endpoint: str = "http://stt-service:3300"
# 2노드 이관 (2026-07-02): GPU CUDA 서비스(Surya OCR / faster-whisper) 폐기 대응 명시 게이트.
# false = 해당 경로 명시 비활성 — OCR 은 _call_ocr 이 경고 로그 후 None(기존 soft-fail 의미론),
# STT 는 터미널 skip + extract_meta 기록. silent 저품질 fallback 아님 (로그/메타로 가시).
ocr_enabled: bool = True
stt_enabled: bool = True
# §3 file_watcher: Roon 음원 경로 (prefix match 로 skip). # §3 file_watcher: Roon 음원 경로 (prefix match 로 skip).
# 빈 문자열이면 skip 없음. 예: "/documents/PKM/../Music/roon-library" 또는 # 빈 문자열이면 skip 없음. 예: "/documents/PKM/../Music/roon-library" 또는
# NFS 경유 별도 마운트된 Roon 라이브러리. # NFS 경유 별도 마운트된 Roon 라이브러리.
@@ -197,6 +209,9 @@ class Settings(BaseModel):
maintenance_note: str = "" maintenance_note: str = ""
# 뷰어 write-back ingest(study-to-viewer P2) 게이트. /ingest/study/attempts 활성. 기본 false=inert(503). # 뷰어 write-back ingest(study-to-viewer P2) 게이트. /ingest/study/attempts 활성. 기본 false=inert(503).
study_ingest_enabled: bool = False study_ingest_enabled: bool = False
# 습관 루프(2026-07-03 D1): '오늘의 몫' 아침 리마인더 → Synology Chat incoming webhook URL.
# 빈 값 = 발신 안 함 (명시적 off = kill switch. study_reminder 워커가 skip 로그 남김).
study_reminder_webhook_url: str = ""
# internal endpoint Bearer token (Mac mini derived-worker 호출용) # internal endpoint Bearer token (Mac mini derived-worker 호출용)
internal_worker_token: str = "" internal_worker_token: str = ""
@@ -216,6 +231,7 @@ def load_settings() -> Settings:
maintenance_mode = os.getenv("MAINTENANCE_MODE", "false").lower() in ("1", "true", "yes") maintenance_mode = os.getenv("MAINTENANCE_MODE", "false").lower() in ("1", "true", "yes")
maintenance_note = os.getenv("MAINTENANCE_NOTE", "") maintenance_note = os.getenv("MAINTENANCE_NOTE", "")
study_ingest_enabled = os.getenv("STUDY_INGEST_ENABLED", "false").lower() in ("1", "true", "yes") study_ingest_enabled = os.getenv("STUDY_INGEST_ENABLED", "false").lower() in ("1", "true", "yes")
study_reminder_webhook_url = os.getenv("STUDY_REMINDER_WEBHOOK_URL", "")
internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "") internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "")
viewer_sync_token = os.getenv("VIEWER_SYNC_TOKEN", "") viewer_sync_token = os.getenv("VIEWER_SYNC_TOKEN", "")
jwt_secret = os.getenv("JWT_SECRET", "") jwt_secret = os.getenv("JWT_SECRET", "")
@@ -224,6 +240,8 @@ def load_settings() -> Settings:
kordoc_endpoint = os.getenv("KORDOC_ENDPOINT", "http://kordoc-service:3100") kordoc_endpoint = os.getenv("KORDOC_ENDPOINT", "http://kordoc-service:3100")
ocr_endpoint = os.getenv("OCR_ENDPOINT", "http://ocr-service:3200") ocr_endpoint = os.getenv("OCR_ENDPOINT", "http://ocr-service:3200")
stt_endpoint = os.getenv("STT_ENDPOINT", "http://stt-service:3300") stt_endpoint = os.getenv("STT_ENDPOINT", "http://stt-service:3300")
ocr_enabled = os.getenv("OCR_ENABLED", "true").lower() in ("1", "true", "yes")
stt_enabled = os.getenv("STT_ENABLED", "true").lower() in ("1", "true", "yes")
roon_library_path = os.getenv("ROON_LIBRARY_PATH", "") roon_library_path = os.getenv("ROON_LIBRARY_PATH", "")
# ADDITIONAL_WATCH_TARGETS — 쉼표 구분 (공백 제거) # ADDITIONAL_WATCH_TARGETS — 쉼표 구분 (공백 제거)
@@ -343,6 +361,8 @@ def load_settings() -> Settings:
kordoc_endpoint=kordoc_endpoint, kordoc_endpoint=kordoc_endpoint,
ocr_endpoint=ocr_endpoint, ocr_endpoint=ocr_endpoint,
stt_endpoint=stt_endpoint, stt_endpoint=stt_endpoint,
ocr_enabled=ocr_enabled,
stt_enabled=stt_enabled,
roon_library_path=roon_library_path, roon_library_path=roon_library_path,
additional_watch_targets=additional_watch_targets, additional_watch_targets=additional_watch_targets,
taxonomy=taxonomy, taxonomy=taxonomy,
@@ -355,6 +375,7 @@ def load_settings() -> Settings:
maintenance_mode=maintenance_mode, maintenance_mode=maintenance_mode,
maintenance_note=maintenance_note, maintenance_note=maintenance_note,
study_ingest_enabled=study_ingest_enabled, study_ingest_enabled=study_ingest_enabled,
study_reminder_webhook_url=study_reminder_webhook_url,
internal_worker_token=internal_worker_token, internal_worker_token=internal_worker_token,
viewer_sync_token=viewer_sync_token, viewer_sync_token=viewer_sync_token,
pipeline_held_stages=pipeline_held_stages, pipeline_held_stages=pipeline_held_stages,
+2
View File
@@ -65,6 +65,8 @@ class StudyMemoCard(Base):
needs_review: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) needs_review: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
flagged_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) flagged_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
flagged_by: Mapped[str | None] = mapped_column(String(40)) flagged_by: Mapped[str | None] = mapped_column(String(40))
# 검수 처리 시각 — 승인/수정확정/soft-delete 시 박힘 (migration 383, '오늘의 몫' 검수 집계).
reviewed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
model: Mapped[str | None] = mapped_column(String(120)) model: Mapped[str | None] = mapped_column(String(120))
generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+69
View File
@@ -0,0 +1,69 @@
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
환경변수:
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
"""
from __future__ import annotations
import json
import os
import httpx
from core.utils import setup_logger
logger = setup_logger("alerts")
ALERT_TIMEOUT_SECONDS = 5.0
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
_noop_logged = False
async def send_alert(title: str, message: str) -> bool:
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
global _noop_logged
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
if not url:
if not _noop_logged:
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
_noop_logged = True
return False
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
if kind not in ("synochat", "ntfy"):
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
kind = "synochat"
try:
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
if kind == "ntfy":
resp = await client.post(
url,
params={"title": title},
content=message.encode("utf-8"),
)
else: # synochat
text = f"{title}\n{message}" if title else message
resp = await client.post(
url,
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
)
if resp.status_code >= 400:
logger.warning(
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
)
return False
return True
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
return False
+34 -112
View File
@@ -3,19 +3,16 @@
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue / GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약). documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리. 구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리.
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend / 판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능. build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
귀속 규칙 (단일 진실): 귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드):
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/ - stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt · macmini = classify/summarize · macbook = deep_summary fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) ·
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속). macmini = classify/summarize/deep_summary (단일 생성 LLM 허브).
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료 - deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 —
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook' summarize/deep_summary 소속인 macmini 카드 귀속.
이면 macbook 실적, 아니면 macmini 실적.
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
(보류 = 맥북 불가 신호).
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -25,42 +22,33 @@ from zoneinfo import ZoneInfo
from sqlalchemy import bindparam, text from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
KST = ZoneInfo("Asia/Seoul") KST = ZoneInfo("Asia/Seoul")
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서) # stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
_GPU_STAGES = ( _NAS_STAGES = (
"extract", "embed", "chunk", "markdown", "extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt", "preview", "thumbnail", "fulltext", "stt",
) )
_MACMINI_STAGES = ("classify", "summarize") _MACMINI_STAGES = ("classify", "summarize", "deep_summary")
_MACBOOK_STAGES = ("deep_summary",) _STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
_MACHINE_KEYS = ("gpu", "macmini", "macbook") _MACHINE_KEYS = ("nas", "macmini")
_MACHINE_LABELS = { _MACHINE_LABELS = {
"gpu": "GPU 서버", "nas": "나스",
"macmini": "맥미니", "macmini": "맥미니",
"macbook": "맥북 M5 Max",
} }
# 머신 카드당 current 표시 상한 # 머신 카드당 current 표시 상한
_CURRENT_LIMIT = 2 _CURRENT_LIMIT = 2
def stage_machine_map(deep_enabled: bool) -> dict[str, str]: def stage_machine_map() -> dict[str, str]:
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속.""" """stage → machine key 맵 (정적 — 나스/맥미니 2노드)."""
mapping: dict[str, str] = {} mapping: dict[str, str] = {}
for s in _GPU_STAGES: for s in _NAS_STAGES:
mapping[s] = "gpu" mapping[s] = "nas"
for s in _MACMINI_STAGES: for s in _MACMINI_STAGES:
mapping[s] = "macmini" mapping[s] = "macmini"
for s in _MACBOOK_STAGES:
mapping[s] = "macbook" if deep_enabled else "macmini"
return mapping return mapping
@@ -90,23 +78,6 @@ def rows_to_stage_stats(rows) -> dict[str, dict]:
return stats return stats
def rows_to_summarize_split(rows) -> dict[str, dict]:
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
"""
split = {
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
}
for row in rows:
key = "macbook" if row[0] else "macmini"
split[key]["done_1h"] += int(row[1] or 0)
split[key]["done_today"] += int(row[2] or 0)
split[key]["done_15m"] += int(row[3] or 0)
return split
def display_title(row: dict) -> str: def display_title(row: dict) -> str:
"""표시용 제목 — title > original_filename > file_path basename > 문서 id.""" """표시용 제목 — title > original_filename > file_path basename > 문서 id."""
if row.get("title"): if row.get("title"):
@@ -120,13 +91,10 @@ def display_title(row: dict) -> str:
def build_machines( def build_machines(
stage_stats: dict[str, dict], stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
current_rows: list[dict], current_rows: list[dict],
*,
deep_enabled: bool,
) -> list[dict]: ) -> list[dict]:
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부.""" """머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map(deep_enabled) smap = stage_machine_map()
def g(stage: str, field: str) -> int: def g(stage: str, field: str) -> int:
return stage_stats.get(stage, {}).get(field, 0) return stage_stats.get(stage, {}).get(field, 0)
@@ -149,29 +117,23 @@ def build_machines(
pending = sum(g(s, "pending") for s in stages) pending = sum(g(s, "pending") for s in stages)
processing = sum(g(s, "processing") for s in stages) processing = sum(g(s, "processing") for s in stages)
failed = sum(g(s, "failed") for s in stages) failed = sum(g(s, "failed") for s in stages)
done_1h = sum(g(s, "done_1h") for s in stages)
done_today = sum(g(s, "done_today") for s in stages)
done_15m = sum(g(s, "done_15m") for s in stages)
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속 # 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
if key in summarize_split:
done_1h += summarize_split[key]["done_1h"]
done_today += summarize_split[key]["done_today"]
done_15m += summarize_split[key]["done_15m"]
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
deferred_pending = ( deferred_pending = (
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending") g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
if key == "macbook" else 0 if key == "macmini" else 0
) )
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11). # state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" — # 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은 # 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만. # 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만.
if processing > 0 or done_15m > 0: if processing > 0 or done_15m > 0:
state = "active" state = "active"
elif key == "macbook" and deferred_pending > 0: elif deferred_pending > 0:
state = "deferred" state = "deferred"
else: else:
state = "idle" state = "idle"
@@ -213,16 +175,6 @@ def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
} }
def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict:
"""summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의
오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로
투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외)."""
def m(key: str) -> dict:
s = summarize_split.get(key, {})
return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))}
return {"macmini": m("macmini"), "macbook": m("macbook")}
def build_trend( def build_trend(
inflow_buckets: dict[str, int], inflow_buckets: dict[str, int],
done_buckets: dict[str, int], done_buckets: dict[str, int],
@@ -287,28 +239,23 @@ def build_totals(stage_stats: dict[str, dict]) -> dict:
def compose_overview( def compose_overview(
stage_stats: dict[str, dict], stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
inflow_buckets: dict[str, int], inflow_buckets: dict[str, int],
done_buckets: dict[str, int], done_buckets: dict[str, int],
current_rows: list[dict], current_rows: list[dict],
*, *,
deep_enabled: bool,
now_kst: datetime, now_kst: datetime,
) -> dict: ) -> dict:
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요.""" """수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
return { return {
"machines": build_machines( "machines": build_machines(stage_stats, current_rows),
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
),
"stages": build_stages(stage_stats), "stages": build_stages(stage_stats),
"summarize_eta": build_summarize_eta(stage_stats), "summarize_eta": build_summarize_eta(stage_stats),
"summarize_by_machine": build_summarize_by_machine(summarize_split),
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst), "trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
"totals": build_totals(stage_stats), "totals": build_totals(stage_stats),
} }
# ─── SQL 수집부 (총 5쿼리) ──────────────────────────────────────────────────── # ─── SQL 수집부 (총 4쿼리) ────────────────────────────────────────────────────
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방) # 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
_STAGE_STATS_SQL = """ _STAGE_STATS_SQL = """
@@ -333,23 +280,7 @@ _STAGE_STATS_SQL = """
GROUP BY stage GROUP BY stage
""" """
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방) # 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
_SUMMARIZE_SPLIT_SQL = """
SELECT
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
AND q.status = 'completed'
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
_TREND_INFLOW_SQL = """ _TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'), SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket, 'YYYY-MM-DD HH24:00') AS bucket,
@@ -371,7 +302,7 @@ _TREND_DONE_SQL = """
GROUP BY 1 GROUP BY 1
""" """
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서) # 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
_CURRENT_SQL = """ _CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q FROM processing_queue q
@@ -383,20 +314,13 @@ _CURRENT_SQL = """
async def build_overview(session: AsyncSession) -> dict: async def build_overview(session: AsyncSession) -> dict:
"""5쿼리 수집 → compose_overview 판정 → 응답 dict.""" """4쿼리 수집 → compose_overview 판정 → 응답 dict."""
now_kst = datetime.now(KST) now_kst = datetime.now(KST)
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0) kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
deep_enabled = settings.ai is not None and settings.ai.deep is not None
stage_rows = ( stage_rows = (
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight}) await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
).all() ).all()
split_rows = (
await session.execute(
text(_SUMMARIZE_SPLIT_SQL),
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
)
).all()
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all() inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all() done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
current_result = (await session.execute(text(_CURRENT_SQL))).all() current_result = (await session.execute(text(_CURRENT_SQL))).all()
@@ -414,11 +338,9 @@ async def build_overview(session: AsyncSession) -> dict:
result = compose_overview( result = compose_overview(
rows_to_stage_stats(stage_rows), rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows}, {row[0]: int(row[1]) for row in inflow_rows},
{row[0]: int(row[1]) for row in done_rows}, {row[0]: int(row[1]) for row in done_rows},
current_rows, current_rows,
deep_enabled=deep_enabled,
now_kst=now_kst, now_kst=now_kst,
) )
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]). # 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
@@ -426,13 +348,13 @@ async def build_overview(session: AsyncSession) -> dict:
return result return result
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트). # kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트).
_BG_JOB_MACHINE = { _BG_JOB_MACHINE = {
"global_digest": "macmini", "global_digest": "macmini",
"morning_briefing": "macmini", "morning_briefing": "macmini",
"section_summary": "macmini", "section_summary": "macmini",
"hier_backfill": "gpu", "hier_backfill": "nas",
"hier_redecompose": "gpu", "hier_redecompose": "nas",
} }
@@ -466,7 +388,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
"processed": int(r["processed"] or 0), "total": r["total"], "processed": int(r["processed"] or 0), "total": r["total"],
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]), "elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
"error": r["error"], "error": r["error"],
"machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"), "machine": _BG_JOB_MACHINE.get(r["kind"], "nas"),
} }
for r in rows for r in rows
] ]
+6 -2
View File
@@ -17,6 +17,7 @@ snippet 생성:
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import os
import re import re
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@@ -33,8 +34,11 @@ logger = setup_logger("rerank")
# 동시 rerank 호출 제한 (GPU saturation 방지) # 동시 rerank 호출 제한 (GPU saturation 방지)
RERANK_SEMAPHORE = asyncio.Semaphore(2) RERANK_SEMAPHORE = asyncio.Semaphore(2)
# rerank input 크기 제한 (latency / VRAM hard cap) # rerank input 크기 제한 (latency / VRAM hard cap).
MAX_RERANK_INPUT = 200 # 2노드 이관(2026-07-02): env MAX_RERANK_INPUT 로 조정 가능 — 맥미니 llama.cpp 리랭크는
# 후보 수에 선형(NAS발 실측 50=0.60s / 100=0.95s / 200=1.89s)이라 NAS 배포는 50 권장.
# 기본 200 = 현행(GPU TEI) 무회귀.
MAX_RERANK_INPUT = int(os.getenv("MAX_RERANK_INPUT", "200"))
MAX_CHUNKS_PER_DOC = 2 MAX_CHUNKS_PER_DOC = 2
# Soft timeout (초) # Soft timeout (초)
+215
View File
@@ -0,0 +1,215 @@
"""daily_unit — '오늘의 몫' (유한한 데일리 학습 단위) 집계 + 스트릭/잔디.
습관 루프 트랙(2026-07-03): 문제엔진·SR·이론 ·암기카드까지 완성인데 organic 사용 0.
진단 = 트리거 없음 + 열면 due 산더미 + 반복의 감각 부재. 모듈은 '유한한 단위'
'반복의 감각(스트릭)' read-only 재료를 만든다. write 0 (기존 attempt/rate/read 정본).
targets 기본(사용자 합의 D4) = 문제 5 · 카드 5 · 개념 1 · 카드검수 3.
가용량(effective) 보정: 남은 재료가 target 미만이면 target 낮춰 '영영 완주 불가' 방지.
'오늘' 경계 = KST(Asia/Seoul, 스케줄러 tz 동일). DB 저장은 UTC 경계 변환 비교.
스트릭/잔디 근사 주의: 카드 평가는 per-event 로그가 없어(progress.last_reviewed_at )
과거일이 과소집계될 있다(같은 카드 재평가 이전 날짜 증발). 문제풀이·회독은 전이력
정확. 오늘 몫에 문제풀이가 포함되므로 완주일은 attempts 정확히 남는다.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from sqlalchemy import and_, func, or_, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from models.study_concept_progress import StudyConceptProgress
from models.study_memo_card import StudyMemoCard
from models.study_memo_card_progress import StudyMemoCardProgress
from models.study_question import StudyQuestionAttempt
from services.study import concept_curriculum as cc
KST = ZoneInfo("Asia/Seoul")
# 기본 targets (사용자 합의 2026-07-03 D4 — 착수 장벽 최소화가 관건이라 소량).
TARGET_QUESTIONS = 5
TARGET_CARDS = 5
TARGET_CONCEPTS = 1
TARGET_REVIEWS = 3
GRASS_DAYS = 84 # 12주
def kst_day_start(now: datetime) -> datetime:
"""오늘(KST) 0시의 UTC datetime."""
local = now.astimezone(KST)
start_local = local.replace(hour=0, minute=0, second=0, microsecond=0)
return start_local.astimezone(timezone.utc)
async def _count(session: AsyncSession, stmt) -> int:
return (await session.execute(stmt)).scalar_one()
async def daily_state(
session: AsyncSession, user_id: int, topic_id: int, now: datetime | None = None
) -> dict:
"""오늘의 몫 상태 — 항목별 done/target + 완주 여부 + 다음 개념 CTA 재료."""
now = now or datetime.now(timezone.utc)
start = kst_day_start(now)
# ── 금일 완료량 ──
q_done = await _count(
session,
select(func.count())
.select_from(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.user_id == user_id,
StudyQuestionAttempt.answered_at >= start,
),
)
card_done = await _count(
session,
select(func.count())
.select_from(StudyMemoCardProgress)
.where(
StudyMemoCardProgress.user_id == user_id,
StudyMemoCardProgress.last_reviewed_at.is_not(None),
StudyMemoCardProgress.last_reviewed_at >= start,
),
)
concept_done = await _count(
session,
select(func.count())
.select_from(StudyConceptProgress)
.where(
StudyConceptProgress.user_id == user_id,
StudyConceptProgress.last_read_at.is_not(None),
StudyConceptProgress.last_read_at >= start,
),
)
review_done = await _count(
session,
select(func.count())
.select_from(StudyMemoCard)
.where(
StudyMemoCard.user_id == user_id,
StudyMemoCard.reviewed_at.is_not(None),
StudyMemoCard.reviewed_at >= start,
),
)
# ── 가용량 (effective target 보정용) ──
# 카드: /study-cards/due 와 동일 술어(신규 progress 없음 OR 예정 due) — cold-start 정합.
P = StudyMemoCardProgress
card_avail = await _count(
session,
select(func.count())
.select_from(StudyMemoCard)
.outerjoin(P, and_(P.card_id == StudyMemoCard.id, P.user_id == user_id))
.where(
StudyMemoCard.user_id == user_id,
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review.is_(False),
or_(
P.id.is_(None),
and_(
P.due_at.is_not(None),
P.due_at <= now,
or_(P.review_stage.is_(None), P.review_stage < 4),
),
),
),
)
review_avail = await _count(
session,
select(func.count())
.select_from(StudyMemoCard)
.where(
StudyMemoCard.user_id == user_id,
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review.is_(True),
),
)
# 개념: today_concepts 재사용 (재복습 due + 미독) — 첫 항목은 CTA 딥링크 재료.
today = await cc.today_concepts(session, user_id, topic_id, limit=1)
concept_avail = today["due_total"] + today["unread_total"]
next_concept = today["concepts"][0] if today["concepts"] else None
def eff(target: int, done: int, avail: int) -> int:
# 남은 재료가 target 에 못 미치면 낮춤 (완주 가능 보장). 이미 done>=target 이면 target.
return min(target, done + avail)
items = {
"questions": {"done": q_done, "target": TARGET_QUESTIONS}, # 문제는 2,100 — 가용 보정 불요
"cards": {"done": card_done, "target": eff(TARGET_CARDS, card_done, card_avail)},
"concepts": {"done": concept_done, "target": eff(TARGET_CONCEPTS, concept_done, concept_avail)},
"reviews": {"done": review_done, "target": eff(TARGET_REVIEWS, review_done, review_avail)},
}
for it in items.values():
it["complete"] = it["target"] == 0 or it["done"] >= it["target"]
active_targets = [it for it in items.values() if it["target"] > 0]
complete = bool(active_targets) and all(it["complete"] for it in active_targets)
return {
"topic_id": topic_id,
"date_kst": now.astimezone(KST).date().isoformat(),
"items": items,
"complete": complete,
"next_concept": (
{"doc_id": next_concept["doc_id"], "title": next_concept["title"]}
if next_concept
else None
),
}
# 잔디 = KST 일 단위 활동량. attempts + document_reads 는 전이력 정확, 카드 평가일은 근사(docstring).
_GRASS_SQL = text(
"""
SELECT d, sum(n)::int AS n FROM (
SELECT (answered_at AT TIME ZONE 'Asia/Seoul')::date AS d, count(*) AS n
FROM study_question_attempts
WHERE user_id = :uid AND answered_at >= :since
GROUP BY 1
UNION ALL
SELECT (read_at AT TIME ZONE 'Asia/Seoul')::date AS d, count(*) AS n
FROM document_reads
WHERE user_id = :uid AND read_at >= :since
GROUP BY 1
UNION ALL
SELECT (last_reviewed_at AT TIME ZONE 'Asia/Seoul')::date AS d, count(*) AS n
FROM study_memo_card_progress
WHERE user_id = :uid AND last_reviewed_at IS NOT NULL AND last_reviewed_at >= :since
GROUP BY 1
) u GROUP BY d
"""
)
async def streak_and_grass(
session: AsyncSession, user_id: int, now: datetime | None = None, days: int = GRASS_DAYS
) -> dict:
"""잔디(최근 days일, KST 일 단위 활동량) + 스트릭(오늘 또는 어제부터 역산 연속 활동일)."""
now = now or datetime.now(timezone.utc)
today_local = now.astimezone(KST).date()
since = kst_day_start(now) - timedelta(days=days - 1)
rows = (
await session.execute(_GRASS_SQL, {"uid": user_id, "since": since})
).all()
by_day = {r.d: r.n for r in rows}
grass = []
for i in range(days - 1, -1, -1):
d = today_local - timedelta(days=i)
grass.append({"d": d.isoformat(), "n": int(by_day.get(d, 0))})
# 스트릭: 오늘 활동 있으면 오늘부터, 없으면 어제부터 역산 (오늘은 아직 진행 중일 수 있음).
streak = 0
cursor = today_local if by_day.get(today_local, 0) > 0 else today_local - timedelta(days=1)
while by_day.get(cursor, 0) > 0:
streak += 1
cursor -= timedelta(days=1)
return {"streak": streak, "today_active": by_day.get(today_local, 0) > 0, "grass": grass}
+193 -3
View File
@@ -66,6 +66,9 @@ class SummarizeUnit:
text: str = "" text: str = ""
est_tokens: int = 0 est_tokens: int = 0
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상) over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
leaf_indexes: list[int] = field(default_factory=list)
@dataclass @dataclass
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
units: list[SummarizeUnit] = [] units: list[SummarizeUnit] = []
cur_titles: list[str | None] = [] cur_titles: list[str | None] = []
cur_texts: list[str] = [] cur_texts: list[str] = []
cur_indexes: list[int] = []
cur_tokens = 0 cur_tokens = 0
def _flush() -> None: def _flush() -> None:
nonlocal cur_titles, cur_texts, cur_tokens nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
if cur_texts: if cur_texts:
units.append(SummarizeUnit( units.append(SummarizeUnit(
index=len(units), index=len(units),
section_titles=cur_titles, section_titles=cur_titles,
text="\n\n".join(cur_texts), text="\n\n".join(cur_texts),
est_tokens=cur_tokens, est_tokens=cur_tokens,
leaf_indexes=cur_indexes,
)) ))
cur_titles, cur_texts, cur_tokens = [], [], 0 cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
for leaf in leaves: for li, leaf in enumerate(leaves):
t = estimate_tokens(leaf.text) t = estimate_tokens(leaf.text)
if t > cap: if t > cap:
_flush() _flush()
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
text=leaf.text, text=leaf.text,
est_tokens=t, est_tokens=t,
over_cap=True, over_cap=True,
leaf_indexes=[li],
)) ))
continue continue
if cur_tokens + t > cap: if cur_tokens + t > cap:
_flush() _flush()
cur_titles.append(leaf.section_title) cur_titles.append(leaf.section_title)
cur_texts.append(leaf.text) cur_texts.append(leaf.text)
cur_indexes.append(li)
cur_tokens += t cur_tokens += t
_flush() _flush()
return units return units
@@ -222,3 +229,186 @@ def build_reduce_units_block(
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))] block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True truncated = True
return block, truncated return block, truncated
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
#
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
@dataclass
class OverrideCheck:
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
ok: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
coverage_pct: float = 0.0
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
unit_tokens: list[int] = field(default_factory=list)
def choose_override_source(
md_content: str | None, extracted_text: str | None
) -> tuple[str, str]:
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
canonical markdown(md_content) 우선, 부재/공백 extracted_text 폴백.
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 박제.
"""
if md_content and md_content.strip():
return "md_content", md_content
return "extracted_text", extracted_text or ""
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 미해결(todo 잔존) 상태로
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
"""
if isinstance(entry, dict):
if entry.get("todo"):
errors.append(
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
)
return None
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
start, end = entry[0], entry[1]
title = entry[2] if len(entry) == 3 else None
else:
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
return None
if (
isinstance(start, bool) or isinstance(end, bool)
or not isinstance(start, int) or not isinstance(end, int)
):
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
return None
if title is not None and not isinstance(title, str):
title = str(title)
return start, end, title
def validate_override_boundaries(
text: str,
raw_boundaries,
*,
cap: int = CAP_TOKENS,
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
) -> OverrideCheck:
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 잘못된 override
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
"""
errors: list[str] = []
warnings: list[str] = []
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
normalized: list[tuple[int, int, str | None]] = []
for i, entry in enumerate(raw_boundaries):
norm = _normalize_boundary(entry, i, errors)
if norm is not None:
normalized.append(norm)
if errors:
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
n = len(text)
prev_end = None
unit_tokens: list[int] = []
covered = 0
for i, (start, end, title) in enumerate(normalized):
label = f"유닛 {i}" + (f" ({title})" if title else "")
if start < 0 or end > n:
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
continue
if start >= end:
errors.append(f"{label}: start >= end ([{start}, {end}))")
continue
if prev_end is not None and start < prev_end:
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
if prev_end is not None and start - prev_end > gap_warn_chars:
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
est = estimate_tokens(text[start:end])
unit_tokens.append(est)
if est > cap:
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
covered += end - start
prev_end = max(prev_end or 0, end)
if not errors:
head_gap = normalized[0][0]
tail_gap = n - normalized[-1][1]
if head_gap > gap_warn_chars:
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
if tail_gap > gap_warn_chars:
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
if not errors and coverage_pct < min_coverage_pct:
errors.append(
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
)
return OverrideCheck(
ok=not errors,
errors=errors,
warnings=warnings,
coverage_pct=coverage_pct,
boundaries=normalized if not errors else [],
unit_tokens=unit_tokens,
)
def units_from_boundaries(
text: str, boundaries: list[tuple[int, int, str | None]]
) -> list[SummarizeUnit]:
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
유닛 index = 경계 서수 boundaries payload 박제되므로 attempt 안정
(map_results 멱등 재개 키와 정합).
"""
units: list[SummarizeUnit] = []
for i, (start, end, title) in enumerate(boundaries):
seg = text[start:end]
units.append(SummarizeUnit(
index=i,
section_titles=[title],
text=seg,
est_tokens=estimate_tokens(seg),
))
return units
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
_segment 원문을 연속 파티션( preamble 폐기)으로 자르므로, 커서 순차
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf
오프셋을 가져간다).
"""
spans: list[tuple[int, int]] = []
cursor = 0
for leaf in leaves:
pos = text.find(leaf.text, cursor)
if pos < 0:
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
pos = text.find(leaf.text)
if pos < 0:
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
spans.append((pos, pos + len(leaf.text)))
cursor = pos + len(leaf.text)
return spans
+189 -12
View File
@@ -14,7 +14,7 @@ import asyncio
import json import json
import os import os
import time import time
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select from sqlalchemy import desc, select
@@ -29,15 +29,19 @@ from core.utils import setup_logger
from models.document import Document from models.document import Document
from models.queue import ProcessingQueue, StageDeferred from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import ( from services.summarize_units import (
CAP_TOKENS, CAP_TOKENS,
UnitPlan, UnitPlan,
build_reduce_units_block, build_reduce_units_block,
choose_override_source,
estimate_tokens, estimate_tokens,
plan_summarize_units, plan_summarize_units,
render_map_slice, render_map_slice,
units_from_boundaries,
validate_override_boundaries,
) )
logger = setup_logger("deep_summary_worker") logger = setup_logger("deep_summary_worker")
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary" DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce # presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce" REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는 # HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음. # 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440")) HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어). # reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000 REDUCE_BUDGET_FLOOR_TOKENS = 1_000
@@ -111,6 +122,17 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw)) envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ─── # ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way: # TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce) # auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
@@ -123,7 +145,9 @@ async def process(
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce # units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다. # 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units: if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id) await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce( await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session, doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable, defer_on_deep_unavailable=defer_on_deep_unavailable,
@@ -250,43 +274,196 @@ async def process(
) )
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split( async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None: ) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류. """HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit StageDeferred 핸들러 payload.presegment.awaiting_split 마킹을 먼저 commit StageDeferred 핸들러
(queue_consumer) 세션에서 행을 다시 읽어 deferred_until 병합하므로 유실 없음. (queue_consumer) 세션에서 행을 다시 읽어 deferred_until 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복. PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 준수(클로드 경로는 항상 유인 게이트). 무인 자동 cloud 호출 금지 준수(클로드 경로는 항상 유인 게이트).
""" """
payload = dict(queue_row.payload or {}) payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {}) preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({ preseg.update({
"awaiting_split": True, "awaiting_split": True,
"tier": plan.tier, "tier": plan.tier,
"over_pct": plan.over_pct, "over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens, "total_est_tokens": plan.total_est_tokens,
"units": len(plan.units), "units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용) # 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": [ "oversized_sections": oversized,
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
}) })
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지 queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit() await session.commit()
logger.info( logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} " f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} " f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)" f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
) )
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred( raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
) )
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override payload 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI alerted_at 지워두므로 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 *1.1) 실패 -HOLD + 알람
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 기존 PR2 _process_map_reduce 그대로 탄다( 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 유닛 index payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b( async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
): ):
+5
View File
@@ -110,6 +110,11 @@ def _get_pdf_page_count(
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None: async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
"""OCR 서비스 호출 — 타임아웃 페이지 수 비례""" """OCR 서비스 호출 — 타임아웃 페이지 수 비례"""
if not settings.ocr_enabled:
# 2노드 이관(2026-07-02): GPU Surya 폐기 — 명시 비활성. None 반환 = 기존 soft-fail
# 의미론(호출자가 ocr_attempted/skip_reason 메타 기록). 스캔 문서는 비전 배치 경로 별도.
logger.warning("[ocr] OCR_ENABLED=false — skip (스캔·이미지 추출은 비전 배치 경로)")
return None
container_path = f"/documents/{file_path.relative_to(Path(settings.nas_mount_path))}" container_path = f"/documents/{file_path.relative_to(Path(settings.nas_mount_path))}"
timeout = 60 if is_image else min(600, max(120, max_pages * 3)) timeout = 60 if is_image else min(600, max(120, max_pages * 3))
try: try:
+32
View File
@@ -23,6 +23,15 @@ logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만. # pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False _hold_logged = False
# PR3 후속(2026-07-03): 영구 실패 알람 — 사람이 개입해야 풀리는 상태라 Chat 웹훅 발화.
# allowlist 로 소음 제한: embed/chunk 류 대량 배치가 일제히 실패하면 문서 수만큼 알람이
# 쏟아지므로, 건당 가치가 높고 발생률이 낮은 LLM 스테이지만 기본 대상으로 한다.
_ALERT_FAIL_STAGES = {
s.strip()
for s in os.getenv("ALERT_FAIL_STAGES", "deep_summary,summarize").split(",")
if s.strip()
}
# stage별 배치 크기 # stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
@@ -353,6 +362,8 @@ async def _process_stage(stage, worker_fn):
except Exception as e: except Exception as e:
# 실패 처리 # 실패 처리
permanently_failed = False
doc_title = None
async with async_session() as session: async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id) item = await session.get(ProcessingQueue, queue_id)
if not item: if not item:
@@ -363,7 +374,16 @@ async def _process_stage(stage, worker_fn):
item.error_message = err_text[:500] item.error_message = err_text[:500]
if item.attempts >= item.max_attempts: if item.attempts >= item.max_attempts:
item.status = "failed" item.status = "failed"
permanently_failed = True
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
if stage in _ALERT_FAIL_STAGES:
# 알람용 제목 best-effort — 실패해도 알람 자체는 발화한다.
try:
from models.document import Document
_doc = await session.get(Document, document_id)
doc_title = getattr(_doc, "title", None)
except Exception:
doc_title = None
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다. # B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형 # 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan # batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
@@ -385,6 +405,18 @@ async def _process_stage(stage, worker_fn):
item.started_at = None item.started_at = None
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
await session.commit() await session.commit()
if permanently_failed and stage in _ALERT_FAIL_STAGES:
# 영구 실패 = 무인 파이프라인이 스스로 못 푸는 상태 → 유인 전환 알람.
# send_alert 는 절대 raise 하지 않음(no-op/실패 = False 반환뿐).
from services.alerts import send_alert
await send_alert(
f"[DS] {stage} 영구 실패 — doc {document_id}",
(
f"{doc_title or '(제목 미상)'}\n"
f"에러: {err_text[:300]}\n"
f"확인: scripts/presegment_attended.py list (보류/거부 사유) 또는 큐 재큐"
),
)
async def consume_queue(): async def consume_queue():
+8
View File
@@ -42,6 +42,14 @@ async def process(document_id: int, session: AsyncSession) -> None:
logger.warning(f"[stt] id={document_id} file_path 없음 — skip") logger.warning(f"[stt] id={document_id} file_path 없음 — skip")
return return
if not settings.stt_enabled:
# 2노드 이관(2026-07-02): GPU stt-service 폐기 — 명시 비활성. silent 금지:
# 경고 로그 + extract_meta 터미널 기록 (재시도 안 함, 상태 가시).
doc.extract_meta = {**(doc.extract_meta or {}), "stt_skip_reason": "disabled", "stt_terminal": True}
await session.commit()
logger.warning(f"[stt] id={document_id} STT_ENABLED=false — 터미널 skip (전사 없음)")
return
# NAS 마운트 경로로 절대화 (services/stt 컨테이너도 동일 경로에 bind mount) # NAS 마운트 경로로 절대화 (services/stt 컨테이너도 동일 경로에 bind mount)
container_path = str(Path(settings.nas_mount_path) / doc.file_path) container_path = str(Path(settings.nas_mount_path) / doc.file_path)
+56
View File
@@ -14,21 +14,72 @@ due 0 이면 row 미생성(noise 방지). 놓친 시각은 그냥 skip(stale 복
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timezone from datetime import datetime, timezone
import httpx
from sqlalchemy import func, or_, select from sqlalchemy import func, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.postgresql import insert as pg_insert
from core.config import settings
from core.database import async_session from core.database import async_session
from models.study_question_progress import StudyQuestionProgress from models.study_question_progress import StudyQuestionProgress
from models.study_reminder import StudyReminder from models.study_reminder import StudyReminder
from models.study_topic import StudyTopic from models.study_topic import StudyTopic
from models.user import User # noqa: F401 (mapper 초기화 defensive) from models.user import User # noqa: F401 (mapper 초기화 defensive)
from services.study.daily_unit import KST, daily_state
logger = logging.getLogger("study_reminder") logger = logging.getLogger("study_reminder")
# 습관 루프 D1(2026-07-03): push 채널 flip — Synology Chat incoming webhook, 아침 1회만.
# 13/19 KST 슬롯은 집계만(노이즈 방지). '오늘의 몫' 완료 시 발신 skip(초대 프레임, 위협 아님).
WEBHOOK_HOUR_KST = 9
STUDY_HOME_URL = "https://document.hyungi.net/study"
_ITEM_LABELS = [("questions", "문제"), ("cards", "카드"), ("concepts", "개념"), ("reviews", "검수")]
async def _send_daily_nudge(by_user: dict[int, dict], now: datetime) -> None:
"""아침 슬롯 한정 — 오늘의 몫 미완 사용자에게 Synology Chat webhook 1건."""
url = settings.study_reminder_webhook_url
if not url:
logger.info("study_reminder nudge skip — STUDY_REMINDER_WEBHOOK_URL 미설정(off)")
return
async with async_session() as session:
for uid, agg in by_user.items():
if not agg["names"]:
continue
topic_id = agg["names"][0]["topic_id"]
try:
state = await daily_state(session, uid, topic_id, now)
except Exception:
logger.exception("study_reminder nudge daily_state 실패 user=%s", uid)
continue
if state["complete"]:
continue # 오늘 몫 완료 — 발신 없음
parts = [
f"{label} {it['target']}"
for key, label in _ITEM_LABELS
if (it := state["items"][key])["target"] > 0 and not it["complete"]
]
if not parts:
continue
msg = f"공부 오늘의 몫 — {' · '.join(parts)} (약 6분)\n{STUDY_HOME_URL}"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
url, data={"payload": json.dumps({"text": msg}, ensure_ascii=False)}
)
resp.raise_for_status()
logger.info("study_reminder nudge sent user=%s", uid)
except Exception:
# 발신 실패 = 내일 다시 (재시도 없음 — 아침 1회 원칙, 실패는 로그만)
logger.exception("study_reminder nudge 발신 실패 user=%s", uid)
async def run() -> None: async def run() -> None:
"""APScheduler cron 진입점. focus 토픽 due 집계 → study_reminders append.""" """APScheduler cron 진입점. focus 토픽 due 집계 → study_reminders append."""
@@ -90,3 +141,8 @@ async def run() -> None:
await session.commit() await session.commit()
if inserted: if inserted:
logger.info("study_reminder fired slot=%s users=%d", slot.isoformat(), inserted) logger.info("study_reminder fired slot=%s users=%d", slot.isoformat(), inserted)
# 아침(09 KST) 슬롯만 채널 발신 — due 유무와 무관하게 '오늘의 몫' 미완이 기준
# (due 0 이어도 미독 개념/신규 카드가 몫을 구성할 수 있음).
if now.astimezone(KST).hour == WEBHOOK_HOUR_KST:
await _send_daily_nudge(by_user, now)
+3
View File
@@ -60,6 +60,9 @@ ai:
rerank: rerank:
endpoint: "http://reranker:80/rerank" endpoint: "http://reranker:80/rerank"
model: "bge-reranker-v2-m3" model: "bge-reranker-v2-m3"
# 2노드 이관: "tei"(GPU TEI /rerank, 기본) | "llamacpp"(맥미니 llama.cpp,
# 예: endpoint http://100.76.254.116:8807/v1/rerank). 미지원 값 = 기동 시 ValueError.
protocol: "tei"
# Phase 3.5a answerability classifier. 2026-05-14 GPU LLM 제거 후 Mac mini 26B 로 swap. # Phase 3.5a answerability classifier. 2026-05-14 GPU LLM 제거 후 Mac mini 26B 로 swap.
# classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only). # classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only).
@@ -1,6 +1,7 @@
<script lang="ts"> <script lang="ts">
// 처리 머신 보드 v3통합안 (plan ds-board-merged: C2 머신레인 + C3 번다운/정직ETA). // 처리 머신 보드 v42026-07-02 컷오버 후 2노드 (나스+맥미니).
// · 머신 3레인(GPU/맥미니/맥북) = "누가 일하나" + 요약 오프로드(맥북 합류) 가시화 // · 머신 2레인(나스/맥미니) = "누가 일하나" — 나스=DS 본체 Docker(추출/마크다운/
// 청크·임베딩 등), 맥미니=단일 생성 LLM 허브(분류/요약/심층분석 + bge-m3/리랭크)
// · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta) // · 지배 백로그 번다운 패널 = "언제 끝나나" + 유입 차감한 정직 ETA(summarize_eta)
// · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용. // · 신선도 '갱신 N초 전' + stale 경고 / 실패 드로어·상세 패널은 v2 자산 재사용.
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어). // 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어).
@@ -193,7 +194,7 @@
const machineByKey = $derived( const machineByKey = $derived(
new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])), new Map<FlowMachine, MachineOverview>(overview.machines.map((m) => [m.key as FlowMachine, m])),
); );
const LANE_ORDER: FlowMachine[] = ['gpu', 'macmini', 'macbook']; const LANE_ORDER: FlowMachine[] = ['nas', 'macmini'];
const lanes = $derived( const lanes = $derived(
LANE_ORDER.map((key) => ({ LANE_ORDER.map((key) => ({
key, key,
@@ -203,13 +204,6 @@
})), })),
); );
// 요약 오프로드 분담 — 맥미니 vs 맥북 (A-1 summarize_by_machine)
const split = $derived(overview.summarize_by_machine);
const splitTotal1h = $derived(Math.max(1, split.macmini.done_1h + split.macbook.done_1h));
const macbookSharePct = $derived(Math.round((split.macbook.done_1h / splitTotal1h) * 100));
// 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트)
const offloadActive = $derived(split.macbook.done_1h > 0);
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ─── // ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
const bgJobs = $derived(overview.background_jobs ?? []); const bgJobs = $derived(overview.background_jobs ?? []);
const runningBg = $derived(bgJobs.filter((j) => j.state === 'running')); const runningBg = $derived(bgJobs.filter((j) => j.state === 'running'));
@@ -266,7 +260,7 @@
: `갱신 ${Math.round(ageSec / 60)}분 전`, : `갱신 ${Math.round(ageSec / 60)}분 전`,
); );
// ─── 24h 번다운 (C3) — 요약 유입 vs 소화 + 맥북 합류 변곡점 마커 ─── // ─── 24h 번다운 (C3) — 요약 유입 vs 소화 ───
const burn = $derived.by(() => { const burn = $derived.by(() => {
const t = overview.trend_24h; const t = overview.trend_24h;
if (!t || t.length === 0) return null; if (!t || t.length === 0) return null;
@@ -279,20 +273,12 @@
t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' '); t.map((b, i) => `${(i * step).toFixed(1)},${y(sel(b))}`).join(' ');
const doneLine = line((b) => b.done); const doneLine = line((b) => b.done);
const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`; const area = `0,${h} ${doneLine} ${w.toFixed(1)},${h}`;
// 합류 변곡점 = done 최대 버킷 (맥북 야간 drain 합류 추정)
let mi = 0;
t.forEach((b, i) => {
if (b.done > t[mi].done) mi = i;
});
return { return {
w, w,
h, h,
area, area,
doneLine, doneLine,
inflowLine: line((b) => b.inflow), inflowLine: line((b) => b.inflow),
markX: (mi * step).toFixed(1),
markHour: t[mi].hour,
markDone: t[mi].done,
peak: max, peak: max,
}; };
}); });
@@ -332,7 +318,7 @@
</span> </span>
</div> </div>
<!-- 머신 레인 (누가 일하나 + 요약 오프로드) --> <!-- 머신 레인 (누가 일하나) -->
<div class="grid gap-2 mb-3"> <div class="grid gap-2 mb-3">
{#each lanes as lane (lane.key)} {#each lanes as lane (lane.key)}
<div class="bg-surface border border-default rounded-card px-3.5 py-2.5"> <div class="bg-surface border border-default rounded-card px-3.5 py-2.5">
@@ -342,11 +328,8 @@
<span class="text-[10px] text-faint font-mono">{lane.meta.model}</span> <span class="text-[10px] text-faint font-mono">{lane.meta.model}</span>
<span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span> <span class="text-[11px] text-dim tabular-nums ml-1">{formatRate(lane.card?.done_1h ?? 0)}/h</span>
{#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each} {#each bgForMachine(lane.key) as j (j.id)}<span class="text-[10px] font-semibold text-success tabular-nums ml-1">생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}</span>{/each}
{#if lane.key === 'macbook' && (lane.card?.deferred_pending ?? 0) > 0} {#if (lane.card?.deferred_pending ?? 0) > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {lane.card?.deferred_pending}</span> <span class="text-[10px] font-semibold text-warning tabular-nums" title="LLM 백오프 — 자동 재개 대기">보류 {lane.card?.deferred_pending}</span>
{/if}
{#if lane.card?.state === 'deferred'}
<span class="text-[9px] text-warning">잠듦 — 요약은 맥미니로 복귀</span>
{/if} {/if}
</div> </div>
<div class="flex items-stretch gap-1.5 flex-wrap"> <div class="flex items-stretch gap-1.5 flex-wrap">
@@ -368,26 +351,8 @@
</div> </div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div> <div class="text-sm font-extrabold tabular-nums leading-tight text-text">{n.pending.toLocaleString()}<span class="text-[9px] text-faint font-normal ml-0.5">대기</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div> <div class="text-[9px] text-dim tabular-nums whitespace-nowrap">{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}</div>
{#if n.def.key === 'summarize'}
<div class="mt-1 h-1 w-full rounded-full overflow-hidden flex" title="맥미니 {split.macmini.done_1h}/h · 맥북 {split.macbook.done_1h}/h">
<span class="block h-full mtag-macmini-bar" style="width:{100 - macbookSharePct}%"></span>
<span class="block h-full mtag-macbook-bar" style="width:{macbookSharePct}%"></span>
</div>
<div class="text-[9px] text-faint tabular-nums whitespace-nowrap mt-0.5">맥미니 {split.macmini.done_1h} · 맥북 {split.macbook.done_1h}/h</div>
{/if}
</button> </button>
{/each} {/each}
{#if lane.key === 'macbook' && offloadActive}
<button
class="text-left rounded-lg border border-dashed border-warning/50 px-2.5 py-1.5 cursor-pointer hover:bg-surface-hover min-w-[96px]"
onclick={() => toggleNode('summarize')}
title="맥북이 요약을 맥미니에서 가져와 처리 중"
>
<div class="flex items-center gap-1 text-[11px] font-semibold text-text whitespace-nowrap">요약 합류 <span class="text-[8px] font-bold text-warning">OFFLOAD</span></div>
<div class="text-sm font-extrabold tabular-nums leading-tight text-text">{split.macbook.done_1h}<span class="text-[9px] text-faint font-normal ml-0.5">/h</span></div>
<div class="text-[9px] text-dim tabular-nums whitespace-nowrap">요약의 {macbookSharePct}% 담당</div>
</button>
{/if}
</div> </div>
</div> </div>
{/each} {/each}
@@ -399,15 +364,11 @@
<div class="flex items-center gap-2 mb-2"> <div class="flex items-center gap-2 mb-2">
<span class="text-[11px] font-bold text-text">요약 백로그 24시간</span> <span class="text-[11px] font-bold text-text">요약 백로그 24시간</span>
<span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span> <span class="text-[9px] text-faint">유입(회색) vs 소화(녹색)</span>
{#if offloadActive}<span class="text-[9px] text-warning ml-auto">맥북 합류 {burn.markHour} — 소화 급증</span>{/if}
</div> </div>
<svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운"> <svg viewBox="0 0 {burn.w} {burn.h}" class="block w-full" style="height:64px" preserveAspectRatio="none" role="img" aria-label="요약 백로그 24시간 번다운">
<polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" /> <polygon points={burn.area} fill="currentColor" class="text-success" opacity="0.12" />
<polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" /> <polyline points={burn.inflowLine} fill="none" stroke="currentColor" stroke-width="1.2" class="text-faint" />
<polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" /> <polyline points={burn.doneLine} fill="none" stroke="currentColor" stroke-width="1.6" class="text-success" />
{#if offloadActive}
<line x1={burn.markX} y1="0" x2={burn.markX} y2={burn.h} stroke="currentColor" stroke-width="1" stroke-dasharray="2 2" class="text-warning" opacity="0.7" />
{/if}
</svg> </svg>
<div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums"> <div class="flex flex-wrap gap-x-4 gap-y-1 mt-2 pt-2 border-t border-default text-[10px] text-dim tabular-nums">
{#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)} {#each mainNodes.filter((n) => n.pending > 0 && n.def.key !== 'summarize') as n (n.def.key)}
@@ -558,13 +519,9 @@
</div> </div>
<style> <style>
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */ /* 머신 색 — 디자인 토큰 외 2색 (nas 청/macmini 보라) — 이 컴포넌트 한정 */
.mtag-gpu { background: #e7eef6; color: #3b6ea5; } .mtag-nas { background: #e7eef6; color: #3b6ea5; }
.mtag-macmini { background: #efe9f7; color: #8a5fbf; } .mtag-macmini { background: #efe9f7; color: #8a5fbf; }
.mtag-macbook { background: #f7eedd; color: #b07a10; }
/* 요약 오프로드 분담 막대 채움 (맥미니 보라 / 맥북 황) */
.mtag-macmini-bar { background: #8a5fbf; }
.mtag-macbook-bar { background: #b07a10; }
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; } .node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
.detail-frame { border-color: #3b6ea5; } .detail-frame { border-color: #3b6ea5; }
.detail-head { background: #e7eef6; } .detail-head { background: #e7eef6; }
@@ -1,6 +1,6 @@
<script lang="ts"> <script lang="ts">
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림. // 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
// 머신 미니카드 3 + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당. // 머신 미니카드 2(나스/맥미니) + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade). // 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단. // 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
import { X } from 'lucide-svelte'; import { X } from 'lucide-svelte';
@@ -51,7 +51,7 @@
<div class="p-4 space-y-3"> <div class="p-4 space-y-3">
{#if data} {#if data}
<!-- 머신 미니카드 3 --> <!-- 머신 미니카드 (나스/맥미니) -->
{#each data.machines as m (m.key)} {#each data.machines as m (m.key)}
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5"> <div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
<div class="flex items-center justify-between gap-2"> <div class="flex items-center justify-between gap-2">
+2 -9
View File
@@ -5,7 +5,7 @@
* . * .
*/ */
export type MachineKey = 'gpu' | 'macmini' | 'macbook'; export type MachineKey = 'nas' | 'macmini';
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */ /** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
export type MachineState = 'active' | 'deferred' | 'idle'; export type MachineState = 'active' | 'deferred' | 'idle';
@@ -29,7 +29,7 @@ export interface MachineOverview {
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */ /** 최근 1시간 완료 건수 (처리율 N/h 표기) */
done_1h: number; done_1h: number;
done_today: number; done_today: number;
/** 보류 건수 — 맥북 sleep 등으로 자동 재개 대기 중 */ /** 보류 건수 — LLM 허브 백오프 등으로 자동 재개 대기 중 */
deferred_pending: number; deferred_pending: number;
current: MachineCurrentItem[]; current: MachineCurrentItem[];
} }
@@ -50,12 +50,6 @@ export interface TrendPoint {
done: number; done: number;
} }
/** summarize 머신별 완료 실적 분담 (오프로드 가시화 — ds-board-merged A-1) */
export interface SummarizeByMachine {
macmini: { done_1h: number; done_today: number };
macbook: { done_1h: number; done_today: number };
}
export interface QueueTotals { export interface QueueTotals {
pending: number; pending: number;
processing: number; processing: number;
@@ -93,7 +87,6 @@ export interface BackgroundJob {
export interface QueueOverview { export interface QueueOverview {
machines: MachineOverview[]; machines: MachineOverview[];
summarize_eta: SummarizeEta; summarize_eta: SummarizeEta;
summarize_by_machine: SummarizeByMachine;
trend_24h: TrendPoint[]; trend_24h: TrendPoint[];
stages: QueueStageRow[]; stages: QueueStageRow[];
totals: QueueTotals; totals: QueueTotals;
+11 -12
View File
@@ -62,7 +62,7 @@ export function formatAgeSec(sec: number): string {
* / 1 (: 맥미니 ). * / 1 (: 맥미니 ).
*/ */
export type FlowMachine = 'gpu' | 'macmini' | 'macbook'; export type FlowMachine = 'nas' | 'macmini';
export interface FlowNodeDef { export interface FlowNodeDef {
key: string; key: string;
@@ -79,26 +79,25 @@ export interface FlowNodeDef {
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */ /** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
export const FLOW_NODES: FlowNodeDef[] = [ export const FLOW_NODES: FlowNodeDef[] = [
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' }, { key: 'extract', label: '추출', stages: ['extract'], machine: 'nas', engine: 'kordoc', sub: 'kordoc' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' }, { key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'nas', engine: 'Marker', sub: 'marker-service' },
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' }, { key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' }, { key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' }, { key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'nas', engine: 'bge-m3 (맥미니 콜)', sub: 'embed worker' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' }, { key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
]; ];
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */ /** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
export const AUX_NODES: FlowNodeDef[] = [ export const AUX_NODES: FlowNodeDef[] = [
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' }, { key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'nas', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' }, { key: 'stt', label: '전사', stages: ['stt'], machine: 'nas', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' }, { key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'nas', engine: '유틸', sub: 'ffmpeg' },
]; ];
/** 머신 스트립 메타 — 모델 표기 단일 지점 */ /** 머신 스트립 메타 — 모델 표기 단일 지점 (2026-07-02 컷오버: 나스+맥미니 2노드) */
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = { export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
gpu: { label: 'GPU 서버', model: '특화 엔진' }, nas: { label: '나스', model: 'DS 본체 Docker · 특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' }, macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · bge-m3 · 24/7' },
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
}; };
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */ /** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
+3 -3
View File
@@ -72,7 +72,7 @@
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시 // 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일). // store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
let queue = $derived($queueOverview); let queue = $derived($queueOverview);
let queueMacbook = $derived(queue?.machines?.find((m) => m.key === 'macbook') ?? null); let queueMacmini = $derived(queue?.machines?.find((m) => m.key === 'macmini') ?? null);
function toggleQueueDrawer() { function toggleQueueDrawer() {
if (ui.isDrawerOpen('queue')) ui.closeDrawer(); if (ui.isDrawerOpen('queue')) ui.closeDrawer();
else ui.openDrawer('queue'); else ui.openDrawer('queue');
@@ -189,8 +189,8 @@
</span> </span>
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span> <span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span> <span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
{#if queueMacbook} {#if queueMacmini}
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacbook.state)}"> {MACHINE_STATE_LABEL[queueMacbook.state]}</span> <span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacmini.state)}">미니 {MACHINE_STATE_LABEL[queueMacmini.state]}</span>
{/if} {/if}
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span> <span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
</button> </button>
+123 -9
View File
@@ -1,15 +1,20 @@
<script> <script>
// /study — 학습 hub + 데일리 랜딩('오늘의 공부' 대시보드). // /study — 학습 hub + 데일리 랜딩. 최상단 = '오늘의 몫'(유한한 데일리 단위 + 스트릭/잔디,
// 상단 = 이론 홈(진도·오늘의 개념·복습 due, 재노출 트리거). 하단 = 기존 모드 진입. // 습관 루프 2026-07-03). 진도·복습 백로그는 접이식으로 강등(열자마자 due 산더미 금지, D2).
import { onMount } from 'svelte'; import { onMount } from 'svelte';
import { goto } from '$app/navigation';
import { api } from '$lib/api'; import { api } from '$lib/api';
import { addToast } from '$lib/stores/toast'; import { addToast } from '$lib/stores/toast';
import { BookOpen, PenLine, GraduationCap, FolderKanban, Layers, Repeat, Flag, Inbox, Activity, CalendarCheck, Target } from 'lucide-svelte'; import { BookOpen, PenLine, GraduationCap, FolderKanban, Layers, Repeat, Flag, Inbox, Activity, CalendarCheck, Target, Flame, Play, CheckCircle2 } from 'lucide-svelte';
let cardReviewCount = $state(0); let cardReviewCount = $state(0);
let questionFlagCount = $state(0); let questionFlagCount = $state(0);
// 오늘의 공부 (이론 홈) // 오늘의 몫 (데일리 단위)
let daily = $state(null);
let dailyLoading = $state(true);
// 이론 홈 (백로그 접이식 내부)
let curriculum = $state(null); let curriculum = $state(null);
let todayConcepts = $state([]); let todayConcepts = $state([]);
let weakConcepts = $state([]); // 약점 개념(관련 기출 정답률 낮음) let weakConcepts = $state([]); // 약점 개념(관련 기출 정답률 낮음)
@@ -19,6 +24,46 @@
curriculum && curriculum.total ? Math.round((curriculum.read / curriculum.total) * 100) : 0 curriculum && curriculum.total ? Math.round((curriculum.read / curriculum.total) * 100) : 0
); );
// 오늘의 몫 4항목 (target 0 = 재료 없음 → 행 숨김)
const DAILY_ITEMS = [
{ key: 'questions', label: '문제 풀이', icon: GraduationCap },
{ key: 'cards', label: '카드 복습', icon: Repeat },
{ key: 'concepts', label: '개념 읽기', icon: BookOpen },
{ key: 'reviews', label: '카드 검수', icon: Layers },
];
function grassClass(n) {
if (n <= 0) return 'bg-bg border border-default/50';
if (n <= 2) return 'bg-accent/30';
if (n <= 9) return 'bg-accent/60';
return 'bg-accent';
}
async function loadDaily() {
dailyLoading = true;
try {
daily = await api('/study/daily');
} catch {
// 실패해도 허브 나머지는 동작 (조용히)
} finally {
dailyLoading = false;
}
}
/** 오늘의 몫 문제 5 — learning stage 5문항 세션 시작(진행 중 세션 있으면 이어풀기). */
async function startDailyQuiz() {
const topicId = daily?.topic_id ?? 4;
try {
const res = await api(`/study-topics/${topicId}/quiz-sessions`, {
method: 'POST',
body: JSON.stringify({ stage: 'learning', size: 5, quiz_mode: 'random' }),
});
goto(`/study/topics/${topicId}/review?session=${res.id}`);
} catch (err) {
addToast('error', err?.detail || '문제풀이 시작 실패');
}
}
async function loadDashboard() { async function loadDashboard() {
dashLoading = true; dashLoading = true;
try { try {
@@ -46,12 +91,14 @@
todayConcepts = todayConcepts.filter((c) => c.doc_id !== doc.doc_id); todayConcepts = todayConcepts.filter((c) => c.doc_id !== doc.doc_id);
addToast('success', `회독: ${doc.title}`); addToast('success', `회독: ${doc.title}`);
loadDashboard(); // 진도 갱신 loadDashboard(); // 진도 갱신
loadDaily(); // 오늘의 몫 갱신
} catch { } catch {
addToast('error', '회독 처리 실패'); addToast('error', '회독 처리 실패');
} }
} }
onMount(async () => { onMount(async () => {
loadDaily();
loadDashboard(); loadDashboard();
try { try {
const r = await api('/study-cards/needs-review/count'); const r = await api('/study-cards/needs-review/count');
@@ -72,16 +119,82 @@
<p class="text-sm text-dim mt-1">주제별 퀴즈·복습(SRS)·통계 / 학습 자료 회독 / 손글씨 필사 세션.</p> <p class="text-sm text-dim mt-1">주제별 퀴즈·복습(SRS)·통계 / 학습 자료 회독 / 손글씨 필사 세션.</p>
</header> </header>
<!-- 오늘의 공부 (이론 홈 대시보드 = 데일리 트리거) --> <!-- 오늘의 몫 — 유한한 데일리 단위 + 스트릭/잔디 (첫 화면은 이것과 완료 상태만) -->
<section class="mb-5 rounded-lg border border-default bg-surface p-4 md:p-5"> <section class="mb-5 rounded-lg border border-default bg-surface p-4 md:p-5">
<div class="flex items-center gap-2 mb-3"> <div class="flex items-center gap-2 mb-3">
<CalendarCheck size={18} class="text-accent" /> <CalendarCheck size={18} class="text-accent" />
<h2 class="text-base font-semibold text-text">오늘의 공부</h2> <h2 class="text-base font-semibold text-text">오늘의 </h2>
{#if curriculum} {#if daily}
<span class="ml-auto text-xs text-dim">이론 회독 <span class="text-text font-medium">{curriculum.read}</span> / {curriculum.total} ({readPct}%)</span> <span class="ml-auto flex items-center gap-1.5 text-xs {daily.streak > 0 ? 'text-accent' : 'text-dim'}">
<Flame size={14} />
{#if daily.streak > 0}{daily.streak}일 연속{:else}오늘부터 시작{/if}
</span>
{/if} {/if}
</div> </div>
{#if dailyLoading}
<p class="text-xs text-dim">불러오는 중…</p>
{:else if daily}
{#if daily.complete}
<div class="mb-3 flex items-center gap-2 rounded border border-accent/40 bg-accent/10 px-3 py-2.5 text-sm text-accent">
<CheckCircle2 size={16} /> 오늘 몫 끝. 내일 또 만나요.
</div>
{/if}
<ul class="space-y-1.5 mb-4">
{#each DAILY_ITEMS as item (item.key)}
{@const it = daily.items[item.key]}
{#if it && it.target > 0}
<li class="flex items-center gap-2.5 rounded border px-3 py-2 {it.complete ? 'border-accent/40 bg-accent/5' : 'border-default'}">
<item.icon size={15} class={it.complete ? 'text-accent' : 'text-dim'} />
<span class="text-sm {it.complete ? 'text-dim line-through' : 'text-text'}">{item.label}</span>
<span class="text-xs {it.complete ? 'text-accent' : 'text-dim'} font-medium">{Math.min(it.done, it.target)}/{it.target}</span>
<span class="ml-auto"></span>
{#if it.complete}
<CheckCircle2 size={15} class="text-accent shrink-0" />
{:else if item.key === 'questions'}
<button
type="button"
onclick={startDailyQuiz}
class="shrink-0 flex items-center gap-1 rounded bg-accent px-2.5 py-1 text-xs font-medium text-white hover:opacity-90 transition-opacity"
><Play size={11} /> 풀기</button>
{:else if item.key === 'cards'}
<a href="/study/cards-study" class="shrink-0 rounded border border-default px-2.5 py-1 text-xs text-dim hover:border-accent hover:text-accent transition-colors">복습</a>
{:else if item.key === 'concepts'}
<a href={daily.next_concept ? `/study/read/${daily.next_concept.doc_id}` : '/study/sources'}
class="shrink-0 rounded border border-default px-2.5 py-1 text-xs text-dim hover:border-accent hover:text-accent transition-colors">읽기</a>
{:else}
<a href="/study/cards-review" class="shrink-0 rounded border border-default px-2.5 py-1 text-xs text-dim hover:border-accent hover:text-accent transition-colors">검수</a>
{/if}
</li>
{/if}
{/each}
</ul>
<!-- 잔디 (최근 12주, KST 일 단위 활동량) -->
{#if daily.grass?.length}
<div class="overflow-x-auto">
<div class="grid grid-rows-7 grid-flow-col gap-[3px] w-max">
{#each daily.grass as g (g.d)}
<span class="w-2.5 h-2.5 rounded-[2px] {grassClass(g.n)}" title="{g.d}: {g.n}"></span>
{/each}
</div>
</div>
{/if}
{:else}
<p class="text-xs text-dim">오늘의 몫을 불러오지 못했습니다.</p>
{/if}
</section>
<!-- 진도·복습 백로그 (강등 — 첫 화면에서 due 산더미 노출 금지) -->
<details class="mb-5 rounded-lg border border-default bg-surface">
<summary class="cursor-pointer select-none px-4 py-3 text-sm text-dim hover:text-text transition-colors">
진도 · 복습 백로그
{#if curriculum}
<span class="text-xs text-faint">(이론 회독 {readPct}% · 문항 복습 {curriculum.question_due} · 개념 재복습 {curriculum.concept_due})</span>
{/if}
</summary>
<div class="px-4 pb-4 md:px-5 md:pb-5">
{#if dashLoading} {#if dashLoading}
<p class="text-xs text-dim">불러오는 중…</p> <p class="text-xs text-dim">불러오는 중…</p>
{:else} {:else}
@@ -144,7 +257,8 @@
</div> </div>
{/if} {/if}
{/if} {/if}
</section> </div>
</details>
<a <a
href="/study/topics" href="/study/topics"
@@ -0,0 +1,3 @@
-- 383: 카드 검수 처리 시각 (습관 루프 '오늘의 몫' 검수 N장/일 집계용).
-- 승인(needs_review false 전환)·수정확정·soft-delete 시 박힘. NULL = 미처리.
ALTER TABLE study_memo_cards ADD COLUMN IF NOT EXISTS reviewed_at TIMESTAMPTZ
+456
View File
@@ -0,0 +1,456 @@
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 보류한
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
사용법 (fastapi 컨테이너 안에서 실행):
docker compose exec fastapi python /app/scripts/presegment_attended.py list
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
워크플로우:
1. list awaiting_split 문서 확인.
2. export 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
템플릿 JSON 생성. 유인 클로드 세션은 파일들만 읽고 TODO 스팬을
CAP 이하 경계들로 분할해 템플릿을 완성한다.
3. apply 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 )
payload.presegment.units_override 기록 + awaiting_split 해제 +
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
stdout 규약: 사람이 읽는 요약 + '{' 시작하는 기계 파싱용 JSON 라인(1 1라인).
사람용 행은 절대 '{' 시작하지 않는다.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
from sqlalchemy import text as sql_text # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
from services.hier_decomp.builder import build_hier_tree # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
OVERRIDE_MIN_COVERAGE_PCT,
TRIGGER_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
validate_override_boundaries,
)
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
DUMP_CHUNK_CHARS = 200_000
def _jsonl(obj: dict) -> None:
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
print(json.dumps(obj, ensure_ascii=False, default=str))
def _session_factory():
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
)
engine = create_async_engine(database_url)
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ─── list ────────────────────────────────────────────────────────────────────
LIST_SQL = """
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
q.payload::text AS payload_text,
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'deep_summary'
AND q.status IN ('pending', 'processing', 'failed')
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
ORDER BY q.id
"""
async def cmd_list() -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
finally:
await engine.dispose()
print(f"awaiting_split 보류 문서 {len(rows)}")
for r in rows:
payload = json.loads(r["payload_text"] or "{}")
preseg = payload.get("presegment") or {}
oversized = preseg.get("oversized_sections") or []
print(
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
if isinstance(preseg.get("total_est_tokens"), int)
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
)
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
print(
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
)
_jsonl({
"cmd": "list",
"queue_id": r["queue_id"],
"doc_id": r["document_id"],
"title": r["title"],
"status": r["status"],
"tier": preseg.get("tier"),
"over_pct": preseg.get("over_pct"),
"total_est_tokens": preseg.get("total_est_tokens"),
"units": preseg.get("units"),
"oversized_sections": oversized,
"alerted_at": preseg.get("alerted_at"),
"deferred_until": payload.get("deferred_until"),
"override_rejected": preseg.get("override_rejected"),
})
return 0
# ─── export ──────────────────────────────────────────────────────────────────
def _safe_name(title: str | None, fallback: str) -> str:
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
return (t or fallback)[:60]
def _build_outline(text: str) -> str:
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
lines = []
for n in nodes:
indent = " " * max(n.level, 0)
title = n.section_title or "(preamble)"
tok = estimate_tokens(n.text)
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
lines.append(f"{indent}- L{n.level} {title}{tok:,} tok{mark}")
return "\n".join(lines)
async def cmd_export(doc_id: int, out_dir: str) -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
row = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
finally:
await engine.dispose()
if not row:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
source, text = choose_override_source(row["md_content"], row["extracted_text"])
if not text.strip():
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
return 1
plan = plan_summarize_units(text)
leaves = extract_leaves(text)
spans = leaf_spans(text, leaves)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
files: list[str] = []
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
# ① 통계 + hier 개요
oversized_units = [u for u in plan.units if u.over_cap]
overview = [
f"# presegment export — doc {doc_id}",
"",
f"- 제목: {row['title'] or '(제목 없음)'}",
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}",
f"- 생성: {now_iso}",
"",
"## 유닛 제안 (summarize_units greedy-pack)",
"",
]
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
overview += ["", "## hier 개요", "", _build_outline(text), ""]
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
files.append("overview.md")
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
boundaries: list[dict] = []
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
title = next((t for t in u.section_titles if t), None)
if not u.over_cap:
# ④ 자동 pack 유닛은 템플릿에 채워둔다
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
continue
boundaries.append({
"start": s, "end": e, "title": title or f"유닛 {u.index}",
"todo": (
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
"경계 여러 개로 교체하고 todo 키를 제거할 것"
),
})
seg = text[s:e]
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
fname = f"{base}.{cs}_{ce}.md"
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
(out / fname).write_text(text[cs:ce], encoding="utf-8")
files.append(fname)
# ④ boundaries 템플릿 JSON
template = {
"doc_id": doc_id,
"source": source,
"source_len": len(text),
"cap_tokens": CAP_TOKENS,
"generated_at": now_iso,
"boundaries": boundaries,
}
(out / "boundaries_template.json").write_text(
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
)
files.append("boundaries_template.json")
# 유인 세션용 작업 안내
readme = f"""# doc {doc_id} 유인 분할 안내
1. overview.md 구조 파악 (유닛 제안 + hier 개요).
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
- 파일명 `..._<cs>_<ce>.md` cs = 파일 문자의 절대 오프셋.
- 절대 오프셋 = cs + 파일 로컬 오프셋.
3. boundaries_template.json todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
- 토큰 추정: 한글 0.529 tok/ · 기타 0.217 tok/ (services/summarize_units.py).
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
4. 검증/적용:
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
"""
(out / "README.md").write_text(readme, encoding="utf-8")
files.append("README.md")
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
print(
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
f"tier={plan.tier} over%={plan.over_pct}"
)
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}")
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
_jsonl({
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
"source": source, "source_len": len(text),
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
"over_pct": plan.over_pct,
"units_auto": len(plan.units) - len(oversized_units),
"units_todo": len(oversized_units),
"files": files,
})
return 0
# ─── apply ───────────────────────────────────────────────────────────────────
QUEUE_ROW_SQL = """
SELECT id, status, attempts, payload::text AS payload_text
FROM processing_queue
WHERE document_id = :d AND stage = 'deep_summary'
AND status IN ('pending', 'processing', 'failed')
ORDER BY id DESC
LIMIT 1
"""
APPLY_UPDATE_SQL = """
UPDATE processing_queue
SET payload = CAST(:payload AS JSONB),
status = 'pending',
attempts = 0,
error_message = NULL
WHERE id = :qid
"""
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
if isinstance(raw, dict):
boundaries = raw.get("boundaries") or []
declared_source = raw.get("source")
declared_len = raw.get("source_len")
if raw.get("doc_id") not in (None, doc_id):
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
return 1
else:
boundaries, declared_source, declared_len = raw, None, None
engine, factory = _session_factory()
try:
async with factory() as session:
doc = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
if not doc:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
if not qrow:
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
return 1
if qrow["status"] == "processing":
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
return 1
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
if declared_source in ("md_content", "extracted_text"):
source = declared_source
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
else:
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
if declared_len is not None and declared_len != len(text):
print(
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
" (본문 재생성됨 — export 부터 재실행)"
)
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
return 1
check = validate_override_boundaries(text, boundaries)
for w in check.warnings:
print(f" [warn] {w}")
if not check.ok:
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
for e in check.errors:
print(f" - {e}")
_jsonl({
"cmd": "apply", "ok": False, "doc_id": doc_id,
"error": "validation_failed", "errors": check.errors,
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
})
return 1
print(
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
)
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
payload = json.loads(qrow["payload_text"] or "{}")
preseg = dict(payload.get("presegment") or {})
preseg["units_override"] = {
"source": source,
"source_len": len(text),
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
}
preseg["awaiting_split"] = False
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
preseg.pop(k, None)
payload["presegment"] = preseg
payload.pop("deferred_until", None) # 즉시 재개
if dry_run:
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
_jsonl({
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
await session.execute(
sql_text(APPLY_UPDATE_SQL),
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
)
await session.commit()
print(
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
f"(다음 queue_consumer 사이클에 재개)"
)
_jsonl({
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
finally:
await engine.dispose()
# ─── main ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
p_export.add_argument("--doc", type=int, required=True)
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
p_apply.add_argument("--doc", type=int, required=True)
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
args = parser.parse_args()
if args.cmd == "list":
return asyncio.run(cmd_list())
if args.cmd == "export":
out = args.out or f"./preseg_export_{args.doc}"
return asyncio.run(cmd_export(args.doc, out))
if args.cmd == "apply":
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
return 2
if __name__ == "__main__":
sys.exit(main())
+486
View File
@@ -0,0 +1,486 @@
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1 로그)·synochat/ntfy
포맷·실패 절대 raise 금지 (webhook 전부 fake 실호출 0).
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7).
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 .
- process(): units_override 존재 tier 판정(plan_summarize_units) 우회
map-reduce 재개 / 잘못된 override -HOLD + 알람 / override 없는 문서 무회귀.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import httpx # noqa: E402
import services.alerts as alerts # noqa: E402
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
units_from_boundaries,
validate_override_boundaries,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
SMALL_MD = "# 소형\n" + ("" * 2_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [], "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self, row=None):
self.commits = 0
self._row = row
async def commit(self):
self.commits += 1
class FakeProcSession(FakeSession):
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
def __init__(self, doc, row):
super().__init__(row)
self._doc = doc
async def get(self, model, pk):
return self._doc
async def execute(self, stmt):
row = self._row
return SimpleNamespace(scalar_one_or_none=lambda: row)
class FakeClient:
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
def __init__(self):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
async def call_deep(self, prompt: str, system=None) -> str:
self.prompts.append(prompt)
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
return SimpleNamespace(
id=999,
title="테스트 문서",
extracted_text=text,
md_content=md_content,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope_raw():
return {
"from_stage": "classify",
"escalation_reasons": ["long_context"],
"risk_flags": [],
"distilled_context": "4B 요지",
"original_pointers": {"doc_ids": [999]},
}
def _envelope():
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
@pytest.fixture
def _alert_recorder(monkeypatch):
calls: list[tuple[str, str]] = []
async def fake_alert(title: str, message: str) -> bool:
calls.append((title, message))
return True
monkeypatch.setattr(dsw, "send_alert", fake_alert)
return calls
# ─── A. services.alerts ──────────────────────────────────────────────────────
class _FakeHttpxClient:
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
captured: list[dict] = []
fail_with: Exception | None = None
status_code = 200
def __init__(self, timeout=None):
self.timeout = timeout
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def post(self, url, **kwargs):
if _FakeHttpxClient.fail_with is not None:
raise _FakeHttpxClient.fail_with
_FakeHttpxClient.captured.append({"url": url, **kwargs})
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
@pytest.fixture
def _fake_httpx(monkeypatch):
_FakeHttpxClient.captured = []
_FakeHttpxClient.fail_with = None
_FakeHttpxClient.status_code = 200
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
return _FakeHttpxClient
@pytest.mark.asyncio
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
monkeypatch.setattr(alerts, "_noop_logged", False)
infos: list[str] = []
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
assert await alerts.send_alert("t", "m") is False
assert await alerts.send_alert("t", "m") is False
assert len(infos) == 1 # 프로세스당 1회만 로그
assert _fake_httpx.captured == [] # webhook 미호출
@pytest.mark.asyncio
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
assert len(_fake_httpx.captured) == 1
call = _fake_httpx.captured[0]
assert call["url"] == "http://chat.local/webhook"
payload = json.loads(call["data"]["payload"])
assert payload == {"text": "제목\n본문 줄1\n줄2"}
@pytest.mark.asyncio
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
assert await alerts.send_alert("한글 제목", "알람 본문") is True
call = _fake_httpx.captured[0]
# 한글 제목은 헤더(latin-1 한정) 대신 query param
assert call["params"] == {"title": "한글 제목"}
assert call["content"] == "알람 본문".encode("utf-8")
@pytest.mark.asyncio
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
_fake_httpx.fail_with = httpx.ConnectError("down")
assert await alerts.send_alert("t", "m") is False # raise 없이 False
_fake_httpx.fail_with = None
_fake_httpx.status_code = 500
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.tier == "whole"
row = SimpleNamespace(payload={"envelope": {"x": 1}})
session = FakeSession()
# 1차 HOLD — 알람 발화 + alerted_at 기록
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
title, message = _alert_recorder[0]
assert "999" in title
assert "테스트 문서" in message and "whole" in message
assert f"export --doc 999" in message # 유인 분할 힌트
alerted_at = row.payload["presegment"]["alerted_at"]
assert datetime.fromisoformat(alerted_at)
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
payload = dict(row.payload)
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
row.payload = payload
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 2
assert row.payload["presegment"]["alerted_at"] != stale
# ─── C. validate_override_boundaries ────────────────────────────────────────
def test_validate_ok_full_coverage():
text = "" * 40_000 # ≈ 21,160 tok
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
check = validate_override_boundaries(text, bounds)
assert check.ok and check.errors == []
assert check.coverage_pct == 100.0
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
def test_validate_dict_form_and_units_from_boundaries():
text = "" * 10_000
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
check = validate_override_boundaries(text, bounds)
assert check.ok
units = units_from_boundaries(text, check.boundaries)
assert [u.index for u in units] == [0, 1]
assert units[0].text == text[0:5_000]
assert units[0].section_titles == ["a"]
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
def test_validate_rejects_overlap():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
assert not check.ok
assert any("중첩" in e for e in check.errors)
def test_validate_rejects_cap_exceed():
text = "" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
assert not check.ok
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
def test_validate_rejects_low_coverage():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
assert not check.ok
assert any("커버리지" in e for e in check.errors)
def test_validate_rejects_out_of_range_and_todo():
text = "" * 1_000
check = validate_override_boundaries(text, [[0, 2_000, ""]])
assert not check.ok and any("범위 밖" in e for e in check.errors)
check2 = validate_override_boundaries(
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
)
assert not check2.ok and any("TODO" in e for e in check2.errors)
def test_validate_warns_on_gap_but_passes_coverage():
text = "" * 100_000
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
check = validate_override_boundaries(text, bounds, cap=50_000)
assert check.ok
assert any("공백 구간" in w for w in check.warnings)
def test_choose_override_source_prefers_md_content():
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
assert choose_override_source(None, None) == ("extracted_text", "")
def test_leaf_spans_reconstruct_source():
md = "# 절 1\n" + "" * 100 + "\n# 절 2\n" + "" * 100
leaves = extract_leaves(md)
spans = leaf_spans(md, leaves)
assert len(spans) == len(leaves)
for (s, e), leaf in zip(spans, leaves):
assert md[s:e] == leaf.text
# 인접 파티션 — 이어붙이면 원문 전체
assert spans[0][0] == 0 and spans[-1][1] == len(md)
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
step = len(text) // n_units
bounds = []
for i in range(n_units):
s = i * step
e = len(text) if i == n_units - 1 else (i + 1) * step
bounds.append([s, e, f"파트 {i + 1}"])
return {
"envelope": _envelope_raw(),
"subject_domain": "generic",
"presegment": {
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
"units_override": {
"source": source, "source_len": len(text), "boundaries": bounds,
},
},
}
@pytest.mark.asyncio
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
def _boom(*a, **k):
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
await dsw.process(999, session)
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
assert len(client.prompts) == 4
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
preseg = row.payload["presegment"]
assert preseg["tier"] == "override"
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
assert len(preseg["map_results"]) == 3
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
assert _alert_recorder == [] # 정상 재개 — 알람 없음
assert len(_patch_telemetry) == 1
@pytest.mark.asyncio
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
def _no_llm():
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
monkeypatch.setattr(dsw, "AIClient", _no_llm)
with pytest.raises(StageDeferred) as ei:
await dsw.process(999, session)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True # 재-HOLD
assert "cap" in preseg["override_rejected"]
assert "units_override" in preseg # 원인 조사용 보존
assert len(_alert_recorder) == 1 # 거부 알람
assert "거부" in _alert_recorder[0][0]
assert doc.ai_detail_summary is None
assert _patch_telemetry == []
@pytest.mark.asyncio
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
assert row.payload["presegment"]["awaiting_split"] is True
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
assert len(_alert_recorder) == 1
@pytest.mark.asyncio
async def test_no_override_small_doc_keeps_single_call_path(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
doc = _doc(SMALL_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
await dsw.process(999, session)
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
assert doc.ai_detail_summary == "유닛 상세."
assert "presegment" not in row.payload # payload 무변경
assert _alert_recorder == []
@pytest.mark.asyncio
async def test_no_override_whole_doc_still_holds_with_alert(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
doc = _doc(GIANT_WHOLE_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
assert len(_alert_recorder) == 1
assert "HOLD" in _alert_recorder[0][0]
+66 -156
View File
@@ -4,6 +4,8 @@ services/queue_overview 의 SQL 수집부와 분리된 순수 판정 함수
(stage_machine_map / build_machines / build_summarize_eta / build_trend / (stage_machine_map / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes / rows_to_* / display_title) build_totals / compute_eta_minutes / rows_to_* / display_title)
mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인. mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인.
2026-07-02 컷오버 2노드(나스+맥미니) 기준 3노드 레인은 제거됨.
""" """
from datetime import datetime from datetime import datetime
@@ -18,7 +20,6 @@ from services.queue_overview import (
compute_eta_minutes, compute_eta_minutes,
display_title, display_title,
rows_to_stage_stats, rows_to_stage_stats,
rows_to_summarize_split,
stage_machine_map, stage_machine_map,
) )
@@ -36,186 +37,115 @@ def _stage(**kw) -> dict:
return base return base
def _split(macbook: dict | None = None, macmini: dict | None = None) -> dict:
"""summarize 풀 완료 실적 split — 미지정 0."""
zero = {"done_1h": 0, "done_today": 0, "done_15m": 0}
return {
"macbook": {**zero, **(macbook or {})},
"macmini": {**zero, **(macmini or {})},
}
def _machine(machines: list[dict], key: str) -> dict: def _machine(machines: list[dict], key: str) -> dict:
return next(m for m in machines if m["key"] == key) return next(m for m in machines if m["key"] == key)
# ─── stage→machine 귀속 맵 ──────────────────────────────────────────────────── # ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
def test_stage_machine_map_deep_enabled(): def test_stage_machine_map_two_nodes():
smap = stage_machine_map(deep_enabled=True) smap = stage_machine_map()
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"): for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
assert smap[s] == "gpu" assert smap[s] == "nas"
assert smap["classify"] == "macmini" assert smap["classify"] == "macmini"
assert smap["summarize"] == "macmini" assert smap["summarize"] == "macmini"
assert smap["deep_summary"] == "macbook"
def test_stage_machine_map_deep_disabled():
"""deep 슬롯 부재 시 deep_summary 도 macmini 귀속."""
smap = stage_machine_map(deep_enabled=False)
assert smap["deep_summary"] == "macmini" assert smap["deep_summary"] == "macmini"
# ─── 머신 카드 귀속 합산 ────────────────────────────────────────────────────── # ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
def test_gpu_stage_counts_attribution(): def test_nas_stage_counts_attribution():
stats = { stats = {
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1), "extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
"stt": _stage(failed=2, done_1h=1, done_today=2), "stt": _stage(failed=2, done_1h=1, done_today=2),
} }
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
gpu = _machine(machines, "gpu") nas = _machine(machines, "nas")
assert (gpu["pending"], gpu["processing"], gpu["failed"]) == (3, 1, 2) assert (nas["pending"], nas["processing"], nas["failed"]) == (3, 1, 2)
assert (gpu["done_1h"], gpu["done_today"]) == (6, 11) assert (nas["done_1h"], nas["done_today"]) == (6, 11)
# gpu 의 stages 는 정적 8종 전부 (집계 0 이어도 표시) # nas 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
assert gpu["stages"] == [ assert nas["stages"] == [
"extract", "embed", "chunk", "markdown", "extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt", "preview", "thumbnail", "fulltext", "stt",
] ]
def test_summarize_pool_split_attribution(): def test_macmini_llm_stages_attribution():
"""summarize pending/failed = macmini 귀속, 완료 실적은 split 로 분리 — """classify/summarize/deep_summary 전부 macmini 귀속 (단일 생성 LLM 허브)."""
stage-level summarize done 수치는 카드에 이중 합산되지 않는다."""
stats = { stats = {
"classify": _stage(done_1h=2, done_today=3), "classify": _stage(done_1h=2, done_today=3),
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20), "summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4),
} }
split = _split(macbook={"done_1h": 4, "done_today": 8}, macmini={"done_1h": 6, "done_today": 12}) machines = build_machines(stats, [])
machines = build_machines(stats, split, [], deep_enabled=True)
macmini = _machine(machines, "macmini") macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook") assert macmini["pending"] == 9 and macmini["failed"] == 1
assert macmini["processing"] == 1
assert macmini["pending"] == 7 and macmini["failed"] == 1 assert macmini["done_1h"] == 2 + 10 + 3
assert macmini["done_1h"] == 2 + 6 # classify + macmini 몫 (10 아님) assert macmini["done_today"] == 3 + 20 + 4
assert macmini["done_today"] == 3 + 12 assert macmini["stages"] == ["classify", "summarize", "deep_summary"]
assert macbook["done_1h"] == 4 and macbook["done_today"] == 8 assert _machine(machines, "nas")["pending"] == 0
assert macbook["pending"] == 0 # 풀 pending 은 macmini 만
def test_summarize_by_machine_projection(): def test_deferred_pending_on_macmini_card():
"""build_summarize_by_machine = split 의 done_1h/done_today 를 머신별로 투영 """보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macmini 카드 귀속
(done_15m 제외 내부 state 판정 전용).""" (보류 = LLM 백오프 신호)."""
from services.queue_overview import build_summarize_by_machine
split = _split(
macbook={"done_1h": 226, "done_today": 312, "done_15m": 60},
macmini={"done_1h": 37, "done_today": 94, "done_15m": 9},
)
sbm = build_summarize_by_machine(split)
assert sbm == {
"macmini": {"done_1h": 37, "done_today": 94},
"macbook": {"done_1h": 226, "done_today": 312},
}
assert "done_15m" not in sbm["macbook"]
def test_compose_overview_includes_summarize_by_machine():
"""compose_overview 응답 계약에 summarize_by_machine 포함 (FE 레인 분담 재료)."""
now_kst = datetime(2026, 6, 13, 13, 0, tzinfo=KST)
stats = {"summarize": _stage(pending=1317, done_1h=264)}
split = _split(macbook={"done_1h": 226, "done_today": 312}, macmini={"done_1h": 37, "done_today": 94})
ov = compose_overview(stats, split, {}, {}, [], deep_enabled=True, now_kst=now_kst)
assert ov["summarize_by_machine"]["macbook"]["done_1h"] == 226
assert ov["summarize_by_machine"]["macmini"]["done_today"] == 94
def test_deep_disabled_deep_summary_counts_to_macmini():
stats = {"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4)}
machines = build_machines(stats, _split(), [], deep_enabled=False)
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 2 and macmini["processing"] == 1
assert macmini["done_1h"] == 3 and macmini["done_today"] == 4
assert macbook["stages"] == [] and macbook["pending"] == 0
assert _machine(machines, "macmini")["stages"] == ["classify", "summarize", "deep_summary"]
def test_deferred_pending_always_on_macbook_card():
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macbook 카드 귀속.
deep 슬롯 유무와 무관 (보류 = 맥북 불가 신호)."""
stats = { stats = {
"summarize": _stage(pending=5, deferred_pending=2), "summarize": _stage(pending=5, deferred_pending=2),
"deep_summary": _stage(pending=1, deferred_pending=1), "deep_summary": _stage(pending=1, deferred_pending=1),
} }
for deep_enabled in (True, False): machines = build_machines(stats, [])
machines = build_machines(stats, _split(), [], deep_enabled=deep_enabled) assert _machine(machines, "macmini")["deferred_pending"] == 3
assert _machine(machines, "macbook")["deferred_pending"] == 3 assert _machine(machines, "nas")["deferred_pending"] == 0
assert _machine(machines, "gpu")["deferred_pending"] == 0
assert _machine(machines, "macmini")["deferred_pending"] == 0
# ─── state 판정 ─────────────────────────────────────────────────────────────── # ─── state 판정 ───────────────────────────────────────────────────────────────
def test_macbook_state_active_wins_over_deferred_while_working(): def test_macmini_state_active_wins_over_deferred_while_working():
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'. """가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시. 보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시.
""" """
stats = {"summarize": _stage(pending=1, deferred_pending=1)} stats = {"summarize": _stage(pending=1, deferred_pending=1, done_15m=3)}
split = _split(macbook={"done_15m": 3}) machines = build_machines(stats, [])
machines = build_machines(stats, split, [], deep_enabled=True) mm = _machine(machines, "macmini")
mb = _machine(machines, "macbook") assert mm["state"] == "active"
assert mb["state"] == "active" assert mm["deferred_pending"] == 1
assert mb["deferred_pending"] == 1
def test_macbook_state_deferred_only_when_not_working(): def test_macmini_state_deferred_only_when_not_working():
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'.""" """일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
stats = {"summarize": _stage(pending=1, deferred_pending=1)} stats = {"summarize": _stage(pending=1, deferred_pending=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
assert _machine(machines, "macbook")["state"] == "deferred" assert _machine(machines, "macmini")["state"] == "deferred"
def test_macbook_state_active_on_recent_qwen_done(): def test_macmini_state_idle():
split = _split(macbook={"done_15m": 1}) machines = build_machines({}, [])
machines = build_machines({}, split, [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "active"
def test_macbook_state_idle():
machines = build_machines({}, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "idle"
def test_gpu_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "idle"
def test_macmini_state_not_active_on_macbook_pool_done():
"""summarize 풀 완료가 전부 macbook 몫이면 macmini 는 active 아님 (귀속 기준)."""
stats = {"summarize": _stage(done_15m=1)}
split = _split(macbook={"done_15m": 1})
machines = build_machines(stats, split, [], deep_enabled=True)
assert _machine(machines, "macmini")["state"] == "idle" assert _machine(machines, "macmini")["state"] == "idle"
def test_nas_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "active"
def test_nas_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, [])
assert _machine(machines, "nas")["state"] == "idle"
def test_macmini_state_active_on_summarize_processing(): def test_macmini_state_active_on_summarize_processing():
stats = {"summarize": _stage(processing=1)} stats = {"summarize": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True) machines = build_machines(stats, [])
assert _machine(machines, "macmini")["state"] == "active" assert _machine(machines, "macmini")["state"] == "active"
@@ -228,21 +158,18 @@ def test_current_summarize_to_macmini_max_two():
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None}, {"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None}, {"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
] ]
machines = build_machines({}, _split(), rows, deep_enabled=True) machines = build_machines({}, rows)
macmini = _machine(machines, "macmini") macmini = _machine(machines, "macmini")
gpu = _machine(machines, "gpu") nas = _machine(machines, "nas")
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건 assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"} assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
assert [c["document_id"] for c in gpu["current"]] == [4] assert [c["document_id"] for c in nas["current"]] == [4]
assert _machine(machines, "macbook")["current"] == []
def test_current_deep_summary_follows_deep_slot(): def test_current_deep_summary_to_macmini():
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}] rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
enabled = build_machines({}, _split(), rows, deep_enabled=True) machines = build_machines({}, rows)
disabled = build_machines({}, _split(), rows, deep_enabled=False) assert _machine(machines, "macmini")["current"][0]["document_id"] == 9
assert _machine(enabled, "macbook")["current"][0]["document_id"] == 9
assert _machine(disabled, "macmini")["current"][0]["document_id"] == 9
def test_display_title_fallback_chain(): def test_display_title_fallback_chain():
@@ -344,32 +271,15 @@ def test_rows_to_stage_stats_conversion():
assert stats["summarize"]["deferred_pending"] == 2 assert stats["summarize"]["deferred_pending"] == 2
def test_rows_to_summarize_split_conversion():
rows = [
(True, 4, 8, 1), # is_macbook
(False, 6, 12, 0),
]
split = rows_to_summarize_split(rows)
assert split["macbook"] == {"done_1h": 4, "done_today": 8, "done_15m": 1}
assert split["macmini"] == {"done_1h": 6, "done_today": 12, "done_15m": 0}
def test_rows_to_summarize_split_empty():
split = rows_to_summarize_split([])
assert split["macbook"]["done_1h"] == 0 and split["macmini"]["done_today"] == 0
def test_compose_overview_contract_shape(): def test_compose_overview_contract_shape():
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정.""" """응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
out = compose_overview( out = compose_overview(
{"summarize": _stage(pending=1)}, {"summarize": _stage(pending=1)},
_split(),
{}, {}, [], {}, {}, [],
deep_enabled=True,
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST), now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
) )
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"} assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
assert [m["key"] for m in out["machines"]] == ["gpu", "macmini", "macbook"] assert [m["key"] for m in out["machines"]] == ["nas", "macmini"]
for m in out["machines"]: for m in out["machines"]:
assert set(m.keys()) == { assert set(m.keys()) == {
"key", "label", "state", "stages", "pending", "processing", "failed", "key", "label", "state", "stages", "pending", "processing", "failed",
@@ -381,7 +291,7 @@ def test_compose_overview_contract_shape():
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"} assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
assert set(out["totals"].keys()) == {"pending", "processing", "failed"} assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
# 머신 label 고정 (raw 모델명 노출 금지 — label 만) # 머신 label 고정 (raw 모델명 노출 금지 — label 만)
assert [m["label"] for m in out["machines"]] == ["GPU 서버", "맥미니", "맥북 M5 Max"] assert [m["label"] for m in out["machines"]] == ["나스", "맥미니"]
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ────── # ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────
+54
View File
@@ -0,0 +1,54 @@
"""rerank 프로토콜 정규화 단위 테스트 — 2노드 이관 P1-4 (llama.cpp /v1/rerank).
순수 함수(ai/rerank_protocol.py) 대상 HTTP/DB 의존 없음.
실행: PYTHONPATH=app pytest tests/test_rerank_protocol.py
"""
import json
from pathlib import Path
from ai.rerank_protocol import normalize_llamacpp_rerank
FIXTURES = Path(__file__).parent / "fixtures"
def test_normalize_llamacpp_shape_and_desc_sort():
payload = {
"model": "bge-reranker-v2-m3",
"results": [
{"index": 0, "relevance_score": 0.12},
{"index": 1, "relevance_score": 2.21},
{"index": 2, "relevance_score": -1.5},
],
}
out = normalize_llamacpp_rerank(payload)
# TEI 계약: [{"index","score"}] score 내림차순
assert [r["index"] for r in out] == [1, 0, 2]
assert all(set(r) == {"index", "score"} for r in out)
assert out[0]["score"] == 2.21
def test_normalize_llamacpp_missing_fields_skipped():
payload = {
"results": [
{"index": 0}, # relevance_score 없음 → 버림
{"relevance_score": 1.0}, # index 없음 → 버림
{"index": 3, "relevance_score": 0.5},
]
}
assert normalize_llamacpp_rerank(payload) == [{"index": 3, "score": 0.5}]
def test_normalize_llamacpp_empty_and_absent_results():
assert normalize_llamacpp_rerank({}) == []
assert normalize_llamacpp_rerank({"results": []}) == []
def test_tei_fixture_shape_is_already_contract():
"""TEI 캡처 fixture(Phase 2B G0-1 spec 박제)의 실응답이 정규화 없이 계약 형태임을 확인."""
doc = json.loads((FIXTURES / "tei_rerank_response.json").read_text())
captured = doc["captured_responses"]["baseline_bge_v2_m3"]["raw"]
assert isinstance(captured, list) and captured
assert {"index", "score"} <= set(captured[0])
# spec 문자열도 계약과 일치 (score desc 정렬 포함)
assert "index" in doc["response_shape"] and "score" in doc["response_shape"]