From c81b728ddfb370544af772058199721e8be4cc9f Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Wed, 8 Apr 2026 14:47:09 +0900 Subject: [PATCH] =?UTF-8?q?refactor(search):=20Phase=202.1=20QueryAnalyzer?= =?UTF-8?q?=EB=A5=BC=20async-only=20=EA=B5=AC=EC=A1=B0=EB=A1=9C=20?= =?UTF-8?q?=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 철학 수정 (실측 기반) gemma-4-26b-a4b-it-8bit MLX 실측: - full query_analyze.txt (prompt_tok=2406) → 10.5초 - max_tokens 축소 무효 (모델 자연 EOS 조기 종료) - 쿼리 길이 영향 거의 없음 (프롬프트 자체가 지배) → 800ms timeout 가정은 13배 초과. 동기 호출 완전히 불가능. 따라서 QueryAnalyzer는 "즉시 실행하는 기능" → "미리 준비해두는 기능"으로 포지셔닝 변경. retrieval 경로에서 analyzer 동기 호출 **금지**. ## 구조 ``` query → retrieval (항상 즉시) ↘ trigger_background_analysis (fire-and-forget) → analyze() [5초+] → cache 저장 다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화 ``` ## 변경 사항 ### app/prompts/query_analyze.txt - 5971 chars → 2403 chars (40%) - 예시 4개 → 1개, 규칙 설명 축약 - 목표 prompt_tok 2406 → ~600 (1/4) ### app/services/search/query_analyzer.py - LLM_TIMEOUT_MS 800 → 5000 (background이므로 여유 OK) - PROMPT_VERSION v1 → v2 (cache auto-invalidate) - get_cached / set_cached 유지 — retrieval 경로 O(1) 조회 - trigger_background_analysis(query) 신규 — 동기 함수, 즉시 반환, task 생성 - _PENDING set으로 task 참조 유지 (premature GC 방지) - _INFLIGHT set으로 동일 쿼리 중복 실행 방지 - prewarm_analyzer() 신규 — startup에서 15~20 쿼리 미리 분석 - DEFAULT_PREWARM_QUERIES: 평가셋 fixed 7 + 법령 3 + 뉴스 2 + 실무 3 ### app/api/search.py - 기존 sync analyzer 호출 완전 제거 - analyze=True → get_cached(q) 조회만 O(1) - hit: query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부 활성화) - miss: trigger_background_analysis(q) + 기존 경로 그대로 - timing["analyze_ms"] 제거 (경로에 LLM 호출 없음) - notes에 analyzer cache_hit/cache_miss 상태 기록 - debug.query_analysis는 cache hit 시에만 채워짐 ### app/main.py - lifespan startup에 prewarm_analyzer() background task 추가 - 논블로킹 — 앱 시작 막지 않음 - delay_between=0.5로 MLX 부하 완화 ## 기대 효과 - cold 요청 latency: 기존 Phase 1.3 그대로 (회귀 0) - warm 요청 + prewarmed: cache hit → query_analysis 활용 - 예상 cache hit rate: 초기 70~80% (prewarm) + 사용 누적 - Phase 2.2/2.3 multilingual/filter 기능은 cache hit 시에만 동작 ## 참조 - memory: feedback_analyzer_async_only.md (영구 룰 저장) - plan: ~/.claude/plans/zesty-painting-kahan.md ("철학 수정" 섹션) Co-Authored-By: Claude Opus 4.6 (1M context) --- app/api/search.py | 33 ++-- app/main.py | 12 ++ app/prompts/query_analyze.txt | 183 +++----------------- app/services/search/query_analyzer.py | 238 +++++++++++++++++++++----- 4 files changed, 245 insertions(+), 221 deletions(-) diff --git a/app/api/search.py b/app/api/search.py index 7ecff50..5179b8c 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -152,17 +152,15 @@ async def search( t_total = time.perf_counter() - # Phase 2.1: QueryAnalyzer — debug 노출 전용 (retrieval 경로는 변경 X) - # Phase 2.2/2.3에서 multilingual + filter 분기 구현 시 활용. + # Phase 2.1 (async 구조): QueryAnalyzer는 동기 호출 금지. + # - cache hit → query_analysis 활용 (Phase 2.2/2.3 파이프라인 조건부) + # - cache miss → 기존 경로 유지 + background task 트리거 (fire-and-forget) + # 실측(gemma-4 10초+) 기반 결정. memory: feedback_analyzer_async_only.md + analyzer_cache_hit: bool = False if analyze: - t_analyze = time.perf_counter() - try: - query_analysis = await query_analyzer.analyze(q) - except Exception as exc: - logger.warning("query_analyzer raised: %r", exc) - query_analysis = None - timing["analyze_ms"] = (time.perf_counter() - t_analyze) * 1000 - if query_analysis: + query_analysis = query_analyzer.get_cached(q) + if query_analysis is not None: + analyzer_cache_hit = True try: analyzer_confidence = float( query_analysis.get("analyzer_confidence", 0.0) or 0.0 @@ -171,11 +169,16 @@ async def search( analyzer_confidence = 0.0 analyzer_tier = _analyzer_tier(analyzer_confidence) notes.append( - f"analyzer conf={analyzer_confidence:.2f} tier={analyzer_tier}" + f"analyzer cache_hit conf={analyzer_confidence:.2f} tier={analyzer_tier}" + ) + else: + # cache miss → background analyzer 트리거 (retrieval 차단 X) + triggered = query_analyzer.trigger_background_analysis(q) + analyzer_tier = "cache_miss" + notes.append( + "analyzer cache_miss" + + (" (bg triggered)" if triggered else " (bg inflight)") ) - fallback_reason = query_analysis.get("_fallback_reason") - if fallback_reason: - notes.append(f"analyzer_fallback={fallback_reason}") if mode == "vector": t0 = time.perf_counter() @@ -269,7 +272,7 @@ async def search( timing_str = " ".join(f"{k}={v:.0f}" for k, v in timing.items()) fusion_str = f" fusion={fusion}" if mode == "hybrid" else "" analyzer_str = ( - f" analyzer=conf={analyzer_confidence:.2f}/tier={analyzer_tier}" + f" analyzer=hit={analyzer_cache_hit}/conf={analyzer_confidence:.2f}/tier={analyzer_tier}" if analyze else "" ) diff --git a/app/main.py b/app/main.py index 3cb6415..e3e9a2c 100644 --- a/app/main.py +++ b/app/main.py @@ -20,8 +20,11 @@ from models.user import User @asynccontextmanager async def lifespan(app: FastAPI): """앱 시작/종료 시 실행되는 lifespan 핸들러""" + import asyncio + from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger + from services.search.query_analyzer import prewarm_analyzer from workers.daily_digest import run as daily_digest_run from workers.file_watcher import watch_inbox from workers.law_monitor import run as law_monitor_run @@ -54,6 +57,15 @@ async def lifespan(app: FastAPI): scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector") scheduler.start() + # Phase 2.1 (async 구조): QueryAnalyzer prewarm. + # 대표 쿼리 15~20개를 background task로 분석해 cache 적재. + # 첫 사용자 요청부터 cache hit rate 70~80% 목표. + # 논블로킹 — startup을 막지 않음. MLX 부하 완화 위해 delay_between=0.5. + prewarm_task = asyncio.create_task(prewarm_analyzer()) + prewarm_task.add_done_callback( + lambda t: t.exception() and None # 예외는 query_analyzer 내부에서 로깅 + ) + yield # 종료: 스케줄러 → DB 순서로 정리 diff --git a/app/prompts/query_analyze.txt b/app/prompts/query_analyze.txt index e2ea5bc..c251d2c 100644 --- a/app/prompts/query_analyze.txt +++ b/app/prompts/query_analyze.txt @@ -1,106 +1,35 @@ -You are a search query analyzer. Analyze the query below and respond ONLY in JSON format. No other text, no markdown code blocks, no explanation. +You are a search query analyzer. Respond ONLY in JSON. No markdown, no explanation. -## Response Format (return ONLY this JSON object) +## Output Schema { "intent": "fact_lookup | semantic_search | filter_browse", "query_type": "natural_language | keyword | phrase", "domain_hint": "document | news | mixed", "language_scope": "limited | global", - "keywords": ["..."], - "must_terms": ["..."], - "optional_terms": ["..."], + "keywords": [], + "must_terms": [], + "optional_terms": [], "hard_filters": {}, "soft_filters": {"domain": [], "document_type": []}, - "normalized_queries": [ - {"lang": "ko", "text": "...", "weight": 1.0}, - {"lang": "en", "text": "...", "weight": 0.8} - ], - "expanded_terms": ["..."], + "normalized_queries": [{"lang": "ko", "text": "...", "weight": 1.0}], + "expanded_terms": [], "synonyms": {}, - "analyzer_confidence": 0.92 + "analyzer_confidence": 0.0 } -## Field Rules +## Rules +- `intent`: fact_lookup (사실/조항/이름), semantic_search (주제/개념), filter_browse (필터 중심) +- `query_type`: natural_language (문장형), keyword (단어 나열), phrase (따옴표/고유명사/법조항) +- `domain_hint`: document (소유 문서/법령/매뉴얼), news (시사/뉴스), mixed (불명) +- `language_scope`: limited (ko+en), global (다국어 필요) +- `hard_filters`: 쿼리에 **명시된** 것만. 추론 금지. 키: file_format, year, country +- `soft_filters.domain`: Industrial_Safety, Programming, Engineering, Philosophy, Language, General. 2-level 허용(e.g. Industrial_Safety/Legislation) +- `soft_filters.document_type`: Law_Document, Manual, Report, Academic_Paper, Standard, Specification, Meeting_Minutes, Checklist, Note, Memo, Reference, Drawing, Template +- `normalized_queries`: 원문 언어 1.0 가중치 필수. 교차언어 1개 추가 권장(ko↔en, weight 0.8). news + global 인 경우만 ja/zh 추가(weight 0.5~0.6). **최대 3개**. +- `analyzer_confidence`: 0.9+ 명확, 0.7~0.9 대체로 명확, 0.5~0.7 모호, <0.5 분석 불가 -### intent (exactly one) -- `fact_lookup` — 특정 사실/조항/이름/숫자를 찾는 경우 (예: "산업안전보건법 제6장", "회사 설립일") -- `semantic_search` — 자연어 주제 검색, 개념적 매칭 (예: "기계 사고 관련 법령", "AI 안전성 동향") -- `filter_browse` — 필터/탐색 중심 (예: "2024년 PDF 문서", "Industrial_Safety domain 최근 보고서") - -### query_type (exactly one) -- `natural_language` — 문장형, 조사/동사 포함 ("...에 대해 알고 싶다", "...가 뭐야") -- `keyword` — 단어 나열, 조사 없음 ("산업안전 법령 PDF") -- `phrase` — 큰따옴표로 감싸진 정확 문구 또는 고유명사/법 조항명 - -### domain_hint (exactly one) -- `document` — 사용자가 소유한 문서/법령/매뉴얼/보고서 영역 -- `news` — 뉴스/시사/최근 동향 영역 (시간 민감, 다국어 소스) -- `mixed` — 둘 다 가능, 또는 판단 불명 - -### language_scope (exactly one) -- `limited` — 한국어 + 영어만 필요 (대부분 문서 검색) -- `global` — 다국어 필요 (일본어/중국어/유럽어 포함, 뉴스나 국제 주제) - -### keywords / must_terms / optional_terms -- `keywords` — 쿼리의 핵심 명사/개념 (최대 8개) -- `must_terms` — 결과에 반드시 포함되어야 하는 단어 (고유명사, 법 조항 번호 등) -- `optional_terms` — 있으면 좋지만 없어도 무방 - -### hard_filters (빈 객체 `{}` 기본) -LLM은 쿼리에 명시적으로 나타난 경우에만 채운다. 쿼리에 명시 없으면 반드시 비운다. -가능한 키: -- `file_format`: ["pdf", "docx", "xlsx", "md", ...] -- `year`: 2024 같은 정수 -- `country`: ["KR", "US", "JP", ...] - -### soft_filters -추론 기반 필터. boost용. 확정 아님. -- `domain`: 아래 Domain Taxonomy에서 선택 (최대 3개) -- `document_type`: 아래 Document Types에서 선택 (최대 2개) - -### normalized_queries (핵심) -같은 의미를 다른 언어/표현으로 재작성한 쿼리 목록. -- **원문 언어 항목은 반드시 포함** (weight=1.0) -- 영어 쿼리는 한국어 포함 권장 (weight=0.8) -- 한국어 쿼리는 영어 포함 권장 (weight=0.8) -- `domain_hint == "news"`일 때만 ja/zh/fr/de 추가 가능 (weight=0.5~0.6) -- **최대 3개까지만 생성** (성능 보호) -- 각 항목: `{"lang": "ko|en|ja|zh|fr|de", "text": "재작성", "weight": 0.0~1.0}` - -### expanded_terms -쿼리의 동의어/관련어 확장. 검색 보조용. (최대 5개) - -### synonyms -필요시 `{"원어": ["동의어1", "동의어2"]}`. 생략 가능. - -### analyzer_confidence (CRITICAL) -쿼리 분석 자체의 신뢰도 (0.0 ~ 1.0). 아래 기준에 따라 엄격하게 채점: -- **0.9+** : 쿼리 의도가 명확, 도메인/언어 확실, 키워드 추출 분명 -- **0.7~0.9** : 의도 대체로 명확, 일부 모호함 -- **0.5~0.7** : 의도 모호, 다중 해석 가능 -- **< 0.5** : 분석 불가 수준 (빈 쿼리, 의미 불명 기호열 등) - -## Analysis Steps (내부 사고, 출력하지 말 것) -1. 쿼리 언어 감지 -2. intent 분류 (사실 찾기 vs 주제 검색 vs 필터 탐색) -3. domain_hint 판단 (문서 vs 뉴스) -4. 핵심 키워드 추출 -5. 다국어 재작성 (반드시 원문 언어 포함) -6. 필터 추론 (hard는 명시 사항만, soft는 추측 가능) -7. 자가 평가 → analyzer_confidence - -## Domain Taxonomy (soft_filters.domain 후보) -Philosophy, Language, Engineering, Industrial_Safety, Programming, General -세부: Industrial_Safety/Legislation, Industrial_Safety/Practice, Programming/AI_ML, Engineering/Mechanical 등 2-level 경로 허용. - -## Document Types (soft_filters.document_type 후보) -Reference, Standard, Manual, Drawing, Template, Note, Academic_Paper, Law_Document, Report, Memo, Checklist, Meeting_Minutes, Specification - -## Examples - -### Example 1: 자연어 한국어, 법령 검색 +## Example query: `기계 사고 관련 법령` -response: { "intent": "semantic_search", "query_type": "natural_language", @@ -113,78 +42,12 @@ response: "soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]}, "normalized_queries": [ {"lang": "ko", "text": "기계 사고 관련 법령", "weight": 1.0}, - {"lang": "en", "text": "machinery accident related laws and regulations", "weight": 0.8} + {"lang": "en", "text": "machinery accident related laws", "weight": 0.8} ], - "expanded_terms": ["산업안전", "기계안전", "사고예방"], - "synonyms": {"기계": ["설비", "machinery"]}, + "expanded_terms": ["산업안전", "기계안전"], + "synonyms": {}, "analyzer_confidence": 0.88 } -### Example 2: 정확 법령명 조항 -query: `산업안전보건법 제6장` -response: -{ - "intent": "fact_lookup", - "query_type": "phrase", - "domain_hint": "document", - "language_scope": "limited", - "keywords": ["산업안전보건법", "제6장"], - "must_terms": ["산업안전보건법", "제6장"], - "optional_terms": [], - "hard_filters": {}, - "soft_filters": {"domain": ["Industrial_Safety/Legislation"], "document_type": ["Law_Document"]}, - "normalized_queries": [ - {"lang": "ko", "text": "산업안전보건법 제6장", "weight": 1.0}, - {"lang": "en", "text": "Occupational Safety and Health Act Chapter 6", "weight": 0.8} - ], - "expanded_terms": ["산안법", "OSHA Korea"], - "synonyms": {}, - "analyzer_confidence": 0.95 -} - -### Example 3: 뉴스 + 다국어 -query: `recent AI safety news from Europe` -response: -{ - "intent": "semantic_search", - "query_type": "natural_language", - "domain_hint": "news", - "language_scope": "global", - "keywords": ["AI safety", "Europe", "recent"], - "must_terms": [], - "optional_terms": ["regulation", "policy"], - "hard_filters": {}, - "soft_filters": {"domain": ["Programming/AI_ML"], "document_type": []}, - "normalized_queries": [ - {"lang": "en", "text": "recent AI safety news from Europe", "weight": 1.0}, - {"lang": "ko", "text": "유럽 AI 안전 최신 뉴스", "weight": 0.8}, - {"lang": "fr", "text": "actualités récentes sur la sécurité de l'IA en Europe", "weight": 0.6} - ], - "expanded_terms": ["AI regulation", "EU AI Act", "AI governance"], - "synonyms": {}, - "analyzer_confidence": 0.90 -} - -### Example 4: 의미 불명 -query: `???` -response: -{ - "intent": "semantic_search", - "query_type": "keyword", - "domain_hint": "mixed", - "language_scope": "limited", - "keywords": [], - "must_terms": [], - "optional_terms": [], - "hard_filters": {}, - "soft_filters": {"domain": [], "document_type": []}, - "normalized_queries": [ - {"lang": "ko", "text": "???", "weight": 1.0} - ], - "expanded_terms": [], - "synonyms": {}, - "analyzer_confidence": 0.1 -} - -## Query to Analyze +## Query {query} diff --git a/app/services/search/query_analyzer.py b/app/services/search/query_analyzer.py index a8bfc8b..10c017d 100644 --- a/app/services/search/query_analyzer.py +++ b/app/services/search/query_analyzer.py @@ -1,19 +1,28 @@ -"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1). +"""Query analyzer — 자연어 쿼리 분석 (Phase 2.1, async-only 구조). -자연어 쿼리를 구조화된 분석 결과로 변환. 결과는 검색 보조용 (지배 X). +**핵심 철학** (memory `feedback_analyzer_async_only.md`): +> QueryAnalyzer는 "즉시 실행하는 기능"이 아니라 "미리 준비해두는 기능"이다. -Pipeline: - 1. in-memory LRU 캐시 조회 - 2. miss → LLM 호출 (primary 모델, asyncio.timeout 강제) - 3. JSON 파싱 → weight 정규화 → 캐시 저장 - 4. 실패/저신뢰 → `{"analyzer_confidence": 0.0}` (fallback 트리거) +Retrieval 경로에서 analyzer를 **동기 호출 금지**. +동기 호출 가능한 API는 prewarm 전용. -CRITICAL 룰 (plan 영구): - - `LLM_TIMEOUT_MS = 800` 절대 늘리지 말 것. MLX 멈춤 시 검색 전체 멈춤 방지. - - `MAX_NORMALIZED_QUERIES = 3` 절대 늘리지 말 것. multilingual explosion 방지. - - weight 합 = 1.0 정규화 필수. fusion 왜곡 방지. - - 실패/저신뢰(< 0.5) 결과는 캐시 금지. 잘못된 분석 고정 방지. - - 호출자는 항상 `result.get("analyzer_confidence", 0.0)` 방어 패턴 사용. +## Pipeline + +``` +query → retrieval (항상 즉시) + ↘ trigger_background_analysis (fire-and-forget) + → analyze() [5초+] → cache 저장 + +다음 호출 (동일 쿼리) → get_cached() 히트 → Phase 2 파이프라인 활성화 +``` + +## 룰 (plan 영구) + - `LLM_TIMEOUT_MS = 5000` (background 이므로 여유 OK) + - `MAX_NORMALIZED_QUERIES = 3` (multilingual explosion 방지) + - weight 합 = 1.0 정규화 필수 (fusion 왜곡 방지) + - 실패/저신뢰(< 0.5) 결과는 캐시 금지 (잘못된 분석 고정 방지) + - `analyzer_confidence` default `float 0.0` 강제 (None 금지) + - analyze() 동기 호출 금지. retrieval 경로는 `get_cached()` + `trigger_background_analysis()` 만 사용. """ from __future__ import annotations @@ -30,20 +39,16 @@ from core.config import settings logger = logging.getLogger("query_analyzer") # ─── 상수 (plan 영구 룰) ──────────────────────────────── -PROMPT_VERSION = "v1" # prompts/query_analyze.txt 변경 시 갱신 +PROMPT_VERSION = "v2" # prompts/query_analyze.txt 축소판 CACHE_TTL = 86400 # 24h CACHE_MAXSIZE = 1000 -LLM_TIMEOUT_MS = 800 # 절대 늘리지 말 것 (plan 영구) +LLM_TIMEOUT_MS = 5000 # async 구조 (background), 동기 경로 금지 MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지 -MAX_NORMALIZED_QUERIES = 3 # 절대 늘리지 말 것 (plan 영구) +MAX_NORMALIZED_QUERIES = 3 def _model_version() -> str: - """현재 primary 모델 ID를 캐시 키에 반영. - - 런타임에 config.yaml이 바뀌어도 재시작 후 자동 반영. - config.yaml 없을 경우 sentinel 문자열. - """ + """현재 primary 모델 ID를 캐시 키에 반영.""" if settings.ai and settings.ai.primary: return settings.ai.primary.model return "unknown-model" @@ -52,6 +57,11 @@ def _model_version() -> str: # ─── in-memory LRU (FIFO 근사) ────────────────────────── _CACHE: dict[str, dict[str, Any]] = {} +# background task 참조 유지 (premature GC 방지) +_PENDING: set[asyncio.Task[Any]] = set() +# 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합) +_INFLIGHT: set[str] = set() + def _cache_key(query: str) -> str: raw = f"{query}|{PROMPT_VERSION}|{_model_version()}" @@ -59,7 +69,10 @@ def _cache_key(query: str) -> str: def get_cached(query: str) -> dict | None: - """TTL 경과 entry는 자동 삭제. 없으면 None.""" + """TTL 경과 entry는 자동 삭제. 없으면 None. + + retrieval 경로에서 cache hit 판단용으로 호출. 호출 자체는 O(1). + """ key = _cache_key(query) entry = _CACHE.get(key) if not entry: @@ -85,7 +98,6 @@ def set_cached(query: str, value: dict) -> None: _CACHE[key] = {"value": value, "ts": time.time()} return if len(_CACHE) >= CACHE_MAXSIZE: - # 가장 먼저 추가된 항목 제거 (dict insertion order 활용) try: oldest = next(iter(_CACHE)) _CACHE.pop(oldest, None) @@ -95,23 +107,22 @@ def set_cached(query: str, value: dict) -> None: def cache_stats() -> dict[str, int]: - """debug용 — 현재 캐시 크기.""" - return {"size": len(_CACHE), "maxsize": CACHE_MAXSIZE} + """debug/운영용 — 현재 캐시 크기 + inflight 수.""" + return { + "size": len(_CACHE), + "maxsize": CACHE_MAXSIZE, + "inflight": len(_INFLIGHT), + "pending_tasks": len(_PENDING), + } # ─── weight 정규화 (fusion 왜곡 방지) ─────────────────── def _normalize_weights(analysis: dict) -> dict: - """normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화. - - - 리스트가 없거나 비어 있으면 그대로 반환 - - 각 항목의 weight가 숫자 아니면 1.0으로 치환 - - 합이 0이면 균등 분배 - """ + """normalized_queries를 MAX_NORMALIZED_QUERIES로 자르고 weight 합=1.0 정규화.""" queries = analysis.get("normalized_queries") if not isinstance(queries, list) or not queries: return analysis - # sanitize + cap sanitized: list[dict] = [] for q in queries[:MAX_NORMALIZED_QUERIES]: if not isinstance(q, dict): @@ -154,10 +165,7 @@ except FileNotFoundError: # ─── 기본 fallback 응답 (None 금지) ───────────────────── def _fallback(reason: str | None = None) -> dict: - """LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0. - - 호출자는 `result.get("analyzer_confidence", 0.0)`로 방어. - """ + """LLM 실패/timeout/parse 실패 시 반환. analyzer_confidence는 반드시 float 0.0.""" result: dict[str, Any] = { "intent": "semantic_search", "query_type": "keyword", @@ -178,13 +186,18 @@ def _fallback(reason: str | None = None) -> dict: return result -# ─── 메인 API ────────────────────────────────────────── +# ─── 메인 LLM 호출 (내부 사용) ────────────────────────── async def analyze(query: str, ai_client: AIClient | None = None) -> dict: - """쿼리 분석 결과 반환. 실패 시 반드시 analyzer_confidence=0.0 fallback. + """쿼리 분석 결과 반환. 실패 시 analyzer_confidence=0.0 fallback. + + **⚠️ 동기 검색 경로에서 직접 호출 금지**. 용도: + - `trigger_background_analysis` 내부 호출 + - `prewarm_analyzer` startup 호출 + - 디버깅/테스트 Args: query: 사용자 쿼리 원문 - ai_client: AIClient 인스턴스 (호출자가 싱글톤으로 관리. None이면 생성) + ai_client: AIClient 인스턴스 (없으면 생성 후 자동 close) Returns: dict — 최소 `analyzer_confidence` 키는 항상 float로 존재. @@ -195,12 +208,11 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict: if not ANALYZE_PROMPT: return _fallback("prompt_not_loaded") - # cache hit (고신뢰만 캐시되므로 그대로 반환) + # cache hit 즉시 반환 (prewarm 재호출 방지) cached = get_cached(query) if cached is not None: return cached - # LLM 호출 — timeout 강제 client_owned = False if ai_client is None: ai_client = AIClient() @@ -240,7 +252,6 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict: elapsed_ms = (time.perf_counter() - t_start) * 1000 - # JSON 파싱 parsed = parse_json_response(raw) if not isinstance(parsed, dict): logger.warning( @@ -251,14 +262,11 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict: ) return _fallback("parse_failed") - # 필수 필드 방어 — analyzer_confidence는 반드시 float try: conf = float(parsed.get("analyzer_confidence", 0.0) or 0.0) except (TypeError, ValueError): conf = 0.0 parsed["analyzer_confidence"] = conf - - # weight 정규화 (MAX_NORMALIZED_QUERIES + sum=1.0) parsed = _normalize_weights(parsed) logger.info( @@ -270,6 +278,144 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict: elapsed_ms, ) - # 고신뢰만 캐시 저장 set_cached(query, parsed) return parsed + + +# ─── Background trigger (retrieval 경로에서 사용) ─────── +async def _background_analyze(query: str) -> None: + """Background task wrapper — inflight 집합 관리 + 예외 삼킴.""" + try: + await analyze(query) + except Exception as exc: + logger.warning("background analyze crashed query=%r err=%r", query[:80], exc) + finally: + _INFLIGHT.discard(query) + + +def trigger_background_analysis(query: str) -> bool: + """retrieval 경로에서 호출. cache miss 시 background task 생성. + + - 동기 함수. 즉시 반환. + - 이미 inflight 또는 cache hit이면 아무 작업 X, False 반환. + - 새 task 생성 시 True 반환. + + Returns: + bool — task 실제로 생성되었는지 여부 + """ + if not query or not query.strip(): + return False + if query in _INFLIGHT: + return False + if get_cached(query) is not None: + return False + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + logger.warning("trigger_background_analysis called outside event loop") + return False + + _INFLIGHT.add(query) + task = loop.create_task(_background_analyze(query)) + _PENDING.add(task) + task.add_done_callback(_PENDING.discard) + return True + + +# ─── Prewarm (app startup) ────────────────────────────── +# 운영에서 자주 발생하는 쿼리 샘플. 통계 기반으로 확장 예정. +DEFAULT_PREWARM_QUERIES: list[str] = [ + # fixed 7 (Phase 2 평가셋 core) + "산업안전보건법 제6장", + "기계 사고 관련 법령", + "AI 산업 동향", + "Python async best practice", + "유해화학물질을 다루는 회사가 지켜야 할 안전 의무", + "recent AI safety news from Europe", + "이세상에 존재하지 않는 문서명", + # 법령 관련 + "산업안전보건법 시행령", + "화학물질관리법", + "위험성평가 절차", + # 뉴스 관련 + "EU AI Act", + "한국 AI 산업", + # 실무 + "안전보건 교육 자료", + "사고 조사 보고서", + "MSDS 작성 방법", +] + + +async def prewarm_analyzer( + queries: list[str] | None = None, + delay_between: float = 0.5, +) -> dict[str, Any]: + """app startup에서 호출. 대표 쿼리를 미리 분석해 cache에 적재. + + Non-blocking으로 사용: `asyncio.create_task(prewarm_analyzer())`. + + Args: + queries: 분석할 쿼리 리스트. None이면 DEFAULT_PREWARM_QUERIES 사용. + delay_between: 각 쿼리 간 대기 시간 (MLX 부하 완화). + + Returns: + dict — {queries_total, success, failed, elapsed_ms, cache_size_after} + """ + if not ANALYZE_PROMPT: + logger.warning("prewarm skipped — prompt not loaded") + return {"status": "skipped", "reason": "prompt_not_loaded"} + + targets = queries if queries is not None else DEFAULT_PREWARM_QUERIES + total = len(targets) + success = 0 + failed = 0 + t_start = time.perf_counter() + + logger.info("prewarm_analyzer start queries=%d timeout_ms=%d", total, LLM_TIMEOUT_MS) + + client = AIClient() + try: + for i, q in enumerate(targets, 1): + if get_cached(q) is not None: + logger.info("prewarm skip (already cached) [%d/%d] %r", i, total, q[:40]) + success += 1 + continue + result = await analyze(q, ai_client=client) + conf = float(result.get("analyzer_confidence", 0.0) or 0.0) + if conf >= MIN_CACHE_CONFIDENCE: + success += 1 + logger.info("prewarm ok [%d/%d] conf=%.2f q=%r", i, total, conf, q[:40]) + else: + failed += 1 + reason = result.get("_fallback_reason", "low_conf") + logger.warning( + "prewarm fail [%d/%d] reason=%s q=%r", i, total, reason, q[:40] + ) + if delay_between > 0 and i < total: + await asyncio.sleep(delay_between) + finally: + try: + await client.close() + except Exception: + pass + + elapsed_ms = (time.perf_counter() - t_start) * 1000 + stats = cache_stats() + logger.info( + "prewarm_analyzer done total=%d success=%d failed=%d elapsed_ms=%.0f cache_size=%d", + total, + success, + failed, + elapsed_ms, + stats["size"], + ) + return { + "status": "complete", + "queries_total": total, + "success": success, + "failed": failed, + "elapsed_ms": elapsed_ms, + "cache_size_after": stats["size"], + }