From b91b05e889a5a565f36dffdf6d3e3324847c361f Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 2 Jul 2026 16:51:32 +0900 Subject: [PATCH] =?UTF-8?q?refactor(board):=20=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=EB=A8=B8=EC=8B=A0=20=EB=B3=B4=EB=93=9C=20=EB=82=98=EC=8A=A4+?= =?UTF-8?q?=EB=A7=A5=EB=AF=B8=EB=8B=88=202=EB=85=B8=EB=93=9C=20=EC=9E=AC?= =?UTF-8?q?=EA=B5=AC=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정). - 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지), 맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크) - summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거 — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리) - 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류 변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체) - deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존) - 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존 - background_jobs 머신 귀속 기본값 gpu -> nas - 단위테스트 2노드 기준 재작성 (27 passed) Co-Authored-By: Claude Fable 5 --- app/api/queue_overview.py | 19 +- app/services/queue_overview.py | 146 +++--------- .../lib/components/ProcessingFlowBoard.svelte | 63 +---- .../src/lib/components/QueueDrawer.svelte | 4 +- frontend/src/lib/types/queue.ts | 11 +- frontend/src/lib/utils/queueDisplay.ts | 23 +- frontend/src/routes/+layout.svelte | 6 +- tests/test_queue_overview.py | 222 ++++++------------ 8 files changed, 130 insertions(+), 364 deletions(-) diff --git a/app/api/queue_overview.py b/app/api/queue_overview.py index 647433a..124b2f0 100644 --- a/app/api/queue_overview.py +++ b/app/api/queue_overview.py @@ -37,8 +37,8 @@ class CurrentItem(BaseModel): class MachineCard(BaseModel): - """머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state.""" - key: Literal["gpu", "macmini", "macbook"] + """머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드).""" + key: Literal["nas", "macmini"] label: str state: Literal["active", "deferred", "idle"] stages: list[str] @@ -59,20 +59,6 @@ class SummarizeEta(BaseModel): eta_minutes: int | None -class MachineDone(BaseModel): - """머신 1대의 summarize 완료 실적 (분담 표시용).""" - done_1h: int - done_today: int - - -class SummarizeByMachine(BaseModel): - """summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북' - 오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출 - (ds-board-merged A-1, 신규 수집 SQL 0).""" - macmini: MachineDone - macbook: MachineDone - - class TrendBucket(BaseModel): """summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨.""" hour: str @@ -122,7 +108,6 @@ class QueueOverviewResponse(BaseModel): machines: list[MachineCard] stages: list[StageRow] summarize_eta: SummarizeEta - summarize_by_machine: SummarizeByMachine trend_24h: list[TrendBucket] totals: Totals background_jobs: list[BackgroundJobItem] = [] diff --git a/app/services/queue_overview.py b/app/services/queue_overview.py index 4c17e17..ee14702 100644 --- a/app/services/queue_overview.py +++ b/app/services/queue_overview.py @@ -3,19 +3,16 @@ GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue / documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약). -구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리. +구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리. 판정부(rows_to_* / build_machines / build_summarize_eta / build_trend / build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능. -귀속 규칙 (단일 진실): -- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/ - fulltext/stt · macmini = classify/summarize · macbook = deep_summary - (단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속). -- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료 - 실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook' - 이면 macbook 실적, 아니면 macmini 실적. -- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속 - (보류 = 맥북 불가 신호). +귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드): +- stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/ + fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) · + macmini = classify/summarize/deep_summary (단일 생성 LLM 허브). +- deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 — + summarize/deep_summary 소속인 macmini 카드 귀속. """ from datetime import datetime, timedelta @@ -25,42 +22,33 @@ from zoneinfo import ZoneInfo from sqlalchemy import bindparam, text from sqlalchemy.ext.asyncio import AsyncSession -from core.config import settings - KST = ZoneInfo("Asia/Seoul") -# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출. -_MACBOOK_MODEL_ALIAS = "qwen-macbook" - # stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서) -_GPU_STAGES = ( +_NAS_STAGES = ( "extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt", ) -_MACMINI_STAGES = ("classify", "summarize") -_MACBOOK_STAGES = ("deep_summary",) -_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES +_MACMINI_STAGES = ("classify", "summarize", "deep_summary") +_STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES -_MACHINE_KEYS = ("gpu", "macmini", "macbook") +_MACHINE_KEYS = ("nas", "macmini") _MACHINE_LABELS = { - "gpu": "GPU 서버", + "nas": "나스", "macmini": "맥미니", - "macbook": "맥북 M5 Max", } # 머신 카드당 current 표시 상한 _CURRENT_LIMIT = 2 -def stage_machine_map(deep_enabled: bool) -> dict[str, str]: - """stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속.""" +def stage_machine_map() -> dict[str, str]: + """stage → machine key 맵 (정적 — 나스/맥미니 2노드).""" mapping: dict[str, str] = {} - for s in _GPU_STAGES: - mapping[s] = "gpu" + for s in _NAS_STAGES: + mapping[s] = "nas" for s in _MACMINI_STAGES: mapping[s] = "macmini" - for s in _MACBOOK_STAGES: - mapping[s] = "macbook" if deep_enabled else "macmini" return mapping @@ -90,23 +78,6 @@ def rows_to_stage_stats(rows) -> dict[str, dict]: return stats -def rows_to_summarize_split(rows) -> dict[str, dict]: - """summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}. - - is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용). - """ - split = { - "macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0}, - "macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0}, - } - for row in rows: - key = "macbook" if row[0] else "macmini" - split[key]["done_1h"] += int(row[1] or 0) - split[key]["done_today"] += int(row[2] or 0) - split[key]["done_15m"] += int(row[3] or 0) - return split - - def display_title(row: dict) -> str: """표시용 제목 — title > original_filename > file_path basename > 문서 id.""" if row.get("title"): @@ -120,13 +91,10 @@ def display_title(row: dict) -> str: def build_machines( stage_stats: dict[str, dict], - summarize_split: dict[str, dict], current_rows: list[dict], - *, - deep_enabled: bool, ) -> list[dict]: - """머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부.""" - smap = stage_machine_map(deep_enabled) + """머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부.""" + smap = stage_machine_map() def g(stage: str, field: str) -> int: return stage_stats.get(stage, {}).get(field, 0) @@ -149,29 +117,23 @@ def build_machines( pending = sum(g(s, "pending") for s in stages) processing = sum(g(s, "processing") for s in stages) failed = sum(g(s, "failed") for s in stages) + done_1h = sum(g(s, "done_1h") for s in stages) + done_today = sum(g(s, "done_today") for s in stages) + done_15m = sum(g(s, "done_15m") for s in stages) - # 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속 - done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize") - done_today = sum(g(s, "done_today") for s in stages if s != "summarize") - done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize") - if key in summarize_split: - done_1h += summarize_split[key]["done_1h"] - done_today += summarize_split[key]["done_today"] - done_15m += summarize_split[key]["done_15m"] - - # 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관) + # 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속 deferred_pending = ( g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending") - if key == "macbook" else 0 + if key == "macmini" else 0 ) # state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11). # 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" — # 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은 - # 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만. + # 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만. if processing > 0 or done_15m > 0: state = "active" - elif key == "macbook" and deferred_pending > 0: + elif deferred_pending > 0: state = "deferred" else: state = "idle" @@ -213,16 +175,6 @@ def build_summarize_eta(stage_stats: dict[str, dict]) -> dict: } -def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict: - """summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의 - 오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로 - 투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외).""" - def m(key: str) -> dict: - s = summarize_split.get(key, {}) - return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))} - return {"macmini": m("macmini"), "macbook": m("macbook")} - - def build_trend( inflow_buckets: dict[str, int], done_buckets: dict[str, int], @@ -287,28 +239,23 @@ def build_totals(stage_stats: dict[str, dict]) -> dict: def compose_overview( stage_stats: dict[str, dict], - summarize_split: dict[str, dict], inflow_buckets: dict[str, int], done_buckets: dict[str, int], current_rows: list[dict], *, - deep_enabled: bool, now_kst: datetime, ) -> dict: """수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요.""" return { - "machines": build_machines( - stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled - ), + "machines": build_machines(stage_stats, current_rows), "stages": build_stages(stage_stats), "summarize_eta": build_summarize_eta(stage_stats), - "summarize_by_machine": build_summarize_by_machine(summarize_split), "trend_24h": build_trend(inflow_buckets, done_buckets, now_kst), "totals": build_totals(stage_stats), } -# ─── SQL 수집부 (총 5쿼리) ──────────────────────────────────────────────────── +# ─── SQL 수집부 (총 4쿼리) ──────────────────────────────────────────────────── # 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방) _STAGE_STATS_SQL = """ @@ -333,23 +280,7 @@ _STAGE_STATS_SQL = """ GROUP BY stage """ -# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방) -# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전). -_SUMMARIZE_SPLIT_SQL = """ - SELECT - COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook, - COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h, - COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today, - COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m - FROM processing_queue q - JOIN documents d ON d.id = q.document_id - WHERE q.stage = 'summarize' - AND q.status = 'completed' - AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour') - GROUP BY 1 -""" - -# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방) +# 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방) _TREND_INFLOW_SQL = """ SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'), 'YYYY-MM-DD HH24:00') AS bucket, @@ -371,7 +302,7 @@ _TREND_DONE_SQL = """ GROUP BY 1 """ -# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서) +# 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서) _CURRENT_SQL = """ SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path FROM processing_queue q @@ -383,20 +314,13 @@ _CURRENT_SQL = """ async def build_overview(session: AsyncSession) -> dict: - """5쿼리 수집 → compose_overview 판정 → 응답 dict.""" + """4쿼리 수집 → compose_overview 판정 → 응답 dict.""" now_kst = datetime.now(KST) kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0) - deep_enabled = settings.ai is not None and settings.ai.deep is not None stage_rows = ( await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight}) ).all() - split_rows = ( - await session.execute( - text(_SUMMARIZE_SPLIT_SQL), - {"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS}, - ) - ).all() inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all() done_rows = (await session.execute(text(_TREND_DONE_SQL))).all() current_result = (await session.execute(text(_CURRENT_SQL))).all() @@ -414,11 +338,9 @@ async def build_overview(session: AsyncSession) -> dict: result = compose_overview( rows_to_stage_stats(stage_rows), - rows_to_summarize_split(split_rows), {row[0]: int(row[1]) for row in inflow_rows}, {row[0]: int(row[1]) for row in done_rows}, current_rows, - deep_enabled=deep_enabled, now_kst=now_kst, ) # 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]). @@ -426,13 +348,13 @@ async def build_overview(session: AsyncSession) -> dict: return result -# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트). +# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트). _BG_JOB_MACHINE = { "global_digest": "macmini", "morning_briefing": "macmini", "section_summary": "macmini", - "hier_backfill": "gpu", - "hier_redecompose": "gpu", + "hier_backfill": "nas", + "hier_redecompose": "nas", } @@ -466,7 +388,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]: "processed": int(r["processed"] or 0), "total": r["total"], "elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]), "error": r["error"], - "machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"), + "machine": _BG_JOB_MACHINE.get(r["kind"], "nas"), } for r in rows ] diff --git a/frontend/src/lib/components/ProcessingFlowBoard.svelte b/frontend/src/lib/components/ProcessingFlowBoard.svelte index 5ef80ca..5db4f56 100644 --- a/frontend/src/lib/components/ProcessingFlowBoard.svelte +++ b/frontend/src/lib/components/ProcessingFlowBoard.svelte @@ -1,6 +1,7 @@