feat(publish): P0-1 발행 레이어 스키마+projection+워커 (study→viewer)

docsrv-viewer-publish 발행 인프라 — 뷰어가 read API로 당길 published projection
+ transactional outbox + 단일 라이터 발행 워커. study_publish_enabled=false 기본
(저자/4-A enqueue 결선 P0-1b 전까지 inert). read-only 경로·additive·소프트락 무관.

- migrations 365~370: published(kind·pub_id opaque+stable·rev·payload_hash·deleted·schema_version)
  + UNIQUE(kind,pub_id)/(kind,source_id) + rev idx + publish_outbox + 미처리 부분 idx
- models/published.py: Published·PublishOutbox (관계 없음=mapper 안전)
- services/study/publish_projection.py: project_question/explanation + payload_hash(정렬 sha256)
- services/study/publish_enqueue.py: enqueue_publish/question + backfill(bounded page)
- workers/study_publish_worker.py: outbox drain → pg_advisory_xact_lock 단일라이터 rev 부여
  + (payload_hash,deleted) 디둡 + 배치내 중복 flush
- config: study_publish_enabled(기본 false) · main: publish_outbox_consumer 1m max_instances=1

plan: plans/2026-06-23-study-to-viewer-slice1-plan.html (P0-1, 3R 적대리뷰 통과)
검증: py_compile·payload_hash 단위·마이그 1문/파일·매퍼 standalone. 전체 매퍼/마이그 apply=배포 게이트.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-23 20:38:19 +09:00
parent f66b6e2f17
commit 642c1b7c36
12 changed files with 372 additions and 0 deletions
+4
View File
@@ -185,6 +185,8 @@ class Settings(BaseModel):
study_explanation_enabled: bool = True
# 공부 암기노트 Phase 1: card_extract 폴러/consumer 게이트. owner 분리 시 false 로.
study_card_extract_enabled: bool = True
# 발행 레이어(docsrv-viewer-publish): publish_outbox 워커 게이트. 저자/4-A enqueue 결선(P0-1b) 후 true.
study_publish_enabled: bool = False
# internal endpoint Bearer token (Mac mini derived-worker 호출용)
internal_worker_token: str = ""
@@ -196,6 +198,7 @@ def load_settings() -> Settings:
database_url = os.getenv("DATABASE_URL", "")
study_explanation_enabled = os.getenv("STUDY_EXPLANATION_ENABLED", "true").lower() in ("1", "true", "yes")
study_card_extract_enabled = os.getenv("STUDY_CARD_EXTRACT_ENABLED", "true").lower() in ("1", "true", "yes")
study_publish_enabled = os.getenv("STUDY_PUBLISH_ENABLED", "false").lower() in ("1", "true", "yes")
internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "")
jwt_secret = os.getenv("JWT_SECRET", "")
totp_secret = os.getenv("TOTP_SECRET", "")
@@ -329,6 +332,7 @@ def load_settings() -> Settings:
upload=upload_cfg,
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
study_publish_enabled=study_publish_enabled,
internal_worker_token=internal_worker_token,
pipeline_held_stages=pipeline_held_stages,
mlx_gate_concurrency=mlx_gate_concurrency,
+4
View File
@@ -70,6 +70,7 @@ async def lifespan(app: FastAPI):
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
from workers.study_card_enqueue import run as study_card_enqueue_run
from workers.study_publish_worker import consume_publish_outbox
from workers.study_reminder import run as study_reminder_run
from workers.study_weakness import run as study_weakness_run
from workers.study_question_embed_worker import (
@@ -140,6 +141,9 @@ async def lifespan(app: FastAPI):
# 별 테이블/별 consumer 로 기존 study queue 와 격리. settings.study_card_extract_enabled 게이트.
scheduler.add_job(consume_study_memo_card_queue, "interval", minutes=1, id="study_memo_card_consumer")
scheduler.add_job(study_card_enqueue_run, "interval", minutes=1, id="study_card_enqueue")
# 발행 레이어(docsrv-viewer-publish): publish_outbox drain → published rev 부여.
# study_publish_enabled=false(기본) 면 worker 내부 no-op. 단일 라이터(pg_advisory_xact_lock) max_instances=1.
scheduler.add_job(consume_publish_outbox, "interval", minutes=1, id="publish_outbox_consumer", max_instances=1)
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
+60
View File
@@ -0,0 +1,60 @@
"""발행 레이어 ORM (docsrv-viewer-publish) — published projection + publish_outbox.
관계(relationship) 없음 = 독립 테이블, configure_mappers 무영향. 마이그 365~370.
published = 뷰어가 read API(P0-2) 당기는 render-ready projection(kind-discriminated).
publish_outbox = 저작/4-A 트랜잭션이 같은 tx에서 INSERT, 발행 워커가 drain 하며 rev 부여.
불변식(plan study-to-viewer-slice1):
pub_id opaque+stable = dedup키 = progress키 / rev = 워커 커밋순 gapless(pg_advisory_lock 단일 라이터)
/ (payload_hash, deleted) 디둡 / 삭제 = tombstone(deleted=true) / schema_version = 엔벨로프 버전.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, SmallInteger, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class Published(Base):
__tablename__ = "published"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
pub_id: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[dict] = mapped_column(JSONB, nullable=False)
payload_hash: Mapped[str] = mapped_column(Text, nullable=False)
schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1)
rev: Mapped[int] = mapped_column(BigInteger, nullable=False)
deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
# UNIQUE(kind, pub_id)=mig366, UNIQUE(kind, source_id)=mig367, idx(rev)=mig368.
class PublishOutbox(Base):
__tablename__ = "publish_outbox"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
payload: Mapped[dict] = mapped_column(JSONB, nullable=False)
payload_hash: Mapped[str] = mapped_column(Text, nullable=False)
schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1)
deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig370.
+77
View File
@@ -0,0 +1,77 @@
"""발행 outbox enqueue + 초기 백필 (docsrv-viewer-publish).
enqueue_publish: 저작/4-A 트랜잭션이 같은 session(=같은 Postgres tx)에서 호출 caller commit
(P0-1 규율: 콘텐츠 변경과 outbox INSERT 원자성, dual-write 회피). payload/hash 스냅샷.
enqueue_question_publish: 문항 + (ready면)해설을 함께 적재. 저작 쓰기/4-A 완료/백필 공용.
backfill_publish_questions: 기존 active 문항을 bounded 1 outbox 적재(초기 백필, P2-1 bounded page).
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
주의: 저작 엔드포인트(study_questions create/update)·4-A 워커에서의 enqueue 결선은 P0-1b
(기존 hot 파일 수정이라 increment). 모듈은 호출 라이브러리 + 수동/백필 진입점.
"""
from __future__ import annotations
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.published import PublishOutbox
from models.study_question import StudyQuestion
from services.study.publish_projection import (
KIND_EXPLANATION,
KIND_QUESTION,
SCHEMA_VERSION,
payload_hash,
project_explanation,
project_question,
)
async def enqueue_publish(
session: AsyncSession,
*,
kind: str,
source_id: int,
payload: dict[str, Any] | None,
deleted: bool = False,
) -> None:
"""outbox 1행 INSERT. caller 가 commit (저자 tx 동봉). deleted=True 면 tombstone(payload={})."""
body: dict[str, Any] = payload if payload is not None else {}
session.add(
PublishOutbox(
kind=kind,
source_id=source_id,
payload=body,
payload_hash=payload_hash(body),
schema_version=SCHEMA_VERSION,
deleted=deleted,
)
)
async def enqueue_question_publish(session: AsyncSession, q: Any) -> None:
"""문항 + (ready면)해설을 outbox 적재. caller commit."""
await enqueue_publish(session, kind=KIND_QUESTION, source_id=q.id, payload=project_question(q))
expl = project_explanation(q)
if expl is not None:
await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl)
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재.
반환 = enqueue 문항 (0 이면 ). 셋은 마지막 id 페이지 반복. caller commit.
"""
rows = (
await session.execute(
select(StudyQuestion)
.where(StudyQuestion.deleted_at.is_(None), StudyQuestion.id > after_id)
.order_by(StudyQuestion.id.asc())
.limit(limit)
)
).scalars().all()
for q in rows:
await enqueue_question_publish(session, q)
return len(rows)
+59
View File
@@ -0,0 +1,59 @@
"""발행 projection — 소스 행을 render-ready payload + 안정 해시로 변환 (순수 함수).
뷰어가 보는 '단일 진실' payload 까지 (DS 내부 실험 스키마는 계약 격리).
kind projector. payload_hash = 정렬된 JSON sha256 = (payload_hash, deleted) 디둡 .
주의(plan study-to-viewer-slice1 r2): 과목/시험메타를 per-question payload 인라인
bulk subject rename N행 churn. 정규화(과목= kind subject ref) churn 최적화 후속(P0-1b),
읽기 정합엔 무영향. 지금은 인라인(상관관계 단순)으로 두고 후속 PR 에서 분리.
SCHEMA_VERSION = 엔벨로프 버전. payload 모양 진화 bump + 뷰어 range 수용(P0-2).
"""
from __future__ import annotations
import hashlib
import json
from typing import Any
SCHEMA_VERSION = 1
KIND_QUESTION = "study_question"
KIND_EXPLANATION = "study_explanation"
def payload_hash(payload: dict[str, Any]) -> str:
"""정렬 JSON 의 sha256 — (payload_hash, deleted) 디둡 키. 키 순서/공백 비의존."""
canonical = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def project_question(q: Any) -> dict[str, Any]:
"""study_question → 발행 payload. 정답 포함(개인 학습툴, plan Q2). 이미지는 ref 만(P0-4, 후속)."""
return {
"topic_id": q.study_topic_id,
"question_text": q.question_text,
"choices": [q.choice_1, q.choice_2, q.choice_3, q.choice_4],
"correct_choice": q.correct_choice,
"subject": q.subject,
"scope": q.scope,
"exam_name": q.exam_name,
"exam_round": q.exam_round,
"exam_question_number": q.exam_question_number,
"explanation": q.explanation, # 수동 해설(있으면). AI 해설은 별 kind.
}
def project_explanation(q: Any) -> dict[str, Any] | None:
"""study_question 의 AI 해설 → 별 발행 kind. ready 일 때만(없으면 None=발행 안 함).
재조우 표시용 선발행. 신규 오답은 4-A 워커가 ~90s ready재발행(P2-3 결선, P0-1b).
"""
if getattr(q, "ai_explanation_status", None) != "ready" or not getattr(q, "ai_explanation", None):
return None
gen = getattr(q, "ai_explanation_generated_at", None)
return {
"question_source_id": q.id,
"explanation_md": q.ai_explanation,
"model": getattr(q, "ai_explanation_model", None),
"generated_at": gen.isoformat() if gen else None,
}
+120
View File
@@ -0,0 +1,120 @@
"""발행 워커 — publish_outbox drain → published 에 rev 부여 (docsrv-viewer-publish).
APScheduler 1(max_instances=1). pg_advisory_xact_lock 단일 라이터 rev 커밋순 gapless
(인플라이트 차단: bigserial seq 폴링이 아니라 outbox id + 단일 라이터 rev 부여).
outbox id(커밋순) 순으로 처리, (kind, source_id) published upsert:
- 기존 행과 (payload_hash, deleted) 동일 no-op(디둡, rev 올림) + processed 마킹
- pub_id 재사용(기존)|신규 uuid, rev = MAX(rev)+1, payload/hash/deleted 갱신
tombstone(deleted=True) 디둡 복합키라 삼켜짐. 배치 단일 트랜잭션.
배치 같은 (kind, source_id) 오면 flush 직전 반영을 다음 select 보게 (최신 ).
study_publish_enabled=False(기본) no-op 저자/4-A enqueue 결선(P0-1b) 전까지 inert.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.published import Published, PublishOutbox
logger = setup_logger("study_publish_worker")
BATCH_SIZE = 500
# pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌).
ADVISORY_LOCK_KEY = 838201
async def consume_publish_outbox() -> None:
"""APScheduler 진입점. 미처리 outbox 를 rev 부여하며 published 로 반영."""
if not settings.study_publish_enabled:
logger.debug("study_publish 비활성 (study_publish_enabled=false)")
return
async with async_session() as session:
try:
# 1) 전역 단일 라이터 락(트랜잭션 스코프 — commit/rollback 시 자동 해제).
await session.execute(
text("SELECT pg_advisory_xact_lock(:k)").bindparams(k=ADVISORY_LOCK_KEY)
)
# 2) 현재 최대 rev.
max_rev = int(
(await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0
)
# 3) 미처리 outbox 를 커밋순(id)으로.
rows = (
await session.execute(
select(PublishOutbox)
.where(PublishOutbox.processed_at.is_(None))
.order_by(PublishOutbox.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
if not rows:
return
now = datetime.now(timezone.utc)
published_count = 0
for ob in rows:
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
if (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
):
ob.processed_at = now
continue
max_rev += 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=max_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = max_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
published_count += 1
await session.commit()
logger.info(
"publish_outbox_drained scanned=%s published=%s max_rev=%s",
len(rows),
published_count,
max_rev,
)
except Exception as e:
await session.rollback()
logger.exception("publish_outbox_drain_failed: %s", e)