Files
hyungi_document_server/services/crawl-health/server.py
hyungi 3df0ca53ab feat(services): crawl-24x7 A-8 헬스 패널 + D-1 stt/marker idle-unload
A-8 1차: crawl-health 컨테이너(100.110.63.63:8765 Tailscale 바인딩 전용, 읽기 전용 SELECT, caddy 라우트 금지).
D-1 전제 작업: STT_PRELOAD=0+30분 유휴 해제(lock+inflight+reaper), marker MARKER_PRELOAD=0+idle-unload,
/ready idle=200(503=warmup_failed 한정 — fastapi depends_on 정합), healthcheck cuda 기준 전환.
2026-06-10 13:03:31 +09:00

203 lines
8.2 KiB
Python

"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
processing_queue SELECT 만) — 쓰기 0.
[1차] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
빈 피드 streak + fulltext 승격/격하 통계 + 큐 백로그. 비-RSS 소스(C-2 sitemap 등)도
같은 표면이 수용 (fetch_method 컬럼 표시 — '구독 소스 패널' 로 좁히지 않는 전 소스 일반화).
[2차 범위 외] B-3 상태 계약 도착 시 세션 열 + [재로그인 시도] 버튼(enqueue 방식).
노출: 별도 바인딩만 — compose 가 Tailscale 인터페이스(100.110.63.63)에만 publish.
vhost/경로 가드 방식 금지 (r4: 둘 다 '덜 깨짐' 속성 상실). 앱 레벨 인증 없음 =
Tailscale 도달성만이 경계 (fab-server 선례).
"""
import html
import logging
import os
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
# Module logger; the name is a fixed string rather than derived from __name__.
logger = logging.getLogger("crawl_health")
# Connection string handed to asyncpg, read once at import time.
# Falls back to an empty string when CRAWL_HEALTH_DSN is not set.
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
# Shared asyncpg pool; stays None until lifespan() creates it at startup.
_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Own the shared asyncpg pool for the lifetime of the application.

    The pool (1 to 3 connections, built from ``DSN``) is created before the
    application starts serving and published through the module-level
    ``_pool``. It is closed again when the application shuts down.

    Args:
        _app: The FastAPI application being started; unused.

    Yields:
        Nothing; control returns to the framework while the app is running.
    """
    global _pool
    pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
    _pool = pool
    try:
        yield
    finally:
        # Shutdown can arrive as an exception or cancellation thrown into the
        # generator at ``yield``; ``finally`` guarantees the pool is released
        # on that path too instead of leaking its connections.
        _pool = None
        await pool.close()
# ASGI application object; lifespan() is registered as its lifespan handler.
app = FastAPI(lifespan=lifespan)
async def _collect_data() -> dict:
    """Gather the read-only health snapshot used by the HTML panel and JSON API.

    Runs four SELECT statements on a single pooled connection and merges the
    per-source document counts into the per-source rows.

    Returns:
        A dict with three keys, each holding a list of plain dicts:

        * ``sources``: one entry per ``news_sources`` row, left-joined with
          ``source_health``, plus ``items_24h`` and ``items_7d`` counts.
        * ``queue``: ``processing_queue`` row counts grouped by stage and
          status, with the oldest ``created_at`` among pending rows.
        * ``fulltext``: counts of news documents grouped by the ``status``
          recorded under ``extract_meta -> 'fulltext'``.

    Raises:
        RuntimeError: If the connection pool has not been created yet.
    """
    if _pool is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("crawl-health DB pool is not initialised")

    async with _pool.acquire() as conn:
        sources = await conn.fetch(
            """
            SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
                   s.fulltext_policy, s.last_fetched_at,
                   h.circuit_state, h.consecutive_failures, h.last_success_at,
                   h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
                   h.total_fetches, h.total_failures
            FROM news_sources s
            LEFT JOIN source_health h ON h.source_id = s.id
            ORDER BY s.enabled DESC, s.name
            """
        )
        # The join already limits documents to the last 7 days, so the plain
        # count is the 7-day total and the FILTER narrows it to 24 hours.
        # The prefix is matched literally with starts_with(): with LIKE, a
        # '_' or '%' inside a source name would act as a wildcard and could
        # attribute another source's documents to this one.
        counts = await conn.fetch(
            """
            SELECT s.id,
                   count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
                   count(d.id) AS items_7d
            FROM news_sources s
            LEFT JOIN documents d
              ON d.source_channel = 'news'
             AND d.extracted_at > now() - interval '7 days'
             AND starts_with(d.file_path, 'news/' || s.name || '/')
            GROUP BY s.id
            """
        )
        queue = await conn.fetch(
            """
            SELECT stage::text AS stage, status::text AS status, count(*) AS n,
                   min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
            FROM processing_queue
            WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
              AND status IN ('pending', 'processing', 'failed')
            GROUP BY 1, 2
            ORDER BY 1, 2
            """
        )
        fulltext = await conn.fetch(
            """
            SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
            FROM documents
            WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
            GROUP BY 1
            """
        )

    counts_by_id = {row["id"]: row for row in counts}
    merged_sources = []
    for row in sources:
        # A source inserted between the two queries has no count row yet;
        # report zero for it rather than failing.
        tally = counts_by_id.get(row["id"])
        merged_sources.append({
            **dict(row),
            "items_24h": tally["items_24h"] if tally is not None else 0,
            "items_7d": tally["items_7d"] if tally is not None else 0,
        })

    return {
        "sources": merged_sources,
        "queue": [dict(row) for row in queue],
        "fulltext": [dict(row) for row in fulltext],
    }
@app.get("/health")
async def health():
    """Liveness probe for the Docker healthcheck.

    Does not touch the database; a response only shows the process is alive.
    """
    return dict(status="ok", service="crawl-health")
@app.get("/api/health.json")
async def api_health():
    """Return the health snapshot from ``_collect_data()`` as JSON.

    Every section of the snapshot is a list of row dicts; each cell is passed
    through a small converter so date/time values become ISO-8601 strings.
    """

    def _jsonable(cell):
        # Anything exposing isoformat() (the datetimes coming back from the
        # database) is rendered as an ISO string; other values pass through.
        # NOTE(review): other non-JSON-native column types (e.g. UUID or
        # Decimal) would pass through unchanged — confirm the column types.
        if hasattr(cell, "isoformat"):
            return cell.isoformat()
        return cell

    snapshot = await _collect_data()
    payload = {}
    for section, records in snapshot.items():
        payload[section] = [
            {column: _jsonable(cell) for column, cell in record.items()}
            for record in records
        ]
    return JSONResponse(payload)
def _chip(state: str | None, enabled: bool) -> str:
if not enabled:
return '<span class="chip off">OFF</span>'
if state == "disabled":
return '<span class="chip err">DISABLED</span>'
if state == "open":
return '<span class="chip warn">OPEN</span>'
return '<span class="chip ok">OK</span>'
def _fmt_ts(v) -> str:
return v.strftime("%m-%d %H:%M") if v else "-"
def _render_source_row(src: dict) -> str:
    """Render one source dict from ``_collect_data()`` as an HTML table row.

    Enabled sources with 3 or more consecutive failures, or an empty-feed
    streak of 8 or more, get the ``row-warn`` class.
    """
    failures = src.get("consecutive_failures") or 0
    empties = src.get("empty_streak") or 0
    # Only the first 80 characters of the last error are shown, HTML-escaped.
    error_text = html.escape((src.get("last_error") or "")[:80])
    flagged = src["enabled"] and (failures >= 3 or empties >= 8)
    row_attr = ' class="row-warn"' if flagged else ""
    cells = [
        f"<td>{html.escape(src['name'])}</td>",
        f"<td>{_chip(src.get('circuit_state'), src['enabled'])}</td>",
        f"<td>{html.escape(src.get('fetch_method') or 'rss')}</td>",
        f"<td>{html.escape(src.get('fulltext_policy') or 'none')}</td>",
        f"<td class='num'>{src['items_24h']}</td>",
        f"<td class='num'>{src['items_7d']}</td>",
        f"<td class='num'>{failures}</td>",
        f"<td class='num'>{empties}</td>",
        f"<td>{_fmt_ts(src.get('last_success_at'))}</td>",
        f"<td>{_fmt_ts(src.get('last_fetched_at'))}</td>",
        f"<td class='err-text'>{error_text}</td>",
    ]
    return f"<tr{row_attr}>" + "".join(cells) + "</tr>"


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the HTML health panel.

    Fetches a fresh snapshot via ``_collect_data()`` on every request and
    renders three tables: sources, processing-queue backlog, and fulltext
    status counts. Empty queue/fulltext tables get a placeholder row.
    """
    snapshot = await _collect_data()

    source_rows = [_render_source_row(src) for src in snapshot["sources"]]
    queue_rows = [
        f"<tr><td>{html.escape(item['stage'])}</td><td>{html.escape(item['status'])}</td>"
        f"<td class='num'>{item['n']}</td><td>{_fmt_ts(item.get('oldest_pending'))}</td></tr>"
        for item in snapshot["queue"]
    ]
    fulltext_rows = [
        f"<tr><td>{html.escape(item['status'] or '-')}</td><td class='num'>{item['n']}</td></tr>"
        for item in snapshot["fulltext"]
    ]

    # Pre-join the table bodies; empty queue/fulltext lists fall back to a
    # single placeholder row spanning the whole table.
    sources_html = "".join(source_rows)
    queue_html = "".join(queue_rows) or '<tr><td colspan="4">백로그 없음</td></tr>'
    fulltext_html = "".join(fulltext_rows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'

    page = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8">
<title>crawl-health — 전 소스 헬스 패널</title>
<style>
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
color: #3d3a33; margin: 0; padding: 28px; }}
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
tr.row-warn td {{ background: #fbf0e4; }}
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
</style></head><body>
<h1>crawl-health — 전 소스 헬스 패널</h1>
<div class="sub">A-8 1차 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
<h2>소스 ({len(source_rows)})</h2>
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
{sources_html}</table>
<h2>처리 큐 (fulltext / summarize / embed / chunk)</h2>
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
{queue_html}</table>
<h2>fulltext 승격 누적</h2>
<table><tr><th>status</th><th>건수</th></tr>
{fulltext_html}</table>
</body></html>"""
    return HTMLResponse(page)