diff --git a/app/api/ingest_study.py b/app/api/ingest_study.py index 994399b..b7822ea 100644 --- a/app/api/ingest_study.py +++ b/app/api/ingest_study.py @@ -23,6 +23,7 @@ from datetime import datetime, timezone from fastapi import APIRouter, Depends, Header, HTTPException from pydantic import BaseModel from sqlalchemy import select +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings @@ -66,6 +67,22 @@ class IngestBody(BaseModel): attempts: list[IngestAttempt] +def _already_ingested(rows) -> dict: + """이미 적재된 세션들의 캐시 요약(멱등 응답). 최초 멱등체크 + 동시경합 흡수 양쪽에서 사용.""" + return { + "status": "already_ingested", + "sessions": [ + { + "topic_id": s.study_topic_id, + "correct": s.correct_count, + "wrong": s.wrong_count, + "unsure": s.unsure_count, + } + for s in rows + ], + } + + def _parse_answered_at(s: str | None, now: datetime) -> datetime: if not s: return now @@ -98,18 +115,7 @@ async def ingest_attempts( ) ).scalars().all() if existing: - return { - "status": "already_ingested", - "sessions": [ - { - "topic_id": s.study_topic_id, - "correct": s.correct_count, - "wrong": s.wrong_count, - "unsure": s.unsure_count, - } - for s in existing - ], - } + return _already_ingested(existing) # pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외. pub_ids = list({a.question_pub_id for a in body.attempts}) @@ -156,73 +162,92 @@ async def ingest_attempts( if not by_topic: raise HTTPException(status_code=404, detail="해소된 attempt 없음") - summaries = [] - for topic_id, items in by_topic.items(): - qids = [q.id for (_, q) in items] - qs = StudyQuizSession( - user_id=user_id, - study_topic_id=topic_id, - question_ids=qids, - subject_distribution={}, - status="done", - cursor=len(qids), - source="viewer", - client_session_uuid=body.client_session_uuid, - finished_at=now, - created_at=now, - updated_at=now, - ) - session.add(qs) - await session.flush() # qs.id + try: + summaries = [] + for topic_id, items in by_topic.items(): + qids = [q.id for (_, q) in items] + qs = StudyQuizSession( + user_id=user_id, + study_topic_id=topic_id, + question_ids=qids, + subject_distribution={}, + status="done", + cursor=len(qids), + source="viewer", + client_session_uuid=body.client_session_uuid, + finished_at=now, + created_at=now, + updated_at=now, + ) + session.add(qs) + await session.flush() # qs.id - c = w = u = 0 - for a, q in items: - try: - sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice) - except ValueError: - skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip - continue - if outcome == "correct": - c += 1 - elif outcome == "wrong": - w += 1 - elif outcome == "unsure": - u += 1 - session.add( - StudyQuestionAttempt( - user_id=user_id, - study_question_id=q.id, - study_topic_id=topic_id, - selected_choice=sel, - correct_choice=q.correct_choice, - is_correct=is_corr, - outcome=outcome, - quiz_session_id=qs.id, - answered_at=_parse_answered_at(a.answered_at, now), + c = w = u = 0 + for a, q in items: + try: + sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice) + except ValueError: + skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip + continue + if outcome == "correct": + c += 1 + elif outcome == "wrong": + w += 1 + elif outcome == "unsure": + u += 1 + session.add( + StudyQuestionAttempt( + user_id=user_id, + study_question_id=q.id, + study_topic_id=topic_id, + selected_choice=sel, + correct_choice=q.correct_choice, + is_correct=is_corr, + outcome=outcome, + quiz_session_id=qs.id, + answered_at=_parse_answered_at(a.answered_at, now), + ) + ) + qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u + await session.flush() + + # finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커. + summary = await finalize_session( + session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id + ) + qs.finalized_at = now + summaries.append( + { + "topic_id": topic_id, + "quiz_session_id": qs.id, + "correct": summary.correct, + "wrong": summary.wrong, + "unsure": summary.unsure, + "newly_correct": summary.newly_correct, + "relapsed": summary.relapsed, + "recovered": summary.recovered, + } + ) + + await session.commit() + except IntegrityError: + # 동시 같은 client_session_uuid 경합 — 상대가 먼저 commit → (client_session_uuid, + # study_topic_id) uq(mig376) 위반. 데이터는 안전(원자 1-tx 전체 롤백 → SR 이중 advance + # 없음). 승자 결과로 graceful 수렴(500 대신 already_ingested). uuid 경합이 아닌 진짜 + # 무결성 오류면 재조회가 비어 → re-raise 로 표면화. + await session.rollback() + winner = ( + await session.execute( + select(StudyQuizSession).where( + StudyQuizSession.client_session_uuid == body.client_session_uuid ) ) - qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u - await session.flush() + ).scalars().all() + if not winner: + raise + logger.info("study_ingest uuid=%s 동시경합 흡수 → already_ingested", body.client_session_uuid) + return _already_ingested(winner) - # finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커. - summary = await finalize_session( - session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id - ) - qs.finalized_at = now - summaries.append( - { - "topic_id": topic_id, - "quiz_session_id": qs.id, - "correct": summary.correct, - "wrong": summary.wrong, - "unsure": summary.unsure, - "newly_correct": summary.newly_correct, - "relapsed": summary.relapsed, - "recovered": summary.recovered, - } - ) - - await session.commit() logger.info( "study_ingest uuid=%s user=%s sessions=%s skipped=%s", body.client_session_uuid, user_id, len(summaries), len(skipped),