diff --git a/app/api/study_cards.py b/app/api/study_cards.py index 37f9aa1..a2c5898 100644 --- a/app/api/study_cards.py +++ b/app/api/study_cards.py @@ -21,12 +21,14 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user +from core.config import settings from core.database import get_session from models.study_memo_card import StudyMemoCard, StudyMemoCardEvidence, record_card_view from models.study_memo_card_progress import StudyMemoCardProgress, rate_card from models.study_question import StudyQuestion from models.user import User from services.study.card_normalize import compute_dedup_hash +from services.study.publish_enqueue import enqueue_card_progress_publish, enqueue_card_publish router = APIRouter() @@ -248,9 +250,18 @@ async def approve_batch( StudyMemoCard.needs_review, ) .values(needs_review=False, flagged_by=None, flagged_at=None) + .returning(StudyMemoCard.id) ) + approved_ids = list(result.scalars().all()) + # 방금 검수완료된 카드 발행(같은 tx, flag off 면 no-op). S-2. + if settings.study_publish_enabled and approved_ids: + cards = ( + await session.execute(select(StudyMemoCard).where(StudyMemoCard.id.in_(approved_ids))) + ).scalars().all() + for c in cards: + await enqueue_card_publish(session, c) await session.commit() - return {"approved": result.rowcount or 0} + return {"approved": len(approved_ids)} # ─── 복습(SR) 트랙 ─── @@ -310,6 +321,9 @@ async def rate( if outcome is None: raise HTTPException(status_code=422, detail=f"invalid outcome: {body.outcome!r}") progress = await rate_card(session, card=card, outcome=outcome, now=datetime.now(timezone.utc)) + # 카드 SR 상태 발행(같은 tx, flag off=no-op) — ALL row(sentinel/terminal 포함). S-4. + if settings.study_publish_enabled: + await enqueue_card_progress_publish(session, progress) await session.commit() return RateResult( card_id=card.id, outcome=outcome, review_stage=progress.review_stage, due_at=progress.due_at @@ -392,6 +406,9 @@ async def update_card( card.flagged_by = None card.flagged_at = None + # 발행 재투영/tombstone(같은 tx) — 검수완료=발행·검수대기복귀=tombstone(상태 기반). S-2. + if settings.study_publish_enabled: + await enqueue_card_publish(session, card) try: await session.commit() except IntegrityError: @@ -414,4 +431,7 @@ async def delete_card( card = await session.get(StudyMemoCard, card_id) card = _verify_card(card, user) card.deleted_at = datetime.now(timezone.utc) + # 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트. S-2. + if settings.study_publish_enabled: + await enqueue_card_publish(session, card) await session.commit() diff --git a/app/api/study_questions.py b/app/api/study_questions.py index f4d3094..93651dc 100644 --- a/app/api/study_questions.py +++ b/app/api/study_questions.py @@ -40,7 +40,7 @@ from services.study.explanation_rag import ( render_evidence_block, ) from services.study.publish_enqueue import enqueue_publish, enqueue_question_publish -from services.study.publish_projection import KIND_EXPLANATION, KIND_QUESTION +from services.study.publish_projection import KIND_CARD, KIND_EXPLANATION, KIND_QUESTION from services.study.outcome import derive_outcome logger = logging.getLogger(__name__) @@ -911,7 +911,11 @@ async def update_question( # 카드는 '구' ai_explanation 에서 추출됐으므로 정정 후 stale 가능 — 즉시 가시화 플래그. # 최종 stale 정리는 card_extract 워커의 supersede 가 책임(새 버전 추출 시 구버전 retire). if AI_STALE_TRIGGER & fields_set: - await flag_cards_for_source(session, source_question_id=q.id, reason="source_changed") + flagged_card_ids = await flag_cards_for_source(session, source_question_id=q.id, reason="source_changed") + # 발행 자격 잃은(검수대기 복귀) 파생 카드 tombstone(같은 tx). S-2. + if settings.study_publish_enabled: + for cid in flagged_card_ids: + await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True) q.updated_at = datetime.now(timezone.utc) # 발행 재투영(같은 tx) — 문항 갱신 반영. 해설은 ready 일 때만 동봉, stale→tombstone 은 P1-3. P0-1b. @@ -979,7 +983,11 @@ async def soft_delete_question( ) # 공부 암기노트: 소스 문제 삭제 시 파생 암기카드를 검토 대기로 마킹(source_deleted). # study_questions 는 soft-delete 만이라 카드 FK CASCADE 는 미발동 — 이 훅이 실 경로. - await flag_cards_for_source(session, source_question_id=q.id, reason="source_deleted") + flagged_card_ids = await flag_cards_for_source(session, source_question_id=q.id, reason="source_deleted") + # 발행 자격 잃은 파생 카드 tombstone(같은 tx). S-2. + if settings.study_publish_enabled: + for cid in flagged_card_ids: + await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True) # 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). 해설 본문 있으면 그 kind 도. P0-1b. if settings.study_publish_enabled: await enqueue_publish(session, kind=KIND_QUESTION, source_id=q.id, payload=None, deleted=True) diff --git a/app/models/study_memo_card.py b/app/models/study_memo_card.py index 15a19b6..b8407a0 100644 --- a/app/models/study_memo_card.py +++ b/app/models/study_memo_card.py @@ -25,6 +25,7 @@ from sqlalchemy import ( String, Text, func, + select, text, update, ) @@ -99,13 +100,25 @@ async def supersede_old_cards( *, source_question_id: int, keep_generated_at: datetime | None, -) -> int: +) -> list[int]: """같은 문제의 '다른 버전' 카드를 deleted_at 마킹(retire). 새 source_generated_at 카드 적재 '전에' 호출 — 살아있는 구버전 카드가 dedup PARTIAL UNIQUE 로 새 추출을 막는 것을 방지(정정-후 stale 잔류 0). 같은 버전은 보존. - Returns: retire 된 행 수. + Returns: retire 되며 '발행 중이던'(needs_review=False) 카드 id 목록 — 발행 tombstone + 대상(호출측이 enqueue). 검수 안 됐던(미발행) retire 카드는 tombstone 불요라 제외. """ + # 발행 중이던 retire 대상 선캡처(update 전) — 미발행 카드 스푸리어스 tombstone 회피. + published_retired = ( + await session.execute( + select(StudyMemoCard.id).where( + StudyMemoCard.source_question_id == source_question_id, + StudyMemoCard.deleted_at.is_(None), + StudyMemoCard.source_generated_at.is_distinct_from(keep_generated_at), + StudyMemoCard.needs_review.is_(False), + ) + ) + ).scalars().all() stmt = ( update(StudyMemoCard) .where( @@ -115,8 +128,8 @@ async def supersede_old_cards( ) .values(deleted_at=func.now()) ) - result = await session.execute(stmt) - return result.rowcount or 0 + await session.execute(stmt) + return list(published_retired) async def append_card( @@ -216,13 +229,24 @@ async def flag_cards_for_source( *, source_question_id: int, reason: str, -) -> int: +) -> list[int]: """소스 문제 정정/삭제 시 파생 카드를 needs_review=auto 마킹(임시 플래그). 최종 stale 정리는 워커 supersede 가 책임 — 이건 사용자 가시화용 즉시 플래그. reason: 'source_changed' | 'source_deleted'. - Returns: 마킹된 행 수. + Returns: 플래그로 '발행 자격을 잃은'(직전 needs_review=False) 카드 id 목록 — 발행 + tombstone 대상(호출측 enqueue). 이미 검수대기였던(미발행) 카드는 제외. """ + # 발행 중이던 카드 선캡처(update 전) — 플래그로 needs_review=True 가 되면 발행 자격 상실. + published_ids = ( + await session.execute( + select(StudyMemoCard.id).where( + StudyMemoCard.source_question_id == source_question_id, + StudyMemoCard.deleted_at.is_(None), + StudyMemoCard.needs_review.is_(False), + ) + ) + ).scalars().all() stmt = ( update(StudyMemoCard) .where( @@ -231,5 +255,5 @@ async def flag_cards_for_source( ) .values(needs_review=True, flagged_by=reason, flagged_at=func.now()) ) - result = await session.execute(stmt) - return result.rowcount or 0 + await session.execute(stmt) + return list(published_ids) diff --git a/app/services/study/publish_enqueue.py b/app/services/study/publish_enqueue.py index 0857b9a..40698fe 100644 --- a/app/services/study/publish_enqueue.py +++ b/app/services/study/publish_enqueue.py @@ -18,14 +18,20 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from models.published import PublishOutbox +from models.study_memo_card import StudyMemoCard +from models.study_memo_card_progress import StudyMemoCardProgress from models.study_question import StudyQuestion from models.study_topic import StudyTopic from services.study.publish_projection import ( + KIND_CARD, + KIND_CARD_PROGRESS, KIND_EXPLANATION, KIND_QUESTION, KIND_TOPIC, SCHEMA_VERSION, payload_hash, + project_card, + project_card_progress, project_explanation, project_question, project_topic, @@ -102,3 +108,67 @@ async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, l for t in rows: await enqueue_topic_publish(session, t) return len(rows) + + +async def enqueue_card_publish(session: AsyncSession, card: Any) -> None: + """카드 상태 기반 발행/tombstone (S-2). caller commit. + + 검수완료(needs_review=False) & 미삭제 만 발행 — 그 외(검수대기 복귀·삭제·retire)는 + tombstone(feed 1급 삭제 이벤트). 발행 자격이 카드 상태에 매여 있어 호출측은 '카드를 + 건드렸다'만 알면 되고 publish/tombstone 분기는 여기 단일화(경로별 가드 기억 회피). + """ + if card.deleted_at is not None or card.needs_review: + await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=None, deleted=True) + else: + await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card)) + + +async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: + """검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필). + + 반환 = enqueue 한 카드 수(0 이면 끝). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit. + """ + rows = ( + await session.execute( + select(StudyMemoCard) + .where( + StudyMemoCard.deleted_at.is_(None), + StudyMemoCard.needs_review.is_(False), + StudyMemoCard.id > after_id, + ) + .order_by(StudyMemoCard.id.asc()) + .limit(limit) + ) + ).scalars().all() + for c in rows: + await enqueue_card_publish(session, c) + return len(rows) + + +async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None: + """카드 SR progress row 발행(S-4). caller commit. rate_card 결과(ALL row, sentinel/terminal 포함).""" + await enqueue_publish( + session, + kind=KIND_CARD_PROGRESS, + source_id=progress.id, + payload=project_card_progress(progress), + ) + + +async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: + """모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필). + + ★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락. + 반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit. + """ + rows = ( + await session.execute( + select(StudyMemoCardProgress) + .where(StudyMemoCardProgress.id > after_id) + .order_by(StudyMemoCardProgress.id.asc()) + .limit(limit) + ) + ).scalars().all() + for p in rows: + await enqueue_card_progress_publish(session, p) + return len(rows) diff --git a/app/services/study/publish_projection.py b/app/services/study/publish_projection.py index a6205ae..1404cdb 100644 --- a/app/services/study/publish_projection.py +++ b/app/services/study/publish_projection.py @@ -20,6 +20,8 @@ SCHEMA_VERSION = 1 KIND_QUESTION = "study_question" KIND_EXPLANATION = "study_explanation" KIND_TOPIC = "study_topic" +KIND_CARD = "study_card" # ★뷰어 pubstudy.ts 의 KIND_CARD 와 일치 필수(S-3 forward-contract). +KIND_CARD_PROGRESS = "study_card_progress" # 카드 SR 상태 read model (S-4, viewer C-4 소비). def payload_hash(payload: dict[str, Any]) -> str: @@ -60,6 +62,41 @@ def project_explanation(q: Any) -> dict[str, Any] | None: } +def project_card(c: Any) -> dict[str, Any]: + """study_memo_card → 발행 payload (S-2). 순수 변환 — 발행 자격(needs_review=false & + 미삭제) 판단은 호출측(enqueue_card_publish)이 카드 상태로. payload 계약 = 뷰어 + pubstudy.ts getCards 와 동형(format·cue·fact·cloze_text·source_question_id·source_generated_at). + """ + gen = getattr(c, "source_generated_at", None) + return { + "format": c.format, + "cue": c.cue, + "fact": c.fact, + "cloze_text": c.cloze_text, + "source_question_id": c.source_question_id, + "source_generated_at": gen.isoformat() if gen else None, + } + + +def project_card_progress(p: Any) -> dict[str, Any]: + """study_memo_card_progress → 발행 payload (S-4) = 카드 SR 상태 read model. + + ★ALL row 발행(due_at NULL sentinel=암-on-new · terminal=졸업 포함). due-only 발행하면 + sentinel 누락 → viewer 가 '미확인' 오분류. SR 계산은 DS(sr_schedule), 여긴 결과만. + card_id = pub_card 의 source_id(=DS card.id) → viewer C-4 가 pub_card LEFT JOIN 하는 키. + """ + due = getattr(p, "due_at", None) + rev = getattr(p, "last_reviewed_at", None) + return { + "card_id": p.card_id, + "topic_id": p.study_topic_id, + "last_outcome": p.last_outcome, + "last_reviewed_at": rev.isoformat() if rev else None, + "due_at": due.isoformat() if due else None, + "review_stage": p.review_stage, + } + + def project_topic(t: Any) -> dict[str, Any]: """study_topic → 발행 payload (S-1, plan study-viewer-port). diff --git a/app/workers/study_memo_card_worker.py b/app/workers/study_memo_card_worker.py index 502ff69..39c3a79 100644 --- a/app/workers/study_memo_card_worker.py +++ b/app/workers/study_memo_card_worker.py @@ -34,6 +34,8 @@ from models.study_memo_card_job import StudyMemoCardJob from models.study_question import StudyQuestion from models.user import User # noqa: F401 (mapper 초기화 defensive) from services.search.llm_gate import Priority, acquire_mlx_gate +from services.study.publish_enqueue import enqueue_publish +from services.study.publish_projection import KIND_CARD from services.study.explanation_rag import ( gather_explanation_context, render_evidence_block, @@ -184,9 +186,13 @@ async def run_card_extract_job(session: AsyncSession, job: StudyMemoCardJob) -> return # 5. 성공 — 구버전 카드 retire 후 append (dedup partial unique 충돌 회피). - await supersede_old_cards( + retired_published_ids = await supersede_old_cards( session, source_question_id=question.id, keep_generated_at=source_version ) + # 발행 중이던 구버전 카드 tombstone(같은 tx) — 재추출 retire 후 viewer stale 잔류 0. S-2. + if settings.study_publish_enabled: + for cid in retired_published_ids: + await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True) model_name = f"mlx:{primary_name}" inserted = 0 for g in guarded: diff --git a/scripts/backfill_publish_card_progress.py b/scripts/backfill_publish_card_progress.py new file mode 100644 index 0000000..9d098e6 --- /dev/null +++ b/scripts/backfill_publish_card_progress.py @@ -0,0 +1,61 @@ +"""S-4 초기 백필 — 모든 study_memo_card_progress row 를 발행 outbox 에 적재. + +★ALL row(필터 없음) — due_at NULL sentinel(암-on-new)·terminal(졸업) 포함. due-only 백필은 +sentinel 누락 → viewer 미확인 오분류. 멱등(워커 (payload_hash, deleted) 디둡). flag on 시 워커 drain. + +실행 (GPU 서버): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py --dry-run +""" + +import argparse +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import func, select + +from core.config import settings +from core.database import async_session +from models.study_memo_card_progress import StudyMemoCardProgress +from services.study.publish_enqueue import backfill_publish_card_progress + +# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보. +PAGE = 100000 + + +async def run(dry_run: bool) -> None: + async with async_session() as session: + total = ( + await session.execute( + select(func.count()).select_from(StudyMemoCardProgress) + ) + ).scalar() or 0 + + print(f"[info] study_publish_enabled={settings.study_publish_enabled} " + f"(False 면 적재는 되나 워커가 drain 안 함)") + print(f"[info] card progress row {total}건 (ALL row 발행)") + if dry_run: + print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") + return + + async with async_session() as session: + n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE) + await session.commit() + + print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") + if n >= PAGE: + print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="S-4 pub_card_progress 초기 백필") + parser.add_argument("--dry-run", action="store_true", default=False) + args = parser.parse_args() + asyncio.run(run(args.dry_run)) + + +if __name__ == "__main__": + main() diff --git a/scripts/backfill_publish_cards.py b/scripts/backfill_publish_cards.py new file mode 100644 index 0000000..fe9f308 --- /dev/null +++ b/scripts/backfill_publish_cards.py @@ -0,0 +1,66 @@ +"""S-2 초기 백필 — 검수완료(needs_review=False)·미삭제 study_memo_cards 를 발행 outbox 에 적재. + +publish_outbox 에만 적재(멱등: 워커 (payload_hash, deleted) 디둡). study_publish_enabled=True +일 때 발행 워커가 drain → published(kind=study_card) rev 부여 → viewer pull-sync. + +실행 (GPU 서버): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_cards.py + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_cards.py --dry-run +""" + +import argparse +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import func, select + +from core.config import settings +from core.database import async_session +from models.study_memo_card import StudyMemoCard +from services.study.publish_enqueue import backfill_publish_cards + +# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보. +PAGE = 100000 + + +async def run(dry_run: bool) -> None: + async with async_session() as session: + active = ( + await session.execute( + select(func.count()) + .select_from(StudyMemoCard) + .where( + StudyMemoCard.deleted_at.is_(None), + StudyMemoCard.needs_review.is_(False), + ) + ) + ).scalar() or 0 + + print(f"[info] study_publish_enabled={settings.study_publish_enabled} " + f"(False 면 적재는 되나 워커가 drain 안 함)") + print(f"[info] 검수완료·미삭제 카드 {active}건") + if dry_run: + print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") + return + + async with async_session() as session: + n = await backfill_publish_cards(session, after_id=0, limit=PAGE) + await session.commit() + + print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") + if n >= PAGE: + print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="S-2 pub_card 초기 백필") + parser.add_argument("--dry-run", action="store_true", default=False) + args = parser.parse_args() + asyncio.run(run(args.dry_run)) + + +if __name__ == "__main__": + main()