feat(publish): S-1 pub_topics 발행 — projection+저작훅+백필 (study→viewer) #49
@@ -47,6 +47,8 @@ from models.eid_study_weakness import EidStudyWeakness
|
||||
from models.eid_review_set_draft import EidReviewSetDraft
|
||||
from models.user import User
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
from services.study.publish_enqueue import enqueue_publish, enqueue_topic_publish
|
||||
from services.study.publish_projection import KIND_TOPIC
|
||||
from services.study.subject_note_rag import (
|
||||
SubjectNoteContext,
|
||||
gather_subject_note_context,
|
||||
@@ -467,6 +469,9 @@ async def create_study_topic(
|
||||
session.add(topic)
|
||||
try:
|
||||
await session.flush()
|
||||
# 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 주제 발행. S-1.
|
||||
if settings.study_publish_enabled:
|
||||
await enqueue_topic_publish(session, topic)
|
||||
await session.commit()
|
||||
except IntegrityError:
|
||||
await session.rollback()
|
||||
@@ -696,6 +701,10 @@ async def update_study_topic(
|
||||
topic.focused_at = datetime.now(timezone.utc) if body.focused else None
|
||||
|
||||
topic.updated_at = datetime.now(timezone.utc)
|
||||
# 발행 재투영(같은 tx) — 주제 메타 갱신 반영. payload(name·exam_round_size) 무변경(focused 등)
|
||||
# 은 워커 (payload_hash, deleted) 디둡이 rev 안 올리고 흡수 = churn 없음. S-1.
|
||||
if settings.study_publish_enabled:
|
||||
await enqueue_topic_publish(session, topic)
|
||||
try:
|
||||
await session.commit()
|
||||
except IntegrityError:
|
||||
@@ -771,6 +780,9 @@ async def delete_study_topic(
|
||||
)
|
||||
|
||||
topic.deleted_at = datetime.now(timezone.utc)
|
||||
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). S-1.
|
||||
if settings.study_publish_enabled:
|
||||
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=None, deleted=True)
|
||||
await session.commit()
|
||||
|
||||
|
||||
|
||||
@@ -19,13 +19,16 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from models.published import PublishOutbox
|
||||
from models.study_question import StudyQuestion
|
||||
from models.study_topic import StudyTopic
|
||||
from services.study.publish_projection import (
|
||||
KIND_EXPLANATION,
|
||||
KIND_QUESTION,
|
||||
KIND_TOPIC,
|
||||
SCHEMA_VERSION,
|
||||
payload_hash,
|
||||
project_explanation,
|
||||
project_question,
|
||||
project_topic,
|
||||
)
|
||||
|
||||
|
||||
@@ -75,3 +78,27 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0
|
||||
for q in rows:
|
||||
await enqueue_question_publish(session, q)
|
||||
return len(rows)
|
||||
|
||||
|
||||
async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
|
||||
"""주제 메타를 outbox 적재(S-1). caller commit. 저작 create/update 결선 + 백필 공용."""
|
||||
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic))
|
||||
|
||||
|
||||
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
|
||||
"""active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필).
|
||||
|
||||
반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit.
|
||||
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
|
||||
"""
|
||||
rows = (
|
||||
await session.execute(
|
||||
select(StudyTopic)
|
||||
.where(StudyTopic.deleted_at.is_(None), StudyTopic.id > after_id)
|
||||
.order_by(StudyTopic.id.asc())
|
||||
.limit(limit)
|
||||
)
|
||||
).scalars().all()
|
||||
for t in rows:
|
||||
await enqueue_topic_publish(session, t)
|
||||
return len(rows)
|
||||
|
||||
@@ -19,6 +19,7 @@ SCHEMA_VERSION = 1
|
||||
|
||||
KIND_QUESTION = "study_question"
|
||||
KIND_EXPLANATION = "study_explanation"
|
||||
KIND_TOPIC = "study_topic"
|
||||
|
||||
|
||||
def payload_hash(payload: dict[str, Any]) -> str:
|
||||
@@ -57,3 +58,18 @@ def project_explanation(q: Any) -> dict[str, Any] | None:
|
||||
"model": getattr(q, "ai_explanation_model", None),
|
||||
"generated_at": gen.isoformat() if gen else None,
|
||||
}
|
||||
|
||||
|
||||
def project_topic(t: Any) -> dict[str, Any]:
|
||||
"""study_topic → 발행 payload (S-1, plan study-viewer-port).
|
||||
|
||||
topic 메타만 신규 발행 — viewer 가 주제 단위 퀴즈를 만들 최소 정보.
|
||||
회차 목록은 발행 안 함 = viewer 가 pub_content(study_question) 의 exam_name/exam_round 로
|
||||
파생(추가 발행 불요, plan S-1 결정). topic_id 는 project_question 의 topic_id(=study_topic_id)
|
||||
와 동일 DS 식별자라 viewer 가 문항→주제 상관에 사용(pub_id 는 opaque 라 상관 키 아님).
|
||||
"""
|
||||
return {
|
||||
"topic_id": t.id,
|
||||
"name": t.name,
|
||||
"exam_round_size": t.exam_round_size,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
"""S-1 초기 백필 — 기존 active study_topics 를 발행 outbox 에 1회 적재.
|
||||
|
||||
publish_outbox 에만 적재한다(멱등: 발행 워커의 (payload_hash, deleted) 디둡이
|
||||
중복 enqueue 를 no-op 으로 흡수). study_publish_enabled=True 일 때 발행 워커가
|
||||
1분 주기로 drain → published 에 rev 부여 → viewer pull-sync.
|
||||
|
||||
주제 수는 개인 학습툴이라 소량 — bounded page 사실상 1페이지지만 PAGE 도달 시
|
||||
overflow 가드로 페이징 누락을 경보(silent truncation 금지).
|
||||
|
||||
실행 (GPU 서버):
|
||||
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py
|
||||
# dry-run(적재 없이 카운트만):
|
||||
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py --dry-run
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가.
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from sqlalchemy import func, select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from models.study_topic import StudyTopic
|
||||
from services.study.publish_enqueue import backfill_publish_topics
|
||||
|
||||
# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보.
|
||||
PAGE = 5000
|
||||
|
||||
|
||||
async def run(dry_run: bool) -> None:
|
||||
async with async_session() as session:
|
||||
active = (
|
||||
await session.execute(
|
||||
select(func.count())
|
||||
.select_from(StudyTopic)
|
||||
.where(StudyTopic.deleted_at.is_(None))
|
||||
)
|
||||
).scalar() or 0
|
||||
|
||||
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
|
||||
f"(False 면 적재는 되나 워커가 drain 안 함)")
|
||||
print(f"[info] active 주제 {active}건")
|
||||
if dry_run:
|
||||
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
n = await backfill_publish_topics(session, after_id=0, limit=PAGE)
|
||||
await session.commit()
|
||||
|
||||
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
if n >= PAGE:
|
||||
print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="S-1 pub_topics 초기 백필")
|
||||
parser.add_argument("--dry-run", action="store_true", default=False)
|
||||
args = parser.parse_args()
|
||||
asyncio.run(run(args.dry_run))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user