"""Cluster-level LLM call + JSON parsing + timeout + no-drop fallback.

Key decisions:
- Call AIClient._call_chat directly (avoids modifying client.py and reuses
  its fallback logic).
- Concurrency is controlled through the global MLX gate (BACKGROUND);
  services.search.llm_gate is the single gate.
- Per-call timeout = config.digest_llm_timeout_s (asyncio.wait_for, applied
  inside the gate).
- JSON parse failure -> retry -> minimal fallback if it still fails
  (a cluster is never dropped).
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
"""

import asyncio
from pathlib import Path
from typing import Any

from ai.client import parse_json_response
from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate

logger = setup_logger("digest_summarizer")

# 2026-06-15: config is the single source of truth (the old hard-coded 25s was
# tuned for the fast Gemma model and was missed after the switch to Qwen 27B).
LLM_CALL_TIMEOUT = settings.digest_llm_timeout_s
FALLBACK_SUMMARY_LIMIT = 200

_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
_PROMPT_TEMPLATE: str | None = None


def _load_prompt() -> str:
    """Return the prompt template, reading it from disk on first use only."""
    global _PROMPT_TEMPLATE
    if _PROMPT_TEMPLATE is None:
        _PROMPT_TEMPLATE = _PROMPT_PATH.read_text(encoding="utf-8")
    return _PROMPT_TEMPLATE


def build_prompt(selected: list[dict]) -> str:
    """Inject the selected articles' summaries into the digest_topic.txt template.

    Each article contributes one numbered line ``[i] text`` where ``text`` is
    the first non-empty value among ``ai_summary_truncated``, ``ai_summary``
    and ``title`` (empty string when none is set).

    Template placeholder: {articles_block}
    """
    template = _load_prompt()
    lines = [
        "[{}] {}".format(
            i,
            (
                m.get("ai_summary_truncated")
                or m.get("ai_summary")
                or m.get("title")
                or ""
            ).strip(),
        )
        for i, m in enumerate(selected, start=1)
    ]
    # str.replace (not str.format) so other braces in the template are left alone.
    return template.replace("{articles_block}", "\n".join(lines))


async def _try_call_llm(client: Any, prompt: str) -> str:
    """Run a single LLM call wrapped in the global MLX gate and a per-call timeout.

    Permanent rule (llm_gate): the Mac mini endpoint shares a single gate; do
    not introduce a new Semaphore. The concurrency lever is
    config.mlx_gate_concurrency. The timeout is applied only inside the gate.

    Raises:
        asyncio.TimeoutError: the call exceeded LLM_CALL_TIMEOUT seconds.
    """
    async with acquire_mlx_gate(Priority.BACKGROUND):
        return await asyncio.wait_for(
            client._call_chat(client.ai.primary, prompt),
            timeout=LLM_CALL_TIMEOUT,
        )


def _member_score(member: dict) -> Any:
    """Return the ranking score of a member: ``_rel``, else ``weight``, else 0.0.

    A key that is present but set to None is treated as missing, so that
    ``max`` never has to compare None against a number.
    """
    for key in ("_rel", "weight"):
        value = member.get(key)
        if value is not None:
            return value
    return 0.0


def _make_fallback(cluster: dict) -> dict:
    """Build a minimal fallback from the cluster's top member to avoid losing it.

    Returns:
        {topic_label, summary, llm_fallback_used=True}; ``summary`` is the top
        member's ``ai_summary`` (or ``title``) cut to FALLBACK_SUMMARY_LIMIT
        characters, or "" when the cluster has no members.
    """
    members = cluster["members"]
    if not members:
        return {
            "topic_label": "주요 뉴스 묶음",
            "summary": "",
            "llm_fallback_used": True,
        }
    top = max(members, key=_member_score)
    text = (top.get("ai_summary") or top.get("title") or "").strip()
    return {
        "topic_label": "주요 뉴스 묶음",
        "summary": text[:FALLBACK_SUMMARY_LIMIT],
        "llm_fallback_used": True,
    }


async def summarize_cluster_with_fallback(
    client: Any,
    cluster: dict,
    selected: list[dict],
) -> dict:
    """Summarize one cluster: LLM call + JSON parsing + fallback.

    The call is attempted ``settings.digest_llm_attempts`` times. A timeout,
    any other exception from the call, or a response that does not yield
    non-empty string ``topic_label`` and ``summary`` fields moves on to the
    next attempt. When every attempt fails the minimal fallback is returned,
    so a cluster is never dropped.

    Args:
        client: AI client exposing ``_call_chat`` and ``ai.primary``.
        cluster: cluster dict; its ``members`` list is used for logging and
            for the fallback.
        selected: articles whose summaries are injected into the prompt.

    Returns:
        {topic_label, summary, llm_fallback_used}
    """
    prompt = build_prompt(selected)

    # config is the single source of truth (default 2 = one retry)
    for attempt in range(settings.digest_llm_attempts):
        try:
            raw = await _try_call_llm(client, prompt)
        except asyncio.TimeoutError:
            logger.warning(
                f"LLM 호출 timeout {LLM_CALL_TIMEOUT}s "
                f"(attempt={attempt + 1}, cluster size={len(cluster['members'])})"
            )
            continue
        except Exception as e:
            # Deliberate best-effort: any call failure just consumes an attempt.
            logger.warning(
                f"LLM 호출 실패 attempt={attempt + 1} "
                f"(cluster size={len(cluster['members'])}): {e}"
            )
            continue

        parsed = parse_json_response(raw)
        # isinstance(dict) guard: a truthy non-dict result (e.g. a JSON array)
        # would otherwise raise AttributeError on .get() and bypass the fallback.
        if (
            isinstance(parsed, dict)
            and isinstance(parsed.get("topic_label"), str)
            and isinstance(parsed.get("summary"), str)
            and parsed["topic_label"].strip()
            and parsed["summary"].strip()
        ):
            return {
                "topic_label": parsed["topic_label"].strip(),
                "summary": parsed["summary"].strip(),
                "llm_fallback_used": False,
            }
        logger.warning(
            f"JSON 파싱 실패 attempt={attempt + 1} "
            f"(cluster size={len(cluster['members'])}, raw_len={len(raw) if raw else 0})"
        )

    return _make_fallback(cluster)