From 642c1b7c362e07e7fe29d1e372c4f6a3ed449597 Mon Sep 17 00:00:00 2001 From: hyungi Date: Tue, 23 Jun 2026 20:38:19 +0900 Subject: [PATCH] =?UTF-8?q?feat(publish):=20P0-1=20=EB=B0=9C=ED=96=89=20?= =?UTF-8?q?=EB=A0=88=EC=9D=B4=EC=96=B4=20=EC=8A=A4=ED=82=A4=EB=A7=88+proje?= =?UTF-8?q?ction+=EC=9B=8C=EC=BB=A4=20(study=E2=86=92viewer)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit docsrv-viewer-publish 발행 인프라 — 뷰어가 read API로 당길 published projection + transactional outbox + 단일 라이터 발행 워커. study_publish_enabled=false 기본 (저자/4-A enqueue 결선 P0-1b 전까지 inert). read-only 경로·additive·소프트락 무관. - migrations 365~370: published(kind·pub_id opaque+stable·rev·payload_hash·deleted·schema_version) + UNIQUE(kind,pub_id)/(kind,source_id) + rev idx + publish_outbox + 미처리 부분 idx - models/published.py: Published·PublishOutbox (관계 없음=mapper 안전) - services/study/publish_projection.py: project_question/explanation + payload_hash(정렬 sha256) - services/study/publish_enqueue.py: enqueue_publish/question + backfill(bounded page) - workers/study_publish_worker.py: outbox drain → pg_advisory_xact_lock 단일라이터 rev 부여 + (payload_hash,deleted) 디둡 + 배치내 중복 flush - config: study_publish_enabled(기본 false) · main: publish_outbox_consumer 1m max_instances=1 plan: plans/2026-06-23-study-to-viewer-slice1-plan.html (P0-1, 3R 적대리뷰 통과) 검증: py_compile·payload_hash 단위·마이그 1문/파일·매퍼 standalone. 전체 매퍼/마이그 apply=배포 게이트. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/core/config.py | 4 + app/main.py | 4 + app/models/published.py | 60 +++++++++ app/services/study/publish_enqueue.py | 77 +++++++++++ app/services/study/publish_projection.py | 59 +++++++++ app/workers/study_publish_worker.py | 120 ++++++++++++++++++ migrations/365_published.sql | 21 +++ migrations/366_published_kind_pubid_uq.sql | 3 + migrations/367_published_kind_source_uq.sql | 3 + migrations/368_published_rev_idx.sql | 3 + migrations/369_publish_outbox.sql | 15 +++ .../370_publish_outbox_unprocessed_idx.sql | 3 + 12 files changed, 372 insertions(+) create mode 100644 app/models/published.py create mode 100644 app/services/study/publish_enqueue.py create mode 100644 app/services/study/publish_projection.py create mode 100644 app/workers/study_publish_worker.py create mode 100644 migrations/365_published.sql create mode 100644 migrations/366_published_kind_pubid_uq.sql create mode 100644 migrations/367_published_kind_source_uq.sql create mode 100644 migrations/368_published_rev_idx.sql create mode 100644 migrations/369_publish_outbox.sql create mode 100644 migrations/370_publish_outbox_unprocessed_idx.sql diff --git a/app/core/config.py b/app/core/config.py index c72dc4a..cf97b32 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -185,6 +185,8 @@ class Settings(BaseModel): study_explanation_enabled: bool = True # 공부 암기노트 Phase 1: card_extract 폴러/consumer 게이트. owner 분리 시 false 로. study_card_extract_enabled: bool = True + # 발행 레이어(docsrv-viewer-publish): publish_outbox 워커 게이트. 저자/4-A enqueue 결선(P0-1b) 후 true. + study_publish_enabled: bool = False # internal endpoint Bearer token (Mac mini derived-worker 호출용) internal_worker_token: str = "" @@ -196,6 +198,7 @@ def load_settings() -> Settings: database_url = os.getenv("DATABASE_URL", "") study_explanation_enabled = os.getenv("STUDY_EXPLANATION_ENABLED", "true").lower() in ("1", "true", "yes") study_card_extract_enabled = os.getenv("STUDY_CARD_EXTRACT_ENABLED", "true").lower() in ("1", "true", "yes") + study_publish_enabled = os.getenv("STUDY_PUBLISH_ENABLED", "false").lower() in ("1", "true", "yes") internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "") jwt_secret = os.getenv("JWT_SECRET", "") totp_secret = os.getenv("TOTP_SECRET", "") @@ -329,6 +332,7 @@ def load_settings() -> Settings: upload=upload_cfg, study_explanation_enabled=study_explanation_enabled, study_card_extract_enabled=study_card_extract_enabled, + study_publish_enabled=study_publish_enabled, internal_worker_token=internal_worker_token, pipeline_held_stages=pipeline_held_stages, mlx_gate_concurrency=mlx_gate_concurrency, diff --git a/app/main.py b/app/main.py index 27197e9..cf0f2c7 100644 --- a/app/main.py +++ b/app/main.py @@ -70,6 +70,7 @@ async def lifespan(app: FastAPI): from workers.study_session_queue_consumer import consume_study_session_queue from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue from workers.study_card_enqueue import run as study_card_enqueue_run + from workers.study_publish_worker import consume_publish_outbox from workers.study_reminder import run as study_reminder_run from workers.study_weakness import run as study_weakness_run from workers.study_question_embed_worker import ( @@ -140,6 +141,9 @@ async def lifespan(app: FastAPI): # 별 테이블/별 consumer 로 기존 study queue 와 격리. settings.study_card_extract_enabled 게이트. scheduler.add_job(consume_study_memo_card_queue, "interval", minutes=1, id="study_memo_card_consumer") scheduler.add_job(study_card_enqueue_run, "interval", minutes=1, id="study_card_enqueue") + # 발행 레이어(docsrv-viewer-publish): publish_outbox drain → published rev 부여. + # study_publish_enabled=false(기본) 면 worker 내부 no-op. 단일 라이터(pg_advisory_xact_lock) max_instances=1. + scheduler.add_job(consume_publish_outbox, "interval", minutes=1, id="publish_outbox_consumer", max_instances=1) # PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue. # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") diff --git a/app/models/published.py b/app/models/published.py new file mode 100644 index 0000000..a6f3f7b --- /dev/null +++ b/app/models/published.py @@ -0,0 +1,60 @@ +"""발행 레이어 ORM (docsrv-viewer-publish) — published projection + publish_outbox. + +관계(relationship) 없음 = 독립 테이블, configure_mappers 무영향. 마이그 365~370. + published = 뷰어가 read API(P0-2)로 당기는 render-ready projection(kind-discriminated). + publish_outbox = 저작/4-A 트랜잭션이 같은 tx에서 INSERT, 발행 워커가 drain 하며 rev 부여. + +불변식(plan study-to-viewer-slice1): + pub_id opaque+stable = dedup키 = progress키 / rev = 워커 커밋순 gapless(pg_advisory_lock 단일 라이터) + / (payload_hash, deleted) 디둡 / 삭제 = tombstone(deleted=true) / schema_version = 엔벨로프 버전. +""" + +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import BigInteger, Boolean, DateTime, SmallInteger, String, Text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class Published(Base): + __tablename__ = "published" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + kind: Mapped[str] = mapped_column(String(40), nullable=False) + source_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + pub_id: Mapped[str] = mapped_column(Text, nullable=False) + payload: Mapped[dict] = mapped_column(JSONB, nullable=False) + payload_hash: Mapped[str] = mapped_column(Text, nullable=False) + schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1) + rev: Mapped[int] = mapped_column(BigInteger, nullable=False) + deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + + # UNIQUE(kind, pub_id)=mig366, UNIQUE(kind, source_id)=mig367, idx(rev)=mig368. + + +class PublishOutbox(Base): + __tablename__ = "publish_outbox" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + kind: Mapped[str] = mapped_column(String(40), nullable=False) + source_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + payload: Mapped[dict] = mapped_column(JSONB, nullable=False) + payload_hash: Mapped[str] = mapped_column(Text, nullable=False) + schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1) + deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + + # 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig370. diff --git a/app/services/study/publish_enqueue.py b/app/services/study/publish_enqueue.py new file mode 100644 index 0000000..063bd06 --- /dev/null +++ b/app/services/study/publish_enqueue.py @@ -0,0 +1,77 @@ +"""발행 outbox enqueue + 초기 백필 (docsrv-viewer-publish). + +enqueue_publish: 저작/4-A 트랜잭션이 같은 session(=같은 Postgres tx)에서 호출 → caller commit + (P0-1 규율: 콘텐츠 변경과 outbox INSERT 원자성, dual-write 회피). payload/hash 스냅샷. +enqueue_question_publish: 문항 + (ready면)해설을 함께 적재. 저작 쓰기/4-A 완료/백필 공용. +backfill_publish_questions: 기존 active 문항을 bounded 로 1회 outbox 적재(초기 백필, P2-1 bounded page). + 멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해). + +★주의: 저작 엔드포인트(study_questions create/update)·4-A 워커에서의 enqueue 결선은 P0-1b + (기존 hot 파일 수정이라 별 increment). 본 모듈은 호출 라이브러리 + 수동/백필 진입점. +""" + +from __future__ import annotations + +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from models.published import PublishOutbox +from models.study_question import StudyQuestion +from services.study.publish_projection import ( + KIND_EXPLANATION, + KIND_QUESTION, + SCHEMA_VERSION, + payload_hash, + project_explanation, + project_question, +) + + +async def enqueue_publish( + session: AsyncSession, + *, + kind: str, + source_id: int, + payload: dict[str, Any] | None, + deleted: bool = False, +) -> None: + """outbox 1행 INSERT. caller 가 commit (저자 tx 동봉). deleted=True 면 tombstone(payload={}).""" + body: dict[str, Any] = payload if payload is not None else {} + session.add( + PublishOutbox( + kind=kind, + source_id=source_id, + payload=body, + payload_hash=payload_hash(body), + schema_version=SCHEMA_VERSION, + deleted=deleted, + ) + ) + + +async def enqueue_question_publish(session: AsyncSession, q: Any) -> None: + """문항 + (ready면)해설을 outbox 적재. caller commit.""" + await enqueue_publish(session, kind=KIND_QUESTION, source_id=q.id, payload=project_question(q)) + expl = project_explanation(q) + if expl is not None: + await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl) + + +async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: + """active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재. + + 반환 = enqueue 한 문항 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + """ + rows = ( + await session.execute( + select(StudyQuestion) + .where(StudyQuestion.deleted_at.is_(None), StudyQuestion.id > after_id) + .order_by(StudyQuestion.id.asc()) + .limit(limit) + ) + ).scalars().all() + for q in rows: + await enqueue_question_publish(session, q) + return len(rows) diff --git a/app/services/study/publish_projection.py b/app/services/study/publish_projection.py new file mode 100644 index 0000000..9b7fbce --- /dev/null +++ b/app/services/study/publish_projection.py @@ -0,0 +1,59 @@ +"""발행 projection — 소스 행을 render-ready payload + 안정 해시로 변환 (순수 함수). + +뷰어가 보는 '단일 진실'은 이 payload 까지 (DS 내부 실험 스키마는 계약 뒤 격리). +kind 별 projector. payload_hash = 정렬된 JSON 의 sha256 = (payload_hash, deleted) 디둡 키. + +★주의(plan study-to-viewer-slice1 r2): 과목/시험메타를 per-question payload 에 인라인 — + bulk subject rename 시 N행 churn. 정규화(과목=별 kind subject ref)는 churn 최적화 후속(P0-1b), + 읽기 정합엔 무영향. 지금은 인라인(상관관계 단순)으로 두고 후속 PR 에서 분리. +SCHEMA_VERSION = 엔벨로프 버전. payload 모양 진화 시 bump + 뷰어 range 수용(P0-2). +""" + +from __future__ import annotations + +import hashlib +import json +from typing import Any + +SCHEMA_VERSION = 1 + +KIND_QUESTION = "study_question" +KIND_EXPLANATION = "study_explanation" + + +def payload_hash(payload: dict[str, Any]) -> str: + """정렬 JSON 의 sha256 — (payload_hash, deleted) 디둡 키. 키 순서/공백 비의존.""" + canonical = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest() + + +def project_question(q: Any) -> dict[str, Any]: + """study_question → 발행 payload. 정답 포함(개인 학습툴, plan Q2). 이미지는 ref 만(P0-4, 후속).""" + return { + "topic_id": q.study_topic_id, + "question_text": q.question_text, + "choices": [q.choice_1, q.choice_2, q.choice_3, q.choice_4], + "correct_choice": q.correct_choice, + "subject": q.subject, + "scope": q.scope, + "exam_name": q.exam_name, + "exam_round": q.exam_round, + "exam_question_number": q.exam_question_number, + "explanation": q.explanation, # 수동 해설(있으면). AI 해설은 별 kind. + } + + +def project_explanation(q: Any) -> dict[str, Any] | None: + """study_question 의 AI 해설 → 별 발행 kind. ready 일 때만(없으면 None=발행 안 함). + + 재조우 표시용 선발행. 신규 오답은 4-A 워커가 ~90s 후 ready→재발행(P2-3 결선, P0-1b). + """ + if getattr(q, "ai_explanation_status", None) != "ready" or not getattr(q, "ai_explanation", None): + return None + gen = getattr(q, "ai_explanation_generated_at", None) + return { + "question_source_id": q.id, + "explanation_md": q.ai_explanation, + "model": getattr(q, "ai_explanation_model", None), + "generated_at": gen.isoformat() if gen else None, + } diff --git a/app/workers/study_publish_worker.py b/app/workers/study_publish_worker.py new file mode 100644 index 0000000..968b24f --- /dev/null +++ b/app/workers/study_publish_worker.py @@ -0,0 +1,120 @@ +"""발행 워커 — publish_outbox drain → published 에 rev 부여 (docsrv-viewer-publish). + +APScheduler 1분(max_instances=1). pg_advisory_xact_lock 단일 라이터 → rev 커밋순 gapless +(인플라이트 갭 차단: bigserial seq 폴링이 아니라 outbox id 순 + 단일 라이터 rev 부여). + outbox 를 id(커밋순) 순으로 처리, (kind, source_id) 당 published upsert: + - 기존 행과 (payload_hash, deleted) 동일 → no-op(디둡, rev 안 올림) + processed 마킹 + - 그 외 → pub_id 재사용(기존)|신규 uuid, rev = MAX(rev)+1, payload/hash/deleted 갱신 + tombstone(deleted=True)은 디둡 복합키라 안 삼켜짐. 배치 단일 트랜잭션. + 배치 내 같은 (kind, source_id) 가 두 번 오면 flush 로 직전 반영을 다음 select 가 보게 함(최신 승). + +study_publish_enabled=False(기본) 면 no-op — 저자/4-A enqueue 결선(P0-1b) 전까지 inert. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone + +from sqlalchemy import func, select, text + +from core.config import settings +from core.database import async_session +from core.utils import setup_logger +from models.published import Published, PublishOutbox + +logger = setup_logger("study_publish_worker") + +BATCH_SIZE = 500 +# pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌). +ADVISORY_LOCK_KEY = 838201 + + +async def consume_publish_outbox() -> None: + """APScheduler 진입점. 미처리 outbox 를 rev 부여하며 published 로 반영.""" + if not settings.study_publish_enabled: + logger.debug("study_publish 비활성 (study_publish_enabled=false)") + return + + async with async_session() as session: + try: + # 1) 전역 단일 라이터 락(트랜잭션 스코프 — commit/rollback 시 자동 해제). + await session.execute( + text("SELECT pg_advisory_xact_lock(:k)").bindparams(k=ADVISORY_LOCK_KEY) + ) + # 2) 현재 최대 rev. + max_rev = int( + (await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0 + ) + # 3) 미처리 outbox 를 커밋순(id)으로. + rows = ( + await session.execute( + select(PublishOutbox) + .where(PublishOutbox.processed_at.is_(None)) + .order_by(PublishOutbox.id.asc()) + .limit(BATCH_SIZE) + ) + ).scalars().all() + if not rows: + return + + now = datetime.now(timezone.utc) + published_count = 0 + for ob in rows: + existing = ( + await session.execute( + select(Published).where( + Published.kind == ob.kind, + Published.source_id == ob.source_id, + ) + ) + ).scalar_one_or_none() + + # (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림. + if ( + existing is not None + and existing.payload_hash == ob.payload_hash + and existing.deleted == ob.deleted + ): + ob.processed_at = now + continue + + max_rev += 1 + if existing is None: + session.add( + Published( + kind=ob.kind, + source_id=ob.source_id, + pub_id=uuid.uuid4().hex, + payload=ob.payload, + payload_hash=ob.payload_hash, + schema_version=ob.schema_version, + rev=max_rev, + deleted=ob.deleted, + created_at=now, + updated_at=now, + ) + ) + else: + existing.payload = ob.payload + existing.payload_hash = ob.payload_hash + existing.schema_version = ob.schema_version + existing.deleted = ob.deleted + existing.rev = max_rev + existing.updated_at = now + + ob.processed_at = now + # 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승). + await session.flush() + published_count += 1 + + await session.commit() + logger.info( + "publish_outbox_drained scanned=%s published=%s max_rev=%s", + len(rows), + published_count, + max_rev, + ) + except Exception as e: + await session.rollback() + logger.exception("publish_outbox_drain_failed: %s", e) diff --git a/migrations/365_published.sql b/migrations/365_published.sql new file mode 100644 index 0000000..a9b5c18 --- /dev/null +++ b/migrations/365_published.sql @@ -0,0 +1,21 @@ +-- 365_published.sql +-- 발행 레이어(docsrv-viewer-publish) projection 테이블. 뷰어가 read API로 당겨 자기 SQLite로 복제. +-- kind-discriminated 단일 테이블(study_question | study_explanation | ... 후속 news/document). +-- pub_id = opaque+stable(워커가 (kind,source_id)당 1회 부여, republish=rev bump에도 불변) = 뷰어 dedup키=progress키. +-- source_id = 내부 소스 행 id (pub_id→내부 역매핑, ingest write-back 해소용). +-- rev = 발행 워커 커밋순 gapless 커서(pg_advisory_lock 단일 라이터). 뷰어 feed = WHERE rev>since. +-- payload_hash = sha256(정렬 JSON). (payload_hash, deleted) 디둡 — no-op 재투영 억제, tombstone 보존. +-- deleted = tombstone(삭제/만료도 feed 1급 이벤트). schema_version = 엔벨로프 버전(미지원 가시거부). +CREATE TABLE IF NOT EXISTS published ( + id BIGSERIAL PRIMARY KEY, + kind VARCHAR(40) NOT NULL, + source_id BIGINT NOT NULL, + pub_id TEXT NOT NULL, + payload JSONB NOT NULL, + payload_hash TEXT NOT NULL, + schema_version SMALLINT NOT NULL DEFAULT 1, + rev BIGINT NOT NULL, + deleted BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/366_published_kind_pubid_uq.sql b/migrations/366_published_kind_pubid_uq.sql new file mode 100644 index 0000000..cb01db0 --- /dev/null +++ b/migrations/366_published_kind_pubid_uq.sql @@ -0,0 +1,3 @@ +-- 366_published_kind_pubid_uq.sql +-- pub_id 는 kind 내 유일(뷰어 dedup/progress 키 무결성, pub_id→내부 역해소 유일성 보장). +CREATE UNIQUE INDEX IF NOT EXISTS published_kind_pubid_uq ON published (kind, pub_id); diff --git a/migrations/367_published_kind_source_uq.sql b/migrations/367_published_kind_source_uq.sql new file mode 100644 index 0000000..f98b1cc --- /dev/null +++ b/migrations/367_published_kind_source_uq.sql @@ -0,0 +1,3 @@ +-- 367_published_kind_source_uq.sql +-- (kind, source_id) 당 발행 행 1개 — 발행 워커 upsert 타깃 + pub_id 재사용(같은 source=같은 pub_id) 키. +CREATE UNIQUE INDEX IF NOT EXISTS published_kind_source_uq ON published (kind, source_id); diff --git a/migrations/368_published_rev_idx.sql b/migrations/368_published_rev_idx.sql new file mode 100644 index 0000000..9141c7f --- /dev/null +++ b/migrations/368_published_rev_idx.sql @@ -0,0 +1,3 @@ +-- 368_published_rev_idx.sql +-- 뷰어 pull-sync feed: SELECT ... WHERE rev > :since ORDER BY rev LIMIT :page (P0-2). +CREATE INDEX IF NOT EXISTS published_rev_idx ON published (rev); diff --git a/migrations/369_publish_outbox.sql b/migrations/369_publish_outbox.sql new file mode 100644 index 0000000..02aebe1 --- /dev/null +++ b/migrations/369_publish_outbox.sql @@ -0,0 +1,15 @@ +-- 369_publish_outbox.sql +-- transactional outbox — 저작/4-A 트랜잭션이 같은 tx에서 여기 INSERT(P0-1 규율), +-- 단일 발행 워커가 id(커밋순) 순으로 drain 하며 published 에 rev 부여(소스 updated_at 폴링 금지=갭 재발). +-- processed_at = 워커 drain 시 스탬프(NULL=미처리). payload/hash 는 enqueue 시점 스냅샷. +CREATE TABLE IF NOT EXISTS publish_outbox ( + id BIGSERIAL PRIMARY KEY, + kind VARCHAR(40) NOT NULL, + source_id BIGINT NOT NULL, + payload JSONB NOT NULL, + payload_hash TEXT NOT NULL, + schema_version SMALLINT NOT NULL DEFAULT 1, + deleted BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + processed_at TIMESTAMPTZ +); diff --git a/migrations/370_publish_outbox_unprocessed_idx.sql b/migrations/370_publish_outbox_unprocessed_idx.sql new file mode 100644 index 0000000..9a9b127 --- /dev/null +++ b/migrations/370_publish_outbox_unprocessed_idx.sql @@ -0,0 +1,3 @@ +-- 370_publish_outbox_unprocessed_idx.sql +-- 워커 drain 쿼리: WHERE processed_at IS NULL ORDER BY id (커밋순). 부분 인덱스로 미처리분만. +CREATE INDEX IF NOT EXISTS publish_outbox_unprocessed_idx ON publish_outbox (id) WHERE processed_at IS NULL;