diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index 0fce0cc..afb5bb7 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -387,22 +387,29 @@ async def _process_map_reduce( f"({perr}) — 유닛 재시도 대상" ) continue - map_results[key] = { - "index": unit.index, - "titles": [t for t in unit.section_titles if t][:8], - "tldr": out.tldr, - "detail": out.detail, - "inconsistencies": _filter_inconsistencies(out.inconsistencies or []), + # ★매 유닛 새 dict 로 재구성 (in-place 변경 금지) — 직전 commit 의 committed + # 스냅샷이 같은 중첩 객체를 참조하면 old==new 로 보여 SQLAlchemy 가 UPDATE 를 + # 스킵한다(60254 라이브에서 unit 0 만 persist 된 aliasing 버그의 fix). + map_results = { + **map_results, + key: { + "index": unit.index, + "titles": [t for t in unit.section_titles if t][:8], + "tldr": out.tldr, + "detail": out.detail, + "inconsistencies": _filter_inconsistencies(out.inconsistencies or []), + }, } - preseg.update({ + preseg = { + **preseg, "tier": plan.tier, "over_pct": plan.over_pct, "total_est_tokens": plan.total_est_tokens, "units": n, "map_results": map_results, - }) - payload["presegment"] = dict(preseg) - queue_row.payload = dict(payload) # 재할당 = JSONB 변경 감지 + } + payload = {**payload, "presegment": preseg} + queue_row.payload = payload # 재할당 = JSONB 변경 감지 await session.commit() # 유닛 단위 멱등 재개 지점 if failed_units: diff --git a/tests/test_deep_summary_mapreduce.py b/tests/test_deep_summary_mapreduce.py index 5165534..15afc72 100644 --- a/tests/test_deep_summary_mapreduce.py +++ b/tests/test_deep_summary_mapreduce.py @@ -47,11 +47,19 @@ REDUCE_JSON = ( class FakeSession: - def __init__(self): + """commit 시점의 queue_row.payload 를 **객체 참조**로 박제 — SQLAlchemy 의 committed + 스냅샷과 동일하게, 이후 in-place 변경이 과거 커밋 객체에 소급 반영되는 aliasing + (60254 라이브에서 unit 0 만 persist 된 버그)을 검증 시점 직렬화로 탐지한다.""" + + def __init__(self, row=None): self.commits = 0 + self._row = row + self.snapshots: list = [] async def commit(self): self.commits += 1 + if self._row is not None: + self.snapshots.append(self._row.payload) # 참조 박제 — 복사 금지(의도) class FakeClient: @@ -145,8 +153,9 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): client = FakeClient() monkeypatch.setattr(dsw, "AIClient", lambda: client) - doc, session = _doc(), FakeSession() + doc = _doc() row = SimpleNamespace(payload={"envelope": {"x": 1}}) + session = FakeSession(row) await dsw._process_map_reduce( doc, row, _envelope(), "generic", plan, session, @@ -166,6 +175,14 @@ async def test_map_reduce_end_to_end(monkeypatch, _patch_telemetry): assert row.payload["presegment"]["units"] == n assert len(row.payload["presegment"]["map_results"]) == n assert session.commits == n + # ★aliasing 회귀 방지: 각 commit 이 박제한 payload 객체를 사후에 봤을 때 + # map_results 가 1,2,...,n 로 단조 증가해야 한다. in-place 변경(구 버그)이면 + # 모든 스냅샷이 같은 dict 를 공유해 [n,n,...,n] 으로 보인다 = SQLAlchemy 가 + # committed 스냅샷과 new 가 같다고 판정해 UPDATE 를 스킵하는 것과 등가. + per_commit_units = [ + len(s["presegment"]["map_results"]) for s in session.snapshots + ] + assert per_commit_units == list(range(1, n + 1)) # telemetry 1건 (reduce 기준) events = _patch_telemetry assert len(events) == 1 and events[0]["error_code"] is None