diff --git a/app/services/briefing/pipeline.py b/app/services/briefing/pipeline.py index 3d7df8d..d92252b 100644 --- a/app/services/briefing/pipeline.py +++ b/app/services/briefing/pipeline.py @@ -16,7 +16,9 @@ from sqlalchemy import delete from ai.client import AIClient from core.database import async_session +from core.database import engine as db_engine from core.utils import setup_logger +from services import background_jobs as bgj from models.briefing import BriefingTopic, MorningBriefing from services.briefing.clustering import LAMBDA, cluster_global from services.briefing.comparator import ( @@ -143,7 +145,7 @@ async def _save_briefing( return new.id -async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, Any]: +async def run_briefing_pipeline(target_date: date | None = None, job_id: int | None = None) -> dict[str, Any]: """야간 뉴스 브리핑 1회 실행. cron 또는 수동 regenerate API 에서 호출. Returns: @@ -217,10 +219,18 @@ async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, An ) jobs.append((rank, cluster, selected, historical_docs)) + if job_id is not None: + await bgj.heartbeat(db_engine, job_id, total=len(jobs)) + _prog = {"n": 0} + async def _run_one(cluster, selected, historical_docs): - return await compare_cluster_with_fallback( + r = await compare_cluster_with_fallback( client, cluster, selected, historical_docs=historical_docs ) + if job_id is not None: + _prog["n"] += 1 + await bgj.heartbeat(db_engine, job_id, processed=_prog["n"]) + return r results = await asyncio.gather( *[_run_one(c, s, h) for (_, c, s, h) in jobs] diff --git a/app/services/digest/pipeline.py b/app/services/digest/pipeline.py index 42c9266..0a08c33 100644 --- a/app/services/digest/pipeline.py +++ b/app/services/digest/pipeline.py @@ -20,7 +20,9 @@ from sqlalchemy import delete from ai.client import AIClient from core.database import async_session +from core.database import engine as db_engine from core.utils import setup_logger +from services import background_jobs as bgj from models.digest import DigestTopic, GlobalDigest from .clustering import LAMBDA, cluster_country @@ -74,7 +76,7 @@ def _build_topic_row( ) -async def run_digest_pipeline() -> dict: +async def run_digest_pipeline(job_id: int | None = None) -> dict: """전체 파이프라인 실행. worker entry 에서 호출. Returns: @@ -119,8 +121,16 @@ async def run_digest_pipeline() -> dict: selected = select_for_llm(cluster) jobs.append((country, rank, cluster, selected)) + if job_id is not None: + await bgj.heartbeat(db_engine, job_id, total=len(jobs)) + _prog = {"n": 0} + async def _run_one(cluster, selected): - return await summarize_cluster_with_fallback(client, cluster, selected) + r = await summarize_cluster_with_fallback(client, cluster, selected) + if job_id is not None: + _prog["n"] += 1 + await bgj.heartbeat(db_engine, job_id, processed=_prog["n"]) + return r results = await asyncio.gather(*[_run_one(c, s) for (_, _, c, s) in jobs]) diff --git a/app/services/queue_overview.py b/app/services/queue_overview.py index 5168648..4c17e17 100644 --- a/app/services/queue_overview.py +++ b/app/services/queue_overview.py @@ -426,6 +426,16 @@ async def build_overview(session: AsyncSession) -> dict: return result +# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트). +_BG_JOB_MACHINE = { + "global_digest": "macmini", + "morning_briefing": "macmini", + "section_summary": "macmini", + "hier_backfill": "gpu", + "hier_redecompose": "gpu", +} + + _BACKGROUND_JOBS_SQL = """ SELECT id, kind, label, state, processed, total, EXTRACT(EPOCH FROM (now() - started_at))::int AS elapsed_sec, @@ -456,6 +466,7 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]: "processed": int(r["processed"] or 0), "total": r["total"], "elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]), "error": r["error"], + "machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"), } for r in rows ] diff --git a/app/workers/briefing_worker.py b/app/workers/briefing_worker.py index 5ee9fcf..3d4c67c 100644 --- a/app/workers/briefing_worker.py +++ b/app/workers/briefing_worker.py @@ -9,7 +9,9 @@ import asyncio from datetime import date from core.config import settings +from core.database import engine as db_engine from core.utils import setup_logger +from services.background_jobs import finish_job, start_job from services.briefing.pipeline import run_briefing_pipeline logger = setup_logger("briefing_worker") @@ -27,19 +29,24 @@ async def run(target_date: date | None = None) -> dict | None: if "briefing" in settings.pipeline_held_stages: logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip") return None + # 보드 가시화: 큐 밖 cron 생성 작업이라 background_jobs 로 노출 (best-effort, 맥미니 귀속) + job_id = await start_job(db_engine, "morning_briefing", label="조간 브리핑 생성") try: result = await asyncio.wait_for( - run_briefing_pipeline(target_date), + run_briefing_pipeline(target_date, job_id=job_id), timeout=PIPELINE_HARD_CAP, ) + await finish_job(db_engine, job_id, state="done") logger.info(f"[briefing] 워커 완료: {result}") return result except asyncio.TimeoutError: + await finish_job(db_engine, job_id, state="failed", error=f"HARD CAP {PIPELINE_HARD_CAP}s 초과") logger.error( f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. " f"기존 briefing 은 commit 시점에만 갱신되므로 그대로 유지됨." ) except Exception as e: + await finish_job(db_engine, job_id, state="failed", error=str(e)[:300]) logger.exception(f"[briefing] 워커 실패: {e}") return None diff --git a/app/workers/digest_worker.py b/app/workers/digest_worker.py index dc81490..24c0b12 100644 --- a/app/workers/digest_worker.py +++ b/app/workers/digest_worker.py @@ -11,7 +11,9 @@ global_digests / digest_topics 테이블에 저장한다. import asyncio from core.config import settings +from core.database import engine as db_engine from core.utils import setup_logger +from services.background_jobs import finish_job, start_job from services.digest.pipeline import run_digest_pipeline logger = setup_logger("digest_worker") @@ -29,19 +31,24 @@ async def run() -> None: if "digest" in settings.pipeline_held_stages: logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip") return + # 보드 가시화: 큐 밖 cron 생성 작업이라 background_jobs 로 노출 (best-effort, 맥미니 귀속) + job_id = await start_job(db_engine, "global_digest", label="글로벌 다이제스트 생성") try: result = await asyncio.wait_for( - run_digest_pipeline(), + run_digest_pipeline(job_id=job_id), timeout=PIPELINE_HARD_CAP, ) + await finish_job(db_engine, job_id, state="done") logger.info(f"[global_digest] 워커 완료: {result}") except asyncio.TimeoutError: + await finish_job(db_engine, job_id, state="failed", error=f"HARD CAP {PIPELINE_HARD_CAP}s 초과") logger.error( f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. " f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. " f"다음 cron 실행에서 재시도." ) except Exception as e: + await finish_job(db_engine, job_id, state="failed", error=str(e)[:300]) logger.exception(f"[global_digest] 워커 실패: {e}") diff --git a/frontend/src/lib/components/ProcessingFlowBoard.svelte b/frontend/src/lib/components/ProcessingFlowBoard.svelte index c5272a1..5ef80ca 100644 --- a/frontend/src/lib/components/ProcessingFlowBoard.svelte +++ b/frontend/src/lib/components/ProcessingFlowBoard.svelte @@ -212,6 +212,10 @@ // ─── 백그라운드 작업 (큐 밖 스크립트 backfill) — processing_queue 사각지대 노출 ─── const bgJobs = $derived(overview.background_jobs ?? []); + const runningBg = $derived(bgJobs.filter((j) => j.state === 'running')); + function bgForMachine(key: string) { + return runningBg.filter((j) => j.machine === key); + } function fmtElapsed(s: number): string { if (s < 60) return `${s}s`; if (s < 3600) return `${Math.floor(s / 60)}m`; @@ -333,10 +337,11 @@ {#each lanes as lane (lane.key)}
- + {lane.meta.label} {lane.meta.model} {formatRate(lane.card?.done_1h ?? 0)}/h + {#each bgForMachine(lane.key) as j (j.id)}생성 중: {j.label ?? j.kind}{#if j.total} {j.processed}/{j.total}{/if}{/each} {#if lane.key === 'macbook' && (lane.card?.deferred_pending ?? 0) > 0} 보류 {lane.card?.deferred_pending} {/if} diff --git a/frontend/src/lib/types/queue.ts b/frontend/src/lib/types/queue.ts index 32d3c23..882704c 100644 --- a/frontend/src/lib/types/queue.ts +++ b/frontend/src/lib/types/queue.ts @@ -82,6 +82,7 @@ export interface BackgroundJob { kind: string; label: string | null; state: 'running' | 'done' | 'failed'; + machine: string; processed: number; total: number | null; elapsed_sec: number;