feat(observability): 큐 밖 백그라운드 작업(backfill)을 처리 머신 보드에 노출

processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill 같은 off-queue
관리 스크립트 작업이 대시보드 보드에 안 잡혀, 다른 세션이 모르고 fastapi 를 재생성해
in-flight 재분해를 끊는 사고가 발생(2026-06-14). 사각지대 해소.

- migrations/357_background_jobs.sql: background_jobs 테이블(kind/label/state/processed/
  total/heartbeat). worker_jobs(user_id 필수, worker-pool 전용)와 별개.
- services/background_jobs.py: start/heartbeat/finish 헬퍼 — 자율 트랜잭션(즉시 commit →
  실시간 가시화) + best-effort(관측 실패가 본작업 안 깸).
- hier_overnight_backfill: 작업 시작/절 ~10개마다 heartbeat/종료 계측.
- queue_overview: /api/queue/overview 응답에 background_jobs 추가(running + 최근 6h 완료,
  stale=heartbeat 끊김 추정). SAVEPOINT 로 테이블 부재/오류 시 보드 본체 무영향.
- ProcessingFlowBoard: "백그라운드 작업" 패널(진행/경과/state, stale 끊김 경고).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-14 12:23:37 +09:00
parent 6d978289b8
commit ebbcaf86d8
7 changed files with 240 additions and 4 deletions
+15
View File
@@ -103,6 +103,20 @@ class StageRow(BaseModel):
oldest_pending_age_sec: int | None
class BackgroundJobItem(BaseModel):
"""큐 밖 관리 스크립트(백필 등) 작업 — processing_queue 가 못 보는 사각지대 노출.
stale = running 인데 heartbeat 가 오래 끊김(프로세스 사망 추정)."""
id: int
kind: str
label: str | None
state: Literal["running", "done", "failed"]
processed: int
total: int | None
elapsed_sec: int
stale: bool
error: str | None
class QueueOverviewResponse(BaseModel):
machines: list[MachineCard]
stages: list[StageRow]
@@ -110,6 +124,7 @@ class QueueOverviewResponse(BaseModel):
summarize_by_machine: SummarizeByMachine
trend_24h: list[TrendBucket]
totals: Totals
background_jobs: list[BackgroundJobItem] = []
class FailedItem(BaseModel):
+93
View File
@@ -0,0 +1,93 @@
"""off-queue 관리 스크립트(백필 등) 진행 가시화 — background_jobs (migration 357).
processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill /
section_summary_pilot 같은 스크립트 작업은 대시보드 보드에 안 잡힌다. 이 모듈로
스크립트가 진행상황을 남기면 queue_overview 가 "백그라운드 작업" 패널로 노출한다.
설계 불변식:
- **자율 트랜잭션**: 각 기록은 engine.begin() 짧은 트랜잭션으로 즉시 commit한다.
스크립트 본 작업은 별도 세션(긴 트랜잭션)이라, 같이 묶으면 commit 전까지 안 보여
실시간 가시화가 깨진다. 그래서 전용 connection 으로 독립 commit.
- **best-effort**: 관측 기록 실패가 본 작업을 깨면 안 된다 — 모든 함수 try/except,
실패 시 warning 로그만. job_id=None 이면 조용히 no-op (start 실패해도 이어서 동작).
"""
import json
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
logger = logging.getLogger(__name__)
async def start_job(
engine: AsyncEngine, kind: str, label: str | None = None, total: int | None = None
) -> int | None:
"""작업 시작 기록 → background_jobs.id (실패 시 None — 호출측은 그대로 진행)."""
try:
async with engine.begin() as conn:
row = (
await conn.execute(
text(
"INSERT INTO background_jobs (kind, label, total) "
"VALUES (:k, :l, :t) RETURNING id"
),
{"k": kind, "l": label, "t": total},
)
).first()
return int(row[0]) if row else None
except Exception as exc: # noqa: BLE001 — 관측은 부가, 본작업 보호
logger.warning(f"[background_jobs] start 실패(무시): {type(exc).__name__}: {exc}")
return None
async def heartbeat(
engine: AsyncEngine,
job_id: int | None,
*,
processed: int | None = None,
total: int | None = None,
detail: dict | None = None,
) -> None:
"""진행 갱신(processed/total/detail). job_id=None 또는 실패 시 no-op."""
if job_id is None:
return
try:
async with engine.begin() as conn:
await conn.execute(
text(
"UPDATE background_jobs SET "
"processed = COALESCE(:p, processed), "
"total = COALESCE(:t, total), "
"detail = COALESCE(CAST(:d AS jsonb), detail), "
"updated_at = now() WHERE id = :id"
),
{
"id": job_id,
"p": processed,
"t": total,
"d": json.dumps(detail, ensure_ascii=False) if detail is not None else None,
},
)
except Exception as exc: # noqa: BLE001
logger.warning(f"[background_jobs] heartbeat 실패(무시): {type(exc).__name__}: {exc}")
async def finish_job(
engine: AsyncEngine, job_id: int | None, *, state: str = "done", error: str | None = None
) -> None:
"""종료 기록(done/failed). job_id=None 또는 실패 시 no-op."""
if job_id is None:
return
try:
async with engine.begin() as conn:
await conn.execute(
text(
"UPDATE background_jobs SET state = :s, error = :e, "
"finished_at = now(), updated_at = now() WHERE id = :id"
),
{"id": job_id, "s": state, "e": (error or None)},
)
except Exception as exc: # noqa: BLE001
logger.warning(f"[background_jobs] finish 실패(무시): {type(exc).__name__}: {exc}")
+39 -1
View File
@@ -412,7 +412,7 @@ async def build_overview(session: AsyncSession) -> dict:
for row in current_result
]
return compose_overview(
result = compose_overview(
rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows},
@@ -421,6 +421,44 @@ async def build_overview(session: AsyncSession) -> dict:
deep_enabled=deep_enabled,
now_kst=now_kst,
)
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
result["background_jobs"] = await _fetch_background_jobs(session)
return result
_BACKGROUND_JOBS_SQL = """
SELECT id, kind, label, state, processed, total,
EXTRACT(EPOCH FROM (now() - started_at))::int AS elapsed_sec,
(state = 'running' AND updated_at < now() - interval '5 minutes') AS stale,
error
FROM background_jobs
WHERE state = 'running' OR finished_at > now() - interval '6 hours'
ORDER BY (state = 'running') DESC, started_at DESC
LIMIT 20
"""
async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
"""running + 최근 6h 완료 background_jobs. 테이블 없거나 오류면 [] (보드 무영향).
요청 세션과 **별도 connection**으로 조회한다 — 테이블 부재(마이그 357 미적용 등) 시
SELECT 실패가 요청 세션의 트랜잭션을 오염시키지 않도록 물리적으로 분리(실패 시 그
임시 connection만 폐기). 관측은 부가 기능이라 보드 본체를 절대 깨면 안 된다.
"""
try:
async with session.bind.connect() as conn: # 풀에서 독립 connection
rows = (await conn.execute(text(_BACKGROUND_JOBS_SQL))).mappings().all()
except Exception: # noqa: BLE001 — 관측 부가, 보드 본체 보호
return []
return [
{
"id": r["id"], "kind": r["kind"], "label": r["label"], "state": r["state"],
"processed": int(r["processed"] or 0), "total": r["total"],
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
"error": r["error"],
}
for r in rows
]
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
@@ -210,6 +210,19 @@
// 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트)
const offloadActive = $derived(split.macbook.done_1h > 0);
// ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ───
const bgJobs = $derived(overview.background_jobs ?? []);
function fmtElapsed(s: number): string {
if (s < 60) return `${s}s`;
if (s < 3600) return `${Math.floor(s / 60)}m`;
return `${Math.floor(s / 3600)}h${Math.floor((s % 3600) / 60)}m`;
}
function bgDot(j: { state: string; stale: boolean }): string {
if (j.state === 'running') return j.stale ? 'bg-warning' : 'bg-success';
if (j.state === 'failed') return 'bg-error';
return 'bg-faint';
}
// ─── 지배 백로그 = 요약. 정직 ETA(유입 차감) — summarize_eta ───
const eta = $derived(overview.summarize_eta);
// 정직 ETA 라벨: eta_minutes null = 유입이 소화를 앞섬(소진 불가)
@@ -466,6 +479,32 @@
</div>
{/if}
<!-- 백그라운드 작업 (큐 밖 스크립트 backfill 등 — processing_queue 가 못 보는 사각지대) -->
{#if bgJobs.length > 0}
<div class="mt-3">
<div class="text-[11px] font-bold text-dim uppercase tracking-wider mb-2">백그라운드 작업</div>
<div class="grid gap-2">
{#each bgJobs as j (j.id)}
<div class="bg-surface border rounded-card px-3.5 py-2.5 {j.stale ? 'border-warning' : j.state === 'failed' ? 'border-error' : 'border-default'}">
<div class="flex items-center gap-2 flex-wrap">
<span class="w-2 h-2 rounded-full shrink-0 {bgDot(j)}"></span>
<span class="text-[9px] font-bold rounded px-1.5 py-px bg-default text-dim font-mono">{j.kind}</span>
<span class="text-xs font-semibold text-text truncate">{j.label ?? '작업'}</span>
<span class="text-[11px] text-dim tabular-nums ml-auto">
{#if j.total}{j.processed.toLocaleString()}/{j.total.toLocaleString()}{:else}{j.processed.toLocaleString()}{/if} · {fmtElapsed(j.elapsed_sec)}
</span>
</div>
{#if j.stale}
<div class="text-[10px] text-warning mt-1.5">heartbeat 끊김 — 프로세스 중단 추정 (재개 필요할 수 있음)</div>
{:else if j.state === 'failed'}
<div class="text-[10px] text-error mt-1.5 truncate">실패{#if j.error} · {j.error}{/if}</div>
{/if}
</div>
{/each}
</div>
</div>
{/if}
<!-- 실패 처리 드로어 -->
{#if failOpen}
<div class="border border-error/40 rounded-card mt-3 overflow-hidden bg-surface">
+15
View File
@@ -75,6 +75,20 @@ export interface QueueStageRow {
oldest_pending_age_sec: number | null;
}
/** ( ) processing_queue .
* stale = running heartbeat ( ). */
export interface BackgroundJob {
id: number;
kind: string;
label: string | null;
state: 'running' | 'done' | 'failed';
processed: number;
total: number | null;
elapsed_sec: number;
stale: boolean;
error: string | null;
}
export interface QueueOverview {
machines: MachineOverview[];
summarize_eta: SummarizeEta;
@@ -82,6 +96,7 @@ export interface QueueOverview {
trend_24h: TrendPoint[];
stages: QueueStageRow[];
totals: QueueTotals;
background_jobs?: BackgroundJob[];
}
/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */
+19
View File
@@ -0,0 +1,19 @@
-- 2026-06-14 PR-Background-Jobs-Observability: 큐 밖 관리 스크립트(백필 등) 진행 가시화.
-- processing_queue 는 파이프라인 stage 전용 — hier_overnight_backfill / section_summary_pilot
-- 같은 off-queue 관리 스크립트는 여기에 진행상황을 남겨 대시보드 보드가 노출한다.
-- worker_jobs(user_id NOT NULL, worker-pool 전용)와 별개 — 이건 owner 없는 관리 작업 heartbeat.
-- 단일 statement (asyncpg multi-statement 불허 컨벤션). 인덱스는 소량 테이블이라 생략.
CREATE TABLE IF NOT EXISTS background_jobs (
id BIGSERIAL PRIMARY KEY,
kind TEXT NOT NULL, -- 'hier_redecompose' | 'section_summary' | ...
label TEXT, -- 사람이 읽는 대상 표기 (예: 'doc 5210 (Sec VIII)')
state TEXT NOT NULL DEFAULT 'running'
CHECK (state IN ('running', 'done', 'failed')),
processed INTEGER NOT NULL DEFAULT 0, -- 처리한 단위 수 (절/leaf 등)
total INTEGER, -- 전체 단위 수 (미상이면 NULL)
detail JSONB NOT NULL DEFAULT '{}'::jsonb,
error TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ
);
+20 -3
View File
@@ -32,6 +32,7 @@ from core.config import settings
from services.hier_decomp.builder import build_hier_tree
from services.hier_decomp.persist import persist_hier_tree
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.background_jobs import finish_job, heartbeat, start_job
# 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존)
from section_summary_pilot import (
@@ -140,8 +141,10 @@ def _make_engine():
return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)
async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at):
"""doc 의 미분석 hier leaf 분석 → upsert. stop_at(epoch) 넘으면 leaf 경계 중단."""
async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at,
engine=None, job_id=None, base_processed=0):
"""doc 의 미분석 hier leaf 분석 → upsert. stop_at(epoch) 넘으면 leaf 경계 중단.
engine/job_id 주어지면 background_jobs ~10절마다 진행 heartbeat(보드 가시화)."""
rows = (await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})).mappings().all()
ok = fail = skip = 0
timings, types = [], []
@@ -187,6 +190,8 @@ async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, s
"content_hash": r["content_hash"], "error": err,
})
await session.commit()
if job_id and (ok + fail + skip) % 10 == 0:
await heartbeat(engine, job_id, processed=base_processed + ok + fail + skip)
await session.commit()
return {"ok": ok, "fail": fail, "skip": skip, "leaves": len(rows),
"timings": timings, "types": types, "aborted": aborted}
@@ -256,6 +261,12 @@ async def cmd_run(args):
_candidate_params(allowlist, doc_ids))).mappings().all()
_log(f"후보 doc {len(cands)} 선별. 시작.")
# 관측: 큐 밖 작업이라 대시보드 보드가 못 보므로 background_jobs 에 진행 노출(best-effort)
_job_kind = "hier_redecompose" if reprocess else "hier_backfill"
_job_label = (f"doc {args.doc} {'재분해' if reprocess else '분해'}" if doc_ids
else f"{len(cands)}개 문서 {'재분해' if reprocess else '분해'}")
job_id = await start_job(engine, _job_kind, _job_label, total=None)
for c in cands:
if time.time() >= stop_at:
_log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {tot_docs} doc)")
@@ -272,7 +283,10 @@ async def cmd_run(args):
"timings": [], "types": [], "aborted": False}
else:
async with sm() as session:
astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
astat = await _analyze_doc_leaves(
session, client, doc_id, doc_domain, model_name, stop_at,
engine=engine, job_id=job_id,
base_processed=(tot_ok + tot_fail + tot_skip))
except Exception as exc:
_log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
continue
@@ -280,6 +294,8 @@ async def cmd_run(args):
tot_docs += 1
tot_ok += astat["ok"]; tot_fail += astat["fail"]; tot_skip += astat["skip"]
all_timings += astat["timings"]; all_types += astat["types"]
await heartbeat(engine, job_id, processed=(tot_ok + tot_fail + tot_skip),
total=tot_leaves_created)
avg = statistics.mean(astat["timings"]) if astat["timings"] else 0
_log(f" ✓ doc={doc_id} ({len(body):,}{doc_domain.split('/')[0]}) "
f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} "
@@ -287,6 +303,7 @@ async def cmd_run(args):
if astat["aborted"]:
_log("⏰ leaf 분석 중 deadline 도달 — 중단")
break
await finish_job(engine, job_id, state="done")
finally:
await client.close()
await engine.dispose()