f325bd0509
큐 밖 cron 생성 작업(global_digest/morning_briefing)이 processing_queue stage 가
아니라 보드에 안 잡혀, 맥미니가 11분짜리 digest 를 돌려도 idle 처럼 보였다.
ebbcaf8 의 background_jobs 메커니즘 재사용:
- digest_worker/briefing_worker = start_job→finish_job (best-effort, 본작업 무해)
- pipeline = cluster 완료마다 heartbeat(processed/total) → 진행바
- queue_overview = kind→machine 맵으로 payload 에 machine 필드 (맥미니 귀속)
- 보드 = 머신 레인에 dot 점등 + "생성 중: <label> N/T" 표시
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
56 lines
2.2 KiB
Python
56 lines
2.2 KiB
Python
"""Morning Briefing 워커 — 야간 수집 뉴스 (KST 00:00~05:00) topic×country 비교 분석.
|
||
|
||
- APScheduler cron (매일 05:10 KST, PR-3 에서 등록) + 수동 호출 공용 진입점
|
||
- PIPELINE_HARD_CAP = 600초 hard cap 으로 cron stuck 절대 방지
|
||
- 단독 실행: `python -m workers.briefing_worker`
|
||
"""
|
||
|
||
import asyncio
|
||
from datetime import date
|
||
|
||
from core.config import settings
|
||
from core.database import engine as db_engine
|
||
from core.utils import setup_logger
|
||
from services.background_jobs import finish_job, start_job
|
||
from services.briefing.pipeline import run_briefing_pipeline
|
||
|
||
logger = setup_logger("briefing_worker")
|
||
|
||
# 2026-06-15: config 단일소스 (digest 와 공유 키). 구 600s = 빠른 Gemma 기준.
|
||
PIPELINE_HARD_CAP = settings.digest_pipeline_hard_cap_s
|
||
|
||
|
||
async def run(target_date: date | None = None) -> dict | None:
|
||
"""APScheduler + 수동 호출 공용 진입점.
|
||
|
||
Args:
|
||
target_date: KST 기준 briefing_date (None = 오늘). API regenerate 가 명시 지정 가능.
|
||
"""
|
||
if "briefing" in settings.pipeline_held_stages:
|
||
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
|
||
return None
|
||
# 보드 가시화: 큐 밖 cron 생성 작업이라 background_jobs 로 노출 (best-effort, 맥미니 귀속)
|
||
job_id = await start_job(db_engine, "morning_briefing", label="조간 브리핑 생성")
|
||
try:
|
||
result = await asyncio.wait_for(
|
||
run_briefing_pipeline(target_date, job_id=job_id),
|
||
timeout=PIPELINE_HARD_CAP,
|
||
)
|
||
await finish_job(db_engine, job_id, state="done")
|
||
logger.info(f"[briefing] 워커 완료: {result}")
|
||
return result
|
||
except asyncio.TimeoutError:
|
||
await finish_job(db_engine, job_id, state="failed", error=f"HARD CAP {PIPELINE_HARD_CAP}s 초과")
|
||
logger.error(
|
||
f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
|
||
f"기존 briefing 은 commit 시점에만 갱신되므로 그대로 유지됨."
|
||
)
|
||
except Exception as e:
|
||
await finish_job(db_engine, job_id, state="failed", error=str(e)[:300])
|
||
logger.exception(f"[briefing] 워커 실패: {e}")
|
||
return None
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(run())
|