From 244d526ae2f54cc395869aaf3165895bfc652b6a Mon Sep 17 00:00:00 2001
From: Claude Code
Date: Sat, 13 Jun 2026 22:54:24 +0000
Subject: [PATCH] =?UTF-8?q?feat(papers):=20B-3=20PR4=20=E2=80=94=20?=
 =?UTF-8?q?=EB=A0=88=EA=B1=B0=EC=8B=9C=20arXiv=20DOI=20reconcile=20+=20arX?=
 =?UTF-8?q?iv=20DataCite=20DOI=20=ED=86=B5=EC=9D=BC=20(keyless)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

plan safety-library-b3-1 PR4. paper.doi 없는 paper 행을 arXiv DataCite DOI 로 스탬프해 partial-unique 인덱스 편입 → 재유입 차단('동일-DOI 재유입 차단만').

- doi.py: parse_arxiv_id(본문→arXiv id) + arxiv_doi(10.48550/arxiv.{id}, OpenAlex canonical 실측 일치).
- ★arXiv DOI 통일: arxiv_collector 도 프리프린트(저널 DOI 없음)에 arxiv_doi 부여 → PR2/PR3/PR4 가 같은 함수로 같은 paper.doi → 교차소스 dedup 성립(이전엔 프리프린트 paper.doi 부재로 PR2↔PR3 dup 갭).
- paper_doi_reconcile.py: 전용 worker(dedup_reconcile=file_hash 캐시와 별개 — 적대리뷰 B·C major). keyless·결정적(OpenAlex 호출 0)·in-DB·enqueue 0(콘텐츠 무변경). 선재 DOI holder 시 parent_doi 마킹(unique 위반 회피). add_job daily 03:50 KST. __main__ CLI.

단위 28 passed(+parse_arxiv_id·arxiv_doi).
라이브 PASS (prod, running fastapi 무접촉): 레거시 197행 arXiv DataCite 스탬프·ASME 2행 skip·선재중복 0 / dedup 불변식 206 distinct 206(인덱스 무위반) / paper summarize active 0(signal-only). 멱등.

Co-Authored-By: Claude Opus 4.8 (1M context) --- app/main.py | 4 ++ app/services/papers/doi.py | 24 ++++++++++ app/workers/arxiv_collector.py | 14 +++--- app/workers/paper_doi_reconcile.py | 76 ++++++++++++++++++++++++++++++ tests/test_paper_doi_units.py | 20 ++++++++ 5 files changed, 132 insertions(+), 6 deletions(-) create mode 100644 app/workers/paper_doi_reconcile.py diff --git a/app/main.py b/app/main.py index ca85f58..edfa6f7 100644 --- a/app/main.py +++ b/app/main.py @@ -58,6 +58,7 @@ async def lifespan(app: FastAPI): from workers.news_collector import run as news_collector_run from workers.arxiv_collector import run as arxiv_collector_run from workers.openalex_collector import run as openalex_collector_run + from workers.paper_doi_reconcile import run as paper_doi_reconcile_run from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run from workers.kosha_collector import run as kosha_collector_run from workers.csb_collector import run as csb_collector_run @@ -141,6 +142,9 @@ async def lifespan(app: FastAPI): # plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산. # soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌). scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile") + # B-3 PR4: 레거시 paper 행 arXiv DataCite DOI 스탬프(재유입 차단). keyless·in-DB·enqueue 0. + # dedup_reconcile(03:30)·fulltext_reconcile(03:40) 와 별 worker·비충돌 슬롯. + scheduler.add_job(paper_doi_reconcile_run, CronTrigger(hour=3, minute=50, timezone=KST), id="paper_doi_reconcile") # crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯). scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector") # 사이클 3 C-2 잔여: CSB sitemap lastmod diff (weekly 월, cap 40 + 워터마크 점진 백필). 
diff --git a/app/services/papers/doi.py b/app/services/papers/doi.py index 99ed8be..d976165 100644 --- a/app/services/papers/doi.py +++ b/app/services/papers/doi.py @@ -14,6 +14,7 @@ plan safety-library-b3-1 PR1 (keyless·마이그 0). """ import hashlib +import re # 소문자화 후 비교하므로 전부 소문자 prefix. 긴 것부터(dx.doi.org 가 doi.org 보다 먼저). _DOI_PREFIXES = ( @@ -50,6 +51,29 @@ def normalize_doi(raw: str | None) -> str | None: return s +# arXiv id: 신형 'YYMM.NNNNN'(+vN) 또는 구형 'archive(.SUBJ)/NNNNNNN'. 'arXiv:' 접두 흡수. +_ARXIV_ID_RE = re.compile( + r"arxiv:\s*([a-z\-]+(?:\.[a-z]{2})?/\d{7}|\d{4}\.\d{4,5})(v\d+)?", re.IGNORECASE +) + + +def parse_arxiv_id(text: str | None) -> str | None: + """본문/제목에서 arXiv id(versionless) 추출. 없으면 None. 레거시 reconcile 의 입력.""" + if not text: + return None + m = _ARXIV_ID_RE.search(text) + return m.group(1) if m else None + + +def arxiv_doi(arxiv_id: str | None) -> str | None: + """arXiv DataCite DOI = 10.48550/arxiv.{id} (정규화). 저널 DOI 없는 프리프린트의 canonical + paper.doi 통일 키 — OpenAlex 가 프리프린트에 동일 DOI 부여(실측 확인). 모든 수집기·reconcile 가 + 같은 함수로 같은 DOI 를 써야 교차소스 dedup 이 성립.""" + if not arxiv_id: + return None + return normalize_doi(f"10.48550/arXiv.{arxiv_id}") + + def paper_doi_hash(normalized_doi: str) -> str: """서지 holder 의 Document.file_hash — sha256('paper|{doi}')[:32]. 
diff --git a/app/workers/arxiv_collector.py b/app/workers/arxiv_collector.py index 562f69c..386733f 100644 --- a/app/workers/arxiv_collector.py +++ b/app/workers/arxiv_collector.py @@ -29,7 +29,7 @@ from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource from models.queue import enqueue_stage -from services.papers.doi import normalize_doi +from services.papers.doi import arxiv_doi, normalize_doi from services.papers.holder import find_paper_holder from workers.news_collector import ( FeedError, @@ -161,11 +161,11 @@ def parse_arxiv_feed(xml_text: str) -> tuple[int, list[ArxivEntry]]: # ───────────────────────── 적재 (DB — PR2 라이브 검증) ───────────────────────── -def _build_paper_meta(source: NewsSource, entry: ArxivEntry) -> dict: +def _build_paper_meta(source: NewsSource, entry: ArxivEntry, doi: str | None) -> dict: """extract_meta — license + source + paper 식별. 서지 holder 는 paper.doi(있으면) 보유.""" paper: dict = {"arxiv_id": entry.arxiv_id} - if entry.doi: - paper["doi"] = entry.doi # partial-unique 인덱스 진입 (교차소스 dedup) + if doi: + paper["doi"] = doi # partial-unique 인덱스 진입 (교차소스 dedup) if entry.journal_ref: paper["journal_ref"] = entry.journal_ref if entry.primary_category: @@ -193,8 +193,10 @@ async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool: ) if dup.scalars().first(): return False + # arXiv canonical DOI = 저널 DOI 또는 arXiv DataCite DOI(프리프린트도 paper.doi 보유 → PR3 와 dedup) + doi = entry.doi or arxiv_doi(entry.arxiv_id) # 교차소스 dedup(DOI holder 이미 존재 — partial-unique 인덱스 백스톱 선제 회피) - if entry.doi and await find_paper_holder(session, entry.doi): + if doi and await find_paper_holder(session, doi): return False body = entry.summary or entry.title @@ -217,7 +219,7 @@ async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool: material_type="paper", jurisdiction=None, # paper = NULL 불변(A-2). 지역은 extract_meta.paper.source_region. 
published_date=entry.published.date() if entry.published else None, - extract_meta=_build_paper_meta(source, entry), + extract_meta=_build_paper_meta(source, entry, doi), ) session.add(doc) await session.flush() diff --git a/app/workers/paper_doi_reconcile.py b/app/workers/paper_doi_reconcile.py new file mode 100644 index 0000000..c0e20e9 --- /dev/null +++ b/app/workers/paper_doi_reconcile.py @@ -0,0 +1,76 @@ +"""레거시 paper 행 DOI reconcile — B-3 PR4 (plan safety-library-b3-1). + +paper.doi 없는 paper 행(레거시 194 arXiv/ASME 초록 + 수집기 누락분)을 arXiv DataCite DOI 로 스탬프해 +partial-unique 인덱스에 편입 → 향후 수집기 재유입을 find_paper_holder 가 차단('동일-DOI 재유입 차단만'). + +- KEYLESS·결정적: arXiv id(paper.arxiv_id 또는 extracted_text 파싱) → arxiv_doi(10.48550/arxiv.{id}, + OpenAlex canonical 실측 일치). OpenAlex 호출 불요. arXiv id 없는 ASME RSS 행은 skip. +- ★dedup_reconcile(file_hash 캐시 재계산·무IO)와 **별 worker** — 적대리뷰 B·C major: 외부키/네트워크 + 실패모드를 캐시 무결성 잡에 결합하지 않음(여긴 keyless 라 네트워크도 없음, 순수 in-DB 메타 갱신). +- 이미 같은 DOI holder 존재 = 선재 중복 → 스탬프 대신 parent_doi 마킹(unique 위반 회피). +- 콘텐츠(extracted_text) 무변경, 메타만 갱신 → 어떤 stage 도 enqueue 안 함(summarize/embed/chunk 0 자명). +""" + +import asyncio + +from sqlalchemy import select + +from core.database import async_session +from core.utils import setup_logger +from models.document import Document +from services.papers.doi import arxiv_doi, parse_arxiv_id, with_paper_doi, with_parent_doi +from services.papers.holder import find_paper_holder + +logger = setup_logger("paper_doi_reconcile") + +_DOI_TEXT = Document.extract_meta[("paper", "doi")].astext + + +async def run(limit: int = 0) -> None: + """paper.doi 없는 paper 행을 arXiv DOI 로 스탬프(멱등). 
limit=0 = 전건.""" + stamped = marked_dup = skipped_no_arxiv = 0 + async with async_session() as session: + q = ( + select(Document) + .where(Document.material_type == "paper", _DOI_TEXT.is_(None)) + .order_by(Document.id) + ) + if limit: + q = q.limit(limit) + rows = (await session.execute(q)).scalars().all() + + for row in rows: + meta = dict(row.extract_meta or {}) + paper = dict(meta.get("paper") or {}) + arxiv_id = paper.get("arxiv_id") or parse_arxiv_id(row.extracted_text) + doi = arxiv_doi(arxiv_id) + if not doi: + skipped_no_arxiv += 1 + continue + paper["arxiv_id"] = arxiv_id + meta["paper"] = paper + holder = await find_paper_holder(session, doi) + if holder is not None and holder.id != row.id: + # 선재 중복(다른 행이 이미 이 DOI holder) → 자식 마킹(인덱스 밖, unique 위반 회피) + row.extract_meta = with_parent_doi(meta, doi) + marked_dup += 1 + else: + # 스탬프 → 이 행이 holder, partial-unique 인덱스 진입 (재유입 차단 성립) + row.extract_meta = with_paper_doi(meta, doi) + stamped += 1 + # 콘텐츠 무변경 → enqueue 없음 (summarize/embed/chunk 0) + await session.commit() + + logger.info( + f"[paper_doi_reconcile] paper.doi 없는 {len(rows)}행 → 스탬프 {stamped} · " + f"선재중복 마킹 {marked_dup} · arXiv id 없음 skip {skipped_no_arxiv}" + ) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="레거시 paper DOI reconcile (arXiv DataCite, keyless)") + parser.add_argument("--limit", type=int, default=0, help="처리 상한(0=전건)") + args = parser.parse_args() + asyncio.run(run(limit=args.limit)) diff --git a/tests/test_paper_doi_units.py b/tests/test_paper_doi_units.py index ca3b27c..e0a5be3 100644 --- a/tests/test_paper_doi_units.py +++ b/tests/test_paper_doi_units.py @@ -9,8 +9,10 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent / "app")) from services.papers.doi import ( # noqa: E402 + arxiv_doi, normalize_doi, paper_doi_hash, + parse_arxiv_id, read_paper_doi, with_paper_doi, with_parent_doi, @@ -109,3 +111,21 @@ def test_read_paper_doi(): assert 
read_paper_doi(None) is None assert read_paper_doi({"paper": {"parent_doi": "10.1/p"}}) is None # child 는 doi 없음 assert read_paper_doi({"paper": {}}) is None + + +# ─── PR4: arXiv id 파싱 + arXiv DataCite DOI (교차소스 dedup 통일 키) ─── + +def test_parse_arxiv_id(): + assert parse_arxiv_id("Title arXiv:2606.10236v1 Announce Type: new Abstract") == "2606.10236" + assert parse_arxiv_id("see arXiv:2601.02852 for details") == "2601.02852" + assert parse_arxiv_id("arXiv:cond-mat/0703470v2") == "cond-mat/0703470" + assert parse_arxiv_id("no arxiv here") is None + assert parse_arxiv_id(None) is None + + +def test_arxiv_doi_canonical(): + # OpenAlex canonical 실측 일치: 10.48550/arxiv.{id} (소문자) + assert arxiv_doi("2606.10236") == "10.48550/arxiv.2606.10236" + assert arxiv_doi(None) is None + # 수집기·reconcile 가 같은 함수 → 같은 paper.doi (교차소스 dedup 성립) + assert arxiv_doi(parse_arxiv_id("x arXiv:2606.10236v1 y")) == "10.48550/arxiv.2606.10236"