From 8d3b648b5f5ffb9806e6c43b4c7b53c0806c001a Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 25 Jun 2026 07:27:34 +0900 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20P2=20DS=20write-back=20?= =?UTF-8?q?=E2=80=94=20/ingest/study/attempts=20=EB=A9=B1=EB=93=B1=20final?= =?UTF-8?q?ize=20=EC=9E=AC=EC=83=9D=20(study=E2=86=92viewer)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 뷰어 로컬 풀이 세션을 DS 로 흘려 학습엔진(SR/pattern/오답/4-A·4-B) 재생. 기본 inert(flag off). - 마이그 373~376: study_quiz_sessions 에 finalized_at(멱등 마커)·client_session_uuid·source + UNIQUE(client_session_uuid, study_topic_id) partial. - outcome.py derive_outcome = 채점 단일 소스(라이브 submit_attempt 도 이걸로 리팩터 → 정오 어휘 한 곳, ingest 는 raw 신호 selected+unsure 만 싣고 DS 산출 = '무수정 재생' 성립). - ingest_study.py: Bearer(VIEWER_SYNC_TOKEN)+study_ingest_enabled gate. pub_id→source_id→question 해소(graceful skip)·principal=question.user_id(mixed 거부)·topic 별 DS 세션(source=viewer·uuid) 생성+attempt+finalize_session 무수정 재생+finalized_at, 1-tx 원자. uuid 존재=already_ingested 캐시반환(멱등 → at-least-once 재전송에도 SR 이중 advance 0). - config study_ingest_enabled + compose 매핑 + main 등록. 검증: py_compile·ephemeral 마이그(373~376 라이브스키마 위 클린)·single-statement. 배포 후 합성 세션 멱등/무이중SR 실측 예정. 배포=inert(STUDY_INGEST_ENABLED 미설정=503). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/ingest_study.py | 230 ++++++++++++++++++ app/api/study_questions.py | 19 +- app/core/config.py | 4 + app/main.py | 2 + app/models/study_quiz_session.py | 4 + app/services/study/outcome.py | 25 ++ docker-compose.yml | 2 + migrations/373_quiz_session_finalized_at.sql | 4 + migrations/374_quiz_session_client_uuid.sql | 3 + migrations/375_quiz_session_source.sql | 3 + .../376_quiz_session_client_uuid_uq.sql | 3 + 11 files changed, 287 insertions(+), 12 deletions(-) create mode 100644 app/api/ingest_study.py create mode 100644 app/services/study/outcome.py create mode 100644 migrations/373_quiz_session_finalized_at.sql create mode 100644 migrations/374_quiz_session_client_uuid.sql create mode 100644 migrations/375_quiz_session_source.sql create mode 100644 migrations/376_quiz_session_client_uuid_uq.sql diff --git a/app/api/ingest_study.py b/app/api/ingest_study.py new file mode 100644 index 0000000..994399b --- /dev/null +++ b/app/api/ingest_study.py @@ -0,0 +1,230 @@ +"""뷰어 write-back ingest (study-to-viewer P2) — 뷰어 로컬 풀이 세션을 DS 로 흘려 finalize 재생. + +흐름(plan study-to-viewer-slice1 P2, r2/r3 불변식): + 뷰어 outbox → POST /ingest/study/attempts (Bearer VIEWER_SYNC_TOKEN, study_ingest_enabled gate) + → pub_id→published.source_id→StudyQuestion 해소(부재 graceful skip) → principal=question.user_id + → topic 별 그룹(뷰어 subject 퀴즈가 여러 DS topic 걸칠 수 있음) → topic 마다 DS quiz_session + (source='viewer', client_session_uuid) 생성 + attempt(derive_outcome=채점 단일 소스) + 세션 done + → finalize_session **무수정 재생**(SR/pattern/progress + 4-A/4-B enqueue) → finalized_at 마커 + → 전부 1 트랜잭션(원자) 후 commit. + +멱등(r2 P2-2): client_session_uuid 로 기존 세션 있으면 이미 적재된 것 → 캐시 요약 반환(재실행 0). + 원자 1-tx 라 'uuid 존재 ⟺ finalize 완료' → at-least-once outbox 재전송에도 SR 이중 advance 없음. +user_id 리터럴 금지(r2): principal = 해소된 질문의 owner(단일, mixed 면 거부). +""" + +from __future__ import annotations + +import hmac +import logging +from collections import defaultdict +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, Header, HTTPException +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.config import settings +from core.database import async_session +from models.published import Published +from models.study_question import StudyQuestion, StudyQuestionAttempt +from models.study_quiz_session import StudyQuizSession +from services.study.outcome import derive_outcome +from services.study.publish_projection import KIND_QUESTION +from services.study.session_finalize import finalize_session + +logger = logging.getLogger(__name__) +router = APIRouter() + + +def _verify_token(authorization: str | None = Header(default=None)) -> None: + """뷰어↔DS 발행 채널 Bearer(read 와 동일 토큰, r3 단일토큰 수용). default-deny(미설정=503).""" + if not settings.viewer_sync_token: + raise HTTPException(status_code=503, detail="viewer_sync_token not configured") + if not authorization or not authorization.lower().startswith("bearer "): + raise HTTPException(status_code=401, detail="missing Bearer token") + token = authorization[7:].strip() + if not hmac.compare_digest(token, settings.viewer_sync_token): + raise HTTPException(status_code=403, detail="invalid token") + + +async def _session() -> AsyncSession: + async with async_session() as s: + yield s + + +class IngestAttempt(BaseModel): + question_pub_id: str + selected_choice: int | None = None + is_unsure: bool = False + answered_at: str | None = None # 클라(오프라인) ISO 시각 — 미래 스큐 클램프, id 가 타이브레이커 + + +class IngestBody(BaseModel): + client_session_uuid: str + attempts: list[IngestAttempt] + + +def _parse_answered_at(s: str | None, now: datetime) -> datetime: + if not s: + return now + try: + dt = datetime.fromisoformat(s.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return min(dt, now) # 미래 스큐는 now 로 클램프(클라 시계 오염 방지) + except Exception: + return now + + +@router.post("/attempts") +async def ingest_attempts( + body: IngestBody, + _auth: None = Depends(_verify_token), + session: AsyncSession = Depends(_session), +): + if not settings.study_ingest_enabled: + raise HTTPException(status_code=503, detail="study_ingest not enabled") + if not body.client_session_uuid or not body.attempts: + raise HTTPException(status_code=400, detail="client_session_uuid 와 attempts 필요") + + # 멱등: 이 uuid 로 이미 적재됐나(원자 1-tx 라 존재=완료). 있으면 캐시 요약 반환(재실행 0). + existing = ( + await session.execute( + select(StudyQuizSession).where( + StudyQuizSession.client_session_uuid == body.client_session_uuid + ) + ) + ).scalars().all() + if existing: + return { + "status": "already_ingested", + "sessions": [ + { + "topic_id": s.study_topic_id, + "correct": s.correct_count, + "wrong": s.wrong_count, + "unsure": s.unsure_count, + } + for s in existing + ], + } + + # pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외. + pub_ids = list({a.question_pub_id for a in body.attempts}) + pub_rows = ( + await session.execute( + select(Published.pub_id, Published.source_id).where( + Published.kind == KIND_QUESTION, + Published.pub_id.in_(pub_ids), + Published.deleted.is_(False), + ) + ) + ).all() + src_by_pubid = {r.pub_id: r.source_id for r in pub_rows} + + # 질문 fetch(미삭제). principal = owner(단일). + source_ids = list(set(src_by_pubid.values())) + q_rows = ( + await session.execute( + select(StudyQuestion).where( + StudyQuestion.id.in_(source_ids), StudyQuestion.deleted_at.is_(None) + ) + ) + ).scalars().all() + q_by_id = {q.id: q for q in q_rows} + owners = {q.user_id for q in q_by_id.values()} + if len(owners) > 1: + raise HTTPException(status_code=400, detail="여러 사용자 소유 질문 혼재 — 단일 principal 위반") + if not owners: + raise HTTPException(status_code=404, detail="해소 가능한 질문 없음") + user_id = owners.pop() + + now = datetime.now(timezone.utc) + + # topic 별 그룹(해소 실패 attempt 는 graceful skip). 같은 (uuid, topic) 1 세션. + by_topic: dict[int, list[tuple[IngestAttempt, StudyQuestion]]] = defaultdict(list) + skipped: list[str] = [] + for a in body.attempts: + src = src_by_pubid.get(a.question_pub_id) + q = q_by_id.get(src) if src is not None else None + if q is None: + skipped.append(a.question_pub_id) + continue + by_topic[q.study_topic_id].append((a, q)) + if not by_topic: + raise HTTPException(status_code=404, detail="해소된 attempt 없음") + + summaries = [] + for topic_id, items in by_topic.items(): + qids = [q.id for (_, q) in items] + qs = StudyQuizSession( + user_id=user_id, + study_topic_id=topic_id, + question_ids=qids, + subject_distribution={}, + status="done", + cursor=len(qids), + source="viewer", + client_session_uuid=body.client_session_uuid, + finished_at=now, + created_at=now, + updated_at=now, + ) + session.add(qs) + await session.flush() # qs.id + + c = w = u = 0 + for a, q in items: + try: + sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice) + except ValueError: + skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip + continue + if outcome == "correct": + c += 1 + elif outcome == "wrong": + w += 1 + elif outcome == "unsure": + u += 1 + session.add( + StudyQuestionAttempt( + user_id=user_id, + study_question_id=q.id, + study_topic_id=topic_id, + selected_choice=sel, + correct_choice=q.correct_choice, + is_correct=is_corr, + outcome=outcome, + quiz_session_id=qs.id, + answered_at=_parse_answered_at(a.answered_at, now), + ) + ) + qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u + await session.flush() + + # finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커. + summary = await finalize_session( + session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id + ) + qs.finalized_at = now + summaries.append( + { + "topic_id": topic_id, + "quiz_session_id": qs.id, + "correct": summary.correct, + "wrong": summary.wrong, + "unsure": summary.unsure, + "newly_correct": summary.newly_correct, + "relapsed": summary.relapsed, + "recovered": summary.recovered, + } + ) + + await session.commit() + logger.info( + "study_ingest uuid=%s user=%s sessions=%s skipped=%s", + body.client_session_uuid, user_id, len(summaries), len(skipped), + ) + return {"status": "ingested", "skipped": skipped, "sessions": summaries} diff --git a/app/api/study_questions.py b/app/api/study_questions.py index 9de5a44..f4d3094 100644 --- a/app/api/study_questions.py +++ b/app/api/study_questions.py @@ -41,6 +41,7 @@ from services.study.explanation_rag import ( ) from services.study.publish_enqueue import enqueue_publish, enqueue_question_publish from services.study.publish_projection import KIND_EXPLANATION, KIND_QUESTION +from services.study.outcome import derive_outcome logger = logging.getLogger(__name__) router = APIRouter() @@ -1005,19 +1006,13 @@ async def submit_attempt( q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) - if body.is_unsure: - selected = None - is_correct = False - outcome = "unsure" - elif body.selected_choice is None: - raise HTTPException( - status_code=422, - detail="selected_choice (1~4) 또는 is_unsure=true 가 필요합니다", + # 채점 단일 소스 — 뷰어 ingest 와 동일 함수(P2). 선택 없고 unsure 아니면 422. + try: + selected, is_correct, outcome = derive_outcome( + body.selected_choice, body.is_unsure, q.correct_choice ) - else: - selected = body.selected_choice - is_correct = selected == q.correct_choice - outcome = "correct" if is_correct else "wrong" + except ValueError as e: + raise HTTPException(status_code=422, detail=str(e)) # PR-10: 세션 연동. 기본은 None. quiz_session: StudyQuizSession | None = None diff --git a/app/core/config.py b/app/core/config.py index 8065c36..42909c4 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -187,6 +187,8 @@ class Settings(BaseModel): study_card_extract_enabled: bool = True # 발행 레이어(docsrv-viewer-publish): publish_outbox 워커 게이트. 저자/4-A enqueue 결선(P0-1b) 후 true. study_publish_enabled: bool = False + # 뷰어 write-back ingest(study-to-viewer P2) 게이트. /ingest/study/attempts 활성. 기본 false=inert(503). + study_ingest_enabled: bool = False # internal endpoint Bearer token (Mac mini derived-worker 호출용) internal_worker_token: str = "" @@ -202,6 +204,7 @@ def load_settings() -> Settings: study_explanation_enabled = os.getenv("STUDY_EXPLANATION_ENABLED", "true").lower() in ("1", "true", "yes") study_card_extract_enabled = os.getenv("STUDY_CARD_EXTRACT_ENABLED", "true").lower() in ("1", "true", "yes") study_publish_enabled = os.getenv("STUDY_PUBLISH_ENABLED", "false").lower() in ("1", "true", "yes") + study_ingest_enabled = os.getenv("STUDY_INGEST_ENABLED", "false").lower() in ("1", "true", "yes") internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "") viewer_sync_token = os.getenv("VIEWER_SYNC_TOKEN", "") jwt_secret = os.getenv("JWT_SECRET", "") @@ -337,6 +340,7 @@ def load_settings() -> Settings: study_explanation_enabled=study_explanation_enabled, study_card_extract_enabled=study_card_extract_enabled, study_publish_enabled=study_publish_enabled, + study_ingest_enabled=study_ingest_enabled, internal_worker_token=internal_worker_token, viewer_sync_token=viewer_sync_token, pipeline_held_stages=pipeline_held_stages, diff --git a/app/main.py b/app/main.py index fae44b1..61a55b6 100644 --- a/app/main.py +++ b/app/main.py @@ -10,6 +10,7 @@ from api.audio import router as audio_router from api.internal_study import router as internal_study_router from api.internal_worker import router as internal_worker_router from api.published import router as published_router +from api.ingest_study import router as ingest_study_router from api.auth import router as auth_router from api.briefing import router as briefing_router from api.config import router as config_router @@ -238,6 +239,7 @@ app.include_router(audio_router, prefix="/api/audio", tags=["audio"]) app.include_router(internal_study_router, prefix="/internal/study", tags=["internal-study"]) app.include_router(internal_worker_router, prefix="/internal/worker", tags=["internal-worker"]) app.include_router(published_router, prefix="/published", tags=["published"]) +app.include_router(ingest_study_router, prefix="/ingest/study", tags=["ingest-study"]) app.include_router(video_router, prefix="/api/video", tags=["video"]) app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"]) app.include_router(study_topics_router, prefix="/api/study-topics", tags=["study-topics"]) diff --git a/app/models/study_quiz_session.py b/app/models/study_quiz_session.py index fd504ff..3519b5f 100644 --- a/app/models/study_quiz_session.py +++ b/app/models/study_quiz_session.py @@ -50,6 +50,10 @@ class StudyQuizSession(Base): chronic_remaining_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + # study-to-viewer P2: 뷰어 ingest 멱등/출처. 라이브 세션=finalized_at·client_session_uuid NULL, source='live'. + finalized_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) # 멱등 마커(mig 373) + client_session_uuid: Mapped[str | None] = mapped_column(String(64)) # 뷰어 세션 UUID(mig 374, uq mig376) + source: Mapped[str] = mapped_column(String(20), nullable=False, default="live") # live|viewer(mig 375) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now, nullable=False ) diff --git a/app/services/study/outcome.py b/app/services/study/outcome.py new file mode 100644 index 0000000..2e75954 --- /dev/null +++ b/app/services/study/outcome.py @@ -0,0 +1,25 @@ +"""채점(outcome) 산출 단일 소스 (study-to-viewer P2). + +라이브 attempt 엔드포인트(submit_attempt)와 뷰어 ingest 가 **동일 함수**로 채점 → +정오 어휘가 한 곳(서버)에서 결정(plan r2: ingest 는 raw 신호 selected+unsure 만 싣고 +DS 가 산출 = '무수정 재생'을 실제로 성립시키는 형태). correct_choice 는 항상 현재 DB 값. + +규칙(라이브 study_questions.py:1008-1020 동일): + is_unsure=True → (None, False, 'unsure') # unsure 가 정오 override, selected 폐기 + selected None → ValueError # 선택 없고 unsure 도 아니면 무효(엔드포인트가 처리) + 그 외 → selected==correct → (selected, is_correct, 'correct'|'wrong') +""" + +from __future__ import annotations + + +def derive_outcome( + selected_choice: int | None, is_unsure: bool, correct_choice: int +) -> tuple[int | None, bool, str]: + """(selected, is_correct, outcome) 반환. skipped 는 여기서 안 나옴(선택 없으면 호출측이 거부/skip).""" + if is_unsure: + return None, False, "unsure" + if selected_choice is None: + raise ValueError("selected_choice (1~4) 또는 is_unsure=true 가 필요합니다") + is_correct = selected_choice == correct_choice + return selected_choice, is_correct, ("correct" if is_correct else "wrong") diff --git a/docker-compose.yml b/docker-compose.yml index a9869f6..46a7303 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -213,6 +213,8 @@ services: # docsrv-viewer-publish: 발행 워커/저작 enqueue 게이트(기본 false=inert) + 뷰어↔DS feed Bearer. - STUDY_PUBLISH_ENABLED=${STUDY_PUBLISH_ENABLED:-false} - VIEWER_SYNC_TOKEN=${VIEWER_SYNC_TOKEN:-} + # study-to-viewer P2: 뷰어 write-back ingest 게이트(기본 false=inert, 검증 후 점등). + - STUDY_INGEST_ENABLED=${STUDY_INGEST_ENABLED:-false} # Voice Memo PoC v1 — bot 계정 한정 long-expiry access token. default false → 일반 운영 영향 0. # 활성화: host .env 에 VOICE_MEMO_BOT_TOKEN_ENABLED=true. plan: rosy-launching-otter.md - VOICE_MEMO_BOT_TOKEN_ENABLED=${VOICE_MEMO_BOT_TOKEN_ENABLED:-false} diff --git a/migrations/373_quiz_session_finalized_at.sql b/migrations/373_quiz_session_finalized_at.sql new file mode 100644 index 0000000..f53a30b --- /dev/null +++ b/migrations/373_quiz_session_finalized_at.sql @@ -0,0 +1,4 @@ +-- 373_quiz_session_finalized_at.sql +-- 발행 ingest(study-to-viewer P2) finalize 멱등 마커. finalize 성공 후 스탬프 → +-- 같은 세션 재전송(at-least-once outbox) 시 SR 이중 advance 차단. 라이브 세션은 NULL 유지(무영향). +ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS finalized_at TIMESTAMPTZ; diff --git a/migrations/374_quiz_session_client_uuid.sql b/migrations/374_quiz_session_client_uuid.sql new file mode 100644 index 0000000..ccc9ba6 --- /dev/null +++ b/migrations/374_quiz_session_client_uuid.sql @@ -0,0 +1,3 @@ +-- 374_quiz_session_client_uuid.sql +-- 뷰어 로컬 세션 UUID. ingest 가 (uuid, topic) 로 DS 세션 find-or-create = 멱등 키. 라이브=NULL. +ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS client_session_uuid TEXT; diff --git a/migrations/375_quiz_session_source.sql b/migrations/375_quiz_session_source.sql new file mode 100644 index 0000000..e6a0f3c --- /dev/null +++ b/migrations/375_quiz_session_source.sql @@ -0,0 +1,3 @@ +-- 375_quiz_session_source.sql +-- 세션 출처 구분(live | viewer). 감사/필터용. 기존 행=live. +ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS source VARCHAR(20) NOT NULL DEFAULT 'live'; diff --git a/migrations/376_quiz_session_client_uuid_uq.sql b/migrations/376_quiz_session_client_uuid_uq.sql new file mode 100644 index 0000000..5883215 --- /dev/null +++ b/migrations/376_quiz_session_client_uuid_uq.sql @@ -0,0 +1,3 @@ +-- 376_quiz_session_client_uuid_uq.sql +-- (client_session_uuid, study_topic_id) 유일 — 뷰어 1세션이 topic 별 1 DS세션. partial(uuid 있는 viewer 행만). +CREATE UNIQUE INDEX IF NOT EXISTS study_quiz_sessions_client_uuid_topic_uq ON study_quiz_sessions (client_session_uuid, study_topic_id) WHERE client_session_uuid IS NOT NULL;