Files
hyungi_document_server/scripts/marker_reprocess_existing_success.py
Hyungi Ahn f2a5c729b7 fix(scripts): marker reprocess SQL — CAST(:payload AS jsonb) 로 named-param 충돌 해소
`:payload::jsonb` 의 `::` postfix 캐스트가 SQLAlchemy text() 의 named-param prefix
`:` 와 충돌해 asyncpg syntax error. doc 3757 sample reprocess 시 발견.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 14:27:17 +09:00

196 lines
7.2 KiB
Python

"""Phase 1B.5 ImgAuth — 기존 marker success 문서 targeted 재변환.
목적:
Phase 1B 배포 (2026-05-01) ~ 1B.5 직전 사이에 marker_worker 가 만든 success 문서들은
md_content 안에 깨진 ref (`![](_page_0_Picture_3.jpeg)` 등) 만 있고 NAS 파일 부재.
1B.5 deploy 후 이 문서들을 force_reprocess=true 로 다시 큐에 넣어 이미지 persist +
md_content ref 정규화 (`docimg:img_NNN`) 을 적용한다.
사용자 못박은 절차 (plan: piped-humming-crystal.md Step 6):
1. 1B.5 merge/deploy 전에는 기존 success 문서 절대 건드리지 않음
2. 1B.5 deploy + 신규 업로드 1~2건 + sample 5건 검증 후에만 28건 진행
3. anchor/baseline 보존: pre-snapshot CSV + 재변환 전후 quality metric 비교값
4. Phase 2 cron 와 분리 (별 단계, 야간 시간 단발 실행)
실행:
# 1) 후보 + pre-snapshot 출력 (CSV stdout)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --dry-run
# 2) sample 모드 (지정한 doc_id 만 enqueue)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\
--apply --only 4809,5127,5180,5183
# 3) 전체 28건 enqueue (sample 검증 통과 후)
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py --apply
# 4) snapshot CSV 파일 저장
docker compose exec fastapi python /app/scripts/marker_reprocess_existing_success.py \\
--dry-run --snapshot-csv /app/logs/marker_pre_imgauth_snapshot_2026MMDD.csv
배포 후 검증:
- 모든 후보가 md_status='success' 유지
- document_images row 분포 (예상: ~20건 이미지 보유, ~8건 없음)
- md_content 안 `docimg:` ref 수 == document_images row 수 per doc
- anchor doc 4809 quality 비교 (heading_count / table_row_count / text_length_ratio ±5%)
"""
import argparse
import asyncio
import csv
import json
import os
import sys
from io import StringIO
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
CANDIDATES_SQL = """
SELECT id, md_status, md_content_hash, md_extraction_engine, md_extraction_engine_version,
md_extraction_quality, md_generated_at,
file_format, file_path, title
FROM documents
WHERE md_status = 'success'
AND md_extraction_engine = 'marker'
ORDER BY id
"""
def _serialize_row(row) -> dict:
quality = row.md_extraction_quality
return {
"id": row.id,
"md_status": row.md_status,
"md_content_hash": row.md_content_hash,
"md_extraction_engine": row.md_extraction_engine,
"md_extraction_engine_version": row.md_extraction_engine_version,
"md_extraction_quality": json.dumps(quality, ensure_ascii=False) if quality else "",
"md_generated_at": row.md_generated_at.isoformat() if row.md_generated_at else "",
"file_format": row.file_format,
"file_path": row.file_path,
"title": row.title or "",
}
async def run(*, apply: bool, only_ids: set[int] | None, snapshot_csv: str | None) -> int:
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@localhost:5432/pkm",
)
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
try:
async with session_factory() as session:
rows = (await session.execute(text(CANDIDATES_SQL))).all()
if only_ids:
rows = [r for r in rows if r.id in only_ids]
print(f"=== marker success 후보 = {len(rows)}건 ===")
if not rows:
print("후보 없음 — 종료.")
return 0
# pre-snapshot CSV 출력
buf = StringIO()
writer = csv.DictWriter(
buf,
fieldnames=[
"id", "md_status", "md_content_hash", "md_extraction_engine",
"md_extraction_engine_version", "md_extraction_quality",
"md_generated_at", "file_format", "file_path", "title",
],
)
writer.writeheader()
for row in rows:
writer.writerow(_serialize_row(row))
csv_text = buf.getvalue()
if snapshot_csv:
with open(snapshot_csv, "w", encoding="utf-8") as f:
f.write(csv_text)
print(f"[snapshot] {snapshot_csv}{len(rows)}행 기록")
else:
print("\n=== Pre-snapshot CSV ===")
print(csv_text)
if not apply:
print(f"\n[dry-run] {len(rows)}건 영향. --apply 로 실제 enqueue.")
return 0
# enqueue — UNIQUE(document_id, stage) WHERE status IN ('pending', 'processing')
# 가 있으므로 활성 markdown 행이 없는 doc 만 통과. 충돌 시 silent skip.
# CAST(:payload AS jsonb) — `::jsonb` postfix 캐스트는 SQLAlchemy text()
# 의 named-param prefix `:` 와 충돌해 syntax error 발생.
ENQUEUE_SQL = text("""
INSERT INTO processing_queue (document_id, stage, status, payload)
VALUES (:doc_id, 'markdown', 'pending', CAST(:payload AS jsonb))
ON CONFLICT DO NOTHING
""")
payload = json.dumps({
"force_reprocess": True,
"reason": "phase_1b5_imgauth_targeted_reprocess",
})
inserted = 0
for row in rows:
result = await session.execute(
ENQUEUE_SQL, {"doc_id": row.id, "payload": payload}
)
if result.rowcount > 0:
inserted += 1
await session.commit()
print(f"\n[apply] enqueue 완료 — {inserted}/{len(rows)} 건 신규 markdown 큐 추가")
print(" (skip = 이미 활성 markdown 큐 행이 있는 문서)")
return 0
finally:
await engine.dispose()
def _parse_only_ids(arg: str | None) -> set[int] | None:
if not arg:
return None
out: set[int] = set()
for part in arg.split(","):
part = part.strip()
if part:
out.add(int(part))
return out or None
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--apply", action="store_true", help="실제 enqueue (기본 dry-run)")
parser.add_argument("--dry-run", action="store_true", help="명시적 dry-run (default 동등)")
parser.add_argument(
"--only", type=str, default=None,
help="쉼표 구분 doc_id 화이트리스트 (sample 검증용, 예: 4809,5127,5180)",
)
parser.add_argument(
"--snapshot-csv", type=str, default=None,
help="pre-snapshot 을 stdout 대신 이 경로의 CSV 파일로 저장",
)
args = parser.parse_args()
if args.apply and args.dry_run:
parser.error("--apply 와 --dry-run 동시 지정 불가")
only_ids = _parse_only_ids(args.only)
return asyncio.run(run(
apply=args.apply,
only_ids=only_ids,
snapshot_csv=args.snapshot_csv,
))
if __name__ == "__main__":
sys.exit(main())