feat(publish): S-4 pub_card_progress 발행 — 카드 SR 상태 read model (study→viewer)

DS 가 가진 카드 SR progress row 를 발행(kind=study_card_progress) = read model.
viewer C-4 복습큐/미확인 set-difference 재료. plan study-viewer-port S-4.
- projection: KIND_CARD_PROGRESS + project_card_progress(card_id·topic_id·last_outcome·
  last_reviewed_at·due_at·review_stage). ★ALL row(due_at NULL sentinel=암-on-new·terminal
  포함) — due-only 발행 금지(sentinel 누락→viewer 미확인 오분류).
- enqueue: enqueue_card_progress_publish + backfill_publish_card_progress(필터 없음).
- 훅: /study-cards/{id}/rate 의 rate_card 직후(같은 tx·flag 게이트). 단일 write 사이트.
  SR 계산=DS(sr_schedule 무변경), 발행=결과만.
- 카드 삭제 시 progress tombstone 안 함 = DS SR 보존(재승인 복원), orphan 은 viewer C-4 가 로컬 드롭.
- scripts/backfill_publish_card_progress.py.

py_compile PASS · project_card_progress 단위검증(sentinel due_at=None 보존).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-25 16:00:10 +09:00
parent af5640ef49
commit 08c5213168
4 changed files with 117 additions and 1 deletions
+4 -1
View File
@@ -28,7 +28,7 @@ from models.study_memo_card_progress import StudyMemoCardProgress, rate_card
from models.study_question import StudyQuestion
from models.user import User
from services.study.card_normalize import compute_dedup_hash
from services.study.publish_enqueue import enqueue_card_publish
from services.study.publish_enqueue import enqueue_card_progress_publish, enqueue_card_publish
router = APIRouter()
@@ -321,6 +321,9 @@ async def rate(
if outcome is None:
raise HTTPException(status_code=422, detail=f"invalid outcome: {body.outcome!r}")
progress = await rate_card(session, card=card, outcome=outcome, now=datetime.now(timezone.utc))
# 카드 SR 상태 발행(같은 tx, flag off=no-op) — ALL row(sentinel/terminal 포함). S-4.
if settings.study_publish_enabled:
await enqueue_card_progress_publish(session, progress)
await session.commit()
return RateResult(
card_id=card.id, outcome=outcome, review_stage=progress.review_stage, due_at=progress.due_at
+32
View File
@@ -19,16 +19,19 @@ from sqlalchemy.ext.asyncio import AsyncSession
from models.published import PublishOutbox
from models.study_memo_card import StudyMemoCard
from models.study_memo_card_progress import StudyMemoCardProgress
from models.study_question import StudyQuestion
from models.study_topic import StudyTopic
from services.study.publish_projection import (
KIND_CARD,
KIND_CARD_PROGRESS,
KIND_EXPLANATION,
KIND_QUESTION,
KIND_TOPIC,
SCHEMA_VERSION,
payload_hash,
project_card,
project_card_progress,
project_explanation,
project_question,
project_topic,
@@ -140,3 +143,32 @@ async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, li
for c in rows:
await enqueue_card_publish(session, c)
return len(rows)
async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None:
"""카드 SR progress row 발행(S-4). caller commit. rate_card 결과(ALL row, sentinel/terminal 포함)."""
await enqueue_publish(
session,
kind=KIND_CARD_PROGRESS,
source_id=progress.id,
payload=project_card_progress(progress),
)
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필).
★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락.
반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit.
"""
rows = (
await session.execute(
select(StudyMemoCardProgress)
.where(StudyMemoCardProgress.id > after_id)
.order_by(StudyMemoCardProgress.id.asc())
.limit(limit)
)
).scalars().all()
for p in rows:
await enqueue_card_progress_publish(session, p)
return len(rows)
+20
View File
@@ -21,6 +21,7 @@ KIND_QUESTION = "study_question"
KIND_EXPLANATION = "study_explanation"
KIND_TOPIC = "study_topic"
KIND_CARD = "study_card" # ★뷰어 pubstudy.ts 의 KIND_CARD 와 일치 필수(S-3 forward-contract).
KIND_CARD_PROGRESS = "study_card_progress" # 카드 SR 상태 read model (S-4, viewer C-4 소비).
def payload_hash(payload: dict[str, Any]) -> str:
@@ -77,6 +78,25 @@ def project_card(c: Any) -> dict[str, Any]:
}
def project_card_progress(p: Any) -> dict[str, Any]:
"""study_memo_card_progress → 발행 payload (S-4) = 카드 SR 상태 read model.
★ALL row 발행(due_at NULL sentinel=암-on-new · terminal=졸업 포함). due-only 발행하면
sentinel 누락 → viewer 가 '미확인' 오분류. SR 계산은 DS(sr_schedule), 여긴 결과만.
card_id = pub_card 의 source_id(=DS card.id) → viewer C-4 가 pub_card LEFT JOIN 하는 키.
"""
due = getattr(p, "due_at", None)
rev = getattr(p, "last_reviewed_at", None)
return {
"card_id": p.card_id,
"topic_id": p.study_topic_id,
"last_outcome": p.last_outcome,
"last_reviewed_at": rev.isoformat() if rev else None,
"due_at": due.isoformat() if due else None,
"review_stage": p.review_stage,
}
def project_topic(t: Any) -> dict[str, Any]:
"""study_topic → 발행 payload (S-1, plan study-viewer-port).
+61
View File
@@ -0,0 +1,61 @@
"""S-4 초기 백필 — 모든 study_memo_card_progress row 를 발행 outbox 에 적재.
★ALL row(필터 없음) — due_at NULL sentinel(암-on-new)·terminal(졸업) 포함. due-only 백필은
sentinel 누락 → viewer 미확인 오분류. 멱등(워커 (payload_hash, deleted) 디둡). flag on 시 워커 drain.
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py --dry-run
"""
import argparse
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import func, select
from core.config import settings
from core.database import async_session
from models.study_memo_card_progress import StudyMemoCardProgress
from services.study.publish_enqueue import backfill_publish_card_progress
# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보.
PAGE = 100000
async def run(dry_run: bool) -> None:
async with async_session() as session:
total = (
await session.execute(
select(func.count()).select_from(StudyMemoCardProgress)
)
).scalar() or 0
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
f"(False 면 적재는 되나 워커가 drain 안 함)")
print(f"[info] card progress row {total}건 (ALL row 발행)")
if dry_run:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE)
await session.commit()
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.")
def main() -> None:
parser = argparse.ArgumentParser(description="S-4 pub_card_progress 초기 백필")
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
asyncio.run(run(args.dry_run))
if __name__ == "__main__":
main()