"""study_questions AI 풀이 RAG 근거 수집 — PR-3. RAG 입력 풀: - 1순위: 같은 study_topic 매핑된 documents 본문 (study_topic_documents) → ai_summary + extracted_text 청크 - 2순위: 같은 토픽의 다른 study_questions question_text + explanation + (status='ready') ai_explanation 현재 question_id 자기 자신 제외. 재귀적 hallucination 방지를 위해 ai_explanation 은 ready 상태만. - 제외: study_sessions.ocr_text (필기) / 외부 웹 / 다른 토픽 bge-reranker-v2-m3 (TEI :80/rerank) 로 top-K 줄임. """ from __future__ import annotations import asyncio import logging from dataclasses import dataclass import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from models.chunk import DocumentChunk from models.document import Document from models.study_question import StudyQuestion from models.study_topic import StudyTopicDocument logger = logging.getLogger(__name__) # top-K DOC_TOPK = 5 Q_TOPK = 3 # document 본문 청크가 너무 길면 reranker 입력 제한 (TEI VRAM 보호) — 기존 rerank_service 와 일관 MAX_RERANK_INPUT = 60 # 각 evidence snippet 길이 제한 (LLM 컨텍스트 관리) DOC_SNIPPET_LEN = 400 Q_SNIPPET_LEN = 300 # 토픽 내 다른 문제 후보 hard cap (reranker 부담 방지) MAX_QUESTION_CANDIDATES = 30 # reranker timeout RERANK_TIMEOUT_S = 5.0 @dataclass class EvidenceItem: source_type: str # 'document' | 'question' source_id: int title: str snippet: str def to_dict(self) -> dict: return { "source_type": self.source_type, "source_id": self.source_id, "title": self.title, "snippet": self.snippet, } @dataclass class ExplanationContext: """LLM 프롬프트 조립용. evidence 는 응답에 그대로 노출.""" documents: list[EvidenceItem] questions: list[EvidenceItem] @property def all(self) -> list[EvidenceItem]: return [*self.documents, *self.questions] def _truncate(text: str, n: int) -> str: if not text: return "" s = text.strip() return s if len(s) <= n else s[:n].rstrip() + "…" def _build_query(question: StudyQuestion) -> str: """rerank query — 문제 본문 + 보기 1~4 합쳐 1 문자열.""" return " | ".join([ question.question_text or "", question.choice_1 or "", question.choice_2 or "", question.choice_3 or "", question.choice_4 or "", ]) async def _rerank(client: AIClient, query: str, texts: list[str], top_k: int) -> list[int]: """bge-reranker 호출. 실패 시 [0..N] 자연 순서 fallback. 반환: top_k indices.""" if not texts: return [] if len(texts) > MAX_RERANK_INPUT: texts = texts[:MAX_RERANK_INPUT] try: async with asyncio.timeout(RERANK_TIMEOUT_S): results = await client.rerank(query, texts) # results: [{"index": int, "score": float}, ...] idxs = [r["index"] for r in results if 0 <= r.get("index", -1) < len(texts)] return idxs[:top_k] except (asyncio.TimeoutError, httpx.HTTPError) as e: logger.warning("study_explanation_rerank_fallback: %s: %s", type(e).__name__, e) return list(range(min(top_k, len(texts)))) async def _gather_document_evidence( session: AsyncSession, user_id: int, study_topic_id: int, query: str, client: AIClient, ) -> list[EvidenceItem]: """study_topic 매핑 documents 의 청크 + ai_summary 를 reranker 통과해 top-K 5건.""" doc_id_rows = ( await session.execute( select(StudyTopicDocument.document_id).where( StudyTopicDocument.study_topic_id == study_topic_id, StudyTopicDocument.user_id == user_id, ) ) ).scalars().all() doc_ids = list(doc_id_rows) if not doc_ids: return [] # 매핑된 documents 메타 (제목·요약 표기) doc_meta_rows = ( await session.execute( select(Document.id, Document.title, Document.ai_summary).where( Document.id.in_(doc_ids), Document.deleted_at.is_(None), ) ) ).all() doc_meta: dict[int, tuple[str | None, str | None]] = { r.id: (r.title, r.ai_summary) for r in doc_meta_rows } if not doc_meta: return [] valid_doc_ids = list(doc_meta.keys()) # 1) 청크 후보 — doc_id 별 chunk_index 0..3 (앞부분 우선) 으로 hard cap chunk_rows = ( await session.execute( select(DocumentChunk.doc_id, DocumentChunk.chunk_index, DocumentChunk.text) .where( DocumentChunk.doc_id.in_(valid_doc_ids), DocumentChunk.chunk_index < 4, ) .order_by(DocumentChunk.doc_id, DocumentChunk.chunk_index) ) ).all() # 2) ai_summary 도 후보로 추가 (요약 자체가 핵심 근거가 될 때 많음) candidates: list[tuple[int, str]] = [] # (doc_id, text) for r in chunk_rows: if r.text: candidates.append((r.doc_id, r.text)) for did, (_title, summary) in doc_meta.items(): if summary: candidates.append((did, summary)) if not candidates: return [] texts = [_truncate(t, 800) for _, t in candidates] top_idxs = await _rerank(client, query, texts, DOC_TOPK) seen_doc_ids: set[int] = set() out: list[EvidenceItem] = [] for i in top_idxs: did, text = candidates[i] # 한 문서당 1 evidence (다양성 확보) if did in seen_doc_ids: continue seen_doc_ids.add(did) title = doc_meta.get(did, (None, None))[0] or f"문서 #{did}" out.append(EvidenceItem( source_type="document", source_id=did, title=title, snippet=_truncate(text, DOC_SNIPPET_LEN), )) if len(out) >= DOC_TOPK: break return out async def _gather_question_evidence( session: AsyncSession, user_id: int, study_topic_id: int, current_question_id: int, query: str, client: AIClient, ) -> list[EvidenceItem]: """같은 토픽의 다른 문제 + 해설 reranker 통과 top-K 3건. 자기 자신 제외.""" rows = ( await session.execute( select(StudyQuestion).where( StudyQuestion.user_id == user_id, StudyQuestion.study_topic_id == study_topic_id, StudyQuestion.id != current_question_id, StudyQuestion.deleted_at.is_(None), ) .order_by(StudyQuestion.created_at.desc()) .limit(MAX_QUESTION_CANDIDATES) ) ).scalars().all() if not rows: return [] candidates_text: list[str] = [] for q in rows: parts = [q.question_text or ""] if q.explanation: parts.append(q.explanation) # ai_explanation 은 ready 상태만 — 재귀적 hallucination 방지 if q.ai_explanation and q.ai_explanation_status == "ready": parts.append(q.ai_explanation) candidates_text.append(" | ".join(parts)) top_idxs = await _rerank(client, query, candidates_text, Q_TOPK) out: list[EvidenceItem] = [] for i in top_idxs: q = rows[i] title_head = _truncate(q.question_text or "", 40) # 자기 자신 제외 보장 (이중 안전장치) if q.id == current_question_id: continue out.append(EvidenceItem( source_type="question", source_id=q.id, title=f"Q{q.id}: {title_head}", snippet=_truncate(candidates_text[i], Q_SNIPPET_LEN), )) return out async def gather_explanation_context( session: AsyncSession, user_id: int, question: StudyQuestion, ) -> ExplanationContext: """현재 문제에 대한 RAG 근거 수집 (자료 + 같은 토픽 다른 문제). 자기 자신 제외.""" client = AIClient() query = _build_query(question) try: # 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음) docs, questions = await asyncio.gather( _gather_document_evidence(session, user_id, question.study_topic_id, query, client), _gather_question_evidence( session, user_id, question.study_topic_id, question.id, query, client ), ) return ExplanationContext(documents=docs, questions=questions) finally: await client.close() def render_evidence_block(items: list[EvidenceItem]) -> str: """프롬프트 본문에 끼울 텍스트 블록. 비어있으면 '(없음)'.""" if not items: return "(없음)" lines = [] for it in items: if it.source_type == "document": lines.append(f"- [자료: {it.title}] {it.snippet}") else: lines.append(f"- [{it.title}] {it.snippet}") return "\n".join(lines)