"""GET /api/queue/overview 판정부 단위테스트 — DB 불요 (plan ds-processing-ui-6an). services/queue_overview 의 SQL 수집부와 분리된 순수 판정 함수 (stage_machine_map / build_machines / build_summarize_eta / build_trend / build_totals / compute_eta_minutes / rows_to_* / display_title) 를 mock 행으로 검증한다. 통합(실 SQL)은 배포 후 라이브 smoke 로 확인. 2026-07-02 컷오버 후 2노드(나스+맥미니) 기준 — 구 3노드 레인은 제거됨. """ from datetime import datetime from zoneinfo import ZoneInfo from services.queue_overview import ( build_machines, build_summarize_eta, build_totals, build_trend, compose_overview, compute_eta_minutes, display_title, rows_to_stage_stats, stage_machine_map, ) KST = ZoneInfo("Asia/Seoul") def _stage(**kw) -> dict: """stage 통계 1건 — 미지정 필드 0.""" base = { "pending": 0, "processing": 0, "failed": 0, "done_1h": 0, "done_today": 0, "done_15m": 0, "deferred_pending": 0, "created_1h": 0, } base.update(kw) return base def _machine(machines: list[dict], key: str) -> dict: return next(m for m in machines if m["key"] == key) # ─── stage→machine 귀속 맵 ──────────────────────────────────────────────────── def test_stage_machine_map_two_nodes(): smap = stage_machine_map() for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"): assert smap[s] == "nas" assert smap["classify"] == "macmini" assert smap["summarize"] == "macmini" assert smap["deep_summary"] == "macmini" # ─── 머신 카드 귀속 합산 ────────────────────────────────────────────────────── def test_nas_stage_counts_attribution(): stats = { "extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1), "stt": _stage(failed=2, done_1h=1, done_today=2), } machines = build_machines(stats, []) nas = _machine(machines, "nas") assert (nas["pending"], nas["processing"], nas["failed"]) == (3, 1, 2) assert (nas["done_1h"], nas["done_today"]) == (6, 11) # nas 의 stages 는 정적 8종 전부 (집계 0 이어도 표시) assert nas["stages"] == [ "extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt", ] def test_macmini_llm_stages_attribution(): """classify/summarize/deep_summary 전부 macmini 귀속 (단일 생성 LLM 허브).""" stats = { "classify": _stage(done_1h=2, done_today=3), "summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20), "deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4), } machines = build_machines(stats, []) macmini = _machine(machines, "macmini") assert macmini["pending"] == 9 and macmini["failed"] == 1 assert macmini["processing"] == 1 assert macmini["done_1h"] == 2 + 10 + 3 assert macmini["done_today"] == 3 + 20 + 4 assert macmini["stages"] == ["classify", "summarize", "deep_summary"] assert _machine(machines, "nas")["pending"] == 0 def test_deferred_pending_on_macmini_card(): """보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macmini 카드 귀속 (보류 = LLM 백오프 신호).""" stats = { "summarize": _stage(pending=5, deferred_pending=2), "deep_summary": _stage(pending=1, deferred_pending=1), } machines = build_machines(stats, []) assert _machine(machines, "macmini")["deferred_pending"] == 3 assert _machine(machines, "nas")["deferred_pending"] == 0 # ─── state 판정 ─────────────────────────────────────────────────────────────── def test_macmini_state_active_wins_over_deferred_while_working(): """가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'. 보류 건수는 deferred_pending 필드가 별도로 전달 — 카드 라인이 표시. """ stats = {"summarize": _stage(pending=1, deferred_pending=1, done_15m=3)} machines = build_machines(stats, []) mm = _machine(machines, "macmini") assert mm["state"] == "active" assert mm["deferred_pending"] == 1 def test_macmini_state_deferred_only_when_not_working(): """일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'.""" stats = {"summarize": _stage(pending=1, deferred_pending=1)} machines = build_machines(stats, []) assert _machine(machines, "macmini")["state"] == "deferred" def test_macmini_state_idle(): machines = build_machines({}, []) assert _machine(machines, "macmini")["state"] == "idle" def test_nas_state_active_on_processing(): stats = {"extract": _stage(processing=1)} machines = build_machines(stats, []) assert _machine(machines, "nas")["state"] == "active" def test_nas_state_active_on_recent_done(): stats = {"embed": _stage(done_15m=2)} machines = build_machines(stats, []) assert _machine(machines, "nas")["state"] == "active" def test_nas_state_idle_when_old_done_only(): stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음 machines = build_machines(stats, []) assert _machine(machines, "nas")["state"] == "idle" def test_macmini_state_active_on_summarize_processing(): stats = {"summarize": _stage(processing=1)} machines = build_machines(stats, []) assert _machine(machines, "macmini")["state"] == "active" # ─── current 귀속 ───────────────────────────────────────────────────────────── def test_current_summarize_to_macmini_max_two(): rows = [ {"stage": "summarize", "document_id": 1, "title": "문서A", "original_filename": None, "file_path": None}, {"stage": "summarize", "document_id": 2, "title": "문서B", "original_filename": None, "file_path": None}, {"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None}, {"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None}, ] machines = build_machines({}, rows) macmini = _machine(machines, "macmini") nas = _machine(machines, "nas") assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건 assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"} assert [c["document_id"] for c in nas["current"]] == [4] def test_current_deep_summary_to_macmini(): rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}] machines = build_machines({}, rows) assert _machine(machines, "macmini")["current"][0]["document_id"] == 9 def test_display_title_fallback_chain(): assert display_title({"document_id": 1, "title": "제목"}) == "제목" assert display_title({"document_id": 1, "title": None, "original_filename": "a.pdf"}) == "a.pdf" assert display_title( {"document_id": 1, "title": None, "original_filename": None, "file_path": "/documents/PKM/Inbox/b.hwp"} ) == "b.hwp" assert display_title( {"document_id": 7, "title": None, "original_filename": None, "file_path": None} ) == "문서 #7" # ─── summarize ETA ──────────────────────────────────────────────────────────── def test_eta_minutes_positive_drain(): # 순소화 6건/h, 잔량 30건 → 300분 assert compute_eta_minutes(30, 10, 4) == 300 def test_eta_minutes_null_when_not_draining(): assert compute_eta_minutes(30, 4, 10) is None # 유입 > 소화 assert compute_eta_minutes(30, 5, 5) is None # 동률도 null assert compute_eta_minutes(30, 0, 0) is None def test_eta_minutes_zero_pending(): assert compute_eta_minutes(0, 10, 4) == 0 def test_build_summarize_eta_pending_includes_deferred(): stats = {"summarize": _stage(pending=12, deferred_pending=5, done_1h=8, created_1h=2)} eta = build_summarize_eta(stats) assert eta == { "pending": 12, # 보류 포함 총수 (pending 자체에 deferred 포함) "done_rate_1h": 8, "inflow_rate_1h": 2, "eta_minutes": round(12 / 6 * 60), } def test_build_summarize_eta_empty_stats(): eta = build_summarize_eta({}) assert eta == {"pending": 0, "done_rate_1h": 0, "inflow_rate_1h": 0, "eta_minutes": None} # ─── trend 24h ──────────────────────────────────────────────────────────────── def test_trend_24_buckets_oldest_first_with_gaps(): now_kst = datetime(2026, 6, 11, 14, 30, tzinfo=KST) inflow = {"2026-06-11 13:00": 3, "2026-06-10 15:00": 1} # 15:00 어제 = 최고령 버킷 done = {"2026-06-11 14:00": 2} trend = build_trend(inflow, done, now_kst) assert len(trend) == 24 assert trend[0] == {"hour": "15:00", "inflow": 1, "done": 0} # 오래된 것부터 assert trend[-1] == {"hour": "14:00", "inflow": 0, "done": 2} # 현재 시각 버킷 assert trend[-2] == {"hour": "13:00", "inflow": 3, "done": 0} # 빈 버킷은 0 assert sum(b["inflow"] for b in trend) == 4 assert sum(b["done"] for b in trend) == 2 def test_trend_ignores_out_of_window_bucket(): """창 밖(24버킷 미포함) key 는 무시 — cutoff 경계 행이 섞여도 안전.""" now_kst = datetime(2026, 6, 11, 14, 30, tzinfo=KST) inflow = {"2026-06-10 14:00": 99} # 14:00 어제 — 창의 최고령(15:00 어제) 이전 trend = build_trend(inflow, {}, now_kst) assert sum(b["inflow"] for b in trend) == 0 def test_trend_kst_midnight_crossing_labels(): now_kst = datetime(2026, 6, 11, 2, 5, tzinfo=KST) trend = build_trend({}, {}, now_kst) assert trend[-1]["hour"] == "02:00" assert trend[0]["hour"] == "03:00" # 전날 03:00 (라벨은 HH:00 만) assert [b["hour"] for b in trend[-3:]] == ["00:00", "01:00", "02:00"] # ─── totals / row 변환 / 전체 조립 ─────────────────────────────────────────── def test_totals_sum_all_stages(): stats = { "extract": _stage(pending=1, processing=2, failed=3), "summarize": _stage(pending=4, failed=1), "deep_summary": _stage(pending=2), } assert build_totals(stats) == {"pending": 7, "processing": 2, "failed": 4} def test_rows_to_stage_stats_conversion(): rows = [ ("extract", 3, 1, 0, 5, 9, 1, 0, 2), ("summarize", 7, None, 1, 10, 20, 0, 2, 4), # None 방어 ] stats = rows_to_stage_stats(rows) assert stats["extract"]["pending"] == 3 and stats["extract"]["created_1h"] == 2 assert stats["summarize"]["processing"] == 0 assert stats["summarize"]["deferred_pending"] == 2 def test_compose_overview_contract_shape(): """응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정.""" out = compose_overview( {"summarize": _stage(pending=1)}, {}, {}, [], now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST), ) assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"} assert [m["key"] for m in out["machines"]] == ["nas", "macmini"] for m in out["machines"]: assert set(m.keys()) == { "key", "label", "state", "stages", "pending", "processing", "failed", "done_1h", "done_today", "deferred_pending", "current", } assert m["state"] in ("active", "deferred", "idle") assert set(out["summarize_eta"].keys()) == {"pending", "done_rate_1h", "inflow_rate_1h", "eta_minutes"} assert len(out["trend_24h"]) == 24 assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"} assert set(out["totals"].keys()) == {"pending", "processing", "failed"} # 머신 label 고정 (raw 모델명 노출 금지 — label 만) assert [m["label"] for m in out["machines"]] == ["나스", "맥미니"] # ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ────── def test_build_stages_order_fields_and_age(): from datetime import timedelta, timezone from services.queue_overview import build_stages now = datetime(2026, 6, 11, 14, 0, tzinfo=timezone.utc) stats = { "summarize": {**_stage(pending=5, done_today=12), "oldest_pending_at": now - timedelta(hours=4)}, "extract": _stage(failed=2), } rows = build_stages(stats, now=now) by = {r["stage"]: r for r in rows} # 파이프라인 순서: extract 가 summarize 보다 앞 assert rows[0]["stage"] == "extract" assert by["summarize"]["pending"] == 5 assert by["summarize"]["done_today"] == 12 assert by["summarize"]["oldest_pending_age_sec"] == 4 * 3600 assert by["extract"]["failed"] == 2 assert by["extract"]["oldest_pending_age_sec"] is None # 전 stage 행 존재 (빈 단계 숨김은 FE 몫) assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h", "done_today", "oldest_pending_age_sec"} == set(rows[0].keys()) def test_build_stages_exposes_rates(): """ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료.""" from services.queue_overview import build_stages stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)} rows = build_stages(stats) embed = next(r for r in rows if r["stage"] == "embed") assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)