Files
hyungi_document_server/app/services/study/explanation_rag.py
T
hyungi f940f50c60 feat(search): route retrieval through corpus_chunks view (Hier-Decomp-1 c2)
baseline chunk 벡터검색을 document_chunks → corpus_chunks 뷰(in_corpus=true)로 rewire.
in_corpus=false(비활성 hier leaf 등) 자동 제외 = 검색 오염 구조적 차단(B choke point).

- retrieval_service: baseline chunks_table=corpus_chunks, _VALID_CHUNKS_TABLE 에 corpus_chunks 허용,
  snapshot_clause 조건 corpus_chunks 포함(eval snapshot 보존). candidate(cand_*) 경로 불변.
  documents 측(FTS+doc embedding) 무변경 — doc row 는 교체 무관.
- models/chunk: 5 신규 컬럼 매핑(parent_id/level/node_type/is_leaf/in_corpus). server_default 로
  기존 chunk_worker INSERT 무영향(legacy=in_corpus true/is_leaf false).
- subject_note_rag/explanation_rag: RAG chunk 로드에 in_corpus=true 필터(교체 doc legacy 중복 방지).

게이트: G4b(rewire 불변) before/after IDENTICAL(현재 view==table no-op) / G4a(누출) synthetic
in_corpus=false leaf 가 corpus_chunks 0건·document_chunks raw top(dist 0.0) 양방향 증명. /health 200.

plan: hierarchical-decomposition-tiered-nesting-marmot.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 12:58:28 +00:00

274 lines
9.0 KiB
Python

"""study_questions AI 풀이 RAG 근거 수집 — PR-3.
RAG 입력 풀:
- 1순위: 같은 study_topic 매핑된 documents 본문 (study_topic_documents) → ai_summary + extracted_text 청크
- 2순위: 같은 토픽의 다른 study_questions question_text + explanation + (status='ready') ai_explanation
현재 question_id 자기 자신 제외. 재귀적 hallucination 방지를 위해 ai_explanation 은 ready 상태만.
- 제외: study_sessions.ocr_text (필기) / 외부 웹 / 다른 토픽
bge-reranker-v2-m3 (TEI :80/rerank) 로 top-K 줄임.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from models.chunk import DocumentChunk
from models.document import Document
from models.study_question import StudyQuestion
from models.study_topic import StudyTopicDocument
logger = logging.getLogger(__name__)
# top-K
DOC_TOPK = 5
Q_TOPK = 3
# document 본문 청크가 너무 길면 reranker 입력 제한 (TEI VRAM 보호) — 기존 rerank_service 와 일관
MAX_RERANK_INPUT = 60
# 각 evidence snippet 길이 제한 (LLM 컨텍스트 관리)
DOC_SNIPPET_LEN = 400
Q_SNIPPET_LEN = 300
# 토픽 내 다른 문제 후보 hard cap (reranker 부담 방지)
MAX_QUESTION_CANDIDATES = 30
# reranker timeout
RERANK_TIMEOUT_S = 5.0
@dataclass
class EvidenceItem:
source_type: str # 'document' | 'question'
source_id: int
title: str
snippet: str
def to_dict(self) -> dict:
return {
"source_type": self.source_type,
"source_id": self.source_id,
"title": self.title,
"snippet": self.snippet,
}
@dataclass
class ExplanationContext:
"""LLM 프롬프트 조립용. evidence 는 응답에 그대로 노출."""
documents: list[EvidenceItem]
questions: list[EvidenceItem]
@property
def all(self) -> list[EvidenceItem]:
return [*self.documents, *self.questions]
def _truncate(text: str, n: int) -> str:
if not text:
return ""
s = text.strip()
return s if len(s) <= n else s[:n].rstrip() + ""
def _build_query(question: StudyQuestion) -> str:
"""rerank query — 문제 본문 + 보기 1~4 합쳐 1 문자열."""
return " | ".join([
question.question_text or "",
question.choice_1 or "",
question.choice_2 or "",
question.choice_3 or "",
question.choice_4 or "",
])
async def _rerank(client: AIClient, query: str, texts: list[str], top_k: int) -> list[int]:
"""bge-reranker 호출. 실패 시 [0..N] 자연 순서 fallback. 반환: top_k indices."""
if not texts:
return []
if len(texts) > MAX_RERANK_INPUT:
texts = texts[:MAX_RERANK_INPUT]
try:
async with asyncio.timeout(RERANK_TIMEOUT_S):
results = await client.rerank(query, texts)
# results: [{"index": int, "score": float}, ...]
idxs = [r["index"] for r in results if 0 <= r.get("index", -1) < len(texts)]
return idxs[:top_k]
except (asyncio.TimeoutError, httpx.HTTPError) as e:
logger.warning("study_explanation_rerank_fallback: %s: %s", type(e).__name__, e)
return list(range(min(top_k, len(texts))))
async def _gather_document_evidence(
session: AsyncSession,
user_id: int,
study_topic_id: int,
query: str,
client: AIClient,
) -> list[EvidenceItem]:
"""study_topic 매핑 documents 의 청크 + ai_summary 를 reranker 통과해 top-K 5건."""
doc_id_rows = (
await session.execute(
select(StudyTopicDocument.document_id).where(
StudyTopicDocument.study_topic_id == study_topic_id,
StudyTopicDocument.user_id == user_id,
)
)
).scalars().all()
doc_ids = list(doc_id_rows)
if not doc_ids:
return []
# 매핑된 documents 메타 (제목·요약 표기)
doc_meta_rows = (
await session.execute(
select(Document.id, Document.title, Document.ai_summary).where(
Document.id.in_(doc_ids),
Document.deleted_at.is_(None),
)
)
).all()
doc_meta: dict[int, tuple[str | None, str | None]] = {
r.id: (r.title, r.ai_summary) for r in doc_meta_rows
}
if not doc_meta:
return []
valid_doc_ids = list(doc_meta.keys())
# 1) 청크 후보 — doc_id 별 chunk_index 0..3 (앞부분 우선) 으로 hard cap
chunk_rows = (
await session.execute(
select(DocumentChunk.doc_id, DocumentChunk.chunk_index, DocumentChunk.text)
.where(
DocumentChunk.doc_id.in_(valid_doc_ids),
DocumentChunk.chunk_index < 4,
# Hier-Decomp-1 c2: 교체된 doc 의 legacy(in_corpus=false) chunk 중복 로드 방지.
DocumentChunk.in_corpus.is_(True),
)
.order_by(DocumentChunk.doc_id, DocumentChunk.chunk_index)
)
).all()
# 2) ai_summary 도 후보로 추가 (요약 자체가 핵심 근거가 될 때 많음)
candidates: list[tuple[int, str]] = [] # (doc_id, text)
for r in chunk_rows:
if r.text:
candidates.append((r.doc_id, r.text))
for did, (_title, summary) in doc_meta.items():
if summary:
candidates.append((did, summary))
if not candidates:
return []
texts = [_truncate(t, 800) for _, t in candidates]
top_idxs = await _rerank(client, query, texts, DOC_TOPK)
seen_doc_ids: set[int] = set()
out: list[EvidenceItem] = []
for i in top_idxs:
did, text = candidates[i]
# 한 문서당 1 evidence (다양성 확보)
if did in seen_doc_ids:
continue
seen_doc_ids.add(did)
title = doc_meta.get(did, (None, None))[0] or f"문서 #{did}"
out.append(EvidenceItem(
source_type="document",
source_id=did,
title=title,
snippet=_truncate(text, DOC_SNIPPET_LEN),
))
if len(out) >= DOC_TOPK:
break
return out
async def _gather_question_evidence(
session: AsyncSession,
user_id: int,
study_topic_id: int,
current_question_id: int,
query: str,
client: AIClient,
) -> list[EvidenceItem]:
"""같은 토픽의 다른 문제 + 해설 reranker 통과 top-K 3건. 자기 자신 제외."""
rows = (
await session.execute(
select(StudyQuestion).where(
StudyQuestion.user_id == user_id,
StudyQuestion.study_topic_id == study_topic_id,
StudyQuestion.id != current_question_id,
StudyQuestion.deleted_at.is_(None),
)
.order_by(StudyQuestion.created_at.desc())
.limit(MAX_QUESTION_CANDIDATES)
)
).scalars().all()
if not rows:
return []
candidates_text: list[str] = []
for q in rows:
parts = [q.question_text or ""]
if q.explanation:
parts.append(q.explanation)
# ai_explanation 은 ready 상태만 — 재귀적 hallucination 방지
if q.ai_explanation and q.ai_explanation_status == "ready":
parts.append(q.ai_explanation)
candidates_text.append(" | ".join(parts))
top_idxs = await _rerank(client, query, candidates_text, Q_TOPK)
out: list[EvidenceItem] = []
for i in top_idxs:
q = rows[i]
title_head = _truncate(q.question_text or "", 40)
# 자기 자신 제외 보장 (이중 안전장치)
if q.id == current_question_id:
continue
out.append(EvidenceItem(
source_type="question",
source_id=q.id,
title=f"Q{q.id}: {title_head}",
snippet=_truncate(candidates_text[i], Q_SNIPPET_LEN),
))
return out
async def gather_explanation_context(
session: AsyncSession,
user_id: int,
question: StudyQuestion,
) -> ExplanationContext:
"""현재 문제에 대한 RAG 근거 수집 (자료 + 같은 토픽 다른 문제). 자기 자신 제외."""
client = AIClient()
query = _build_query(question)
try:
# 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음)
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, question.study_topic_id, query, client),
_gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
),
)
return ExplanationContext(documents=docs, questions=questions)
finally:
await client.close()
def render_evidence_block(items: list[EvidenceItem]) -> str:
"""프롬프트 본문에 끼울 텍스트 블록. 비어있으면 '(없음)'."""
if not items:
return "(없음)"
lines = []
for it in items:
if it.source_type == "document":
lines.append(f"- [자료: {it.title}] {it.snippet}")
else:
lines.append(f"- [{it.title}] {it.snippet}")
return "\n".join(lines)