feat(papers): B-3 PR4 — 레거시 arXiv DOI reconcile + arXiv DataCite DOI 통일 (keyless)

plan safety-library-b3-1 PR4. paper.doi 없는 paper 행을 arXiv DataCite DOI 로 스탬프해
partial-unique 인덱스 편입 → 재유입 차단('동일-DOI 재유입 차단만').
- doi.py: parse_arxiv_id(본문→arXiv id) + arxiv_doi(10.48550/arxiv.{id}, OpenAlex canonical 실측 일치).
- ★arXiv DOI 통일: arxiv_collector 도 프리프린트(저널 DOI 없음)에 arxiv_doi 부여 → PR2/PR3/PR4 가 같은
  함수로 같은 paper.doi → 교차소스 dedup 성립(이전엔 프리프린트 paper.doi 부재로 PR2↔PR3 dup 갭).
- paper_doi_reconcile.py: 전용 worker(dedup_reconcile=file_hash 캐시와 별개 — 적대리뷰 B·C major).
  keyless·결정적(OpenAlex 호출 0)·in-DB·enqueue 0(콘텐츠 무변경). 선재 DOI holder 시 parent_doi
  마킹(unique 위반 회피). add_job daily 03:50 KST. __main__ CLI.

단위 28 passed(+parse_arxiv_id·arxiv_doi). 라이브 PASS (prod, running fastapi 무접촉):
레거시 197행 arXiv DataCite 스탬프·ASME 2행 skip·선재중복 0 / dedup 불변식 206 distinct 206(인덱스 무위반) /
paper summarize active 0(signal-only). 멱등.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Code
2026-06-13 22:54:24 +00:00
parent fdabca2a2f
commit 244d526ae2
5 changed files with 132 additions and 6 deletions
+4
View File
@@ -58,6 +58,7 @@ async def lifespan(app: FastAPI):
from workers.news_collector import run as news_collector_run
from workers.arxiv_collector import run as arxiv_collector_run
from workers.openalex_collector import run as openalex_collector_run
from workers.paper_doi_reconcile import run as paper_doi_reconcile_run
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
from workers.kosha_collector import run as kosha_collector_run
from workers.csb_collector import run as csb_collector_run
@@ -141,6 +142,9 @@ async def lifespan(app: FastAPI):
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# B-3 PR4: 레거시 paper 행 arXiv DataCite DOI 스탬프(재유입 차단). keyless·in-DB·enqueue 0.
# dedup_reconcile(03:30)·fulltext_reconcile(03:40) 와 별 worker·비충돌 슬롯.
scheduler.add_job(paper_doi_reconcile_run, CronTrigger(hour=3, minute=50, timezone=KST), id="paper_doi_reconcile")
# crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯).
scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector")
# 사이클 3 C-2 잔여: CSB sitemap lastmod diff (weekly 월, cap 40 + 워터마크 점진 백필).
+24
View File
@@ -14,6 +14,7 @@ plan safety-library-b3-1 PR1 (keyless·마이그 0).
"""
import hashlib
import re
# 소문자화 후 비교하므로 전부 소문자 prefix. 긴 것부터(dx.doi.org 가 doi.org 보다 먼저).
_DOI_PREFIXES = (
@@ -50,6 +51,29 @@ def normalize_doi(raw: str | None) -> str | None:
return s
# arXiv id: 신형 'YYMM.NNNNN'(+vN) 또는 구형 'archive(.SUBJ)/NNNNNNN'. 'arXiv:' 접두 흡수.
_ARXIV_ID_RE = re.compile(
r"arxiv:\s*([a-z\-]+(?:\.[a-z]{2})?/\d{7}|\d{4}\.\d{4,5})(v\d+)?", re.IGNORECASE
)
def parse_arxiv_id(text: str | None) -> str | None:
"""본문/제목에서 arXiv id(versionless) 추출. 없으면 None. 레거시 reconcile 의 입력."""
if not text:
return None
m = _ARXIV_ID_RE.search(text)
return m.group(1) if m else None
def arxiv_doi(arxiv_id: str | None) -> str | None:
"""arXiv DataCite DOI = 10.48550/arxiv.{id} (정규화). 저널 DOI 없는 프리프린트의 canonical
paper.doi 통일 키 — OpenAlex 가 프리프린트에 동일 DOI 부여(실측 확인). 모든 수집기·reconcile 가
같은 함수로 같은 DOI 를 써야 교차소스 dedup 이 성립."""
if not arxiv_id:
return None
return normalize_doi(f"10.48550/arXiv.{arxiv_id}")
def paper_doi_hash(normalized_doi: str) -> str:
"""서지 holder 의 Document.file_hash — sha256('paper|{doi}')[:32].
+8 -6
View File
@@ -29,7 +29,7 @@ from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from services.papers.doi import normalize_doi
from services.papers.doi import arxiv_doi, normalize_doi
from services.papers.holder import find_paper_holder
from workers.news_collector import (
FeedError,
@@ -161,11 +161,11 @@ def parse_arxiv_feed(xml_text: str) -> tuple[int, list[ArxivEntry]]:
# ───────────────────────── 적재 (DB — PR2 라이브 검증) ─────────────────────────
def _build_paper_meta(source: NewsSource, entry: ArxivEntry) -> dict:
def _build_paper_meta(source: NewsSource, entry: ArxivEntry, doi: str | None) -> dict:
"""extract_meta — license + source + paper 식별. 서지 holder 는 paper.doi(있으면) 보유."""
paper: dict = {"arxiv_id": entry.arxiv_id}
if entry.doi:
paper["doi"] = entry.doi # partial-unique 인덱스 진입 (교차소스 dedup)
if doi:
paper["doi"] = doi # partial-unique 인덱스 진입 (교차소스 dedup)
if entry.journal_ref:
paper["journal_ref"] = entry.journal_ref
if entry.primary_category:
@@ -193,8 +193,10 @@ async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool:
)
if dup.scalars().first():
return False
# arXiv canonical DOI = 저널 DOI 또는 arXiv DataCite DOI(프리프린트도 paper.doi 보유 → PR3 와 dedup)
doi = entry.doi or arxiv_doi(entry.arxiv_id)
# 교차소스 dedup(DOI holder 이미 존재 — partial-unique 인덱스 백스톱 선제 회피)
if entry.doi and await find_paper_holder(session, entry.doi):
if doi and await find_paper_holder(session, doi):
return False
body = entry.summary or entry.title
@@ -217,7 +219,7 @@ async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool:
material_type="paper",
jurisdiction=None, # paper = NULL 불변(A-2). 지역은 extract_meta.paper.source_region.
published_date=entry.published.date() if entry.published else None,
extract_meta=_build_paper_meta(source, entry),
extract_meta=_build_paper_meta(source, entry, doi),
)
session.add(doc)
await session.flush()
+76
View File
@@ -0,0 +1,76 @@
"""레거시 paper 행 DOI reconcile — B-3 PR4 (plan safety-library-b3-1).
paper.doi 없는 paper 행(레거시 194 arXiv/ASME 초록 + 수집기 누락분)을 arXiv DataCite DOI 로 스탬프해
partial-unique 인덱스에 편입 → 향후 수집기 재유입을 find_paper_holder 가 차단('동일-DOI 재유입 차단만').
- KEYLESS·결정적: arXiv id(paper.arxiv_id 또는 extracted_text 파싱) → arxiv_doi(10.48550/arxiv.{id},
OpenAlex canonical 실측 일치). OpenAlex 호출 불요. arXiv id 없는 ASME RSS 행은 skip.
- ★dedup_reconcile(file_hash 캐시 재계산·무IO)와 **별 worker** — 적대리뷰 B·C major: 외부키/네트워크
실패모드를 캐시 무결성 잡에 결합하지 않음(여긴 keyless 라 네트워크도 없음, 순수 in-DB 메타 갱신).
- 이미 같은 DOI holder 존재 = 선재 중복 → 스탬프 대신 parent_doi 마킹(unique 위반 회피).
- 콘텐츠(extracted_text) 무변경, 메타만 갱신 → 어떤 stage 도 enqueue 안 함(summarize/embed/chunk 0 자명).
"""
import asyncio
from sqlalchemy import select
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from services.papers.doi import arxiv_doi, parse_arxiv_id, with_paper_doi, with_parent_doi
from services.papers.holder import find_paper_holder
logger = setup_logger("paper_doi_reconcile")
_DOI_TEXT = Document.extract_meta[("paper", "doi")].astext
async def run(limit: int = 0) -> None:
"""paper.doi 없는 paper 행을 arXiv DOI 로 스탬프(멱등). limit=0 = 전건."""
stamped = marked_dup = skipped_no_arxiv = 0
async with async_session() as session:
q = (
select(Document)
.where(Document.material_type == "paper", _DOI_TEXT.is_(None))
.order_by(Document.id)
)
if limit:
q = q.limit(limit)
rows = (await session.execute(q)).scalars().all()
for row in rows:
meta = dict(row.extract_meta or {})
paper = dict(meta.get("paper") or {})
arxiv_id = paper.get("arxiv_id") or parse_arxiv_id(row.extracted_text)
doi = arxiv_doi(arxiv_id)
if not doi:
skipped_no_arxiv += 1
continue
paper["arxiv_id"] = arxiv_id
meta["paper"] = paper
holder = await find_paper_holder(session, doi)
if holder is not None and holder.id != row.id:
# 선재 중복(다른 행이 이미 이 DOI holder) → 자식 마킹(인덱스 밖, unique 위반 회피)
row.extract_meta = with_parent_doi(meta, doi)
marked_dup += 1
else:
# 스탬프 → 이 행이 holder, partial-unique 인덱스 진입 (재유입 차단 성립)
row.extract_meta = with_paper_doi(meta, doi)
stamped += 1
# 콘텐츠 무변경 → enqueue 없음 (summarize/embed/chunk 0)
await session.commit()
logger.info(
f"[paper_doi_reconcile] paper.doi 없는 {len(rows)}행 → 스탬프 {stamped} · "
f"선재중복 마킹 {marked_dup} · arXiv id 없음 skip {skipped_no_arxiv}"
)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="레거시 paper DOI reconcile (arXiv DataCite, keyless)")
parser.add_argument("--limit", type=int, default=0, help="처리 상한(0=전건)")
args = parser.parse_args()
asyncio.run(run(limit=args.limit))
+20
View File
@@ -9,8 +9,10 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "app"))
from services.papers.doi import ( # noqa: E402
arxiv_doi,
normalize_doi,
paper_doi_hash,
parse_arxiv_id,
read_paper_doi,
with_paper_doi,
with_parent_doi,
@@ -109,3 +111,21 @@ def test_read_paper_doi():
assert read_paper_doi(None) is None
assert read_paper_doi({"paper": {"parent_doi": "10.1/p"}}) is None # child 는 doi 없음
assert read_paper_doi({"paper": {}}) is None
# ─── PR4: arXiv id 파싱 + arXiv DataCite DOI (교차소스 dedup 통일 키) ───
def test_parse_arxiv_id():
assert parse_arxiv_id("Title arXiv:2606.10236v1 Announce Type: new Abstract") == "2606.10236"
assert parse_arxiv_id("see arXiv:2601.02852 for details") == "2601.02852"
assert parse_arxiv_id("arXiv:cond-mat/0703470v2") == "cond-mat/0703470"
assert parse_arxiv_id("no arxiv here") is None
assert parse_arxiv_id(None) is None
def test_arxiv_doi_canonical():
# OpenAlex canonical 실측 일치: 10.48550/arxiv.{id} (소문자)
assert arxiv_doi("2606.10236") == "10.48550/arxiv.2606.10236"
assert arxiv_doi(None) is None
# 수집기·reconcile 가 같은 함수 → 같은 paper.doi (교차소스 dedup 성립)
assert arxiv_doi(parse_arxiv_id("x arXiv:2606.10236v1 y")) == "10.48550/arxiv.2606.10236"