fix(publish): publish_outbox poison row head-of-line block 차단

배치 단일 트랜잭션이라 한 행의 예외가 배치 전체(앞 행 processed_at 포함)를 롤백 →
poison 행이 매 사이클 최저 id 로 재선택되어 후속 발행이 영구 정지. outbox 모델에
재시도/terminal 컬럼이 전무(processing_queue·study_jobs 의 per-item 격리 패턴 미적용).

- mig377: publish_outbox 에 attempts/failed_at 추가
- 워커: 행별 savepoint(begin_nested) 격리 — 예외 시 attempts++, MAX(5) 초과 시
  failed_at 스탬프(terminal) 후 select 제외. 실패 행은 rev 미소모(드문 gap 은 단일
  라이터·커밋순 부여라 viewer since-rev 증분 동기에 무해).

study_publish_enabled=false 기본이라 현재 inert, 발행 활성화(P0-1b) 전 선결.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-29 13:06:40 +09:00
parent d030a2b7b0
commit b9f9d88d99
3 changed files with 91 additions and 46 deletions
+4
View File
@@ -56,5 +56,9 @@ class PublishOutbox(Base):
DateTime(timezone=True), default=datetime.now, nullable=False
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# mig377: 행별 격리 재시도/terminal. attempts=savepoint 실패 누적, failed_at=MAX 초과 terminal
# (set 시 워커 select 에서 제외 → head-of-line block 방지).
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
failed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig372.
+79 -46
View File
@@ -28,6 +28,8 @@ logger = setup_logger("study_publish_worker")
BATCH_SIZE = 500
# pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌).
ADVISORY_LOCK_KEY = 838201
# 행별 격리 재시도 상한 — 초과 시 failed_at 스탬프(terminal)로 select 에서 제외.
MAX_OUTBOX_ATTEMPTS = 5
async def consume_publish_outbox() -> None:
@@ -46,11 +48,15 @@ async def consume_publish_outbox() -> None:
max_rev = int(
(await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0
)
# 3) 미처리 outbox 를 커밋순(id)으로.
# 3) 미처리 outbox 를 커밋순(id)으로. failed_at(terminal) 은 제외 — poison 행이
# head-of-line 을 영구 점유하지 않게 함.
rows = (
await session.execute(
select(PublishOutbox)
.where(PublishOutbox.processed_at.is_(None))
.where(
PublishOutbox.processed_at.is_(None),
PublishOutbox.failed_at.is_(None),
)
.order_by(PublishOutbox.id.asc())
.limit(BATCH_SIZE)
)
@@ -60,59 +66,86 @@ async def consume_publish_outbox() -> None:
now = datetime.now(timezone.utc)
published_count = 0
failed_count = 0
for ob in rows:
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
try:
# 행 단위 savepoint 격리 — 한 행의 예외가 배치 전체(앞 행 processed_at 포함)를
# 롤백해 poison 행이 다음 사이클에 다시 최저 id 로 선택되는 무한 재선택을 차단.
async with session.begin_nested():
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
if (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
):
ob.processed_at = now
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
is_noop = (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
)
if is_noop:
ob.processed_at = now
else:
new_rev = max_rev + 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=new_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = new_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
except Exception as row_err:
# savepoint 롤백 = 이 행의 쓰기(processed_at 포함) 취소. attempts/failed_at 만
# 바깥 트랜잭션에 누적돼 최종 commit 으로 영속(영구 재선택 방지).
ob.attempts = (ob.attempts or 0) + 1
if ob.attempts >= MAX_OUTBOX_ATTEMPTS:
ob.failed_at = now
failed_count += 1
logger.error(
"publish_outbox_row_terminal id=%s kind=%s source_id=%s attempts=%s: %s",
ob.id, ob.kind, ob.source_id, ob.attempts, row_err,
)
else:
logger.warning(
"publish_outbox_row_retry id=%s kind=%s source_id=%s attempts=%s: %s",
ob.id, ob.kind, ob.source_id, ob.attempts, row_err,
)
continue
max_rev += 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=max_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = max_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
published_count += 1
# savepoint 커밋 성공 시에만 rev 카운터 전진(실패 행은 rev 미소모 → 드물게 gap,
# 단일 라이터·커밋순 부여라 viewer since-rev 증분 동기 정합엔 무해).
if not is_noop:
max_rev = new_rev
published_count += 1
await session.commit()
logger.info(
"publish_outbox_drained scanned=%s published=%s max_rev=%s",
"publish_outbox_drained scanned=%s published=%s failed=%s max_rev=%s",
len(rows),
published_count,
failed_count,
max_rev,
)
except Exception as e:
@@ -0,0 +1,8 @@
-- 377_publish_outbox_attempts_failed.sql
-- publish_outbox poison row head-of-line block 차단. 발행 워커가 행별 savepoint 격리 후
-- 예외 시 attempts++ 하고 MAX 초과 시 failed_at 스탬프(terminal) → 그 행을 select 에서 제외해
-- 후속 발행이 막히지 않게 함. 기존 미처리 행은 attempts=0 / failed_at=NULL 로 정상 재처리.
-- (단일 ALTER = 1 statement = asyncpg prepared 호환.)
ALTER TABLE publish_outbox
ADD COLUMN IF NOT EXISTS attempts SMALLINT NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS failed_at TIMESTAMPTZ;