feat(publish): S-1 pub_topics 발행 — projection+저작훅+백필 (study→viewer) #49

Merged
hyungi merged 1 commits from feat/study-port-s1-pubtopics into main 2026-06-25 14:39:29 +09:00
4 changed files with 124 additions and 0 deletions
+12
View File
@@ -47,6 +47,8 @@ from models.eid_study_weakness import EidStudyWeakness
from models.eid_review_set_draft import EidReviewSetDraft
from models.user import User
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.study.publish_enqueue import enqueue_publish, enqueue_topic_publish
from services.study.publish_projection import KIND_TOPIC
from services.study.subject_note_rag import (
SubjectNoteContext,
gather_subject_note_context,
@@ -467,6 +469,9 @@ async def create_study_topic(
session.add(topic)
try:
await session.flush()
# 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 주제 발행. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
await session.commit()
except IntegrityError:
await session.rollback()
@@ -696,6 +701,10 @@ async def update_study_topic(
topic.focused_at = datetime.now(timezone.utc) if body.focused else None
topic.updated_at = datetime.now(timezone.utc)
# 발행 재투영(같은 tx) — 주제 메타 갱신 반영. payload(name·exam_round_size) 무변경(focused 등)
# 은 워커 (payload_hash, deleted) 디둡이 rev 안 올리고 흡수 = churn 없음. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
try:
await session.commit()
except IntegrityError:
@@ -771,6 +780,9 @@ async def delete_study_topic(
)
topic.deleted_at = datetime.now(timezone.utc)
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). S-1.
if settings.study_publish_enabled:
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=None, deleted=True)
await session.commit()
+27
View File
@@ -19,13 +19,16 @@ from sqlalchemy.ext.asyncio import AsyncSession
from models.published import PublishOutbox
from models.study_question import StudyQuestion
from models.study_topic import StudyTopic
from services.study.publish_projection import (
KIND_EXPLANATION,
KIND_QUESTION,
KIND_TOPIC,
SCHEMA_VERSION,
payload_hash,
project_explanation,
project_question,
project_topic,
)
@@ -75,3 +78,27 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0
for q in rows:
await enqueue_question_publish(session, q)
return len(rows)
async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
"""주제 메타를 outbox 적재(S-1). caller commit. 저작 create/update 결선 + 백필 공용."""
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic))
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필).
반환 = enqueue 주제 (0 이면 ). 셋은 마지막 id 페이지 반복. caller commit.
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
"""
rows = (
await session.execute(
select(StudyTopic)
.where(StudyTopic.deleted_at.is_(None), StudyTopic.id > after_id)
.order_by(StudyTopic.id.asc())
.limit(limit)
)
).scalars().all()
for t in rows:
await enqueue_topic_publish(session, t)
return len(rows)
+16
View File
@@ -19,6 +19,7 @@ SCHEMA_VERSION = 1
KIND_QUESTION = "study_question"
KIND_EXPLANATION = "study_explanation"
KIND_TOPIC = "study_topic"
def payload_hash(payload: dict[str, Any]) -> str:
@@ -57,3 +58,18 @@ def project_explanation(q: Any) -> dict[str, Any] | None:
"model": getattr(q, "ai_explanation_model", None),
"generated_at": gen.isoformat() if gen else None,
}
def project_topic(t: Any) -> dict[str, Any]:
"""study_topic → 발행 payload (S-1, plan study-viewer-port).
topic 메타만 신규 발행 viewer 주제 단위 퀴즈를 만들 최소 정보.
회차 목록은 발행 = viewer pub_content(study_question) exam_name/exam_round
파생(추가 발행 불요, plan S-1 결정). topic_id project_question topic_id(=study_topic_id)
동일 DS 식별자라 viewer 문항주제 상관에 사용(pub_id opaque 상관 아님).
"""
return {
"topic_id": t.id,
"name": t.name,
"exam_round_size": t.exam_round_size,
}
+69
View File
@@ -0,0 +1,69 @@
"""S-1 초기 백필 — 기존 active study_topics 를 발행 outbox 에 1회 적재.
publish_outbox 에만 적재한다(멱등: 발행 워커의 (payload_hash, deleted) 디둡이
중복 enqueue no-op 으로 흡수). study_publish_enabled=True 발행 워커가
1 주기로 drain published rev 부여 viewer pull-sync.
주제 수는 개인 학습툴이라 소량 bounded page 사실상 1페이지지만 PAGE 도달
overflow 가드로 페이징 누락을 경보(silent truncation 금지).
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py
# dry-run(적재 없이 카운트만):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py --dry-run
"""
import argparse
import asyncio
import os
import sys
# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import func, select
from core.config import settings
from core.database import async_session
from models.study_topic import StudyTopic
from services.study.publish_enqueue import backfill_publish_topics
# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보.
PAGE = 5000
async def run(dry_run: bool) -> None:
async with async_session() as session:
active = (
await session.execute(
select(func.count())
.select_from(StudyTopic)
.where(StudyTopic.deleted_at.is_(None))
)
).scalar() or 0
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
f"(False 면 적재는 되나 워커가 drain 안 함)")
print(f"[info] active 주제 {active}")
if dry_run:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_topics(session, after_id=0, limit=PAGE)
await session.commit()
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.")
def main() -> None:
parser = argparse.ArgumentParser(description="S-1 pub_topics 초기 백필")
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
asyncio.run(run(args.dry_run))
if __name__ == "__main__":
main()