From 9c22337647464592276be238732e8debf7489002 Mon Sep 17 00:00:00 2001 From: hyungi Date: Tue, 16 Jun 2026 13:18:17 +0900 Subject: [PATCH] =?UTF-8?q?fix(search):=20=EA=B3=B5=EC=9C=A0=20AsyncSessio?= =?UTF-8?q?n=20=EB=8F=99=EC=8B=9C=20=EC=BF=BC=EB=A6=AC=20=EC=A7=81?= =?UTF-8?q?=EB=A0=AC=ED=99=94/=EC=84=B8=EC=85=98=20=EB=B6=84=EB=A6=AC=20+?= =?UTF-8?q?=20rewrite=20axis=20=EB=88=84=EB=9D=BD=20(R2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit asyncio.gather 가 단일 AsyncSession 에 동시 execute 를 진입시켜 부하 의존적 'another operation in progress' 비결정 크래시 (정상 순차 경로에서만 검증돼 잠복). 사이트별 처방(균일 처방 회피): - search_with_rewrite._variant_retrieve: variant 마다 독립 async_session() fan-out (사용자 대면 — N variant 병렬 유지) - study explanation_rag / subject_note_rag: 백그라운드 prefetch 라 순차 직렬화 (rerank 도 순차 — DB 순차+rerank gather 분할은 _gather_* 4곳 침습이라 보류, 배경 작업의 rerank 병렬 이득 미미) 추가: rewrite(multi-query) 경로가 axis 필터(material_type/jurisdiction/year)를 single-query path 와 달리 조용히 누락 — search_with_rewrite 에 axis 인자 + _variant_retrieve 가 search_text/search_vector 에 전달. 검증: py_compile 통과. 동시 N variant 부하 테스트(staging)로 크래시 소거 확인 예정. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services/search/search_pipeline.py | 25 ++++++++++++++++++------- app/services/study/explanation_rag.py | 15 +++++++++------ app/services/study/subject_note_rag.py | 10 +++++++--- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py index 996737f..b9273b2 100644 --- a/app/services/search/search_pipeline.py +++ b/app/services/search/search_pipeline.py @@ -32,6 +32,8 @@ from typing import TYPE_CHECKING, Literal from sqlalchemy.ext.asyncio import AsyncSession +from core.database import async_session + from . import query_analyzer, query_rewriter from .fusion_service import ( DEFAULT_FUSION, @@ -188,6 +190,7 @@ async def run_search( snapshot_chunk_id_max=snapshot_chunk_id_max, reranker_backend=reranker_backend, rewrite_backend=rewrite_backend, + axis=axis, ) timing: dict[str, float] = {} @@ -536,6 +539,7 @@ async def search_with_rewrite( snapshot_chunk_id_max: int | None, reranker_backend: str | None, rewrite_backend: str, + axis: "AxisFilter | None" = None, ) -> PipelineResult: """Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5). @@ -579,13 +583,20 @@ async def search_with_rewrite( async def _variant_retrieve( v: str, ) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]": - text = await search_text(session, v, per_variant_k) - raw_chunks = await search_vector( - session, v, per_variant_k, - embedding_backend=embedding_backend, - snapshot_doc_id_max=snapshot_doc_id_max, - snapshot_chunk_id_max=snapshot_chunk_id_max, - ) + # 변형별 독립 AsyncSession (fan-out). 공유 session 을 asyncio.gather 로 동시 + # execute 에 넘기면 SQLAlchemy async 가 'another operation in progress' 로 + # 부하 의존적 비결정 크래시 — variant 마다 독립 연결로 분리한다. + # axis(material_type/jurisdiction/year) 도 single-query path 와 동일하게 전달 + # (rewrite 경로가 axis 필터를 조용히 누락하던 결함 수정). + async with async_session() as vsession: + text = await search_text(vsession, v, per_variant_k, axis=axis) + raw_chunks = await search_vector( + vsession, v, per_variant_k, + embedding_backend=embedding_backend, + snapshot_doc_id_max=snapshot_doc_id_max, + snapshot_chunk_id_max=snapshot_chunk_id_max, + axis=axis, + ) vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k) return text, vector, chunks_by_doc diff --git a/app/services/study/explanation_rag.py b/app/services/study/explanation_rag.py index dc088b1..4e3616d 100644 --- a/app/services/study/explanation_rag.py +++ b/app/services/study/explanation_rag.py @@ -252,12 +252,15 @@ async def gather_explanation_context( client = AIClient() query = _build_query(question) try: - # 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음) - docs, questions = await asyncio.gather( - _gather_document_evidence(session, user_id, question.study_topic_id, query, client), - _gather_question_evidence( - session, user_id, question.study_topic_id, question.id, query, client - ), + # 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가 + # 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌 + # 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch + # 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방. + docs = await _gather_document_evidence( + session, user_id, question.study_topic_id, query, client + ) + questions = await _gather_question_evidence( + session, user_id, question.study_topic_id, question.id, query, client ) return ExplanationContext(documents=docs, questions=questions) finally: diff --git a/app/services/study/subject_note_rag.py b/app/services/study/subject_note_rag.py index 17bcfeb..0f398bb 100644 --- a/app/services/study/subject_note_rag.py +++ b/app/services/study/subject_note_rag.py @@ -238,9 +238,13 @@ async def gather_subject_note_context( client = AIClient() query = _build_query(subject, scope) try: - docs, questions = await asyncio.gather( - _gather_document_evidence(session, user_id, study_topic_id, query, client), - _gather_question_evidence(session, user_id, study_topic_id, subject, scope, query, client), + # 같은 AsyncSession 동시 execute 회피 — 순차 직렬화(백그라운드 prefetch). + # explanation_rag.gather_explanation_context 와 동형(R2 공유세션 동시성 수정). + docs = await _gather_document_evidence( + session, user_id, study_topic_id, query, client + ) + questions = await _gather_question_evidence( + session, user_id, study_topic_id, subject, scope, query, client ) return SubjectNoteContext(documents=docs, questions=questions) finally: