From 1842f27d89e026169a140d30de428aabca45f1d7 Mon Sep 17 00:00:00 2001 From: hyungi Date: Wed, 10 Jun 2026 15:08:18 +0900 Subject: [PATCH] =?UTF-8?q?feat(news):=20crawl-24x7=20=EC=82=AC=EC=9D=B4?= =?UTF-8?q?=ED=81=B4=202=20=E2=80=94=20B-2/B-3/C-1/C-2/C-3/C-5=20(?= =?UTF-8?q?=EB=A7=88=EC=9D=B4=EA=B7=B8=20324-326)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 채널 인지화: news_sources.source_channel(324, documents enum 재사용) → 문서 생성 정체성(_doc_identity)·embed/chunk 30일 게이트(crawl=전량 색인)· extract 후속 override(crawl→classify, preview 스킵) 분기. - B-2 Guardian Open Platform: API 디스패치(호스트 분기, 미지 호스트=명시 실패) + show-fields=bodyText 전문 어댑터. fixture live 박제 + call-shape 테스트. - B-3 구독지: playwright-fetcher 격리 컨테이너(동시 1·요청당 브라우저·storage_state ro mount) + politeness 사람속도(30-60s) 브라우저 경로 + fulltext 인증 라우팅 (내용 기반 probe 게이트·relogin_requested 소비=open-스킵보다 앞·본문 페이월 마커 게이트) + source_health probe 컬럼(325) + 세션 박제 스크립트(맥북용). - C-2 KOSHA: 3 API live 검증·fixture 박제(board/attach/guide) — 재해사례 daily diff +첨부 PDF/HWP→extract 파이프라인, GUIDE 일일 cap 점진 백필(silent cap 금지 로그). 키는 URL 직결합(재인코딩 함정 회피). daily 06:40 KST. - C-3 정적 코퍼스: National Board 86 + TWI job-knowledge 153 일괄 CLI(멱등·politeness ·crawl_raw 보존·fulltext_worker 승격 필드 규약 동일). - C-1/C-5 시드(326): 전 URL live 검증 — UK HSE(feed-full)/안전신문/고용노동부 3종 (rss/*.do)/OSHA/EU-OSHA(후보)/SEP/1000-Word(feed-full)/Doing Philosophy/Aeon/Psyche (skip-video quirk). Co-Authored-By: Claude Fable 5 --- app/core/crawl_politeness.py | 93 ++++- app/main.py | 3 + app/models/news_source.py | 14 +- app/models/source_health.py | 10 +- app/workers/fulltext_worker.py | 98 ++++- app/workers/kosha_collector.py | 351 ++++++++++++++++++ app/workers/news_collector.py | 182 ++++++++- app/workers/queue_consumer.py | 3 + app/workers/static_corpus_ingest.py | 262 +++++++++++++ docker-compose.yml | 10 + .../324_news_sources_source_channel.sql | 5 + .../325_source_health_b3_probe_columns.sql | 8 + migrations/326_seed_crawl_cycle2_sources.sql | 33 ++ scripts/capture_subscription_session.py | 59 +++ services/playwright-fetcher/Dockerfile | 14 + services/playwright-fetcher/requirements.txt | 3 + services/playwright-fetcher/server.py | 107 ++++++ ...uardian_open_platform_search_response.json | 1 + tests/fixtures/kosha_attach_response.json | 1 + tests/fixtures/kosha_board_response.json | 1 + tests/fixtures/kosha_guide_response.json | 1 + tests/test_crawl_cycle2_shapes.py | 115 ++++++ 22 files changed, 1358 insertions(+), 16 deletions(-) create mode 100644 app/workers/kosha_collector.py create mode 100644 app/workers/static_corpus_ingest.py create mode 100644 migrations/324_news_sources_source_channel.sql create mode 100644 migrations/325_source_health_b3_probe_columns.sql create mode 100644 migrations/326_seed_crawl_cycle2_sources.sql create mode 100644 scripts/capture_subscription_session.py create mode 100644 services/playwright-fetcher/Dockerfile create mode 100644 services/playwright-fetcher/requirements.txt create mode 100644 services/playwright-fetcher/server.py create mode 100644 tests/fixtures/guardian_open_platform_search_response.json create mode 100644 tests/fixtures/kosha_attach_response.json create mode 100644 tests/fixtures/kosha_board_response.json create mode 100644 tests/fixtures/kosha_guide_response.json create mode 100644 tests/test_crawl_cycle2_shapes.py diff --git a/app/core/crawl_politeness.py b/app/core/crawl_politeness.py index d61e030..7e78ef0 100644 --- a/app/core/crawl_politeness.py +++ b/app/core/crawl_politeness.py @@ -32,6 +32,14 @@ CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.co _DOMAIN_DELAY_MIN = 5.0 _DOMAIN_DELAY_MAX = 15.0 +# 구독 세션(브라우저) fetch 간격 — 사람 속도 (B-3 ④: 기사 간 수십 초) +_AUTH_DELAY_MIN = 30.0 +_AUTH_DELAY_MAX = 60.0 + +# B-3 Playwright 격리 컨테이너 (internal-only, compose DNS) +_FETCHER_URL = "http://playwright-fetcher:3400" +_FETCHER_TIMEOUT = 120.0 # 브라우저 기동 + 네비게이션 + settle 포함 + _ROBOTS_CACHE_TTL = 24 * 3600 # 24h _MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap _PAGE_TIMEOUT = 20.0 @@ -69,11 +77,15 @@ def _get_lock(domain: str) -> asyncio.Lock: return _domain_locks[domain] -async def _respect_domain_rate(domain: str) -> None: - """같은 도메인 직전 요청에서 5–15초(jitter) 경과할 때까지 대기.""" +async def _respect_domain_rate( + domain: str, + delay_min: float = _DOMAIN_DELAY_MIN, + delay_max: float = _DOMAIN_DELAY_MAX, +) -> None: + """같은 도메인 직전 요청에서 delay(jitter) 경과할 때까지 대기.""" last = _domain_last_request.get(domain) if last is not None: - delay = random.uniform(_DOMAIN_DELAY_MIN, _DOMAIN_DELAY_MAX) + delay = random.uniform(delay_min, delay_max) wait = last + delay - time.monotonic() if wait > 0: # silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성 @@ -175,3 +187,78 @@ async def fetch_page(url: str, *, check_robots: bool = True) -> tuple[str, str]: raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}") return resp.text, str(resp.request.url) + + +# ── B-3 구독 세션 fetch (Playwright 격리 컨테이너 경유) ────────────────────── + +async def fetch_page_via_browser(url: str, profile: str) -> tuple[str, str]: + """인증 페이지 1건 — playwright-fetcher 에 위임, politeness 는 사람 속도(30~60s). + + (html_text, final_url) 반환. robots 미적용 — 구독 계약 기반 개인 보관 fetch 로 + 공개 크롤러 규약 대상이 아님 (대신 사람 속도 + 동시 1 + 야간 저빈도가 보호 장치). + 예외 어휘는 fetch_page 와 동일 (호출측 분기 재사용). + """ + try: + validate_feed_url(url) + except ValueError as e: + raise CrawlSkip(f"URL 검증 실패: {e}") from e + + domain = _domain_of(url) + async with _get_lock(domain): + await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX) + try: + async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client: + resp = await client.post( + f"{_FETCHER_URL}/fetch", json={"url": url, "profile": profile} + ) + except httpx.TimeoutException as e: + raise CrawlFetchError(f"browser fetch timeout: {url}") from e + except httpx.HTTPError as e: + raise CrawlFetchError(f"playwright-fetcher 연결 오류: {e}") from e + finally: + _domain_last_request[domain] = time.monotonic() + + if resp.status_code == 503: + # storage_state 부재 — 수동 세션 박제 대기 (호출측 degrade, 재시도 루프 금지) + raise CrawlBlocked(f"세션 프로필 부재: {profile}") + if resp.status_code != 200: + raise CrawlFetchError(f"playwright-fetcher {resp.status_code}: {url}") + data = resp.json() + html_text = data.get("html", "") + if len(html_text.encode("utf-8", errors="replace")) > _MAX_PAGE_BYTES: + raise CrawlSkip(f"크기 초과 (browser): {url}") + return html_text, data.get("final_url", url) + + +async def probe_session( + profile: str, probe_url: str, min_body_chars: int, paywall_markers: list[str] +) -> dict: + """내용 기반 세션 probe (B-3 ②) — {'ok': bool, 'reason': str|None, 'body_chars': int}. + + 실패를 예외가 아닌 값으로 반환 — 호출측이 source_health 에 기록하고 degrade 분기. + probe 도 실제 publisher fetch 라 동일 도메인 lock + 사람 속도 적용. + """ + domain = _domain_of(probe_url) + async with _get_lock(domain): + await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX) + try: + async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client: + resp = await client.post( + f"{_FETCHER_URL}/probe", + json={ + "profile": profile, + "probe_url": probe_url, + "min_body_chars": min_body_chars, + "paywall_markers": paywall_markers, + }, + ) + except httpx.HTTPError as e: + return {"ok": False, "reason": f"fetcher 연결 오류: {e}", "body_chars": 0} + finally: + _domain_last_request[domain] = time.monotonic() + + if resp.status_code == 503: + return {"ok": False, "reason": f"세션 프로필 부재: {profile}", "body_chars": 0} + if resp.status_code != 200: + return {"ok": False, "reason": f"fetcher {resp.status_code}", "body_chars": 0} + return resp.json() diff --git a/app/main.py b/app/main.py index eb49003..208ee86 100644 --- a/app/main.py +++ b/app/main.py @@ -55,6 +55,7 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run + from workers.kosha_collector import run as kosha_collector_run from workers.queue_consumer import consume_queue, consume_markdown_queue from workers.study_queue_consumer import consume_study_queue from workers.study_session_queue_consumer import consume_study_session_queue @@ -128,6 +129,8 @@ async def lifespan(app: FastAPI): # plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산. # soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌). scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile") + # crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯). + scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector") scheduler.start() # Phase 2.1 (async 구조): QueryAnalyzer prewarm. diff --git a/app/models/news_source.py b/app/models/news_source.py index 6c58ed8..9518d88 100644 --- a/app/models/news_source.py +++ b/app/models/news_source.py @@ -2,7 +2,7 @@ from datetime import datetime -from sqlalchemy import Boolean, DateTime, Integer, String, Text +from sqlalchemy import Boolean, DateTime, Enum, Integer, String, Text from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column @@ -41,5 +41,15 @@ class NewsSource(Base): feed_content_hash: Mapped[str | None] = mapped_column(String(64)) # 추출 실패 잦은 소스의 site-specific CSS selector (A-2) selector_override: Mapped[dict | None] = mapped_column(JSONB) - # rdf / table-strip / gn-redirect 등 파서 특이 케이스 (B-5) + # rdf / table-strip / gn-redirect / skip-video 등 파서 특이 케이스 (B-5) parser_quirk: Mapped[str | None] = mapped_column(String(30)) + # 채널 — 'news'(다이제스트/브리핑 대상) / 'crawl'(도메인 재료, 0-5 (a)) — migration 324. + # documents.source_channel 로 전파, crawl 채널은 embed/chunk 30일 게이트 미적용. + # documents 와 동일 PG enum 재사용 (Document 모델과 값 목록 동기 유지). + source_channel: Mapped[str] = mapped_column( + Enum("law_monitor", "devonagent", "email", "web_clip", + "tksafety", "inbox_route", "manual", "drive_sync", "news", "memo", + "voice", "hermes", "crawl", + name="source_channel"), + default="news", + ) diff --git a/app/models/source_health.py b/app/models/source_health.py index 9264e39..3e63938 100644 --- a/app/models/source_health.py +++ b/app/models/source_health.py @@ -6,7 +6,7 @@ silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다 from datetime import datetime -from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text +from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text from sqlalchemy.orm import Mapped, mapped_column from core.database import Base @@ -34,3 +34,11 @@ class SourceHealth(Base): updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now ) + + # ── B-3 구독 세션 상태 계약 — migration 325 ── + # 쓰기 1종 플래그: A-8 버튼이 기록만, 어댑터가 소비(수동 half-open). + # 소비 위치 = open-스킵 분기보다 앞 (r5 함정 고정 — 데드 버튼 방지). + relogin_requested: Mapped[bool] = mapped_column(Boolean, default=False) + # 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — 페이월 안내문 silent corruption 차단) + last_probe_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + last_probe_ok: Mapped[bool | None] = mapped_column(Boolean) diff --git a/app/workers/fulltext_worker.py b/app/workers/fulltext_worker.py index ab94e35..bbbf001 100644 --- a/app/workers/fulltext_worker.py +++ b/app/workers/fulltext_worker.py @@ -27,10 +27,18 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased from core.config import settings -from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page +from core.crawl_politeness import ( + CrawlBlocked, + CrawlFetchError, + CrawlSkip, + fetch_page, + fetch_page_via_browser, + probe_session, +) from core.database import async_session from core.utils import setup_logger from models.document import Document +from models.news_source import NewsSource from models.queue import ProcessingQueue, enqueue_stage from workers.extract_worker import ( _WEB_MIN_BODY_LEN, @@ -92,6 +100,11 @@ async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None: """승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk.""" await enqueue_stage(session, doc.id, "summarize") published_raw = (doc.extract_meta or {}).get("published_at") + if doc.source_channel == "crawl": + # 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용) + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + return days_old = 0 if published_raw: try: @@ -111,6 +124,60 @@ def _set_fulltext_meta(doc: Document, **fields) -> None: doc.extract_meta = meta +_PROBE_TTL_SECONDS = 6 * 3600 # probe 유효 시간 — 만료 시 배치 경계에서 재검증 + + +async def _auth_session_ready(session: AsyncSession, source: NewsSource) -> tuple[bool, str]: + """B-3 ② 내용 기반 probe 게이트 + relogin_requested 소비 (수동 half-open). + + 플래그 소비는 '불가용 스킵' 분기보다 앞 — 어댑터 틱마다 도달 (r5 데드 버튼 함정 고정). + probe 실패 상태에서는 auth fetch 0회 (자동 재시도 루프 = 계정 잠금 직행 — B-3 ③). + 복구 경로 = storage_state 갱신 후 relogin_requested 플래그 set (수동). + probe 설정은 source.selector_override JSONB: probe_url / min_body_chars / paywall_markers. + """ + from workers.news_collector import _get_or_create_health + + health = await _get_or_create_health(session, source.id) + now = datetime.now(timezone.utc) + cfg = source.selector_override or {} + probe_url = cfg.get("probe_url") + + force = False + if health.relogin_requested: + health.relogin_requested = False # 소비 = 1회 half-open 시도 + health.updated_at = now + force = True + logger.info(f"[fulltext/auth] {source.name} relogin_requested 소비 — half-open probe") + + if not force: + if health.last_probe_ok is False: + return False, "probe 실패 상태 (storage_state 갱신 + relogin_requested 대기)" + if ( + health.last_probe_ok + and health.last_probe_at + and (now - health.last_probe_at).total_seconds() < _PROBE_TTL_SECONDS + ): + return True, "" + + if not probe_url: + return False, "selector_override.probe_url 미설정" + + result = await probe_session( + source.auth_profile, + probe_url, + int(cfg.get("min_body_chars", 800)), + list(cfg.get("paywall_markers", [])), + ) + health.last_probe_at = now + health.last_probe_ok = bool(result.get("ok")) + health.updated_at = now + if not health.last_probe_ok: + logger.warning(f"[fulltext/auth] {source.name} probe 실패: {result.get('reason')}") + return False, str(result.get("reason")) + logger.info(f"[fulltext/auth] {source.name} probe OK ({result.get('body_chars')}자)") + return True, "" + + async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None: """본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0).""" _set_fulltext_meta( @@ -133,8 +200,21 @@ async def process(document_id: int, session: AsyncSession) -> None: meta = doc.extract_meta or {} source_id = meta.get("source_id") + # B-3: 구독 소스(auth_profile)는 Playwright 세션 fetch — probe 게이트 선행 + source = await session.get(NewsSource, source_id) if source_id else None + auth_profile = source.auth_profile if source is not None else None + + if auth_profile: + ready, why = await _auth_session_ready(session, source) + if not ready: + await _degrade(session, doc, f"구독 세션 불가용: {why}") + return + try: - html_text, final_url = await fetch_page(doc.edit_url) + if auth_profile: + html_text, final_url = await fetch_page_via_browser(doc.edit_url, auth_profile) + else: + html_text, final_url = await fetch_page(doc.edit_url) except (CrawlBlocked, CrawlSkip) as e: await _degrade(session, doc, f"{type(e).__name__}: {e}") return @@ -163,6 +243,20 @@ async def process(document_id: int, session: AsyncSession) -> None: await _degrade(session, doc, "푸터 제거 후 본문 부족") return + # B-3: 추출 결과도 페이월 마커로 게이트 — probe 통과 후 만료된 세션의 + # '페이월 안내문' 본문 승격(silent corruption) 차단 + 즉시 probe 상태 강등 + if auth_profile: + from workers.news_collector import _get_or_create_health + + markers = (source.selector_override or {}).get("paywall_markers", []) + hit = next((m for m in markers if m and m.lower() in clean_body.lower()), None) + if hit: + health = await _get_or_create_health(session, source.id) + health.last_probe_ok = False + health.updated_at = datetime.now(timezone.utc) + await _degrade(session, doc, f"본문 페이월 마커 검출({hit}) — 세션 손상 의심") + return + title = doc.title or "" doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body doc.extracted_at = now diff --git a/app/workers/kosha_collector.py b/app/workers/kosha_collector.py new file mode 100644 index 0000000..dab61fb --- /dev/null +++ b/app/workers/kosha_collector.py @@ -0,0 +1,351 @@ +"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1). + +3 API (2026-06-10 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json): + 재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060 + 재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070 + KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050 + +daily 스케줄 1회 (main.py): + 재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브) + + 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장 + → 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용). + GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만, + 일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화. + cap 으로 미처리 잔량은 매회 로그 (silent cap 금지). + +키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장. + httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합. +개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재. +""" + +import asyncio +import hashlib +import os +import random +import re +from datetime import datetime, timezone +from pathlib import Path + +import httpx +from sqlalchemy import select + +from core.config import settings +from core.crawl_politeness import CRAWL_UA +from core.database import async_session +from core.utils import setup_logger +from models.document import Document +from models.news_source import NewsSource +from models.queue import enqueue_stage +from workers.news_collector import ( + FeedError, + _get_or_create_health, + _record_failure, + _record_success, +) + +logger = setup_logger("kosha_collector") + +_BASE = "https://apis.data.go.kr/B552468" +_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02" +_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02" +_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide" + +_CASE_SOURCE = "KOSHA 재해사례" +_GUIDE_SOURCE = "KOSHA GUIDE" + +_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API) +_CASE_ROWS = 30 +_GUIDE_ROWS = 100 +_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25")) +_MAX_FILE_BYTES = 50 * 1024 * 1024 +_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격 + + +def _api_key() -> str: + key = os.getenv("KOSHA_API_KEY", "") + if not key: + raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가") + return key + + +async def _api_get(url: str) -> dict: + """공통 GET — 게이트웨이/제공자 이중 에러 체계 검사.""" + async with httpx.AsyncClient(timeout=25) as client: + resp = await client.get(url, headers={"User-Agent": CRAWL_UA}) + if resp.status_code != 200: + raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}") + try: + payload = resp.json() + except ValueError as e: + # 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등) + raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e + code = (payload.get("header") or {}).get("resultCode") + if code != "00": + raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}") + return payload + + +def _items(payload: dict) -> list[dict]: + """body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어.""" + item = ((payload.get("body") or {}).get("items") or {}).get("item") + if item is None: + return [] + return [item] if isinstance(item, dict) else list(item) + + +def _safe_filename(name: str) -> str: + """NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피).""" + name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip() + name = re.sub(r"\s+", " ", name) + return name[:140] or "unnamed" + + +async def _download(url: str, dest: Path) -> int: + """첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격.""" + await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY)) + async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: + resp = await client.get(url, headers={"User-Agent": CRAWL_UA}) + if resp.status_code != 200: + raise FeedError(f"파일 다운로드 {resp.status_code}: {url}") + if len(resp.content) > _MAX_FILE_BYTES: + raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}") + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(resp.content) + return len(resp.content) + + +async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource: + result = await session.execute(select(NewsSource).where(NewsSource.name == name)) + source = result.scalars().first() + if source is None: + source = NewsSource( + name=name, feed_url=feed_url, feed_type="rss", fetch_method="api", + fulltext_policy="none", source_channel="crawl", category="Safety", + language="ko", country="KR", + enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링 + ) + session.add(source) + await session.flush() + return source + + +async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool: + """첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부.""" + safe = _safe_filename(filenm) + rel_path = f"crawl_raw/kosha/{boardno}/{safe}" + existing = await session.execute( + select(Document).where(Document.file_path == rel_path).limit(1) + ) + if existing.scalars().first(): + return False + + dest = Path(settings.nas_mount_path) / rel_path + size = await _download(filepath, dest) + ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10] + + doc = Document( + file_path=rel_path, + file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(), + file_format=ext, + file_size=size, + file_type="immutable", + title=safe.rsplit(".", 1)[0], + source_channel="crawl", + data_origin="external", + import_source="kosha_api", + edit_url=filepath, + ai_tags=["Safety/KOSHA재해사례/첨부"], + extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"}}, + ) + session.add(doc) + await session.flush() + # extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용 + await enqueue_stage(session, doc.id, "extract") + logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)") + return True + + +async def collect_disaster_cases(session) -> int: + """재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup.""" + key = _api_key() + source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP) + new_count = 0 + + for page in range(1, _CASE_PAGES + 1): + payload = await _api_get( + f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}" + ) + items = _items(payload) + if not items: + break + page_all_dup = True + for item in items: + boardno = str(item.get("boardno") or "").strip() + title = (item.get("keyword") or "").strip() + if not boardno or not title: + continue + fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32] + existing = await session.execute( + select(Document).where(Document.file_hash == fhash).limit(1) + ) + if existing.scalars().first(): + continue + page_all_dup = False + + contents = (item.get("contents") or "").strip() + business = (item.get("business") or "").strip() + now = datetime.now(timezone.utc) + doc = Document( + file_path=f"crawl/{_CASE_SOURCE}/{boardno}", + file_hash=fhash, + file_format="article", + file_size=len(contents.encode()), + file_type="note", + title=title, + extracted_text=f"{title}\n\n[{business}]\n{contents}", + extracted_at=now, + extractor_version="kosha_api", + md_status="skipped", + md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상", + source_channel="crawl", + data_origin="external", + review_status="approved", + ai_domain="Safety", + ai_sub_group=_CASE_SOURCE, + ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"], + extract_meta={ + "source_id": source.id, + "source_name": _CASE_SOURCE, + "published_at": None, + "kosha": {"boardno": boardno, "business": business, + "atcflcnt": item.get("atcflcnt")}, + }, + ) + session.add(doc) + await session.flush() + await enqueue_stage(session, doc.id, "summarize") + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + new_count += 1 + + # 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서 + if int(item.get("atcflcnt") or 0) > 0: + attach = await _api_get( + f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070" + f"&pageNo=1&numOfRows=10&boardno={boardno}" + ) + for att in _items(attach): + filenm = (att.get("filenm") or "").strip() + filepath = (att.get("filepath") or "").strip() + if not filenm or not filepath.startswith("https://"): + continue + try: + await _ingest_attachment(session, boardno, filenm, filepath) + except FeedError as e: + logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}") + if page_all_dup: + break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존 + + logger.info(f"[kosha] 재해사례 신규 {new_count}건") + return new_count + + +async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int: + """GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필).""" + key = _api_key() + await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP) + new_specs: list[dict] = [] + page, total = 1, None + + while True: + payload = await _api_get( + f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}" + ) + if total is None: + total = int((payload.get("body") or {}).get("totalCount") or 0) + items = _items(payload) + if not items: + break + for item in items: + no = (item.get("techGdlnNo") or "").strip() + ymd = (item.get("techGdlnOfancYmd") or "").strip() + url = (item.get("fileDownloadUrl") or "").strip() + if not no or not url.startswith("https://"): + continue + fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32] + existing = await session.execute( + select(Document).where(Document.file_hash == fhash).limit(1) + ) + if not existing.scalars().first(): + new_specs.append({"no": no, "ymd": ymd, "url": url, + "name": (item.get("techGdlnNm") or no).strip(), + "fhash": fhash}) + if page * _GUIDE_ROWS >= total: + break + page += 1 + + todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap) + ingested = 0 + for spec in todo: + safe_no = _safe_filename(spec["no"]) + rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf" + dest = Path(settings.nas_mount_path) / rel_path + try: + size = await _download(spec["url"], dest) + except FeedError as e: + logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}") + continue + doc = Document( + file_path=rel_path, + file_hash=spec["fhash"], + file_format="pdf", + file_size=size, + file_type="immutable", + title=f"{spec['name']} ({spec['no']})", + source_channel="crawl", + data_origin="external", + import_source="kosha_api", + edit_url=spec["url"], + ai_tags=["Safety/KOSHA GUIDE"], + extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"], + "ofancYmd": spec["ymd"]}}, + ) + session.add(doc) + await session.flush() + await enqueue_stage(session, doc.id, "extract") + ingested += 1 + + # silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화) + logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest" + + (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else "")) + return ingested + + +async def run() -> None: + """daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게).""" + now = datetime.now(timezone.utc) + for name, collector in ((_CASE_SOURCE, collect_disaster_cases), + (_GUIDE_SOURCE, collect_kosha_guide)): + async with async_session() as session: + result = await session.execute(select(NewsSource).where(NewsSource.name == name)) + source = result.scalars().first() + try: + count = await collector(session) + if source is None: # 첫 실행에서 collector 가 생성 + result = await session.execute( + select(NewsSource).where(NewsSource.name == name)) + source = result.scalars().first() + health = await _get_or_create_health(session, source.id) + _record_success(health, count, False, now) + await session.commit() + except Exception as e: + logger.error(f"[kosha] {name} 수집 실패: {e}") + await session.rollback() # 부분 적재 폐기 후 health 만 기록 + if source is not None: + health = await _get_or_create_health(session, source.id) + _record_failure(health, str(e) or repr(e), now) + await session.commit() + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index cba44ae..b15c74a 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -45,6 +45,10 @@ CATEGORY_MAP = { "Kultur": "Culture", "Wissenschaft": "Technology", # 프랑스어 "Environnement": "Environment", + # 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵 + "안전": "Safety", "Safety": "Safety", + "공학": "Engineering", "Engineering": "Engineering", + "철학": "Philosophy", "Philosophy": "Philosophy", } @@ -270,6 +274,11 @@ async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt await enqueue_stage(session, doc.id, "fulltext") return await enqueue_stage(session, doc.id, "summarize") + if source.source_channel == "crawl": + # 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용) + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + return days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: await enqueue_stage(session, doc.id, "embed") @@ -285,6 +294,26 @@ def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict: } +def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict: + """채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성. + + file_path 접두사가 곧 채널 디렉토리. ai_domain 은 다이제스트/검색 필터의 분기 축이라 + crawl 채널이 'News' 를 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상). + """ + if source.source_channel == "crawl": + domain = category if category and category != "Other" else "Domain" + return { + "path_prefix": "crawl", + "ai_domain": domain, + "ai_tags": [f"{domain}/{source_short}"], + } + return { + "path_prefix": "news", + "ai_domain": "News", + "ai_tags": [f"News/{source_short}/{category}"], + } + + async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: """RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1). @@ -393,6 +422,11 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: is_feed_full = True link = entry.get("link", "") + + # B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치) + if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link): + continue + published = entry.get("published_parsed") or entry.get("updated_parsed") pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc) @@ -418,9 +452,10 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: category = _normalize_category(source.category or "") source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문" + ident = _doc_identity(source, source_short, category) doc = Document( - file_path=f"news/{source.name}/{article_id}", + file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", file_hash=article_id, file_format="article", file_size=len(body.encode()), @@ -435,14 +470,14 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: # fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신. md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", - source_channel="news", + source_channel=source.source_channel, data_origin="external", # 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화 edit_url=normalized_url, review_status="approved", - ai_domain="News", + ai_domain=ident["ai_domain"], ai_sub_group=source_short, - ai_tags=[f"News/{source_short}/{category}"], + ai_tags=ident["ai_tags"], extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) @@ -459,6 +494,136 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: async def _fetch_api(session, source: NewsSource) -> tuple[int, str]: + """API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2). + + 레거시 NYT 행(feed_url=api.nytimes.com)은 무변경 경로. 신규 제공자는 호스트 분기 추가. + 미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지). + """ + host = (urlparse(source.feed_url).hostname or "").lower() + if host.endswith("guardianapis.com"): + return await _fetch_api_guardian(session, source) + if host.endswith("nytimes.com"): + return await _fetch_api_nyt(session, source) + raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요") + + +def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]: + """Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상 + (tests/fixtures/guardian_open_platform_search_response.json 박제 시 호출과 동일해야 함).""" + parsed = urlparse(feed_url) + params = { + **dict(parse_qsl(parsed.query)), + "show-fields": "bodyText,trailText", + "page-size": "20", + "order-by": "newest", + "api-key": api_key, + } + return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params + + +async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]: + """Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON. + + feed_url 에 section 쿼리를 박아 등록 (예: https://content.guardianapis.com/search?section=world). + 전문이 API 로 오므로 fulltext stage 불요. 키 미설정 = FeedError (health 실패 기록, + silent fallback 없음 — [[feedback_no_silent_fallback_explicit_opt_in]]). + """ + import os + api_key = os.getenv("GUARDIAN_API_KEY", "") + if not api_key: + raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가") + + endpoint, params = _guardian_request(source.feed_url, api_key) + + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(endpoint, params=params) + resp.raise_for_status() + except httpx.HTTPStatusError as e: + # 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율) + safe_url = str(e.request.url).split("?")[0] + raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e + except httpx.RequestError as e: + safe_url = str(e.request.url).split("?")[0] if e.request else "unknown" + raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e + + payload = resp.json().get("response", {}) + if payload.get("status") != "ok": + raise FeedError(f"Guardian API status={payload.get('status')}") + + count = 0 + for item in payload.get("results", []): + title = (item.get("webTitle") or "").strip() + if not title: + continue + + fields = item.get("fields") or {} + body_text = (fields.get("bodyText") or "").strip() + trail = _clean_html(fields.get("trailText") or "") + # bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하. + is_full = len(body_text) >= 200 + body = body_text if is_full else (trail or title) + + link = item.get("webUrl", "") + pub_str = item.get("webPublicationDate", "") + try: + pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pub_dt = datetime.now(timezone.utc) + + article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name) + normalized_url = _normalize_url(link) + + # RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first) + existing = await session.execute( + select(Document).where( + (Document.file_hash == article_id) | + (Document.edit_url.in_([normalized_url, link])) + ).limit(1) + ) + if existing.scalars().first(): + continue + + if await _is_portal_duplicate(session, title): + logger.info(f"[{source.name}] portal-dup skip: {title[:60]}") + continue + + category = _normalize_category(item.get("sectionName", source.category or "")) + source_short = source.name.split(" ")[0] + ident = _doc_identity(source, source_short, category) + + doc = Document( + file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", + file_hash=article_id, + file_format="article", + file_size=len(body.encode()), + file_type="note", + title=title, + extracted_text=f"{title}\n\n{body}", + extracted_at=datetime.now(timezone.utc), + extractor_version="guardian_api_full" if is_full else "guardian_api", + md_status="skipped", + md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", + source_channel=source.source_channel, + data_origin="external", + edit_url=normalized_url, + review_status="approved", + ai_domain=ident["ai_domain"], + ai_sub_group=source_short, + ai_tags=ident["ai_tags"], + extract_meta=_build_extract_meta(source, pub_dt), + ) + session.add(doc) + await session.flush() + + await _enqueue_processing(session, doc, source, pub_dt) + count += 1 + + logger.info(f"[{source.name}] API → {count}건 수집") + return count, "ok" + + +async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]: """NYT API 수집 — 키 마스킹 + health degradation""" import os nyt_key = os.getenv("NYT_API_KEY", "") @@ -519,8 +684,9 @@ async def _fetch_api(session, source: NewsSource) -> tuple[int, str]: category = _normalize_category(article.get("section", source.category or "")) source_short = source.name.split(" ")[0] + ident = _doc_identity(source, source_short, category) doc = Document( - file_path=f"news/{source.name}/{article_id}", + file_path=f"{ident['path_prefix']}/{source.name}/{article_id}", file_hash=article_id, file_format="article", file_size=len(summary.encode()), @@ -534,13 +700,13 @@ async def _fetch_api(session, source: NewsSource) -> tuple[int, str]: # 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상). md_status="skipped", md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상", - source_channel="news", + source_channel=source.source_channel, data_origin="external", edit_url=normalized_url, review_status="approved", - ai_domain="News", + ai_domain=ident["ai_domain"], ai_sub_group=source_short, - ai_tags=[f"News/{source_short}/{category}"], + ai_tags=ident["ai_tags"], extract_meta=_build_extract_meta(source, pub_dt), ) session.add(doc) diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index b198067..1d6b92c 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -140,6 +140,9 @@ async def enqueue_next_stage(document_id: int, current_stage: str): # source_channel-aware override (extract stage 만). source_channel 누락 시 _default. extract_override_by_channel = { "devonagent": ["embed", "chunk"], + # crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 — + # 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지. + "crawl": ["classify"], } next_stages = { diff --git a/app/workers/static_corpus_ingest.py b/app/workers/static_corpus_ingest.py new file mode 100644 index 0000000..06d4a41 --- /dev/null +++ b/app/workers/static_corpus_ingest.py @@ -0,0 +1,262 @@ +"""C-3 공학 정적 코퍼스 1회 일괄 ingest (plan crawl-24x7-1). + +National Board 기술 아티클(~86, ASP.NET 구식 — 기사 앵커가 싱글쿼트 href) + +TWI Job Knowledge(~153, sitemap 기반). 지속 크롤링이 아니라 아카이브 일괄 + +저빈도 증분 유형 — 스케줄러 미등록, 수동 CLI: + + docker exec hyungi_document_server-fastapi-1 \ + python -m workers.static_corpus_ingest --corpus all --limit 3 # 검증용 + docker exec -d hyungi_document_server-fastapi-1 \ + python -m workers.static_corpus_ingest --corpus all # 전체 (~45분) + + ※ -d 백그라운드 실행 시 중단은 host pkill 이 아니라 컨테이너 내부 PID kill + ([[feedback_docker_exec_orphan_kill]]). + +멱등: edit_url(정규화)+file_hash dedup — 재실행 = 신규분만 (그대로 monthly 증분 절차). +politeness: fetch_page 재사용 (per-domain 1 + 5~15s jitter + robots). +원본 보존·승격 필드: fulltext_worker 와 동일 규약 (재추출 가능 상태 유지). +실패는 degrade 없이 skip + 말미 목록 출력 (정적 코퍼스 — RSS 요약 같은 격하 대상 부재). +""" + +import argparse +import asyncio +import hashlib +import re +from datetime import datetime, timezone +from html import unescape + +from sqlalchemy import select + +from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page +from core.database import async_session +from core.utils import setup_logger +from models.document import Document +from models.news_source import NewsSource +from models.queue import enqueue_stage +from workers.fulltext_worker import ( + _WEB_MIN_BODY_LEN, + _extract_body, + _raw_html_path, + _save_raw_html, + _strip_article_footer, +) +from workers.news_collector import _article_hash, _normalize_url + +logger = setup_logger("static_corpus") + +_NB_LISTING = "https://www.nationalboard.org/Index.aspx?pageID=164" +_TWI_SITEMAP = "https://www.twi-global.com/sitemap.xml" + + +async def _discover_national_board() -> list[str]: + """목록 페이지의 기사 앵커 — 싱글쿼트 href 가 기본형이라 양쪽 인용부호 매칭.""" + html_text, _ = await fetch_page(_NB_LISTING) + ids = sorted( + {int(i) for i in re.findall( + r"href=['\"]/?Index\.aspx\?pageID=164&(?:amp;)?ID=(\d+)['\"]", html_text)} + ) + return [f"https://www.nationalboard.org/Index.aspx?pageID=164&ID={i}" for i in ids] + + +async def _discover_twi() -> list[str]: + """sitemap 에서 job-knowledge 시리즈만 (faqs/published-papers 는 향후 증분 후보).""" + xml_text, _ = await fetch_page(_TWI_SITEMAP) + urls = re.findall( + r"(https://www\.twi-global\.com/technical-knowledge/job-knowledge/[^<]+)", + xml_text, + ) + return sorted({u for u in urls if not u.rstrip("/").endswith("job-knowledge")}) + + +CORPORA = { + "national-board": { + "source_name": "National Board 기술 아티클", + "listing_url": _NB_LISTING, + "discover": _discover_national_board, + "fetch_method": "page", + }, + "twi": { + "source_name": "TWI Job Knowledge", + "listing_url": _TWI_SITEMAP, + "discover": _discover_twi, + "fetch_method": "sitemap+page", + }, +} + + +async def _get_or_create_source(session, spec: dict) -> NewsSource: + """레지스트리 행 — 출처 추적 + crawl_raw src_{id} 경로 + A-8 패널 가시성. + + enabled=False: 6h 뉴스 사이클 비대상 (피드가 없는 정적 코퍼스 — 증분은 본 CLI 재실행). + """ + result = await session.execute( + select(NewsSource).where(NewsSource.name == spec["source_name"]) + ) + source = result.scalars().first() + if source is None: + source = NewsSource( + name=spec["source_name"], + feed_url=spec["listing_url"], + feed_type="rss", + fetch_method=spec["fetch_method"], + fulltext_policy="none", + source_channel="crawl", + category="Engineering", + language="en", + country="US" if "national" in spec["source_name"].lower() else "GB", + enabled=False, + ) + session.add(source) + await session.flush() + return source + + +def _page_title(html_text: str, fallback: str) -> str: + m = re.search(r']*>([^<]+)", html_text, re.I) + title = unescape(m.group(1)).strip() if m else "" + # 사이트 접미 잡음 제거 (TWI 는 ' - TWI', NB 는 'National Board ...' 꼬리표) + title = re.sub(r"\s*[-|·]\s*(TWI|National Board[^-|]*)\s*$", "", title).strip() + return title or fallback + + +async def _ingest_one(session, source: NewsSource, url: str) -> str: + """기사 1건. 반환: 'ok' / 'dup' / 'skip'(추출부족·차단).""" + normalized_url = _normalize_url(url) + existing = await session.execute( + select(Document).where(Document.edit_url.in_([normalized_url, url])).limit(1) + ) + if existing.scalars().first(): + return "dup" + + try: + html_text, final_url = await fetch_page(url) + except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e: + logger.warning(f"[{source.name}] fetch 실패 skip: {url} — {type(e).__name__}: {e}") + return "skip" + + body, engine, engine_ver = _extract_body(html_text) + if not engine: + logger.warning(f"[{source.name}] 추출 실패 skip (< {_WEB_MIN_BODY_LEN}자): {url}") + return "skip" + clean_body = _strip_article_footer(body.replace("\x00", "")) + if len(clean_body) < _WEB_MIN_BODY_LEN: + logger.warning(f"[{source.name}] 푸터 제거 후 본문 부족 skip: {url}") + return "skip" + + title = _page_title(html_text, fallback=url.rsplit("/", 1)[-1][:90]) + article_id = _article_hash(title, "static", source.name) + dup2 = await session.execute( + select(Document).where(Document.file_hash == article_id).limit(1) + ) + if dup2.scalars().first(): + return "dup" + + now = datetime.now(timezone.utc) + raw_path = _raw_html_path(source.id, article_id, now) + raw_saved = True + try: + _save_raw_html(raw_path, html_text) + except OSError as e: + raw_saved = False + logger.error(f"[{source.name}] 원본 보존 실패 (ingest 는 진행): {e}") + + doc = Document( + file_path=f"crawl/{source.name}/{article_id}", + file_hash=article_id, + file_format="article", + file_size=0, # 아래 extracted_text 확정 후 재계산 + file_type="note", + title=title, + extracted_text=f"{title}\n\n{clean_body}", + extracted_at=now, + extractor_version=f"static+page@{engine}", + md_content=clean_body, + md_status="success", + md_extraction_engine=engine, + md_extraction_engine_version=engine_ver, + md_format_version="1.0", + md_generated_at=now, + md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(), + md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(), + content_origin="extracted", + source_channel="crawl", + data_origin="external", + edit_url=normalized_url, + review_status="approved", + ai_domain="Engineering", + ai_sub_group=source.name, + ai_tags=[f"Engineering/{source.name}"], + extract_meta={ + "source_id": source.id, + "source_name": source.name, + "published_at": None, # 정적 코퍼스 — 페이지 발행일 비신뢰, 색인은 채널 게이트로 무조건 + "fulltext": { + "status": "static_corpus", + "engine": engine, + "final_url": final_url, + "raw_html_path": str(raw_path) if raw_saved else None, + "body_chars": len(clean_body), + "resolved_at": now.isoformat(), + }, + }, + ) + doc.file_size = len(doc.extracted_text.encode()) + session.add(doc) + await session.flush() + + # crawl 채널 = 발행일 무관 전량 색인 (summarize 는 맥미니 큐 — D-4 lag 관찰 대상) + await enqueue_stage(session, doc.id, "summarize") + await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") + logger.info(f"[{source.name}] ingest {len(clean_body)}자 ({engine}): {title[:60]}") + return "ok" + + +async def run(corpus: str = "all", limit: int = 0) -> None: + targets = list(CORPORA) if corpus == "all" else [corpus] + for key in targets: + spec = CORPORA[key] + async with async_session() as session: + source = await _get_or_create_source(session, spec) + await session.commit() + source_id = source.id + + try: + urls = await spec["discover"]() + except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e: + logger.error(f"[{spec['source_name']}] 목록 수집 실패 — corpus 건너뜀: {e}") + continue + if limit: + urls = urls[:limit] + logger.info(f"[{spec['source_name']}] 대상 {len(urls)}건 (limit={limit or '없음'})") + + counts = {"ok": 0, "dup": 0, "skip": 0} + failed: list[str] = [] + for i, url in enumerate(urls, 1): + # 커밋 10건 단위 — 장시간 배치 중단 시 진행분 보존 + async with async_session() as session: + src = await session.get(NewsSource, source_id) + status = await _ingest_one(session, src, url) + await session.commit() + counts[status] += 1 + if status == "skip": + failed.append(url) + if i % 10 == 0: + logger.info(f"[{spec['source_name']}] 진행 {i}/{len(urls)} {counts}") + + logger.info(f"[{spec['source_name']}] 완료: {counts}") + if failed: + logger.warning( + f"[{spec['source_name']}] skip {len(failed)}건 — 재시도는 CLI 재실행(멱등):\n " + + "\n ".join(failed) + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="C-3 정적 코퍼스 일괄 ingest") + parser.add_argument("--corpus", choices=[*CORPORA, "all"], default="all") + parser.add_argument("--limit", type=int, default=0, help="corpus 당 상한 (0=전체)") + args = parser.parse_args() + asyncio.run(run(args.corpus, args.limit)) diff --git a/docker-compose.yml b/docker-compose.yml index 7e1283e..8bcf06d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -254,6 +254,16 @@ services: condition: service_healthy restart: unless-stopped + # crawl-24x7 B-3: 구독 세션 Playwright fetch 격리 — internal-only (host 포트·caddy 라우트 금지). + # 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 컨테이너 + mem cap. + # 세션 파일(쿠키=credential 등가물)은 repo 밖 호스트 경로 ro mount (600, gitignore 무관 영역). + playwright-fetcher: + build: ./services/playwright-fetcher + volumes: + - /home/hyungi/.local/share/crawl-auth:/auth:ro + mem_limit: 2g + restart: unless-stopped + caddy: image: caddy:2 ports: diff --git a/migrations/324_news_sources_source_channel.sql b/migrations/324_news_sources_source_channel.sql new file mode 100644 index 0000000..d500728 --- /dev/null +++ b/migrations/324_news_sources_source_channel.sql @@ -0,0 +1,5 @@ +-- B/C 그룹 (plan crawl-24x7-1, 0-5 확정): 레지스트리에 채널 컬럼 — additive only. +-- documents.source_channel 과 동일 enum 재사용 ('crawl' 값은 320 에서 별도 트랜잭션으로 추가 완료). +-- 기존 행 전부 'news' 기본값 = 무회귀. crawl 채널 소스의 문서 생성/색인 게이트 분기 기준. +ALTER TABLE news_sources + ADD COLUMN IF NOT EXISTS source_channel source_channel NOT NULL DEFAULT 'news'; diff --git a/migrations/325_source_health_b3_probe_columns.sql b/migrations/325_source_health_b3_probe_columns.sql new file mode 100644 index 0000000..807d133 --- /dev/null +++ b/migrations/325_source_health_b3_probe_columns.sql @@ -0,0 +1,8 @@ +-- B-3 (plan crawl-24x7-1): 구독 세션 상태 노출 계약 — additive only. +-- relogin_requested: 쓰기 1종 플래그 (A-8 버튼이 기록, 어댑터가 소비 = 수동 half-open). +-- 소비 위치 함정(r5 고정): open-스킵 분기보다 앞 — 어댑터 틱마다 확인. +-- last_probe_at/ok: 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — silent corruption 차단). +ALTER TABLE source_health + ADD COLUMN IF NOT EXISTS relogin_requested BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS last_probe_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS last_probe_ok BOOLEAN; diff --git a/migrations/326_seed_crawl_cycle2_sources.sql b/migrations/326_seed_crawl_cycle2_sources.sql new file mode 100644 index 0000000..bec8ff0 --- /dev/null +++ b/migrations/326_seed_crawl_cycle2_sources.sql @@ -0,0 +1,33 @@ +-- crawl-24x7 사이클 2 소스 seed (B-2 + C-1 안전 + C-5 철학) — 2026-06-10 전 URL live 검증. +-- 262 선례: WHERE NOT EXISTS idempotent, 기존 행 보존, 신규만 insert (단일 statement). +-- 채널: news = 다이제스트/브리핑 대상 / crawl = 도메인 재료 (0-5 분리). +-- 정책: feed-full = 피드 본문이 전문 (UK HSE content:encoded 실측) / page = 기사 페이지 4-tier 승격. +-- EU-OSHA 는 후보 등재만 (enabled=false — 카드 C-1 '우선순위 낮음'). +-- 르몽드 B-3 활성화는 seed 아님 — 세션 박제 후 runtime UPDATE (auth_profile/selector_override). +INSERT INTO news_sources + (name, country, language, feed_type, feed_url, category, enabled, + fetch_method, fulltext_policy, source_channel, parser_quirk) +SELECT v.name, v.country, v.language, v.feed_type, v.feed_url, v.category, v.enabled, + v.fetch_method, v.fulltext_policy, v.source_channel::source_channel, v.parser_quirk +FROM (VALUES + -- B-2: Guardian Open Platform (전문 JSON — 스크래핑 불요, GUARDIAN_API_KEY 필요) + ('Guardian World', 'GB', 'en', 'api', 'https://content.guardianapis.com/search?section=world', 'International', true, 'api', 'none', 'news', NULL), + -- C-1 안전 (Safety) + ('UK HSE Press', 'GB', 'en', 'rss', 'https://press.hse.gov.uk/feed/', 'Safety', true, 'rss', 'feed-full', 'crawl', NULL), + ('안전신문', 'KR', 'ko', 'rss', 'https://www.safetynews.co.kr/rss/allArticle.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL), + ('고용노동부 공지', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/notice.do', 'Safety', true, 'rss', 'page', 'crawl', NULL), + ('고용노동부 정책', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/policy.do', 'Safety', true, 'rss', 'page', 'crawl', NULL), + ('고용노동부 입법행정예고', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/lawinfo.do', 'Safety', true, 'rss', 'page', 'crawl', NULL), + ('OSHA QuickTakes', 'US', 'en', 'rss', 'https://www.osha.gov/sites/default/files/quicktakes.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL), + ('EU-OSHA News', 'EU', 'en', 'rss', 'https://osha.europa.eu/en/rss-feeds/latest/news.xml', 'Safety', false, 'rss', 'page', 'crawl', NULL), + -- C-5 철학 (Philosophy) + ('SEP 신규·개정', 'US', 'en', 'rss', 'https://plato.stanford.edu/rss/sep.xml', 'Philosophy', true, 'rss', 'page', 'crawl', NULL), + ('1000-Word Philosophy', 'US', 'en', 'rss', 'https://1000wordphilosophy.com/feed/', 'Philosophy', true, 'rss', 'feed-full', 'crawl', NULL), + ('Doing Philosophy', 'KR', 'ko', 'rss', 'https://doingphilosophy.kr/feed', 'Philosophy', true, 'rss', 'page', 'crawl', NULL), + ('Aeon', 'GB', 'en', 'rss', 'https://aeon.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video'), + ('Psyche', 'GB', 'en', 'rss', 'https://psyche.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video') +) AS v(name, country, language, feed_type, feed_url, category, enabled, + fetch_method, fulltext_policy, source_channel, parser_quirk) +WHERE NOT EXISTS ( + SELECT 1 FROM news_sources ns WHERE ns.name = v.name +); diff --git a/scripts/capture_subscription_session.py b/scripts/capture_subscription_session.py new file mode 100644 index 0000000..c6ee72d --- /dev/null +++ b/scripts/capture_subscription_session.py @@ -0,0 +1,59 @@ +"""B-3 구독 세션 1회 수동 박제 (MacBook 등 GUI 머신에서 실행). + +르몽드 = Google OAuth — 자동화 브라우저 로그인은 Google 이 차단하므로 +로그인 자체는 항상 사람이 headed 브라우저에서 수행하고, 본 스크립트는 +그 결과(쿠키+localStorage = storage_state JSON)만 박제한다. + +사용 (MacBook): + pip install playwright && playwright install chromium + python scripts/capture_subscription_session.py --profile lemonde --url https://www.lemonde.fr + 1) 떠오른 브라우저에서 직접 로그인 (Google OAuth 포함) + 2) 로그인 완료 확인 후 터미널에서 Enter + 3) ~/.local/share/crawl-auth/lemonde.json 저장 (600) + +GPU 반영: + ssh gpu 'mkdir -p ~/.local/share/crawl-auth && chmod 700 ~/.local/share/crawl-auth' + scp ~/.local/share/crawl-auth/lemonde.json gpu:.local/share/crawl-auth/ + ssh gpu 'chmod 600 ~/.local/share/crawl-auth/lemonde.json' + +세션 만료 후 재로그인도 동일 절차 + source_health.relogin_requested 플래그 set +(어댑터가 다음 틱에 half-open probe 로 소비). + +주의: storage_state = credential 등가물. repo 안·백업 대상 경로에 두지 말 것. +""" + +import argparse +from pathlib import Path + +from playwright.sync_api import sync_playwright + +AUTH_DIR = Path.home() / ".local" / "share" / "crawl-auth" + + +def main() -> None: + parser = argparse.ArgumentParser(description="B-3 구독 세션 storage_state 박제") + parser.add_argument("--profile", required=True, help="예: lemonde") + parser.add_argument("--url", required=True, help="로그인 시작 페이지") + args = parser.parse_args() + + AUTH_DIR.mkdir(parents=True, exist_ok=True) + AUTH_DIR.chmod(0o700) + out = AUTH_DIR / f"{args.profile}.json" + + with sync_playwright() as pw: + browser = pw.chromium.launch(headless=False) + context = browser.new_context(viewport={"width": 1366, "height": 900}) + page = context.new_page() + page.goto(args.url) + print(f"\n브라우저에서 로그인을 완료한 뒤 이 터미널에서 Enter 를 누르세요.") + input("로그인 완료 후 Enter > ") + context.storage_state(path=str(out)) + browser.close() + + out.chmod(0o600) + print(f"저장: {out} (600)") + print("다음: scp 로 GPU ~/.local/share/crawl-auth/ 반영 + chmod 600") + + +if __name__ == "__main__": + main() diff --git a/services/playwright-fetcher/Dockerfile b/services/playwright-fetcher/Dockerfile new file mode 100644 index 0000000..86d0487 --- /dev/null +++ b/services/playwright-fetcher/Dockerfile @@ -0,0 +1,14 @@ +# B-3 / A-1 Tier 2 (plan crawl-24x7-1) — Playwright 격리 컨테이너. +# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 서비스로 격리, +# 타임아웃 있는 HTTP 호출로만 사용. 요청당 브라우저 기동 = 컨텍스트 누적 메모리 차단. +FROM mcr.microsoft.com/playwright/python:v1.47.0-jammy + +WORKDIR /srv +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY server.py . + +# internal-only — compose 네트워크 전용, host 포트 미매핑 (caddy 라우트 금지) +EXPOSE 3400 +CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3400"] diff --git a/services/playwright-fetcher/requirements.txt b/services/playwright-fetcher/requirements.txt new file mode 100644 index 0000000..92e69b0 --- /dev/null +++ b/services/playwright-fetcher/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.* +uvicorn==0.32.* +playwright==1.47.0 diff --git a/services/playwright-fetcher/server.py b/services/playwright-fetcher/server.py new file mode 100644 index 0000000..78d2e4a --- /dev/null +++ b/services/playwright-fetcher/server.py @@ -0,0 +1,107 @@ +"""B-3 구독 세션 Playwright fetcher (plan crawl-24x7-1). + +storage_state JSON(쿠키+localStorage 스냅샷) 기반 인증 페이지 fetch + 내용 기반 probe. +- 동시 1 인스턴스 (글로벌 세마포어) — 계정 보호 + 사람 속도는 호출측 politeness 가 담당. +- 요청당 브라우저 기동/종료 — 컨텍스트 메모리 누적·hang 잔존 차단 (저빈도라 기동비용 무관). +- 세션 파일: /auth/{profile}.json (호스트 ~/.local/share/crawl-auth/, ro mount, 600). + 부재 = 503 profile_missing (silent fallback 없음 — 호출측이 degrade). +- 시간 기반 만료 판정 금지 — probe 는 알려진 유료 기사에서 본문 길이 + 페이월 마커 부재 검증 + (만료 후 200 '페이월 안내문'이 본문으로 저장되는 silent corruption 차단). +""" + +import asyncio +import logging +from pathlib import Path + +from fastapi import FastAPI, HTTPException +from playwright.async_api import async_playwright, Error as PlaywrightError +from pydantic import BaseModel, Field + +logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") +logger = logging.getLogger("playwright-fetcher") + +AUTH_DIR = Path("/auth") +NAV_TIMEOUT_MS = 45_000 +SETTLE_MS = 1_500 # domcontentloaded 후 lazy 본문 settle 대기 + +app = FastAPI(title="playwright-fetcher") +_browser_slot = asyncio.Semaphore(1) # 동시 1 인스턴스 (B-3 ① persistent 제약과 동일 규율) + + +class FetchReq(BaseModel): + url: str + profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$") + + +class ProbeReq(BaseModel): + profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$") + probe_url: str + min_body_chars: int = 800 + paywall_markers: list[str] = [] + + +def _state_path(profile: str) -> Path: + p = AUTH_DIR / f"{profile}.json" + if not p.is_file(): + raise HTTPException(503, detail={"error_reason": "profile_missing", "profile": profile}) + return p + + +async def _browse(url: str, state: Path) -> tuple[str, str, str]: + """(html, final_url, visible_text). 요청당 브라우저 — 종료를 finally 로 보장.""" + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=True) + try: + context = await browser.new_context( + storage_state=str(state), + viewport={"width": 1366, "height": 900}, + locale="fr-FR", + ) + page = await context.new_page() + await page.goto(url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS) + await page.wait_for_timeout(SETTLE_MS) + html = await page.content() + final_url = page.url + text = await page.evaluate("document.body ? document.body.innerText : ''") + return html, final_url, text + finally: + await browser.close() + + +@app.get("/health") +def health(): + profiles = sorted(p.stem for p in AUTH_DIR.glob("*.json")) if AUTH_DIR.is_dir() else [] + return {"status": "ok", "profiles": profiles} + + +@app.post("/fetch") +async def fetch(req: FetchReq): + state = _state_path(req.profile) + async with _browser_slot: + try: + html, final_url, _ = await _browse(req.url, state) + except PlaywrightError as e: + logger.warning("fetch 실패 %s: %s", req.url, e) + raise HTTPException(502, detail={"error_reason": "browse_failed", "message": str(e)[:300]}) + logger.info("fetch ok profile=%s %s (%d bytes)", req.profile, req.url, len(html)) + return {"html": html, "final_url": final_url} + + +@app.post("/probe") +async def probe(req: ProbeReq): + """내용 기반 세션 probe — ok=False 사유를 명시 반환 (호출측이 health 에 기록).""" + state = _state_path(req.profile) + async with _browser_slot: + try: + _, final_url, text = await _browse(req.probe_url, state) + except PlaywrightError as e: + return {"ok": False, "reason": f"browse_failed: {str(e)[:200]}", "body_chars": 0} + body_chars = len(text.strip()) + hit = next((m for m in req.paywall_markers if m and m.lower() in text.lower()), None) + if hit: + return {"ok": False, "reason": f"paywall_marker: {hit}", "body_chars": body_chars} + if body_chars < req.min_body_chars: + return {"ok": False, "reason": f"body_too_short: {body_chars} < {req.min_body_chars}", + "body_chars": body_chars} + logger.info("probe ok profile=%s (%d chars, final=%s)", req.profile, body_chars, final_url) + return {"ok": True, "reason": None, "body_chars": body_chars} diff --git a/tests/fixtures/guardian_open_platform_search_response.json b/tests/fixtures/guardian_open_platform_search_response.json new file mode 100644 index 0000000..fe7249e --- /dev/null +++ b/tests/fixtures/guardian_open_platform_search_response.json @@ -0,0 +1 @@ +{"response":{"status":"ok","userTier":"developer","total":251230,"startIndex":1,"pageSize":2,"currentPage":1,"pages":125615,"orderBy":"newest","results":[{"id":"world/live/2026/jun/10/iran-war-updates-missile-strikes-trump-us-retaliation-middle-east-crisis-war-live","type":"liveblog","sectionId":"world","sectionName":"World news","webPublicationDate":"2026-06-10T05:46:19Z","webTitle":"Middle East crisis live: Iran launches broad retaliatory attacks after US strikes over downed helicopter","webUrl":"https://www.theguardian.com/world/live/2026/jun/10/iran-war-updates-missile-strikes-trump-us-retaliation-middle-east-crisis-war-live","apiUrl":"https://content.guardianapis.com/world/live/2026/jun/10/iran-war-updates-missile-strikes-trump-us-retaliation-middle-east-crisis-war-live","fields":{"trailText":"Iran’s Revolutionary Guards Corps says it has targeted an airbase in Jordan hosting US forces, as well as Kuwait and Bahrain, in response to US strikes ","bodyText":"If the US genuinely wants a deal it will have to engage with Iranian demands on sanctions relief, says Danny Citrinowicz, the former head of the Iran branch of Israeli military intelligence. Today’s exchange of strikes shows how easily both Iran and the US can slide towards another round of escalation, says Citrinowicz, who is now a nonresident fellow at the Atlantic Council. He adds that regardless of how much pressure is applied, Iran has shown it will not abandon its current position and reach a deal without “meaningful economic relief”. If Washington is unwilling to accept that reality, it should recognize the likely alternative: continued confrontations with Iran that could eventually spiral beyond anyone’s control and lead to military conflict under less favorable conditions. Even a limited military campaign designed to weaken Iran would not fundamentally alter Tehran’s negotiating position. It has not happened in the past, and there is little reason to believe it would happen now. Iran emerges from the latest exchange of blows convinced that it can absorb pressure and respond to attacks.” Iran’s foreign ministry has warned that its neighbours in the Gulf have a “legal and moral responsibility” to prevent American and Israeli strikes. In a statement released hours after the exchange of fire between the US and Iran, the foreign ministry said there is a: Legal and moral responsibility of all countries in the region (especially those located along the southern shores of the Persian Gulf) to prevent the US military and Israel from using their territory or facilities to plan, organise, execute, or support hostile actions against Iran”. Donald Trump has addressed the most recent exchange of strikes between the US and Iran – by posting a 26-year-old clip from the NBC show The West Wing. Replying to the US military’s announcement of “self-defense strikes” on Iran, Trump posted a 1.33 minute clip from the programme, which sees the fictional president, Jed Bartlet, and his aides debate the approach to take after a US military plane is shot down over Syria. “What is the virtue of a proportional response?” Bartlet, portrayed by Martin Sheen, asks his chief of staff and military advisers. After becoming angry at their answers he demands that they engage in a “disproportional response”. Let the word ring forth from this time and this place, gentlemen – you kill an American, any American, we don’t come back with a proportional response. We come back with total disaster.” It is not entirely clear what message Trump is trying to convey – but he is perhaps unaware of how the episode ends. After the military presents Bartlet with a plan that would lead to hundreds of civilian casualties, the fictional president reluctantly picks the initial, proportional response. Despite its final episode airing more than 20 years ago, The West Wing has remained a cultural touchstone to many Americans, with some calling it a “bittersweet comfort watch” in the age of Trump. In 2016, as Trump secured his grip on the Republican party and launched his campaign for the presidency, pop-culture journalist Brian Moylan wrote in the Guardian: “The West Wing shows us a world where the political system works.” It reminds us of a time, not too long ago, when people in political office took their jobs very seriously and wanted to actually govern this country rather than settle scores and appeal to their respective bases.” You can watch the clip that Trump posted here: The Wall Street Journal has reported that Donald Trump was not convinced of the need to retaliate against Iran after the Apache Helicopter went down earlier on Tuesday. He spent much of the day playing down the incident, telling reporters that it wasn’t a big deal. But according to the WSJ, his mind was changed after a briefing from defense secretary, Pete Hegseth, and chairman of the joint chiefs of staff, Dan Caine. The Associated Press has reported that the Apache helicopter that crashed went down after colliding with an Iranian drone. It is not clear whether the collision was intentional, but US officials reportedly told the president that the attack merited a response nonetheless. Trump would then go on to say that Iran shot down the helicopter, in a post on Truth Social, and declared that the US must “respond to this attack.” Hours later the US began the strikes on Iran. Wednesday’s strikes by the US on Iran are just the latest in a series of ceasefire breaches that have escalated considerably in the last two weeks. After weeks of conflict, the US and Iran agreed to a ceasefire on 8 April and entered into protracted negotiations to reopen the strait of Hormuz and resolve the issue of Iran’s nuclear program. Since then the US and Iran have exchanged strikes on at least four occasions, but in every instance both sides have characterised their actions as “measured” and “limited”, and stressed the importance of maintaining the ceasefire. The ceasefire faced its biggest test on Sunday, when Iran launched missiles at Israel in response to Israeli strikes on Beirut’s southern suburbs. The Israeli military launched airstrikes on Iran in retaliation; the first exchange of fire between the two countries since the ceasefire was reached. Fears of a return to a full-scale regional war in the Middle East eased on Monday, with Israel and Iran saying they had halted attacks on each other after an appeal from Donald Trump to “immediately stop shooting”. The breaches of the ceasefire fly in the face of Trump’s continued claims that a longterm deal with Iran is close. The US president is reportedly very close to agreeing to a series of Iranian demands that would allow the strait to reopen to traffic, and begin the process of a new round of nuclear negotiations. However Trump has for weeks promised that a deal is close, but failed to follow through on those promises. US House speaker Mike Johnson is among the many senior American officials who have been playing down the significance of the strikes. He called the strikes on Iran “targeted”, “proportional” and “defensive in nature.” Johnson said he spent several hours earlier in the situation room with Trump, the vice-president, JD Vance, secretary of state, Marco Rubio and defense secretary, Pete Hegseth, discussing the Iran war and other matters. “We lament that has become necessary,” he said. But he said after Iran struck US assets and personnel in the region, “We can’t allow that.” Nearly all the missiles and drones launched by Iran over the last few hours were intercepted, a US officials has told the Reuters news agency. The US official said that the military was not aware of any reports of harm to US personnel, or known damage to US locations at this time. According to the official, the US struck nearly 20 targets in Iran on Wednesday morning. We’ll bring you more on this when we have it. Jordanian armed forces said on Wednesday they intercepted and shot down five missiles launched from Iran toward the al-Azraq area in Jordan. The military said that debris from the interception operation fell on Jordanian territory but caused no injuries or material damage. Earlier Iran’s Revolutionary Guards said they had launched a missile attack at an airbase in Jordan hosting US forces, after also targeting Kuwait and Bahrain. The missiles targeted the Muwaffaq Salti airbase, which is known to host US F-35 fighter jets and other aircraft. As the US launched several waves of strikes on Iran, Asian share markets fell and oil prices surged. Escalating tensions in the Middle East have unsettled markets, dimming hopes for an end to the months-long war that has pushed commodities higher and stoked inflation worries. Japan’s Nikkei fell 0.9% while the tech-heavy South Korean KOSPI slumped 2%. Oil prices climbed about 1% in early trade, moving away from a seven-week low touched in the previous session. Brent futures rose 0.9% to $92.29 a barrel, while US West Texas Intermediate (WTI) crude climbed 0.8% to $88.97. “Oil holding around $90 despite fresh Iran headlines suggests markets are not pricing a sustained supply disruption. That leaves room for a bigger repricing if energy infrastructure, shipping routes or U.S. involvement escalate,” said Charu Chanana, chief investment strategist at Saxo in Singapore. US investor will be focused on inflation data, which is set to be released later on Wednesday. The report – covering the last 12 months through to May – will gauge the impact of the war, with a Reuters survey of economists predicting that inflation likely increased 4.2% in the perdiod. Welcome to our live coverage of the crisis in the Middle East. Iran says it has launched a missile attack at an airbase in Jordan hosting US forces, after also targeting Kuwait and Bahrain. The Revolutionary Guards said missiles have targeted the Muwaffaq Salti airbase, which is known to host US F-35 fighter jets and other aircraft. Neither Jordan nor the US has acknowledged any attack, but if confirmed it would likely be the first time that Iran has targeted Jordan since the start of the ceasefire in April. The US strikes on Iran followed the downing of a US Apache helicopter over the strait of Hormuz, from which two crew members were rescued in a stable condition. In a post on social media Trump said the US “must” respond to the helicopter crash. Here is the latest: The US launched multiple waves of strikes on Iran in response to a military helicopter crash off the strait of Hormuz that Donald Trump said Iran had downed. The Associated Press reported that the Apache helicopter that crashed went down after colliding with an Iranian drone, but it was not clear whether the collision was intentional. US strikes were reported across Iran’s southern coast, on the strait of Hormuz. After more than three hours of military action, US central command (Centcom) said strikes were “completed”, adding that the US remained ready to defend against “unjustified Iranian aggression.” Soon after, Iran launched retaliatory attacks against the US, according to the countries state media, which said American bases in the region and the US fifth fleet in Bahrain were targeted with drones. Kuwait and Bahrain issued air raid alerts and reported that air defences were active in repelling attacks. Iran also claimed it had targeted a US base in Jordan with long range missiles. Iran’s foreign minister, Abbas Araghchi, said no attack would go “unanswered”, soon after the US launched strikes on Iran. Posting an image of the strait of Hormuz with the label, “Forever Persian Gulf”, Araghchi says that “despite its defeats on the battlefield, the U.S. opted to test our determination.” Five hours before the airstrikes, Trump had posted on social media that the US “must” respond to the helicopter crash, from which two crew members were rescued in stable condition. Before his social media post, however, Trump appeared to downplay the crash, telling the Wall Street Journal in a phone interview that it “wasn’t a big deal” and that “the pilot is fine.” Iranian state media reported that no air military operations have taken place in the strait of Hormuz over the past 24 hours, according to Reuters. Lebanon’s health ministry said 11 people were killed in Israeli airstrikes on the southern city of Tyre on Tuesday. The state-run National News Agency (NNA) had reported the first strike taking place not long before Israel’s military issued an evacuation warning for the entire city and surrounding areas ahead of strikes there."},"isHosted":false,"pillarId":"pillar/news","pillarName":"News"},{"id":"world/2026/jun/10/only-one-in-10-europeans-now-see-us-as-an-ally-survey-suggests","type":"article","sectionId":"world","sectionName":"World news","webPublicationDate":"2026-06-10T04:00:11Z","webTitle":"Only one in 10 Europeans now see US as an ally, survey suggests","webUrl":"https://www.theguardian.com/world/2026/jun/10/only-one-in-10-europeans-now-see-us-as-an-ally-survey-suggests","apiUrl":"https://content.guardianapis.com/world/2026/jun/10/only-one-in-10-europeans-now-see-us-as-an-ally-survey-suggests","fields":{"trailText":"Exclusive: poll across 15 countries finds ‘deep mistrust’, with majority doubting US would come to their aid in an attack","bodyText":"European confidence in an American “security guarantee” has hit a historic low, a survey suggests, with only one in 10 people across 15 countries seeing the US as an ally and majorities in all doubting it would come to their aid if they were attacked. The survey, published on Wednesday by the European Council on Foreign Relations (ECFR) thinktank before critical G7 and Nato summits in France and Turkey over the coming weeks, revealed “deep European distrust in the US”, the authors said. It also showed that, while many Europeans felt relations with Washington would improve once Donald Trump leaves office, they were increasingly ready in the meantime to protect themselves against US unreliability by bolstering Europe’s defence. The US president’s Middle East aggression, threats against Greenland, vows to withdraw troops from European bases and scepticism on the future of Nato had also prompted a growing European pragmatism, the report said. “Across the continent, there’s clear support for reducing dependence on Washington,” said Jana Kobzová, a co-author and ECFR senior policy fellow. “Europeans are increasingly open to higher defence spending and, crucially, show a striking degree of confidence that neighbouring countries would come to their aid in a crisis.” Paweł Zerka, Kobzová’s co-author and also an ECFR senior policy fellow, said clear public demand for greater self-reliance and the need to hedge against US defence guarantees had “created a window for Europe’s leaders to go further and faster” on security. The survey, based on polling carried out in May in Austria, Bulgaria, Denmark, Estonia, France, Germany, Hungary, Italy, the Netherlands, Poland, Portugal, Spain, Sweden, Switzerland and the UK, showed an average of just 11% of respondents across all countries now viewed the US as an ally. That compared with 16% six months ago, and 22% in November 2024. The prevailing view was that the US was now a “necessary partner”, although 13% of the European public said they considered the US a rival and 12% a direct adversary. Majorities in every country were no longer confident the US would come to their aid in an attack. Except in Bulgaria, most people – including in countries with large far-right parties such as France, Italy, the Netherlands and Sweden – believed “at least some European countries” would help them in a similar scenario. Europeans were now on average 4% more likely to support higher national defence spending than last year, the survey found, with Italy the only country where a clear majority remained opposed. On average, 47% of respondents backed the ideal of collective EU borrowing for finance greater defence spending, with 35% opposed to it. Support was strongest in Portugal (59%), Denmark (56%), the Netherlands (55%) and Spain. In almost every country polled, most respondents said their country should reduce its strategic dependence on US military hardware, with “buy European” backers most numerous in Denmark (75%), the Netherlands (72%), Sweden (70%), Portugal (69%), France (66%), Switzerland (64%), the UK and Spain (both 62%). There was, however, markedly less support for the idea of cutting domestic public spending to pay for higher national defence budgets, with opposition strongest in Italy (63%), Austria (59%), Germany (56%), Spain (54%) and Denmark (52%). There was also little backing (29%) for replacing Nato with a new EU-only defence body, with the dominant view in almost every country except Bulgaria that US-European relations would “probably get better” once Trump leaves – a view held by 60% or more in France, Spain, Denmark, the Netherlands and Sweden. Despite rising energy costs, 44% of Europeans said it would be a “rather bad” or “very bad” idea to resume importing oil and gas from Russia. Ukraine’s ambition to join the EU, however, continues to divide European opinion, with respondents in countries including Hungary, Bulgaria, Austria, Germany and even Estonia, one of Kyiv’s staunchest supporters, more likely to oppose admitting Ukraine “in the current context” than favour it."},"isHosted":false,"pillarId":"pillar/news","pillarName":"News"}]}} \ No newline at end of file diff --git a/tests/fixtures/kosha_attach_response.json b/tests/fixtures/kosha_attach_response.json new file mode 100644 index 0000000..a299740 --- /dev/null +++ b/tests/fixtures/kosha_attach_response.json @@ -0,0 +1 @@ +{"body":{"pageNo":1,"totalCount":1,"numOfRows":5,"items":{"item":[{"filenm":"컨베이어에 끼임.pdf","filepath":"https://portal.kosha.or.kr/openapi/v1/file/down/stdboard/B2025022104002/202605281621537G75H2/D0801000010001","boardno":"202605281621537G75H2"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}} \ No newline at end of file diff --git a/tests/fixtures/kosha_board_response.json b/tests/fixtures/kosha_board_response.json new file mode 100644 index 0000000..53d76df --- /dev/null +++ b/tests/fixtures/kosha_board_response.json @@ -0,0 +1 @@ +{"body":{"pageNo":1,"totalCount":6334,"numOfRows":3,"items":{"item":[{"business":"제조업","contents":"2026.01.00(월) 07:30경, 경기도 소재 OOOO(주)에서 재해자가 골재 이송 컨베이어 상부의 이물질을 제거하던 중,다리가 컨베이어 벨트와 테일 풀리 (Tail Pulley)* 사이에 끼임 *컨베이어의 아래쪽 끝단에서 회전하며 벨트를 순환시키는 원통형 기계장치","atcflcnt":1,"keyword":"컨베이어에 끼임","boardno":"202605281621537G75H2"},{"business":"건설업","contents":"2025. 8. 00. (금) 11:12 경 경기도 소재 OOO 신축공 사현장에서 데크플레이트 설치 중 밟고 있던 미고정 데크플레이트가 탈락하며 약 7m 높이에서 추락함","atcflcnt":1,"keyword":"데크플레이트 설치 작업 중 추락","boardno":"20260528162031VZLE93"},{"business":"건설업","contents":"2025. 06. 00.(금) 12:35경, 경북 봉화군 소재 (주)OOOO 침전저류지 현장에서 타워크레인 전도 후 매립된 케이크*(오염토)를 굴착 및 운반 작업 중, 사면의 토사와 타워크레인 기초구조물이 무너지며 하단에서 작업 중이던 굴착기가 매몰됨 * 분말 상태의 원료에서 아연을 채취한 후 남은 중금속 부산물(산화칼슘, 납, 산화철, 황산 등)을 장기간 매립하여 만들어지는 고체 형태의 오염 토양 덩어리","atcflcnt":1,"keyword":"사면 굴착 작업 중 매몰","boardno":"20260527153100O7QX25"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}} \ No newline at end of file diff --git a/tests/fixtures/kosha_guide_response.json b/tests/fixtures/kosha_guide_response.json new file mode 100644 index 0000000..e874576 --- /dev/null +++ b/tests/fixtures/kosha_guide_response.json @@ -0,0 +1 @@ +{"body":{"pageNo":1,"totalCount":1039,"numOfRows":3,"items":{"item":[{"techGdlnNm":"구리에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-1-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883045/7"},{"techGdlnNm":"마그네슘에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-4-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883165/3"},{"techGdlnNm":"백금에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-6-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883187/3"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}} \ No newline at end of file diff --git a/tests/test_crawl_cycle2_shapes.py b/tests/test_crawl_cycle2_shapes.py new file mode 100644 index 0000000..940aede --- /dev/null +++ b/tests/test_crawl_cycle2_shapes.py @@ -0,0 +1,115 @@ +"""crawl-24x7 사이클 2 — 순수 함수/형태 회귀 테스트 (DB 불요). + +Guardian 호출 형태 + fixture 응답 파싱 + 채널 정체성 + B-5 quirk. +fixture = tests/fixtures/guardian_open_platform_search_response.json +(2026-06-10 실키 live 박제, api-key 응답 본문 미포함 확인 — [[feedback_external_api_fixture_first]]). +""" + +import json +import re +from pathlib import Path + +from workers.news_collector import ( + _article_hash, + _doc_identity, + _guardian_request, + _normalize_category, +) + +FIXTURE = Path(__file__).parent / "fixtures" / "guardian_open_platform_search_response.json" + + +def _make_source(**kw): + """ORM 인스턴스 없이 속성만 흉내 (식별성 함수는 속성 접근만 사용).""" + class S: + pass + s = S() + s.source_channel = kw.get("source_channel", "news") + s.parser_quirk = kw.get("parser_quirk") + return s + + +class TestGuardianCallShape: + def test_request_shape_matches_fixture_recipe(self): + """fixture 박제 시 사용한 호출과 단일 source-of-truth 정합 + ([[feedback_fixture_first_call_shape]]).""" + endpoint, params = _guardian_request( + "https://content.guardianapis.com/search?section=world", "KEY" + ) + assert endpoint == "https://content.guardianapis.com/search" + assert params["section"] == "world" + assert params["show-fields"] == "bodyText,trailText" + assert params["order-by"] == "newest" + assert params["api-key"] == "KEY" + + def test_feed_url_query_overridden_by_fixed_fields(self): + # feed_url 에 show-fields 가 잘못 박혀 있어도 고정 필드가 이긴다 (dict merge 순서) + _, params = _guardian_request( + "https://content.guardianapis.com/search?section=world&show-fields=headline", "K" + ) + assert params["show-fields"] == "bodyText,trailText" + + +class TestGuardianFixtureParsing: + def test_fixture_response_shape(self): + payload = json.loads(FIXTURE.read_text())["response"] + assert payload["status"] == "ok" + assert payload["results"], "fixture 에 결과 0건" + for item in payload["results"]: + assert item["webTitle"].strip() + assert item["webUrl"].startswith("https://") + assert "webPublicationDate" in item + assert "sectionName" in item + fields = item.get("fields") or {} + assert "bodyText" in fields and "trailText" in fields + + def test_fixture_bodytext_is_fulltext_grade(self): + payload = json.loads(FIXTURE.read_text())["response"] + # 전문 게이트(200자)를 fixture 가 통과해야 어댑터 is_full 경로가 산다 + assert any(len(i["fields"]["bodyText"]) >= 200 for i in payload["results"]) + + def test_fixture_contains_no_api_key(self): + assert "api-key" not in FIXTURE.read_text() + + +class TestChannelIdentity: + def test_news_channel_unchanged(self): + ident = _doc_identity(_make_source(source_channel="news"), "경향신문", "Society") + assert ident == { + "path_prefix": "news", + "ai_domain": "News", + "ai_tags": ["News/경향신문/Society"], + } + + def test_crawl_channel_domain_identity(self): + ident = _doc_identity(_make_source(source_channel="crawl"), "TWI", "Engineering") + assert ident["path_prefix"] == "crawl" + assert ident["ai_domain"] == "Engineering" + assert ident["ai_tags"] == ["Engineering/TWI"] + + def test_crawl_channel_unknown_category_falls_back(self): + ident = _doc_identity(_make_source(source_channel="crawl"), "X", "Other") + assert ident["ai_domain"] == "Domain" + + def test_category_map_has_domain_axes(self): + assert _normalize_category("안전") == "Safety" + assert _normalize_category("Engineering") == "Engineering" + assert _normalize_category("철학") == "Philosophy" + + +class TestSkipVideoQuirk: + PATTERN = re.compile(r"/videos?/") + + def test_video_urls_match(self): + assert self.PATTERN.search("https://psyche.co/videos/some-film") + assert self.PATTERN.search("https://aeon.co/video/another") + + def test_article_urls_pass(self): + assert not self.PATTERN.search("https://psyche.co/ideas/how-to-think") + + +class TestArticleHashStability: + def test_static_corpus_hash_deterministic(self): + a = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클") + b = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클") + assert a == b and len(a) == 32