From 3b7fd900e42bd58a18b6d0087bd485eec0b8b2c2 Mon Sep 17 00:00:00 2001 From: hyungi Date: Thu, 2 Jul 2026 09:47:57 +0900 Subject: [PATCH] =?UTF-8?q?fix(summarize):=20map=5Fresults=20persist=20ali?= =?UTF-8?q?asing=20=E2=80=94=20=EC=9C=A0=EB=8B=9B=20=EC=8A=A4=EB=83=85?= =?UTF-8?q?=EC=83=B7=20=EC=86=8C=EA=B8=89=20=EC=98=A4=EC=97=BC=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20UPDATE=20=EC=8A=A4=ED=82=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 60254 라이브 E2E 에서 발견: 완주는 성공했으나 payload.presegment.map_results 에 unit 0 만 persist. 원인 = map_results dict 를 in-place 변경 → 직전 commit 의 SQLAlchemy committed 스냅샷이 같은 중첩 객체를 참조 → old==new 판정 → 2번째 commit 부터 UPDATE 스킵. 멱등 재개 시 완료 유닛 재호출 비용 발생(정확성 무영향). fix = 매 유닛 map_results/preseg/payload 전부 새 dict 재구성(공유 참조 0). test = FakeSession 이 commit 시점 payload 객체 참조를 박제, 사후 직렬화로 스냅샷 유닛 수가 1..n 단조 증가 단정 — 구 코드에 대해 FAILED 네거티브 검증 완료. Co-Authored-By: Claude Fable 5 --- app/workers/deep_summary_worker.py | 27 +++++++++++++++++---------- tests/test_deep_summary_mapreduce.py | 21 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index 0fce0cc..afb5bb7 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -387,22 +387,29 @@ async def _process_map_reduce( f"({perr}) — 유닛 재시도 대상" ) continue - map_results[key] = { - "index": unit.index, - "titles": [t for t in unit.section_titles if t][:8], - "tldr": out.tldr, - "detail": out.detail, - "inconsistencies": _filter_inconsistencies(out.inconsistencies or []), + # ★매 유닛 새 dict 로 재구성 (in-place 변경 금지) — 직전 commit 의 committed + # 스냅샷이 같은 중첩 객체를 참조하면 old==new 로 보여 SQLAlchemy 가 UPDATE 를 + # 스킵한다(60254 라이브에서 unit 0 만 persist 된 aliasing 버그의 fix). + map_results = { + **map_results, + key: { + "index": unit.index, + "titles": [t for t in unit.section_titles if t][:8], + "tldr": out.tldr, + "detail": out.detail, + "inconsistencies": _filter_inconsistencies(out.inconsistencies or []), + }, } - preseg.update({ + preseg = { + **preseg, "tier": plan.tier, "over_pct": plan.over_pct, "total_est_tokens": plan.total_est_tokens, "units": n, "map_results": map_results, - }) - payload["presegment"] = dict(preseg) - queue_row.payload = dict(payload) # 재할당 = JSONB 변경 감지 + } + payload = {**payload, "presegment": preseg} + queue_row.payload = payload # 재할당 = JSONB 변경 감지 await session.commit() # 유닛 단위 멱등 재개 지점 if failed_units: diff --git a/tests/test_deep_summary_mapreduce.py b/tests/test_deep_summary_mapreduce.py index 5165534..15afc72 100644 --- a/tests/test_deep_summary_mapreduce.py +++ b/tests/test_deep_summary_mapreduce.py @@ -47,11 +47,19 @@ REDUCE_JSON = ( class FakeSession: - def __init__(self): + """commit 시점의 queue_row.payload 를 **객체 참조**로 박제 — SQLAlchemy 의 committed + 스냅샷과 동일하게, 이후 in-place 변경이 과거 커밋 객체에 소급 반영되는 aliasing + (60254 라이브에서 unit 0 만 persist 된 버그)을 검증 시점 직렬화로 탐지한다.""" + + def __init__(self, row=None): self.commits = 0 + self._row = row + self.snapshots: list = [] async def commit(self): self.commits += 1 + if self._row is not None: + self.snapshots.append(self._row.payload) # 참조 박제 — 복사 금지(의도) class FakeClient: @@ -145,8 +153,9 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): client = FakeClient() monkeypatch.setattr(dsw, "AIClient", lambda: client) - doc, session = _doc(), FakeSession() + doc = _doc() row = SimpleNamespace(payload={"envelope": {"x": 1}}) + session = FakeSession(row) await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, @@ -166,6 +175,14 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): assert row.payload["presegment"]["units"] == n assert len(row.payload["presegment"]["map_results"]) == n assert session.commits == n + # ★aliasing 회귀 방지: 각 commit 이 박제한 payload 객체를 사후에 봤을 때 + # map_results 가 1,2,...,n 로 단조 증가해야 한다. in-place 변경(구 버그)이면 + # 모든 스냅샷이 같은 dict 를 공유해 [n,n,...,n] 으로 보인다 = SQLAlchemy 가 + # committed 스냅샷과 new 가 같다고 판정해 UPDATE 를 스킵하는 것과 등가. + per_commit_units = [ + len(s["presegment"]["map_results"]) for s in session.snapshots + ] + assert per_commit_units == list(range(1, n + 1)) # telemetry 1건 (reduce 기준) events = _patch_telemetry assert len(events) == 1 and events[0]["error_code"] is None