"""C-4 ① Collector worker for API "Important Standards Announcements" (cycle 3).

There is no RSS feed. A live check (2026-06-11) showed that the site is not a
single page to diff but a paginated listing (10 items per page,
``?page=N&pageSize=10``, ~12+ pages) in which every announcement has its own
detail URL. Parsing the listing links and ingesting only the new detail pages
is therefore accurate and makes dedup natural (the HTML counterpart of the
rss+page pattern). Revision announcements for 510/570/653 are directly relevant
to day-to-day work; the standards themselves are paid content, so only the
announcements are collected (card C-4).

Schedule: monthly (main.py, day 5 at 07:05 KST) — diff of the 2 most recent
pages (announcement pace is 1-2 per month).

Initial bulk load (all pages, ~120 items, roughly 30 minutes with politeness)::

    docker exec hyungi_document_server-fastapi-1 python -m workers.api_standards_collector --bulk

Idempotent: dedup on edit_url (normalized) + file_hash — a re-run ingests only
new items.
"""

import argparse
import asyncio
import hashlib
import re
from datetime import datetime, timezone

from sqlalchemy import select

from core.crawl_politeness import (
    CrawlBlocked,
    CrawlFetchError,
    CrawlSkip,
    fetch_page,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
    _WEB_MIN_BODY_LEN,
    _extract_body,
    _raw_html_path,
    _save_raw_html,
    _strip_article_footer,
)
from workers.news_collector import (
    _get_or_create_health,
    _normalize_url,
    _record_failure,
    _record_success,
)
from workers.static_corpus_ingest import _page_title

logger = setup_logger("api_standards")

_BASE = "https://www.api.org"
_LISTING_PATH = "/products-and-services/standards/important-standards-announcements"
_LISTING_URL = f"{_BASE}{_LISTING_PATH}"
_SOURCE_NAME = "API 표준 공지"
# Monthly diff window (20 items — ample overlap at 1-2 announcements/month).
_SCHEDULED_PAGES = 2
# 12 pages observed + headroom; an empty listing page ends the crawl early.
_BULK_MAX_PAGES = 15

# Detail links live one path segment below the listing path. Requiring a "/"
# after the listing path and forbidding "?" / "#" keeps pagination links
# (".../important-standards-announcements?page=N...") out of the matches.
_DETAIL_RE = re.compile(
    r'href="(' + re.escape(_LISTING_PATH) + r'/[^"?#]+)"'
)

# Month name -> month number; also the single source of the names in _DATE_RE.
_MONTHS = {
    name: number
    for number, name in enumerate(
        [
            "January", "February", "March", "April", "May", "June",
            "July", "August", "September", "October", "November", "December",
        ],
        start=1,
    )
}

# "Month DD, YYYY" (comma optional). The \b anchors stop the pattern from
# matching inside a longer word ("Dismay 5, 2020") or a longer digit run.
_DATE_RE = re.compile(
    r"\b(" + "|".join(_MONTHS) + r")\s+(\d{1,2}),?\s+(\d{4})\b"
)


def _parse_listing(html_text: str) -> list[str]:
    """Return absolute detail-announcement URLs found in a listing page.

    URLs are de-duplicated while keeping first-seen order. Pagination links
    are not returned because ``_DETAIL_RE`` rejects hrefs containing ``?``.
    """
    # dict.fromkeys keeps insertion order, giving an order-preserving dedup.
    return list(
        dict.fromkeys(
            f"{_BASE}{match.group(1)}" for match in _DETAIL_RE.finditer(html_text)
        )
    )


def _parse_pub_date(text: str) -> datetime | None:
    """Parse the first 'Month DD, YYYY' occurrence in *text* as a UTC datetime.

    By convention the first such date in an announcement body is its
    publication date. Returns ``None`` when no date is found or when the
    matched values do not form a valid calendar date (e.g. "February 30").
    A ``None`` result does not block ingestion — the caller stores the
    document without a published date.
    """
    m = _DATE_RE.search(text)
    if not m:
        return None
    try:
        return datetime(
            int(m.group(3)),
            _MONTHS[m.group(1)],
            int(m.group(2)),
            tzinfo=timezone.utc,
        )
    except ValueError:
        return None


async def _get_or_create_source(session) -> NewsSource:
    """Return the ``NewsSource`` row named ``_SOURCE_NAME``, creating it if absent.

    A newly created row is added to *session* and flushed (so its primary key
    is assigned) but NOT committed — committing is left to the caller.
    """
    result = await session.execute(
        select(NewsSource).where(NewsSource.name == _SOURCE_NAME)
    )
    source = result.scalars().first()
    if source is None:
        source = NewsSource(
            name=_SOURCE_NAME,
            feed_url=_LISTING_URL,
            feed_type="rss",
            fetch_method="page",
            fulltext_policy="none",
            source_channel="crawl",
            category="Engineering",
            language="en",
            country="US",
            # Excluded from the 6h news cycle — this worker polls monthly.
            enabled=False,
        )
        session.add(source)
        await session.flush()
    return source


async def _ingest_detail(session, source: NewsSource, url: str) -> str:
    """Ingest a single announcement detail page.

    Steps: dedup check (by URL-derived hash or stored ``edit_url``) -> fetch ->
    body extraction -> footer stripping -> raw HTML preservation -> create the
    ``Document`` row -> enqueue the summarize / embed / chunk stages.
    Nothing is committed here; the caller owns the transaction.

    Args:
        session: Async DB session used for the dedup query and the insert.
        source: The ``NewsSource`` row this announcement belongs to.
        url: Absolute URL of the announcement detail page.

    Returns:
        ``'ok'`` when a new document was added, ``'dup'`` when a matching
        document already exists (checked before any network access), and
        ``'skip'`` when the page could not be fetched or its extracted body
        is too short.
    """
    normalized_url = _normalize_url(url)
    # The hash is derived from the normalized URL only, so it is stable across
    # runs regardless of page content.
    ann_hash = hashlib.sha256(f"api-ann|{normalized_url}".encode()).hexdigest()[:32]

    existing = await session.execute(
        select(Document).where(
            (Document.file_hash == ann_hash)
            | (Document.edit_url.in_([normalized_url, url]))
        ).limit(1)
    )
    if existing.scalars().first():
        return "dup"

    try:
        html_text, final_url = await fetch_page(url)
    except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
        logger.warning(f"[api-std] fetch 실패 skip: {url} — {type(e).__name__}: {e}")
        return "skip"

    body, engine, engine_ver = _extract_body(html_text)
    if not engine:
        # A falsy engine is treated as "extraction failed".
        logger.warning(f"[api-std] 추출 실패 skip (< {_WEB_MIN_BODY_LEN}자): {url}")
        return "skip"

    # NUL characters are removed before storing the text.
    clean_body = _strip_article_footer(body.replace("\x00", ""))
    if len(clean_body) < _WEB_MIN_BODY_LEN:
        # Log this skip as well so every 'skip' outcome is traceable.
        logger.warning(
            f"[api-std] 본문 부족 skip "
            f"({len(clean_body)}자 < {_WEB_MIN_BODY_LEN}자): {url}"
        )
        return "skip"

    now = datetime.now(timezone.utc)
    raw_path = _raw_html_path(source.id, ann_hash, now)
    raw_saved = True
    try:
        _save_raw_html(raw_path, html_text)
    except OSError as e:
        # Best effort: failing to keep the raw HTML must not block ingestion.
        raw_saved = False
        logger.error(f"[api-std] 원본 보존 실패 (ingest 는 진행): {e}")

    pub_dt = _parse_pub_date(clean_body)
    title = _page_title(html_text, fallback=url.rsplit("/", 1)[-1][:90])
    # Drop a trailing "| API" site suffix; keep the raw title if that would
    # leave an empty string.
    title = re.sub(r"\s*\|\s*API\s*$", "", title).strip() or title

    doc = Document(
        file_path=f"crawl/{_SOURCE_NAME}/{ann_hash}",
        file_hash=ann_hash,
        file_format="article",
        file_size=0,
        file_type="note",
        title=title,
        extracted_text=f"{title}\n\n{clean_body}",
        extracted_at=now,
        extractor_version=f"listing+page@{engine}",
        md_content=clean_body,
        md_status="success",
        md_extraction_engine=engine,
        md_extraction_engine_version=engine_ver,
        md_format_version="1.0",
        md_generated_at=now,
        md_source_hash=hashlib.sha256(
            html_text.encode("utf-8", errors="replace")
        ).hexdigest(),
        md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
        content_origin="extracted",
        source_channel="crawl",
        data_origin="external",
        edit_url=normalized_url,
        review_status="approved",
        ai_domain="Engineering",
        ai_sub_group=_SOURCE_NAME,
        ai_tags=["Engineering/API 표준 공지"],
        # Safety library A-2 — a standards *announcement* is classified as
        # "standard" (it is not the code text itself — ASME/API standard
        # bodies are behind a paywall).
        material_type="standard",
        jurisdiction="US",
        published_date=pub_dt.date() if pub_dt else None,
        extract_meta={
            "source_id": source.id,
            "source_name": _SOURCE_NAME,
            "published_at": pub_dt.isoformat() if pub_dt else None,
            "license": {
                "scheme": "proprietary",
                "redistribute": False,
                "attribution": "American Petroleum Institute",
            },
            "fulltext": {
                "status": "api_announcement",
                "engine": engine,
                "final_url": final_url,
                "raw_html_path": str(raw_path) if raw_saved else None,
                "body_chars": len(clean_body),
                "resolved_at": now.isoformat(),
            },
        },
    )
    # file_size is the byte length of the stored text (title + body).
    doc.file_size = len(doc.extracted_text.encode())
    session.add(doc)
    # Flush so doc.id is assigned before the queue rows reference it.
    await session.flush()

    await enqueue_stage(session, doc.id, "summarize")
    await enqueue_stage(session, doc.id, "embed")
    await enqueue_stage(session, doc.id, "chunk")
    logger.info(f"[api-std] ingest {len(clean_body)}자 ({engine}): {title[:60]}")
    return "ok"


async def run(bulk: bool = False) -> None:
    """Monthly entry point (scheduler); ``bulk`` is intended for the CLI.

    Walks the listing pages (``_SCHEDULED_PAGES`` normally, up to
    ``_BULK_MAX_PAGES`` when *bulk* is true), ingests every detail link through
    ``_ingest_detail`` in its own session/commit, and finally records success
    or failure on the source's health row.

    A crawl error while fetching a listing page aborts the run and records a
    failure; documents committed before the error stay committed.

    Args:
        bulk: When true, crawl all pages (initial backfill) instead of only
            the most recent ones.
    """
    now = datetime.now(timezone.utc)
    async with async_session() as session:
        source = await _get_or_create_source(session)
        # Capture the id before committing so it never has to be re-read from
        # an instance that the commit may have expired.
        source_id = source.id
        await session.commit()

    max_pages = _BULK_MAX_PAGES if bulk else _SCHEDULED_PAGES
    counts = {"ok": 0, "dup": 0, "skip": 0}
    try:
        for page in range(1, max_pages + 1):
            listing_url = (
                _LISTING_URL
                if page == 1
                else f"{_LISTING_URL}?page={page}&pageSize=10"
            )
            html_text, _ = await fetch_page(listing_url)
            detail_urls = _parse_listing(html_text)
            if not detail_urls:
                # An empty page means the listing is exhausted (bulk early exit).
                break
            for url in detail_urls:
                # One session and commit per announcement.
                async with async_session() as session:
                    src = await session.get(NewsSource, source_id)
                    status = await _ingest_detail(session, src, url)
                    await session.commit()
                counts[status] += 1
            logger.info(f"[api-std] 목록 p{page}: 누적 {counts}")
    except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
        # Only listing fetches can raise here: _ingest_detail handles these
        # exception types itself and returns 'skip'.
        logger.error(f"[api-std] 목록 수집 실패: {e}")
        async with async_session() as session:
            health = await _get_or_create_health(session, source_id)
            _record_failure(health, str(e) or repr(e), now)
            await session.commit()
        return

    async with async_session() as session:
        health = await _get_or_create_health(session, source_id)
        _record_success(health, counts["ok"], False, now)
        src = await session.get(NewsSource, source_id)
        src.last_fetched_at = now
        await session.commit()
    logger.info(f"[api-std] 완료: {counts}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="API 표준 공지 수집")
    parser.add_argument("--bulk", action="store_true", help="전 페이지 일괄 (초기 백필)")
    args = parser.parse_args()
    asyncio.run(run(bulk=args.bulk))