diff --git a/app/models/published.py b/app/models/published.py
index 8fbe278..fe6db50 100644
--- a/app/models/published.py
+++ b/app/models/published.py
@@ -56,5 +56,9 @@ class PublishOutbox(Base):
         DateTime(timezone=True), default=datetime.now, nullable=False
     )
     processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+    # mig377: per-row isolated retry / terminal state. attempts = cumulative savepoint failures;
+    # failed_at = terminal stamp set at the retry cap (worker select skips it -> no head-of-line block).
+    attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
+    failed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
 
     # 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig372.
diff --git a/app/workers/study_publish_worker.py b/app/workers/study_publish_worker.py
index 968b24f..90fb4d6 100644
--- a/app/workers/study_publish_worker.py
+++ b/app/workers/study_publish_worker.py
@@ -28,6 +28,8 @@ logger = setup_logger("study_publish_worker")
 BATCH_SIZE = 500
 # pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌).
 ADVISORY_LOCK_KEY = 838201
+# Per-row retry cap: once attempts reaches it, failed_at is stamped (terminal) and the row leaves the select.
+MAX_OUTBOX_ATTEMPTS = 5
 
 
 async def consume_publish_outbox() -> None:
@@ -46,11 +48,15 @@ async def consume_publish_outbox() -> None:
             max_rev = int(
                 (await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0
             )
-            # 3) 미처리 outbox 를 커밋순(id)으로.
+            # 3) Unprocessed outbox rows in commit order (id). Terminal rows (failed_at set) are
+            #    excluded so a poison row cannot occupy the head of the line forever.
             rows = (
                 await session.execute(
                     select(PublishOutbox)
-                    .where(PublishOutbox.processed_at.is_(None))
+                    .where(
+                        PublishOutbox.processed_at.is_(None),
+                        PublishOutbox.failed_at.is_(None),
+                    )
                     .order_by(PublishOutbox.id.asc())
                     .limit(BATCH_SIZE)
                 )
@@ -60,59 +66,92 @@ async def consume_publish_outbox() -> None:
 
             now = datetime.now(timezone.utc)
             published_count = 0
+            failed_count = 0
             for ob in rows:
-                existing = (
-                    await session.execute(
-                        select(Published).where(
-                            Published.kind == ob.kind,
-                            Published.source_id == ob.source_id,
-                        )
-                    )
-                ).scalar_one_or_none()
+                # Snapshot what the failure path needs while the row is still loaded: a savepoint
+                # rollback expires rows modified inside it, and reading an expired attribute on an
+                # AsyncSession triggers implicit I/O (MissingGreenlet) inside the except handler.
+                ob_id, ob_kind, ob_source_id = ob.id, ob.kind, ob.source_id
+                prior_attempts = ob.attempts or 0
+                try:
+                    # Per-row savepoint isolation: one row's exception must not roll back the whole
+                    # batch (earlier processed_at stamps included) and get re-selected as lowest id.
+                    async with session.begin_nested():
+                        existing = (
+                            await session.execute(
+                                select(Published).where(
+                                    Published.kind == ob.kind,
+                                    Published.source_id == ob.source_id,
+                                )
+                            )
+                        ).scalar_one_or_none()
 
-                # (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
-                if (
-                    existing is not None
-                    and existing.payload_hash == ob.payload_hash
-                    and existing.deleted == ob.deleted
-                ):
-                    ob.processed_at = now
+                        # (payload_hash, deleted) dedup: a no-op re-projection does not bump rev.
+                        is_noop = (
+                            existing is not None
+                            and existing.payload_hash == ob.payload_hash
+                            and existing.deleted == ob.deleted
+                        )
+                        if is_noop:
+                            ob.processed_at = now
+                        else:
+                            new_rev = max_rev + 1
+                            if existing is None:
+                                session.add(
+                                    Published(
+                                        kind=ob.kind,
+                                        source_id=ob.source_id,
+                                        pub_id=uuid.uuid4().hex,
+                                        payload=ob.payload,
+                                        payload_hash=ob.payload_hash,
+                                        schema_version=ob.schema_version,
+                                        rev=new_rev,
+                                        deleted=ob.deleted,
+                                        created_at=now,
+                                        updated_at=now,
+                                    )
+                                )
+                            else:
+                                existing.payload = ob.payload
+                                existing.payload_hash = ob.payload_hash
+                                existing.schema_version = ob.schema_version
+                                existing.deleted = ob.deleted
+                                existing.rev = new_rev
+                                existing.updated_at = now
+                            ob.processed_at = now
+                            # Flush so a later row in this batch with the same (kind, source_id) sees this write (latest wins).
+                            await session.flush()
+                except Exception as row_err:
+                    # The savepoint rollback undid this row's writes (processed_at included). Only the
+                    # write-only attempts/failed_at assignments below persist, via the outer commit.
+                    attempts = prior_attempts + 1
+                    ob.attempts = attempts
+                    if attempts >= MAX_OUTBOX_ATTEMPTS:
+                        ob.failed_at = now
+                        failed_count += 1
+                        logger.error(
+                            "publish_outbox_row_terminal id=%s kind=%s source_id=%s attempts=%s: %s",
+                            ob_id, ob_kind, ob_source_id, attempts, row_err,
+                        )
+                    else:
+                        logger.warning(
+                            "publish_outbox_row_retry id=%s kind=%s source_id=%s attempts=%s: %s",
+                            ob_id, ob_kind, ob_source_id, attempts, row_err,
+                        )
                     continue
-
-                max_rev += 1
-                if existing is None:
-                    session.add(
-                        Published(
-                            kind=ob.kind,
-                            source_id=ob.source_id,
-                            pub_id=uuid.uuid4().hex,
-                            payload=ob.payload,
-                            payload_hash=ob.payload_hash,
-                            schema_version=ob.schema_version,
-                            rev=max_rev,
-                            deleted=ob.deleted,
-                            created_at=now,
-                            updated_at=now,
-                        )
-                    )
                 else:
-                    existing.payload = ob.payload
-                    existing.payload_hash = ob.payload_hash
-                    existing.schema_version = ob.schema_version
-                    existing.deleted = ob.deleted
-                    existing.rev = max_rev
-                    existing.updated_at = now
-
-                ob.processed_at = now
-                # 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
-                await session.flush()
-                published_count += 1
+                    # Advance the rev counter only after the savepoint committed; a failed row
+                    # consumes no rev, so the in-memory counter never runs ahead of stored rows.
+                    if not is_noop:
+                        max_rev = new_rev
+                        published_count += 1
 
             await session.commit()
             logger.info(
-                "publish_outbox_drained scanned=%s published=%s max_rev=%s",
+                "publish_outbox_drained scanned=%s published=%s failed=%s max_rev=%s",
                 len(rows),
                 published_count,
+                failed_count,
                 max_rev,
             )
     except Exception as e:
diff --git a/migrations/377_publish_outbox_attempts_failed.sql b/migrations/377_publish_outbox_attempts_failed.sql
new file mode 100644
index 0000000..f55387d
--- /dev/null
+++ b/migrations/377_publish_outbox_attempts_failed.sql
@@ -0,0 +1,8 @@
+-- 377_publish_outbox_attempts_failed.sql
+-- Stops a poison row in publish_outbox from blocking the head of the line. The publish worker isolates
+-- each row in a savepoint, increments attempts on failure, and stamps failed_at (terminal) once attempts
+-- reaches the cap, so the row drops out of the select. Existing unprocessed rows start at attempts=0 / failed_at=NULL.
+-- (Single ALTER = one statement = asyncpg prepared-statement compatible.)
+ALTER TABLE publish_outbox
+    ADD COLUMN IF NOT EXISTS attempts SMALLINT NOT NULL DEFAULT 0,
+    ADD COLUMN IF NOT EXISTS failed_at TIMESTAMPTZ;