Files
hyungi_document_server/app/services/study/explanation_rag.py
T
hyungi 9c22337647 fix(search): 공유 AsyncSession 동시 쿼리 직렬화/세션 분리 + rewrite axis 누락 (R2)
asyncio.gather 가 단일 AsyncSession 에 동시 execute 를 진입시켜 부하 의존적
'another operation in progress' 비결정 크래시 (정상 순차 경로에서만 검증돼 잠복).
사이트별 처방(균일 처방 회피):
- search_with_rewrite._variant_retrieve: variant 마다 독립 async_session() fan-out
  (사용자 대면 — N variant 병렬 유지)
- study explanation_rag / subject_note_rag: 백그라운드 prefetch 라 순차 직렬화
  (rerank 도 순차 — DB 순차+rerank gather 분할은 _gather_* 4곳 침습이라 보류,
   배경 작업의 rerank 병렬 이득 미미)

추가: rewrite(multi-query) 경로가 axis 필터(material_type/jurisdiction/year)를
single-query path 와 달리 조용히 누락 — search_with_rewrite 에 axis 인자 + _variant_retrieve
가 search_text/search_vector 에 전달.

검증: py_compile 통과. 동시 N variant 부하 테스트(staging)로 크래시 소거 확인 예정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:18:17 +09:00

281 lines
9.6 KiB
Python

"""study_questions AI 풀이 RAG 근거 수집 — PR-3.
RAG 입력 풀:
- 1순위: 같은 study_topic 매핑된 documents 본문 (study_topic_documents) → ai_summary + extracted_text 청크
- 2순위: 같은 토픽의 다른 study_questions question_text + explanation + (status='ready') ai_explanation
현재 question_id 자기 자신 제외. 재귀적 hallucination 방지를 위해 ai_explanation 은 ready 상태만.
- 제외: study_sessions.ocr_text (필기) / 외부 웹 / 다른 토픽
bge-reranker-v2-m3 (TEI :80/rerank) 로 top-K 줄임.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from models.chunk import DocumentChunk
from models.document import Document
from models.study_question import StudyQuestion
from models.study_topic import StudyTopicDocument
from services.search.license_filter import restricted_exclude_orm
logger = logging.getLogger(__name__)
# top-K
DOC_TOPK = 5
Q_TOPK = 3
# document 본문 청크가 너무 길면 reranker 입력 제한 (TEI VRAM 보호) — 기존 rerank_service 와 일관
MAX_RERANK_INPUT = 60
# 각 evidence snippet 길이 제한 (LLM 컨텍스트 관리)
DOC_SNIPPET_LEN = 400
Q_SNIPPET_LEN = 300
# 토픽 내 다른 문제 후보 hard cap (reranker 부담 방지)
MAX_QUESTION_CANDIDATES = 30
# reranker timeout
RERANK_TIMEOUT_S = 5.0
@dataclass
class EvidenceItem:
source_type: str # 'document' | 'question'
source_id: int
title: str
snippet: str
def to_dict(self) -> dict:
return {
"source_type": self.source_type,
"source_id": self.source_id,
"title": self.title,
"snippet": self.snippet,
}
@dataclass
class ExplanationContext:
"""LLM 프롬프트 조립용. evidence 는 응답에 그대로 노출."""
documents: list[EvidenceItem]
questions: list[EvidenceItem]
@property
def all(self) -> list[EvidenceItem]:
return [*self.documents, *self.questions]
def _truncate(text: str, n: int) -> str:
if not text:
return ""
s = text.strip()
return s if len(s) <= n else s[:n].rstrip() + ""
def _build_query(question: StudyQuestion) -> str:
"""rerank query — 문제 본문 + 보기 1~4 합쳐 1 문자열."""
return " | ".join([
question.question_text or "",
question.choice_1 or "",
question.choice_2 or "",
question.choice_3 or "",
question.choice_4 or "",
])
async def _rerank(client: AIClient, query: str, texts: list[str], top_k: int) -> list[int]:
"""bge-reranker 호출. 실패 시 [0..N] 자연 순서 fallback. 반환: top_k indices."""
if not texts:
return []
if len(texts) > MAX_RERANK_INPUT:
texts = texts[:MAX_RERANK_INPUT]
try:
async with asyncio.timeout(RERANK_TIMEOUT_S):
results = await client.rerank(query, texts)
# results: [{"index": int, "score": float}, ...]
idxs = [r["index"] for r in results if 0 <= r.get("index", -1) < len(texts)]
return idxs[:top_k]
except (asyncio.TimeoutError, httpx.HTTPError) as e:
logger.warning("study_explanation_rerank_fallback: %s: %s", type(e).__name__, e)
return list(range(min(top_k, len(texts))))
async def _gather_document_evidence(
session: AsyncSession,
user_id: int,
study_topic_id: int,
query: str,
client: AIClient,
) -> list[EvidenceItem]:
"""study_topic 매핑 documents 의 청크 + ai_summary 를 reranker 통과해 top-K 5건."""
doc_id_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == study_topic_id,
StudyTopicDocument.user_id == user_id,
)
)
).scalars().all()
doc_ids = list(doc_id_rows)
if not doc_ids:
return []
# 매핑된 documents 메타 (제목·요약 표기)
# B-4: licensed_restricted 제외 → valid_doc_ids 에서 빠지므로 아래 청크 쿼리(doc_id IN)도
# 자동 차단. study 풀이 RAG 도 retrieval/digest 와 동일 단일 술어 공유(a안 U-2①).
doc_meta_rows = (
await session.execute(
select(Document.id, Document.title, Document.ai_summary).where(
Document.id.in_(doc_ids),
Document.deleted_at.is_(None),
restricted_exclude_orm(),
)
)
).all()
doc_meta: dict[int, tuple[str | None, str | None]] = {
r.id: (r.title, r.ai_summary) for r in doc_meta_rows
}
if not doc_meta:
return []
valid_doc_ids = list(doc_meta.keys())
# 1) 청크 후보 — doc_id 별 chunk_index 0..3 (앞부분 우선) 으로 hard cap
chunk_rows = (
await session.execute(
select(DocumentChunk.doc_id, DocumentChunk.chunk_index, DocumentChunk.text)
.where(
DocumentChunk.doc_id.in_(valid_doc_ids),
DocumentChunk.chunk_index < 4,
# Hier-Decomp-1 c2: 교체된 doc 의 legacy(in_corpus=false) chunk 중복 로드 방지.
DocumentChunk.in_corpus.is_(True),
)
.order_by(DocumentChunk.doc_id, DocumentChunk.chunk_index)
)
).all()
# 2) ai_summary 도 후보로 추가 (요약 자체가 핵심 근거가 될 때 많음)
candidates: list[tuple[int, str]] = [] # (doc_id, text)
for r in chunk_rows:
if r.text:
candidates.append((r.doc_id, r.text))
for did, (_title, summary) in doc_meta.items():
if summary:
candidates.append((did, summary))
if not candidates:
return []
texts = [_truncate(t, 800) for _, t in candidates]
top_idxs = await _rerank(client, query, texts, DOC_TOPK)
seen_doc_ids: set[int] = set()
out: list[EvidenceItem] = []
for i in top_idxs:
did, text = candidates[i]
# 한 문서당 1 evidence (다양성 확보)
if did in seen_doc_ids:
continue
seen_doc_ids.add(did)
title = doc_meta.get(did, (None, None))[0] or f"문서 #{did}"
out.append(EvidenceItem(
source_type="document",
source_id=did,
title=title,
snippet=_truncate(text, DOC_SNIPPET_LEN),
))
if len(out) >= DOC_TOPK:
break
return out
async def _gather_question_evidence(
session: AsyncSession,
user_id: int,
study_topic_id: int,
current_question_id: int,
query: str,
client: AIClient,
) -> list[EvidenceItem]:
"""같은 토픽의 다른 문제 + 해설 reranker 통과 top-K 3건. 자기 자신 제외."""
rows = (
await session.execute(
select(StudyQuestion).where(
StudyQuestion.user_id == user_id,
StudyQuestion.study_topic_id == study_topic_id,
StudyQuestion.id != current_question_id,
StudyQuestion.deleted_at.is_(None),
)
.order_by(StudyQuestion.created_at.desc())
.limit(MAX_QUESTION_CANDIDATES)
)
).scalars().all()
if not rows:
return []
candidates_text: list[str] = []
for q in rows:
parts = [q.question_text or ""]
if q.explanation:
parts.append(q.explanation)
# ai_explanation 은 ready 상태만 — 재귀적 hallucination 방지
if q.ai_explanation and q.ai_explanation_status == "ready":
parts.append(q.ai_explanation)
candidates_text.append(" | ".join(parts))
top_idxs = await _rerank(client, query, candidates_text, Q_TOPK)
out: list[EvidenceItem] = []
for i in top_idxs:
q = rows[i]
title_head = _truncate(q.question_text or "", 40)
# 자기 자신 제외 보장 (이중 안전장치)
if q.id == current_question_id:
continue
out.append(EvidenceItem(
source_type="question",
source_id=q.id,
title=f"Q{q.id}: {title_head}",
snippet=_truncate(candidates_text[i], Q_SNIPPET_LEN),
))
return out
async def gather_explanation_context(
session: AsyncSession,
user_id: int,
question: StudyQuestion,
) -> ExplanationContext:
"""현재 문제에 대한 RAG 근거 수집 (자료 + 같은 토픽 다른 문제). 자기 자신 제외."""
client = AIClient()
query = _build_query(question)
try:
# 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가
# 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌
# 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch
# 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방.
docs = await _gather_document_evidence(
session, user_id, question.study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
)
return ExplanationContext(documents=docs, questions=questions)
finally:
await client.close()
def render_evidence_block(items: list[EvidenceItem]) -> str:
"""프롬프트 본문에 끼울 텍스트 블록. 비어있으면 '(없음)'."""
if not items:
return "(없음)"
lines = []
for it in items:
if it.source_type == "document":
lines.append(f"- [자료: {it.title}] {it.snippet}")
else:
lines.append(f"- [{it.title}] {it.snippet}")
return "\n".join(lines)