From 21a78fbbf0ff7e987cc62fb13f03fed28f5c2368 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Wed, 8 Apr 2026 15:12:13 +0900 Subject: [PATCH] =?UTF-8?q?fix(search):=20semaphore=EB=A1=9C=20LLM=20concu?= =?UTF-8?q?rrency=3D1=20=EA=B0=95=EC=A0=9C=20+=20run=5Feval=20analyze=20?= =?UTF-8?q?=ED=8C=8C=EB=9D=BC=EB=AF=B8=ED=84=B0=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 배경 1차 Phase 2.2 eval에서 발견: 23개 쿼리가 순차 호출되지만 각 request의 background analyzer task는 모두 동시에 MLX에 요청 날림 → MLX single-inference 서버 queue 폭발 → 22개가 15초 timeout. cache 채워지지 않음. ## 수정 ### query_analyzer.py - LLM_CONCURRENCY = 1 상수 추가 - _LLM_SEMAPHORE: lazy init asyncio.Semaphore (event loop 바인딩) - analyze() 내부: semaphore → timeout(실제 LLM 호출만) 이중 래핑 semaphore 대기 시간이 timeout에 포함되지 않도록 주의 ### run_eval.py - --analyze true|false 파라미터 추가 (Phase 2.1+ 측정용) - call_search / evaluate 시그니처에 analyze 전달 ## 기대 효과 - prewarm/background/동기 호출 모두 1개씩 순차 MLX 호출 - 23개 대기 시 최악 230초 소요, 단 모두 성공해서 cache 채움 - MLX 서버 부하 안정 Co-Authored-By: Claude Opus 4.6 (1M context) --- app/services/search/query_analyzer.py | 28 ++++++++++++++++++++++----- tests/search_eval/run_eval.py | 19 ++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/app/services/search/query_analyzer.py b/app/services/search/query_analyzer.py index b034ce5..8fe2c09 100644 --- a/app/services/search/query_analyzer.py +++ b/app/services/search/query_analyzer.py @@ -46,6 +46,9 @@ LLM_TIMEOUT_MS = 15000 # async 구조 (background), 동기 경로 금지 # ↑ 실측: gemma-4-26b-a4b-it-8bit MLX, 축소 프롬프트(prompt_tok=802) 7~11초. # generation이 dominant (max_tokens 무효, 자연 EOS ~289 tok 생성). # background 실행이라 15초도 안전. 상향 필요 시 여기서만 조정. +LLM_CONCURRENCY = 1 # MLX는 single-inference, 동시 호출 시 queue 폭발. +# ↑ 실측: 23 concurrent 요청 → 22개 15초 timeout. semaphore로 순차 강제. +# prewarm/background/동기 호출 모두 이 semaphore 경유. MIN_CACHE_CONFIDENCE = 0.5 # 이 미만은 캐시 금지 MAX_NORMALIZED_QUERIES = 3 @@ -64,6 +67,17 @@ _CACHE: dict[str, dict[str, Any]] = {} _PENDING: set[asyncio.Task[Any]] = set() # 동일 쿼리 중복 실행 방지 (진행 중인 쿼리 집합) _INFLIGHT: set[str] = set() +# MLX concurrency 제한 (single-inference → 1) +# 첫 호출 시 lazy init (event loop이 준비된 후) +_LLM_SEMAPHORE: asyncio.Semaphore | None = None + + +def _get_llm_semaphore() -> asyncio.Semaphore: + """첫 호출 시 현재 event loop에 바인딩된 semaphore 생성.""" + global _LLM_SEMAPHORE + if _LLM_SEMAPHORE is None: + _LLM_SEMAPHORE = asyncio.Semaphore(LLM_CONCURRENCY) + return _LLM_SEMAPHORE def _cache_key(query: str) -> str: @@ -222,12 +236,16 @@ async def analyze(query: str, ai_client: AIClient | None = None) -> dict: client_owned = True t_start = time.perf_counter() + semaphore = _get_llm_semaphore() + # ⚠️ 중요: semaphore 대기는 timeout 포함되면 안됨 (대기만 해도 timeout 발동) + # timeout은 실제 LLM 호출 구간에만 적용. try: - async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): - raw = await ai_client._call_chat( - ai_client.ai.primary, - ANALYZE_PROMPT.replace("{query}", query), - ) + async with semaphore: + async with asyncio.timeout(LLM_TIMEOUT_MS / 1000): + raw = await ai_client._call_chat( + ai_client.ai.primary, + ANALYZE_PROMPT.replace("{query}", query), + ) except asyncio.TimeoutError: elapsed = (time.perf_counter() - t_start) * 1000 logger.warning( diff --git a/tests/search_eval/run_eval.py b/tests/search_eval/run_eval.py index a7ac06f..e506248 100644 --- a/tests/search_eval/run_eval.py +++ b/tests/search_eval/run_eval.py @@ -134,6 +134,7 @@ async def call_search( limit: int = 20, fusion: str | None = None, rerank: str | None = None, + analyze: str | None = None, ) -> tuple[list[int], float]: """검색 API 호출 → (doc_ids, latency_ms).""" url = f"{base_url.rstrip('/')}/api/search/" @@ -143,6 +144,8 @@ async def call_search( params["fusion"] = fusion if rerank is not None: params["rerank"] = rerank + if analyze is not None: + params["analyze"] = analyze import time @@ -169,6 +172,7 @@ async def evaluate( mode: str = "hybrid", fusion: str | None = None, rerank: str | None = None, + analyze: str | None = None, ) -> list[QueryResult]: """전체 쿼리셋 평가.""" results: list[QueryResult] = [] @@ -177,7 +181,7 @@ async def evaluate( for q in queries: try: returned_ids, latency_ms = await call_search( - client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank + client, base_url, token, q.query, mode=mode, fusion=fusion, rerank=rerank, analyze=analyze ) results.append( QueryResult( @@ -415,6 +419,13 @@ def main() -> int: choices=["true", "false"], help="bge-reranker-v2-m3 활성화 (Phase 1.3+, 미지정 시 서버 기본값=true)", ) + parser.add_argument( + "--analyze", + type=str, + default=None, + choices=["true", "false"], + help="QueryAnalyzer 활성화 (Phase 2.1+, cache hit 시 multilingual 적용)", + ) parser.add_argument( "--token", type=str, @@ -454,21 +465,21 @@ def main() -> int: if args.base_url: print(f"\n>>> evaluating: {args.base_url}") results = asyncio.run( - evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank) + evaluate(queries, args.base_url, args.token, "single", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze) ) print_summary("single", results) all_results.extend(results) else: print(f"\n>>> baseline: {args.baseline_url}") baseline_results = asyncio.run( - evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank) + evaluate(queries, args.baseline_url, args.token, "baseline", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze) ) baseline_summary = print_summary("baseline", baseline_results) print(f"\n>>> candidate: {args.candidate_url}") candidate_results = asyncio.run( evaluate( - queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank + queries, args.candidate_url, args.token, "candidate", mode=args.mode, fusion=args.fusion, rerank=args.rerank, analyze=args.analyze ) ) candidate_summary = print_summary("candidate", candidate_results)