feat(papers): B-3 PR2 — arXiv 키워드 필터 수집기 (signal-only, per-run cap)
plan safety-library-b3-1 PR2 (keyless). DOI 코어(PR1) 위 첫 실수집기.
- bespoke arXiv API(Atom) 수집기: cat:{category} AND (abs:키워드) — RSS 통째(firehose) 아님.
신규 7 카테고리(eess.SY·physics.flu-dyn/comp-ph·math.OC/NA·stat.AP·cs.CE) x 압력용기/공정안전 키워드.
- signal-only: 초록만 색인(embed+chunk), summarize 절대 미enqueue(맥미니 큐 무접촉).
- DOI 보유 -> extract_meta.paper.doi(holder, partial-unique 인덱스). 없으면 arXiv id dedup.
교차소스 dedup = find_paper_holder(PR1) + arxiv id file_hash. paper.source_region=INT(jurisdiction NULL 유지).
- per-run insert cap(_RUN_CAP=80) — 광역 수집이 GPU embed 큐 범람 방지(적대리뷰 A major), 잔여 로깅.
- etiquette: >=3s + 429 백오프 + 카테고리별 submittedDate 워터마크 증분. https 필수(http=301).
- enabled=False news_sources 행 + main.py CronTrigger(daily 07:30 KST). __main__ CLI(--bulk/--limit).
순수 파서·쿼리빌더 fixture 단위 18 passed(arxiv 실응답 박제: DOI/journal_ref/둘다없음 3경로).
적재(run/_ingest_entry)는 news_collector signal-only 패턴 미러 — 배포 후 라이브 검증.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,368 @@
|
||||
"""arXiv 키워드 필터 수집기 — B-3 PR2 (plan safety-library-b3-1).
|
||||
|
||||
bespoke arXiv API(Atom) 수집기. 카테고리 RSS 통째(firehose)가 아니라
|
||||
cat:{category} AND (abs:키워드 ...) 로 안전/신뢰성/압력용기 관련분만 좁혀 수집한다.
|
||||
|
||||
- signal-only: 초록만 색인(embed+chunk), summarize 절대 미enqueue — 맥미니 Qwen 큐 무접촉.
|
||||
- DOI 보유 → paper.doi(서지 holder, partial-unique 인덱스 진입). 없으면 versionless arXiv id 로
|
||||
dedup(향후 PR4 reconcile 가 DOI 백필).
|
||||
- etiquette: 요청 간 ≥3s + HTTP 429 지수 백오프. 카테고리별 submittedDate 워터마크로 증분.
|
||||
- per-run insert cap(_RUN_CAP) — 광역 수집이 GPU bge-m3 embed 큐를 범람시키지 않게(적대리뷰 A major).
|
||||
잔여는 silent-cap 금지(csb idiom): 누락 건수 로깅.
|
||||
- keyless. enabled=False news_sources 행(6h 뉴스 사이클 비대상) + main.py CronTrigger(자체 폴링).
|
||||
- arXiv API 는 https 필수(http=301). UA = CRAWL_UA.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import re
|
||||
import xml.etree.ElementTree as ET
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.crawl_politeness import CRAWL_UA
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import enqueue_stage
|
||||
from services.papers.doi import normalize_doi
|
||||
from services.papers.holder import find_paper_holder
|
||||
from workers.news_collector import (
|
||||
FeedError,
|
||||
_get_or_create_health,
|
||||
_record_failure,
|
||||
_record_success,
|
||||
)
|
||||
|
||||
logger = setup_logger("arxiv_collector")
|
||||
|
||||
_ARXIV_API = "https://export.arxiv.org/api/query"
|
||||
_SOURCE_NAME = "arXiv 안전·공학 (keyword)"
|
||||
|
||||
# 신규 카테고리만 — 기존 RSS 행(id 62 physics.app-ph, id 64 cond-mat.mtrl-sci)과 비중복.
|
||||
_CATEGORIES = (
|
||||
"eess.SY", # systems & control
|
||||
"physics.flu-dyn", # 유체 — 압력/유동
|
||||
"physics.comp-ph", # 전산물리
|
||||
"math.OC", # 최적화·제어
|
||||
"math.NA", # 수치해석 (FEM 등)
|
||||
"stat.AP", # 응용통계 — 신뢰성
|
||||
"cs.CE", # computational engineering
|
||||
)
|
||||
# 압력용기·공정안전·구조건전성 도메인 키워드(abs: OR 게이트). 좁게 유지 = 관련성↑·볼륨↓ (튜너블).
|
||||
_KEYWORDS = (
|
||||
"pressure vessel",
|
||||
"process safety",
|
||||
"structural integrity",
|
||||
"fracture mechanics",
|
||||
"fatigue life",
|
||||
"corrosion",
|
||||
)
|
||||
|
||||
_RUN_CAP = 80 # 1회 run 신규 적재 상한(임베드 큐 보호). bulk 시 해제.
|
||||
_PAGE_SIZE = 50 # max_results per request
|
||||
_MAX_PAGES_PER_CAT = 4 # 카테고리당 최대 페이지(증분이라 보통 1페이지에 워터마크 도달)
|
||||
_REQ_SLEEP = 3.0 # arXiv etiquette ≥3s
|
||||
_MAX_RETRY = 4
|
||||
_BACKOFF_BASE = 5.0
|
||||
|
||||
_NS = {
|
||||
"a": "http://www.w3.org/2005/Atom",
|
||||
"arxiv": "http://arxiv.org/schemas/atom",
|
||||
"opensearch": "http://a9.com/-/spec/opensearch/1.1/",
|
||||
}
|
||||
_ABS_ID_RE = re.compile(r"arxiv\.org/abs/(.+?)(v\d+)?$")
|
||||
_WS_RE = re.compile(r"\s+")
|
||||
|
||||
|
||||
# ───────────────────────── 순수 파서 (fixture 단위 테스트 대상) ─────────────────────────
|
||||
|
||||
@dataclass
|
||||
class ArxivEntry:
|
||||
arxiv_id: str # versionless, 예: "1209.2405"
|
||||
version: str | None # "v1" 또는 None
|
||||
title: str
|
||||
summary: str # 초록
|
||||
published: datetime | None
|
||||
doi: str | None # normalize_doi 적용
|
||||
journal_ref: str | None
|
||||
primary_category: str | None
|
||||
categories: list = field(default_factory=list)
|
||||
abs_url: str | None = None
|
||||
pdf_url: str | None = None
|
||||
|
||||
|
||||
def _clean(text: str | None) -> str:
|
||||
return _WS_RE.sub(" ", text).strip() if text else ""
|
||||
|
||||
|
||||
def _parse_id(raw_id: str | None) -> tuple[str | None, str | None]:
|
||||
"""'http://arxiv.org/abs/1209.2405v1' → ('1209.2405', 'v1'). versionless id 가 dedup 키."""
|
||||
m = _ABS_ID_RE.search((raw_id or "").strip())
|
||||
if not m:
|
||||
return None, None
|
||||
return m.group(1), m.group(2)
|
||||
|
||||
|
||||
def _parse_dt(s: str | None) -> datetime | None:
|
||||
if not s:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def build_search_query(category: str, keywords=_KEYWORDS) -> str:
|
||||
"""cat:{category} AND (abs:kw1 OR abs:"kw with space" ...). 공백 키워드는 따옴표 구절."""
|
||||
kw = " OR ".join(f'abs:"{k}"' if " " in k else f"abs:{k}" for k in keywords)
|
||||
return f"cat:{category} AND ({kw})"
|
||||
|
||||
|
||||
def parse_arxiv_feed(xml_text: str) -> tuple[int, list[ArxivEntry]]:
|
||||
"""arXiv Atom 응답 → (total_results, [ArxivEntry]). 순수 함수."""
|
||||
root = ET.fromstring(xml_text)
|
||||
raw_total = root.findtext("opensearch:totalResults", default="0", namespaces=_NS)
|
||||
try:
|
||||
total = int(raw_total)
|
||||
except (TypeError, ValueError):
|
||||
total = 0
|
||||
entries: list[ArxivEntry] = []
|
||||
for e in root.findall("a:entry", _NS):
|
||||
aid, ver = _parse_id(e.findtext("a:id", namespaces=_NS))
|
||||
if not aid:
|
||||
continue
|
||||
prim = e.find("arxiv:primary_category", _NS)
|
||||
abs_url = pdf_url = None
|
||||
for ln in e.findall("a:link", _NS):
|
||||
if ln.get("rel") == "alternate" and (ln.get("type") or "").startswith("text/html"):
|
||||
abs_url = ln.get("href")
|
||||
elif ln.get("title") == "pdf":
|
||||
pdf_url = ln.get("href")
|
||||
entries.append(ArxivEntry(
|
||||
arxiv_id=aid,
|
||||
version=ver,
|
||||
title=_clean(e.findtext("a:title", namespaces=_NS)),
|
||||
summary=_clean(e.findtext("a:summary", namespaces=_NS)),
|
||||
published=_parse_dt(e.findtext("a:published", namespaces=_NS)),
|
||||
doi=normalize_doi(e.findtext("arxiv:doi", namespaces=_NS)),
|
||||
journal_ref=_clean(e.findtext("arxiv:journal_ref", namespaces=_NS)) or None,
|
||||
primary_category=prim.get("term") if prim is not None else None,
|
||||
categories=[c.get("term") for c in e.findall("a:category", _NS)],
|
||||
abs_url=abs_url,
|
||||
pdf_url=pdf_url,
|
||||
))
|
||||
return total, entries
|
||||
|
||||
|
||||
# ───────────────────────── 적재 (DB — PR2 라이브 검증) ─────────────────────────
|
||||
|
||||
def _build_paper_meta(source: NewsSource, entry: ArxivEntry) -> dict:
|
||||
"""extract_meta — license + source + paper 식별. 서지 holder 는 paper.doi(있으면) 보유."""
|
||||
paper: dict = {"arxiv_id": entry.arxiv_id}
|
||||
if entry.doi:
|
||||
paper["doi"] = entry.doi # partial-unique 인덱스 진입 (교차소스 dedup)
|
||||
if entry.journal_ref:
|
||||
paper["journal_ref"] = entry.journal_ref
|
||||
if entry.primary_category:
|
||||
paper["primary_category"] = entry.primary_category
|
||||
meta: dict = {
|
||||
"source_id": source.id,
|
||||
"source_name": source.name,
|
||||
"source_region": "INT", # arXiv = 국제 preprint. paper.jurisdiction 은 NULL 유지(A-2).
|
||||
"paper": paper,
|
||||
# arXiv 기본 라이선스 = 비배포(보수적). restricted 부재 → 초록은 RAG 사용 가능.
|
||||
# (명시 CC 검출은 OAI 인터페이스 필요 — Atom API 미포함, PR 후속/관찰.)
|
||||
"license": {"scheme": "arxiv", "redistribute": False, "attribution": "arXiv"},
|
||||
}
|
||||
if entry.published:
|
||||
meta["published_at"] = entry.published.isoformat()
|
||||
return meta
|
||||
|
||||
|
||||
async def _ingest_entry(session, source: NewsSource, entry: ArxivEntry) -> bool:
|
||||
"""1건 적재. 반환 = 신규 여부. signal-only(embed+chunk, summarize 없음)."""
|
||||
arxiv_hash = hashlib.sha256(f"arxiv|{entry.arxiv_id}".encode()).hexdigest()[:32]
|
||||
# 재수집 dedup(arXiv id) — .first()(다중행 방어)
|
||||
dup = await session.execute(
|
||||
select(Document.id).where(Document.file_hash == arxiv_hash).limit(1)
|
||||
)
|
||||
if dup.scalars().first():
|
||||
return False
|
||||
# 교차소스 dedup(DOI holder 이미 존재 — partial-unique 인덱스 백스톱 선제 회피)
|
||||
if entry.doi and await find_paper_holder(session, entry.doi):
|
||||
return False
|
||||
|
||||
body = entry.summary or entry.title
|
||||
doc = Document(
|
||||
file_path=f"crawl/arxiv/{entry.arxiv_id}",
|
||||
file_hash=arxiv_hash,
|
||||
file_format="article",
|
||||
file_size=len(body.encode()),
|
||||
file_type="note",
|
||||
title=entry.title,
|
||||
extracted_text=f"{entry.title}\n\n{body}",
|
||||
extracted_at=datetime.now(timezone.utc),
|
||||
extractor_version="arxiv-api-signal",
|
||||
md_status="skipped",
|
||||
md_extraction_error="arXiv abstract: signal-only, markdown 비대상",
|
||||
source_channel="crawl",
|
||||
data_origin="external",
|
||||
edit_url=entry.abs_url,
|
||||
review_status="approved",
|
||||
material_type="paper",
|
||||
jurisdiction=None, # paper = NULL 불변(A-2). 지역은 extract_meta.paper.source_region.
|
||||
published_date=entry.published.date() if entry.published else None,
|
||||
extract_meta=_build_paper_meta(source, entry),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
# signal-only: 검색 색인만. summarize/fulltext 절대 enqueue 안 함(맥미니 큐 무접촉).
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
return True
|
||||
|
||||
|
||||
async def _get_or_create_source(session) -> NewsSource:
|
||||
result = await session.execute(
|
||||
select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
|
||||
)
|
||||
source = result.scalars().first()
|
||||
if source is None:
|
||||
source = NewsSource(
|
||||
name=_SOURCE_NAME, feed_url=_ARXIV_API, feed_type="atom",
|
||||
fetch_method="signal-only", fulltext_policy="none",
|
||||
source_channel="crawl", category="Engineering", language="en",
|
||||
country=None, # paper → jurisdiction NULL (country 미전파)
|
||||
material_type="paper",
|
||||
license_scheme="arxiv", license_redistribute=False,
|
||||
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 자체 폴링
|
||||
)
|
||||
session.add(source)
|
||||
await session.flush()
|
||||
return source
|
||||
|
||||
|
||||
def _watermark(source: NewsSource, category: str) -> datetime | None:
|
||||
raw = (source.selector_override or {}).get("arxiv_watermark", {}).get(category)
|
||||
if not raw:
|
||||
return None
|
||||
return _parse_dt(raw)
|
||||
|
||||
|
||||
def _set_watermark(source: NewsSource, category: str, value: datetime) -> None:
|
||||
cfg = dict(source.selector_override or {})
|
||||
wm = dict(cfg.get("arxiv_watermark") or {})
|
||||
wm[category] = value.isoformat()
|
||||
cfg["arxiv_watermark"] = wm
|
||||
source.selector_override = cfg # JSONB 변경 감지 위해 재할당
|
||||
|
||||
|
||||
async def _fetch(client: httpx.AsyncClient, query: str, start: int) -> str:
|
||||
params = {
|
||||
"search_query": query, "start": start, "max_results": _PAGE_SIZE,
|
||||
"sortBy": "submittedDate", "sortOrder": "descending",
|
||||
}
|
||||
for attempt in range(_MAX_RETRY):
|
||||
resp = await client.get(_ARXIV_API, params=params)
|
||||
if resp.status_code == 429:
|
||||
await asyncio.sleep(_BACKOFF_BASE * (2 ** attempt))
|
||||
continue
|
||||
resp.raise_for_status()
|
||||
return resp.text
|
||||
raise FeedError(f"arXiv 429 재시도 초과: {query[:48]}")
|
||||
|
||||
|
||||
async def run(bulk: bool = False, limit: int = 0) -> None:
|
||||
"""daily 진입점(스케줄러). bulk/limit 은 CLI 전용(bulk=cap 해제·깊은 페이징)."""
|
||||
now = datetime.now(timezone.utc)
|
||||
async with async_session() as session:
|
||||
source = await _get_or_create_source(session)
|
||||
await session.commit()
|
||||
source_id = source.id
|
||||
|
||||
run_cap = (limit or 10**9) if bulk else (min(limit, _RUN_CAP) if limit else _RUN_CAP)
|
||||
inserted = 0
|
||||
seen = 0
|
||||
failures: list[str] = []
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
timeout=30.0, headers={"User-Agent": CRAWL_UA}, follow_redirects=True
|
||||
) as client:
|
||||
for category in _CATEGORIES:
|
||||
if inserted >= run_cap:
|
||||
break
|
||||
query = build_search_query(category)
|
||||
async with async_session() as session:
|
||||
src = await session.get(NewsSource, source_id)
|
||||
watermark = _watermark(src, category)
|
||||
newest_seen: datetime | None = None
|
||||
max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT)
|
||||
try:
|
||||
for page in range(max_pages):
|
||||
if inserted >= run_cap:
|
||||
break
|
||||
xml_text = await _fetch(client, query, page * _PAGE_SIZE)
|
||||
total, entries = parse_arxiv_feed(xml_text)
|
||||
if not entries:
|
||||
break
|
||||
stop = False
|
||||
for entry in entries:
|
||||
seen += 1
|
||||
if entry.published:
|
||||
newest_seen = max(newest_seen or entry.published, entry.published)
|
||||
# 증분: 워터마크 이하 도달 시 이 카테고리 종료(이미 본 구간)
|
||||
if watermark and not bulk and entry.published <= watermark:
|
||||
stop = True
|
||||
break
|
||||
async with async_session() as session:
|
||||
src = await session.get(NewsSource, source_id)
|
||||
if await _ingest_entry(session, src, entry):
|
||||
inserted += 1
|
||||
await session.commit()
|
||||
else:
|
||||
await session.rollback()
|
||||
if inserted >= run_cap:
|
||||
break
|
||||
await asyncio.sleep(_REQ_SLEEP)
|
||||
if stop or (page + 1) * _PAGE_SIZE >= total:
|
||||
break
|
||||
# 카테고리 워터마크 전진(이번 run 최신 발행일)
|
||||
if newest_seen:
|
||||
async with async_session() as session:
|
||||
src = await session.get(NewsSource, source_id)
|
||||
_set_watermark(src, category, newest_seen)
|
||||
await session.commit()
|
||||
except (httpx.HTTPError, FeedError, ET.ParseError) as e:
|
||||
msg = f"[{category}] {e or repr(e)}"
|
||||
logger.error(f"[arxiv] {msg}")
|
||||
failures.append(msg)
|
||||
|
||||
async with async_session() as session:
|
||||
health = await _get_or_create_health(session, source_id)
|
||||
if failures and inserted == 0:
|
||||
_record_failure(health, "; ".join(failures)[:500], now)
|
||||
else:
|
||||
_record_success(health, now)
|
||||
await session.commit()
|
||||
|
||||
deferred = "" if inserted < run_cap else f" (cap {run_cap} 도달 — 잔여는 다음 run 이월)"
|
||||
logger.info(
|
||||
f"[arxiv] {len(_CATEGORIES)}개 카테고리 스캔 {seen}건 → 신규 {inserted}건{deferred}"
|
||||
+ (f" / 실패 {len(failures)}건" if failures else "")
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# CLI = 수동/백필 전용. --bulk = cap 해제·깊은 페이징, --limit N = 상한 N(라이브 검증용).
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="arXiv 안전·공학 키워드 수집기")
|
||||
parser.add_argument("--bulk", action="store_true", help="cap 해제 + 깊은 페이징 백필")
|
||||
parser.add_argument("--limit", type=int, default=0, help="신규 적재 상한(0=기본 cap)")
|
||||
args = parser.parse_args()
|
||||
asyncio.run(run(bulk=args.bulk, limit=args.limit))
|
||||
Reference in New Issue
Block a user