feat(study): Phase 4-B v1 세션 단위 종합 분석 (자유 마크다운)

Phase 4-A 가 wrong/unsure 한 문제씩 풀이 캐시. 4-B 는 세션 전체 wrong/unsure
5~30건을 묶어 200~400자 자연어 요약 1건 생성. 결과 화면 헤더 카드.

큐 인프라는 4-A study_question_jobs 와 분리 — FK 단일 의미 + 운영 SQL 명확성
+ 4-A/4-B 가드/payload/재시도 정책 차이. 신규 study_quiz_session_jobs (큐) +
study_quiz_session_analysis (결과 캐시 PK=session_id, UPSERT) + 전용 consumer.

Backend:
- migrations/233 — study_quiz_session_jobs (FK study_quiz_sessions NOT NULL,
  status pending/processing/completed/failed/skipped, max_attempts=2)
- migrations/234 — partial unique idx (session_id) WHERE pending/processing
- migrations/235 — study_quiz_session_analysis (session_id PK, summary_md,
  confidence, model_name, generated_at, is_stale)
- models/study_quiz_session_job — ORM + enqueue_session_analysis_job() (멱등)
- models/study_quiz_session_analysis — ORM (PK = session_id)
- services/study/session_summary_guard — GUARD_PATTERN (정규식) +
  normalize_confidence() 단일 source, worker + tests 가 import 공유
- services/study/session_summary_rag — gather_session_summary_context()
  documents 만 (PR-3 _gather_document_evidence 재사용). evidence 없어도 호출
  허용 (4-A 와 다른 정책 — 세션 기록 자체가 evidence)
- services/study/session_analysis_enqueue — auto (finalize/fallback) +
  request_session_analysis_regenerate (manual). manual 은 wrong/unsure < 5
  즉시 차단, active job 차단, 기존 analysis 있으면 is_stale=true 박기
- prompts/study_session_summary_envelope.txt — envelope JSON
  {summary_md, confidence}. 정량 정수만 인용 가능, 비율/추세/범위/날짜 금지
- workers/study_session_analysis_worker — terminal status 분기:
  · wrong/unsure < 5 → status=skipped, error_code=insufficient_attempts
  · question_text/outcome 부족 → skipped, evidence_missing
  · GUARD_PATTERN match → failed, guard_fail
  · 800자 hard cap + confidence normalize
  · timeout/parse/unknown → 재시도 후보
  · UPSERT study_quiz_session_analysis ON CONFLICT DO UPDATE (PK session_id)
- workers/study_session_queue_consumer — 4-A consumer 패턴 복제. BATCH_SIZE=1
  + STALE_MINUTES=10. MLX gate 4-A 와 공유 (Semaphore(1))
- main.py — APScheduler add_job(consume_study_session_queue, ..., 1분 주기)
- session_finalize — 끝에서 enqueue_session_analysis_auto (best-effort)
- api/study_topics:
  · QuizSessionAnalysisOut + ai_session_analysis 응답 필드 (analysis row +
    최신 job status/error_code)
  · GET fallback enqueue (기존 analysis 또는 active job 없으면만, non-blocking)
  · POST /quiz-sessions/{sid}/regenerate-summary — manual 트리거

Frontend (quiz-sessions/[sid]/+page.svelte):
- 결과 헤더에 세션 요약 카드 (AI 풀이 indicator 직후, 바로 할 일 직전)
- summary_md 박혔으면 markdown 렌더, 없으면 job_status / error_code 분기:
  · pending/processing → "AI 가 세션 분석 중"
  · insufficient_attempts → "오답·모르겠음 5건 미만"
  · evidence_missing → "자료 부족"
  · guard_fail → "환각 검증 차단" + 재생성 링크
- confidence='low' 배지 + is_stale "재생성 중" 배지
- 재생성 버튼 + regenerateSummary() — reason 별 toast 분기

ship gate:
- tests/test_session_summary_guard_pattern.py — 허용 5 + 차단 7 케이스 +
  normalize_confidence 표준/비표준 검증. python3 직접 실행 패스.

Plan: ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-B v1)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-02 07:20:29 +09:00
parent c7630b9815
commit 6785d53d3d
16 changed files with 1074 additions and 0 deletions
+108
View File
@@ -1687,10 +1687,22 @@ class QuizSessionAttemptItem(BaseModel):
reviewed_at: datetime | None
class QuizSessionAnalysisOut(BaseModel):
"""Phase 4-B v1: 결과 화면 헤더 카드용. summary_md 박혔으면 본문 표시,
없으면 job_status / job_error_code 보고 placeholder 분기."""
summary_md: str | None
confidence: str | None
generated_at: datetime | None
is_stale: bool
job_status: str | None # 최신 job — 'pending'/'processing'/'completed'/'failed'/'skipped'
job_error_code: str | None # 최신 job 의 error_code (실패/skip 사유 노출용)
class QuizSessionDetailResponse(BaseModel):
summary: QuizSessionSummary
questions: list[dict] # ReviewQuestionItem 호환 shape
attempts: list[QuizSessionAttemptItem] # 풀이된 것만 (cursor 까지)
ai_session_analysis: QuizSessionAnalysisOut | None = None
def _attempt_stats_dict_default() -> dict:
@@ -1836,10 +1848,69 @@ async def get_quiz_session(
qs.id, type(e).__name__, e,
)
# Phase 4-B v1: ai_session_analysis 응답 (결과 캐시 LEFT JOIN + 최신 job)
ai_session_analysis: QuizSessionAnalysisOut | None = None
if qs.status == "done":
try:
from models.study_quiz_session_analysis import StudyQuizSessionAnalysis
from models.study_quiz_session_job import StudyQuizSessionJob
an_row = (
await session.execute(
select(StudyQuizSessionAnalysis).where(
StudyQuizSessionAnalysis.study_quiz_session_id == qs.id
)
)
).scalar_one_or_none()
latest_job = (
await session.execute(
select(StudyQuizSessionJob)
.where(StudyQuizSessionJob.study_quiz_session_id == qs.id)
.order_by(StudyQuizSessionJob.id.desc())
.limit(1)
)
).scalar_one_or_none()
if an_row is not None or latest_job is not None:
ai_session_analysis = QuizSessionAnalysisOut(
summary_md=an_row.summary_md if an_row else None,
confidence=an_row.confidence if an_row else None,
generated_at=an_row.generated_at if an_row else None,
is_stale=bool(an_row.is_stale) if an_row else False,
job_status=latest_job.status if latest_job else None,
job_error_code=latest_job.error_code if latest_job else None,
)
# GET fallback enqueue — 기존 analysis 또는 active job 없으면만 시도.
# best-effort: 실패가 GET 응답 깨지 않게 try/except.
try:
if an_row is None and (latest_job is None or latest_job.status not in ("pending", "processing")):
from services.study.session_analysis_enqueue import enqueue_session_analysis_auto
res = await enqueue_session_analysis_auto(
session, user_id=user.id, study_quiz_session_id=qs.id,
)
await session.commit()
logger.debug(
"phase4b_get_backfill session=%s enqueued=%s",
qs.id, res["enqueued"],
)
except Exception as e:
logger.warning(
"phase4b_get_backfill_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
except Exception as e:
logger.warning(
"phase4b_get_analysis_failed session=%s: %s: %s",
qs.id, type(e).__name__, e,
)
return QuizSessionDetailResponse(
summary=summary,
questions=questions_payload,
attempts=attempts_payload,
ai_session_analysis=ai_session_analysis,
)
@@ -1873,6 +1944,43 @@ async def patch_quiz_session(
return await _build_session_summary(qs, session)
# ─── Phase 4-B v1: 세션 분석 재생성 ───
class RegenerateSummaryResponse(BaseModel):
enqueued: bool
reason: str | None = None # insufficient_attempts / already_active / not_done / not_found / race_lost
@router.post(
"/{topic_id}/quiz-sessions/{session_id}/regenerate-summary",
response_model=RegenerateSummaryResponse,
)
async def regenerate_session_summary(
topic_id: int,
session_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""사용자 [재생성] 버튼 — wrong/unsure < 5 즉시 차단 + active job 차단 + is_stale 처리.
Plan ~/.claude/plans/nifty-sparking-spindle.md (Phase 4-B v1) 의 Manual 트리거 정책 그대로.
"""
topic = await session.get(StudyTopic, topic_id)
_verify_topic_ownership(topic, user)
qs = await session.get(StudyQuizSession, session_id)
if qs is None or qs.user_id != user.id or qs.study_topic_id != topic_id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
from services.study.session_analysis_enqueue import request_session_analysis_regenerate
res = await request_session_analysis_regenerate(
session, user_id=user.id, study_quiz_session_id=session_id,
)
await session.commit()
return RegenerateSummaryResponse(enqueued=res["enqueued"], reason=res.get("reason"))
# ─── PR-12-A: 반복 출제 / 유사 유형 배치 카운트 ───
+4
View File
@@ -45,6 +45,7 @@ async def lifespan(app: FastAPI):
from workers.news_collector import run as news_collector_run
from workers.queue_consumer import consume_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_question_embed_worker import (
refresh_stale_related as study_q_related_refresh,
run as study_q_embed_run,
@@ -79,6 +80,9 @@ async def lifespan(app: FastAPI):
# Phase 4-A: study_question_jobs 처리 — wrong/unsure AI 풀이 prefetch.
# MLX gate 직렬화 + BATCH_SIZE=1 로 GPU 부하 통제. STALE_MINUTES=10 자체 복구.
scheduler.add_job(consume_study_queue, "interval", minutes=1, id="study_queue_consumer")
# Phase 4-B v1: study_quiz_session_jobs 처리 — 세션 단위 자유 마크다운 분석.
# 4-A 와 같은 MLX gate 공유 — 4-A 처리 중이면 직렬 대기.
scheduler.add_job(consume_study_session_queue, "interval", minutes=1, id="study_session_queue_consumer")
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
+35
View File
@@ -0,0 +1,35 @@
"""study_quiz_session_analysis ORM (Phase 4-B v1) — 세션 단위 분석 결과 캐시.
session_id PK — 한 세션 = 한 분석 결과. worker 가 ON CONFLICT DO UPDATE 로 UPSERT.
job 이력은 study_quiz_session_jobs 에 별도 누적, 결과 캐시는 1 row.
is_stale=TRUE 는 [재생성] 클릭 후 worker 처리 끝까지만.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class StudyQuizSessionAnalysis(Base):
__tablename__ = "study_quiz_session_analysis"
study_quiz_session_id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey("study_quiz_sessions.id", ondelete="CASCADE"),
primary_key=True,
)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
summary_md: Mapped[str] = mapped_column(Text, nullable=False)
confidence: Mapped[str | None] = mapped_column(String(10))
model_name: Mapped[str | None] = mapped_column(String(120))
generated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
is_stale: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
+80
View File
@@ -0,0 +1,80 @@
"""study_quiz_session_jobs ORM (Phase 4-B v1) — 세션 단위 분석 작업 큐.
study_question_jobs 와 분리 — FK 단일 의미 (study_quiz_session_id NOT NULL)
+ 운영 SQL 명확성 + 4-A/4-B 가드/재시도 정책 차이.
terminal status (completed/failed/skipped) 는 completed_at 항상 기록.
재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적.
v1 은 단일 작업 종류 ('analysis') 라 kind 컬럼 없이 session_id 만 키.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class StudyQuizSessionJob(Base):
__tablename__ = "study_quiz_session_jobs"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
study_quiz_session_id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey("study_quiz_sessions.id", ondelete="CASCADE"),
nullable=False,
)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
error_code: Mapped[str | None] = mapped_column(String(40))
error_message: Mapped[str | None] = mapped_column(Text)
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
async def enqueue_session_analysis_job(
session: AsyncSession,
*,
study_quiz_session_id: int,
user_id: int,
payload: dict[str, Any] | None = None,
) -> bool:
"""study_quiz_session_jobs 에 row 추가 (DB 레벨 중복 방어).
같은 session_id 의 활성 행 (pending/processing) 이 이미 있으면 False 반환.
terminal 이력은 별도 row 로 누적되므로 이번 호출이 failed/skipped/completed row 와
무관하게 새 active 행을 만들 수 있다.
Returns: True = 새 enqueue 발생, False = 중복으로 건너뜀.
"""
values: dict[str, Any] = {
"study_quiz_session_id": study_quiz_session_id,
"user_id": user_id,
"status": "pending",
}
if payload is not None:
values["payload"] = payload
stmt = (
pg_insert(StudyQuizSessionJob)
.values(**values)
.on_conflict_do_nothing(
index_elements=["study_quiz_session_id"],
index_where=text("status IN ('pending', 'processing')"),
)
)
result = await session.execute(stmt)
return result.rowcount > 0
@@ -0,0 +1,37 @@
당신은 한국 기사시험 학습 보조 AI 입니다.
사용자가 한 풀이 세션을 막 끝냈고, 그 결과를 짧게 정리하는 역할입니다.
【세션 정량 데이터 — 이 값들만 인용 가능】
- 총 {total}문제 / 정답 {correct}건 / 오답 {wrong}건 / 모르겠음 {unsure}건
- 새로 맞힘 {newly_correct}건 / 다시 틀림 {relapsed}건
- 회복 {recovered}건 / 누적 반복 오답 {chronic_remaining}건
【과목 분포】
{subject_distribution_block}
【이번 세션의 오답·모르겠음 문제들 (qid 별)】
{wrong_unsure_block}
【참고 자료 (있는 경우만)】
{documents_evidence_block}
【지침】
1. 이번 세션에서 사용자가 어느 영역에서 흔들렸는지 200~400자 마크다운으로 요약.
2. 톤은 "판정" 보다 "흔들린 것으로 보입니다", "관련 해설을 먼저 확인한 뒤 ..." 같이 부드럽게.
3. **위 정량 데이터에 박힌 정수 (예: 오답 1건, 모르겠음 83건) 외의 수치는 절대 언급 금지**:
- 정답률 N% 같은 비율
- 최근/지난 N일 추세
- X~Y 문항 같은 범위 추천
- 회차 카운트 추정
- 날짜 표현
4. 참고 자료 블록이 비어있거나 부족하면 그 사실을 짧게 명시 ("자료 근거가 부족합니다").
자료가 없어도 세션 기록 자체로 흔들린 영역 요약은 작성한다.
5. confidence 는 출력 근거 강도에 따라 high/medium/low 중 하나:
- high: 자료 + 다른 ai_explanation 으로 패턴이 명확
- medium: 일부 근거 + 일반 지식 보강
- low: 자료 부족, 세션 기록만 기반
6. 추천은 "관련 해설을 다시 보세요" / "같은 영역 문제를 더 풀어보세요" 같은 일반 권장만.
구체 행동 지시 (몇 분 / 몇 문항 / 며칠 후) 는 금지.
【출력 형식 — raw JSON 한 객체. 메타 설명 / 코드 펜스 / 인사 없이.】
{{"summary_md": "<200~400자 마크다운>", "confidence": "<high|medium|low>"}}
@@ -0,0 +1,126 @@
"""Phase 4-B v1 세션 분석 enqueue 헬퍼 — finalize/fallback/manual 공유.
3 가지 진입점이 다른 정책으로 동작:
- auto (finalize/fallback): wrong/unsure < 5 도 enqueue 허용 (worker 가 skipped 처리).
GET fallback 은 best-effort + idempotent + non-blocking.
- manual (regenerate endpoint): wrong/unsure < 5 또는 active job 존재 시 즉시 차단.
사용자 즉시 안내 (UX).
is_stale 정책:
- active job 있음: is_stale 변경 X
- active 없음 + 기존 analysis 있음: is_stale=TRUE 박기 + 새 job
- active 없음 + 기존 analysis 없음: is_stale 처리 X + 새 job
"""
from __future__ import annotations
import logging
from typing import Literal
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from models.study_question import StudyQuestionAttempt
from models.study_quiz_session import StudyQuizSession
from models.study_quiz_session_analysis import StudyQuizSessionAnalysis
from models.study_quiz_session_job import StudyQuizSessionJob, enqueue_session_analysis_job
logger = logging.getLogger(__name__)
MIN_ATTEMPTS_FOR_ANALYSIS = 5
async def _count_wrong_unsure(session: AsyncSession, study_quiz_session_id: int) -> int:
row = await session.execute(
select(func.count())
.select_from(StudyQuestionAttempt)
.where(
StudyQuestionAttempt.quiz_session_id == study_quiz_session_id,
StudyQuestionAttempt.outcome.in_(("wrong", "unsure")),
)
)
return int(row.scalar() or 0)
async def _has_active_job(session: AsyncSession, study_quiz_session_id: int) -> bool:
row = await session.execute(
select(func.count())
.select_from(StudyQuizSessionJob)
.where(
StudyQuizSessionJob.study_quiz_session_id == study_quiz_session_id,
StudyQuizSessionJob.status.in_(("pending", "processing")),
)
)
return int(row.scalar() or 0) > 0
async def _has_existing_analysis(session: AsyncSession, study_quiz_session_id: int) -> bool:
row = await session.execute(
select(StudyQuizSessionAnalysis.study_quiz_session_id).where(
StudyQuizSessionAnalysis.study_quiz_session_id == study_quiz_session_id
)
)
return row.scalar_one_or_none() is not None
async def enqueue_session_analysis_auto(
session: AsyncSession,
*,
user_id: int,
study_quiz_session_id: int,
) -> dict:
"""finalize_session 끝 + GET fallback 공통 자동 트리거.
wrong/unsure 카운트 무관 enqueue (worker 가 < 5 면 insufficient_attempts skipped).
active 행은 partial unique idx 가 차단. is_stale 은 안 건드림 (자동 트리거라 UX 무관).
"""
enqueued = await enqueue_session_analysis_job(
session,
study_quiz_session_id=study_quiz_session_id,
user_id=user_id,
)
return {"enqueued": enqueued}
async def request_session_analysis_regenerate(
session: AsyncSession,
*,
user_id: int,
study_quiz_session_id: int,
) -> dict:
"""사용자 [재생성] 버튼 — manual endpoint 가 호출.
Returns:
{'enqueued': bool, 'reason': Literal['insufficient_attempts','already_active'] | None}
"""
qs = await session.get(StudyQuizSession, study_quiz_session_id)
if qs is None or qs.user_id != user_id:
return {"enqueued": False, "reason": "not_found"}
if qs.status != "done":
return {"enqueued": False, "reason": "not_done"}
# 1. wrong/unsure < 5 즉시 차단
cnt = await _count_wrong_unsure(session, study_quiz_session_id)
if cnt < MIN_ATTEMPTS_FOR_ANALYSIS:
return {"enqueued": False, "reason": "insufficient_attempts"}
# 2. active job 있으면 즉시 차단 (is_stale 변경 X)
if await _has_active_job(session, study_quiz_session_id):
return {"enqueued": False, "reason": "already_active"}
# 3. 기존 analysis 있으면 is_stale=TRUE
if await _has_existing_analysis(session, study_quiz_session_id):
await session.execute(
update(StudyQuizSessionAnalysis)
.where(StudyQuizSessionAnalysis.study_quiz_session_id == study_quiz_session_id)
.values(is_stale=True)
)
# 4. 새 job
enqueued = await enqueue_session_analysis_job(
session,
study_quiz_session_id=study_quiz_session_id,
user_id=user_id,
payload={"trigger": "manual"},
)
return {"enqueued": enqueued, "reason": None if enqueued else "race_lost"}
+19
View File
@@ -237,6 +237,25 @@ async def finalize_session(
quiz_session_id, type(e).__name__, e,
)
# 7. Phase 4-B v1: 세션 단위 분석 enqueue (best-effort).
# wrong/unsure < 5 면 worker 가 insufficient_attempts 로 skipped — finalize 는 무관.
try:
from services.study.session_analysis_enqueue import enqueue_session_analysis_auto
res = await enqueue_session_analysis_auto(
session, user_id=user_id, study_quiz_session_id=quiz_session_id,
)
import logging
logging.getLogger(__name__).info(
"phase4b_finalize_enqueue session=%s enqueued=%s",
quiz_session_id, res["enqueued"],
)
except Exception as e:
import logging
logging.getLogger(__name__).warning(
"phase4b_finalize_enqueue_failed session=%s: %s: %s",
quiz_session_id, type(e).__name__, e,
)
return SessionSummary(
correct=correct,
wrong=wrong,
@@ -0,0 +1,39 @@
"""Phase 4-B v1 환각 가드 단일 source — worker + 단위 테스트가 같은 정규식을 import.
GUARD_PATTERN: AI 응답 본문에서 차단해야 패턴.
- % 기호 (정답률 N% 추정 위험)
- "최근 N일" / "지난 N일" (추세 표현)
- "X~Y 문항|개|문제" (범위 추천 "5~10문항")
- "N회차" (회차 카운트 추정)
- 날짜 표현 (YYYY-MM-DD / N월 N일)
prompt 박힌 정량 정수 (`오답 1`, `모르겠음 83`) 통과.
"""
from __future__ import annotations
import re
GUARD_PATTERN = re.compile(
r"(\d+\s*%" # 정답률 16%, 50% 등
r"|최근\s*\d+\s*일" # "최근 5일"
r"|지난\s*\d+\s*일" # "지난 7일"
r"|\d+\s*~\s*\d+\s*(문항|개|문제)" # "5~10문항"
r"|\d+\s*회차" # "7회차"
r"|\d{4}-\d{2}-\d{2}" # "2026-05-02"
r"|\d+\s*월\s*\d+\s*일" # "5월 2일"
r")"
)
_VALID_CONFIDENCE = {"high", "medium", "low"}
def normalize_confidence(value: object) -> str:
"""모델이 'unknown'/'mid'/'maybe' 같은 비표준 값 박는 케이스 방어.
표준 (high/medium/low) 값은 'low' 보정 (보수적).
"""
if not isinstance(value, str):
return "low"
v = value.strip().lower()
return v if v in _VALID_CONFIDENCE else "low"
+45
View File
@@ -0,0 +1,45 @@
"""Phase 4-B v1 세션 단위 RAG — documents 만 (같은 토픽 다른 문제는 prompt 에 직접 박힘).
PR-3 explanation_rag _gather_document_evidence 재사용. query wrong/unsure
question_text concat ( 80 × 최대 10).
"""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from models.study_question import StudyQuestion
from services.study.explanation_rag import (
EvidenceItem,
_gather_document_evidence,
_truncate,
)
async def gather_session_summary_context(
session: AsyncSession,
*,
user_id: int,
study_topic_id: int,
wrong_unsure_questions: list[StudyQuestion],
) -> list[EvidenceItem]:
"""세션 분석용 documents RAG. 빈 리스트 반환은 정상 (4-B 정책상 호출 진행).
query: wrong/unsure question_text 80자를 " | " join (최대 10).
"""
if not wrong_unsure_questions:
return []
query = " | ".join(
_truncate(q.question_text or "", 80)
for q in wrong_unsure_questions[:10]
)
if not query.strip():
return []
client = AIClient()
try:
return await _gather_document_evidence(
session, user_id, study_topic_id, query, client
)
finally:
await client.close()
@@ -0,0 +1,318 @@
"""Phase 4-B v1 세션 분석 worker — 자유 마크다운 요약 1건 생성.
Plan: ~/.claude/plans/nifty-sparking-spindle.md
study_quiz_session_jobs (kind 단일 'analysis') row 1 처리:
1. 세션의 wrong/unsure attempts + 연결 question 메타 fetch
2. wrong/unsure < 5 insufficient_attempts skip
3. RAG (documents , 없어도 호출 진행 4-A 다른 정책)
4. envelope JSON 호출
5. 가드 GUARD_PATTERN 정규식 + 800 hard cap + confidence normalize
6. study_quiz_session_analysis UPSERT (PK = session_id)
terminal status (completed/failed/skipped) 모두 completed_at 기록.
재시도는 4-A 동일 guard_fail/insufficient/evidence_missing 자동 X,
llm_timeout/parse_fail/unknown attempts < max_attempts pending.
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_quiz_session import StudyQuizSession
from models.study_quiz_session_analysis import StudyQuizSessionAnalysis
from models.study_quiz_session_job import StudyQuizSessionJob
from services.search.llm_gate import get_mlx_gate
from services.study.session_summary_guard import GUARD_PATTERN, normalize_confidence
from services.study.session_summary_rag import gather_session_summary_context
logger = logging.getLogger(__name__)
# 4-A 와 동일 안전 마진 (26B 평균 ~10s, gate 직렬화 고려)
LLM_TIMEOUT_S = 30.0
# wrong/unsure 5 미만은 분석 의미 X — insufficient_attempts skip
MIN_ATTEMPTS_FOR_ANALYSIS = 5
# 응답 800자 hard cap (말줄임표 포함 ≤ 801)
SUMMARY_MAX_CHARS = 800
_PROMPT_FILE = "study_session_summary_envelope.txt"
_prompt_template_cache: str | None = None
def _load_envelope_prompt() -> str:
global _prompt_template_cache
if _prompt_template_cache is None:
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
_prompt_template_cache = (prompts_dir / _PROMPT_FILE).read_text(encoding="utf-8")
return _prompt_template_cache
def _format_subject_distribution(dist: dict | None) -> str:
if not dist:
return "(분포 없음)"
return ", ".join(f"{k}: {v}" for k, v in dist.items() if v)
def _format_wrong_unsure_block(attempts_data: list[dict]) -> str:
"""각 attempt 를 prompt 1 항목 line 으로 변환."""
lines: list[str] = []
for a in attempts_data:
q = a["question"]
outcome = a["outcome"]
head = f"- Q{q.id} [{q.subject or '(미분류)'} / {q.scope or '(scope 없음)'}] " \
f"outcome={outcome} 정답 {q.correct_choice}" \
f"(사용자 선택 {a['selected_choice'] if a['selected_choice'] is not None else '-'}번)"
lines.append(head)
qt = (q.question_text or "")[:80].replace("\n", " ")
if qt:
lines.append(f" · {qt}")
if a.get("ai_explanation"):
ae = a["ai_explanation"][:200].replace("\n", " ")
lines.append(f" · 풀이: {ae}")
return "\n".join(lines) if lines else "(없음)"
def _format_evidence_block(items) -> str:
if not items:
return "(없음)"
return "\n".join(f"- [{it.title}] {it.snippet}" for it in items)
def _render_session_summary_prompt(qs: StudyQuizSession, attempts_data: list[dict], ctx_docs) -> str:
total_n = len(qs.question_ids or [])
return (
_load_envelope_prompt()
.replace("{total}", str(total_n))
.replace("{correct}", str(qs.correct_count))
.replace("{wrong}", str(qs.wrong_count))
.replace("{unsure}", str(qs.unsure_count))
.replace("{newly_correct}", str(qs.newly_correct_count))
.replace("{relapsed}", str(qs.relapsed_count))
.replace("{recovered}", str(qs.recovered_count))
.replace("{chronic_remaining}", str(qs.chronic_remaining_count))
.replace("{subject_distribution_block}", _format_subject_distribution(qs.subject_distribution))
.replace("{wrong_unsure_block}", _format_wrong_unsure_block(attempts_data))
.replace("{documents_evidence_block}", _format_evidence_block(ctx_docs))
)
async def _fetch_wrong_unsure_attempts(session: AsyncSession, study_quiz_session_id: int) -> list[dict]:
"""이 세션의 wrong/unsure attempts + 연결 question + (있으면) ready ai_explanation."""
rows = (
await session.execute(
select(StudyQuestionAttempt, StudyQuestion)
.join(StudyQuestion, StudyQuestion.id == StudyQuestionAttempt.study_question_id)
.where(
StudyQuestionAttempt.quiz_session_id == study_quiz_session_id,
StudyQuestionAttempt.outcome.in_(("wrong", "unsure")),
StudyQuestion.deleted_at.is_(None),
)
.order_by(StudyQuestionAttempt.answered_at.asc())
)
).all()
out: list[dict] = []
for attempt, q in rows:
ae = q.ai_explanation if q.ai_explanation_status == "ready" else None
out.append({
"question": q,
"outcome": attempt.outcome,
"selected_choice": attempt.selected_choice,
"ai_explanation": ae,
})
return out
async def _upsert_analysis(
session: AsyncSession,
*,
study_quiz_session_id: int,
user_id: int,
summary_md: str,
confidence: str,
model_name: str,
now: datetime,
) -> None:
"""ON CONFLICT DO UPDATE — session_id PK. 재생성 시 기존 row 갱신 + is_stale=False."""
stmt = pg_insert(StudyQuizSessionAnalysis).values(
study_quiz_session_id=study_quiz_session_id,
user_id=user_id,
summary_md=summary_md,
confidence=confidence,
model_name=model_name,
generated_at=now,
is_stale=False,
)
stmt = stmt.on_conflict_do_update(
index_elements=["study_quiz_session_id"],
set_=dict(
summary_md=stmt.excluded.summary_md,
confidence=stmt.excluded.confidence,
model_name=stmt.excluded.model_name,
generated_at=stmt.excluded.generated_at,
is_stale=False,
),
)
await session.execute(stmt)
async def run_session_analysis_job(session: AsyncSession, job: StudyQuizSessionJob) -> None:
"""Phase 4-B v1 worker. caller 가 commit. job.status 호출 전 'pending' 가정."""
now = lambda: datetime.now(timezone.utc) # noqa: E731
job.attempts += 1
job.status = "processing"
job.started_at = now()
await session.flush()
try:
qs = await session.get(StudyQuizSession, job.study_quiz_session_id)
if qs is None:
job.error_code = "evidence_missing"
job.error_message = "session not found"
job.status = "skipped"
job.completed_at = now()
return
if qs.status != "done":
job.error_code = "evidence_missing"
job.error_message = f"session status={qs.status} (not done)"
job.status = "skipped"
job.completed_at = now()
return
# 1. wrong/unsure attempts fetch
attempts_data = await _fetch_wrong_unsure_attempts(session, qs.id)
if len(attempts_data) < MIN_ATTEMPTS_FOR_ANALYSIS:
job.error_code = "insufficient_attempts"
job.error_message = f"wrong/unsure count={len(attempts_data)} < {MIN_ATTEMPTS_FOR_ANALYSIS}"
job.status = "skipped"
job.completed_at = now()
return
# 2. evidence_missing — question_text/outcome/subject 자체가 부족한 예외만
valid_attempts = [
a for a in attempts_data
if a["question"].question_text and a["outcome"]
]
if len(valid_attempts) < MIN_ATTEMPTS_FOR_ANALYSIS:
job.error_code = "evidence_missing"
job.error_message = "wrong/unsure attempts lack question_text or outcome"
job.status = "skipped"
job.completed_at = now()
return
# 3. RAG (documents 만, 없어도 호출 진행)
questions = [a["question"] for a in valid_attempts]
ctx_docs = await gather_session_summary_context(
session,
user_id=qs.user_id,
study_topic_id=qs.study_topic_id,
wrong_unsure_questions=questions,
)
# 4. prompt 조립 + MLX
prompt = _render_session_summary_prompt(qs, valid_attempts, ctx_docs)
ai_client = AIClient()
try:
async with get_mlx_gate():
async with asyncio.timeout(LLM_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
primary_name = (
ai_client.ai.primary.model
if hasattr(ai_client.ai, "primary") and hasattr(ai_client.ai.primary, "model")
else "primary"
)
finally:
await ai_client.close()
if not raw_text or not raw_text.strip():
job.error_code = "llm_timeout"
job.error_message = "empty response from primary"
return
# 5. envelope 파싱
envelope = parse_json_response(raw_text)
if envelope is None or not isinstance(envelope, dict):
job.error_code = "parse_fail"
job.error_message = "envelope JSON parse failed"
return
summary_md = (envelope.get("summary_md") or "").strip()
confidence_raw = envelope.get("confidence")
if not summary_md:
job.error_code = "parse_fail"
job.error_message = "empty summary_md"
return
# 6. 환각 가드 — services/study/session_summary_guard 모듈 import.
if GUARD_PATTERN.search(summary_md):
job.error_code = "guard_fail"
job.error_message = "numeric/range pattern detected in summary_md"
job.status = "failed"
job.completed_at = now()
return
# 7. 길이 hard cap
if len(summary_md) > SUMMARY_MAX_CHARS:
summary_md = summary_md[:SUMMARY_MAX_CHARS].rstrip() + ""
# 8. confidence normalize
confidence = normalize_confidence(confidence_raw)
# 9. UPSERT
ts = now()
await _upsert_analysis(
session,
study_quiz_session_id=qs.id,
user_id=qs.user_id,
summary_md=summary_md,
confidence=confidence,
model_name=f"mlx:{primary_name}",
now=ts,
)
job.status = "completed"
job.completed_at = ts
return
except (asyncio.TimeoutError, httpx.HTTPError) as e:
job.error_code = "llm_timeout"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"session_analysis_timeout job_id=%s sid=%s: %s",
job.id, job.study_quiz_session_id, e,
)
except (json.JSONDecodeError, ValueError) as e:
job.error_code = "parse_fail"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"session_analysis_parse_fail job_id=%s sid=%s: %s",
job.id, job.study_quiz_session_id, e,
)
except Exception as e:
# 예상 못한 예외 — 명시 'unknown' 박아 finally 가 None 을 retryable 로 안 보게.
job.error_code = "unknown"
job.error_message = f"{type(e).__name__}: {e}"
logger.exception(
"session_analysis_unknown_fail job_id=%s sid=%s",
job.id, job.study_quiz_session_id,
)
finally:
# 재시도 분기 — guard_fail/evidence_missing/insufficient_attempts 는 위 try 에서 종결.
# 여기 도달 케이스는 llm_timeout / parse_fail / unknown.
if job.status == "processing":
retryable = job.error_code in ("llm_timeout", "parse_fail", "unknown")
if retryable and job.attempts < job.max_attempts:
job.status = "pending"
else:
job.status = "failed"
job.completed_at = now()
@@ -0,0 +1,78 @@
"""Phase 4-B v1 study_quiz_session_jobs consumer — APScheduler 1분 간격.
세션 단위 전용 study_question_jobs (4-A) 분리. 운영 SQL 명확성.
BATCH_SIZE=1, MLX gate Semaphore(1) 4-A 공유 4-A 처리 중이면 직렬 대기.
STALE_MINUTES=10 자체 복구.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
from workers.study_session_analysis_worker import run_session_analysis_job
logger = setup_logger("study_session_queue_consumer")
BATCH_SIZE = 1
STALE_MINUTES = 10
async def reset_stale_session_jobs() -> None:
"""processing 으로 STALE_MINUTES 이상 방치된 job 을 pending 으로 복구."""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
try:
async with async_session() as session:
stmt = (
update(StudyQuizSessionJob)
.where(
StudyQuizSessionJob.status == "processing",
StudyQuizSessionJob.started_at.is_not(None),
StudyQuizSessionJob.started_at < cutoff,
)
.values(status="pending", started_at=None)
)
result = await session.execute(stmt)
await session.commit()
n = result.rowcount or 0
if n > 0:
logger.warning("study_session_jobs_stale_reset count=%s", n)
except SQLAlchemyError as e:
logger.exception("study_session_jobs_stale_reset_failed: %s", e)
async def consume_study_session_queue() -> None:
"""APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리."""
await reset_stale_session_jobs()
async with async_session() as session:
rows = (
await session.execute(
select(StudyQuizSessionJob)
.where(StudyQuizSessionJob.status == "pending")
.order_by(StudyQuizSessionJob.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
for job_row in rows:
async with async_session() as s:
try:
job = await s.get(StudyQuizSessionJob, job_row.id)
if job is None or job.status != "pending":
continue
await run_session_analysis_job(s, job)
await s.commit()
logger.info(
"session_analysis_processed id=%s sid=%s status=%s error_code=%s attempts=%s",
job.id, job.study_quiz_session_id, job.status, job.error_code,
job.attempts,
)
except Exception as e:
await s.rollback()
logger.exception("session_analysis_outer_failed job_id=%s: %s", job_row.id, e)
@@ -99,6 +99,38 @@
let summary = $derived(detail?.summary);
let unreviewedCount = $derived(summary?.unreviewed_wrong_unsure_count ?? 0);
// Phase 4-B v1: 세션 단위 분석 카드 상태
let aiAnalysis = $derived(detail?.ai_session_analysis ?? null);
let regeneratingSummary = $state(false);
async function regenerateSummary() {
if (regeneratingSummary) return;
regeneratingSummary = true;
try {
const res = await api(`/study-topics/${topicId}/quiz-sessions/${sessionId}/regenerate-summary`, {
method: 'POST',
});
if (!res.enqueued) {
const msg = ({
insufficient_attempts: '오답·모르겠음이 5건 미만이라 분석을 생성하지 않습니다',
already_active: '이미 분석을 생성하고 있습니다. 잠시 후 다시 확인하세요',
not_done: '세션이 완료되지 않았습니다',
not_found: '세션을 찾을 수 없습니다',
race_lost: '다른 호출이 먼저 enqueue 했습니다. 잠시 후 다시 확인하세요',
}[res.reason] ?? '재생성을 시작할 수 없습니다');
addToast('warning', msg);
} else {
addToast('success', '분석을 다시 생성합니다 (1분 주기 처리)');
// detail 의 ai_session_analysis 갱신을 위해 가벼운 재로드
await load();
}
} catch (err) {
addToast('error', err?.detail || '재생성 호출 실패');
} finally {
regeneratingSummary = false;
}
}
// Phase 4-A: AI 풀이 캐시 진척 — wrong/unsure attempts 와 question.ai_explanation_status 결합.
let aiExplProgress = $derived.by(() => {
if (!detail) return null;
@@ -314,6 +346,43 @@
</div>
{/if}
<!-- Phase 4-B v1: 세션 단위 자유 마크다운 분석 -->
{#if aiAnalysis}
<div class="rounded border border-accent/30 bg-accent/5 p-3 text-xs text-text flex flex-col gap-2">
<div class="flex items-center gap-2 flex-wrap">
<Sparkles size={12} class="text-accent" />
<span class="font-medium">세션 요약</span>
{#if aiAnalysis.confidence === 'low'}
<span class="text-[10px] text-warning bg-warning/10 rounded px-1.5 py-0.5">신뢰도 낮음</span>
{/if}
{#if aiAnalysis.is_stale}
<span class="text-[10px] text-dim">재생성 중</span>
{/if}
<button
type="button"
class="ml-auto text-[10px] text-dim hover:text-text disabled:opacity-50"
disabled={regeneratingSummary}
onclick={regenerateSummary}
>
{regeneratingSummary ? '...' : '재생성'}
</button>
</div>
{#if aiAnalysis.summary_md}
<div class="markdown-body math-area leading-relaxed">{@html renderMathMarkdown(aiAnalysis.summary_md)}</div>
{:else if aiAnalysis.job_status === 'pending' || aiAnalysis.job_status === 'processing'}
<div class="text-dim">AI 가 세션 분석 중입니다 (1분 주기 처리)</div>
{:else if aiAnalysis.job_error_code === 'insufficient_attempts'}
<div class="text-dim">오답·모르겠음이 5건 미만이라 분석을 생성하지 않습니다.</div>
{:else if aiAnalysis.job_error_code === 'evidence_missing'}
<div class="text-dim">관련 자료가 부족해 분석을 건너뛰었습니다.</div>
{:else if aiAnalysis.job_error_code === 'guard_fail'}
<div class="text-dim">분석 결과가 환각 검증에서 차단됐습니다. <button type="button" class="text-accent hover:underline" onclick={regenerateSummary}>재생성</button></div>
{:else}
<div class="text-dim">분석 대기 중입니다.</div>
{/if}
</div>
{/if}
<!-- Phase 2-B: 바로 할 일 (지금 시점 progress 기반 동적 카운트) — 클릭 시 복습함 해당 탭 -->
{#if (s.pending_review_count ?? 0) + (s.chronic_count ?? 0) + (s.regressed_count ?? 0) > 0}
<div class="rounded border border-warning/30 bg-warning/5 p-3 text-xs text-text flex flex-col gap-1.5">
@@ -0,0 +1,22 @@
-- 233_study_quiz_session_jobs.sql
-- Phase 4-B v1: 세션 단위 분석 (자유 마크다운) 전용 큐.
-- study_question_jobs 와 분리 — FK 단일 의미 (study_quiz_session_id NOT NULL)
-- + 운영 SQL 명확성 + 4-A/4-B 가드/재시도 정책 차이.
--
-- terminal status (completed/failed/skipped) 는 completed_at 항상 기록.
-- 재시도는 기존 row 를 pending 으로 되살리지 않고 새 row 생성 — 이력 누적.
CREATE TABLE study_quiz_session_jobs (
id BIGSERIAL PRIMARY KEY,
study_quiz_session_id BIGINT NOT NULL REFERENCES study_quiz_sessions(id) ON DELETE CASCADE,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts SMALLINT NOT NULL DEFAULT 0,
max_attempts SMALLINT NOT NULL DEFAULT 2,
error_code VARCHAR(40),
error_message TEXT,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
@@ -0,0 +1,8 @@
-- 234_study_session_jobs_active_uq.sql
-- (study_quiz_session_id) 활성 행 중복 차단.
-- v1 은 단일 작업 종류 ('analysis') 라 kind 컬럼 없이 session_id 만 키.
-- terminal (completed/failed/skipped) 는 누적 이력.
CREATE UNIQUE INDEX uq_study_session_jobs_active
ON study_quiz_session_jobs (study_quiz_session_id)
WHERE status IN ('pending', 'processing');
@@ -0,0 +1,14 @@
-- 235_study_quiz_session_analysis.sql
-- Phase 4-B v1: 세션 단위 분석 결과 캐시. session_id PK — 한 세션 = 한 분석.
-- worker 가 ON CONFLICT DO UPDATE 로 UPSERT. job 이력은 별도 누적, 결과 캐시는 1 row.
-- is_stale=TRUE 는 사용자 [재생성] 클릭 후 worker 처리 끝까지만. 워커가 마지막에 FALSE 로 다시 박음.
CREATE TABLE study_quiz_session_analysis (
study_quiz_session_id BIGINT PRIMARY KEY REFERENCES study_quiz_sessions(id) ON DELETE CASCADE,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
summary_md TEXT NOT NULL,
confidence VARCHAR(10),
model_name VARCHAR(120),
generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
is_stale BOOLEAN NOT NULL DEFAULT FALSE
);
@@ -0,0 +1,72 @@
"""Phase 4-B v1 환각 가드 정규식 단위 테스트 — ship gate.
worker 같은 모듈을 import 검증. plan 검증 D 케이스 (허용 5 + 차단 7).
"""
from __future__ import annotations
import sys
from pathlib import Path
# 프로젝트 루트의 app/ 를 import path 에 추가 (Document Server 패턴)
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT / "app"))
from services.study.session_summary_guard import GUARD_PATTERN, normalize_confidence # noqa: E402
# ─── GUARD_PATTERN 허용 케이스 (search() == None 이어야 함) ───
ALLOWED_CASES = [
"오답 5건이 있었습니다.",
"모르겠음 83건이 남았습니다.",
"배관 영역에서 흔들린 것으로 보입니다.",
"같은 영역 문제를 더 풀어보세요.",
"정답을 다시 한 번 확인해보세요.",
]
# ─── GUARD_PATTERN 차단 케이스 (search() != None 이어야 함) ───
BLOCKED_CASES = [
"정답률 16%였습니다.",
"최근 5일 동안 약했습니다.",
"지난 7일간 비슷한 실수가 반복됐습니다.",
"5~10문항을 더 풀어보세요.",
"2026-05-02 기준으로 보면...",
"5월 2일 이후 흐름은...",
"지난 7회차에서 반복됐습니다.",
]
def test_guard_pattern_allows_normal_summary():
for case in ALLOWED_CASES:
match = GUARD_PATTERN.search(case)
assert match is None, f"false positive: {case!r} matched {match!r}"
def test_guard_pattern_blocks_numeric_hallucination():
for case in BLOCKED_CASES:
match = GUARD_PATTERN.search(case)
assert match is not None, f"false negative: {case!r} not matched"
def test_normalize_confidence_standard_values():
for v in ("high", "medium", "low"):
assert normalize_confidence(v) == v
assert normalize_confidence(v.upper()) == v
assert normalize_confidence(f" {v} ") == v
def test_normalize_confidence_nonstandard_values():
for v in ("unknown", "mid", "maybe", "", "true", None, 123, [], {}):
assert normalize_confidence(v) == "low"
if __name__ == "__main__":
# 직접 실행 시 모든 케이스 빠른 점검
test_guard_pattern_allows_normal_summary()
test_guard_pattern_blocks_numeric_hallucination()
test_normalize_confidence_standard_values()
test_normalize_confidence_nonstandard_values()
print("OK")