Files
hyungi ebbcaf86d8 feat(observability): 큐 밖 백그라운드 작업(backfill)을 처리 머신 보드에 노출
processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill 같은 off-queue
관리 스크립트 작업이 대시보드 보드에 안 잡혀, 다른 세션이 모르고 fastapi 를 재생성해
in-flight 재분해를 끊는 사고가 발생(2026-06-14). 사각지대 해소.

- migrations/357_background_jobs.sql: background_jobs 테이블(kind/label/state/processed/
  total/heartbeat). worker_jobs(user_id 필수, worker-pool 전용)와 별개.
- services/background_jobs.py: start/heartbeat/finish 헬퍼 — 자율 트랜잭션(즉시 commit →
  실시간 가시화) + best-effort(관측 실패가 본작업 안 깸).
- hier_overnight_backfill: 작업 시작/절 ~10개마다 heartbeat/종료 계측.
- queue_overview: /api/queue/overview 응답에 background_jobs 추가(running + 최근 6h 완료,
  stale=heartbeat 끊김 추정). SAVEPOINT 로 테이블 부재/오류 시 보드 본체 무영향.
- ProcessingFlowBoard: "백그라운드 작업" 패널(진행/경과/state, stale 끊김 경고).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-14 12:27:18 +09:00

94 lines
3.7 KiB
Python

"""off-queue 관리 스크립트(백필 등) 진행 가시화 — background_jobs (migration 357).
processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill /
section_summary_pilot 같은 스크립트 작업은 대시보드 보드에 안 잡힌다. 이 모듈로
스크립트가 진행상황을 남기면 queue_overview 가 "백그라운드 작업" 패널로 노출한다.
설계 불변식:
- **자율 트랜잭션**: 각 기록은 engine.begin() 짧은 트랜잭션으로 즉시 commit한다.
스크립트 본 작업은 별도 세션(긴 트랜잭션)이라, 같이 묶으면 commit 전까지 안 보여
실시간 가시화가 깨진다. 그래서 전용 connection 으로 독립 commit.
- **best-effort**: 관측 기록 실패가 본 작업을 깨면 안 된다 — 모든 함수 try/except,
실패 시 warning 로그만. job_id=None 이면 조용히 no-op (start 실패해도 이어서 동작).
"""
import json
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
logger = logging.getLogger(__name__)
async def start_job(
engine: AsyncEngine, kind: str, label: str | None = None, total: int | None = None
) -> int | None:
"""작업 시작 기록 → background_jobs.id (실패 시 None — 호출측은 그대로 진행)."""
try:
async with engine.begin() as conn:
row = (
await conn.execute(
text(
"INSERT INTO background_jobs (kind, label, total) "
"VALUES (:k, :l, :t) RETURNING id"
),
{"k": kind, "l": label, "t": total},
)
).first()
return int(row[0]) if row else None
except Exception as exc: # noqa: BLE001 — 관측은 부가, 본작업 보호
logger.warning(f"[background_jobs] start 실패(무시): {type(exc).__name__}: {exc}")
return None
async def heartbeat(
engine: AsyncEngine,
job_id: int | None,
*,
processed: int | None = None,
total: int | None = None,
detail: dict | None = None,
) -> None:
"""진행 갱신(processed/total/detail). job_id=None 또는 실패 시 no-op."""
if job_id is None:
return
try:
async with engine.begin() as conn:
await conn.execute(
text(
"UPDATE background_jobs SET "
"processed = COALESCE(:p, processed), "
"total = COALESCE(:t, total), "
"detail = COALESCE(CAST(:d AS jsonb), detail), "
"updated_at = now() WHERE id = :id"
),
{
"id": job_id,
"p": processed,
"t": total,
"d": json.dumps(detail, ensure_ascii=False) if detail is not None else None,
},
)
except Exception as exc: # noqa: BLE001
logger.warning(f"[background_jobs] heartbeat 실패(무시): {type(exc).__name__}: {exc}")
async def finish_job(
engine: AsyncEngine, job_id: int | None, *, state: str = "done", error: str | None = None
) -> None:
"""종료 기록(done/failed). job_id=None 또는 실패 시 no-op."""
if job_id is None:
return
try:
async with engine.begin() as conn:
await conn.execute(
text(
"UPDATE background_jobs SET state = :s, error = :e, "
"finished_at = now(), updated_at = now() WHERE id = :id"
),
{"id": job_id, "s": state, "e": (error or None)},
)
except Exception as exc: # noqa: BLE001
logger.warning(f"[background_jobs] finish 실패(무시): {type(exc).__name__}: {exc}")