Files
hyungi_document_server/app/api/ingest_study.py
T
hyungi d50be9f2e7 fix(publish): ingest_study 동시경합을 already_ingested 로 흡수 (500 회피)
같은 client_session_uuid 동시 POST 2건이 최초 멱등체크를 둘 다 통과 → 둘째가
(client_session_uuid, study_topic_id) uq(mig376) 위반으로 IntegrityError → 미처리 500.
데이터는 안전(원자 1-tx 롤백, SR 이중 advance 없음)이나 비우아한 500이 문제.

변이 구간(quiz_session insert ~ commit)을 try/except IntegrityError 로 감싸 승자 결과
재조회 후 already_ingested 반환. uuid 경합이 아닌 진짜 무결성 오류는 재조회 비어 re-raise.
멱등 응답 빌더 _already_ingested 헬퍼 추출(최초 체크 + 경합 흡수 공용).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00

256 lines
10 KiB
Python

"""뷰어 write-back ingest (study-to-viewer P2) — 뷰어 로컬 풀이 세션을 DS 로 흘려 finalize 재생.
흐름(plan study-to-viewer-slice1 P2, r2/r3 불변식):
뷰어 outbox → POST /ingest/study/attempts (Bearer VIEWER_SYNC_TOKEN, study_ingest_enabled gate)
→ pub_id→published.source_id→StudyQuestion 해소(부재 graceful skip) → principal=question.user_id
→ topic 별 그룹(뷰어 subject 퀴즈가 여러 DS topic 걸칠 수 있음) → topic 마다 DS quiz_session
(source='viewer', client_session_uuid) 생성 + attempt(derive_outcome=채점 단일 소스) + 세션 done
→ finalize_session **무수정 재생**(SR/pattern/progress + 4-A/4-B enqueue) → finalized_at 마커
→ 전부 1 트랜잭션(원자) 후 commit.
멱등(r2 P2-2): client_session_uuid 로 기존 세션 있으면 이미 적재된 것 → 캐시 요약 반환(재실행 0).
원자 1-tx 라 'uuid 존재 ⟺ finalize 완료' → at-least-once outbox 재전송에도 SR 이중 advance 없음.
user_id 리터럴 금지(r2): principal = 해소된 질문의 owner(단일, mixed 면 거부).
"""
from __future__ import annotations
import hmac
import logging
from collections import defaultdict
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from models.published import Published
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_quiz_session import StudyQuizSession
from services.study.outcome import derive_outcome
from services.study.publish_projection import KIND_QUESTION
from services.study.session_finalize import finalize_session
logger = logging.getLogger(__name__)
router = APIRouter()
def _verify_token(authorization: str | None = Header(default=None)) -> None:
"""뷰어↔DS 발행 채널 Bearer(read 와 동일 토큰, r3 단일토큰 수용). default-deny(미설정=503)."""
if not settings.viewer_sync_token:
raise HTTPException(status_code=503, detail="viewer_sync_token not configured")
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
if not hmac.compare_digest(token, settings.viewer_sync_token):
raise HTTPException(status_code=403, detail="invalid token")
async def _session() -> AsyncSession:
async with async_session() as s:
yield s
class IngestAttempt(BaseModel):
question_pub_id: str
selected_choice: int | None = None
is_unsure: bool = False
answered_at: str | None = None # 클라(오프라인) ISO 시각 — 미래 스큐 클램프, id 가 타이브레이커
class IngestBody(BaseModel):
client_session_uuid: str
attempts: list[IngestAttempt]
def _already_ingested(rows) -> dict:
"""이미 적재된 세션들의 캐시 요약(멱등 응답). 최초 멱등체크 + 동시경합 흡수 양쪽에서 사용."""
return {
"status": "already_ingested",
"sessions": [
{
"topic_id": s.study_topic_id,
"correct": s.correct_count,
"wrong": s.wrong_count,
"unsure": s.unsure_count,
}
for s in rows
],
}
def _parse_answered_at(s: str | None, now: datetime) -> datetime:
if not s:
return now
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return min(dt, now) # 미래 스큐는 now 로 클램프(클라 시계 오염 방지)
except Exception:
return now
@router.post("/attempts")
async def ingest_attempts(
body: IngestBody,
_auth: None = Depends(_verify_token),
session: AsyncSession = Depends(_session),
):
if not settings.study_ingest_enabled:
raise HTTPException(status_code=503, detail="study_ingest not enabled")
if not body.client_session_uuid or not body.attempts:
raise HTTPException(status_code=400, detail="client_session_uuid 와 attempts 필요")
# 멱등: 이 uuid 로 이미 적재됐나(원자 1-tx 라 존재=완료). 있으면 캐시 요약 반환(재실행 0).
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.client_session_uuid == body.client_session_uuid
)
)
).scalars().all()
if existing:
return _already_ingested(existing)
# pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외.
pub_ids = list({a.question_pub_id for a in body.attempts})
pub_rows = (
await session.execute(
select(Published.pub_id, Published.source_id).where(
Published.kind == KIND_QUESTION,
Published.pub_id.in_(pub_ids),
Published.deleted.is_(False),
)
)
).all()
src_by_pubid = {r.pub_id: r.source_id for r in pub_rows}
# 질문 fetch(미삭제). principal = owner(단일).
source_ids = list(set(src_by_pubid.values()))
q_rows = (
await session.execute(
select(StudyQuestion).where(
StudyQuestion.id.in_(source_ids), StudyQuestion.deleted_at.is_(None)
)
)
).scalars().all()
q_by_id = {q.id: q for q in q_rows}
owners = {q.user_id for q in q_by_id.values()}
if len(owners) > 1:
raise HTTPException(status_code=400, detail="여러 사용자 소유 질문 혼재 — 단일 principal 위반")
if not owners:
raise HTTPException(status_code=404, detail="해소 가능한 질문 없음")
user_id = owners.pop()
now = datetime.now(timezone.utc)
# topic 별 그룹(해소 실패 attempt 는 graceful skip). 같은 (uuid, topic) 1 세션.
by_topic: dict[int, list[tuple[IngestAttempt, StudyQuestion]]] = defaultdict(list)
skipped: list[str] = []
for a in body.attempts:
src = src_by_pubid.get(a.question_pub_id)
q = q_by_id.get(src) if src is not None else None
if q is None:
skipped.append(a.question_pub_id)
continue
by_topic[q.study_topic_id].append((a, q))
if not by_topic:
raise HTTPException(status_code=404, detail="해소된 attempt 없음")
try:
summaries = []
for topic_id, items in by_topic.items():
qids = [q.id for (_, q) in items]
qs = StudyQuizSession(
user_id=user_id,
study_topic_id=topic_id,
question_ids=qids,
subject_distribution={},
status="done",
cursor=len(qids),
source="viewer",
client_session_uuid=body.client_session_uuid,
finished_at=now,
created_at=now,
updated_at=now,
)
session.add(qs)
await session.flush() # qs.id
c = w = u = 0
for a, q in items:
try:
sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice)
except ValueError:
skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip
continue
if outcome == "correct":
c += 1
elif outcome == "wrong":
w += 1
elif outcome == "unsure":
u += 1
session.add(
StudyQuestionAttempt(
user_id=user_id,
study_question_id=q.id,
study_topic_id=topic_id,
selected_choice=sel,
correct_choice=q.correct_choice,
is_correct=is_corr,
outcome=outcome,
quiz_session_id=qs.id,
answered_at=_parse_answered_at(a.answered_at, now),
)
)
qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u
await session.flush()
# finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커.
summary = await finalize_session(
session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id
)
qs.finalized_at = now
summaries.append(
{
"topic_id": topic_id,
"quiz_session_id": qs.id,
"correct": summary.correct,
"wrong": summary.wrong,
"unsure": summary.unsure,
"newly_correct": summary.newly_correct,
"relapsed": summary.relapsed,
"recovered": summary.recovered,
}
)
await session.commit()
except IntegrityError:
# 동시 같은 client_session_uuid 경합 — 상대가 먼저 commit → (client_session_uuid,
# study_topic_id) uq(mig376) 위반. 데이터는 안전(원자 1-tx 전체 롤백 → SR 이중 advance
# 없음). 승자 결과로 graceful 수렴(500 대신 already_ingested). uuid 경합이 아닌 진짜
# 무결성 오류면 재조회가 비어 → re-raise 로 표면화.
await session.rollback()
winner = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.client_session_uuid == body.client_session_uuid
)
)
).scalars().all()
if not winner:
raise
logger.info("study_ingest uuid=%s 동시경합 흡수 → already_ingested", body.client_session_uuid)
return _already_ingested(winner)
logger.info(
"study_ingest uuid=%s user=%s sessions=%s skipped=%s",
body.client_session_uuid, user_id, len(summaries), len(skipped),
)
return {"status": "ingested", "skipped": skipped, "sessions": summaries}