Compare commits

..

11 Commits

Author SHA1 Message Date
hyungi f4e5db9723 fix(news): 304 를 redirect 로 오인하던 버그 — is_redirect → has_redirect_location
httpx 의 Response.is_redirect 는 3xx 전체(304 Not Modified 포함)에 True 라,
조건부 GET 으로 304 를 받으면 location 없는 같은 URL 을 3회 재요청 후
'redirect 3회 초과'로 오류 처리 → ETag/Last-Modified 받는 안정 피드(SEP/HSE/OSHA
/철학 RSS 등)가 2번째 사이클부터 전멸하던 systematic 버그.

- 304 처리를 redirect 루프보다 앞으로 이동.
- redirect 판별을 has_redirect_location(=location 헤더 있는 진짜 redirect)으로 교체.
  news_collector._fetch_rss + crawl_politeness.fetch_page 동일 함정 양쪽 수정.
- 사이클 1 파일럿(경향)은 304 를 받은 적 없어 잠복했고, 안정 피드 첫 304 에서 발현.
- 회귀 테스트 3건(304 비-redirect / 진짜 redirect / 코드 패턴 audit).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 06:32:15 +09:00
hyungi 69db9bcb94 fix(news): 안티봇 챌린지 페이지 식별 게이트 — DataDome corruption 차단 (B-3 실측)
르몽드 기사 = DataDome Client Challenge(316자)가 200자 본문 floor 통과 → 챌린지
HTML 이 기사 본문으로 승격되는 silent corruption 위험. fetch_page_via_browser 에
챌린지 마커 게이트 추가 → CrawlBlocked(degrade=RSS 요약 유지). 헤드리스 탐지라 재시도 무의미.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 17:04:11 +09:00
hyungi 61e5a416d0 fix(news): fetch_page content-type 허용 파라미터 — TWI sitemap(text/xml) 수집 (검증 게이트 발견)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 16:41:30 +09:00
hyungi cdf4ee0ef6 fix(news): Guardian sectionName 'World news' 카테고리 매핑 (셀프 리뷰)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 16:37:22 +09:00
hyungi 251a5392ef fix(services): playwright-fetcher pwuser 실행 — root Chromium sandbox 함정 회피
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:11:03 +09:00
hyungi 1842f27d89 feat(news): crawl-24x7 사이클 2 — B-2/B-3/C-1/C-2/C-3/C-5 (마이그 324-326)
- 채널 인지화: news_sources.source_channel(324, documents enum 재사용) →
  문서 생성 정체성(_doc_identity)·embed/chunk 30일 게이트(crawl=전량 색인)·
  extract 후속 override(crawl→classify, preview 스킵) 분기.
- B-2 Guardian Open Platform: API 디스패치(호스트 분기, 미지 호스트=명시 실패)
  + show-fields=bodyText 전문 어댑터. fixture live 박제 + call-shape 테스트.
- B-3 구독지: playwright-fetcher 격리 컨테이너(동시 1·요청당 브라우저·storage_state
  ro mount) + politeness 사람속도(30-60s) 브라우저 경로 + fulltext 인증 라우팅
  (내용 기반 probe 게이트·relogin_requested 소비=open-스킵보다 앞·본문 페이월 마커
  게이트) + source_health probe 컬럼(325) + 세션 박제 스크립트(맥북용).
- C-2 KOSHA: 3 API live 검증·fixture 박제(board/attach/guide) — 재해사례 daily diff
  +첨부 PDF/HWP→extract 파이프라인, GUIDE 일일 cap 점진 백필(silent cap 금지 로그).
  키는 URL 직결합(재인코딩 함정 회피). daily 06:40 KST.
- C-3 정적 코퍼스: National Board 86 + TWI job-knowledge 153 일괄 CLI(멱등·politeness
  ·crawl_raw 보존·fulltext_worker 승격 필드 규약 동일).
- C-1/C-5 시드(326): 전 URL live 검증 — UK HSE(feed-full)/안전신문/고용노동부 3종
  (rss/*.do)/OSHA/EU-OSHA(후보)/SEP/1000-Word(feed-full)/Doing Philosophy/Aeon/Psyche
  (skip-video quirk).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:08:18 +09:00
hyungi 53a30449e2 fix(news): crawl_politeness logger 를 setup_logger 로 정합화 — INFO 대기 로그 가시화
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:47:18 +09:00
hyungi ab668d7990 fix(news): crawl_raw 파일명 CHAR(64) 패딩 strip + politeness 대기 로그
- documents.file_hash 실 컬럼이 character(64) — 32자 해시가 공백 패딩되어
  gz 파일명에 공백 32개 포함 (실배포 1건 실측). _raw_html_path 에서 strip.
- _respect_domain_rate silent sleep 에 대기 로그 1줄 (검증 게이트·운영 가시성).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:43:29 +09:00
hyungi dcf99b377e fix(news): 적대 리뷰 반영 — reconcile auto-correlation·워터마크 검증 후 영속·수집 락
- fulltext_worker.reconcile_unresolved: EXISTS 서브쿼리 aliased(ProcessingQueue) —
  auto-correlation 이 FROM 전부 제거해 매 실행 InvalidRequestError (안전망 dead code).
  SQLAlchemy 2.0.50 컴파일 재현·수정 확인.
- news_collector._fetch_rss: ETag/Last-Modified/content-hash 영속을 bozo 파싱 검증
  뒤로 이동 — 부패 응답 워터마크 저장 시 영구 304-skip 차단.
- news_collector.run: 모듈 락으로 수동 collect vs 6h 스케줄 동시 실행 차단 —
  _get_or_create_health 동시 INSERT 의 uq_source_health_source_id 위반이
  사이클 전체를 죽이는 경합 봉쇄.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:34:46 +09:00
hyungi 3df0ca53ab feat(services): crawl-24x7 A-8 헬스 패널 + D-1 stt/marker idle-unload
A-8 1차: crawl-health 컨테이너(100.110.63.63:8765 Tailscale 바인딩 전용, 읽기 전용 SELECT, caddy 라우트 금지).
D-1 전제 작업: STT_PRELOAD=0+30분 유휴 해제(lock+inflight+reaper), marker MARKER_PRELOAD=0+idle-unload,
/ready idle=200(503=warmup_failed 한정 — fastapi depends_on 정합), healthcheck cuda 기준 전환.
2026-06-10 13:03:31 +09:00
hyungi 7cd8cfde0a feat(news): crawl-24x7 A그룹 — 레지스트리 증축·조건부 GET·fulltext 승격·politeness·source_health
A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health)
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA),
A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망),
A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트).
기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md
2026-06-10 13:03:31 +09:00
35 changed files with 2627 additions and 125 deletions
+284
View File
@@ -0,0 +1,284 @@
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 515초 지연 + jitter
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
"""
import asyncio
import random
import time
import urllib.robotparser
from urllib.parse import urljoin, urlparse
import httpx
from core.url_validator import validate_feed_url
from core.utils import setup_logger
# bare getLogger 는 root(WARNING) 상속이라 INFO 대기/차단 로그가 드랍됨 — 타 워커와 동일 설정
logger = setup_logger("crawl_politeness")
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
# 같은 도메인 연속 요청 간격 (초) — 권장치 515s + jitter
_DOMAIN_DELAY_MIN = 5.0
_DOMAIN_DELAY_MAX = 15.0
# 구독 세션(브라우저) fetch 간격 — 사람 속도 (B-3 ④: 기사 간 수십 초)
_AUTH_DELAY_MIN = 30.0
_AUTH_DELAY_MAX = 60.0
# B-3 Playwright 격리 컨테이너 (internal-only, compose DNS)
_FETCHER_URL = "http://playwright-fetcher:3400"
_FETCHER_TIMEOUT = 120.0 # 브라우저 기동 + 네비게이션 + settle 포함
# 안티봇 챌린지 페이지 식별 마커 (DataDome/Cloudflare 등) — 좁게 유지(오탐 회피).
# 실측: 르몽드 기사 = DataDome "Client Challenge" + "Entrez les caractères" CAPTCHA.
_CHALLENGE_MARKERS = (
"Client Challenge",
"Entrez les caractères affichés",
"Checking your browser before",
"captcha-delivery.com",
"geo.captcha-delivery",
)
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
_PAGE_TIMEOUT = 20.0
_MAX_REDIRECTS = 3
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
class CrawlFetchError(Exception):
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
class CrawlBlocked(Exception):
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
class CrawlSkip(Exception):
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
# 도메인별 직렬화 상태 (in-process)
_domain_locks: dict[str, asyncio.Lock] = {}
_domain_last_request: dict[str, float] = {}
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
def _domain_of(url: str) -> str:
return (urlparse(url).hostname or "").lower()
def _get_lock(domain: str) -> asyncio.Lock:
if domain not in _domain_locks:
_domain_locks[domain] = asyncio.Lock()
return _domain_locks[domain]
async def _respect_domain_rate(
domain: str,
delay_min: float = _DOMAIN_DELAY_MIN,
delay_max: float = _DOMAIN_DELAY_MAX,
) -> None:
"""같은 도메인 직전 요청에서 delay(jitter) 경과할 때까지 대기."""
last = _domain_last_request.get(domain)
if last is not None:
delay = random.uniform(delay_min, delay_max)
wait = last + delay - time.monotonic()
if wait > 0:
# silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성
logger.info("[politeness] %s %.1fs 대기", domain, wait)
await asyncio.sleep(wait)
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
robots_url = f"{scheme}://{host}/robots.txt"
try:
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
except httpx.HTTPError as e:
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
if resp.status_code >= 500:
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
if resp.status_code >= 400:
return None # robots 없음 = 전부 허용
rp = urllib.robotparser.RobotFileParser()
rp.parse(resp.text.splitlines())
return rp
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
parsed = urlparse(url)
host = (parsed.hostname or "").lower()
cached = _robots_cache.get(host)
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
rp = await _fetch_robots(client, parsed.scheme or "https", host)
_robots_cache[host] = (time.monotonic(), rp)
cached = _robots_cache[host]
rp = cached[1]
if rp is None:
return True
return rp.can_fetch(CRAWL_UA, url)
async def fetch_page(
url: str, *, check_robots: bool = True,
content_types: tuple[str, ...] = _HTML_CONTENT_TYPES,
) -> tuple[str, str]:
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
- per-domain 동시성 1 + 515s jitter 지연
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
- 5xx/timeout: CrawlFetchError (큐 재시도)
- 비-HTML content-type / 5MB 초과: CrawlSkip
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain)
try:
async with httpx.AsyncClient(
timeout=_PAGE_TIMEOUT, follow_redirects=False,
headers={"User-Agent": CRAWL_UA},
) as client:
if check_robots and not await _robots_allows(client, url):
raise CrawlBlocked(f"robots.txt disallow: {url}")
resp = await client.get(url)
redirects = 0
# has_redirect_location = location 헤더 있는 진짜 redirect 만 (httpx 의
# is_redirect 는 3xx 전체라 304 등을 redirect 로 오인 — news_collector 동일 함정)
while resp.has_redirect_location and redirects < _MAX_REDIRECTS:
location = urljoin(str(resp.request.url), resp.headers["location"])
try:
validate_feed_url(location)
except ValueError as e:
raise CrawlSkip(f"redirect target 차단: {e}") from e
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
resp = await client.get(location)
redirects += 1
if resp.has_redirect_location:
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
except httpx.TimeoutException as e:
raise CrawlFetchError(f"timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 429:
retry_after = resp.headers.get("retry-after", "")
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
if resp.status_code == 403:
raise CrawlBlocked(f"403 forbidden: {url}")
if resp.status_code >= 500:
raise CrawlFetchError(f"{resp.status_code}: {url}")
if resp.status_code >= 400:
raise CrawlSkip(f"{resp.status_code}: {url}")
ct = resp.headers.get("content-type", "").lower()
if ct and not any(t in ct for t in content_types):
raise CrawlSkip(f"비허용 content-type: {ct}: {url}")
if len(resp.content) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
return resp.text, str(resp.request.url)
# ── B-3 구독 세션 fetch (Playwright 격리 컨테이너 경유) ──────────────────────
async def fetch_page_via_browser(url: str, profile: str) -> tuple[str, str]:
"""인증 페이지 1건 — playwright-fetcher 에 위임, politeness 는 사람 속도(30~60s).
(html_text, final_url) 반환. robots 미적용 — 구독 계약 기반 개인 보관 fetch 로
공개 크롤러 규약 대상이 아님 (대신 사람 속도 + 동시 1 + 야간 저빈도가 보호 장치).
예외 어휘는 fetch_page 와 동일 (호출측 분기 재사용).
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
try:
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
resp = await client.post(
f"{_FETCHER_URL}/fetch", json={"url": url, "profile": profile}
)
except httpx.TimeoutException as e:
raise CrawlFetchError(f"browser fetch timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"playwright-fetcher 연결 오류: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 503:
# storage_state 부재 — 수동 세션 박제 대기 (호출측 degrade, 재시도 루프 금지)
raise CrawlBlocked(f"세션 프로필 부재: {profile}")
if resp.status_code != 200:
raise CrawlFetchError(f"playwright-fetcher {resp.status_code}: {url}")
data = resp.json()
html_text = data.get("html", "")
if len(html_text.encode("utf-8", errors="replace")) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과 (browser): {url}")
# 안티봇 챌린지 페이지(DataDome 등) 식별 — 본문 길이 게이트(200자)를 통과하는
# 짧은 챌린지 HTML 이 기사 본문으로 승격되는 silent corruption 차단. 헤드리스 탐지라
# 재시도 무의미 → CrawlBlocked(=degrade, RSS 요약 유지). 마커는 보수적으로 좁게.
if any(m in html_text for m in _CHALLENGE_MARKERS):
raise CrawlBlocked(f"안티봇 챌린지 페이지(headless 차단): {url}")
return html_text, data.get("final_url", url)
async def probe_session(
profile: str, probe_url: str, min_body_chars: int, paywall_markers: list[str]
) -> dict:
"""내용 기반 세션 probe (B-3 ②) — {'ok': bool, 'reason': str|None, 'body_chars': int}.
실패를 예외가 아닌 값으로 반환 — 호출측이 source_health 에 기록하고 degrade 분기.
probe 도 실제 publisher fetch 라 동일 도메인 lock + 사람 속도 적용.
"""
domain = _domain_of(probe_url)
async with _get_lock(domain):
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
try:
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
resp = await client.post(
f"{_FETCHER_URL}/probe",
json={
"profile": profile,
"probe_url": probe_url,
"min_body_chars": min_body_chars,
"paywall_markers": paywall_markers,
},
)
except httpx.HTTPError as e:
return {"ok": False, "reason": f"fetcher 연결 오류: {e}", "body_chars": 0}
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 503:
return {"ok": False, "reason": f"세션 프로필 부재: {profile}", "body_chars": 0}
if resp.status_code != 200:
return {"ok": False, "reason": f"fetcher {resp.status_code}", "body_chars": 0}
return resp.json()
+7
View File
@@ -54,6 +54,8 @@ async def lifespan(app: FastAPI):
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
from workers.kosha_collector import run as kosha_collector_run
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
@@ -121,9 +123,14 @@ async def lifespan(app: FastAPI):
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯).
scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector")
scheduler.start()
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
+1 -1
View File
@@ -118,7 +118,7 @@ class Document(Base):
source_channel: Mapped[str | None] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes",
"voice", "hermes", "crawl",
name="source_channel")
)
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
+31 -1
View File
@@ -2,7 +2,8 @@
from datetime import datetime
from sqlalchemy import Boolean, DateTime, String, Text
from sqlalchemy import Boolean, DateTime, Enum, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
@@ -23,3 +24,32 @@ class NewsSource(Base):
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
auth_profile: Mapped[str | None] = mapped_column(String(50))
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
etag: Mapped[str | None] = mapped_column(Text)
last_modified: Mapped[str | None] = mapped_column(Text)
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
selector_override: Mapped[dict | None] = mapped_column(JSONB)
# rdf / table-strip / gn-redirect / skip-video 등 파서 특이 케이스 (B-5)
parser_quirk: Mapped[str | None] = mapped_column(String(30))
# 채널 — 'news'(다이제스트/브리핑 대상) / 'crawl'(도메인 재료, 0-5 (a)) — migration 324.
# documents.source_channel 로 전파, crawl 채널은 embed/chunk 30일 게이트 미적용.
# documents 와 동일 PG enum 재사용 (Document 모델과 값 목록 동기 유지).
source_channel: Mapped[str] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes", "crawl",
name="source_channel"),
default="news",
)
+2 -1
View File
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
),
+44
View File
@@ -0,0 +1,44 @@
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다.
"""
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class SourceHealth(Base):
__tablename__ = "source_health"
id: Mapped[int] = mapped_column(primary_key=True)
source_id: Mapped[int] = mapped_column(
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
)
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_error: Mapped[str | None] = mapped_column(Text)
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
# ── B-3 구독 세션 상태 계약 — migration 325 ──
# 쓰기 1종 플래그: A-8 버튼이 기록만, 어댑터가 소비(수동 half-open).
# 소비 위치 = open-스킵 분기보다 앞 (r5 함정 고정 — 데드 버튼 방지).
relogin_requested: Mapped[bool] = mapped_column(Boolean, default=False)
# 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — 페이월 안내문 silent corruption 차단)
last_probe_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_probe_ok: Mapped[bool | None] = mapped_column(Boolean)
+5 -2
View File
@@ -17,10 +17,13 @@ python-multipart>=0.0.9
jinja2>=3.1.0
feedparser>=6.0.0
pymupdf>=1.24.0
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback
trafilatura>=1.12.0
# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
trafilatura==2.1.0
readability-lxml>=0.8.1
markdownify>=0.13.1
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
beautifulsoup4>=4.12.0
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
markitdown[docx,xlsx,pptx]>=0.1.0
+320
View File
@@ -0,0 +1,320 @@
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
"""
import gzip
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from core.config import settings
from core.crawl_politeness import (
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
fetch_page_via_browser,
probe_session,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import ProcessingQueue, enqueue_stage
from workers.extract_worker import (
_WEB_MIN_BODY_LEN,
_extract_web_with_bs4,
_extract_web_with_readability,
_extract_web_with_trafilatura,
)
logger = setup_logger("fulltext_worker")
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
_FOOTER_PATTERNS = [
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
]
def _strip_article_footer(body: str) -> str:
for pat in _FOOTER_PATTERNS:
body = pat.sub("", body)
return re.sub(r"\n{3,}", "\n\n", body).strip()
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
body, ver = _extract_web_with_trafilatura(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "trafilatura", ver
body, ver = _extract_web_with_readability(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "readability", ver
body, ver = _extract_web_with_bs4(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "bs4_text", ver
return "", None, None
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
file_hash 는 DB 컬럼이 character(64) 라 32자 해시가 공백 패딩되어 돌아옴 — strip 필수
(미적용 시 NAS 파일명에 공백 32개 = 쉘/rsync 함정).
"""
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
return (
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
)
def _save_raw_html(path: Path, html_text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(path, "wb") as f:
f.write(html_text.encode("utf-8", errors="replace"))
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
await enqueue_stage(session, doc.id, "summarize")
published_raw = (doc.extract_meta or {}).get("published_at")
if doc.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = 0
if published_raw:
try:
pub_dt = datetime.fromisoformat(published_raw)
days_old = (datetime.now(timezone.utc) - pub_dt).days
except ValueError:
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _set_fulltext_meta(doc: Document, **fields) -> None:
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
meta = dict(doc.extract_meta or {})
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
doc.extract_meta = meta
_PROBE_TTL_SECONDS = 6 * 3600 # probe 유효 시간 — 만료 시 배치 경계에서 재검증
async def _auth_session_ready(session: AsyncSession, source: NewsSource) -> tuple[bool, str]:
"""B-3 ② 내용 기반 probe 게이트 + relogin_requested 소비 (수동 half-open).
플래그 소비는 '불가용 스킵' 분기보다 앞 — 어댑터 틱마다 도달 (r5 데드 버튼 함정 고정).
probe 실패 상태에서는 auth fetch 0회 (자동 재시도 루프 = 계정 잠금 직행 — B-3 ③).
복구 경로 = storage_state 갱신 후 relogin_requested 플래그 set (수동).
probe 설정은 source.selector_override JSONB: probe_url / min_body_chars / paywall_markers.
"""
from workers.news_collector import _get_or_create_health
health = await _get_or_create_health(session, source.id)
now = datetime.now(timezone.utc)
cfg = source.selector_override or {}
probe_url = cfg.get("probe_url")
force = False
if health.relogin_requested:
health.relogin_requested = False # 소비 = 1회 half-open 시도
health.updated_at = now
force = True
logger.info(f"[fulltext/auth] {source.name} relogin_requested 소비 — half-open probe")
if not force:
if health.last_probe_ok is False:
return False, "probe 실패 상태 (storage_state 갱신 + relogin_requested 대기)"
if (
health.last_probe_ok
and health.last_probe_at
and (now - health.last_probe_at).total_seconds() < _PROBE_TTL_SECONDS
):
return True, ""
if not probe_url:
return False, "selector_override.probe_url 미설정"
result = await probe_session(
source.auth_profile,
probe_url,
int(cfg.get("min_body_chars", 800)),
list(cfg.get("paywall_markers", [])),
)
health.last_probe_at = now
health.last_probe_ok = bool(result.get("ok"))
health.updated_at = now
if not health.last_probe_ok:
logger.warning(f"[fulltext/auth] {source.name} probe 실패: {result.get('reason')}")
return False, str(result.get("reason"))
logger.info(f"[fulltext/auth] {source.name} probe OK ({result.get('body_chars')}자)")
return True, ""
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
_set_fulltext_meta(
doc, status="degraded", reason=reason[:300],
resolved_at=datetime.now(timezone.utc).isoformat(),
)
await _enqueue_downstream(session, doc)
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
async def process(document_id: int, session: AsyncSession) -> None:
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.edit_url:
await _degrade(session, doc, "edit_url 없음")
return
meta = doc.extract_meta or {}
source_id = meta.get("source_id")
# B-3: 구독 소스(auth_profile)는 Playwright 세션 fetch — probe 게이트 선행
source = await session.get(NewsSource, source_id) if source_id else None
auth_profile = source.auth_profile if source is not None else None
if auth_profile:
ready, why = await _auth_session_ready(session, source)
if not ready:
await _degrade(session, doc, f"구독 세션 불가용: {why}")
return
try:
if auth_profile:
html_text, final_url = await fetch_page_via_browser(doc.edit_url, auth_profile)
else:
html_text, final_url = await fetch_page(doc.edit_url)
except (CrawlBlocked, CrawlSkip) as e:
await _degrade(session, doc, f"{type(e).__name__}: {e}")
return
except CrawlFetchError:
raise # 일시 오류 — 큐 재시도
now = datetime.now(timezone.utc)
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
raw_path = _raw_html_path(source_id, doc.file_hash, now)
try:
_save_raw_html(raw_path, html_text)
raw_saved = True
except OSError as e:
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
raw_saved = False
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
body, engine, engine_ver = _extract_body(html_text)
if not engine:
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
return
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
await _degrade(session, doc, "푸터 제거 후 본문 부족")
return
# B-3: 추출 결과도 페이월 마커로 게이트 — probe 통과 후 만료된 세션의
# '페이월 안내문' 본문 승격(silent corruption) 차단 + 즉시 probe 상태 강등
if auth_profile:
from workers.news_collector import _get_or_create_health
markers = (source.selector_override or {}).get("paywall_markers", [])
hit = next((m for m in markers if m and m.lower() in clean_body.lower()), None)
if hit:
health = await _get_or_create_health(session, source.id)
health.last_probe_ok = False
health.updated_at = datetime.now(timezone.utc)
await _degrade(session, doc, f"본문 페이월 마커 검출({hit}) — 세션 손상 의심")
return
title = doc.title or ""
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
doc.extracted_at = now
doc.extractor_version = f"rss+page@{engine}"
doc.md_content = clean_body
doc.md_status = "success"
doc.md_extraction_engine = engine
doc.md_extraction_engine_version = engine_ver
doc.md_format_version = "1.0"
doc.md_generated_at = now
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
doc.content_origin = "extracted"
doc.file_size = len(doc.extracted_text.encode())
_set_fulltext_meta(
doc, status="promoted", engine=engine,
raw_html_path=str(raw_path) if raw_saved else None,
final_url=final_url, body_chars=len(clean_body),
resolved_at=now.isoformat(),
)
await _enqueue_downstream(session, doc)
logger.info(
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
f"(raw={'saved' if raw_saved else 'MISSING'})"
)
async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
async with async_session() as session:
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
pq = aliased(ProcessingQueue)
summarize_q = (
select(pq.id)
.where(
pq.document_id == Document.id,
pq.stage == "summarize",
)
)
result = await session.execute(
select(Document)
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
.where(
ProcessingQueue.stage == "fulltext",
ProcessingQueue.status == "failed",
Document.source_channel == "news",
~exists(summarize_q),
)
.limit(200)
)
docs = result.scalars().unique().all()
for doc in docs:
_set_fulltext_meta(doc, status="failed_reconciled")
await _enqueue_downstream(session, doc)
if docs:
await session.commit()
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
+351
View File
@@ -0,0 +1,351 @@
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
3 API (2026-06-10 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json):
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
daily 스케줄 1회 (main.py):
재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브)
+ 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장
→ 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만,
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화.
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장.
httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합.
개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재.
"""
import asyncio
import hashlib
import os
import random
import re
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.news_collector import (
FeedError,
_get_or_create_health,
_record_failure,
_record_success,
)
logger = setup_logger("kosha_collector")
_BASE = "https://apis.data.go.kr/B552468"
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
_CASE_SOURCE = "KOSHA 재해사례"
_GUIDE_SOURCE = "KOSHA GUIDE"
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
_CASE_ROWS = 30
_GUIDE_ROWS = 100
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
_MAX_FILE_BYTES = 50 * 1024 * 1024
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
def _api_key() -> str:
key = os.getenv("KOSHA_API_KEY", "")
if not key:
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
return key
async def _api_get(url: str) -> dict:
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
async with httpx.AsyncClient(timeout=25) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
try:
payload = resp.json()
except ValueError as e:
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
code = (payload.get("header") or {}).get("resultCode")
if code != "00":
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
return payload
def _items(payload: dict) -> list[dict]:
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
item = ((payload.get("body") or {}).get("items") or {}).get("item")
if item is None:
return []
return [item] if isinstance(item, dict) else list(item)
def _safe_filename(name: str) -> str:
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
name = re.sub(r"\s+", " ", name)
return name[:140] or "unnamed"
async def _download(url: str, dest: Path) -> int:
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_FILE_BYTES:
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
if source is None:
source = NewsSource(
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
fulltext_policy="none", source_channel="crawl", category="Safety",
language="ko", country="KR",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
)
session.add(source)
await session.flush()
return source
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
safe = _safe_filename(filenm)
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download(filepath, dest)
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format=ext,
file_size=size,
file_type="immutable",
title=safe.rsplit(".", 1)[0],
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=filepath,
ai_tags=["Safety/KOSHA재해사례/첨부"],
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"}},
)
session.add(doc)
await session.flush()
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
return True
async def collect_disaster_cases(session) -> int:
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
key = _api_key()
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
new_count = 0
for page in range(1, _CASE_PAGES + 1):
payload = await _api_get(
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
)
items = _items(payload)
if not items:
break
page_all_dup = True
for item in items:
boardno = str(item.get("boardno") or "").strip()
title = (item.get("keyword") or "").strip()
if not boardno or not title:
continue
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
continue
page_all_dup = False
contents = (item.get("contents") or "").strip()
business = (item.get("business") or "").strip()
now = datetime.now(timezone.utc)
doc = Document(
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
file_hash=fhash,
file_format="article",
file_size=len(contents.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n[{business}]\n{contents}",
extracted_at=now,
extractor_version="kosha_api",
md_status="skipped",
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="Safety",
ai_sub_group=_CASE_SOURCE,
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
extract_meta={
"source_id": source.id,
"source_name": _CASE_SOURCE,
"published_at": None,
"kosha": {"boardno": boardno, "business": business,
"atcflcnt": item.get("atcflcnt")},
},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
if int(item.get("atcflcnt") or 0) > 0:
attach = await _api_get(
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
f"&pageNo=1&numOfRows=10&boardno={boardno}"
)
for att in _items(attach):
filenm = (att.get("filenm") or "").strip()
filepath = (att.get("filepath") or "").strip()
if not filenm or not filepath.startswith("https://"):
continue
try:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
logger.info(f"[kosha] 재해사례 신규 {new_count}")
return new_count
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
key = _api_key()
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
new_specs: list[dict] = []
page, total = 1, None
while True:
payload = await _api_get(
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
)
if total is None:
total = int((payload.get("body") or {}).get("totalCount") or 0)
items = _items(payload)
if not items:
break
for item in items:
no = (item.get("techGdlnNo") or "").strip()
ymd = (item.get("techGdlnOfancYmd") or "").strip()
url = (item.get("fileDownloadUrl") or "").strip()
if not no or not url.startswith("https://"):
continue
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if not existing.scalars().first():
new_specs.append({"no": no, "ymd": ymd, "url": url,
"name": (item.get("techGdlnNm") or no).strip(),
"fhash": fhash})
if page * _GUIDE_ROWS >= total:
break
page += 1
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
ingested = 0
for spec in todo:
safe_no = _safe_filename(spec["no"])
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
dest = Path(settings.nas_mount_path) / rel_path
try:
size = await _download(spec["url"], dest)
except FeedError as e:
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
continue
doc = Document(
file_path=rel_path,
file_hash=spec["fhash"],
file_format="pdf",
file_size=size,
file_type="immutable",
title=f"{spec['name']} ({spec['no']})",
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=spec["url"],
ai_tags=["Safety/KOSHA GUIDE"],
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
"ofancYmd": spec["ymd"]}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
return ingested
async def run() -> None:
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
now = datetime.now(timezone.utc)
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
(_GUIDE_SOURCE, collect_kosha_guide)):
async with async_session() as session:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
try:
count = await collector(session)
if source is None: # 첫 실행에서 collector 가 생성
result = await session.execute(
select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
health = await _get_or_create_health(session, source.id)
_record_success(health, count, False, now)
await session.commit()
except Exception as e:
logger.error(f"[kosha] {name} 수집 실패: {e}")
await session.rollback() # 부분 적재 폐기 후 health 만 기록
if source is not None:
health = await _get_or_create_health(session, source.id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
if __name__ == "__main__":
asyncio.run(run())
+436 -60
View File
@@ -1,8 +1,16 @@
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
plan crawl-24x7-1 A그룹 (2026-06-10):
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
"""
import asyncio
import hashlib
import re
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from html import unescape
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
@@ -10,11 +18,13 @@ import feedparser
import httpx
from sqlalchemy import select
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from models.source_health import SourceHealth
logger = setup_logger("news_collector")
@@ -26,6 +36,7 @@ CATEGORY_MAP = {
"환경": "Environment", "기술": "Technology",
# 영어
"World": "International", "International": "International",
"World news": "International", # Guardian sectionName (B-2)
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
"Arts": "Culture", "Culture": "Culture",
"Climate": "Environment", "Environment": "Environment",
@@ -35,21 +46,30 @@ CATEGORY_MAP = {
"Kultur": "Culture", "Wissenschaft": "Technology",
# 프랑스어
"Environnement": "Environment",
# 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵
"안전": "Safety", "Safety": "Safety",
"공학": "Engineering", "Engineering": "Engineering",
"철학": "Philosophy", "Philosophy": "Philosophy",
}
class FeedError(Exception):
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
def _normalize_category(raw: str) -> str:
"""카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str) -> str:
"""HTML 태그 제거 + 정제"""
def _clean_html(text: str, max_len: int | None = 1000) -> str:
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
if not text:
return ""
text = re.sub(r"<[^>]+>", "", text)
text = unescape(text)
return text.strip()[:1000]
text = text.strip()
return text if max_len is None else text[:max_len]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
@@ -87,8 +107,104 @@ def _normalize_to_utc(dt) -> datetime:
return datetime.now(timezone.utc)
# ── A-5: circuit breaker 정책 ──
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
_CIRCUIT_OPEN_AFTER = 3
_CIRCUIT_DISABLE_AFTER = 10
_BACKOFF_BASE_HOURS = 6
_BACKOFF_CAP_HOURS = 48
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
"""
if health.circuit_state == "disabled":
return False
if health.circuit_state == "open" and health.last_error_at is not None:
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
if now - health.last_error_at < timedelta(hours=backoff_h):
return False
return True
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
health.consecutive_failures = 0
health.total_fetches += 1
health.last_success_at = now
health.last_fetch_items = items
if health.circuit_state != "closed":
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
health.circuit_state = "closed"
health.circuit_opened_at = None
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
if not_modified:
pass
elif items == 0:
health.empty_streak += 1
if health.empty_streak >= _EMPTY_STREAK_ALERT:
logger.error(
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
)
else:
health.empty_streak = 0
health.updated_at = now
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
health.consecutive_failures += 1
health.total_fetches += 1
health.total_failures += 1
health.last_error = error[:500]
health.last_error_at = now
health.updated_at = now
cf = health.consecutive_failures
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
health.circuit_state = "disabled"
logger.error(
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
)
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
health.circuit_state = "open"
health.circuit_opened_at = now
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
result = await session.execute(
select(SourceHealth).where(SourceHealth.source_id == source_id)
)
health = result.scalars().first()
if health is None:
health = SourceHealth(source_id=source_id)
session.add(health)
await session.flush()
return health
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
_run_lock = asyncio.Lock()
async def run():
"""뉴스 수집 실행"""
async with _run_lock:
await _run_locked()
async def _run_locked():
now = datetime.now(timezone.utc)
async with async_session() as session:
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
@@ -101,17 +217,23 @@ async def run():
total = 0
for source in sources:
health = await _get_or_create_health(session, source.id)
if not _should_attempt(health, now):
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
count = await _fetch_api(session, source)
count, status = await _fetch_api(session, source)
else:
count = await _fetch_rss(session, source)
count, status = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
except Exception as e:
logger.error(f"[{source.name}] 수집 실패: {e}")
source.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
@@ -122,8 +244,83 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
async def _is_portal_duplicate(session, title: str) -> bool:
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
"""
if len(title) < 12:
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
dup = await session.execute(
select(Document.id).where(
Document.title == title,
Document.source_channel == "news",
Document.file_format == "article",
Document.extracted_at >= cutoff,
).limit(1)
)
return dup.scalars().first() is not None
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
"""후속 단계 enqueue.
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
"""
if source.fulltext_policy == "page" and doc.edit_url:
await enqueue_stage(session, doc.id, "fulltext")
return
await enqueue_stage(session, doc.id, "summarize")
if source.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
return {
"source_id": source.id,
"source_name": source.name,
"published_at": pub_dt.isoformat(),
}
def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
"""채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성.
file_path 접두사가 곧 채널 디렉토리. ai_domain 은 다이제스트/검색 필터의 분기 축이라
crawl 채널이 'News' 를 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상).
"""
if source.source_channel == "crawl":
domain = category if category and category != "Other" else "Domain"
return {
"path_prefix": "crawl",
"ai_domain": domain,
"ai_tags": [f"{domain}/{source_short}"],
}
return {
"path_prefix": "news",
"ai_domain": "News",
"ai_tags": [f"News/{source_short}/{category}"],
}
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
"""
from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
@@ -134,51 +331,79 @@ async def _fetch_rss(session, source: NewsSource) -> int:
# 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed:
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
return 0
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try:
validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] URL 검증 실패: {e}")
return 0
raise FeedError(f"URL 검증 실패: {e}") from e
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
headers = {"User-Agent": CRAWL_UA}
if source.etag:
headers["If-None-Match"] = source.etag
if source.last_modified:
headers["If-Modified-Since"] = source.last_modified
async with httpx.AsyncClient(
timeout=10, follow_redirects=False, headers=headers
) as client:
resp = await client.get(source.feed_url)
# redirect 수동 처리 (최대 3회, 각 target 재검증)
# 304 는 redirect 처리보다 먼저 — httpx 의 is_redirect 는 3xx 전체(304 포함)에
# True 라, 304 를 redirect 로 오인하면 location 없는 같은 URL 을 재요청해
# "redirect 3회 초과" 로 오류 처리됨(조건부 GET 안정 피드 전멸 버그).
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
return 0, "not_modified"
# redirect 수동 처리 (최대 3회, 각 target 재검증) — location 있는 진짜 redirect 만.
# allowlist 도메인이면 redirect target의 HTTP도 허용
redirects = 0
while resp.is_redirect and redirects < 3:
location = resp.headers.get("location", "")
location = urljoin(str(resp.request.url), location)
while resp.has_redirect_location and redirects < 3:
location = urljoin(str(resp.request.url), resp.headers["location"])
try:
validate_feed_url(location, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] redirect target 차단: {e}")
return 0
raise FeedError(f"redirect target 차단: {e}") from e
resp = await client.get(location)
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified (redirect 후) — 본문 미전송")
return 0, "not_modified"
redirects += 1
if resp.is_redirect:
logger.error(f"[{source.name}] redirect 3회 초과")
return 0
if resp.has_redirect_location:
raise FeedError("redirect 3회 초과")
resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE:
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
return 0
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
return 0
raise FeedError(f"비정상 content-type: {ct}")
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified")
content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified"
feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries:
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
return 0
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
# 이후 304 로 영구 skip 되는 silent corruption 차단
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
source.feed_content_hash = content_hash
count = 0
for entry in feed.entries:
@@ -190,7 +415,24 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if not summary:
summary = title
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
body = summary
is_feed_full = False
if source.fulltext_policy == "feed-full":
content_list = entry.get("content") or []
raw_body = content_list[0].get("value", "") if content_list else ""
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
if len(full_body) > len(summary):
body = full_body
is_feed_full = True
link = entry.get("link", "")
# B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치)
if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link):
continue
published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
@@ -209,56 +451,190 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if existing.scalars().first():
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="rss",
extractor_version="rss-feed-full" if is_feed_full else "rss",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news",
source_channel=source.source_channel,
data_origin="external",
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
# summarize + embed + chunk 등록 (classify 불필요)
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
# summarize + embed + chunk 등록 (classify 불필요).
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집")
return count
return count, "ok"
async def _fetch_api(session, source: NewsSource) -> int:
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
"""API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2).
레거시 NYT 행(feed_url=api.nytimes.com)은 무변경 경로. 신규 제공자는 호스트 분기 추가.
미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지).
"""
host = (urlparse(source.feed_url).hostname or "").lower()
if host.endswith("guardianapis.com"):
return await _fetch_api_guardian(session, source)
if host.endswith("nytimes.com"):
return await _fetch_api_nyt(session, source)
raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요")
def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]:
"""Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상
(tests/fixtures/guardian_open_platform_search_response.json 박제 시 호출과 동일해야 함)."""
parsed = urlparse(feed_url)
params = {
**dict(parse_qsl(parsed.query)),
"show-fields": "bodyText,trailText",
"page-size": "20",
"order-by": "newest",
"api-key": api_key,
}
return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params
async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
"""Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON.
feed_url 에 section 쿼리를 박아 등록 (예: https://content.guardianapis.com/search?section=world).
전문이 API 로 오므로 fulltext stage 불요. 키 미설정 = FeedError (health 실패 기록,
silent fallback 없음 — [[feedback_no_silent_fallback_explicit_opt_in]]).
"""
import os
api_key = os.getenv("GUARDIAN_API_KEY", "")
if not api_key:
raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가")
endpoint, params = _guardian_request(source.feed_url, api_key)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(endpoint, params=params)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율)
safe_url = str(e.request.url).split("?")[0]
raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e
payload = resp.json().get("response", {})
if payload.get("status") != "ok":
raise FeedError(f"Guardian API status={payload.get('status')}")
count = 0
for item in payload.get("results", []):
title = (item.get("webTitle") or "").strip()
if not title:
continue
fields = item.get("fields") or {}
body_text = (fields.get("bodyText") or "").strip()
trail = _clean_html(fields.get("trailText") or "")
# bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하.
is_full = len(body_text) >= 200
body = body_text if is_full else (trail or title)
link = item.get("webUrl", "")
pub_str = item.get("webPublicationDate", "")
try:
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
pub_dt = datetime.now(timezone.utc)
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(item.get("sectionName", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="guardian_api_full" if is_full else "guardian_api",
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count, "ok"
async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
"""NYT API 수집 — 키 마스킹 + health degradation"""
import os
nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key:
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
return 0
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
try:
async with httpx.AsyncClient(timeout=10) as client:
@@ -270,12 +646,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0]
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
return 0
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
logger.error(f"NYT API 연결 실패: {safe_url}")
return 0
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
data = resp.json()
count = 0
@@ -309,11 +683,16 @@ async def _fetch_api(session, source: NewsSource) -> int:
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
@@ -327,24 +706,21 @@ async def _fetch_api(session, source: NewsSource) -> int:
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count
return count, "ok"
+12 -2
View File
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk",
"preview", "stt", "thumbnail", "deep_summary",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -137,6 +140,9 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
# source_channel-aware override (extract stage 만). source_channel 누락 시 _default.
extract_override_by_channel = {
"devonagent": ["embed", "chunk"],
# crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 —
# 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지.
"crawl": ["classify"],
}
next_stages = {
@@ -179,6 +185,7 @@ def _load_workers():
from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process
from workers.fulltext_worker import process as fulltext_process
return {
"extract": extract_process,
@@ -195,6 +202,9 @@ def _load_workers():
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
"markdown": marker_process,
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
"fulltext": fulltext_process,
}
+265
View File
@@ -0,0 +1,265 @@
"""C-3 공학 정적 코퍼스 1회 일괄 ingest (plan crawl-24x7-1).
National Board 기술 아티클(~86, ASP.NET 구식 — 기사 앵커가 싱글쿼트 href) +
TWI Job Knowledge(~153, sitemap 기반). 지속 크롤링이 아니라 아카이브 일괄 +
저빈도 증분 유형 — 스케줄러 미등록, 수동 CLI:
docker exec hyungi_document_server-fastapi-1 \
python -m workers.static_corpus_ingest --corpus all --limit 3 # 검증용
docker exec -d hyungi_document_server-fastapi-1 \
python -m workers.static_corpus_ingest --corpus all # 전체 (~45분)
※ -d 백그라운드 실행 시 중단은 host pkill 이 아니라 컨테이너 내부 PID kill
([[feedback_docker_exec_orphan_kill]]).
멱등: edit_url(정규화)+file_hash dedup — 재실행 = 신규분만 (그대로 monthly 증분 절차).
politeness: fetch_page 재사용 (per-domain 1 + 5~15s jitter + robots).
원본 보존·승격 필드: fulltext_worker 와 동일 규약 (재추출 가능 상태 유지).
실패는 degrade 없이 skip + 말미 목록 출력 (정적 코퍼스 — RSS 요약 같은 격하 대상 부재).
"""
import argparse
import asyncio
import hashlib
import re
from datetime import datetime, timezone
from html import unescape
from sqlalchemy import select
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
_WEB_MIN_BODY_LEN,
_extract_body,
_raw_html_path,
_save_raw_html,
_strip_article_footer,
)
from workers.news_collector import _article_hash, _normalize_url
logger = setup_logger("static_corpus")
_NB_LISTING = "https://www.nationalboard.org/Index.aspx?pageID=164"
_TWI_SITEMAP = "https://www.twi-global.com/sitemap.xml"
async def _discover_national_board() -> list[str]:
"""목록 페이지의 기사 앵커 — 싱글쿼트 href 가 기본형이라 양쪽 인용부호 매칭."""
html_text, _ = await fetch_page(_NB_LISTING)
ids = sorted(
{int(i) for i in re.findall(
r"href=['\"]/?Index\.aspx\?pageID=164&(?:amp;)?ID=(\d+)['\"]", html_text)}
)
return [f"https://www.nationalboard.org/Index.aspx?pageID=164&ID={i}" for i in ids]
async def _discover_twi() -> list[str]:
"""sitemap 에서 job-knowledge 시리즈만 (faqs/published-papers 는 향후 증분 후보)."""
xml_text, _ = await fetch_page(
_TWI_SITEMAP,
content_types=("text/xml", "application/xml", "text/html"),
)
urls = re.findall(
r"<loc>(https://www\.twi-global\.com/technical-knowledge/job-knowledge/[^<]+)</loc>",
xml_text,
)
return sorted({u for u in urls if not u.rstrip("/").endswith("job-knowledge")})
CORPORA = {
"national-board": {
"source_name": "National Board 기술 아티클",
"listing_url": _NB_LISTING,
"discover": _discover_national_board,
"fetch_method": "page",
},
"twi": {
"source_name": "TWI Job Knowledge",
"listing_url": _TWI_SITEMAP,
"discover": _discover_twi,
"fetch_method": "sitemap+page",
},
}
async def _get_or_create_source(session, spec: dict) -> NewsSource:
"""레지스트리 행 — 출처 추적 + crawl_raw src_{id} 경로 + A-8 패널 가시성.
enabled=False: 6h 뉴스 사이클 비대상 (피드가 없는 정적 코퍼스 — 증분은 본 CLI 재실행).
"""
result = await session.execute(
select(NewsSource).where(NewsSource.name == spec["source_name"])
)
source = result.scalars().first()
if source is None:
source = NewsSource(
name=spec["source_name"],
feed_url=spec["listing_url"],
feed_type="rss",
fetch_method=spec["fetch_method"],
fulltext_policy="none",
source_channel="crawl",
category="Engineering",
language="en",
country="US" if "national" in spec["source_name"].lower() else "GB",
enabled=False,
)
session.add(source)
await session.flush()
return source
def _page_title(html_text: str, fallback: str) -> str:
m = re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html_text)
if not m:
m = re.search(r"<title[^>]*>([^<]+)</title>", html_text, re.I)
title = unescape(m.group(1)).strip() if m else ""
# 사이트 접미 잡음 제거 (TWI 는 ' - TWI', NB 는 'National Board ...' 꼬리표)
title = re.sub(r"\s*[-|·]\s*(TWI|National Board[^-|]*)\s*$", "", title).strip()
return title or fallback
async def _ingest_one(session, source: NewsSource, url: str) -> str:
"""기사 1건. 반환: 'ok' / 'dup' / 'skip'(추출부족·차단)."""
normalized_url = _normalize_url(url)
existing = await session.execute(
select(Document).where(Document.edit_url.in_([normalized_url, url])).limit(1)
)
if existing.scalars().first():
return "dup"
try:
html_text, final_url = await fetch_page(url)
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.warning(f"[{source.name}] fetch 실패 skip: {url}{type(e).__name__}: {e}")
return "skip"
body, engine, engine_ver = _extract_body(html_text)
if not engine:
logger.warning(f"[{source.name}] 추출 실패 skip (< {_WEB_MIN_BODY_LEN}자): {url}")
return "skip"
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
logger.warning(f"[{source.name}] 푸터 제거 후 본문 부족 skip: {url}")
return "skip"
title = _page_title(html_text, fallback=url.rsplit("/", 1)[-1][:90])
article_id = _article_hash(title, "static", source.name)
dup2 = await session.execute(
select(Document).where(Document.file_hash == article_id).limit(1)
)
if dup2.scalars().first():
return "dup"
now = datetime.now(timezone.utc)
raw_path = _raw_html_path(source.id, article_id, now)
raw_saved = True
try:
_save_raw_html(raw_path, html_text)
except OSError as e:
raw_saved = False
logger.error(f"[{source.name}] 원본 보존 실패 (ingest 는 진행): {e}")
doc = Document(
file_path=f"crawl/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=0, # 아래 extracted_text 확정 후 재계산
file_type="note",
title=title,
extracted_text=f"{title}\n\n{clean_body}",
extracted_at=now,
extractor_version=f"static+page@{engine}",
md_content=clean_body,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=engine_ver,
md_format_version="1.0",
md_generated_at=now,
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
content_origin="extracted",
source_channel="crawl",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="Engineering",
ai_sub_group=source.name,
ai_tags=[f"Engineering/{source.name}"],
extract_meta={
"source_id": source.id,
"source_name": source.name,
"published_at": None, # 정적 코퍼스 — 페이지 발행일 비신뢰, 색인은 채널 게이트로 무조건
"fulltext": {
"status": "static_corpus",
"engine": engine,
"final_url": final_url,
"raw_html_path": str(raw_path) if raw_saved else None,
"body_chars": len(clean_body),
"resolved_at": now.isoformat(),
},
},
)
doc.file_size = len(doc.extracted_text.encode())
session.add(doc)
await session.flush()
# crawl 채널 = 발행일 무관 전량 색인 (summarize 는 맥미니 큐 — D-4 lag 관찰 대상)
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
logger.info(f"[{source.name}] ingest {len(clean_body)}자 ({engine}): {title[:60]}")
return "ok"
async def run(corpus: str = "all", limit: int = 0) -> None:
targets = list(CORPORA) if corpus == "all" else [corpus]
for key in targets:
spec = CORPORA[key]
async with async_session() as session:
source = await _get_or_create_source(session, spec)
await session.commit()
source_id = source.id
try:
urls = await spec["discover"]()
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.error(f"[{spec['source_name']}] 목록 수집 실패 — corpus 건너뜀: {e}")
continue
if limit:
urls = urls[:limit]
logger.info(f"[{spec['source_name']}] 대상 {len(urls)}건 (limit={limit or '없음'})")
counts = {"ok": 0, "dup": 0, "skip": 0}
failed: list[str] = []
for i, url in enumerate(urls, 1):
# 커밋 10건 단위 — 장시간 배치 중단 시 진행분 보존
async with async_session() as session:
src = await session.get(NewsSource, source_id)
status = await _ingest_one(session, src, url)
await session.commit()
counts[status] += 1
if status == "skip":
failed.append(url)
if i % 10 == 0:
logger.info(f"[{spec['source_name']}] 진행 {i}/{len(urls)} {counts}")
logger.info(f"[{spec['source_name']}] 완료: {counts}")
if failed:
logger.warning(
f"[{spec['source_name']}] skip {len(failed)}건 — 재시도는 CLI 재실행(멱등):\n "
+ "\n ".join(failed)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="C-3 정적 코퍼스 일괄 ingest")
parser.add_argument("--corpus", choices=[*CORPORA, "all"], default="all")
parser.add_argument("--limit", type=int, default=0, help="corpus 당 상한 (0=전체)")
args = parser.parse_args()
asyncio.run(run(args.corpus, args.limit))
+38 -3
View File
@@ -64,6 +64,11 @@ services:
environment:
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
- MARKER_PRELOAD=0
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- marker_models:/models
@@ -97,6 +102,11 @@ services:
- WHISPER_MODEL=${WHISPER_MODEL:-large-v3}
- WHISPER_DEVICE=${WHISPER_DEVICE:-cuda}
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE:-float16}
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~4GB) 해제가 90% 봉투의 전제.
# 콜드로드 수초~수십 초는 배치 작업이라 무방 (stt_worker read=1800s 가 흡수).
# 롤백 = STT_PRELOAD=1 + STT_IDLE_UNLOAD_MINUTES=0.
- STT_PRELOAD=0
- STT_IDLE_UNLOAD_MINUTES=${STT_IDLE_UNLOAD_MINUTES:-30}
deploy:
resources:
reservations:
@@ -105,9 +115,9 @@ services:
count: 1
capabilities: [gpu]
healthcheck:
# /ready: CUDA 디바이스 + 모델 적재 둘 다 확인. ready=true 만 healthy 처리.
# /health 는 단순 liveness 라 모델 적재 상태도 healthy 로 잡혀 운영 신호로 부적합.
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('ready') else 1)"]
# D-1: idle-unload 도입으로 '모델 적재' 는 더 이상 상시 상태가 아님 — cuda 가용성만
# healthy 기준. 모델 적재 여부는 /ready 의 models_loaded 필드로 관측(정보성).
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('cuda') else 1)"]
interval: 30s
timeout: 10s
retries: 3
@@ -229,6 +239,31 @@ services:
- fastapi
restart: unless-stopped
# crawl-24x7 A-8 1차: 전 소스 헬스 패널 — 내부 전용 (읽기 전용 SELECT 만).
# '내부 전용' 성립 구현 = 별도 바인딩뿐 (r4 결정): Tailscale 인터페이스에만 publish.
# 기존 SvelteKit 라우트(vhost=Host 헤더 검사=앱 가드 환원)나 프록시 경로 차단(경로 가드
# 회귀)으로 옮기지 말 것. caddy/home-caddy 라우트 추가 금지. fastapi/postgres 바인딩 선례.
crawl-health:
build: ./services/crawl-health
ports:
- "100.110.63.63:8765:8765"
environment:
- CRAWL_HEALTH_DSN=postgresql://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
# crawl-24x7 B-3: 구독 세션 Playwright fetch 격리 — internal-only (host 포트·caddy 라우트 금지).
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 컨테이너 + mem cap.
# 세션 파일(쿠키=credential 등가물)은 repo 밖 호스트 경로 ro mount (600, gitignore 무관 영역).
playwright-fetcher:
build: ./services/playwright-fetcher
volumes:
- /home/hyungi/.local/share/crawl-auth:/auth:ro
mem_limit: 2g
restart: unless-stopped
caddy:
image: caddy:2
ports:
@@ -0,0 +1,19 @@
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
ADD COLUMN IF NOT EXISTS etag TEXT,
ADD COLUMN IF NOT EXISTS last_modified TEXT,
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
ADD COLUMN IF NOT EXISTS selector_override JSONB,
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
@@ -0,0 +1,3 @@
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
@@ -0,0 +1,3 @@
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
+19
View File
@@ -0,0 +1,19 @@
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
CREATE TABLE IF NOT EXISTS source_health (
id SERIAL PRIMARY KEY,
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
consecutive_failures INTEGER NOT NULL DEFAULT 0,
total_fetches BIGINT NOT NULL DEFAULT 0,
total_failures BIGINT NOT NULL DEFAULT 0,
last_success_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ,
last_fetch_items INTEGER,
empty_streak INTEGER NOT NULL DEFAULT 0,
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
circuit_opened_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@@ -0,0 +1,2 @@
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);
@@ -0,0 +1,5 @@
-- B/C 그룹 (plan crawl-24x7-1, 0-5 확정): 레지스트리에 채널 컬럼 — additive only.
-- documents.source_channel 과 동일 enum 재사용 ('crawl' 값은 320 에서 별도 트랜잭션으로 추가 완료).
-- 기존 행 전부 'news' 기본값 = 무회귀. crawl 채널 소스의 문서 생성/색인 게이트 분기 기준.
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS source_channel source_channel NOT NULL DEFAULT 'news';
@@ -0,0 +1,8 @@
-- B-3 (plan crawl-24x7-1): 구독 세션 상태 노출 계약 — additive only.
-- relogin_requested: 쓰기 1종 플래그 (A-8 버튼이 기록, 어댑터가 소비 = 수동 half-open).
-- 소비 위치 함정(r5 고정): open-스킵 분기보다 앞 — 어댑터 틱마다 확인.
-- last_probe_at/ok: 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — silent corruption 차단).
ALTER TABLE source_health
ADD COLUMN IF NOT EXISTS relogin_requested BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS last_probe_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS last_probe_ok BOOLEAN;
@@ -0,0 +1,33 @@
-- crawl-24x7 사이클 2 소스 seed (B-2 + C-1 안전 + C-5 철학) — 2026-06-10 전 URL live 검증.
-- 262 선례: WHERE NOT EXISTS idempotent, 기존 행 보존, 신규만 insert (단일 statement).
-- 채널: news = 다이제스트/브리핑 대상 / crawl = 도메인 재료 (0-5 분리).
-- 정책: feed-full = 피드 본문이 전문 (UK HSE content:encoded 실측) / page = 기사 페이지 4-tier 승격.
-- EU-OSHA 는 후보 등재만 (enabled=false — 카드 C-1 '우선순위 낮음').
-- 르몽드 B-3 활성화는 seed 아님 — 세션 박제 후 runtime UPDATE (auth_profile/selector_override).
INSERT INTO news_sources
(name, country, language, feed_type, feed_url, category, enabled,
fetch_method, fulltext_policy, source_channel, parser_quirk)
SELECT v.name, v.country, v.language, v.feed_type, v.feed_url, v.category, v.enabled,
v.fetch_method, v.fulltext_policy, v.source_channel::source_channel, v.parser_quirk
FROM (VALUES
-- B-2: Guardian Open Platform (전문 JSON — 스크래핑 불요, GUARDIAN_API_KEY 필요)
('Guardian World', 'GB', 'en', 'api', 'https://content.guardianapis.com/search?section=world', 'International', true, 'api', 'none', 'news', NULL),
-- C-1 안전 (Safety)
('UK HSE Press', 'GB', 'en', 'rss', 'https://press.hse.gov.uk/feed/', 'Safety', true, 'rss', 'feed-full', 'crawl', NULL),
('안전신문', 'KR', 'ko', 'rss', 'https://www.safetynews.co.kr/rss/allArticle.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 공지', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/notice.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 정책', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/policy.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 입법행정예고', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/lawinfo.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('OSHA QuickTakes', 'US', 'en', 'rss', 'https://www.osha.gov/sites/default/files/quicktakes.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('EU-OSHA News', 'EU', 'en', 'rss', 'https://osha.europa.eu/en/rss-feeds/latest/news.xml', 'Safety', false, 'rss', 'page', 'crawl', NULL),
-- C-5 철학 (Philosophy)
('SEP 신규·개정', 'US', 'en', 'rss', 'https://plato.stanford.edu/rss/sep.xml', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
('1000-Word Philosophy', 'US', 'en', 'rss', 'https://1000wordphilosophy.com/feed/', 'Philosophy', true, 'rss', 'feed-full', 'crawl', NULL),
('Doing Philosophy', 'KR', 'ko', 'rss', 'https://doingphilosophy.kr/feed', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
('Aeon', 'GB', 'en', 'rss', 'https://aeon.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video'),
('Psyche', 'GB', 'en', 'rss', 'https://psyche.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video')
) AS v(name, country, language, feed_type, feed_url, category, enabled,
fetch_method, fulltext_policy, source_channel, parser_quirk)
WHERE NOT EXISTS (
SELECT 1 FROM news_sources ns WHERE ns.name = v.name
);
+59
View File
@@ -0,0 +1,59 @@
"""B-3 구독 세션 1회 수동 박제 (MacBook 등 GUI 머신에서 실행).
르몽드 = Google OAuth 자동화 브라우저 로그인은 Google 차단하므로
로그인 자체는 항상 사람이 headed 브라우저에서 수행하고, 스크립트는
결과(쿠키+localStorage = storage_state JSON) 박제한다.
사용 (MacBook):
pip install playwright && playwright install chromium
python scripts/capture_subscription_session.py --profile lemonde --url https://www.lemonde.fr
1) 떠오른 브라우저에서 직접 로그인 (Google OAuth 포함)
2) 로그인 완료 확인 터미널에서 Enter
3) ~/.local/share/crawl-auth/lemonde.json 저장 (600)
GPU 반영:
ssh gpu 'mkdir -p ~/.local/share/crawl-auth && chmod 700 ~/.local/share/crawl-auth'
scp ~/.local/share/crawl-auth/lemonde.json gpu:.local/share/crawl-auth/
ssh gpu 'chmod 600 ~/.local/share/crawl-auth/lemonde.json'
세션 만료 재로그인도 동일 절차 + source_health.relogin_requested 플래그 set
(어댑터가 다음 틱에 half-open probe 소비).
주의: storage_state = credential 등가물. repo ·백업 대상 경로에 두지 .
"""
import argparse
from pathlib import Path
from playwright.sync_api import sync_playwright
AUTH_DIR = Path.home() / ".local" / "share" / "crawl-auth"
def main() -> None:
parser = argparse.ArgumentParser(description="B-3 구독 세션 storage_state 박제")
parser.add_argument("--profile", required=True, help="예: lemonde")
parser.add_argument("--url", required=True, help="로그인 시작 페이지")
args = parser.parse_args()
AUTH_DIR.mkdir(parents=True, exist_ok=True)
AUTH_DIR.chmod(0o700)
out = AUTH_DIR / f"{args.profile}.json"
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": 1366, "height": 900})
page = context.new_page()
page.goto(args.url)
print(f"\n브라우저에서 로그인을 완료한 뒤 이 터미널에서 Enter 를 누르세요.")
input("로그인 완료 후 Enter > ")
context.storage_state(path=str(out))
browser.close()
out.chmod(0o600)
print(f"저장: {out} (600)")
print("다음: scp 로 GPU ~/.local/share/crawl-auth/ 반영 + chmod 600")
if __name__ == "__main__":
main()
+12
View File
@@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /srv
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
EXPOSE 8765
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8765/health')"
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
+3
View File
@@ -0,0 +1,3 @@
fastapi>=0.111.0
uvicorn>=0.30.0
asyncpg>=0.29.0
+202
View File
@@ -0,0 +1,202 @@
"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
processing_queue SELECT ) 쓰기 0.
[1] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
피드 streak + fulltext 승격/격하 통계 + 백로그. -RSS 소스(C-2 sitemap )
같은 표면이 수용 (fetch_method 컬럼 표시 '구독 소스 패널' 좁히지 않는 소스 일반화).
[2 범위 ] B-3 상태 계약 도착 세션 + [재로그인 시도] 버튼(enqueue 방식).
노출: 별도 바인딩만 compose Tailscale 인터페이스(100.110.63.63)에만 publish.
vhost/경로 가드 방식 금지 (r4: '덜 깨짐' 속성 상실). 레벨 인증 없음 =
Tailscale 도달성만이 경계 (fab-server 선례).
"""
import html
import logging
import os
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
logger = logging.getLogger("crawl_health")
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(_app: FastAPI):
global _pool
_pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
yield
await _pool.close()
app = FastAPI(lifespan=lifespan)
async def _collect_data() -> dict:
async with _pool.acquire() as conn:
sources = await conn.fetch(
"""
SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
s.fulltext_policy, s.last_fetched_at,
h.circuit_state, h.consecutive_failures, h.last_success_at,
h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
h.total_fetches, h.total_failures
FROM news_sources s
LEFT JOIN source_health h ON h.source_id = s.id
ORDER BY s.enabled DESC, s.name
"""
)
counts = await conn.fetch(
"""
SELECT s.id,
count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
count(d.id) AS items_7d
FROM news_sources s
LEFT JOIN documents d
ON d.source_channel = 'news'
AND d.extracted_at > now() - interval '7 days'
AND d.file_path LIKE 'news/' || s.name || '/%'
GROUP BY s.id
"""
)
queue = await conn.fetch(
"""
SELECT stage::text AS stage, status::text AS status, count(*) AS n,
min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
FROM processing_queue
WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
AND status IN ('pending', 'processing', 'failed')
GROUP BY 1, 2
ORDER BY 1, 2
"""
)
fulltext = await conn.fetch(
"""
SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
FROM documents
WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
GROUP BY 1
"""
)
count_map = {r["id"]: r for r in counts}
return {
"sources": [
{**dict(r),
"items_24h": count_map.get(r["id"], {}).get("items_24h", 0),
"items_7d": count_map.get(r["id"], {}).get("items_7d", 0)}
for r in sources
],
"queue": [dict(r) for r in queue],
"fulltext": [dict(r) for r in fulltext],
}
@app.get("/health")
async def health():
"""Liveness — Docker healthcheck 용 (DB 미접근, 프로세스 생존만)."""
return {"status": "ok", "service": "crawl-health"}
@app.get("/api/health.json")
async def api_health():
data = await _collect_data()
# asyncpg Record 의 datetime → isoformat 직렬화
def _ser(v):
return v.isoformat() if hasattr(v, "isoformat") else v
return JSONResponse({
k: [{kk: _ser(vv) for kk, vv in row.items()} for row in v]
for k, v in data.items()
})
def _chip(state: str | None, enabled: bool) -> str:
if not enabled:
return '<span class="chip off">OFF</span>'
if state == "disabled":
return '<span class="chip err">DISABLED</span>'
if state == "open":
return '<span class="chip warn">OPEN</span>'
return '<span class="chip ok">OK</span>'
def _fmt_ts(v) -> str:
return v.strftime("%m-%d %H:%M") if v else "-"
@app.get("/", response_class=HTMLResponse)
async def index():
data = await _collect_data()
rows = []
for s in data["sources"]:
err = html.escape((s.get("last_error") or "")[:80])
warn_cls = ""
if s["enabled"] and (s.get("consecutive_failures") or 0) >= 3:
warn_cls = ' class="row-warn"'
elif s["enabled"] and (s.get("empty_streak") or 0) >= 8:
warn_cls = ' class="row-warn"'
rows.append(
f"<tr{warn_cls}>"
f"<td>{html.escape(s['name'])}</td>"
f"<td>{_chip(s.get('circuit_state'), s['enabled'])}</td>"
f"<td>{html.escape(s.get('fetch_method') or 'rss')}</td>"
f"<td>{html.escape(s.get('fulltext_policy') or 'none')}</td>"
f"<td class='num'>{s['items_24h']}</td>"
f"<td class='num'>{s['items_7d']}</td>"
f"<td class='num'>{s.get('consecutive_failures') or 0}</td>"
f"<td class='num'>{s.get('empty_streak') or 0}</td>"
f"<td>{_fmt_ts(s.get('last_success_at'))}</td>"
f"<td>{_fmt_ts(s.get('last_fetched_at'))}</td>"
f"<td class='err-text'>{err}</td>"
f"</tr>"
)
qrows = [
f"<tr><td>{html.escape(q['stage'])}</td><td>{html.escape(q['status'])}</td>"
f"<td class='num'>{q['n']}</td><td>{_fmt_ts(q.get('oldest_pending'))}</td></tr>"
for q in data["queue"]
]
frows = [
f"<tr><td>{html.escape(f['status'] or '-')}</td><td class='num'>{f['n']}</td></tr>"
for f in data["fulltext"]
]
body = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8">
<title>crawl-health 소스 헬스 패널</title>
<style>
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
color: #3d3a33; margin: 0; padding: 28px; }}
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
tr.row-warn td {{ background: #fbf0e4; }}
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
</style></head><body>
<h1>crawl-health 소스 헬스 패널</h1>
<div class="sub">A-8 1 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
<h2>소스 ({len(rows)})</h2>
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
{''.join(rows)}</table>
<h2>처리 (fulltext / summarize / embed / chunk)</h2>
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
{''.join(qrows) or '<tr><td colspan="4">백로그 없음</td></tr>'}</table>
<h2>fulltext 승격 누적</h2>
<table><tr><th>status</th><th>건수</th></tr>
{''.join(frows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'}</table>
</body></html>"""
return HTMLResponse(body)
+109 -28
View File
@@ -1,12 +1,18 @@
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
Phase 1B (2026-05-01) 텍스트만 응답, 이미지 폐기.
Phase 1B.5 ( 변경) `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
Phase 1B.5 `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
없는 stateless 변환기 유지 (fastapi NAS persist 담당).
D-1 (plan crawl-24x7-1, 2026-06-10) idle-unload 운영 전환:
MARKER_PRELOAD=0 : startup warmup ( /convert lazy load)
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 모델 해제 (0=비활성, 기존 동작)
/ready idle(미적재)에서도 200 fastapi depends_on service_healthy
lazy 모드에서 영구 미기동으로 굳는 방지. 503 warmup_failed 한정.
plan: ~/.claude/plans/piped-humming-crystal.md
"""
import base64
import gc
import hashlib
import io
import logging
@@ -40,6 +46,12 @@ _warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
_inflight = 0
_last_used = time.monotonic()
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
@@ -68,11 +80,67 @@ def _ensure_warmup() -> None:
raise
def _acquire_models():
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
global _inflight
while True:
_ensure_warmup()
with _warmup_lock:
if _warmup_done:
_inflight += 1
return
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
def _release_models():
global _inflight, _last_used
with _warmup_lock:
_inflight -= 1
_last_used = time.monotonic()
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
split 변환의 배치 사이 간격은 단위 N>=1 임계면 배치 사이 해제 없음.
"""
global _models, _converter, _warmup_done
with _warmup_lock:
if not _warmup_done or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_models = None
_converter = None
_warmup_done = False
gc.collect()
try:
import torch
torch.cuda.empty_cache()
except Exception:
pass
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
async def _idle_reaper():
import asyncio
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("[marker-service] idle reaper 오류")
@app.on_event("startup")
async def startup():
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
import asyncio
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _PRELOAD:
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _IDLE_UNLOAD_MINUTES > 0:
asyncio.create_task(_idle_reaper())
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}")
class ConvertRequest(BaseModel):
@@ -111,7 +179,12 @@ def health():
@app.get("/ready")
async def ready(response: Response):
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
D-1: idle(미적재) = 200. 503 warmup_failed 한정 lazy 모드에서 fastapi
depends_on service_healthy 영구 미기동으로 굳지 않게. 배포 검증에서
'status=ready' 단언하던 runbook 강제 warm 호출(/convert 1) 대체.
"""
if _warmup_error:
response.status_code = 503
return {
@@ -121,31 +194,28 @@ async def ready(response: Response):
"error": _warmup_error,
}
if not _warmup_done:
response.status_code = 503
return {
"status": "warming_up",
"status": "warming_up" if _PRELOAD else "idle",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": False,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
return {
"status": "ready",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": True,
"inflight": _inflight,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
_ensure_warmup()
p = Path(req.file_path)
if not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(
@@ -155,22 +225,33 @@ async def convert(req: ConvertRequest):
"message": f"start_page={req.start_page} end_page={req.end_page}",
},
)
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
_acquire_models()
try:
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
finally:
_release_models()
images_payload, truncated = _serialize_images(raw_images, str(p))
+18
View File
@@ -0,0 +1,18 @@
# B-3 / A-1 Tier 2 (plan crawl-24x7-1) — Playwright 격리 컨테이너.
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 서비스로 격리,
# 타임아웃 있는 HTTP 호출로만 사용. 요청당 브라우저 기동 = 컨텍스트 누적 메모리 차단.
FROM mcr.microsoft.com/playwright/python:v1.47.0-jammy
WORKDIR /srv
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
# root 로 Chromium 실행 시 sandbox 비활성 강제됨 — 이미지 내장 pwuser(uid 1000)로 실행.
# /auth ro mount(호스트 hyungi uid 1000, mode 600)도 동일 uid 라 판독 가능.
USER pwuser
# internal-only — compose 네트워크 전용, host 포트 미매핑 (caddy 라우트 금지)
EXPOSE 3400
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3400"]
@@ -0,0 +1,3 @@
fastapi==0.115.*
uvicorn==0.32.*
playwright==1.47.0
+107
View File
@@ -0,0 +1,107 @@
"""B-3 구독 세션 Playwright fetcher (plan crawl-24x7-1).
storage_state JSON(쿠키+localStorage 스냅샷) 기반 인증 페이지 fetch + 내용 기반 probe.
- 동시 1 인스턴스 (글로벌 세마포어) 계정 보호 + 사람 속도는 호출측 politeness 담당.
- 요청당 브라우저 기동/종료 컨텍스트 메모리 누적·hang 잔존 차단 (저빈도라 기동비용 무관).
- 세션 파일: /auth/{profile}.json (호스트 ~/.local/share/crawl-auth/, ro mount, 600).
부재 = 503 profile_missing (silent fallback 없음 호출측이 degrade).
- 시간 기반 만료 판정 금지 probe 알려진 유료 기사에서 본문 길이 + 페이월 마커 부재 검증
(만료 200 '페이월 안내문' 본문으로 저장되는 silent corruption 차단).
"""
import asyncio
import logging
from pathlib import Path
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright, Error as PlaywrightError
from pydantic import BaseModel, Field
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger("playwright-fetcher")
AUTH_DIR = Path("/auth")
NAV_TIMEOUT_MS = 45_000
SETTLE_MS = 1_500 # domcontentloaded 후 lazy 본문 settle 대기
app = FastAPI(title="playwright-fetcher")
_browser_slot = asyncio.Semaphore(1) # 동시 1 인스턴스 (B-3 ① persistent 제약과 동일 규율)
class FetchReq(BaseModel):
url: str
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
class ProbeReq(BaseModel):
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
probe_url: str
min_body_chars: int = 800
paywall_markers: list[str] = []
def _state_path(profile: str) -> Path:
p = AUTH_DIR / f"{profile}.json"
if not p.is_file():
raise HTTPException(503, detail={"error_reason": "profile_missing", "profile": profile})
return p
async def _browse(url: str, state: Path) -> tuple[str, str, str]:
"""(html, final_url, visible_text). 요청당 브라우저 — 종료를 finally 로 보장."""
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
try:
context = await browser.new_context(
storage_state=str(state),
viewport={"width": 1366, "height": 900},
locale="fr-FR",
)
page = await context.new_page()
await page.goto(url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
await page.wait_for_timeout(SETTLE_MS)
html = await page.content()
final_url = page.url
text = await page.evaluate("document.body ? document.body.innerText : ''")
return html, final_url, text
finally:
await browser.close()
@app.get("/health")
def health():
profiles = sorted(p.stem for p in AUTH_DIR.glob("*.json")) if AUTH_DIR.is_dir() else []
return {"status": "ok", "profiles": profiles}
@app.post("/fetch")
async def fetch(req: FetchReq):
state = _state_path(req.profile)
async with _browser_slot:
try:
html, final_url, _ = await _browse(req.url, state)
except PlaywrightError as e:
logger.warning("fetch 실패 %s: %s", req.url, e)
raise HTTPException(502, detail={"error_reason": "browse_failed", "message": str(e)[:300]})
logger.info("fetch ok profile=%s %s (%d bytes)", req.profile, req.url, len(html))
return {"html": html, "final_url": final_url}
@app.post("/probe")
async def probe(req: ProbeReq):
"""내용 기반 세션 probe — ok=False 사유를 명시 반환 (호출측이 health 에 기록)."""
state = _state_path(req.profile)
async with _browser_slot:
try:
_, final_url, text = await _browse(req.probe_url, state)
except PlaywrightError as e:
return {"ok": False, "reason": f"browse_failed: {str(e)[:200]}", "body_chars": 0}
body_chars = len(text.strip())
hit = next((m for m in req.paywall_markers if m and m.lower() in text.lower()), None)
if hit:
return {"ok": False, "reason": f"paywall_marker: {hit}", "body_chars": body_chars}
if body_chars < req.min_body_chars:
return {"ok": False, "reason": f"body_too_short: {body_chars} < {req.min_body_chars}",
"body_chars": body_chars}
logger.info("probe ok profile=%s (%d chars, final=%s)", req.profile, body_chars, final_url)
return {"ok": True, "reason": None, "body_chars": body_chars}
+83 -27
View File
@@ -1,14 +1,23 @@
"""STT 마이크로서비스 — faster-whisper (GPU) 기반 음성 전사.
filePath {text, segments:[{start,end,text}]}.
모델은 startup 에서 eager preload (Docker /ready healthcheck 모델 적재까지 검증).
기본 모델 large-v3 (VRAM ~3GB, float16). 환경변수로 교체 가능.
환경변수 `STT_PRELOAD=0` 으로 lazy 강제 가능 (개발/테스트용).
D-1 (plan crawl-24x7-1, 2026-06-10) idle-unload 운영 전환:
STT_PRELOAD=0 : startup eager preload ( 요청 lazy load)
STT_IDLE_UNLOAD_MINUTES: N분 유휴 모델 해제 (0=비활성, 기존 동작).
faster-whisper=CTranslate2 torch 미설치 해제는
참조 제거 + gc (CTranslate2 소멸 VRAM 반환).
콜드로드 수초~수십 초는 호출측(stt_worker read=1800s) 흡수. healthcheck
cuda 가용성 기준 (compose) 모델 적재는 이상 상시 상태가 아니다.
"""
import asyncio
import gc
import logging
import os
import threading
import time
import unicodedata
from contextlib import asynccontextmanager
from pathlib import Path
@@ -17,18 +26,26 @@ from fastapi import FastAPI
logger = logging.getLogger("stt")
_IDLE_UNLOAD_MINUTES = int(os.getenv("STT_IDLE_UNLOAD_MINUTES", "0"))
@asynccontextmanager
async def lifespan(_app: FastAPI):
# startup: 모델 eager preload 시도. 실패해도 프로세스는 살아 있고
# /ready 가 false 로 남아 healthcheck 가 unhealthy 처리.
# /ready 의 models_loaded 가 false 로 남는다.
if os.getenv("STT_PRELOAD", "1") != "0":
try:
_load_model()
logger.info("stt model preloaded: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
except Exception as e:
logger.exception("stt model preload failed: %s", e)
reaper = None
if _IDLE_UNLOAD_MINUTES > 0:
reaper = asyncio.create_task(_idle_reaper())
logger.info("stt idle-unload 활성: %d", _IDLE_UNLOAD_MINUTES)
yield
if reaper:
reaper.cancel()
app = FastAPI(lifespan=lifespan)
@@ -38,6 +55,11 @@ _MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
_DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
# load/unload/inflight 상태 전이는 전부 이 lock 아래 (cold 동시 요청 이중 로드 방지 포함)
_model_lock = threading.Lock()
_inflight = 0
_last_used = time.monotonic()
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. OCR 서비스와 동일 패턴."""
@@ -61,14 +83,38 @@ def _resolve_path(file_path: str) -> Path | None:
def _load_model():
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유."""
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유. lock 으로 이중 로드 방지."""
global _model
if _model is not None:
return _model
from faster_whisper import WhisperModel
with _model_lock:
if _model is None:
from faster_whisper import WhisperModel
logger.info("stt model loading: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 처리 중(inflight>0)이면 절대 해제하지 않는다."""
global _model
with _model_lock:
if _model is None or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_model = None
gc.collect()
logger.info("stt idle-unload: whisper 모델 해제 (유휴 %d분 초과)", _IDLE_UNLOAD_MINUTES)
async def _idle_reaper():
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("stt idle reaper 오류")
def _cuda_device_count() -> int:
@@ -87,7 +133,7 @@ def health():
@app.get("/ready")
def ready():
"""Readiness — CUDA + 모델 상태. 배포 검증용."""
"""Readiness — CUDA + 모델 상태. healthcheck 는 cuda 만 본다 (D-1 idle-unload)."""
count = _cuda_device_count()
cuda_ok = count > 0
models_loaded = _model is not None
@@ -98,6 +144,8 @@ def ready():
"models_loaded": models_loaded,
"model": _MODEL_NAME,
"compute_type": _COMPUTE_TYPE,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
"inflight": _inflight,
}
@@ -121,6 +169,7 @@ async def transcribe(body: dict):
"duration": 1832.5
}
"""
global _inflight, _last_used
raw_path = body["filePath"]
langs = body.get("langs")
beam_size = int(body.get("beamSize", 5))
@@ -129,28 +178,35 @@ async def transcribe(body: dict):
if resolved is None:
return {"error": f"파일 없음: {raw_path}", "text": "", "segments": []}
model = _load_model()
with _model_lock:
_inflight += 1
try:
model = _load_model()
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
finally:
with _model_lock:
_inflight -= 1
_last_used = time.monotonic()
return {
"text": " ".join(p.strip() for p in parts).strip(),
File diff suppressed because one or more lines are too long
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":1,"numOfRows":5,"items":{"item":[{"filenm":"컨베이어에 끼임.pdf","filepath":"https://portal.kosha.or.kr/openapi/v1/file/down/stdboard/B2025022104002/202605281621537G75H2/D0801000010001","boardno":"202605281621537G75H2"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":6334,"numOfRows":3,"items":{"item":[{"business":"제조업","contents":"2026.01.00(월) 07:30경, 경기도 소재 OOOO(주)에서 재해자가 골재 이송 컨베이어 상부의 이물질을 제거하던 중,다리가 컨베이어 벨트와 테일 풀리 (Tail Pulley)* 사이에 끼임 *컨베이어의 아래쪽 끝단에서 회전하며 벨트를 순환시키는 원통형 기계장치","atcflcnt":1,"keyword":"컨베이어에 끼임","boardno":"202605281621537G75H2"},{"business":"건설업","contents":"2025. 8. 00. (금) 11:12 경 경기도 소재 OOO 신축공 사현장에서 데크플레이트 설치 중 밟고 있던 미고정 데크플레이트가 탈락하며 약 7m 높이에서 추락함","atcflcnt":1,"keyword":"데크플레이트 설치 작업 중 추락","boardno":"20260528162031VZLE93"},{"business":"건설업","contents":"2025. 06. 00.(금) 12:35경, 경북 봉화군 소재 (주)OOOO 침전저류지 현장에서 타워크레인 전도 후 매립된 케이크*(오염토)를 굴착 및 운반 작업 중, 사면의 토사와 타워크레인 기초구조물이 무너지며 하단에서 작업 중이던 굴착기가 매몰됨 * 분말 상태의 원료에서 아연을 채취한 후 남은 중금속 부산물(산화칼슘, 납, 산화철, 황산 등)을 장기간 매립하여 만들어지는 고체 형태의 오염 토양 덩어리","atcflcnt":1,"keyword":"사면 굴착 작업 중 매몰","boardno":"20260527153100O7QX25"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":1039,"numOfRows":3,"items":{"item":[{"techGdlnNm":"구리에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-1-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883045/7"},{"techGdlnNm":"마그네슘에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-4-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883165/3"},{"techGdlnNm":"백금에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-6-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883187/3"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
+139
View File
@@ -0,0 +1,139 @@
"""crawl-24x7 사이클 2 — 순수 함수/형태 회귀 테스트 (DB 불요).
Guardian 호출 형태 + fixture 응답 파싱 + 채널 정체성 + B-5 quirk.
fixture = tests/fixtures/guardian_open_platform_search_response.json
(2026-06-10 실키 live 박제, api-key 응답 본문 미포함 확인 [[feedback_external_api_fixture_first]]).
"""
import json
import re
from pathlib import Path
from workers.news_collector import (
_article_hash,
_doc_identity,
_guardian_request,
_normalize_category,
)
FIXTURE = Path(__file__).parent / "fixtures" / "guardian_open_platform_search_response.json"
def _make_source(**kw):
"""ORM 인스턴스 없이 속성만 흉내 (식별성 함수는 속성 접근만 사용)."""
class S:
pass
s = S()
s.source_channel = kw.get("source_channel", "news")
s.parser_quirk = kw.get("parser_quirk")
return s
class TestGuardianCallShape:
def test_request_shape_matches_fixture_recipe(self):
"""fixture 박제 시 사용한 호출과 단일 source-of-truth 정합
([[feedback_fixture_first_call_shape]])."""
endpoint, params = _guardian_request(
"https://content.guardianapis.com/search?section=world", "KEY"
)
assert endpoint == "https://content.guardianapis.com/search"
assert params["section"] == "world"
assert params["show-fields"] == "bodyText,trailText"
assert params["order-by"] == "newest"
assert params["api-key"] == "KEY"
def test_feed_url_query_overridden_by_fixed_fields(self):
# feed_url 에 show-fields 가 잘못 박혀 있어도 고정 필드가 이긴다 (dict merge 순서)
_, params = _guardian_request(
"https://content.guardianapis.com/search?section=world&show-fields=headline", "K"
)
assert params["show-fields"] == "bodyText,trailText"
class TestGuardianFixtureParsing:
def test_fixture_response_shape(self):
payload = json.loads(FIXTURE.read_text())["response"]
assert payload["status"] == "ok"
assert payload["results"], "fixture 에 결과 0건"
for item in payload["results"]:
assert item["webTitle"].strip()
assert item["webUrl"].startswith("https://")
assert "webPublicationDate" in item
assert "sectionName" in item
fields = item.get("fields") or {}
assert "bodyText" in fields and "trailText" in fields
def test_fixture_bodytext_is_fulltext_grade(self):
payload = json.loads(FIXTURE.read_text())["response"]
# 전문 게이트(200자)를 fixture 가 통과해야 어댑터 is_full 경로가 산다
assert any(len(i["fields"]["bodyText"]) >= 200 for i in payload["results"])
def test_fixture_contains_no_api_key(self):
assert "api-key" not in FIXTURE.read_text()
class TestChannelIdentity:
def test_news_channel_unchanged(self):
ident = _doc_identity(_make_source(source_channel="news"), "경향신문", "Society")
assert ident == {
"path_prefix": "news",
"ai_domain": "News",
"ai_tags": ["News/경향신문/Society"],
}
def test_crawl_channel_domain_identity(self):
ident = _doc_identity(_make_source(source_channel="crawl"), "TWI", "Engineering")
assert ident["path_prefix"] == "crawl"
assert ident["ai_domain"] == "Engineering"
assert ident["ai_tags"] == ["Engineering/TWI"]
def test_crawl_channel_unknown_category_falls_back(self):
ident = _doc_identity(_make_source(source_channel="crawl"), "X", "Other")
assert ident["ai_domain"] == "Domain"
def test_category_map_has_domain_axes(self):
assert _normalize_category("안전") == "Safety"
assert _normalize_category("Engineering") == "Engineering"
assert _normalize_category("철학") == "Philosophy"
class TestSkipVideoQuirk:
PATTERN = re.compile(r"/videos?/")
def test_video_urls_match(self):
assert self.PATTERN.search("https://psyche.co/videos/some-film")
assert self.PATTERN.search("https://aeon.co/video/another")
def test_article_urls_pass(self):
assert not self.PATTERN.search("https://psyche.co/ideas/how-to-think")
class TestRedirect304Distinction:
"""httpx is_redirect 가 304(3xx 전체)에 True 라 redirect 로 오인 → 조건부 GET
안정 피드가 'redirect 3회 초과' 전멸하던 버그. has_redirect_location 으로 구분."""
def test_304_is_not_a_redirect_location(self):
import httpx
r = httpx.Response(304, request=httpx.Request("GET", "https://x/"))
assert r.is_redirect is True # httpx 함정: 304 도 is_redirect
assert r.has_redirect_location is False # 우리가 써야 하는 정확한 판별
def test_real_redirect_has_location(self):
import httpx
r = httpx.Response(301, headers={"location": "https://y/"},
request=httpx.Request("GET", "https://x/"))
assert r.has_redirect_location is True
def test_collector_uses_has_redirect_location(self):
import inspect
from workers import news_collector
src = inspect.getsource(news_collector._fetch_rss)
assert "has_redirect_location" in src
assert "while resp.is_redirect" not in src # 옛 버그 패턴 부재
class TestArticleHashStability:
def test_static_corpus_hash_deterministic(self):
a = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
b = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
assert a == b and len(a) == 32