diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py index 996737f..b9273b2 100644 --- a/app/services/search/search_pipeline.py +++ b/app/services/search/search_pipeline.py @@ -32,6 +32,8 @@ from typing import TYPE_CHECKING, Literal from sqlalchemy.ext.asyncio import AsyncSession +from core.database import async_session + from . import query_analyzer, query_rewriter from .fusion_service import ( DEFAULT_FUSION, @@ -188,6 +190,7 @@ async def run_search( snapshot_chunk_id_max=snapshot_chunk_id_max, reranker_backend=reranker_backend, rewrite_backend=rewrite_backend, + axis=axis, ) timing: dict[str, float] = {} @@ -536,6 +539,7 @@ async def search_with_rewrite( snapshot_chunk_id_max: int | None, reranker_backend: str | None, rewrite_backend: str, + axis: "AxisFilter | None" = None, ) -> PipelineResult: """Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5). @@ -579,13 +583,20 @@ async def search_with_rewrite( async def _variant_retrieve( v: str, ) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]": - text = await search_text(session, v, per_variant_k) - raw_chunks = await search_vector( - session, v, per_variant_k, - embedding_backend=embedding_backend, - snapshot_doc_id_max=snapshot_doc_id_max, - snapshot_chunk_id_max=snapshot_chunk_id_max, - ) + # 변형별 독립 AsyncSession (fan-out). 공유 session 을 asyncio.gather 로 동시 + # execute 에 넘기면 SQLAlchemy async 가 'another operation in progress' 로 + # 부하 의존적 비결정 크래시 — variant 마다 독립 연결로 분리한다. + # axis(material_type/jurisdiction/year) 도 single-query path 와 동일하게 전달 + # (rewrite 경로가 axis 필터를 조용히 누락하던 결함 수정). + async with async_session() as vsession: + text = await search_text(vsession, v, per_variant_k, axis=axis) + raw_chunks = await search_vector( + vsession, v, per_variant_k, + embedding_backend=embedding_backend, + snapshot_doc_id_max=snapshot_doc_id_max, + snapshot_chunk_id_max=snapshot_chunk_id_max, + axis=axis, + ) vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k) return text, vector, chunks_by_doc diff --git a/app/services/study/explanation_rag.py b/app/services/study/explanation_rag.py index dc088b1..4e3616d 100644 --- a/app/services/study/explanation_rag.py +++ b/app/services/study/explanation_rag.py @@ -252,12 +252,15 @@ async def gather_explanation_context( client = AIClient() query = _build_query(question) try: - # 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음) - docs, questions = await asyncio.gather( - _gather_document_evidence(session, user_id, question.study_topic_id, query, client), - _gather_question_evidence( - session, user_id, question.study_topic_id, question.id, query, client - ), + # 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가 + # 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌 + # 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch + # 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방. + docs = await _gather_document_evidence( + session, user_id, question.study_topic_id, query, client + ) + questions = await _gather_question_evidence( + session, user_id, question.study_topic_id, question.id, query, client ) return ExplanationContext(documents=docs, questions=questions) finally: diff --git a/app/services/study/subject_note_rag.py b/app/services/study/subject_note_rag.py index 17bcfeb..0f398bb 100644 --- a/app/services/study/subject_note_rag.py +++ b/app/services/study/subject_note_rag.py @@ -238,9 +238,13 @@ async def gather_subject_note_context( client = AIClient() query = _build_query(subject, scope) try: - docs, questions = await asyncio.gather( - _gather_document_evidence(session, user_id, study_topic_id, query, client), - _gather_question_evidence(session, user_id, study_topic_id, subject, scope, query, client), + # 같은 AsyncSession 동시 execute 회피 — 순차 직렬화(백그라운드 prefetch). + # explanation_rag.gather_explanation_context 와 동형(R2 공유세션 동시성 수정). + docs = await _gather_document_evidence( + session, user_id, study_topic_id, query, client + ) + questions = await _gather_question_evidence( + session, user_id, study_topic_id, subject, scope, query, client ) return SubjectNoteContext(documents=docs, questions=questions) finally: