642c1b7c36
docsrv-viewer-publish 발행 인프라 — 뷰어가 read API로 당길 published projection + transactional outbox + 단일 라이터 발행 워커. study_publish_enabled=false 기본 (저자/4-A enqueue 결선 P0-1b 전까지 inert). read-only 경로·additive·소프트락 무관. - migrations 365~370: published(kind·pub_id opaque+stable·rev·payload_hash·deleted·schema_version) + UNIQUE(kind,pub_id)/(kind,source_id) + rev idx + publish_outbox + 미처리 부분 idx - models/published.py: Published·PublishOutbox (관계 없음=mapper 안전) - services/study/publish_projection.py: project_question/explanation + payload_hash(정렬 sha256) - services/study/publish_enqueue.py: enqueue_publish/question + backfill(bounded page) - workers/study_publish_worker.py: outbox drain → pg_advisory_xact_lock 단일라이터 rev 부여 + (payload_hash,deleted) 디둡 + 배치내 중복 flush - config: study_publish_enabled(기본 false) · main: publish_outbox_consumer 1m max_instances=1 plan: plans/2026-06-23-study-to-viewer-slice1-plan.html (P0-1, 3R 적대리뷰 통과) 검증: py_compile·payload_hash 단위·마이그 1문/파일·매퍼 standalone. 전체 매퍼/마이그 apply=배포 게이트. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
121 lines
4.9 KiB
Python
121 lines
4.9 KiB
Python
"""발행 워커 — publish_outbox drain → published 에 rev 부여 (docsrv-viewer-publish).
|
|
|
|
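Runs under APScheduler every minute (max_instances=1). A pg_advisory_xact_lock makes this
worker the single writer, so rev is gapless in commit order (in-flight gaps are prevented:
instead of polling a bigserial sequence, rows are taken in outbox id order and the single
writer assigns rev).

The outbox is processed in id (commit) order, upserting published per (kind, source_id):
- existing row has the same (payload_hash, deleted) -> no-op (deduplicated, rev not bumped)
  and the outbox row is marked processed
- otherwise -> pub_id is reused (existing row) or a new uuid is generated, rev = MAX(rev)+1,
  and payload/hash/deleted are updated

Tombstones (deleted=True) are not swallowed because the dedup key is composite.
Each batch is a single transaction.
If the same (kind, source_id) appears twice within a batch, a flush lets the next select
see the previous write (latest wins).

When study_publish_enabled=False (the default) the worker is a no-op - it stays inert until
the author/4-A enqueue wiring (P0-1b) lands.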
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import func, select, text
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.published import Published, PublishOutbox
|
|
|
|
# Module-level logger obtained from the project's setup_logger helper.
logger = setup_logger("study_publish_worker")

# Upper bound on the number of unprocessed outbox rows fetched per run
# (applied as the LIMIT of the outbox query in consume_publish_outbox).
BATCH_SIZE = 500
# Key passed to pg_advisory_xact_lock as the global single-writer lock for this
# worker. It is an arbitrary constant reserved for the publish worker; per the
# original author it does not collide with other advisory locks, which cannot
# be verified from this file alone.
ADVISORY_LOCK_KEY = 838201
|
|
|
|
|
|
async def consume_publish_outbox() -> None:
    """Drain unprocessed ``publish_outbox`` rows into ``published``, assigning revs.

    Scheduler entry point (APScheduler, per the module docstring). Does nothing
    when ``settings.study_publish_enabled`` is falsy.

    One run is one transaction:

    1. Take the transaction-scoped advisory lock ``ADVISORY_LOCK_KEY``.
    2. Read the current maximum ``Published.rev`` (0 when the table is empty).
    3. Fetch up to ``BATCH_SIZE`` outbox rows with ``processed_at IS NULL``,
       ordered by ascending ``id``.
    4. For each row, look up the ``Published`` row with the same
       ``(kind, source_id)``:

       * same ``payload_hash`` and ``deleted`` -> only mark the outbox row
         processed (no rev bump);
       * otherwise -> bump the rev counter and insert a new ``Published`` row
         (fresh ``pub_id``) or overwrite the existing one, mark the outbox row
         processed, and flush.
    5. Commit and log a summary.

    Returns:
        None.

    Raises:
        Nothing from the ``try`` body: any ``Exception`` raised there is rolled
        back and logged, not propagated.
    """
    if not settings.study_publish_enabled:
        logger.debug("study_publish 비활성 (study_publish_enabled=false)")
        return

    async with async_session() as session:
        try:
            # Step 1: global single-writer lock. It is transaction-scoped, so
            # commit/rollback releases it automatically.
            lock_stmt = text("SELECT pg_advisory_xact_lock(:k)").bindparams(k=ADVISORY_LOCK_KEY)
            await session.execute(lock_stmt)

            # Step 2: current highest rev; the counter continues from here.
            max_rev_stmt = select(func.coalesce(func.max(Published.rev), 0))
            max_rev_result = await session.execute(max_rev_stmt)
            rev_cursor = int(max_rev_result.scalar() or 0)

            # Step 3: unprocessed outbox rows in ascending id order.
            pending_stmt = (
                select(PublishOutbox)
                .where(PublishOutbox.processed_at.is_(None))
                .order_by(PublishOutbox.id.asc())
                .limit(BATCH_SIZE)
            )
            pending_result = await session.execute(pending_stmt)
            pending = pending_result.scalars().all()
            if not pending:
                return

            # A single timestamp is shared by every row touched in this batch.
            stamp = datetime.now(timezone.utc)
            applied = 0
            for entry in pending:
                lookup_stmt = select(Published).where(
                    Published.kind == entry.kind,
                    Published.source_id == entry.source_id,
                )
                current = (await session.execute(lookup_stmt)).scalar_one_or_none()

                # Dedup on (payload_hash, deleted): an identical re-projection
                # must not bump rev. `deleted` is part of the comparison, so a
                # tombstone with an unchanged hash still goes through.
                unchanged = (
                    current is not None
                    and current.payload_hash == entry.payload_hash
                    and current.deleted == entry.deleted
                )
                if unchanged:
                    entry.processed_at = stamp
                    continue

                rev_cursor += 1
                # Columns written identically on both the insert and update paths.
                projected = {
                    "payload": entry.payload,
                    "payload_hash": entry.payload_hash,
                    "schema_version": entry.schema_version,
                    "deleted": entry.deleted,
                    "rev": rev_cursor,
                    "updated_at": stamp,
                }
                if current is not None:
                    # Existing row: pub_id and created_at are left untouched.
                    for column, value in projected.items():
                        setattr(current, column, value)
                else:
                    # First publication of this (kind, source_id): new pub_id.
                    session.add(
                        Published(
                            kind=entry.kind,
                            source_id=entry.source_id,
                            pub_id=uuid.uuid4().hex,
                            created_at=stamp,
                            **projected,
                        )
                    )

                entry.processed_at = stamp
                # Flush so a later outbox row for the same (kind, source_id) in
                # this batch sees the write just made (latest wins).
                await session.flush()
                applied += 1

            await session.commit()
            logger.info(
                "publish_outbox_drained scanned=%s published=%s max_rev=%s",
                len(pending),
                applied,
                rev_cursor,
            )
        except Exception as e:
            # Scheduler boundary: undo the partial batch and log. The exception
            # is not re-raised.
            await session.rollback()
            logger.exception("publish_outbox_drain_failed: %s", e)
|