diff --git a/app/api/queue_overview.py b/app/api/queue_overview.py index 383cc71..9b93366 100644 --- a/app/api/queue_overview.py +++ b/app/api/queue_overview.py @@ -103,6 +103,20 @@ class StageRow(BaseModel): oldest_pending_age_sec: int | None +class BackgroundJobItem(BaseModel): + """큐 밖 관리 스크립트(백필 등) 작업 — processing_queue 가 못 보는 사각지대 노출. + stale = running 인데 heartbeat 가 오래 끊김(프로세스 사망 추정).""" + id: int + kind: str + label: str | None + state: Literal["running", "done", "failed"] + processed: int + total: int | None + elapsed_sec: int + stale: bool + error: str | None + + class QueueOverviewResponse(BaseModel): machines: list[MachineCard] stages: list[StageRow] @@ -110,6 +124,7 @@ class QueueOverviewResponse(BaseModel): summarize_by_machine: SummarizeByMachine trend_24h: list[TrendBucket] totals: Totals + background_jobs: list[BackgroundJobItem] = [] class FailedItem(BaseModel): diff --git a/app/services/background_jobs.py b/app/services/background_jobs.py new file mode 100644 index 0000000..a527b80 --- /dev/null +++ b/app/services/background_jobs.py @@ -0,0 +1,93 @@ +"""off-queue 관리 스크립트(백필 등) 진행 가시화 — background_jobs (migration 357). + +processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill / +section_summary_pilot 같은 스크립트 작업은 대시보드 보드에 안 잡힌다. 이 모듈로 +스크립트가 진행상황을 남기면 queue_overview 가 "백그라운드 작업" 패널로 노출한다. + +설계 불변식: +- **자율 트랜잭션**: 각 기록은 engine.begin() 짧은 트랜잭션으로 즉시 commit한다. + 스크립트 본 작업은 별도 세션(긴 트랜잭션)이라, 같이 묶으면 commit 전까지 안 보여 + 실시간 가시화가 깨진다. 그래서 전용 connection 으로 독립 commit. +- **best-effort**: 관측 기록 실패가 본 작업을 깨면 안 된다 — 모든 함수 try/except, + 실패 시 warning 로그만. job_id=None 이면 조용히 no-op (start 실패해도 이어서 동작). +""" + +import json +import logging + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine + +logger = logging.getLogger(__name__) + + +async def start_job( + engine: AsyncEngine, kind: str, label: str | None = None, total: int | None = None +) -> int | None: + """작업 시작 기록 → background_jobs.id (실패 시 None — 호출측은 그대로 진행).""" + try: + async with engine.begin() as conn: + row = ( + await conn.execute( + text( + "INSERT INTO background_jobs (kind, label, total) " + "VALUES (:k, :l, :t) RETURNING id" + ), + {"k": kind, "l": label, "t": total}, + ) + ).first() + return int(row[0]) if row else None + except Exception as exc: # noqa: BLE001 — 관측은 부가, 본작업 보호 + logger.warning(f"[background_jobs] start 실패(무시): {type(exc).__name__}: {exc}") + return None + + +async def heartbeat( + engine: AsyncEngine, + job_id: int | None, + *, + processed: int | None = None, + total: int | None = None, + detail: dict | None = None, +) -> None: + """진행 갱신(processed/total/detail). job_id=None 또는 실패 시 no-op.""" + if job_id is None: + return + try: + async with engine.begin() as conn: + await conn.execute( + text( + "UPDATE background_jobs SET " + "processed = COALESCE(:p, processed), " + "total = COALESCE(:t, total), " + "detail = COALESCE(CAST(:d AS jsonb), detail), " + "updated_at = now() WHERE id = :id" + ), + { + "id": job_id, + "p": processed, + "t": total, + "d": json.dumps(detail, ensure_ascii=False) if detail is not None else None, + }, + ) + except Exception as exc: # noqa: BLE001 + logger.warning(f"[background_jobs] heartbeat 실패(무시): {type(exc).__name__}: {exc}") + + +async def finish_job( + engine: AsyncEngine, job_id: int | None, *, state: str = "done", error: str | None = None +) -> None: + """종료 기록(done/failed). job_id=None 또는 실패 시 no-op.""" + if job_id is None: + return + try: + async with engine.begin() as conn: + await conn.execute( + text( + "UPDATE background_jobs SET state = :s, error = :e, " + "finished_at = now(), updated_at = now() WHERE id = :id" + ), + {"id": job_id, "s": state, "e": (error or None)}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning(f"[background_jobs] finish 실패(무시): {type(exc).__name__}: {exc}") diff --git a/app/services/queue_overview.py b/app/services/queue_overview.py index 682660f..5168648 100644 --- a/app/services/queue_overview.py +++ b/app/services/queue_overview.py @@ -412,7 +412,7 @@ async def build_overview(session: AsyncSession) -> dict: for row in current_result ] - return compose_overview( + result = compose_overview( rows_to_stage_stats(stage_rows), rows_to_summarize_split(split_rows), {row[0]: int(row[1]) for row in inflow_rows}, @@ -421,6 +421,44 @@ async def build_overview(session: AsyncSession) -> dict: deep_enabled=deep_enabled, now_kst=now_kst, ) + # 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]). + result["background_jobs"] = await _fetch_background_jobs(session) + return result + + +_BACKGROUND_JOBS_SQL = """ + SELECT id, kind, label, state, processed, total, + EXTRACT(EPOCH FROM (now() - started_at))::int AS elapsed_sec, + (state = 'running' AND updated_at < now() - interval '5 minutes') AS stale, + error + FROM background_jobs + WHERE state = 'running' OR finished_at > now() - interval '6 hours' + ORDER BY (state = 'running') DESC, started_at DESC + LIMIT 20 +""" + + +async def _fetch_background_jobs(session: AsyncSession) -> list[dict]: + """running + 최근 6h 완료 background_jobs. 테이블 없거나 오류면 [] (보드 무영향). + + 요청 세션과 **별도 connection**으로 조회한다 — 테이블 부재(마이그 357 미적용 등) 시 + SELECT 실패가 요청 세션의 트랜잭션을 오염시키지 않도록 물리적으로 분리(실패 시 그 + 임시 connection만 폐기). 관측은 부가 기능이라 보드 본체를 절대 깨면 안 된다. + """ + try: + async with session.bind.connect() as conn: # 풀에서 독립 connection + rows = (await conn.execute(text(_BACKGROUND_JOBS_SQL))).mappings().all() + except Exception: # noqa: BLE001 — 관측 부가, 보드 본체 보호 + return [] + return [ + { + "id": r["id"], "kind": r["kind"], "label": r["label"], "state": r["state"], + "processed": int(r["processed"] or 0), "total": r["total"], + "elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]), + "error": r["error"], + } + for r in rows + ] # ─── 실패 처리 (plan ds-board-engines-1) ───────────────────────────────────── diff --git a/frontend/src/lib/components/ProcessingFlowBoard.svelte b/frontend/src/lib/components/ProcessingFlowBoard.svelte index 16752dc..c5272a1 100644 --- a/frontend/src/lib/components/ProcessingFlowBoard.svelte +++ b/frontend/src/lib/components/ProcessingFlowBoard.svelte @@ -210,6 +210,19 @@ // 맥북이 요약을 실제로 가져가는 중인가 (합류 표식 게이트) const offloadActive = $derived(split.macbook.done_1h > 0); + // ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ─── + const bgJobs = $derived(overview.background_jobs ?? []); + function fmtElapsed(s: number): string { + if (s < 60) return `${s}s`; + if (s < 3600) return `${Math.floor(s / 60)}m`; + return `${Math.floor(s / 3600)}h${Math.floor((s % 3600) / 60)}m`; + } + function bgDot(j: { state: string; stale: boolean }): string { + if (j.state === 'running') return j.stale ? 'bg-warning' : 'bg-success'; + if (j.state === 'failed') return 'bg-error'; + return 'bg-faint'; + } + // ─── 지배 백로그 = 요약. 정직 ETA(유입 차감) — summarize_eta ─── const eta = $derived(overview.summarize_eta); // 정직 ETA 라벨: eta_minutes null = 유입이 소화를 앞섬(소진 불가) @@ -466,6 +479,32 @@ {/if} + + {#if bgJobs.length > 0} +
+
백그라운드 작업
+
+ {#each bgJobs as j (j.id)} +
+
+ + {j.kind} + {j.label ?? '작업'} + + {#if j.total}{j.processed.toLocaleString()}/{j.total.toLocaleString()}{:else}{j.processed.toLocaleString()}건{/if} · {fmtElapsed(j.elapsed_sec)} + +
+ {#if j.stale} +
heartbeat 끊김 — 프로세스 중단 추정 (재개 필요할 수 있음)
+ {:else if j.state === 'failed'} +
실패{#if j.error} · {j.error}{/if}
+ {/if} +
+ {/each} +
+
+ {/if} + {#if failOpen}
diff --git a/frontend/src/lib/types/queue.ts b/frontend/src/lib/types/queue.ts index 0915bbf..32d3c23 100644 --- a/frontend/src/lib/types/queue.ts +++ b/frontend/src/lib/types/queue.ts @@ -75,6 +75,20 @@ export interface QueueStageRow { oldest_pending_age_sec: number | null; } +/** 큐 밖 관리 스크립트(백필 등) 작업 — processing_queue 가 못 보는 사각지대. + * stale = running 인데 heartbeat 끊김(프로세스 사망 추정). */ +export interface BackgroundJob { + id: number; + kind: string; + label: string | null; + state: 'running' | 'done' | 'failed'; + processed: number; + total: number | null; + elapsed_sec: number; + stale: boolean; + error: string | null; +} + export interface QueueOverview { machines: MachineOverview[]; summarize_eta: SummarizeEta; @@ -82,6 +96,7 @@ export interface QueueOverview { trend_24h: TrendPoint[]; stages: QueueStageRow[]; totals: QueueTotals; + background_jobs?: BackgroundJob[]; } /** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */ diff --git a/migrations/357_background_jobs.sql b/migrations/357_background_jobs.sql new file mode 100644 index 0000000..b7fae24 --- /dev/null +++ b/migrations/357_background_jobs.sql @@ -0,0 +1,19 @@ +-- 2026-06-14 PR-Background-Jobs-Observability: 큐 밖 관리 스크립트(백필 등) 진행 가시화. +-- processing_queue 는 파이프라인 stage 전용 — hier_overnight_backfill / section_summary_pilot +-- 같은 off-queue 관리 스크립트는 여기에 진행상황을 남겨 대시보드 보드가 노출한다. +-- worker_jobs(user_id NOT NULL, worker-pool 전용)와 별개 — 이건 owner 없는 관리 작업 heartbeat. +-- 단일 statement (asyncpg multi-statement 불허 컨벤션). 인덱스는 소량 테이블이라 생략. +CREATE TABLE IF NOT EXISTS background_jobs ( + id BIGSERIAL PRIMARY KEY, + kind TEXT NOT NULL, -- 'hier_redecompose' | 'section_summary' | ... + label TEXT, -- 사람이 읽는 대상 표기 (예: 'doc 5210 (Sec VIII)') + state TEXT NOT NULL DEFAULT 'running' + CHECK (state IN ('running', 'done', 'failed')), + processed INTEGER NOT NULL DEFAULT 0, -- 처리한 단위 수 (절/leaf 등) + total INTEGER, -- 전체 단위 수 (미상이면 NULL) + detail JSONB NOT NULL DEFAULT '{}'::jsonb, + error TEXT, + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + finished_at TIMESTAMPTZ +); diff --git a/scripts/hier_overnight_backfill.py b/scripts/hier_overnight_backfill.py index 095fd07..44ea269 100644 --- a/scripts/hier_overnight_backfill.py +++ b/scripts/hier_overnight_backfill.py @@ -32,6 +32,7 @@ from core.config import settings from services.hier_decomp.builder import build_hier_tree from services.hier_decomp.persist import persist_hier_tree from services.search.llm_gate import Priority, acquire_mlx_gate +from services.background_jobs import finish_job, heartbeat, start_job # 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존) from section_summary_pilot import ( @@ -140,8 +141,10 @@ def _make_engine(): return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True) -async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at): - """doc 의 미분석 hier leaf 분석 → upsert. stop_at(epoch) 넘으면 leaf 경계 중단.""" +async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at, + engine=None, job_id=None, base_processed=0): + """doc 의 미분석 hier leaf 분석 → upsert. stop_at(epoch) 넘으면 leaf 경계 중단. + engine/job_id 주어지면 background_jobs 에 ~10절마다 진행 heartbeat(보드 가시화).""" rows = (await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})).mappings().all() ok = fail = skip = 0 timings, types = [], [] @@ -187,6 +190,8 @@ async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, s "content_hash": r["content_hash"], "error": err, }) await session.commit() + if job_id and (ok + fail + skip) % 10 == 0: + await heartbeat(engine, job_id, processed=base_processed + ok + fail + skip) await session.commit() return {"ok": ok, "fail": fail, "skip": skip, "leaves": len(rows), "timings": timings, "types": types, "aborted": aborted} @@ -256,6 +261,12 @@ async def cmd_run(args): _candidate_params(allowlist, doc_ids))).mappings().all() _log(f"후보 doc {len(cands)} 선별. 시작.") + # 관측: 큐 밖 작업이라 대시보드 보드가 못 보므로 background_jobs 에 진행 노출(best-effort) + _job_kind = "hier_redecompose" if reprocess else "hier_backfill" + _job_label = (f"doc {args.doc} {'재분해' if reprocess else '분해'}" if doc_ids + else f"{len(cands)}개 문서 {'재분해' if reprocess else '분해'}") + job_id = await start_job(engine, _job_kind, _job_label, total=None) + for c in cands: if time.time() >= stop_at: _log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {tot_docs} doc)") @@ -272,7 +283,10 @@ async def cmd_run(args): "timings": [], "types": [], "aborted": False} else: async with sm() as session: - astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at) + astat = await _analyze_doc_leaves( + session, client, doc_id, doc_domain, model_name, stop_at, + engine=engine, job_id=job_id, + base_processed=(tot_ok + tot_fail + tot_skip)) except Exception as exc: _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}") continue @@ -280,6 +294,8 @@ async def cmd_run(args): tot_docs += 1 tot_ok += astat["ok"]; tot_fail += astat["fail"]; tot_skip += astat["skip"] all_timings += astat["timings"]; all_types += astat["types"] + await heartbeat(engine, job_id, processed=(tot_ok + tot_fail + tot_skip), + total=tot_leaves_created) avg = statistics.mean(astat["timings"]) if astat["timings"] else 0 _log(f" ✓ doc={doc_id} ({len(body):,}자 {doc_domain.split('/')[0]}) " f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} " @@ -287,6 +303,7 @@ async def cmd_run(args): if astat["aborted"]: _log("⏰ leaf 분석 중 deadline 도달 — 중단") break + await finish_job(engine, job_id, state="done") finally: await client.close() await engine.dispose()