fix(summarize): map_results persist aliasing — 유닛 스냅샷 소급 오염으로 UPDATE 스킵
60254 라이브 E2E 에서 발견: 완주는 성공했으나 payload.presegment.map_results 에 unit 0 만 persist. 원인 = map_results dict 를 in-place 변경 → 직전 commit 의 SQLAlchemy committed 스냅샷이 같은 중첩 객체를 참조 → old==new 판정 → 2번째 commit 부터 UPDATE 스킵. 멱등 재개 시 완료 유닛 재호출 비용 발생(정확성 무영향). fix = 매 유닛 map_results/preseg/payload 전부 새 dict 재구성(공유 참조 0). test = FakeSession 이 commit 시점 payload 객체 참조를 박제, 사후 직렬화로 스냅샷 유닛 수가 1..n 단조 증가 단정 — 구 코드에 대해 FAILED 네거티브 검증 완료. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -387,22 +387,29 @@ async def _process_map_reduce(
|
|||||||
f"({perr}) — 유닛 재시도 대상"
|
f"({perr}) — 유닛 재시도 대상"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
map_results[key] = {
|
# ★매 유닛 새 dict 로 재구성 (in-place 변경 금지) — 직전 commit 의 committed
|
||||||
"index": unit.index,
|
# 스냅샷이 같은 중첩 객체를 참조하면 old==new 로 보여 SQLAlchemy 가 UPDATE 를
|
||||||
"titles": [t for t in unit.section_titles if t][:8],
|
# 스킵한다(60254 라이브에서 unit 0 만 persist 된 aliasing 버그의 fix).
|
||||||
"tldr": out.tldr,
|
map_results = {
|
||||||
"detail": out.detail,
|
**map_results,
|
||||||
"inconsistencies": _filter_inconsistencies(out.inconsistencies or []),
|
key: {
|
||||||
|
"index": unit.index,
|
||||||
|
"titles": [t for t in unit.section_titles if t][:8],
|
||||||
|
"tldr": out.tldr,
|
||||||
|
"detail": out.detail,
|
||||||
|
"inconsistencies": _filter_inconsistencies(out.inconsistencies or []),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
preseg.update({
|
preseg = {
|
||||||
|
**preseg,
|
||||||
"tier": plan.tier,
|
"tier": plan.tier,
|
||||||
"over_pct": plan.over_pct,
|
"over_pct": plan.over_pct,
|
||||||
"total_est_tokens": plan.total_est_tokens,
|
"total_est_tokens": plan.total_est_tokens,
|
||||||
"units": n,
|
"units": n,
|
||||||
"map_results": map_results,
|
"map_results": map_results,
|
||||||
})
|
}
|
||||||
payload["presegment"] = dict(preseg)
|
payload = {**payload, "presegment": preseg}
|
||||||
queue_row.payload = dict(payload) # 재할당 = JSONB 변경 감지
|
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
||||||
await session.commit() # 유닛 단위 멱등 재개 지점
|
await session.commit() # 유닛 단위 멱등 재개 지점
|
||||||
|
|
||||||
if failed_units:
|
if failed_units:
|
||||||
|
|||||||
@@ -47,11 +47,19 @@ REDUCE_JSON = (
|
|||||||
|
|
||||||
|
|
||||||
class FakeSession:
|
class FakeSession:
|
||||||
def __init__(self):
|
"""commit 시점의 queue_row.payload 를 **객체 참조**로 박제 — SQLAlchemy 의 committed
|
||||||
|
스냅샷과 동일하게, 이후 in-place 변경이 과거 커밋 객체에 소급 반영되는 aliasing
|
||||||
|
(60254 라이브에서 unit 0 만 persist 된 버그)을 검증 시점 직렬화로 탐지한다."""
|
||||||
|
|
||||||
|
def __init__(self, row=None):
|
||||||
self.commits = 0
|
self.commits = 0
|
||||||
|
self._row = row
|
||||||
|
self.snapshots: list = []
|
||||||
|
|
||||||
async def commit(self):
|
async def commit(self):
|
||||||
self.commits += 1
|
self.commits += 1
|
||||||
|
if self._row is not None:
|
||||||
|
self.snapshots.append(self._row.payload) # 참조 박제 — 복사 금지(의도)
|
||||||
|
|
||||||
|
|
||||||
class FakeClient:
|
class FakeClient:
|
||||||
@@ -145,8 +153,9 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry):
|
|||||||
|
|
||||||
client = FakeClient()
|
client = FakeClient()
|
||||||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||||||
doc, session = _doc(), FakeSession()
|
doc = _doc()
|
||||||
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||||||
|
session = FakeSession(row)
|
||||||
|
|
||||||
await dsw._process_map_reduce(
|
await dsw._process_map_reduce(
|
||||||
doc, row, _envelope(), "generic", plan, session,
|
doc, row, _envelope(), "generic", plan, session,
|
||||||
@@ -166,6 +175,14 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry):
|
|||||||
assert row.payload["presegment"]["units"] == n
|
assert row.payload["presegment"]["units"] == n
|
||||||
assert len(row.payload["presegment"]["map_results"]) == n
|
assert len(row.payload["presegment"]["map_results"]) == n
|
||||||
assert session.commits == n
|
assert session.commits == n
|
||||||
|
# ★aliasing 회귀 방지: 각 commit 이 박제한 payload 객체를 사후에 봤을 때
|
||||||
|
# map_results 가 1,2,...,n 로 단조 증가해야 한다. in-place 변경(구 버그)이면
|
||||||
|
# 모든 스냅샷이 같은 dict 를 공유해 [n,n,...,n] 으로 보인다 = SQLAlchemy 가
|
||||||
|
# committed 스냅샷과 new 가 같다고 판정해 UPDATE 를 스킵하는 것과 등가.
|
||||||
|
per_commit_units = [
|
||||||
|
len(s["presegment"]["map_results"]) for s in session.snapshots
|
||||||
|
]
|
||||||
|
assert per_commit_units == list(range(1, n + 1))
|
||||||
# telemetry 1건 (reduce 기준)
|
# telemetry 1건 (reduce 기준)
|
||||||
events = _patch_telemetry
|
events = _patch_telemetry
|
||||||
assert len(events) == 1 and events[0]["error_code"] is None
|
assert len(events) == 1 and events[0]["error_code"] is None
|
||||||
|
|||||||
Reference in New Issue
Block a user