Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f4e5db9723 | |||
| 69db9bcb94 | |||
| 61e5a416d0 | |||
| cdf4ee0ef6 | |||
| 251a5392ef | |||
| 1842f27d89 | |||
| 53a30449e2 | |||
| ab668d7990 | |||
| dcf99b377e | |||
| 3df0ca53ab | |||
| 7cd8cfde0a |
@@ -0,0 +1,284 @@
|
||||
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
|
||||
|
||||
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
|
||||
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 5–15초 지연 + jitter
|
||||
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
|
||||
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
|
||||
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
|
||||
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
|
||||
|
||||
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
|
||||
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
import urllib.robotparser
|
||||
from urllib.parse import urljoin, urlparse
|
||||
|
||||
import httpx
|
||||
|
||||
from core.url_validator import validate_feed_url
|
||||
from core.utils import setup_logger
|
||||
|
||||
# bare getLogger 는 root(WARNING) 상속이라 INFO 대기/차단 로그가 드랍됨 — 타 워커와 동일 설정
|
||||
logger = setup_logger("crawl_politeness")
|
||||
|
||||
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
|
||||
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
|
||||
|
||||
# 같은 도메인 연속 요청 간격 (초) — 권장치 5–15s + jitter
|
||||
_DOMAIN_DELAY_MIN = 5.0
|
||||
_DOMAIN_DELAY_MAX = 15.0
|
||||
|
||||
# 구독 세션(브라우저) fetch 간격 — 사람 속도 (B-3 ④: 기사 간 수십 초)
|
||||
_AUTH_DELAY_MIN = 30.0
|
||||
_AUTH_DELAY_MAX = 60.0
|
||||
|
||||
# B-3 Playwright 격리 컨테이너 (internal-only, compose DNS)
|
||||
_FETCHER_URL = "http://playwright-fetcher:3400"
|
||||
_FETCHER_TIMEOUT = 120.0 # 브라우저 기동 + 네비게이션 + settle 포함
|
||||
|
||||
# 안티봇 챌린지 페이지 식별 마커 (DataDome/Cloudflare 등) — 좁게 유지(오탐 회피).
|
||||
# 실측: 르몽드 기사 = DataDome "Client Challenge" + "Entrez les caractères" CAPTCHA.
|
||||
_CHALLENGE_MARKERS = (
|
||||
"Client Challenge",
|
||||
"Entrez les caractères affichés",
|
||||
"Checking your browser before",
|
||||
"captcha-delivery.com",
|
||||
"geo.captcha-delivery",
|
||||
)
|
||||
|
||||
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
|
||||
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
|
||||
_PAGE_TIMEOUT = 20.0
|
||||
_MAX_REDIRECTS = 3
|
||||
|
||||
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
|
||||
|
||||
|
||||
class CrawlFetchError(Exception):
|
||||
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
|
||||
|
||||
|
||||
class CrawlBlocked(Exception):
|
||||
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
|
||||
|
||||
|
||||
class CrawlSkip(Exception):
|
||||
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
|
||||
|
||||
|
||||
# 도메인별 직렬화 상태 (in-process)
|
||||
_domain_locks: dict[str, asyncio.Lock] = {}
|
||||
_domain_last_request: dict[str, float] = {}
|
||||
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
|
||||
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
|
||||
|
||||
|
||||
def _domain_of(url: str) -> str:
|
||||
return (urlparse(url).hostname or "").lower()
|
||||
|
||||
|
||||
def _get_lock(domain: str) -> asyncio.Lock:
|
||||
if domain not in _domain_locks:
|
||||
_domain_locks[domain] = asyncio.Lock()
|
||||
return _domain_locks[domain]
|
||||
|
||||
|
||||
async def _respect_domain_rate(
|
||||
domain: str,
|
||||
delay_min: float = _DOMAIN_DELAY_MIN,
|
||||
delay_max: float = _DOMAIN_DELAY_MAX,
|
||||
) -> None:
|
||||
"""같은 도메인 직전 요청에서 delay(jitter) 경과할 때까지 대기."""
|
||||
last = _domain_last_request.get(domain)
|
||||
if last is not None:
|
||||
delay = random.uniform(delay_min, delay_max)
|
||||
wait = last + delay - time.monotonic()
|
||||
if wait > 0:
|
||||
# silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성
|
||||
logger.info("[politeness] %s %.1fs 대기", domain, wait)
|
||||
await asyncio.sleep(wait)
|
||||
|
||||
|
||||
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
|
||||
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
|
||||
robots_url = f"{scheme}://{host}/robots.txt"
|
||||
try:
|
||||
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
|
||||
except httpx.HTTPError as e:
|
||||
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
|
||||
if resp.status_code >= 500:
|
||||
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
|
||||
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
|
||||
if resp.status_code >= 400:
|
||||
return None # robots 없음 = 전부 허용
|
||||
rp = urllib.robotparser.RobotFileParser()
|
||||
rp.parse(resp.text.splitlines())
|
||||
return rp
|
||||
|
||||
|
||||
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
|
||||
parsed = urlparse(url)
|
||||
host = (parsed.hostname or "").lower()
|
||||
cached = _robots_cache.get(host)
|
||||
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
|
||||
rp = await _fetch_robots(client, parsed.scheme or "https", host)
|
||||
_robots_cache[host] = (time.monotonic(), rp)
|
||||
cached = _robots_cache[host]
|
||||
rp = cached[1]
|
||||
if rp is None:
|
||||
return True
|
||||
return rp.can_fetch(CRAWL_UA, url)
|
||||
|
||||
|
||||
async def fetch_page(
|
||||
url: str, *, check_robots: bool = True,
|
||||
content_types: tuple[str, ...] = _HTML_CONTENT_TYPES,
|
||||
) -> tuple[str, str]:
|
||||
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
|
||||
|
||||
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
|
||||
- per-domain 동시성 1 + 5–15s jitter 지연
|
||||
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
|
||||
- 5xx/timeout: CrawlFetchError (큐 재시도)
|
||||
- 비-HTML content-type / 5MB 초과: CrawlSkip
|
||||
"""
|
||||
try:
|
||||
validate_feed_url(url)
|
||||
except ValueError as e:
|
||||
raise CrawlSkip(f"URL 검증 실패: {e}") from e
|
||||
|
||||
domain = _domain_of(url)
|
||||
async with _get_lock(domain):
|
||||
await _respect_domain_rate(domain)
|
||||
try:
|
||||
async with httpx.AsyncClient(
|
||||
timeout=_PAGE_TIMEOUT, follow_redirects=False,
|
||||
headers={"User-Agent": CRAWL_UA},
|
||||
) as client:
|
||||
if check_robots and not await _robots_allows(client, url):
|
||||
raise CrawlBlocked(f"robots.txt disallow: {url}")
|
||||
|
||||
resp = await client.get(url)
|
||||
redirects = 0
|
||||
# has_redirect_location = location 헤더 있는 진짜 redirect 만 (httpx 의
|
||||
# is_redirect 는 3xx 전체라 304 등을 redirect 로 오인 — news_collector 동일 함정)
|
||||
while resp.has_redirect_location and redirects < _MAX_REDIRECTS:
|
||||
location = urljoin(str(resp.request.url), resp.headers["location"])
|
||||
try:
|
||||
validate_feed_url(location)
|
||||
except ValueError as e:
|
||||
raise CrawlSkip(f"redirect target 차단: {e}") from e
|
||||
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
|
||||
resp = await client.get(location)
|
||||
redirects += 1
|
||||
if resp.has_redirect_location:
|
||||
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
|
||||
except httpx.TimeoutException as e:
|
||||
raise CrawlFetchError(f"timeout: {url}") from e
|
||||
except httpx.HTTPError as e:
|
||||
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
|
||||
finally:
|
||||
_domain_last_request[domain] = time.monotonic()
|
||||
|
||||
if resp.status_code == 429:
|
||||
retry_after = resp.headers.get("retry-after", "")
|
||||
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
|
||||
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
|
||||
if resp.status_code == 403:
|
||||
raise CrawlBlocked(f"403 forbidden: {url}")
|
||||
if resp.status_code >= 500:
|
||||
raise CrawlFetchError(f"{resp.status_code}: {url}")
|
||||
if resp.status_code >= 400:
|
||||
raise CrawlSkip(f"{resp.status_code}: {url}")
|
||||
|
||||
ct = resp.headers.get("content-type", "").lower()
|
||||
if ct and not any(t in ct for t in content_types):
|
||||
raise CrawlSkip(f"비허용 content-type: {ct}: {url}")
|
||||
if len(resp.content) > _MAX_PAGE_BYTES:
|
||||
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
|
||||
|
||||
return resp.text, str(resp.request.url)
|
||||
|
||||
|
||||
# ── B-3 구독 세션 fetch (Playwright 격리 컨테이너 경유) ──────────────────────
|
||||
|
||||
async def fetch_page_via_browser(url: str, profile: str) -> tuple[str, str]:
|
||||
"""인증 페이지 1건 — playwright-fetcher 에 위임, politeness 는 사람 속도(30~60s).
|
||||
|
||||
(html_text, final_url) 반환. robots 미적용 — 구독 계약 기반 개인 보관 fetch 로
|
||||
공개 크롤러 규약 대상이 아님 (대신 사람 속도 + 동시 1 + 야간 저빈도가 보호 장치).
|
||||
예외 어휘는 fetch_page 와 동일 (호출측 분기 재사용).
|
||||
"""
|
||||
try:
|
||||
validate_feed_url(url)
|
||||
except ValueError as e:
|
||||
raise CrawlSkip(f"URL 검증 실패: {e}") from e
|
||||
|
||||
domain = _domain_of(url)
|
||||
async with _get_lock(domain):
|
||||
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
|
||||
resp = await client.post(
|
||||
f"{_FETCHER_URL}/fetch", json={"url": url, "profile": profile}
|
||||
)
|
||||
except httpx.TimeoutException as e:
|
||||
raise CrawlFetchError(f"browser fetch timeout: {url}") from e
|
||||
except httpx.HTTPError as e:
|
||||
raise CrawlFetchError(f"playwright-fetcher 연결 오류: {e}") from e
|
||||
finally:
|
||||
_domain_last_request[domain] = time.monotonic()
|
||||
|
||||
if resp.status_code == 503:
|
||||
# storage_state 부재 — 수동 세션 박제 대기 (호출측 degrade, 재시도 루프 금지)
|
||||
raise CrawlBlocked(f"세션 프로필 부재: {profile}")
|
||||
if resp.status_code != 200:
|
||||
raise CrawlFetchError(f"playwright-fetcher {resp.status_code}: {url}")
|
||||
data = resp.json()
|
||||
html_text = data.get("html", "")
|
||||
if len(html_text.encode("utf-8", errors="replace")) > _MAX_PAGE_BYTES:
|
||||
raise CrawlSkip(f"크기 초과 (browser): {url}")
|
||||
# 안티봇 챌린지 페이지(DataDome 등) 식별 — 본문 길이 게이트(200자)를 통과하는
|
||||
# 짧은 챌린지 HTML 이 기사 본문으로 승격되는 silent corruption 차단. 헤드리스 탐지라
|
||||
# 재시도 무의미 → CrawlBlocked(=degrade, RSS 요약 유지). 마커는 보수적으로 좁게.
|
||||
if any(m in html_text for m in _CHALLENGE_MARKERS):
|
||||
raise CrawlBlocked(f"안티봇 챌린지 페이지(headless 차단): {url}")
|
||||
return html_text, data.get("final_url", url)
|
||||
|
||||
|
||||
async def probe_session(
|
||||
profile: str, probe_url: str, min_body_chars: int, paywall_markers: list[str]
|
||||
) -> dict:
|
||||
"""내용 기반 세션 probe (B-3 ②) — {'ok': bool, 'reason': str|None, 'body_chars': int}.
|
||||
|
||||
실패를 예외가 아닌 값으로 반환 — 호출측이 source_health 에 기록하고 degrade 분기.
|
||||
probe 도 실제 publisher fetch 라 동일 도메인 lock + 사람 속도 적용.
|
||||
"""
|
||||
domain = _domain_of(probe_url)
|
||||
async with _get_lock(domain):
|
||||
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
|
||||
resp = await client.post(
|
||||
f"{_FETCHER_URL}/probe",
|
||||
json={
|
||||
"profile": profile,
|
||||
"probe_url": probe_url,
|
||||
"min_body_chars": min_body_chars,
|
||||
"paywall_markers": paywall_markers,
|
||||
},
|
||||
)
|
||||
except httpx.HTTPError as e:
|
||||
return {"ok": False, "reason": f"fetcher 연결 오류: {e}", "body_chars": 0}
|
||||
finally:
|
||||
_domain_last_request[domain] = time.monotonic()
|
||||
|
||||
if resp.status_code == 503:
|
||||
return {"ok": False, "reason": f"세션 프로필 부재: {profile}", "body_chars": 0}
|
||||
if resp.status_code != 200:
|
||||
return {"ok": False, "reason": f"fetcher {resp.status_code}", "body_chars": 0}
|
||||
return resp.json()
|
||||
@@ -54,6 +54,8 @@ async def lifespan(app: FastAPI):
|
||||
from workers.law_monitor import run as law_monitor_run
|
||||
from workers.mailplus_archive import run as mailplus_run
|
||||
from workers.news_collector import run as news_collector_run
|
||||
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
|
||||
from workers.kosha_collector import run as kosha_collector_run
|
||||
from workers.queue_consumer import consume_queue, consume_markdown_queue
|
||||
from workers.study_queue_consumer import consume_study_queue
|
||||
from workers.study_session_queue_consumer import consume_study_session_queue
|
||||
@@ -121,9 +123,14 @@ async def lifespan(app: FastAPI):
|
||||
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
|
||||
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
|
||||
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
|
||||
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
|
||||
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
|
||||
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
|
||||
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
|
||||
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
|
||||
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
|
||||
# crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯).
|
||||
scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector")
|
||||
scheduler.start()
|
||||
|
||||
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
|
||||
|
||||
@@ -118,7 +118,7 @@ class Document(Base):
|
||||
source_channel: Mapped[str | None] = mapped_column(
|
||||
Enum("law_monitor", "devonagent", "email", "web_clip",
|
||||
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
|
||||
"voice", "hermes",
|
||||
"voice", "hermes", "crawl",
|
||||
name="source_channel")
|
||||
)
|
||||
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import Boolean, DateTime, String, Text
|
||||
from sqlalchemy import Boolean, DateTime, Enum, Integer, String, Text
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.database import Base
|
||||
@@ -23,3 +24,32 @@ class NewsSource(Base):
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), default=datetime.now
|
||||
)
|
||||
|
||||
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
|
||||
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
|
||||
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
|
||||
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
|
||||
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
|
||||
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
|
||||
auth_profile: Mapped[str | None] = mapped_column(String(50))
|
||||
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
|
||||
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
|
||||
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
|
||||
etag: Mapped[str | None] = mapped_column(Text)
|
||||
last_modified: Mapped[str | None] = mapped_column(Text)
|
||||
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
|
||||
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
|
||||
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
|
||||
selector_override: Mapped[dict | None] = mapped_column(JSONB)
|
||||
# rdf / table-strip / gn-redirect / skip-video 등 파서 특이 케이스 (B-5)
|
||||
parser_quirk: Mapped[str | None] = mapped_column(String(30))
|
||||
# 채널 — 'news'(다이제스트/브리핑 대상) / 'crawl'(도메인 재료, 0-5 (a)) — migration 324.
|
||||
# documents.source_channel 로 전파, crawl 채널은 embed/chunk 30일 게이트 미적용.
|
||||
# documents 와 동일 PG enum 재사용 (Document 모델과 값 목록 동기 유지).
|
||||
source_channel: Mapped[str] = mapped_column(
|
||||
Enum("law_monitor", "devonagent", "email", "web_clip",
|
||||
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
|
||||
"voice", "hermes", "crawl",
|
||||
name="source_channel"),
|
||||
default="news",
|
||||
)
|
||||
|
||||
+2
-1
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
|
||||
stage: Mapped[str] = mapped_column(
|
||||
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
|
||||
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
|
||||
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
|
||||
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
|
||||
Enum(
|
||||
"extract", "classify", "summarize", "embed", "chunk", "preview",
|
||||
"stt", "thumbnail", "deep_summary", "markdown",
|
||||
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
|
||||
name="process_stage",
|
||||
create_type=False,
|
||||
),
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
|
||||
|
||||
news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
|
||||
silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class SourceHealth(Base):
|
||||
__tablename__ = "source_health"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
source_id: Mapped[int] = mapped_column(
|
||||
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
|
||||
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
|
||||
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
|
||||
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
last_error: Mapped[str | None] = mapped_column(Text)
|
||||
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
|
||||
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
|
||||
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
|
||||
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
|
||||
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
|
||||
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), default=datetime.now
|
||||
)
|
||||
|
||||
# ── B-3 구독 세션 상태 계약 — migration 325 ──
|
||||
# 쓰기 1종 플래그: A-8 버튼이 기록만, 어댑터가 소비(수동 half-open).
|
||||
# 소비 위치 = open-스킵 분기보다 앞 (r5 함정 고정 — 데드 버튼 방지).
|
||||
relogin_requested: Mapped[bool] = mapped_column(Boolean, default=False)
|
||||
# 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — 페이월 안내문 silent corruption 차단)
|
||||
last_probe_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
last_probe_ok: Mapped[bool | None] = mapped_column(Boolean)
|
||||
@@ -17,10 +17,13 @@ python-multipart>=0.0.9
|
||||
jinja2>=3.1.0
|
||||
feedparser>=6.0.0
|
||||
pymupdf>=1.24.0
|
||||
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback
|
||||
trafilatura>=1.12.0
|
||||
# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
|
||||
# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
|
||||
trafilatura==2.1.0
|
||||
readability-lxml>=0.8.1
|
||||
markdownify>=0.13.1
|
||||
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
|
||||
beautifulsoup4>=4.12.0
|
||||
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
|
||||
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
|
||||
markitdown[docx,xlsx,pptx]>=0.1.0
|
||||
|
||||
@@ -0,0 +1,320 @@
|
||||
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
|
||||
|
||||
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
|
||||
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
|
||||
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
|
||||
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
|
||||
|
||||
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
|
||||
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
|
||||
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
|
||||
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
|
||||
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
|
||||
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
|
||||
|
||||
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
|
||||
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import exists, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import aliased
|
||||
|
||||
from core.config import settings
|
||||
from core.crawl_politeness import (
|
||||
CrawlBlocked,
|
||||
CrawlFetchError,
|
||||
CrawlSkip,
|
||||
fetch_page,
|
||||
fetch_page_via_browser,
|
||||
probe_session,
|
||||
)
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import ProcessingQueue, enqueue_stage
|
||||
from workers.extract_worker import (
|
||||
_WEB_MIN_BODY_LEN,
|
||||
_extract_web_with_bs4,
|
||||
_extract_web_with_readability,
|
||||
_extract_web_with_trafilatura,
|
||||
)
|
||||
|
||||
logger = setup_logger("fulltext_worker")
|
||||
|
||||
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
|
||||
_FOOTER_PATTERNS = [
|
||||
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
|
||||
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
|
||||
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
|
||||
]
|
||||
|
||||
|
||||
def _strip_article_footer(body: str) -> str:
|
||||
for pat in _FOOTER_PATTERNS:
|
||||
body = pat.sub("", body)
|
||||
return re.sub(r"\n{3,}", "\n\n", body).strip()
|
||||
|
||||
|
||||
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
|
||||
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
|
||||
body, ver = _extract_web_with_trafilatura(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "trafilatura", ver
|
||||
body, ver = _extract_web_with_readability(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "readability", ver
|
||||
body, ver = _extract_web_with_bs4(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "bs4_text", ver
|
||||
return "", None, None
|
||||
|
||||
|
||||
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
|
||||
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
|
||||
|
||||
file_hash 는 DB 컬럼이 character(64) 라 32자 해시가 공백 패딩되어 돌아옴 — strip 필수
|
||||
(미적용 시 NAS 파일명에 공백 32개 = 쉘/rsync 함정).
|
||||
"""
|
||||
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
|
||||
return (
|
||||
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
|
||||
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
|
||||
)
|
||||
|
||||
|
||||
def _save_raw_html(path: Path, html_text: str) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(path, "wb") as f:
|
||||
f.write(html_text.encode("utf-8", errors="replace"))
|
||||
|
||||
|
||||
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
|
||||
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
published_raw = (doc.extract_meta or {}).get("published_at")
|
||||
if doc.source_channel == "crawl":
|
||||
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
return
|
||||
days_old = 0
|
||||
if published_raw:
|
||||
try:
|
||||
pub_dt = datetime.fromisoformat(published_raw)
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
except ValueError:
|
||||
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
|
||||
|
||||
def _set_fulltext_meta(doc: Document, **fields) -> None:
|
||||
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
|
||||
meta = dict(doc.extract_meta or {})
|
||||
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
|
||||
doc.extract_meta = meta
|
||||
|
||||
|
||||
_PROBE_TTL_SECONDS = 6 * 3600 # probe 유효 시간 — 만료 시 배치 경계에서 재검증
|
||||
|
||||
|
||||
async def _auth_session_ready(session: AsyncSession, source: NewsSource) -> tuple[bool, str]:
|
||||
"""B-3 ② 내용 기반 probe 게이트 + relogin_requested 소비 (수동 half-open).
|
||||
|
||||
플래그 소비는 '불가용 스킵' 분기보다 앞 — 어댑터 틱마다 도달 (r5 데드 버튼 함정 고정).
|
||||
probe 실패 상태에서는 auth fetch 0회 (자동 재시도 루프 = 계정 잠금 직행 — B-3 ③).
|
||||
복구 경로 = storage_state 갱신 후 relogin_requested 플래그 set (수동).
|
||||
probe 설정은 source.selector_override JSONB: probe_url / min_body_chars / paywall_markers.
|
||||
"""
|
||||
from workers.news_collector import _get_or_create_health
|
||||
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
now = datetime.now(timezone.utc)
|
||||
cfg = source.selector_override or {}
|
||||
probe_url = cfg.get("probe_url")
|
||||
|
||||
force = False
|
||||
if health.relogin_requested:
|
||||
health.relogin_requested = False # 소비 = 1회 half-open 시도
|
||||
health.updated_at = now
|
||||
force = True
|
||||
logger.info(f"[fulltext/auth] {source.name} relogin_requested 소비 — half-open probe")
|
||||
|
||||
if not force:
|
||||
if health.last_probe_ok is False:
|
||||
return False, "probe 실패 상태 (storage_state 갱신 + relogin_requested 대기)"
|
||||
if (
|
||||
health.last_probe_ok
|
||||
and health.last_probe_at
|
||||
and (now - health.last_probe_at).total_seconds() < _PROBE_TTL_SECONDS
|
||||
):
|
||||
return True, ""
|
||||
|
||||
if not probe_url:
|
||||
return False, "selector_override.probe_url 미설정"
|
||||
|
||||
result = await probe_session(
|
||||
source.auth_profile,
|
||||
probe_url,
|
||||
int(cfg.get("min_body_chars", 800)),
|
||||
list(cfg.get("paywall_markers", [])),
|
||||
)
|
||||
health.last_probe_at = now
|
||||
health.last_probe_ok = bool(result.get("ok"))
|
||||
health.updated_at = now
|
||||
if not health.last_probe_ok:
|
||||
logger.warning(f"[fulltext/auth] {source.name} probe 실패: {result.get('reason')}")
|
||||
return False, str(result.get("reason"))
|
||||
logger.info(f"[fulltext/auth] {source.name} probe OK ({result.get('body_chars')}자)")
|
||||
return True, ""
|
||||
|
||||
|
||||
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
|
||||
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
|
||||
_set_fulltext_meta(
|
||||
doc, status="degraded", reason=reason[:300],
|
||||
resolved_at=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
await _enqueue_downstream(session, doc)
|
||||
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
|
||||
doc = await session.get(Document, document_id)
|
||||
if not doc:
|
||||
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
||||
if not doc.edit_url:
|
||||
await _degrade(session, doc, "edit_url 없음")
|
||||
return
|
||||
|
||||
meta = doc.extract_meta or {}
|
||||
source_id = meta.get("source_id")
|
||||
|
||||
# B-3: 구독 소스(auth_profile)는 Playwright 세션 fetch — probe 게이트 선행
|
||||
source = await session.get(NewsSource, source_id) if source_id else None
|
||||
auth_profile = source.auth_profile if source is not None else None
|
||||
|
||||
if auth_profile:
|
||||
ready, why = await _auth_session_ready(session, source)
|
||||
if not ready:
|
||||
await _degrade(session, doc, f"구독 세션 불가용: {why}")
|
||||
return
|
||||
|
||||
try:
|
||||
if auth_profile:
|
||||
html_text, final_url = await fetch_page_via_browser(doc.edit_url, auth_profile)
|
||||
else:
|
||||
html_text, final_url = await fetch_page(doc.edit_url)
|
||||
except (CrawlBlocked, CrawlSkip) as e:
|
||||
await _degrade(session, doc, f"{type(e).__name__}: {e}")
|
||||
return
|
||||
except CrawlFetchError:
|
||||
raise # 일시 오류 — 큐 재시도
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
|
||||
raw_path = _raw_html_path(source_id, doc.file_hash, now)
|
||||
try:
|
||||
_save_raw_html(raw_path, html_text)
|
||||
raw_saved = True
|
||||
except OSError as e:
|
||||
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
|
||||
raw_saved = False
|
||||
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
|
||||
|
||||
body, engine, engine_ver = _extract_body(html_text)
|
||||
if not engine:
|
||||
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
|
||||
return
|
||||
|
||||
clean_body = _strip_article_footer(body.replace("\x00", ""))
|
||||
if len(clean_body) < _WEB_MIN_BODY_LEN:
|
||||
await _degrade(session, doc, "푸터 제거 후 본문 부족")
|
||||
return
|
||||
|
||||
# B-3: 추출 결과도 페이월 마커로 게이트 — probe 통과 후 만료된 세션의
|
||||
# '페이월 안내문' 본문 승격(silent corruption) 차단 + 즉시 probe 상태 강등
|
||||
if auth_profile:
|
||||
from workers.news_collector import _get_or_create_health
|
||||
|
||||
markers = (source.selector_override or {}).get("paywall_markers", [])
|
||||
hit = next((m for m in markers if m and m.lower() in clean_body.lower()), None)
|
||||
if hit:
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
health.last_probe_ok = False
|
||||
health.updated_at = datetime.now(timezone.utc)
|
||||
await _degrade(session, doc, f"본문 페이월 마커 검출({hit}) — 세션 손상 의심")
|
||||
return
|
||||
|
||||
title = doc.title or ""
|
||||
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
|
||||
doc.extracted_at = now
|
||||
doc.extractor_version = f"rss+page@{engine}"
|
||||
doc.md_content = clean_body
|
||||
doc.md_status = "success"
|
||||
doc.md_extraction_engine = engine
|
||||
doc.md_extraction_engine_version = engine_ver
|
||||
doc.md_format_version = "1.0"
|
||||
doc.md_generated_at = now
|
||||
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
|
||||
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
|
||||
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
|
||||
doc.content_origin = "extracted"
|
||||
doc.file_size = len(doc.extracted_text.encode())
|
||||
_set_fulltext_meta(
|
||||
doc, status="promoted", engine=engine,
|
||||
raw_html_path=str(raw_path) if raw_saved else None,
|
||||
final_url=final_url, body_chars=len(clean_body),
|
||||
resolved_at=now.isoformat(),
|
||||
)
|
||||
|
||||
await _enqueue_downstream(session, doc)
|
||||
logger.info(
|
||||
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
|
||||
f"(raw={'saved' if raw_saved else 'MISSING'})"
|
||||
)
|
||||
|
||||
|
||||
async def reconcile_unresolved() -> None:
|
||||
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
|
||||
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
|
||||
async with async_session() as session:
|
||||
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
|
||||
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
|
||||
pq = aliased(ProcessingQueue)
|
||||
summarize_q = (
|
||||
select(pq.id)
|
||||
.where(
|
||||
pq.document_id == Document.id,
|
||||
pq.stage == "summarize",
|
||||
)
|
||||
)
|
||||
result = await session.execute(
|
||||
select(Document)
|
||||
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
|
||||
.where(
|
||||
ProcessingQueue.stage == "fulltext",
|
||||
ProcessingQueue.status == "failed",
|
||||
Document.source_channel == "news",
|
||||
~exists(summarize_q),
|
||||
)
|
||||
.limit(200)
|
||||
)
|
||||
docs = result.scalars().unique().all()
|
||||
for doc in docs:
|
||||
_set_fulltext_meta(doc, status="failed_reconciled")
|
||||
await _enqueue_downstream(session, doc)
|
||||
if docs:
|
||||
await session.commit()
|
||||
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
|
||||
@@ -0,0 +1,351 @@
|
||||
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
|
||||
|
||||
3 API (2026-06-10 실키 live 검증 + fixture 박제 — tests/fixtures/kosha_*_response.json):
|
||||
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
|
||||
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
|
||||
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
|
||||
|
||||
daily 스케줄 1회 (main.py):
|
||||
재해사례 = 최근 페이지만 diff (boardno dedup) — 사례 본문 Document(텍스트 네이티브)
|
||||
+ 첨부 PDF/HWP 다운로드 → /documents/crawl_raw/kosha/{boardno}/ 저장
|
||||
→ 파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
|
||||
GUIDE = 전체 레지스트리 메타 diff (1039건, 100/page = 11 call) → 신규/개정만,
|
||||
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6주) + 부하 평탄화.
|
||||
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
|
||||
|
||||
키: KOSHA_API_KEY (credentials.env) — 공공데이터포털 '인코딩' 키를 그대로 저장.
|
||||
httpx params= 로 넘기면 % 가 재인코딩되므로 반드시 URL 문자열에 직접 결합.
|
||||
개정 감지: GUIDE dedup 키 = 규정번호+공표일자 — 같은 번호의 새 공표일자 = 신규 문서로 적재.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.crawl_politeness import CRAWL_UA
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import enqueue_stage
|
||||
from workers.news_collector import (
|
||||
FeedError,
|
||||
_get_or_create_health,
|
||||
_record_failure,
|
||||
_record_success,
|
||||
)
|
||||
|
||||
logger = setup_logger("kosha_collector")
|
||||
|
||||
_BASE = "https://apis.data.go.kr/B552468"
|
||||
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
|
||||
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
|
||||
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
|
||||
|
||||
_CASE_SOURCE = "KOSHA 재해사례"
|
||||
_GUIDE_SOURCE = "KOSHA GUIDE"
|
||||
|
||||
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
|
||||
_CASE_ROWS = 30
|
||||
_GUIDE_ROWS = 100
|
||||
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
|
||||
_MAX_FILE_BYTES = 50 * 1024 * 1024
|
||||
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
|
||||
|
||||
|
||||
def _api_key() -> str:
|
||||
key = os.getenv("KOSHA_API_KEY", "")
|
||||
if not key:
|
||||
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
|
||||
return key
|
||||
|
||||
|
||||
async def _api_get(url: str) -> dict:
|
||||
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
|
||||
async with httpx.AsyncClient(timeout=25) as client:
|
||||
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
|
||||
if resp.status_code != 200:
|
||||
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
|
||||
try:
|
||||
payload = resp.json()
|
||||
except ValueError as e:
|
||||
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
|
||||
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
|
||||
code = (payload.get("header") or {}).get("resultCode")
|
||||
if code != "00":
|
||||
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
|
||||
return payload
|
||||
|
||||
|
||||
def _items(payload: dict) -> list[dict]:
|
||||
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
|
||||
item = ((payload.get("body") or {}).get("items") or {}).get("item")
|
||||
if item is None:
|
||||
return []
|
||||
return [item] if isinstance(item, dict) else list(item)
|
||||
|
||||
|
||||
def _safe_filename(name: str) -> str:
|
||||
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
|
||||
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
|
||||
name = re.sub(r"\s+", " ", name)
|
||||
return name[:140] or "unnamed"
|
||||
|
||||
|
||||
async def _download(url: str, dest: Path) -> int:
|
||||
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
|
||||
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
|
||||
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
|
||||
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
|
||||
if resp.status_code != 200:
|
||||
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
|
||||
if len(resp.content) > _MAX_FILE_BYTES:
|
||||
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
dest.write_bytes(resp.content)
|
||||
return len(resp.content)
|
||||
|
||||
|
||||
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
|
||||
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
|
||||
source = result.scalars().first()
|
||||
if source is None:
|
||||
source = NewsSource(
|
||||
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
|
||||
fulltext_policy="none", source_channel="crawl", category="Safety",
|
||||
language="ko", country="KR",
|
||||
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
|
||||
)
|
||||
session.add(source)
|
||||
await session.flush()
|
||||
return source
|
||||
|
||||
|
||||
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
|
||||
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
|
||||
safe = _safe_filename(filenm)
|
||||
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
|
||||
existing = await session.execute(
|
||||
select(Document).where(Document.file_path == rel_path).limit(1)
|
||||
)
|
||||
if existing.scalars().first():
|
||||
return False
|
||||
|
||||
dest = Path(settings.nas_mount_path) / rel_path
|
||||
size = await _download(filepath, dest)
|
||||
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
|
||||
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
|
||||
file_format=ext,
|
||||
file_size=size,
|
||||
file_type="immutable",
|
||||
title=safe.rsplit(".", 1)[0],
|
||||
source_channel="crawl",
|
||||
data_origin="external",
|
||||
import_source="kosha_api",
|
||||
edit_url=filepath,
|
||||
ai_tags=["Safety/KOSHA재해사례/첨부"],
|
||||
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"}},
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
|
||||
return True
|
||||
|
||||
|
||||
async def collect_disaster_cases(session) -> int:
|
||||
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
|
||||
key = _api_key()
|
||||
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
|
||||
new_count = 0
|
||||
|
||||
for page in range(1, _CASE_PAGES + 1):
|
||||
payload = await _api_get(
|
||||
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
|
||||
)
|
||||
items = _items(payload)
|
||||
if not items:
|
||||
break
|
||||
page_all_dup = True
|
||||
for item in items:
|
||||
boardno = str(item.get("boardno") or "").strip()
|
||||
title = (item.get("keyword") or "").strip()
|
||||
if not boardno or not title:
|
||||
continue
|
||||
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
|
||||
existing = await session.execute(
|
||||
select(Document).where(Document.file_hash == fhash).limit(1)
|
||||
)
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
page_all_dup = False
|
||||
|
||||
contents = (item.get("contents") or "").strip()
|
||||
business = (item.get("business") or "").strip()
|
||||
now = datetime.now(timezone.utc)
|
||||
doc = Document(
|
||||
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
|
||||
file_hash=fhash,
|
||||
file_format="article",
|
||||
file_size=len(contents.encode()),
|
||||
file_type="note",
|
||||
title=title,
|
||||
extracted_text=f"{title}\n\n[{business}]\n{contents}",
|
||||
extracted_at=now,
|
||||
extractor_version="kosha_api",
|
||||
md_status="skipped",
|
||||
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel="crawl",
|
||||
data_origin="external",
|
||||
review_status="approved",
|
||||
ai_domain="Safety",
|
||||
ai_sub_group=_CASE_SOURCE,
|
||||
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
|
||||
extract_meta={
|
||||
"source_id": source.id,
|
||||
"source_name": _CASE_SOURCE,
|
||||
"published_at": None,
|
||||
"kosha": {"boardno": boardno, "business": business,
|
||||
"atcflcnt": item.get("atcflcnt")},
|
||||
},
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
new_count += 1
|
||||
|
||||
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
|
||||
if int(item.get("atcflcnt") or 0) > 0:
|
||||
attach = await _api_get(
|
||||
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
|
||||
f"&pageNo=1&numOfRows=10&boardno={boardno}"
|
||||
)
|
||||
for att in _items(attach):
|
||||
filenm = (att.get("filenm") or "").strip()
|
||||
filepath = (att.get("filepath") or "").strip()
|
||||
if not filenm or not filepath.startswith("https://"):
|
||||
continue
|
||||
try:
|
||||
await _ingest_attachment(session, boardno, filenm, filepath)
|
||||
except FeedError as e:
|
||||
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
|
||||
if page_all_dup:
|
||||
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
|
||||
|
||||
logger.info(f"[kosha] 재해사례 신규 {new_count}건")
|
||||
return new_count
|
||||
|
||||
|
||||
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
|
||||
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
|
||||
key = _api_key()
|
||||
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
|
||||
new_specs: list[dict] = []
|
||||
page, total = 1, None
|
||||
|
||||
while True:
|
||||
payload = await _api_get(
|
||||
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
|
||||
)
|
||||
if total is None:
|
||||
total = int((payload.get("body") or {}).get("totalCount") or 0)
|
||||
items = _items(payload)
|
||||
if not items:
|
||||
break
|
||||
for item in items:
|
||||
no = (item.get("techGdlnNo") or "").strip()
|
||||
ymd = (item.get("techGdlnOfancYmd") or "").strip()
|
||||
url = (item.get("fileDownloadUrl") or "").strip()
|
||||
if not no or not url.startswith("https://"):
|
||||
continue
|
||||
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
|
||||
existing = await session.execute(
|
||||
select(Document).where(Document.file_hash == fhash).limit(1)
|
||||
)
|
||||
if not existing.scalars().first():
|
||||
new_specs.append({"no": no, "ymd": ymd, "url": url,
|
||||
"name": (item.get("techGdlnNm") or no).strip(),
|
||||
"fhash": fhash})
|
||||
if page * _GUIDE_ROWS >= total:
|
||||
break
|
||||
page += 1
|
||||
|
||||
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
|
||||
ingested = 0
|
||||
for spec in todo:
|
||||
safe_no = _safe_filename(spec["no"])
|
||||
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
|
||||
dest = Path(settings.nas_mount_path) / rel_path
|
||||
try:
|
||||
size = await _download(spec["url"], dest)
|
||||
except FeedError as e:
|
||||
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
|
||||
continue
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=spec["fhash"],
|
||||
file_format="pdf",
|
||||
file_size=size,
|
||||
file_type="immutable",
|
||||
title=f"{spec['name']} ({spec['no']})",
|
||||
source_channel="crawl",
|
||||
data_origin="external",
|
||||
import_source="kosha_api",
|
||||
edit_url=spec["url"],
|
||||
ai_tags=["Safety/KOSHA GUIDE"],
|
||||
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
|
||||
"ofancYmd": spec["ymd"]}},
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
ingested += 1
|
||||
|
||||
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
|
||||
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
|
||||
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
|
||||
return ingested
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
|
||||
now = datetime.now(timezone.utc)
|
||||
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
|
||||
(_GUIDE_SOURCE, collect_kosha_guide)):
|
||||
async with async_session() as session:
|
||||
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
|
||||
source = result.scalars().first()
|
||||
try:
|
||||
count = await collector(session)
|
||||
if source is None: # 첫 실행에서 collector 가 생성
|
||||
result = await session.execute(
|
||||
select(NewsSource).where(NewsSource.name == name))
|
||||
source = result.scalars().first()
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
_record_success(health, count, False, now)
|
||||
await session.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"[kosha] {name} 수집 실패: {e}")
|
||||
await session.rollback() # 부분 적재 폐기 후 health 만 기록
|
||||
if source is not None:
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
_record_failure(health, str(e) or repr(e), now)
|
||||
await session.commit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
+436
-60
@@ -1,8 +1,16 @@
|
||||
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
|
||||
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
|
||||
|
||||
plan crawl-24x7-1 A그룹 (2026-06-10):
|
||||
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
|
||||
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
|
||||
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
|
||||
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from html import unescape
|
||||
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
||||
|
||||
@@ -10,11 +18,13 @@ import feedparser
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.crawl_politeness import CRAWL_UA
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import enqueue_stage
|
||||
from models.source_health import SourceHealth
|
||||
|
||||
logger = setup_logger("news_collector")
|
||||
|
||||
@@ -26,6 +36,7 @@ CATEGORY_MAP = {
|
||||
"환경": "Environment", "기술": "Technology",
|
||||
# 영어
|
||||
"World": "International", "International": "International",
|
||||
"World news": "International", # Guardian sectionName (B-2)
|
||||
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
|
||||
"Arts": "Culture", "Culture": "Culture",
|
||||
"Climate": "Environment", "Environment": "Environment",
|
||||
@@ -35,21 +46,30 @@ CATEGORY_MAP = {
|
||||
"Kultur": "Culture", "Wissenschaft": "Technology",
|
||||
# 프랑스어
|
||||
"Environnement": "Environment",
|
||||
# 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵
|
||||
"안전": "Safety", "Safety": "Safety",
|
||||
"공학": "Engineering", "Engineering": "Engineering",
|
||||
"철학": "Philosophy", "Philosophy": "Philosophy",
|
||||
}
|
||||
|
||||
|
||||
class FeedError(Exception):
|
||||
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
|
||||
|
||||
|
||||
def _normalize_category(raw: str) -> str:
|
||||
"""카테고리 표준화"""
|
||||
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
|
||||
|
||||
|
||||
def _clean_html(text: str) -> str:
|
||||
"""HTML 태그 제거 + 정제"""
|
||||
def _clean_html(text: str, max_len: int | None = 1000) -> str:
|
||||
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
|
||||
if not text:
|
||||
return ""
|
||||
text = re.sub(r"<[^>]+>", "", text)
|
||||
text = unescape(text)
|
||||
return text.strip()[:1000]
|
||||
text = text.strip()
|
||||
return text if max_len is None else text[:max_len]
|
||||
|
||||
|
||||
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
|
||||
@@ -87,8 +107,104 @@ def _normalize_to_utc(dt) -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
# ── A-5: circuit breaker 정책 ──
|
||||
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
|
||||
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
|
||||
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
|
||||
_CIRCUIT_OPEN_AFTER = 3
|
||||
_CIRCUIT_DISABLE_AFTER = 10
|
||||
_BACKOFF_BASE_HOURS = 6
|
||||
_BACKOFF_CAP_HOURS = 48
|
||||
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
|
||||
|
||||
|
||||
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
|
||||
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
|
||||
|
||||
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
|
||||
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
|
||||
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
|
||||
"""
|
||||
if health.circuit_state == "disabled":
|
||||
return False
|
||||
if health.circuit_state == "open" and health.last_error_at is not None:
|
||||
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
|
||||
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
|
||||
if now - health.last_error_at < timedelta(hours=backoff_h):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
|
||||
health.consecutive_failures = 0
|
||||
health.total_fetches += 1
|
||||
health.last_success_at = now
|
||||
health.last_fetch_items = items
|
||||
if health.circuit_state != "closed":
|
||||
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
|
||||
health.circuit_state = "closed"
|
||||
health.circuit_opened_at = None
|
||||
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
|
||||
if not_modified:
|
||||
pass
|
||||
elif items == 0:
|
||||
health.empty_streak += 1
|
||||
if health.empty_streak >= _EMPTY_STREAK_ALERT:
|
||||
logger.error(
|
||||
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
|
||||
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
|
||||
)
|
||||
else:
|
||||
health.empty_streak = 0
|
||||
health.updated_at = now
|
||||
|
||||
|
||||
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
|
||||
health.consecutive_failures += 1
|
||||
health.total_fetches += 1
|
||||
health.total_failures += 1
|
||||
health.last_error = error[:500]
|
||||
health.last_error_at = now
|
||||
health.updated_at = now
|
||||
cf = health.consecutive_failures
|
||||
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
|
||||
health.circuit_state = "disabled"
|
||||
logger.error(
|
||||
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
|
||||
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
|
||||
)
|
||||
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
|
||||
health.circuit_state = "open"
|
||||
health.circuit_opened_at = now
|
||||
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
|
||||
|
||||
|
||||
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
|
||||
result = await session.execute(
|
||||
select(SourceHealth).where(SourceHealth.source_id == source_id)
|
||||
)
|
||||
health = result.scalars().first()
|
||||
if health is None:
|
||||
health = SourceHealth(source_id=source_id)
|
||||
session.add(health)
|
||||
await session.flush()
|
||||
return health
|
||||
|
||||
|
||||
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
|
||||
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
|
||||
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
|
||||
_run_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def run():
|
||||
"""뉴스 수집 실행"""
|
||||
async with _run_lock:
|
||||
await _run_locked()
|
||||
|
||||
|
||||
async def _run_locked():
|
||||
now = datetime.now(timezone.utc)
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(NewsSource).where(NewsSource.enabled == True)
|
||||
@@ -101,17 +217,23 @@ async def run():
|
||||
|
||||
total = 0
|
||||
for source in sources:
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
if not _should_attempt(health, now):
|
||||
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
|
||||
continue
|
||||
try:
|
||||
if source.feed_type == "api":
|
||||
count = await _fetch_api(session, source)
|
||||
count, status = await _fetch_api(session, source)
|
||||
else:
|
||||
count = await _fetch_rss(session, source)
|
||||
count, status = await _fetch_rss(session, source)
|
||||
|
||||
source.last_fetched_at = datetime.now(timezone.utc)
|
||||
_record_success(health, count, status == "not_modified", now)
|
||||
total += count
|
||||
except Exception as e:
|
||||
logger.error(f"[{source.name}] 수집 실패: {e}")
|
||||
source.last_fetched_at = datetime.now(timezone.utc)
|
||||
_record_failure(health, str(e) or repr(e), now)
|
||||
|
||||
await session.commit()
|
||||
logger.info(f"뉴스 수집 완료: {total}건 신규")
|
||||
@@ -122,8 +244,83 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
|
||||
"application/xml", "text/xml")
|
||||
|
||||
|
||||
async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
|
||||
async def _is_portal_duplicate(session, title: str) -> bool:
|
||||
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
|
||||
|
||||
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
|
||||
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
|
||||
"""
|
||||
if len(title) < 12:
|
||||
return False
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
|
||||
dup = await session.execute(
|
||||
select(Document.id).where(
|
||||
Document.title == title,
|
||||
Document.source_channel == "news",
|
||||
Document.file_format == "article",
|
||||
Document.extracted_at >= cutoff,
|
||||
).limit(1)
|
||||
)
|
||||
return dup.scalars().first() is not None
|
||||
|
||||
|
||||
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
|
||||
"""후속 단계 enqueue.
|
||||
|
||||
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
|
||||
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
|
||||
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
|
||||
"""
|
||||
if source.fulltext_policy == "page" and doc.edit_url:
|
||||
await enqueue_stage(session, doc.id, "fulltext")
|
||||
return
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
if source.source_channel == "crawl":
|
||||
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
return
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
|
||||
|
||||
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
|
||||
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
|
||||
return {
|
||||
"source_id": source.id,
|
||||
"source_name": source.name,
|
||||
"published_at": pub_dt.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
|
||||
"""채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성.
|
||||
|
||||
file_path 접두사가 곧 채널 디렉토리. ai_domain 은 다이제스트/검색 필터의 분기 축이라
|
||||
crawl 채널이 'News' 를 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상).
|
||||
"""
|
||||
if source.source_channel == "crawl":
|
||||
domain = category if category and category != "Other" else "Domain"
|
||||
return {
|
||||
"path_prefix": "crawl",
|
||||
"ai_domain": domain,
|
||||
"ai_tags": [f"{domain}/{source_short}"],
|
||||
}
|
||||
return {
|
||||
"path_prefix": "news",
|
||||
"ai_domain": "News",
|
||||
"ai_tags": [f"News/{source_short}/{category}"],
|
||||
}
|
||||
|
||||
|
||||
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
|
||||
|
||||
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
|
||||
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
|
||||
"""
|
||||
from urllib.parse import urljoin
|
||||
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
|
||||
|
||||
@@ -134,51 +331,79 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
|
||||
# 순수 HTTP 소스인데 allowlist에 없으면 차단
|
||||
if source.feed_url.startswith("http://") and not http_allowed:
|
||||
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
|
||||
return 0
|
||||
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
|
||||
|
||||
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
|
||||
try:
|
||||
validate_feed_url(source.feed_url, allow_http=http_allowed)
|
||||
except ValueError as e:
|
||||
logger.error(f"[{source.name}] URL 검증 실패: {e}")
|
||||
return 0
|
||||
raise FeedError(f"URL 검증 실패: {e}") from e
|
||||
|
||||
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
|
||||
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
|
||||
headers = {"User-Agent": CRAWL_UA}
|
||||
if source.etag:
|
||||
headers["If-None-Match"] = source.etag
|
||||
if source.last_modified:
|
||||
headers["If-Modified-Since"] = source.last_modified
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
timeout=10, follow_redirects=False, headers=headers
|
||||
) as client:
|
||||
resp = await client.get(source.feed_url)
|
||||
|
||||
# redirect 수동 처리 (최대 3회, 각 target 재검증)
|
||||
# 304 는 redirect 처리보다 먼저 — httpx 의 is_redirect 는 3xx 전체(304 포함)에
|
||||
# True 라, 304 를 redirect 로 오인하면 location 없는 같은 URL 을 재요청해
|
||||
# "redirect 3회 초과" 로 오류 처리됨(조건부 GET 안정 피드 전멸 버그).
|
||||
if resp.status_code == 304:
|
||||
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
|
||||
return 0, "not_modified"
|
||||
|
||||
# redirect 수동 처리 (최대 3회, 각 target 재검증) — location 있는 진짜 redirect 만.
|
||||
# allowlist 도메인이면 redirect target의 HTTP도 허용
|
||||
redirects = 0
|
||||
while resp.is_redirect and redirects < 3:
|
||||
location = resp.headers.get("location", "")
|
||||
location = urljoin(str(resp.request.url), location)
|
||||
while resp.has_redirect_location and redirects < 3:
|
||||
location = urljoin(str(resp.request.url), resp.headers["location"])
|
||||
try:
|
||||
validate_feed_url(location, allow_http=http_allowed)
|
||||
except ValueError as e:
|
||||
logger.error(f"[{source.name}] redirect target 차단: {e}")
|
||||
return 0
|
||||
raise FeedError(f"redirect target 차단: {e}") from e
|
||||
resp = await client.get(location)
|
||||
if resp.status_code == 304:
|
||||
logger.info(f"[{source.name}] 304 Not Modified (redirect 후) — 본문 미전송")
|
||||
return 0, "not_modified"
|
||||
redirects += 1
|
||||
if resp.is_redirect:
|
||||
logger.error(f"[{source.name}] redirect 3회 초과")
|
||||
return 0
|
||||
if resp.has_redirect_location:
|
||||
raise FeedError("redirect 3회 초과")
|
||||
|
||||
resp.raise_for_status()
|
||||
|
||||
if len(resp.content) > MAX_RESPONSE_SIZE:
|
||||
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
|
||||
return 0
|
||||
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
|
||||
|
||||
ct = resp.headers.get("content-type", "").lower()
|
||||
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
|
||||
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
|
||||
return 0
|
||||
raise FeedError(f"비정상 content-type: {ct}")
|
||||
|
||||
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
|
||||
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
|
||||
new_etag = resp.headers.get("etag")
|
||||
new_last_modified = resp.headers.get("last-modified")
|
||||
content_hash = hashlib.sha256(resp.content).hexdigest()
|
||||
if source.feed_content_hash == content_hash:
|
||||
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
|
||||
return 0, "not_modified"
|
||||
|
||||
feed = feedparser.parse(resp.text)
|
||||
if feed.bozo and not feed.entries:
|
||||
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
|
||||
return 0
|
||||
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
|
||||
|
||||
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
|
||||
# 이후 304 로 영구 skip 되는 silent corruption 차단
|
||||
if new_etag:
|
||||
source.etag = new_etag
|
||||
if new_last_modified:
|
||||
source.last_modified = new_last_modified
|
||||
source.feed_content_hash = content_hash
|
||||
count = 0
|
||||
|
||||
for entry in feed.entries:
|
||||
@@ -190,7 +415,24 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
if not summary:
|
||||
summary = title
|
||||
|
||||
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
|
||||
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
|
||||
body = summary
|
||||
is_feed_full = False
|
||||
if source.fulltext_policy == "feed-full":
|
||||
content_list = entry.get("content") or []
|
||||
raw_body = content_list[0].get("value", "") if content_list else ""
|
||||
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
|
||||
if len(full_body) > len(summary):
|
||||
body = full_body
|
||||
is_feed_full = True
|
||||
|
||||
link = entry.get("link", "")
|
||||
|
||||
# B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치)
|
||||
if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link):
|
||||
continue
|
||||
|
||||
published = entry.get("published_parsed") or entry.get("updated_parsed")
|
||||
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
|
||||
|
||||
@@ -209,56 +451,190 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
|
||||
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
|
||||
if await _is_portal_duplicate(session, title):
|
||||
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
|
||||
continue
|
||||
|
||||
category = _normalize_category(source.category or "")
|
||||
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
|
||||
ident = _doc_identity(source, source_short, category)
|
||||
|
||||
doc = Document(
|
||||
file_path=f"news/{source.name}/{article_id}",
|
||||
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
|
||||
file_hash=article_id,
|
||||
file_format="article",
|
||||
file_size=len(summary.encode()),
|
||||
file_size=len(body.encode()),
|
||||
file_type="note",
|
||||
title=title,
|
||||
extracted_text=f"{title}\n\n{summary}",
|
||||
extracted_text=f"{title}\n\n{body}",
|
||||
extracted_at=datetime.now(timezone.utc),
|
||||
extractor_version="rss",
|
||||
extractor_version="rss-feed-full" if is_feed_full else "rss",
|
||||
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
|
||||
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
|
||||
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
|
||||
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
|
||||
md_status="skipped",
|
||||
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel="news",
|
||||
source_channel=source.source_channel,
|
||||
data_origin="external",
|
||||
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain="News",
|
||||
ai_domain=ident["ai_domain"],
|
||||
ai_sub_group=source_short,
|
||||
ai_tags=[f"News/{source_short}/{category}"],
|
||||
ai_tags=ident["ai_tags"],
|
||||
extract_meta=_build_extract_meta(source, pub_dt),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
# summarize + embed + chunk 등록 (classify 불필요)
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
# summarize + embed + chunk 등록 (classify 불필요).
|
||||
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
|
||||
await _enqueue_processing(session, doc, source, pub_dt)
|
||||
|
||||
count += 1
|
||||
|
||||
logger.info(f"[{source.name}] RSS → {count}건 수집")
|
||||
return count
|
||||
return count, "ok"
|
||||
|
||||
|
||||
async def _fetch_api(session, source: NewsSource) -> int:
|
||||
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2).
|
||||
|
||||
레거시 NYT 행(feed_url=api.nytimes.com)은 무변경 경로. 신규 제공자는 호스트 분기 추가.
|
||||
미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지).
|
||||
"""
|
||||
host = (urlparse(source.feed_url).hostname or "").lower()
|
||||
if host.endswith("guardianapis.com"):
|
||||
return await _fetch_api_guardian(session, source)
|
||||
if host.endswith("nytimes.com"):
|
||||
return await _fetch_api_nyt(session, source)
|
||||
raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요")
|
||||
|
||||
|
||||
def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]:
|
||||
"""Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상
|
||||
(tests/fixtures/guardian_open_platform_search_response.json 박제 시 호출과 동일해야 함)."""
|
||||
parsed = urlparse(feed_url)
|
||||
params = {
|
||||
**dict(parse_qsl(parsed.query)),
|
||||
"show-fields": "bodyText,trailText",
|
||||
"page-size": "20",
|
||||
"order-by": "newest",
|
||||
"api-key": api_key,
|
||||
}
|
||||
return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params
|
||||
|
||||
|
||||
async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON.
|
||||
|
||||
feed_url 에 section 쿼리를 박아 등록 (예: https://content.guardianapis.com/search?section=world).
|
||||
전문이 API 로 오므로 fulltext stage 불요. 키 미설정 = FeedError (health 실패 기록,
|
||||
silent fallback 없음 — [[feedback_no_silent_fallback_explicit_opt_in]]).
|
||||
"""
|
||||
import os
|
||||
api_key = os.getenv("GUARDIAN_API_KEY", "")
|
||||
if not api_key:
|
||||
raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가")
|
||||
|
||||
endpoint, params = _guardian_request(source.feed_url, api_key)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.get(endpoint, params=params)
|
||||
resp.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
# 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율)
|
||||
safe_url = str(e.request.url).split("?")[0]
|
||||
raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e
|
||||
except httpx.RequestError as e:
|
||||
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
|
||||
raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e
|
||||
|
||||
payload = resp.json().get("response", {})
|
||||
if payload.get("status") != "ok":
|
||||
raise FeedError(f"Guardian API status={payload.get('status')}")
|
||||
|
||||
count = 0
|
||||
for item in payload.get("results", []):
|
||||
title = (item.get("webTitle") or "").strip()
|
||||
if not title:
|
||||
continue
|
||||
|
||||
fields = item.get("fields") or {}
|
||||
body_text = (fields.get("bodyText") or "").strip()
|
||||
trail = _clean_html(fields.get("trailText") or "")
|
||||
# bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하.
|
||||
is_full = len(body_text) >= 200
|
||||
body = body_text if is_full else (trail or title)
|
||||
|
||||
link = item.get("webUrl", "")
|
||||
pub_str = item.get("webPublicationDate", "")
|
||||
try:
|
||||
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
|
||||
except (ValueError, AttributeError):
|
||||
pub_dt = datetime.now(timezone.utc)
|
||||
|
||||
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
||||
normalized_url = _normalize_url(link)
|
||||
|
||||
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
|
||||
existing = await session.execute(
|
||||
select(Document).where(
|
||||
(Document.file_hash == article_id) |
|
||||
(Document.edit_url.in_([normalized_url, link]))
|
||||
).limit(1)
|
||||
)
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
|
||||
if await _is_portal_duplicate(session, title):
|
||||
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
|
||||
continue
|
||||
|
||||
category = _normalize_category(item.get("sectionName", source.category or ""))
|
||||
source_short = source.name.split(" ")[0]
|
||||
ident = _doc_identity(source, source_short, category)
|
||||
|
||||
doc = Document(
|
||||
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
|
||||
file_hash=article_id,
|
||||
file_format="article",
|
||||
file_size=len(body.encode()),
|
||||
file_type="note",
|
||||
title=title,
|
||||
extracted_text=f"{title}\n\n{body}",
|
||||
extracted_at=datetime.now(timezone.utc),
|
||||
extractor_version="guardian_api_full" if is_full else "guardian_api",
|
||||
md_status="skipped",
|
||||
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel=source.source_channel,
|
||||
data_origin="external",
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain=ident["ai_domain"],
|
||||
ai_sub_group=source_short,
|
||||
ai_tags=ident["ai_tags"],
|
||||
extract_meta=_build_extract_meta(source, pub_dt),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
await _enqueue_processing(session, doc, source, pub_dt)
|
||||
count += 1
|
||||
|
||||
logger.info(f"[{source.name}] API → {count}건 수집")
|
||||
return count, "ok"
|
||||
|
||||
|
||||
async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""NYT API 수집 — 키 마스킹 + health degradation"""
|
||||
import os
|
||||
nyt_key = os.getenv("NYT_API_KEY", "")
|
||||
if not nyt_key:
|
||||
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
||||
return 0
|
||||
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
@@ -270,12 +646,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
except httpx.HTTPStatusError as e:
|
||||
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
|
||||
safe_url = str(e.request.url).split("?")[0]
|
||||
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
|
||||
return 0
|
||||
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
|
||||
except httpx.RequestError as e:
|
||||
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
|
||||
logger.error(f"NYT API 연결 실패: {safe_url}")
|
||||
return 0
|
||||
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
|
||||
|
||||
data = resp.json()
|
||||
count = 0
|
||||
@@ -309,11 +683,16 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
|
||||
if await _is_portal_duplicate(session, title):
|
||||
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
|
||||
continue
|
||||
|
||||
category = _normalize_category(article.get("section", source.category or ""))
|
||||
source_short = source.name.split(" ")[0]
|
||||
|
||||
ident = _doc_identity(source, source_short, category)
|
||||
doc = Document(
|
||||
file_path=f"news/{source.name}/{article_id}",
|
||||
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
|
||||
file_hash=article_id,
|
||||
file_format="article",
|
||||
file_size=len(summary.encode()),
|
||||
@@ -327,24 +706,21 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
|
||||
md_status="skipped",
|
||||
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel="news",
|
||||
source_channel=source.source_channel,
|
||||
data_origin="external",
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain="News",
|
||||
ai_domain=ident["ai_domain"],
|
||||
ai_sub_group=source_short,
|
||||
ai_tags=[f"News/{source_short}/{category}"],
|
||||
ai_tags=ident["ai_tags"],
|
||||
extract_meta=_build_extract_meta(source, pub_dt),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
await _enqueue_processing(session, doc, source, pub_dt)
|
||||
|
||||
count += 1
|
||||
|
||||
logger.info(f"[{source.name}] API → {count}건 수집")
|
||||
return count
|
||||
return count, "ok"
|
||||
|
||||
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
|
||||
# stage별 배치 크기
|
||||
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
||||
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
||||
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
|
||||
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
|
||||
"fulltext": 3}
|
||||
STALE_THRESHOLD_MINUTES = 10
|
||||
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
|
||||
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
|
||||
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
|
||||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||||
MAIN_QUEUE_STAGES = [
|
||||
"extract", "classify", "summarize", "embed", "chunk",
|
||||
"preview", "stt", "thumbnail", "deep_summary",
|
||||
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
|
||||
]
|
||||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||||
|
||||
@@ -137,6 +140,9 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
|
||||
# source_channel-aware override (extract stage 만). source_channel 누락 시 _default.
|
||||
extract_override_by_channel = {
|
||||
"devonagent": ["embed", "chunk"],
|
||||
# crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 —
|
||||
# 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지.
|
||||
"crawl": ["classify"],
|
||||
}
|
||||
|
||||
next_stages = {
|
||||
@@ -179,6 +185,7 @@ def _load_workers():
|
||||
from workers.summarize_worker import process as summarize_process
|
||||
from workers.thumbnail_worker import process as thumbnail_process
|
||||
from workers.marker_worker import process as marker_process
|
||||
from workers.fulltext_worker import process as fulltext_process
|
||||
|
||||
return {
|
||||
"extract": extract_process,
|
||||
@@ -195,6 +202,9 @@ def _load_workers():
|
||||
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
|
||||
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
|
||||
"markdown": marker_process,
|
||||
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
|
||||
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
|
||||
"fulltext": fulltext_process,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,265 @@
|
||||
"""C-3 공학 정적 코퍼스 1회 일괄 ingest (plan crawl-24x7-1).
|
||||
|
||||
National Board 기술 아티클(~86, ASP.NET 구식 — 기사 앵커가 싱글쿼트 href) +
|
||||
TWI Job Knowledge(~153, sitemap 기반). 지속 크롤링이 아니라 아카이브 일괄 +
|
||||
저빈도 증분 유형 — 스케줄러 미등록, 수동 CLI:
|
||||
|
||||
docker exec hyungi_document_server-fastapi-1 \
|
||||
python -m workers.static_corpus_ingest --corpus all --limit 3 # 검증용
|
||||
docker exec -d hyungi_document_server-fastapi-1 \
|
||||
python -m workers.static_corpus_ingest --corpus all # 전체 (~45분)
|
||||
|
||||
※ -d 백그라운드 실행 시 중단은 host pkill 이 아니라 컨테이너 내부 PID kill
|
||||
([[feedback_docker_exec_orphan_kill]]).
|
||||
|
||||
멱등: edit_url(정규화)+file_hash dedup — 재실행 = 신규분만 (그대로 monthly 증분 절차).
|
||||
politeness: fetch_page 재사용 (per-domain 1 + 5~15s jitter + robots).
|
||||
원본 보존·승격 필드: fulltext_worker 와 동일 규약 (재추출 가능 상태 유지).
|
||||
실패는 degrade 없이 skip + 말미 목록 출력 (정적 코퍼스 — RSS 요약 같은 격하 대상 부재).
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from html import unescape
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import enqueue_stage
|
||||
from workers.fulltext_worker import (
|
||||
_WEB_MIN_BODY_LEN,
|
||||
_extract_body,
|
||||
_raw_html_path,
|
||||
_save_raw_html,
|
||||
_strip_article_footer,
|
||||
)
|
||||
from workers.news_collector import _article_hash, _normalize_url
|
||||
|
||||
logger = setup_logger("static_corpus")
|
||||
|
||||
_NB_LISTING = "https://www.nationalboard.org/Index.aspx?pageID=164"
|
||||
_TWI_SITEMAP = "https://www.twi-global.com/sitemap.xml"
|
||||
|
||||
|
||||
async def _discover_national_board() -> list[str]:
|
||||
"""목록 페이지의 기사 앵커 — 싱글쿼트 href 가 기본형이라 양쪽 인용부호 매칭."""
|
||||
html_text, _ = await fetch_page(_NB_LISTING)
|
||||
ids = sorted(
|
||||
{int(i) for i in re.findall(
|
||||
r"href=['\"]/?Index\.aspx\?pageID=164&(?:amp;)?ID=(\d+)['\"]", html_text)}
|
||||
)
|
||||
return [f"https://www.nationalboard.org/Index.aspx?pageID=164&ID={i}" for i in ids]
|
||||
|
||||
|
||||
async def _discover_twi() -> list[str]:
|
||||
"""sitemap 에서 job-knowledge 시리즈만 (faqs/published-papers 는 향후 증분 후보)."""
|
||||
xml_text, _ = await fetch_page(
|
||||
_TWI_SITEMAP,
|
||||
content_types=("text/xml", "application/xml", "text/html"),
|
||||
)
|
||||
urls = re.findall(
|
||||
r"<loc>(https://www\.twi-global\.com/technical-knowledge/job-knowledge/[^<]+)</loc>",
|
||||
xml_text,
|
||||
)
|
||||
return sorted({u for u in urls if not u.rstrip("/").endswith("job-knowledge")})
|
||||
|
||||
|
||||
CORPORA = {
|
||||
"national-board": {
|
||||
"source_name": "National Board 기술 아티클",
|
||||
"listing_url": _NB_LISTING,
|
||||
"discover": _discover_national_board,
|
||||
"fetch_method": "page",
|
||||
},
|
||||
"twi": {
|
||||
"source_name": "TWI Job Knowledge",
|
||||
"listing_url": _TWI_SITEMAP,
|
||||
"discover": _discover_twi,
|
||||
"fetch_method": "sitemap+page",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
async def _get_or_create_source(session, spec: dict) -> NewsSource:
|
||||
"""레지스트리 행 — 출처 추적 + crawl_raw src_{id} 경로 + A-8 패널 가시성.
|
||||
|
||||
enabled=False: 6h 뉴스 사이클 비대상 (피드가 없는 정적 코퍼스 — 증분은 본 CLI 재실행).
|
||||
"""
|
||||
result = await session.execute(
|
||||
select(NewsSource).where(NewsSource.name == spec["source_name"])
|
||||
)
|
||||
source = result.scalars().first()
|
||||
if source is None:
|
||||
source = NewsSource(
|
||||
name=spec["source_name"],
|
||||
feed_url=spec["listing_url"],
|
||||
feed_type="rss",
|
||||
fetch_method=spec["fetch_method"],
|
||||
fulltext_policy="none",
|
||||
source_channel="crawl",
|
||||
category="Engineering",
|
||||
language="en",
|
||||
country="US" if "national" in spec["source_name"].lower() else "GB",
|
||||
enabled=False,
|
||||
)
|
||||
session.add(source)
|
||||
await session.flush()
|
||||
return source
|
||||
|
||||
|
||||
def _page_title(html_text: str, fallback: str) -> str:
|
||||
m = re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html_text)
|
||||
if not m:
|
||||
m = re.search(r"<title[^>]*>([^<]+)</title>", html_text, re.I)
|
||||
title = unescape(m.group(1)).strip() if m else ""
|
||||
# 사이트 접미 잡음 제거 (TWI 는 ' - TWI', NB 는 'National Board ...' 꼬리표)
|
||||
title = re.sub(r"\s*[-|·]\s*(TWI|National Board[^-|]*)\s*$", "", title).strip()
|
||||
return title or fallback
|
||||
|
||||
|
||||
async def _ingest_one(session, source: NewsSource, url: str) -> str:
|
||||
"""기사 1건. 반환: 'ok' / 'dup' / 'skip'(추출부족·차단)."""
|
||||
normalized_url = _normalize_url(url)
|
||||
existing = await session.execute(
|
||||
select(Document).where(Document.edit_url.in_([normalized_url, url])).limit(1)
|
||||
)
|
||||
if existing.scalars().first():
|
||||
return "dup"
|
||||
|
||||
try:
|
||||
html_text, final_url = await fetch_page(url)
|
||||
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
|
||||
logger.warning(f"[{source.name}] fetch 실패 skip: {url} — {type(e).__name__}: {e}")
|
||||
return "skip"
|
||||
|
||||
body, engine, engine_ver = _extract_body(html_text)
|
||||
if not engine:
|
||||
logger.warning(f"[{source.name}] 추출 실패 skip (< {_WEB_MIN_BODY_LEN}자): {url}")
|
||||
return "skip"
|
||||
clean_body = _strip_article_footer(body.replace("\x00", ""))
|
||||
if len(clean_body) < _WEB_MIN_BODY_LEN:
|
||||
logger.warning(f"[{source.name}] 푸터 제거 후 본문 부족 skip: {url}")
|
||||
return "skip"
|
||||
|
||||
title = _page_title(html_text, fallback=url.rsplit("/", 1)[-1][:90])
|
||||
article_id = _article_hash(title, "static", source.name)
|
||||
dup2 = await session.execute(
|
||||
select(Document).where(Document.file_hash == article_id).limit(1)
|
||||
)
|
||||
if dup2.scalars().first():
|
||||
return "dup"
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
raw_path = _raw_html_path(source.id, article_id, now)
|
||||
raw_saved = True
|
||||
try:
|
||||
_save_raw_html(raw_path, html_text)
|
||||
except OSError as e:
|
||||
raw_saved = False
|
||||
logger.error(f"[{source.name}] 원본 보존 실패 (ingest 는 진행): {e}")
|
||||
|
||||
doc = Document(
|
||||
file_path=f"crawl/{source.name}/{article_id}",
|
||||
file_hash=article_id,
|
||||
file_format="article",
|
||||
file_size=0, # 아래 extracted_text 확정 후 재계산
|
||||
file_type="note",
|
||||
title=title,
|
||||
extracted_text=f"{title}\n\n{clean_body}",
|
||||
extracted_at=now,
|
||||
extractor_version=f"static+page@{engine}",
|
||||
md_content=clean_body,
|
||||
md_status="success",
|
||||
md_extraction_engine=engine,
|
||||
md_extraction_engine_version=engine_ver,
|
||||
md_format_version="1.0",
|
||||
md_generated_at=now,
|
||||
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
|
||||
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
|
||||
content_origin="extracted",
|
||||
source_channel="crawl",
|
||||
data_origin="external",
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain="Engineering",
|
||||
ai_sub_group=source.name,
|
||||
ai_tags=[f"Engineering/{source.name}"],
|
||||
extract_meta={
|
||||
"source_id": source.id,
|
||||
"source_name": source.name,
|
||||
"published_at": None, # 정적 코퍼스 — 페이지 발행일 비신뢰, 색인은 채널 게이트로 무조건
|
||||
"fulltext": {
|
||||
"status": "static_corpus",
|
||||
"engine": engine,
|
||||
"final_url": final_url,
|
||||
"raw_html_path": str(raw_path) if raw_saved else None,
|
||||
"body_chars": len(clean_body),
|
||||
"resolved_at": now.isoformat(),
|
||||
},
|
||||
},
|
||||
)
|
||||
doc.file_size = len(doc.extracted_text.encode())
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
# crawl 채널 = 발행일 무관 전량 색인 (summarize 는 맥미니 큐 — D-4 lag 관찰 대상)
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
logger.info(f"[{source.name}] ingest {len(clean_body)}자 ({engine}): {title[:60]}")
|
||||
return "ok"
|
||||
|
||||
|
||||
async def run(corpus: str = "all", limit: int = 0) -> None:
|
||||
targets = list(CORPORA) if corpus == "all" else [corpus]
|
||||
for key in targets:
|
||||
spec = CORPORA[key]
|
||||
async with async_session() as session:
|
||||
source = await _get_or_create_source(session, spec)
|
||||
await session.commit()
|
||||
source_id = source.id
|
||||
|
||||
try:
|
||||
urls = await spec["discover"]()
|
||||
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
|
||||
logger.error(f"[{spec['source_name']}] 목록 수집 실패 — corpus 건너뜀: {e}")
|
||||
continue
|
||||
if limit:
|
||||
urls = urls[:limit]
|
||||
logger.info(f"[{spec['source_name']}] 대상 {len(urls)}건 (limit={limit or '없음'})")
|
||||
|
||||
counts = {"ok": 0, "dup": 0, "skip": 0}
|
||||
failed: list[str] = []
|
||||
for i, url in enumerate(urls, 1):
|
||||
# 커밋 10건 단위 — 장시간 배치 중단 시 진행분 보존
|
||||
async with async_session() as session:
|
||||
src = await session.get(NewsSource, source_id)
|
||||
status = await _ingest_one(session, src, url)
|
||||
await session.commit()
|
||||
counts[status] += 1
|
||||
if status == "skip":
|
||||
failed.append(url)
|
||||
if i % 10 == 0:
|
||||
logger.info(f"[{spec['source_name']}] 진행 {i}/{len(urls)} {counts}")
|
||||
|
||||
logger.info(f"[{spec['source_name']}] 완료: {counts}")
|
||||
if failed:
|
||||
logger.warning(
|
||||
f"[{spec['source_name']}] skip {len(failed)}건 — 재시도는 CLI 재실행(멱등):\n "
|
||||
+ "\n ".join(failed)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="C-3 정적 코퍼스 일괄 ingest")
|
||||
parser.add_argument("--corpus", choices=[*CORPORA, "all"], default="all")
|
||||
parser.add_argument("--limit", type=int, default=0, help="corpus 당 상한 (0=전체)")
|
||||
args = parser.parse_args()
|
||||
asyncio.run(run(args.corpus, args.limit))
|
||||
+38
-3
@@ -64,6 +64,11 @@ services:
|
||||
environment:
|
||||
- HF_HOME=/models/huggingface
|
||||
- TORCH_HOME=/models/torch
|
||||
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
|
||||
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
|
||||
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
|
||||
- MARKER_PRELOAD=0
|
||||
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
|
||||
volumes:
|
||||
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
|
||||
- marker_models:/models
|
||||
@@ -97,6 +102,11 @@ services:
|
||||
- WHISPER_MODEL=${WHISPER_MODEL:-large-v3}
|
||||
- WHISPER_DEVICE=${WHISPER_DEVICE:-cuda}
|
||||
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE:-float16}
|
||||
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~4GB) 해제가 90% 봉투의 전제.
|
||||
# 콜드로드 수초~수십 초는 배치 작업이라 무방 (stt_worker read=1800s 가 흡수).
|
||||
# 롤백 = STT_PRELOAD=1 + STT_IDLE_UNLOAD_MINUTES=0.
|
||||
- STT_PRELOAD=0
|
||||
- STT_IDLE_UNLOAD_MINUTES=${STT_IDLE_UNLOAD_MINUTES:-30}
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
@@ -105,9 +115,9 @@ services:
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
# /ready: CUDA 디바이스 + 모델 적재 둘 다 확인. ready=true 만 healthy 처리.
|
||||
# /health 는 단순 liveness 라 모델 미적재 상태도 healthy 로 잡혀 운영 신호로 부적합.
|
||||
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('ready') else 1)"]
|
||||
# D-1: idle-unload 도입으로 '모델 적재' 는 더 이상 상시 상태가 아님 — cuda 가용성만
|
||||
# healthy 기준. 모델 적재 여부는 /ready 의 models_loaded 필드로 관측(정보성).
|
||||
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('cuda') else 1)"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
@@ -229,6 +239,31 @@ services:
|
||||
- fastapi
|
||||
restart: unless-stopped
|
||||
|
||||
# crawl-24x7 A-8 1차: 전 소스 헬스 패널 — 내부 전용 (읽기 전용 SELECT 만).
|
||||
# '내부 전용' 성립 구현 = 별도 바인딩뿐 (r4 결정): Tailscale 인터페이스에만 publish.
|
||||
# 기존 SvelteKit 라우트(vhost=Host 헤더 검사=앱 가드 환원)나 프록시 경로 차단(경로 가드
|
||||
# 회귀)으로 옮기지 말 것. caddy/home-caddy 라우트 추가 금지. fastapi/postgres 바인딩 선례.
|
||||
crawl-health:
|
||||
build: ./services/crawl-health
|
||||
ports:
|
||||
- "100.110.63.63:8765:8765"
|
||||
environment:
|
||||
- CRAWL_HEALTH_DSN=postgresql://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
# crawl-24x7 B-3: 구독 세션 Playwright fetch 격리 — internal-only (host 포트·caddy 라우트 금지).
|
||||
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 컨테이너 + mem cap.
|
||||
# 세션 파일(쿠키=credential 등가물)은 repo 밖 호스트 경로 ro mount (600, gitignore 무관 영역).
|
||||
playwright-fetcher:
|
||||
build: ./services/playwright-fetcher
|
||||
volumes:
|
||||
- /home/hyungi/.local/share/crawl-auth:/auth:ro
|
||||
mem_limit: 2g
|
||||
restart: unless-stopped
|
||||
|
||||
caddy:
|
||||
image: caddy:2
|
||||
ports:
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
|
||||
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
|
||||
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
|
||||
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
|
||||
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
|
||||
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
|
||||
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
|
||||
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
|
||||
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
|
||||
ALTER TABLE news_sources
|
||||
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
|
||||
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
|
||||
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
|
||||
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
|
||||
ADD COLUMN IF NOT EXISTS etag TEXT,
|
||||
ADD COLUMN IF NOT EXISTS last_modified TEXT,
|
||||
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
|
||||
ADD COLUMN IF NOT EXISTS selector_override JSONB,
|
||||
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
|
||||
@@ -0,0 +1,3 @@
|
||||
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
|
||||
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
|
||||
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
|
||||
@@ -0,0 +1,3 @@
|
||||
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
|
||||
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
|
||||
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
|
||||
@@ -0,0 +1,19 @@
|
||||
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
|
||||
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
|
||||
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
|
||||
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
|
||||
CREATE TABLE IF NOT EXISTS source_health (
|
||||
id SERIAL PRIMARY KEY,
|
||||
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
|
||||
consecutive_failures INTEGER NOT NULL DEFAULT 0,
|
||||
total_fetches BIGINT NOT NULL DEFAULT 0,
|
||||
total_failures BIGINT NOT NULL DEFAULT 0,
|
||||
last_success_at TIMESTAMPTZ,
|
||||
last_error TEXT,
|
||||
last_error_at TIMESTAMPTZ,
|
||||
last_fetch_items INTEGER,
|
||||
empty_streak INTEGER NOT NULL DEFAULT 0,
|
||||
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
|
||||
circuit_opened_at TIMESTAMPTZ,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
@@ -0,0 +1,2 @@
|
||||
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);
|
||||
@@ -0,0 +1,5 @@
|
||||
-- B/C 그룹 (plan crawl-24x7-1, 0-5 확정): 레지스트리에 채널 컬럼 — additive only.
|
||||
-- documents.source_channel 과 동일 enum 재사용 ('crawl' 값은 320 에서 별도 트랜잭션으로 추가 완료).
|
||||
-- 기존 행 전부 'news' 기본값 = 무회귀. crawl 채널 소스의 문서 생성/색인 게이트 분기 기준.
|
||||
ALTER TABLE news_sources
|
||||
ADD COLUMN IF NOT EXISTS source_channel source_channel NOT NULL DEFAULT 'news';
|
||||
@@ -0,0 +1,8 @@
|
||||
-- B-3 (plan crawl-24x7-1): 구독 세션 상태 노출 계약 — additive only.
|
||||
-- relogin_requested: 쓰기 1종 플래그 (A-8 버튼이 기록, 어댑터가 소비 = 수동 half-open).
|
||||
-- 소비 위치 함정(r5 고정): open-스킵 분기보다 앞 — 어댑터 틱마다 확인.
|
||||
-- last_probe_at/ok: 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — silent corruption 차단).
|
||||
ALTER TABLE source_health
|
||||
ADD COLUMN IF NOT EXISTS relogin_requested BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
ADD COLUMN IF NOT EXISTS last_probe_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS last_probe_ok BOOLEAN;
|
||||
@@ -0,0 +1,33 @@
|
||||
-- crawl-24x7 사이클 2 소스 seed (B-2 + C-1 안전 + C-5 철학) — 2026-06-10 전 URL live 검증.
|
||||
-- 262 선례: WHERE NOT EXISTS idempotent, 기존 행 보존, 신규만 insert (단일 statement).
|
||||
-- 채널: news = 다이제스트/브리핑 대상 / crawl = 도메인 재료 (0-5 분리).
|
||||
-- 정책: feed-full = 피드 본문이 전문 (UK HSE content:encoded 실측) / page = 기사 페이지 4-tier 승격.
|
||||
-- EU-OSHA 는 후보 등재만 (enabled=false — 카드 C-1 '우선순위 낮음').
|
||||
-- 르몽드 B-3 활성화는 seed 아님 — 세션 박제 후 runtime UPDATE (auth_profile/selector_override).
|
||||
INSERT INTO news_sources
|
||||
(name, country, language, feed_type, feed_url, category, enabled,
|
||||
fetch_method, fulltext_policy, source_channel, parser_quirk)
|
||||
SELECT v.name, v.country, v.language, v.feed_type, v.feed_url, v.category, v.enabled,
|
||||
v.fetch_method, v.fulltext_policy, v.source_channel::source_channel, v.parser_quirk
|
||||
FROM (VALUES
|
||||
-- B-2: Guardian Open Platform (전문 JSON — 스크래핑 불요, GUARDIAN_API_KEY 필요)
|
||||
('Guardian World', 'GB', 'en', 'api', 'https://content.guardianapis.com/search?section=world', 'International', true, 'api', 'none', 'news', NULL),
|
||||
-- C-1 안전 (Safety)
|
||||
('UK HSE Press', 'GB', 'en', 'rss', 'https://press.hse.gov.uk/feed/', 'Safety', true, 'rss', 'feed-full', 'crawl', NULL),
|
||||
('안전신문', 'KR', 'ko', 'rss', 'https://www.safetynews.co.kr/rss/allArticle.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
|
||||
('고용노동부 공지', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/notice.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
|
||||
('고용노동부 정책', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/policy.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
|
||||
('고용노동부 입법행정예고', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/lawinfo.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
|
||||
('OSHA QuickTakes', 'US', 'en', 'rss', 'https://www.osha.gov/sites/default/files/quicktakes.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
|
||||
('EU-OSHA News', 'EU', 'en', 'rss', 'https://osha.europa.eu/en/rss-feeds/latest/news.xml', 'Safety', false, 'rss', 'page', 'crawl', NULL),
|
||||
-- C-5 철학 (Philosophy)
|
||||
('SEP 신규·개정', 'US', 'en', 'rss', 'https://plato.stanford.edu/rss/sep.xml', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
|
||||
('1000-Word Philosophy', 'US', 'en', 'rss', 'https://1000wordphilosophy.com/feed/', 'Philosophy', true, 'rss', 'feed-full', 'crawl', NULL),
|
||||
('Doing Philosophy', 'KR', 'ko', 'rss', 'https://doingphilosophy.kr/feed', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
|
||||
('Aeon', 'GB', 'en', 'rss', 'https://aeon.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video'),
|
||||
('Psyche', 'GB', 'en', 'rss', 'https://psyche.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video')
|
||||
) AS v(name, country, language, feed_type, feed_url, category, enabled,
|
||||
fetch_method, fulltext_policy, source_channel, parser_quirk)
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM news_sources ns WHERE ns.name = v.name
|
||||
);
|
||||
@@ -0,0 +1,59 @@
|
||||
"""B-3 구독 세션 1회 수동 박제 (MacBook 등 GUI 머신에서 실행).
|
||||
|
||||
르몽드 = Google OAuth — 자동화 브라우저 로그인은 Google 이 차단하므로
|
||||
로그인 자체는 항상 사람이 headed 브라우저에서 수행하고, 본 스크립트는
|
||||
그 결과(쿠키+localStorage = storage_state JSON)만 박제한다.
|
||||
|
||||
사용 (MacBook):
|
||||
pip install playwright && playwright install chromium
|
||||
python scripts/capture_subscription_session.py --profile lemonde --url https://www.lemonde.fr
|
||||
1) 떠오른 브라우저에서 직접 로그인 (Google OAuth 포함)
|
||||
2) 로그인 완료 확인 후 터미널에서 Enter
|
||||
3) ~/.local/share/crawl-auth/lemonde.json 저장 (600)
|
||||
|
||||
GPU 반영:
|
||||
ssh gpu 'mkdir -p ~/.local/share/crawl-auth && chmod 700 ~/.local/share/crawl-auth'
|
||||
scp ~/.local/share/crawl-auth/lemonde.json gpu:.local/share/crawl-auth/
|
||||
ssh gpu 'chmod 600 ~/.local/share/crawl-auth/lemonde.json'
|
||||
|
||||
세션 만료 후 재로그인도 동일 절차 + source_health.relogin_requested 플래그 set
|
||||
(어댑터가 다음 틱에 half-open probe 로 소비).
|
||||
|
||||
주의: storage_state = credential 등가물. repo 안·백업 대상 경로에 두지 말 것.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
AUTH_DIR = Path.home() / ".local" / "share" / "crawl-auth"
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="B-3 구독 세션 storage_state 박제")
|
||||
parser.add_argument("--profile", required=True, help="예: lemonde")
|
||||
parser.add_argument("--url", required=True, help="로그인 시작 페이지")
|
||||
args = parser.parse_args()
|
||||
|
||||
AUTH_DIR.mkdir(parents=True, exist_ok=True)
|
||||
AUTH_DIR.chmod(0o700)
|
||||
out = AUTH_DIR / f"{args.profile}.json"
|
||||
|
||||
with sync_playwright() as pw:
|
||||
browser = pw.chromium.launch(headless=False)
|
||||
context = browser.new_context(viewport={"width": 1366, "height": 900})
|
||||
page = context.new_page()
|
||||
page.goto(args.url)
|
||||
print(f"\n브라우저에서 로그인을 완료한 뒤 이 터미널에서 Enter 를 누르세요.")
|
||||
input("로그인 완료 후 Enter > ")
|
||||
context.storage_state(path=str(out))
|
||||
browser.close()
|
||||
|
||||
out.chmod(0o600)
|
||||
print(f"저장: {out} (600)")
|
||||
print("다음: scp 로 GPU ~/.local/share/crawl-auth/ 반영 + chmod 600")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,12 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /srv
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY server.py .
|
||||
|
||||
EXPOSE 8765
|
||||
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
|
||||
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8765/health')"
|
||||
|
||||
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
|
||||
@@ -0,0 +1,3 @@
|
||||
fastapi>=0.111.0
|
||||
uvicorn>=0.30.0
|
||||
asyncpg>=0.29.0
|
||||
@@ -0,0 +1,202 @@
|
||||
"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
|
||||
|
||||
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
|
||||
processing_queue SELECT 만) — 쓰기 0.
|
||||
|
||||
[1차] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
|
||||
빈 피드 streak + fulltext 승격/격하 통계 + 큐 백로그. 비-RSS 소스(C-2 sitemap 등)도
|
||||
같은 표면이 수용 (fetch_method 컬럼 표시 — '구독 소스 패널' 로 좁히지 않는 전 소스 일반화).
|
||||
[2차 범위 외] B-3 상태 계약 도착 시 세션 열 + [재로그인 시도] 버튼(enqueue 방식).
|
||||
|
||||
노출: 별도 바인딩만 — compose 가 Tailscale 인터페이스(100.110.63.63)에만 publish.
|
||||
vhost/경로 가드 방식 금지 (r4: 둘 다 '덜 깨짐' 속성 상실). 앱 레벨 인증 없음 =
|
||||
Tailscale 도달성만이 경계 (fab-server 선례).
|
||||
"""
|
||||
|
||||
import html
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
import asyncpg
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
|
||||
logger = logging.getLogger("crawl_health")
|
||||
|
||||
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
|
||||
|
||||
_pool: asyncpg.Pool | None = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_app: FastAPI):
|
||||
global _pool
|
||||
_pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
|
||||
yield
|
||||
await _pool.close()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
async def _collect_data() -> dict:
|
||||
async with _pool.acquire() as conn:
|
||||
sources = await conn.fetch(
|
||||
"""
|
||||
SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
|
||||
s.fulltext_policy, s.last_fetched_at,
|
||||
h.circuit_state, h.consecutive_failures, h.last_success_at,
|
||||
h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
|
||||
h.total_fetches, h.total_failures
|
||||
FROM news_sources s
|
||||
LEFT JOIN source_health h ON h.source_id = s.id
|
||||
ORDER BY s.enabled DESC, s.name
|
||||
"""
|
||||
)
|
||||
counts = await conn.fetch(
|
||||
"""
|
||||
SELECT s.id,
|
||||
count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
|
||||
count(d.id) AS items_7d
|
||||
FROM news_sources s
|
||||
LEFT JOIN documents d
|
||||
ON d.source_channel = 'news'
|
||||
AND d.extracted_at > now() - interval '7 days'
|
||||
AND d.file_path LIKE 'news/' || s.name || '/%'
|
||||
GROUP BY s.id
|
||||
"""
|
||||
)
|
||||
queue = await conn.fetch(
|
||||
"""
|
||||
SELECT stage::text AS stage, status::text AS status, count(*) AS n,
|
||||
min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
|
||||
FROM processing_queue
|
||||
WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
|
||||
AND status IN ('pending', 'processing', 'failed')
|
||||
GROUP BY 1, 2
|
||||
ORDER BY 1, 2
|
||||
"""
|
||||
)
|
||||
fulltext = await conn.fetch(
|
||||
"""
|
||||
SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
|
||||
FROM documents
|
||||
WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
|
||||
GROUP BY 1
|
||||
"""
|
||||
)
|
||||
count_map = {r["id"]: r for r in counts}
|
||||
return {
|
||||
"sources": [
|
||||
{**dict(r),
|
||||
"items_24h": count_map.get(r["id"], {}).get("items_24h", 0),
|
||||
"items_7d": count_map.get(r["id"], {}).get("items_7d", 0)}
|
||||
for r in sources
|
||||
],
|
||||
"queue": [dict(r) for r in queue],
|
||||
"fulltext": [dict(r) for r in fulltext],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
"""Liveness — Docker healthcheck 용 (DB 미접근, 프로세스 생존만)."""
|
||||
return {"status": "ok", "service": "crawl-health"}
|
||||
|
||||
|
||||
@app.get("/api/health.json")
|
||||
async def api_health():
|
||||
data = await _collect_data()
|
||||
# asyncpg Record 의 datetime → isoformat 직렬화
|
||||
def _ser(v):
|
||||
return v.isoformat() if hasattr(v, "isoformat") else v
|
||||
return JSONResponse({
|
||||
k: [{kk: _ser(vv) for kk, vv in row.items()} for row in v]
|
||||
for k, v in data.items()
|
||||
})
|
||||
|
||||
|
||||
def _chip(state: str | None, enabled: bool) -> str:
|
||||
if not enabled:
|
||||
return '<span class="chip off">OFF</span>'
|
||||
if state == "disabled":
|
||||
return '<span class="chip err">DISABLED</span>'
|
||||
if state == "open":
|
||||
return '<span class="chip warn">OPEN</span>'
|
||||
return '<span class="chip ok">OK</span>'
|
||||
|
||||
|
||||
def _fmt_ts(v) -> str:
|
||||
return v.strftime("%m-%d %H:%M") if v else "-"
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index():
|
||||
data = await _collect_data()
|
||||
rows = []
|
||||
for s in data["sources"]:
|
||||
err = html.escape((s.get("last_error") or "")[:80])
|
||||
warn_cls = ""
|
||||
if s["enabled"] and (s.get("consecutive_failures") or 0) >= 3:
|
||||
warn_cls = ' class="row-warn"'
|
||||
elif s["enabled"] and (s.get("empty_streak") or 0) >= 8:
|
||||
warn_cls = ' class="row-warn"'
|
||||
rows.append(
|
||||
f"<tr{warn_cls}>"
|
||||
f"<td>{html.escape(s['name'])}</td>"
|
||||
f"<td>{_chip(s.get('circuit_state'), s['enabled'])}</td>"
|
||||
f"<td>{html.escape(s.get('fetch_method') or 'rss')}</td>"
|
||||
f"<td>{html.escape(s.get('fulltext_policy') or 'none')}</td>"
|
||||
f"<td class='num'>{s['items_24h']}</td>"
|
||||
f"<td class='num'>{s['items_7d']}</td>"
|
||||
f"<td class='num'>{s.get('consecutive_failures') or 0}</td>"
|
||||
f"<td class='num'>{s.get('empty_streak') or 0}</td>"
|
||||
f"<td>{_fmt_ts(s.get('last_success_at'))}</td>"
|
||||
f"<td>{_fmt_ts(s.get('last_fetched_at'))}</td>"
|
||||
f"<td class='err-text'>{err}</td>"
|
||||
f"</tr>"
|
||||
)
|
||||
qrows = [
|
||||
f"<tr><td>{html.escape(q['stage'])}</td><td>{html.escape(q['status'])}</td>"
|
||||
f"<td class='num'>{q['n']}</td><td>{_fmt_ts(q.get('oldest_pending'))}</td></tr>"
|
||||
for q in data["queue"]
|
||||
]
|
||||
frows = [
|
||||
f"<tr><td>{html.escape(f['status'] or '-')}</td><td class='num'>{f['n']}</td></tr>"
|
||||
for f in data["fulltext"]
|
||||
]
|
||||
body = f"""<!DOCTYPE html>
|
||||
<html lang="ko"><head><meta charset="utf-8">
|
||||
<title>crawl-health — 전 소스 헬스 패널</title>
|
||||
<style>
|
||||
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
|
||||
color: #3d3a33; margin: 0; padding: 28px; }}
|
||||
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
|
||||
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
|
||||
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
|
||||
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
|
||||
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
|
||||
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
|
||||
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
|
||||
tr.row-warn td {{ background: #fbf0e4; }}
|
||||
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
|
||||
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
|
||||
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
|
||||
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
|
||||
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
|
||||
</style></head><body>
|
||||
<h1>crawl-health — 전 소스 헬스 패널</h1>
|
||||
<div class="sub">A-8 1차 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
|
||||
<h2>소스 ({len(rows)})</h2>
|
||||
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
|
||||
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
|
||||
{''.join(rows)}</table>
|
||||
<h2>처리 큐 (fulltext / summarize / embed / chunk)</h2>
|
||||
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
|
||||
{''.join(qrows) or '<tr><td colspan="4">백로그 없음</td></tr>'}</table>
|
||||
<h2>fulltext 승격 누적</h2>
|
||||
<table><tr><th>status</th><th>건수</th></tr>
|
||||
{''.join(frows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'}</table>
|
||||
</body></html>"""
|
||||
return HTMLResponse(body)
|
||||
+109
-28
@@ -1,12 +1,18 @@
|
||||
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
|
||||
|
||||
Phase 1B (2026-05-01) — 텍스트만 응답, 이미지 폐기.
|
||||
Phase 1B.5 (본 변경) — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
|
||||
Phase 1B.5 — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
|
||||
없는 stateless 변환기 유지 (fastapi 가 NAS persist 담당).
|
||||
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
|
||||
MARKER_PRELOAD=0 : startup warmup 끔 (첫 /convert 시 lazy load)
|
||||
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 시 모델 해제 (0=비활성, 기존 동작)
|
||||
/ready 는 idle(미적재)에서도 200 — fastapi 의 depends_on service_healthy 가
|
||||
lazy 모드에서 영구 미기동으로 굳는 것 방지. 503 은 warmup_failed 한정.
|
||||
|
||||
plan: ~/.claude/plans/piped-humming-crystal.md
|
||||
"""
|
||||
import base64
|
||||
import gc
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
@@ -40,6 +46,12 @@ _warmup_done = False
|
||||
_warmup_error: str | None = None
|
||||
_warmup_lock = threading.Lock()
|
||||
|
||||
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
|
||||
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
|
||||
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
|
||||
_inflight = 0
|
||||
_last_used = time.monotonic()
|
||||
|
||||
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
|
||||
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
|
||||
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
|
||||
@@ -68,11 +80,67 @@ def _ensure_warmup() -> None:
|
||||
raise
|
||||
|
||||
|
||||
def _acquire_models():
|
||||
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
|
||||
global _inflight
|
||||
while True:
|
||||
_ensure_warmup()
|
||||
with _warmup_lock:
|
||||
if _warmup_done:
|
||||
_inflight += 1
|
||||
return
|
||||
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
|
||||
|
||||
|
||||
def _release_models():
|
||||
global _inflight, _last_used
|
||||
with _warmup_lock:
|
||||
_inflight -= 1
|
||||
_last_used = time.monotonic()
|
||||
|
||||
|
||||
def _maybe_unload() -> None:
|
||||
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
|
||||
|
||||
split 변환의 배치 사이 간격은 초 단위 — N>=1분 임계면 배치 사이 해제 없음.
|
||||
"""
|
||||
global _models, _converter, _warmup_done
|
||||
with _warmup_lock:
|
||||
if not _warmup_done or _inflight > 0:
|
||||
return
|
||||
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
|
||||
return
|
||||
_models = None
|
||||
_converter = None
|
||||
_warmup_done = False
|
||||
gc.collect()
|
||||
try:
|
||||
import torch
|
||||
torch.cuda.empty_cache()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
|
||||
|
||||
|
||||
async def _idle_reaper():
|
||||
import asyncio
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
_maybe_unload()
|
||||
except Exception:
|
||||
logger.exception("[marker-service] idle reaper 오류")
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup():
|
||||
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
|
||||
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
|
||||
import asyncio
|
||||
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
|
||||
if _PRELOAD:
|
||||
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
|
||||
if _IDLE_UNLOAD_MINUTES > 0:
|
||||
asyncio.create_task(_idle_reaper())
|
||||
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}분")
|
||||
|
||||
|
||||
class ConvertRequest(BaseModel):
|
||||
@@ -111,7 +179,12 @@ def health():
|
||||
|
||||
@app.get("/ready")
|
||||
async def ready(response: Response):
|
||||
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
|
||||
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
|
||||
|
||||
D-1: idle(미적재) = 200. 503 은 warmup_failed 한정 — lazy 모드에서 fastapi
|
||||
depends_on service_healthy 가 영구 미기동으로 굳지 않게. 배포 검증에서
|
||||
'status=ready' 단언하던 runbook 은 강제 warm 호출(/convert 1건)로 대체.
|
||||
"""
|
||||
if _warmup_error:
|
||||
response.status_code = 503
|
||||
return {
|
||||
@@ -121,31 +194,28 @@ async def ready(response: Response):
|
||||
"error": _warmup_error,
|
||||
}
|
||||
if not _warmup_done:
|
||||
response.status_code = 503
|
||||
return {
|
||||
"status": "warming_up",
|
||||
"status": "warming_up" if _PRELOAD else "idle",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": False,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
return {
|
||||
"status": "ready",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": True,
|
||||
"inflight": _inflight,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/convert", response_model=ConvertResponse)
|
||||
async def convert(req: ConvertRequest):
|
||||
_ensure_warmup()
|
||||
|
||||
p = Path(req.file_path)
|
||||
if not p.is_file():
|
||||
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
|
||||
|
||||
start = time.monotonic()
|
||||
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
|
||||
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
|
||||
converter = _converter
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
if req.start_page < 1 or req.end_page < req.start_page:
|
||||
raise HTTPException(
|
||||
@@ -155,22 +225,33 @@ async def convert(req: ConvertRequest):
|
||||
"message": f"start_page={req.start_page} end_page={req.end_page}",
|
||||
},
|
||||
)
|
||||
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
|
||||
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
|
||||
try:
|
||||
rendered = converter(str(p))
|
||||
except Exception as exc:
|
||||
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail={
|
||||
"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}",
|
||||
},
|
||||
) from exc
|
||||
|
||||
md_text, _meta, raw_images = text_from_rendered(rendered)
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
|
||||
_acquire_models()
|
||||
try:
|
||||
start = time.monotonic()
|
||||
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
|
||||
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
|
||||
converter = _converter
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
|
||||
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
|
||||
try:
|
||||
rendered = converter(str(p))
|
||||
except Exception as exc:
|
||||
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail={
|
||||
"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}",
|
||||
},
|
||||
) from exc
|
||||
|
||||
md_text, _meta, raw_images = text_from_rendered(rendered)
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
finally:
|
||||
_release_models()
|
||||
|
||||
images_payload, truncated = _serialize_images(raw_images, str(p))
|
||||
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
# B-3 / A-1 Tier 2 (plan crawl-24x7-1) — Playwright 격리 컨테이너.
|
||||
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 서비스로 격리,
|
||||
# 타임아웃 있는 HTTP 호출로만 사용. 요청당 브라우저 기동 = 컨텍스트 누적 메모리 차단.
|
||||
FROM mcr.microsoft.com/playwright/python:v1.47.0-jammy
|
||||
|
||||
WORKDIR /srv
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY server.py .
|
||||
|
||||
# root 로 Chromium 실행 시 sandbox 비활성 강제됨 — 이미지 내장 pwuser(uid 1000)로 실행.
|
||||
# /auth ro mount(호스트 hyungi uid 1000, mode 600)도 동일 uid 라 판독 가능.
|
||||
USER pwuser
|
||||
|
||||
# internal-only — compose 네트워크 전용, host 포트 미매핑 (caddy 라우트 금지)
|
||||
EXPOSE 3400
|
||||
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3400"]
|
||||
@@ -0,0 +1,3 @@
|
||||
fastapi==0.115.*
|
||||
uvicorn==0.32.*
|
||||
playwright==1.47.0
|
||||
@@ -0,0 +1,107 @@
|
||||
"""B-3 구독 세션 Playwright fetcher (plan crawl-24x7-1).
|
||||
|
||||
storage_state JSON(쿠키+localStorage 스냅샷) 기반 인증 페이지 fetch + 내용 기반 probe.
|
||||
- 동시 1 인스턴스 (글로벌 세마포어) — 계정 보호 + 사람 속도는 호출측 politeness 가 담당.
|
||||
- 요청당 브라우저 기동/종료 — 컨텍스트 메모리 누적·hang 잔존 차단 (저빈도라 기동비용 무관).
|
||||
- 세션 파일: /auth/{profile}.json (호스트 ~/.local/share/crawl-auth/, ro mount, 600).
|
||||
부재 = 503 profile_missing (silent fallback 없음 — 호출측이 degrade).
|
||||
- 시간 기반 만료 판정 금지 — probe 는 알려진 유료 기사에서 본문 길이 + 페이월 마커 부재 검증
|
||||
(만료 후 200 '페이월 안내문'이 본문으로 저장되는 silent corruption 차단).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from playwright.async_api import async_playwright, Error as PlaywrightError
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
|
||||
logger = logging.getLogger("playwright-fetcher")
|
||||
|
||||
AUTH_DIR = Path("/auth")
|
||||
NAV_TIMEOUT_MS = 45_000
|
||||
SETTLE_MS = 1_500 # domcontentloaded 후 lazy 본문 settle 대기
|
||||
|
||||
app = FastAPI(title="playwright-fetcher")
|
||||
_browser_slot = asyncio.Semaphore(1) # 동시 1 인스턴스 (B-3 ① persistent 제약과 동일 규율)
|
||||
|
||||
|
||||
class FetchReq(BaseModel):
|
||||
url: str
|
||||
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
|
||||
|
||||
|
||||
class ProbeReq(BaseModel):
|
||||
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
|
||||
probe_url: str
|
||||
min_body_chars: int = 800
|
||||
paywall_markers: list[str] = []
|
||||
|
||||
|
||||
def _state_path(profile: str) -> Path:
|
||||
p = AUTH_DIR / f"{profile}.json"
|
||||
if not p.is_file():
|
||||
raise HTTPException(503, detail={"error_reason": "profile_missing", "profile": profile})
|
||||
return p
|
||||
|
||||
|
||||
async def _browse(url: str, state: Path) -> tuple[str, str, str]:
|
||||
"""(html, final_url, visible_text). 요청당 브라우저 — 종료를 finally 로 보장."""
|
||||
async with async_playwright() as pw:
|
||||
browser = await pw.chromium.launch(headless=True)
|
||||
try:
|
||||
context = await browser.new_context(
|
||||
storage_state=str(state),
|
||||
viewport={"width": 1366, "height": 900},
|
||||
locale="fr-FR",
|
||||
)
|
||||
page = await context.new_page()
|
||||
await page.goto(url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
|
||||
await page.wait_for_timeout(SETTLE_MS)
|
||||
html = await page.content()
|
||||
final_url = page.url
|
||||
text = await page.evaluate("document.body ? document.body.innerText : ''")
|
||||
return html, final_url, text
|
||||
finally:
|
||||
await browser.close()
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
profiles = sorted(p.stem for p in AUTH_DIR.glob("*.json")) if AUTH_DIR.is_dir() else []
|
||||
return {"status": "ok", "profiles": profiles}
|
||||
|
||||
|
||||
@app.post("/fetch")
|
||||
async def fetch(req: FetchReq):
|
||||
state = _state_path(req.profile)
|
||||
async with _browser_slot:
|
||||
try:
|
||||
html, final_url, _ = await _browse(req.url, state)
|
||||
except PlaywrightError as e:
|
||||
logger.warning("fetch 실패 %s: %s", req.url, e)
|
||||
raise HTTPException(502, detail={"error_reason": "browse_failed", "message": str(e)[:300]})
|
||||
logger.info("fetch ok profile=%s %s (%d bytes)", req.profile, req.url, len(html))
|
||||
return {"html": html, "final_url": final_url}
|
||||
|
||||
|
||||
@app.post("/probe")
|
||||
async def probe(req: ProbeReq):
|
||||
"""내용 기반 세션 probe — ok=False 사유를 명시 반환 (호출측이 health 에 기록)."""
|
||||
state = _state_path(req.profile)
|
||||
async with _browser_slot:
|
||||
try:
|
||||
_, final_url, text = await _browse(req.probe_url, state)
|
||||
except PlaywrightError as e:
|
||||
return {"ok": False, "reason": f"browse_failed: {str(e)[:200]}", "body_chars": 0}
|
||||
body_chars = len(text.strip())
|
||||
hit = next((m for m in req.paywall_markers if m and m.lower() in text.lower()), None)
|
||||
if hit:
|
||||
return {"ok": False, "reason": f"paywall_marker: {hit}", "body_chars": body_chars}
|
||||
if body_chars < req.min_body_chars:
|
||||
return {"ok": False, "reason": f"body_too_short: {body_chars} < {req.min_body_chars}",
|
||||
"body_chars": body_chars}
|
||||
logger.info("probe ok profile=%s (%d chars, final=%s)", req.profile, body_chars, final_url)
|
||||
return {"ok": True, "reason": None, "body_chars": body_chars}
|
||||
+83
-27
@@ -1,14 +1,23 @@
|
||||
"""STT 마이크로서비스 — faster-whisper (GPU) 기반 음성 전사.
|
||||
|
||||
filePath → {text, segments:[{start,end,text}]}.
|
||||
모델은 startup 에서 eager preload (Docker /ready healthcheck 가 모델 적재까지 검증).
|
||||
기본 모델 large-v3 (VRAM ~3GB, float16). 환경변수로 교체 가능.
|
||||
|
||||
환경변수 `STT_PRELOAD=0` 으로 lazy 로 강제 가능 (개발/테스트용).
|
||||
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
|
||||
STT_PRELOAD=0 : startup eager preload 끔 (첫 요청 시 lazy load)
|
||||
STT_IDLE_UNLOAD_MINUTES: N분 유휴 시 모델 해제 (0=비활성, 기존 동작).
|
||||
faster-whisper=CTranslate2 라 torch 미설치 — 해제는
|
||||
참조 제거 + gc (CTranslate2 가 소멸 시 VRAM 반환).
|
||||
콜드로드 수초~수십 초는 호출측(stt_worker read=1800s)이 흡수. healthcheck 는
|
||||
cuda 가용성 기준 (compose) — 모델 적재는 더 이상 상시 상태가 아니다.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import gc
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import unicodedata
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
@@ -17,18 +26,26 @@ from fastapi import FastAPI
|
||||
|
||||
logger = logging.getLogger("stt")
|
||||
|
||||
_IDLE_UNLOAD_MINUTES = int(os.getenv("STT_IDLE_UNLOAD_MINUTES", "0"))
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_app: FastAPI):
|
||||
# startup: 모델 eager preload 시도. 실패해도 프로세스는 살아 있고
|
||||
# /ready 가 false 로 남아 healthcheck 가 unhealthy 처리.
|
||||
# /ready 의 models_loaded 가 false 로 남는다.
|
||||
if os.getenv("STT_PRELOAD", "1") != "0":
|
||||
try:
|
||||
_load_model()
|
||||
logger.info("stt model preloaded: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
|
||||
except Exception as e:
|
||||
logger.exception("stt model preload failed: %s", e)
|
||||
reaper = None
|
||||
if _IDLE_UNLOAD_MINUTES > 0:
|
||||
reaper = asyncio.create_task(_idle_reaper())
|
||||
logger.info("stt idle-unload 활성: %d분", _IDLE_UNLOAD_MINUTES)
|
||||
yield
|
||||
if reaper:
|
||||
reaper.cancel()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
@@ -38,6 +55,11 @@ _MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
|
||||
_DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
|
||||
_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
|
||||
|
||||
# load/unload/inflight 상태 전이는 전부 이 lock 아래 (cold 동시 요청 이중 로드 방지 포함)
|
||||
_model_lock = threading.Lock()
|
||||
_inflight = 0
|
||||
_last_used = time.monotonic()
|
||||
|
||||
|
||||
def _resolve_path(file_path: str) -> Path | None:
|
||||
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. OCR 서비스와 동일 패턴."""
|
||||
@@ -61,14 +83,38 @@ def _resolve_path(file_path: str) -> Path | None:
|
||||
|
||||
|
||||
def _load_model():
|
||||
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유."""
|
||||
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유. lock 으로 이중 로드 방지."""
|
||||
global _model
|
||||
if _model is not None:
|
||||
return _model
|
||||
from faster_whisper import WhisperModel
|
||||
with _model_lock:
|
||||
if _model is None:
|
||||
from faster_whisper import WhisperModel
|
||||
logger.info("stt model loading: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
|
||||
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
|
||||
return _model
|
||||
|
||||
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
|
||||
return _model
|
||||
|
||||
def _maybe_unload() -> None:
|
||||
"""유휴 시 모델 해제. 처리 중(inflight>0)이면 절대 해제하지 않는다."""
|
||||
global _model
|
||||
with _model_lock:
|
||||
if _model is None or _inflight > 0:
|
||||
return
|
||||
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
|
||||
return
|
||||
_model = None
|
||||
gc.collect()
|
||||
logger.info("stt idle-unload: whisper 모델 해제 (유휴 %d분 초과)", _IDLE_UNLOAD_MINUTES)
|
||||
|
||||
|
||||
async def _idle_reaper():
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
_maybe_unload()
|
||||
except Exception:
|
||||
logger.exception("stt idle reaper 오류")
|
||||
|
||||
|
||||
def _cuda_device_count() -> int:
|
||||
@@ -87,7 +133,7 @@ def health():
|
||||
|
||||
@app.get("/ready")
|
||||
def ready():
|
||||
"""Readiness — CUDA + 모델 상태. 배포 검증용."""
|
||||
"""Readiness — CUDA + 모델 상태. healthcheck 는 cuda 만 본다 (D-1 idle-unload)."""
|
||||
count = _cuda_device_count()
|
||||
cuda_ok = count > 0
|
||||
models_loaded = _model is not None
|
||||
@@ -98,6 +144,8 @@ def ready():
|
||||
"models_loaded": models_loaded,
|
||||
"model": _MODEL_NAME,
|
||||
"compute_type": _COMPUTE_TYPE,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
"inflight": _inflight,
|
||||
}
|
||||
|
||||
|
||||
@@ -121,6 +169,7 @@ async def transcribe(body: dict):
|
||||
"duration": 1832.5
|
||||
}
|
||||
"""
|
||||
global _inflight, _last_used
|
||||
raw_path = body["filePath"]
|
||||
langs = body.get("langs")
|
||||
beam_size = int(body.get("beamSize", 5))
|
||||
@@ -129,28 +178,35 @@ async def transcribe(body: dict):
|
||||
if resolved is None:
|
||||
return {"error": f"파일 없음: {raw_path}", "text": "", "segments": []}
|
||||
|
||||
model = _load_model()
|
||||
with _model_lock:
|
||||
_inflight += 1
|
||||
try:
|
||||
model = _load_model()
|
||||
|
||||
language = None
|
||||
if isinstance(langs, list) and len(langs) == 1:
|
||||
language = langs[0]
|
||||
language = None
|
||||
if isinstance(langs, list) and len(langs) == 1:
|
||||
language = langs[0]
|
||||
|
||||
segments_iter, info = model.transcribe(
|
||||
str(resolved),
|
||||
beam_size=beam_size,
|
||||
language=language,
|
||||
vad_filter=True,
|
||||
)
|
||||
segments_iter, info = model.transcribe(
|
||||
str(resolved),
|
||||
beam_size=beam_size,
|
||||
language=language,
|
||||
vad_filter=True,
|
||||
)
|
||||
|
||||
segments = []
|
||||
parts = []
|
||||
for seg in segments_iter:
|
||||
segments.append({
|
||||
"start": round(float(seg.start), 2),
|
||||
"end": round(float(seg.end), 2),
|
||||
"text": seg.text.strip(),
|
||||
})
|
||||
parts.append(seg.text)
|
||||
segments = []
|
||||
parts = []
|
||||
for seg in segments_iter:
|
||||
segments.append({
|
||||
"start": round(float(seg.start), 2),
|
||||
"end": round(float(seg.end), 2),
|
||||
"text": seg.text.strip(),
|
||||
})
|
||||
parts.append(seg.text)
|
||||
finally:
|
||||
with _model_lock:
|
||||
_inflight -= 1
|
||||
_last_used = time.monotonic()
|
||||
|
||||
return {
|
||||
"text": " ".join(p.strip() for p in parts).strip(),
|
||||
|
||||
File diff suppressed because one or more lines are too long
+1
@@ -0,0 +1 @@
|
||||
{"body":{"pageNo":1,"totalCount":1,"numOfRows":5,"items":{"item":[{"filenm":"컨베이어에 끼임.pdf","filepath":"https://portal.kosha.or.kr/openapi/v1/file/down/stdboard/B2025022104002/202605281621537G75H2/D0801000010001","boardno":"202605281621537G75H2"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
|
||||
+1
@@ -0,0 +1 @@
|
||||
{"body":{"pageNo":1,"totalCount":6334,"numOfRows":3,"items":{"item":[{"business":"제조업","contents":"2026.01.00(월) 07:30경, 경기도 소재 OOOO(주)에서 재해자가 골재 이송 컨베이어 상부의 이물질을 제거하던 중,다리가 컨베이어 벨트와 테일 풀리 (Tail Pulley)* 사이에 끼임 *컨베이어의 아래쪽 끝단에서 회전하며 벨트를 순환시키는 원통형 기계장치","atcflcnt":1,"keyword":"컨베이어에 끼임","boardno":"202605281621537G75H2"},{"business":"건설업","contents":"2025. 8. 00. (금) 11:12 경 경기도 소재 OOO 신축공 사현장에서 데크플레이트 설치 중 밟고 있던 미고정 데크플레이트가 탈락하며 약 7m 높이에서 추락함","atcflcnt":1,"keyword":"데크플레이트 설치 작업 중 추락","boardno":"20260528162031VZLE93"},{"business":"건설업","contents":"2025. 06. 00.(금) 12:35경, 경북 봉화군 소재 (주)OOOO 침전저류지 현장에서 타워크레인 전도 후 매립된 케이크*(오염토)를 굴착 및 운반 작업 중, 사면의 토사와 타워크레인 기초구조물이 무너지며 하단에서 작업 중이던 굴착기가 매몰됨 * 분말 상태의 원료에서 아연을 채취한 후 남은 중금속 부산물(산화칼슘, 납, 산화철, 황산 등)을 장기간 매립하여 만들어지는 고체 형태의 오염 토양 덩어리","atcflcnt":1,"keyword":"사면 굴착 작업 중 매몰","boardno":"20260527153100O7QX25"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
|
||||
+1
@@ -0,0 +1 @@
|
||||
{"body":{"pageNo":1,"totalCount":1039,"numOfRows":3,"items":{"item":[{"techGdlnNm":"구리에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-1-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883045/7"},{"techGdlnNm":"마그네슘에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-4-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883165/3"},{"techGdlnNm":"백금에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-6-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883187/3"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
|
||||
@@ -0,0 +1,139 @@
|
||||
"""crawl-24x7 사이클 2 — 순수 함수/형태 회귀 테스트 (DB 불요).
|
||||
|
||||
Guardian 호출 형태 + fixture 응답 파싱 + 채널 정체성 + B-5 quirk.
|
||||
fixture = tests/fixtures/guardian_open_platform_search_response.json
|
||||
(2026-06-10 실키 live 박제, api-key 응답 본문 미포함 확인 — [[feedback_external_api_fixture_first]]).
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from workers.news_collector import (
|
||||
_article_hash,
|
||||
_doc_identity,
|
||||
_guardian_request,
|
||||
_normalize_category,
|
||||
)
|
||||
|
||||
FIXTURE = Path(__file__).parent / "fixtures" / "guardian_open_platform_search_response.json"
|
||||
|
||||
|
||||
def _make_source(**kw):
|
||||
"""ORM 인스턴스 없이 속성만 흉내 (식별성 함수는 속성 접근만 사용)."""
|
||||
class S:
|
||||
pass
|
||||
s = S()
|
||||
s.source_channel = kw.get("source_channel", "news")
|
||||
s.parser_quirk = kw.get("parser_quirk")
|
||||
return s
|
||||
|
||||
|
||||
class TestGuardianCallShape:
|
||||
def test_request_shape_matches_fixture_recipe(self):
|
||||
"""fixture 박제 시 사용한 호출과 단일 source-of-truth 정합
|
||||
([[feedback_fixture_first_call_shape]])."""
|
||||
endpoint, params = _guardian_request(
|
||||
"https://content.guardianapis.com/search?section=world", "KEY"
|
||||
)
|
||||
assert endpoint == "https://content.guardianapis.com/search"
|
||||
assert params["section"] == "world"
|
||||
assert params["show-fields"] == "bodyText,trailText"
|
||||
assert params["order-by"] == "newest"
|
||||
assert params["api-key"] == "KEY"
|
||||
|
||||
def test_feed_url_query_overridden_by_fixed_fields(self):
|
||||
# feed_url 에 show-fields 가 잘못 박혀 있어도 고정 필드가 이긴다 (dict merge 순서)
|
||||
_, params = _guardian_request(
|
||||
"https://content.guardianapis.com/search?section=world&show-fields=headline", "K"
|
||||
)
|
||||
assert params["show-fields"] == "bodyText,trailText"
|
||||
|
||||
|
||||
class TestGuardianFixtureParsing:
|
||||
def test_fixture_response_shape(self):
|
||||
payload = json.loads(FIXTURE.read_text())["response"]
|
||||
assert payload["status"] == "ok"
|
||||
assert payload["results"], "fixture 에 결과 0건"
|
||||
for item in payload["results"]:
|
||||
assert item["webTitle"].strip()
|
||||
assert item["webUrl"].startswith("https://")
|
||||
assert "webPublicationDate" in item
|
||||
assert "sectionName" in item
|
||||
fields = item.get("fields") or {}
|
||||
assert "bodyText" in fields and "trailText" in fields
|
||||
|
||||
def test_fixture_bodytext_is_fulltext_grade(self):
|
||||
payload = json.loads(FIXTURE.read_text())["response"]
|
||||
# 전문 게이트(200자)를 fixture 가 통과해야 어댑터 is_full 경로가 산다
|
||||
assert any(len(i["fields"]["bodyText"]) >= 200 for i in payload["results"])
|
||||
|
||||
def test_fixture_contains_no_api_key(self):
|
||||
assert "api-key" not in FIXTURE.read_text()
|
||||
|
||||
|
||||
class TestChannelIdentity:
|
||||
def test_news_channel_unchanged(self):
|
||||
ident = _doc_identity(_make_source(source_channel="news"), "경향신문", "Society")
|
||||
assert ident == {
|
||||
"path_prefix": "news",
|
||||
"ai_domain": "News",
|
||||
"ai_tags": ["News/경향신문/Society"],
|
||||
}
|
||||
|
||||
def test_crawl_channel_domain_identity(self):
|
||||
ident = _doc_identity(_make_source(source_channel="crawl"), "TWI", "Engineering")
|
||||
assert ident["path_prefix"] == "crawl"
|
||||
assert ident["ai_domain"] == "Engineering"
|
||||
assert ident["ai_tags"] == ["Engineering/TWI"]
|
||||
|
||||
def test_crawl_channel_unknown_category_falls_back(self):
|
||||
ident = _doc_identity(_make_source(source_channel="crawl"), "X", "Other")
|
||||
assert ident["ai_domain"] == "Domain"
|
||||
|
||||
def test_category_map_has_domain_axes(self):
|
||||
assert _normalize_category("안전") == "Safety"
|
||||
assert _normalize_category("Engineering") == "Engineering"
|
||||
assert _normalize_category("철학") == "Philosophy"
|
||||
|
||||
|
||||
class TestSkipVideoQuirk:
|
||||
PATTERN = re.compile(r"/videos?/")
|
||||
|
||||
def test_video_urls_match(self):
|
||||
assert self.PATTERN.search("https://psyche.co/videos/some-film")
|
||||
assert self.PATTERN.search("https://aeon.co/video/another")
|
||||
|
||||
def test_article_urls_pass(self):
|
||||
assert not self.PATTERN.search("https://psyche.co/ideas/how-to-think")
|
||||
|
||||
|
||||
class TestRedirect304Distinction:
|
||||
"""httpx is_redirect 가 304(3xx 전체)에 True 라 redirect 로 오인 → 조건부 GET
|
||||
안정 피드가 'redirect 3회 초과'로 전멸하던 버그. has_redirect_location 으로 구분."""
|
||||
|
||||
def test_304_is_not_a_redirect_location(self):
|
||||
import httpx
|
||||
r = httpx.Response(304, request=httpx.Request("GET", "https://x/"))
|
||||
assert r.is_redirect is True # httpx 함정: 304 도 is_redirect
|
||||
assert r.has_redirect_location is False # 우리가 써야 하는 정확한 판별
|
||||
|
||||
def test_real_redirect_has_location(self):
|
||||
import httpx
|
||||
r = httpx.Response(301, headers={"location": "https://y/"},
|
||||
request=httpx.Request("GET", "https://x/"))
|
||||
assert r.has_redirect_location is True
|
||||
|
||||
def test_collector_uses_has_redirect_location(self):
|
||||
import inspect
|
||||
from workers import news_collector
|
||||
src = inspect.getsource(news_collector._fetch_rss)
|
||||
assert "has_redirect_location" in src
|
||||
assert "while resp.is_redirect" not in src # 옛 버그 패턴 부재
|
||||
|
||||
|
||||
class TestArticleHashStability:
|
||||
def test_static_corpus_hash_deterministic(self):
|
||||
a = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
|
||||
b = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
|
||||
assert a == b and len(a) == 32
|
||||
Reference in New Issue
Block a user