"""뷰어 write-back ingest (study-to-viewer P2) — 뷰어 로컬 풀이 세션을 DS 로 흘려 finalize 재생. 흐름(plan study-to-viewer-slice1 P2, r2/r3 불변식): 뷰어 outbox → POST /ingest/study/attempts (Bearer VIEWER_SYNC_TOKEN, study_ingest_enabled gate) → pub_id→published.source_id→StudyQuestion 해소(부재 graceful skip) → principal=question.user_id → topic 별 그룹(뷰어 subject 퀴즈가 여러 DS topic 걸칠 수 있음) → topic 마다 DS quiz_session (source='viewer', client_session_uuid) 생성 + attempt(derive_outcome=채점 단일 소스) + 세션 done → finalize_session **무수정 재생**(SR/pattern/progress + 4-A/4-B enqueue) → finalized_at 마커 → 전부 1 트랜잭션(원자) 후 commit. 멱등(r2 P2-2): client_session_uuid 로 기존 세션 있으면 이미 적재된 것 → 캐시 요약 반환(재실행 0). 원자 1-tx 라 'uuid 존재 ⟺ finalize 완료' → at-least-once outbox 재전송에도 SR 이중 advance 없음. user_id 리터럴 금지(r2): principal = 해소된 질문의 owner(단일, mixed 면 거부). """ from __future__ import annotations import hmac import logging from collections import defaultdict from datetime import datetime, timezone from fastapi import APIRouter, Depends, Header, HTTPException from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings from core.database import async_session from models.published import Published from models.study_question import StudyQuestion, StudyQuestionAttempt from models.study_quiz_session import StudyQuizSession from services.study.outcome import derive_outcome from services.study.publish_projection import KIND_QUESTION from services.study.session_finalize import finalize_session logger = logging.getLogger(__name__) router = APIRouter() def _verify_token(authorization: str | None = Header(default=None)) -> None: """뷰어↔DS 발행 채널 Bearer(read 와 동일 토큰, r3 단일토큰 수용). default-deny(미설정=503).""" if not settings.viewer_sync_token: raise HTTPException(status_code=503, detail="viewer_sync_token not configured") if not authorization or not authorization.lower().startswith("bearer "): raise HTTPException(status_code=401, detail="missing Bearer token") token = authorization[7:].strip() if not hmac.compare_digest(token, settings.viewer_sync_token): raise HTTPException(status_code=403, detail="invalid token") async def _session() -> AsyncSession: async with async_session() as s: yield s class IngestAttempt(BaseModel): question_pub_id: str selected_choice: int | None = None is_unsure: bool = False answered_at: str | None = None # 클라(오프라인) ISO 시각 — 미래 스큐 클램프, id 가 타이브레이커 class IngestBody(BaseModel): client_session_uuid: str attempts: list[IngestAttempt] def _already_ingested(rows) -> dict: """이미 적재된 세션들의 캐시 요약(멱등 응답). 최초 멱등체크 + 동시경합 흡수 양쪽에서 사용.""" return { "status": "already_ingested", "sessions": [ { "topic_id": s.study_topic_id, "correct": s.correct_count, "wrong": s.wrong_count, "unsure": s.unsure_count, } for s in rows ], } def _parse_answered_at(s: str | None, now: datetime) -> datetime: if not s: return now try: dt = datetime.fromisoformat(s.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return min(dt, now) # 미래 스큐는 now 로 클램프(클라 시계 오염 방지) except Exception: return now @router.post("/attempts") async def ingest_attempts( body: IngestBody, _auth: None = Depends(_verify_token), session: AsyncSession = Depends(_session), ): if not settings.study_ingest_enabled: raise HTTPException(status_code=503, detail="study_ingest not enabled") if not body.client_session_uuid or not body.attempts: raise HTTPException(status_code=400, detail="client_session_uuid 와 attempts 필요") # 멱등: 이 uuid 로 이미 적재됐나(원자 1-tx 라 존재=완료). 있으면 캐시 요약 반환(재실행 0). existing = ( await session.execute( select(StudyQuizSession).where( StudyQuizSession.client_session_uuid == body.client_session_uuid ) ) ).scalars().all() if existing: return _already_ingested(existing) # pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외. pub_ids = list({a.question_pub_id for a in body.attempts}) pub_rows = ( await session.execute( select(Published.pub_id, Published.source_id).where( Published.kind == KIND_QUESTION, Published.pub_id.in_(pub_ids), Published.deleted.is_(False), ) ) ).all() src_by_pubid = {r.pub_id: r.source_id for r in pub_rows} # 질문 fetch(미삭제). principal = owner(단일). source_ids = list(set(src_by_pubid.values())) q_rows = ( await session.execute( select(StudyQuestion).where( StudyQuestion.id.in_(source_ids), StudyQuestion.deleted_at.is_(None) ) ) ).scalars().all() q_by_id = {q.id: q for q in q_rows} owners = {q.user_id for q in q_by_id.values()} if len(owners) > 1: raise HTTPException(status_code=400, detail="여러 사용자 소유 질문 혼재 — 단일 principal 위반") if not owners: raise HTTPException(status_code=404, detail="해소 가능한 질문 없음") user_id = owners.pop() now = datetime.now(timezone.utc) # topic 별 그룹(해소 실패 attempt 는 graceful skip). 같은 (uuid, topic) 1 세션. by_topic: dict[int, list[tuple[IngestAttempt, StudyQuestion]]] = defaultdict(list) skipped: list[str] = [] for a in body.attempts: src = src_by_pubid.get(a.question_pub_id) q = q_by_id.get(src) if src is not None else None if q is None: skipped.append(a.question_pub_id) continue by_topic[q.study_topic_id].append((a, q)) if not by_topic: raise HTTPException(status_code=404, detail="해소된 attempt 없음") try: summaries = [] for topic_id, items in by_topic.items(): qids = [q.id for (_, q) in items] qs = StudyQuizSession( user_id=user_id, study_topic_id=topic_id, question_ids=qids, subject_distribution={}, status="done", cursor=len(qids), source="viewer", client_session_uuid=body.client_session_uuid, finished_at=now, created_at=now, updated_at=now, ) session.add(qs) await session.flush() # qs.id c = w = u = 0 for a, q in items: try: sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice) except ValueError: skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip continue if outcome == "correct": c += 1 elif outcome == "wrong": w += 1 elif outcome == "unsure": u += 1 session.add( StudyQuestionAttempt( user_id=user_id, study_question_id=q.id, study_topic_id=topic_id, selected_choice=sel, correct_choice=q.correct_choice, is_correct=is_corr, outcome=outcome, quiz_session_id=qs.id, answered_at=_parse_answered_at(a.answered_at, now), ) ) qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u await session.flush() # finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커. summary = await finalize_session( session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id ) qs.finalized_at = now summaries.append( { "topic_id": topic_id, "quiz_session_id": qs.id, "correct": summary.correct, "wrong": summary.wrong, "unsure": summary.unsure, "newly_correct": summary.newly_correct, "relapsed": summary.relapsed, "recovered": summary.recovered, } ) await session.commit() except IntegrityError: # 동시 같은 client_session_uuid 경합 — 상대가 먼저 commit → (client_session_uuid, # study_topic_id) uq(mig376) 위반. 데이터는 안전(원자 1-tx 전체 롤백 → SR 이중 advance # 없음). 승자 결과로 graceful 수렴(500 대신 already_ingested). uuid 경합이 아닌 진짜 # 무결성 오류면 재조회가 비어 → re-raise 로 표면화. await session.rollback() winner = ( await session.execute( select(StudyQuizSession).where( StudyQuizSession.client_session_uuid == body.client_session_uuid ) ) ).scalars().all() if not winner: raise logger.info("study_ingest uuid=%s 동시경합 흡수 → already_ingested", body.client_session_uuid) return _already_ingested(winner) logger.info( "study_ingest uuid=%s user=%s sessions=%s skipped=%s", body.client_session_uuid, user_id, len(summaries), len(skipped), ) return {"status": "ingested", "skipped": skipped, "sessions": summaries}