diff --git a/app/api/study_topics.py b/app/api/study_topics.py index 345a227..5708516 100644 --- a/app/api/study_topics.py +++ b/app/api/study_topics.py @@ -47,6 +47,8 @@ from models.eid_study_weakness import EidStudyWeakness from models.eid_review_set_draft import EidReviewSetDraft from models.user import User from services.search.llm_gate import Priority, acquire_mlx_gate +from services.study.publish_enqueue import enqueue_publish, enqueue_topic_publish +from services.study.publish_projection import KIND_TOPIC from services.study.subject_note_rag import ( SubjectNoteContext, gather_subject_note_context, @@ -467,6 +469,9 @@ async def create_study_topic( session.add(topic) try: await session.flush() + # 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 주제 발행. S-1. + if settings.study_publish_enabled: + await enqueue_topic_publish(session, topic) await session.commit() except IntegrityError: await session.rollback() @@ -696,6 +701,10 @@ async def update_study_topic( topic.focused_at = datetime.now(timezone.utc) if body.focused else None topic.updated_at = datetime.now(timezone.utc) + # 발행 재투영(같은 tx) — 주제 메타 갱신 반영. payload(name·exam_round_size) 무변경(focused 등) + # 은 워커 (payload_hash, deleted) 디둡이 rev 안 올리고 흡수 = churn 없음. S-1. + if settings.study_publish_enabled: + await enqueue_topic_publish(session, topic) try: await session.commit() except IntegrityError: @@ -771,6 +780,9 @@ async def delete_study_topic( ) topic.deleted_at = datetime.now(timezone.utc) + # 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). S-1. + if settings.study_publish_enabled: + await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=None, deleted=True) await session.commit() diff --git a/app/services/study/publish_enqueue.py b/app/services/study/publish_enqueue.py index 063bd06..0857b9a 100644 --- a/app/services/study/publish_enqueue.py +++ b/app/services/study/publish_enqueue.py @@ -19,13 +19,16 @@ from sqlalchemy.ext.asyncio import AsyncSession from models.published import PublishOutbox from models.study_question import StudyQuestion +from models.study_topic import StudyTopic from services.study.publish_projection import ( KIND_EXPLANATION, KIND_QUESTION, + KIND_TOPIC, SCHEMA_VERSION, payload_hash, project_explanation, project_question, + project_topic, ) @@ -75,3 +78,27 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0 for q in rows: await enqueue_question_publish(session, q) return len(rows) + + +async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None: + """주제 메타를 outbox 적재(S-1). caller commit. 저작 create/update 결선 + 백필 공용.""" + await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic)) + + +async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: + """active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필). + + 반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + 멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해). + """ + rows = ( + await session.execute( + select(StudyTopic) + .where(StudyTopic.deleted_at.is_(None), StudyTopic.id > after_id) + .order_by(StudyTopic.id.asc()) + .limit(limit) + ) + ).scalars().all() + for t in rows: + await enqueue_topic_publish(session, t) + return len(rows) diff --git a/app/services/study/publish_projection.py b/app/services/study/publish_projection.py index 9b7fbce..a6205ae 100644 --- a/app/services/study/publish_projection.py +++ b/app/services/study/publish_projection.py @@ -19,6 +19,7 @@ SCHEMA_VERSION = 1 KIND_QUESTION = "study_question" KIND_EXPLANATION = "study_explanation" +KIND_TOPIC = "study_topic" def payload_hash(payload: dict[str, Any]) -> str: @@ -57,3 +58,18 @@ def project_explanation(q: Any) -> dict[str, Any] | None: "model": getattr(q, "ai_explanation_model", None), "generated_at": gen.isoformat() if gen else None, } + + +def project_topic(t: Any) -> dict[str, Any]: + """study_topic → 발행 payload (S-1, plan study-viewer-port). + + topic 메타만 신규 발행 — viewer 가 주제 단위 퀴즈를 만들 최소 정보. + 회차 목록은 발행 안 함 = viewer 가 pub_content(study_question) 의 exam_name/exam_round 로 + 파생(추가 발행 불요, plan S-1 결정). topic_id 는 project_question 의 topic_id(=study_topic_id) + 와 동일 DS 식별자라 viewer 가 문항→주제 상관에 사용(pub_id 는 opaque 라 상관 키 아님). + """ + return { + "topic_id": t.id, + "name": t.name, + "exam_round_size": t.exam_round_size, + } diff --git a/scripts/backfill_publish_topics.py b/scripts/backfill_publish_topics.py new file mode 100644 index 0000000..171d088 --- /dev/null +++ b/scripts/backfill_publish_topics.py @@ -0,0 +1,69 @@ +"""S-1 초기 백필 — 기존 active study_topics 를 발행 outbox 에 1회 적재. + +publish_outbox 에만 적재한다(멱등: 발행 워커의 (payload_hash, deleted) 디둡이 +중복 enqueue 를 no-op 으로 흡수). study_publish_enabled=True 일 때 발행 워커가 +1분 주기로 drain → published 에 rev 부여 → viewer pull-sync. + +주제 수는 개인 학습툴이라 소량 — bounded page 사실상 1페이지지만 PAGE 도달 시 +overflow 가드로 페이징 누락을 경보(silent truncation 금지). + +실행 (GPU 서버): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py + # dry-run(적재 없이 카운트만): + docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py --dry-run +""" + +import argparse +import asyncio +import os +import sys + +# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import func, select + +from core.config import settings +from core.database import async_session +from models.study_topic import StudyTopic +from services.study.publish_enqueue import backfill_publish_topics + +# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보. +PAGE = 5000 + + +async def run(dry_run: bool) -> None: + async with async_session() as session: + active = ( + await session.execute( + select(func.count()) + .select_from(StudyTopic) + .where(StudyTopic.deleted_at.is_(None)) + ) + ).scalar() or 0 + + print(f"[info] study_publish_enabled={settings.study_publish_enabled} " + f"(False 면 적재는 되나 워커가 drain 안 함)") + print(f"[info] active 주제 {active}건") + if dry_run: + print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") + return + + async with async_session() as session: + n = await backfill_publish_topics(session, after_id=0, limit=PAGE) + await session.commit() + + print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") + if n >= PAGE: + print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="S-1 pub_topics 초기 백필") + parser.add_argument("--dry-run", action="store_true", default=False) + args = parser.parse_args() + asyncio.run(run(args.dry_run)) + + +if __name__ == "__main__": + main()