"""처리 머신 보드 + ETA 집계 (plan ds-processing-ui-6an, 안2+안5/6). GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue / documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약). 구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리. 판정부(rows_to_* / build_machines / build_summarize_eta / build_trend / build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능. 귀속 규칙 (단일 진실): - stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/ fulltext/stt · macmini = classify/summarize · macbook = deep_summary (단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속). - summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료 실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook' 이면 macbook 실적, 아니면 macmini 실적. - deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속 (보류 = 맥북 불가 신호). """ from datetime import datetime, timedelta from posixpath import basename from zoneinfo import ZoneInfo from sqlalchemy import bindparam, text from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings KST = ZoneInfo("Asia/Seoul") # 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출. _MACBOOK_MODEL_ALIAS = "qwen-macbook" # stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서) _GPU_STAGES = ( "extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt", ) _MACMINI_STAGES = ("classify", "summarize") _MACBOOK_STAGES = ("deep_summary",) _STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES _MACHINE_KEYS = ("gpu", "macmini", "macbook") _MACHINE_LABELS = { "gpu": "GPU 서버", "macmini": "맥미니", "macbook": "맥북 M5 Max", } # 머신 카드당 current 표시 상한 _CURRENT_LIMIT = 2 def stage_machine_map(deep_enabled: bool) -> dict[str, str]: """stage → machine key 맵. 
deep 슬롯 부재 시 deep_summary 는 macmini 귀속.""" mapping: dict[str, str] = {} for s in _GPU_STAGES: mapping[s] = "gpu" for s in _MACMINI_STAGES: mapping[s] = "macmini" for s in _MACBOOK_STAGES: mapping[s] = "macbook" if deep_enabled else "macmini" return mapping def _zero_stage() -> dict: return { "pending": 0, "processing": 0, "failed": 0, "done_1h": 0, "done_today": 0, "done_15m": 0, "deferred_pending": 0, "created_1h": 0, "oldest_pending_at": None, } def rows_to_stage_stats(rows) -> dict[str, dict]: """stage×status 집계 쿼리 행 → {stage: {pending, ..., created_1h}} 변환.""" stats: dict[str, dict] = {} for row in rows: stats[row[0]] = { "pending": int(row[1] or 0), "processing": int(row[2] or 0), "failed": int(row[3] or 0), "done_1h": int(row[4] or 0), "done_today": int(row[5] or 0), "done_15m": int(row[6] or 0), "deferred_pending": int(row[7] or 0), "created_1h": int(row[8] or 0), "oldest_pending_at": row[9] if len(row) > 9 else None, } return stats def rows_to_summarize_split(rows) -> dict[str, dict]: """summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}. is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용). 
""" split = { "macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0}, "macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0}, } for row in rows: key = "macbook" if row[0] else "macmini" split[key]["done_1h"] += int(row[1] or 0) split[key]["done_today"] += int(row[2] or 0) split[key]["done_15m"] += int(row[3] or 0) return split def display_title(row: dict) -> str: """표시용 제목 — title > original_filename > file_path basename > 문서 id.""" if row.get("title"): return row["title"] if row.get("original_filename"): return row["original_filename"] if row.get("file_path"): return basename(row["file_path"].rstrip("/")) return f"문서 #{row['document_id']}" def build_machines( stage_stats: dict[str, dict], summarize_split: dict[str, dict], current_rows: list[dict], *, deep_enabled: bool, ) -> list[dict]: """머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부.""" smap = stage_machine_map(deep_enabled) def g(stage: str, field: str) -> int: return stage_stats.get(stage, {}).get(field, 0) # current 귀속: processing 행을 머신별 최대 2건 (summarize processing → macmini) current_by_machine: dict[str, list[dict]] = {k: [] for k in _MACHINE_KEYS} for row in current_rows: machine = smap.get(row["stage"]) if machine and len(current_by_machine[machine]) < _CURRENT_LIMIT: current_by_machine[machine].append({ "document_id": row["document_id"], "title": display_title(row), "stage": row["stage"], }) machines = [] for key in _MACHINE_KEYS: stages = [s for s in _STAGE_ORDER if smap[s] == key] pending = sum(g(s, "pending") for s in stages) processing = sum(g(s, "processing") for s in stages) failed = sum(g(s, "failed") for s in stages) # 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속 done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize") done_today = sum(g(s, "done_today") for s in stages if s != "summarize") done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize") if key in summarize_split: done_1h += summarize_split[key]["done_1h"] done_today += 
summarize_split[key]["done_today"] done_15m += summarize_split[key]["done_15m"] # 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관) deferred_pending = ( g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending") if key == "macbook" else 0 ) # state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11). # 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" — # 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은 # 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만. if processing > 0 or done_15m > 0: state = "active" elif key == "macbook" and deferred_pending > 0: state = "deferred" else: state = "idle" machines.append({ "key": key, "label": _MACHINE_LABELS[key], "state": state, "stages": stages, "pending": pending, "processing": processing, "failed": failed, "done_1h": done_1h, "done_today": done_today, "deferred_pending": deferred_pending, "current": current_by_machine[key], }) return machines def compute_eta_minutes(pending: int, done_1h: int, inflow_1h: int) -> int | None: """ETA(분) = 순소화율 기반. done > inflow 일 때만 산출, 아니면 None (소화 불가).""" if done_1h > inflow_1h: return round(pending / (done_1h - inflow_1h) * 60) return None def build_summarize_eta(stage_stats: dict[str, dict]) -> dict: """summarize 풀 ETA — pending 은 보류(deferred) 포함 총수.""" s = stage_stats.get("summarize", _zero_stage()) pending = s["pending"] done_rate = s["done_1h"] inflow_rate = s["created_1h"] return { "pending": pending, "done_rate_1h": done_rate, "inflow_rate_1h": inflow_rate, "eta_minutes": compute_eta_minutes(pending, done_rate, inflow_rate), } def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict: """summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의 오프로드 가시화용. 
rows_to_summarize_split 이 이미 만든 값을 응답 형태로 투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외).""" def m(key: str) -> dict: s = summarize_split.get(key, {}) return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))} return {"macmini": m("macmini"), "macbook": m("macbook")} def build_trend( inflow_buckets: dict[str, int], done_buckets: dict[str, int], now_kst: datetime, ) -> list[dict]: """summarize 24h 추이 — KST 시간 버킷 24개 (오래된 것부터, 빈 버킷 0). 버킷 key = "YYYY-MM-DD HH:00" (KST). SQL to_char 출력과 동일 포맷. """ base = now_kst.replace(minute=0, second=0, microsecond=0) trend = [] for i in range(23, -1, -1): bucket = base - timedelta(hours=i) key = bucket.strftime("%Y-%m-%d %H:00") trend.append({ "hour": bucket.strftime("%H:00"), "inflow": inflow_buckets.get(key, 0), "done": done_buckets.get(key, 0), }) return trend def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]: """단계별 현황 행 — '단계 상세' 패널용 (2026-06-11 사용자 피드백: 완료가 보여야 한다). 파이프라인 순서 유지, 미지 stage 는 뒤에. 숨김/강조 판단은 FE 몫 — 여기선 사실만. oldest_pending_age_sec = 가장 오래된 pending 의 경과 초 (pending 없으면 None). 
""" from datetime import datetime, timezone now = now or datetime.now(timezone.utc) extra = [s for s in stage_stats if s not in _STAGE_ORDER] rows = [] for stage in [*_STAGE_ORDER, *extra]: st = stage_stats.get(stage) or _zero_stage() oldest = st.get("oldest_pending_at") age = None if oldest is not None: if oldest.tzinfo is None: oldest = oldest.replace(tzinfo=timezone.utc) age = max(0, int((now - oldest).total_seconds())) rows.append({ "stage": stage, "pending": st["pending"], "processing": st["processing"], "failed": st["failed"], "done_1h": st["done_1h"], "created_1h": st["created_1h"], "done_today": st["done_today"], "oldest_pending_age_sec": age, }) return rows def build_totals(stage_stats: dict[str, dict]) -> dict: """전 stage 합계.""" return { "pending": sum(s["pending"] for s in stage_stats.values()), "processing": sum(s["processing"] for s in stage_stats.values()), "failed": sum(s["failed"] for s in stage_stats.values()), } def compose_overview( stage_stats: dict[str, dict], summarize_split: dict[str, dict], inflow_buckets: dict[str, int], done_buckets: dict[str, int], current_rows: list[dict], *, deep_enabled: bool, now_kst: datetime, ) -> dict: """수집된 통계 → 응답 dict (계약 shape). 
순수 함수 — DB 불요.""" return { "machines": build_machines( stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled ), "stages": build_stages(stage_stats), "summarize_eta": build_summarize_eta(stage_stats), "summarize_by_machine": build_summarize_by_machine(summarize_split), "trend_24h": build_trend(inflow_buckets, done_buckets, now_kst), "totals": build_totals(stage_stats), } # ─── SQL 수집부 (총 5쿼리) ──────────────────────────────────────────────────── # 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방) _STAGE_STATS_SQL = """ SELECT stage, COUNT(*) FILTER (WHERE status = 'pending') AS pending, COUNT(*) FILTER (WHERE status = 'processing') AS processing, COUNT(*) FILTER (WHERE status = 'failed') AS failed, COUNT(*) FILTER (WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour') AS done_1h, COUNT(*) FILTER (WHERE status = 'completed' AND completed_at > :kst_midnight) AS done_today, COUNT(*) FILTER (WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '15 minutes') AS done_15m, COUNT(*) FILTER (WHERE status = 'pending' AND payload ->> 'deferred_until' IS NOT NULL AND (payload ->> 'deferred_until')::timestamptz > NOW()) AS deferred_pending, COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') AS created_1h, MIN(created_at) FILTER (WHERE status = 'pending') AS oldest_pending_at FROM processing_queue GROUP BY stage """ # 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방) # 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전). 
_SUMMARIZE_SPLIT_SQL = """
SELECT
    COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
    COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
    COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
    COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
  AND q.status = 'completed'
  AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""

# 3/4) summarize 24h trend — KST hourly buckets (one query each for inflow / done).
_TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
               'YYYY-MM-DD HH24:00') AS bucket,
       COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
  AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""

_TREND_DONE_SQL = """
SELECT to_char(date_trunc('hour', completed_at AT TIME ZONE 'Asia/Seoul'),
               'YYYY-MM-DD HH24:00') AS bucket,
       COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
  AND status = 'completed'
  AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""

# 5) processing rows + title material for display (one query — the per-machine
#    slice of _CURRENT_LIMIT items happens in the decision part).
_CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'processing'
ORDER BY q.started_at DESC NULLS LAST
LIMIT 50
"""


async def build_overview(session: AsyncSession) -> dict:
    """Collect with 5 queries → decide via compose_overview → response dict.

    The 5 queries run sequentially on ``session``. The response is then extended
    with ``background_jobs``, which _fetch_background_jobs reads on a separate
    connection.
    """
    now_kst = datetime.now(KST)
    kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
    deep_enabled = settings.ai is not None and settings.ai.deep is not None

    stage_rows = (
        await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
    ).all()
    split_rows = (
        await session.execute(
            text(_SUMMARIZE_SPLIT_SQL),
            {"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
        )
    ).all()
    inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
    done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
    current_result = (await session.execute(text(_CURRENT_SQL))).all()

    current_rows = [
        {
            "stage": row[0],
            "document_id": row[1],
            "title": row[2],
            "original_filename": row[3],
            "file_path": row[4],
        }
        for row in current_result
    ]

    result = compose_overview(
        rows_to_stage_stats(stage_rows),
        rows_to_summarize_split(split_rows),
        {row[0]: int(row[1]) for row in inflow_rows},
        {row[0]: int(row[1]) for row in done_rows},
        current_rows,
        deep_enabled=deep_enabled,
        now_kst=now_kst,
    )
    # Admin scripts running outside the queue (backfills etc.) = background_jobs
    # (migration 357). If the table is missing this degrades gracefully to [].
    result["background_jobs"] = await _fetch_background_jobs(session)
    return result


# kind -> processing machine (for machine-card attribution on the board).
# Unknown kinds map to gpu (the orchestration host).
_BG_JOB_MACHINE = {
    "global_digest": "macmini",
    "morning_briefing": "macmini",
    "section_summary": "macmini",
    "hier_backfill": "gpu",
    "hier_redecompose": "gpu",
}

# elapsed_sec: a running job is measured up to now(); a finished job is frozen
# at finished_at, so a job finished hours ago reports its run time rather than
# an ever-growing "time since start".
_BACKGROUND_JOBS_SQL = """
SELECT id, kind, label, state, processed, total,
       EXTRACT(EPOCH FROM (
           CASE WHEN state = 'running' THEN now()
                ELSE COALESCE(finished_at, now())
           END - started_at
       ))::int AS elapsed_sec,
       (state = 'running' AND updated_at < now() - interval '5 minutes') AS stale,
       error
FROM background_jobs
WHERE state = 'running' OR finished_at > now() - interval '6 hours'
ORDER BY (state = 'running') DESC, started_at DESC
LIMIT 20
"""


async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
    """Return running + finished-within-6h background_jobs; [] if the table is missing or on error.

    Queried on a **separate connection** from the request session, so a failing
    SELECT (e.g. migration 357 not applied, table absent) cannot poison the
    request session's transaction — on failure only that temporary connection
    is discarded. Observability is an add-on and must never break the board.
    """
    import logging  # local import: keeps this best-effort add-on self-contained

    try:
        async with session.bind.connect() as conn:  # independent connection from the pool
            rows = (await conn.execute(text(_BACKGROUND_JOBS_SQL))).mappings().all()
    except Exception:  # noqa: BLE001 — observability add-on; protect the board itself
        # Still best-effort, but leave a trace so an empty list is diagnosable.
        # Debug level: a missing table would otherwise log on every board poll.
        logging.getLogger(__name__).debug(
            "background_jobs lookup failed; returning []", exc_info=True
        )
        return []
    return [
        {
            "id": r["id"],
            "kind": r["kind"],
            "label": r["label"],
            "state": r["state"],
            "processed": int(r["processed"] or 0),
            "total": r["total"],
            "elapsed_sec": int(r["elapsed_sec"] or 0),
            "stale": bool(r["stale"]),
            "error": r["error"],
            "machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"),
        }
        for r in rows
    ]


# ─── Failure handling (plan ds-board-engines-1) ──────────────────────────────
# Failed = permanently stopped after the automatic retries (max_attempts=3) are
# exhausted. The functions below are for explicit user action only — there is
# no automatic call path (the board's failure drawer is the sole caller).
# A failed row may have an empty completed_at (the consumer's failure path does
# not record it), so started_at is used as the timestamp fallback.
_FAILED_LIST_SQL = """
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts, q.error_message,
       COALESCE(q.completed_at, q.started_at) AS failed_at,
       d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'failed'
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
LIMIT 300
"""

# Retry: failed → pending (attempts reset = a fresh set of 3 automatic retries).
# error_message is kept for auditing — harmless if it stays on the row after a
# later success.
# Rows that would collide with uq_queue_active (partial unique on (doc, stage)
# for pending/processing) — i.e. the same document+stage was already
# re-enqueued — are left alone and only show up in the counts.
# Several failed rows of the same (document, stage) may be requested together
# (failed rows are not covered by the partial unique index). The NOT EXISTS
# guard only sees the pre-UPDATE snapshot, so resetting all of them would put
# two active rows into uq_queue_active and abort the whole statement; only the
# highest id per (document, stage) is reset, the rest count as not retried.
_RETRY_SQL = """
UPDATE processing_queue q
SET status = 'pending',
    attempts = 0,
    started_at = NULL,
    completed_at = NULL
WHERE q.id IN (
        SELECT DISTINCT ON (f.document_id, f.stage) f.id
        FROM processing_queue f
        WHERE f.id IN :ids
          AND f.status = 'failed'
        ORDER BY f.document_id, f.stage, f.id DESC
    )
  AND q.status = 'failed'
  AND NOT EXISTS (
      SELECT 1 FROM processing_queue p
      WHERE p.document_id = q.document_id
        AND p.stage = q.stage
        AND p.status IN ('pending', 'processing')
        AND p.id <> q.id
  )
RETURNING q.id
"""

# Skip: failed → completed + payload marker (audit trail).
# enqueue_next_stage is intentionally NOT called — this keeps a failed document
# (empty text, etc.) from flowing into downstream stages. If later stages are
# needed, retry is the proper path.
_SKIP_SQL = """
UPDATE processing_queue
SET status = 'completed',
    completed_at = NOW(),
    payload = COALESCE(payload, '{}'::jsonb)
              || jsonb_build_object('skipped_by_user', true, 'skipped_at', NOW()::text)
WHERE id IN :ids AND status = 'failed'
RETURNING id
"""


async def _apply_to_failed_ids(
    session: AsyncSession, sql: str, ids: list[int]
) -> tuple[int, int]:
    """Run an ``UPDATE … WHERE id IN :ids … RETURNING`` statement over distinct ids, then commit.

    Returns ``(requested, changed)``: the number of distinct ids sent and the
    number of rows the statement returned.
    """
    targets = list(set(ids))
    statement = text(sql).bindparams(bindparam("ids", expanding=True))
    result = await session.execute(statement, {"ids": targets})
    changed = len(result.all())
    await session.commit()
    return len(targets), changed


async def fetch_failed_items(session: AsyncSession) -> list[dict]:
    """List permanently failed queue rows with a display title (at most 300 rows)."""
    result = await session.execute(text(_FAILED_LIST_SQL))
    items: list[dict] = []
    for (
        queue_id, stage, doc_id, attempts, max_attempts,
        error_message, failed_at, title, original_filename, file_path,
    ) in result.all():
        items.append({
            "id": queue_id,
            "stage": stage,
            "document_id": doc_id,
            "attempts": int(attempts or 0),
            "max_attempts": int(max_attempts or 0),
            "error_message": error_message,
            "failed_at": failed_at,
            "title": display_title({
                "document_id": doc_id,
                "title": title,
                "original_filename": original_filename,
                "file_path": file_path,
            }),
        })
    return items


async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
    """Move failed rows back to pending.

    ``not_retried`` = distinct ids requested minus rows actually reset (ids that
    were no longer failed, or that the guards in _RETRY_SQL declined).
    """
    requested, retried = await _apply_to_failed_ids(session, _RETRY_SQL, ids)
    return {
        "requested": requested,
        "retried": retried,
        "not_retried": requested - retried,
    }


async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
    """Mark failed rows completed with a skip marker; no downstream stages are chained."""
    requested, skipped = await _apply_to_failed_ids(session, _SKIP_SQL, ids)
    return {
        "requested": requested,
        "skipped": skipped,
        "not_skipped": requested - skipped,
    }