2ad32c5c84
arxiv/openalex 수집기가 run_cap 도달로 카테고리/시드 중도 절단돼도 워터마크를 newest 로 전진시켜, [oldest-ingested, 옛 watermark] 사이 미적재 항목이 다음 run 의 watermark 필터에 영구 배제되던 silent data loss 수정. capped 플래그: cap 으로 루프 절단 시 set → 워터마크 미전진. 미전진하면 다음 run 이 최신부터 재스캔하며 적재분은 dedup-skip(cap 미소모)하고 gap 까지 내려가 이어 적재 → 백로그 run 당 cap 소화(livelock 회피). 정상 완주(watermark 도달/cursor 소진) 시에만 전진. bulk(CLI)은 cap 무관. docstring 의 '다음 run 이월' 약속을 실제 동작과 일치. 검증: py_compile 통과. kosha 부분실패 per-case commit 은 R4 후속. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
379 lines
16 KiB
Python
379 lines
16 KiB
Python
"""arXiv 키워드 필터 수집기 — B-3 PR2 (plan safety-library-b3-1).
|
|
|
|
bespoke arXiv API(Atom) 수집기. 카테고리 RSS 통째(firehose)가 아니라
|
|
cat:{category} AND (abs:키워드 ...) 로 안전/신뢰성/압력용기 관련분만 좁혀 수집한다.
|
|
|
|
- signal-only: 초록만 색인(embed+chunk), summarize 절대 미enqueue — 맥미니 Qwen 큐 무접촉.
|
|
- DOI 보유 → paper.doi(서지 holder, partial-unique 인덱스 진입). 없으면 versionless arXiv id 로
|
|
dedup(향후 PR4 reconcile 가 DOI 백필).
|
|
- etiquette: 요청 간 ≥3s + HTTP 429 지수 백오프. 카테고리별 submittedDate 워터마크로 증분.
|
|
- per-run insert cap(_RUN_CAP) — 광역 수집이 GPU bge-m3 embed 큐를 범람시키지 않게(적대리뷰 A major).
|
|
잔여는 silent-cap 금지(csb idiom): 누락 건수 로깅.
|
|
- keyless. enabled=False news_sources 행(6h 뉴스 사이클 비대상) + main.py CronTrigger(자체 폴링).
|
|
- arXiv API 는 https 필수(http=301). UA = CRAWL_UA.
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from core.crawl_politeness import CRAWL_UA
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.document import Document
|
|
from models.news_source import NewsSource
|
|
from models.queue import enqueue_stage
|
|
from services.papers.doi import arxiv_doi, normalize_doi
|
|
from services.papers.holder import find_paper_holder
|
|
from workers.news_collector import (
|
|
FeedError,
|
|
_get_or_create_health,
|
|
_record_failure,
|
|
_record_success,
|
|
)
|
|
|
|
logger = setup_logger("arxiv_collector")
|
|
|
|
_ARXIV_API = "https://export.arxiv.org/api/query"
|
|
_SOURCE_NAME = "arXiv 안전·공학 (keyword)"
|
|
|
|
# 신규 카테고리만 — 기존 RSS 행(id 62 physics.app-ph, id 64 cond-mat.mtrl-sci)과 비중복.
|
|
_CATEGORIES = (
|
|
"eess.SY", # systems & control
|
|
"physics.flu-dyn", # 유체 — 압력/유동
|
|
"physics.comp-ph", # 전산물리
|
|
"math.OC", # 최적화·제어
|
|
"math.NA", # 수치해석 (FEM 등)
|
|
"stat.AP", # 응용통계 — 신뢰성
|
|
"cs.CE", # computational engineering
|
|
)
|
|
# 압력용기·공정안전·구조건전성 도메인 키워드(abs: OR 게이트). 좁게 유지 = 관련성↑·볼륨↓ (튜너블).
|
|
_KEYWORDS = (
|
|
"pressure vessel",
|
|
"process safety",
|
|
"structural integrity",
|
|
"fracture mechanics",
|
|
"fatigue life",
|
|
"corrosion",
|
|
)
|
|
|
|
_RUN_CAP = 80 # 1회 run 신규 적재 상한(임베드 큐 보호). bulk 시 해제.
|
|
_PAGE_SIZE = 50 # max_results per request
|
|
_MAX_PAGES_PER_CAT = 4 # 카테고리당 최대 페이지(증분이라 보통 1페이지에 워터마크 도달)
|
|
_REQ_SLEEP = 3.0 # arXiv etiquette ≥3s
|
|
_MAX_RETRY = 4
|
|
_BACKOFF_BASE = 5.0
|
|
|
|
_NS = {
|
|
"a": "http://www.w3.org/2005/Atom",
|
|
"arxiv": "http://arxiv.org/schemas/atom",
|
|
"opensearch": "http://a9.com/-/spec/opensearch/1.1/",
|
|
}
|
|
_ABS_ID_RE = re.compile(r"arxiv\.org/abs/(.+?)(v\d+)?$")
|
|
_WS_RE = re.compile(r"\s+")
|
|
|
|
|
|
# ───────────────────────── 순수 파서 (fixture 단위 테스트 대상) ─────────────────────────
|
|
|
|
@dataclass
|
|
class ArxivEntry:
|
|
arxiv_id: str # versionless, 예: "1209.2405"
|
|
version: str | None # "v1" 또는 None
|
|
title: str
|
|
summary: str # 초록
|
|
published: datetime | None
|
|
doi: str | None # normalize_doi 적용
|
|
journal_ref: str | None
|
|
primary_category: str | None
|
|
categories: list = field(default_factory=list)
|
|
abs_url: str | None = None
|
|
pdf_url: str | None = None
|
|
|
|
|
|
def _clean(text: str | None) -> str:
|
|
return _WS_RE.sub(" ", text).strip() if text else ""
|
|
|
|
|
|
def _parse_id(raw_id: str | None) -> tuple[str | None, str | None]:
|
|
"""'http://arxiv.org/abs/1209.2405v1' → ('1209.2405', 'v1'). versionless id 가 dedup 키."""
|
|
m = _ABS_ID_RE.search((raw_id or "").strip())
|
|
if not m:
|
|
return None, None
|
|
return m.group(1), m.group(2)
|
|
|
|
|
|
def _parse_dt(s: str | None) -> datetime | None:
|
|
if not s:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def build_search_query(category: str, keywords=_KEYWORDS) -> str:
|
|
"""cat:{category} AND (abs:kw1 OR abs:"kw with space" ...). 공백 키워드는 따옴표 구절."""
|
|
kw = " OR ".join(f'abs:"{k}"' if " " in k else f"abs:{k}" for k in keywords)
|
|
return f"cat:{category} AND ({kw})"
|
|
|
|
|
|
def parse_arxiv_feed(xml_text: str) -> tuple[int, list[ArxivEntry]]:
|
|
"""arXiv Atom 응답 → (total_results, [ArxivEntry]). 순수 함수."""
|
|
root = ET.fromstring(xml_text)
|
|
raw_total = root.findtext("opensearch:totalResults", default="0", namespaces=_NS)
|
|
try:
|
|
total = int(raw_total)
|
|
except (TypeError, ValueError):
|
|
total = 0
|
|
entries: list[ArxivEntry] = []
|
|
for e in root.findall("a:entry", _NS):
|
|
aid, ver = _parse_id(e.findtext("a:id", namespaces=_NS))
|
|
if not aid:
|
|
continue
|
|
prim = e.find("arxiv:primary_category", _NS)
|
|
abs_url = pdf_url = None
|
|
for ln in e.findall("a:link", _NS):
|
|
if ln.get("rel") == "alternate" and (ln.get("type") or "").startswith("text/html"):
|
|
abs_url = ln.get("href")
|
|
elif ln.get("title") == "pdf":
|
|
pdf_url = ln.get("href")
|
|
entries.append(ArxivEntry(
|
|
arxiv_id=aid,
|
|
version=ver,
|
|
title=_clean(e.findtext("a:title", namespaces=_NS)),
|
|
summary=_clean(e.findtext("a:summary", namespaces=_NS)),
|
|
published=_parse_dt(e.findtext("a:published", namespaces=_NS)),
|
|
doi=normalize_doi(e.findtext("arxiv:doi", namespaces=_NS)),
|
|
journal_ref=_clean(e.findtext("arxiv:journal_ref", namespaces=_NS)) or None,
|
|
primary_category=prim.get("term") if prim is not None else None,
|
|
categories=[c.get("term") for c in e.findall("a:category", _NS)],
|
|
abs_url=abs_url,
|
|
pdf_url=pdf_url,
|
|
))
|
|
return total, entries
|
|
|
|
|
|
# ───────────────────────── 적재 (DB — PR2 라이브 검증) ─────────────────────────
|
|
|
|
def _build_paper_meta(source: NewsSource, entry: ArxivEntry, doi: str | None) -> dict:
|
|
"""extract_meta — license + source + paper 식별. 서지 holder 는 paper.doi(있으면) 보유."""
|
|
paper: dict = {"arxiv_id": entry.arxiv_id}
|
|
if doi:
|
|
paper["doi"] = doi # partial-unique 인덱스 진입 (교차소스 dedup)
|
|
if entry.journal_ref:
|
|
paper["journal_ref"] = entry.journal_ref
|
|
if entry.primary_category:
|
|
paper["primary_category"] = entry.primary_category
|
|
meta: dict = {
|
|
"source_id": source.id,
|
|
"source_name": source.name,
|
|
"source_region": "INT", # arXiv = 국제 preprint. paper.jurisdiction 은 NULL 유지(A-2).
|
|
"paper": paper,
|
|
# arXiv 기본 라이선스 = 비배포(보수적). restricted 부재 → 초록은 RAG 사용 가능.
|
|
# (명시 CC 검출은 OAI 인터페이스 필요 — Atom API 미포함, PR 후속/관찰.)
|
|
"license": {"scheme": "arxiv", "redistribute": False, "attribution": "arXiv"},
|
|
}
|
|
if entry.published:
|
|
meta["published_at"] = entry.published.isoformat()
|
|
return meta
|
|
|
|
|
|
async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool:
|
|
"""1건 적재. 반환 = 신규 여부. signal-only(embed+chunk, summarize 없음)."""
|
|
arxiv_hash = hashlib.sha256(f"arxiv|{entry.arxiv_id}".encode()).hexdigest()[:32]
|
|
# 재수집 dedup(arXiv id) — .first()(다중행 방어)
|
|
dup = await session.execute(
|
|
select(Document.id).where(Document.file_hash == arxiv_hash).limit(1)
|
|
)
|
|
if dup.scalars().first():
|
|
return False
|
|
# arXiv canonical DOI = 저널 DOI 또는 arXiv DataCite DOI(프리프린트도 paper.doi 보유 → PR3 와 dedup)
|
|
doi = entry.doi or arxiv_doi(entry.arxiv_id)
|
|
# 교차소스 dedup(DOI holder 이미 존재 — partial-unique 인덱스 백스톱 선제 회피)
|
|
if doi and await find_paper_holder(session, doi):
|
|
return False
|
|
|
|
body = entry.summary or entry.title
|
|
doc = Document(
|
|
file_path=f"crawl/arxiv/{entry.arxiv_id}",
|
|
file_hash=arxiv_hash,
|
|
file_format="article",
|
|
file_size=len(body.encode()),
|
|
file_type="note",
|
|
title=entry.title,
|
|
extracted_text=f"{entry.title}\n\n{body}",
|
|
extracted_at=datetime.now(timezone.utc),
|
|
extractor_version="arxiv-api-signal",
|
|
md_status="skipped",
|
|
md_extraction_error="arXiv abstract: signal-only, markdown 비대상",
|
|
source_channel="crawl",
|
|
data_origin="external",
|
|
edit_url=entry.abs_url,
|
|
review_status="approved",
|
|
material_type="paper",
|
|
jurisdiction=None, # paper = NULL 불변(A-2). 지역은 extract_meta.paper.source_region.
|
|
published_date=entry.published.date() if entry.published else None,
|
|
extract_meta=_build_paper_meta(source, entry, doi),
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
# signal-only: 검색 색인만. summarize/fulltext 절대 enqueue 안 함(맥미니 큐 무접촉).
|
|
await enqueue_stage(session, doc.id, "embed")
|
|
await enqueue_stage(session, doc.id, "chunk")
|
|
return True
|
|
|
|
|
|
async def _get_or_create_source(session) -> NewsSource:
|
|
result = await session.execute(
|
|
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
|
|
)
|
|
source = result.scalars().first()
|
|
if source is None:
|
|
source = NewsSource(
|
|
name=_SOURCE_NAME, feed_url=_ARXIV_API, feed_type="atom",
|
|
fetch_method="signal-only", fulltext_policy="none",
|
|
source_channel="crawl", category="Engineering", language="en",
|
|
country=None, # paper → jurisdiction NULL (country 미전파)
|
|
material_type="paper",
|
|
license_scheme="arxiv", license_redistribute=False,
|
|
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 자체 폴링
|
|
)
|
|
session.add(source)
|
|
await session.flush()
|
|
return source
|
|
|
|
|
|
def _watermark(source: NewsSource, category: str) -> datetime | None:
|
|
raw = (source.selector_override or {}).get("arxiv_watermark", {}).get(category)
|
|
if not raw:
|
|
return None
|
|
return _parse_dt(raw)
|
|
|
|
|
|
def _set_watermark(source: NewsSource, category: str, value: datetime) -> None:
|
|
cfg = dict(source.selector_override or {})
|
|
wm = dict(cfg.get("arxiv_watermark") or {})
|
|
wm[category] = value.isoformat()
|
|
cfg["arxiv_watermark"] = wm
|
|
source.selector_override = cfg # JSONB 변경 감지 위해 재할당
|
|
|
|
|
|
async def _fetch(client: httpx.AsyncClient, query: str, start: int) -> str:
|
|
params = {
|
|
"search_query": query, "start": start, "max_results": _PAGE_SIZE,
|
|
"sortBy": "submittedDate", "sortOrder": "descending",
|
|
}
|
|
for attempt in range(_MAX_RETRY):
|
|
resp = await client.get(_ARXIV_API, params=params)
|
|
if resp.status_code == 429:
|
|
await asyncio.sleep(_BACKOFF_BASE * (2 ** attempt))
|
|
continue
|
|
resp.raise_for_status()
|
|
return resp.text
|
|
raise FeedError(f"arXiv 429 재시도 초과: {query[:48]}")
|
|
|
|
|
|
async def run(bulk: bool = False, limit: int = 0) -> None:
|
|
"""daily 진입점(스케줄러). bulk/limit 은 CLI 전용(bulk=cap 해제·깊은 페이징)."""
|
|
now = datetime.now(timezone.utc)
|
|
async with async_session() as session:
|
|
source = await _get_or_create_source(session)
|
|
await session.commit()
|
|
source_id = source.id
|
|
|
|
run_cap = (limit or 10**9) if bulk else (min(limit, _RUN_CAP) if limit else _RUN_CAP)
|
|
inserted = 0
|
|
seen = 0
|
|
failures: list[str] = []
|
|
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, headers={"User-Agent": CRAWL_UA}, follow_redirects=True
|
|
) as client:
|
|
for category in _CATEGORIES:
|
|
if inserted >= run_cap:
|
|
break
|
|
query = build_search_query(category)
|
|
async with async_session() as session:
|
|
src = await session.get(NewsSource, source_id)
|
|
watermark = _watermark(src, category)
|
|
newest_seen: datetime | None = None
|
|
capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4)
|
|
max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT)
|
|
try:
|
|
for page in range(max_pages):
|
|
if inserted >= run_cap:
|
|
capped = True
|
|
break
|
|
xml_text = await _fetch(client, query, page * _PAGE_SIZE)
|
|
total, entries = parse_arxiv_feed(xml_text)
|
|
if not entries:
|
|
break
|
|
stop = False
|
|
for entry in entries:
|
|
seen += 1
|
|
if entry.published:
|
|
newest_seen = max(newest_seen or entry.published, entry.published)
|
|
# 증분: 워터마크 이하 도달 시 이 카테고리 종료(이미 본 구간)
|
|
if watermark and not bulk and entry.published <= watermark:
|
|
stop = True
|
|
break
|
|
async with async_session() as session:
|
|
src = await session.get(NewsSource, source_id)
|
|
if await _ingest_entry(session, src, entry):
|
|
inserted += 1
|
|
await session.commit()
|
|
else:
|
|
await session.rollback()
|
|
if inserted >= run_cap:
|
|
capped = True
|
|
break
|
|
await asyncio.sleep(_REQ_SLEEP)
|
|
if stop or (page + 1) * _PAGE_SIZE >= total:
|
|
break
|
|
# 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4).
|
|
# 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이
|
|
# 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에
|
|
# 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며
|
|
# 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가
|
|
# 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관.
|
|
if newest_seen and not capped:
|
|
async with async_session() as session:
|
|
src = await session.get(NewsSource, source_id)
|
|
_set_watermark(src, category, newest_seen)
|
|
await session.commit()
|
|
except (httpx.HTTPError, FeedError, ET.ParseError) as e:
|
|
msg = f"[{category}] {e or repr(e)}"
|
|
logger.error(f"[arxiv] {msg}")
|
|
failures.append(msg)
|
|
|
|
async with async_session() as session:
|
|
health = await _get_or_create_health(session, source_id)
|
|
if failures and inserted == 0:
|
|
_record_failure(health, "; ".join(failures)[:500], now)
|
|
else:
|
|
_record_success(health, inserted, False, now)
|
|
await session.commit()
|
|
|
|
deferred = "" if inserted < run_cap else f" (cap {run_cap} 도달 — 잔여는 다음 run 이월)"
|
|
logger.info(
|
|
f"[arxiv] {len(_CATEGORIES)}개 카테고리 스캔 {seen}건 → 신규 {inserted}건{deferred}"
|
|
+ (f" / 실패 {len(failures)}건" if failures else "")
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# CLI = 수동/백필 전용. --bulk = cap 해제·깊은 페이징, --limit N = 상한 N(라이브 검증용).
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="arXiv 안전·공학 키워드 수집기")
|
|
parser.add_argument("--bulk", action="store_true", help="cap 해제 + 깊은 페이징 백필")
|
|
parser.add_argument("--limit", type=int, default=0, help="신규 적재 상한(0=기본 cap)")
|
|
args = parser.parse_args()
|
|
asyncio.run(run(bulk=args.bulk, limit=args.limit))
|