Files
hyungi_document_server/scripts/audit_study_question_markdown.py
T
hyungi 63be005c6f fix(security): 보안 위생 5건 — library admin 게이트·edit_url SSRF·보안헤더·8080 바인드·하드코딩 비번 제거
M3 library.py: categories POST/PATCH/DELETE + facets POST 를 get_current_user→require_admin
(공유 분류 CRUD 를 17주체→admin 한정, news/digest 패턴 정합).
M1 documents.py: update_document PATCH 에 edit_url validate_feed_url 가드 — 내부/메타데이터 주소
후속 fetch(fulltext_worker) latent SSRF 차단(API 레이어 무방비 해소, news.py 동형).
Caddyfile: 보안 헤더(nosniff·X-Frame SAMEORIGIN·Referrer-Policy·-Server). HSTS 는 edge 소관.
compose: caddy 8080:80 0.0.0.0→127.0.0.1 (LAN 우회 차단, 실 ingress=home-caddy→caddy:80 도커망).
scripts: 하드코딩 죽은 DB 비번 → os.environ (1차 감사 누락분, .env 한정 점검이 놓침).

별도(DB): test-% 계정 12개 비활성화 (공유풀 주체 17→5, 랜덤해시라 비번노출 아님·위생).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 05:48:02 +00:00

470 lines
16 KiB
Python

"""scripts/audit_study_question_markdown.py — study_questions DB 텍스트 정합성 audit.
사용:
docker compose exec fastapi python /app/scripts/audit_study_question_markdown.py \\
--round "2019년 1회"
기본 동작 (한 번에 끝):
1. HC dry-run: 자동 fix 가능한 포맷 찌꺼기 detect.
- HC-1 outer fence wrap (전체 ``` ... ``` 감싸짐)
- HC-2 raw \\n \\t \\r 이스케이프
- HC-3 HTML 엔티티 (&lt; &gt; &amp; &quot;)
- HC-4 앞뒤 불필요 공백 / 빈 줄 / 빈 fence
2. HC apply: 자동 적용 (비정상 카운트 시 abort).
3. HC 재검사: 0건 확인.
4. LC 리포트: 사람 판단 필요 (백틱 홀수 / $$ 홀수 / ** 홀수 / 표 / 4-space 들여쓰기).
옵션:
--round (필수)
--topic-id (default 4)
--no-apply : HC dry-run 만, apply 안 함.
--abort-threshold (default 50) : HC dry-run 카운트가 이 값 이상이면 abort.
"""
from __future__ import annotations
import argparse
import os
import re
import sys
from dataclasses import dataclass
import asyncpg
SITE_BASE = os.environ.get("STUDY_SITE_BASE", "https://document.hyungi.net")
# ── HC 룰 ──
TERM_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*?)\n```$")
UNTERM_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*)$")
# HC-2: JSON re-serialize 잔재 \n / \t / \r 를 실제 문자로 변환.
# KaTeX 명령어 (\nabla, \rho, \text, \rangle 등) false positive 회피 — 백슬래시-r/n/t 다음이
# 영문자가 아닐 때만 매칭. lookahead (?![A-Za-z]).
ESCAPE_PATTERNS = [
(re.compile(r"\\n(?![A-Za-z])"), "\n"),
(re.compile(r"\\t(?![A-Za-z])"), "\t"),
(re.compile(r"\\r(?![A-Za-z])"), ""),
]
HTML_ENTITIES = [
("&lt;", "<"),
("&gt;", ">"),
("&amp;", "&"),
("&quot;", '"'),
("&#39;", "'"),
]
def hc1_strip_outer_fence(text: str) -> str | None:
"""HC-1: 전체 텍스트가 단일 fenced block 으로 감싸진 경우 unwrap. 변경 시 새 텍스트, 아니면 None."""
if not text:
return None
trimmed = text.strip()
m = TERM_FENCE_RE.match(trimmed)
if m:
inner = m.group(1)
if "```" not in inner:
return inner
return None
if trimmed.count("```") == 1:
m2 = UNTERM_FENCE_RE.match(trimmed)
if m2:
return m2.group(1)
return None
def hc2_unescape(text: str) -> str | None:
"""HC-2: raw \\n \\t \\r 이스케이프 → 실제 문자."""
if not text:
return None
new = text
for pat, repl in ESCAPE_PATTERNS:
new = pat.sub(repl, new)
return new if new != text else None
def hc3_html_entities(text: str) -> str | None:
"""HC-3: HTML 엔티티 → 정상 문자."""
if not text:
return None
new = text
for ent, ch in HTML_ENTITIES:
new = new.replace(ent, ch)
return new if new != text else None
def hc4_strip_whitespace(text: str) -> str | None:
"""HC-4: 앞뒤 공백/빈 줄 정리. 본문 내부는 유지."""
if not text:
return None
stripped = text.strip()
# 빈 fence ``` ``` 제거 (앞뒤 fence 가 빈 본문이면)
stripped = re.sub(r"^```[A-Za-z0-9_-]*[ \t]*\n[\s]*\n```\s*", "", stripped)
stripped = re.sub(r"\s*```[A-Za-z0-9_-]*[ \t]*\n[\s]*\n```$", "", stripped)
return stripped if stripped != text else None
def hc5_block_math_spacing(text: str) -> str | None:
"""HC-5: 자체 줄 block math (^$$...$$$ 단독 라인) 의 앞뒤로 빈 줄 보장.
KaTeX block math 가 인라인 텍스트와 같은 단락에 묶여 렌더 실패하는 케이스 fix.
빈 줄 삽입만 — 본문 내용 보존.
"""
if not text or "$$" not in text:
return None
lines = text.split("\n")
new_lines: list[str] = []
changed = False
for i, line in enumerate(lines):
s = line.strip()
# 자체 줄 block math: ^$$...$$$ (한 줄 안 시작·종료, 내용 1자 이상)
is_block = (
s.startswith("$$") and s.endswith("$$") and len(s) >= 4 and s != "$$"
)
if is_block:
# 앞에 비어있지 않은 라인 있으면 빈 줄 추가
if new_lines and new_lines[-1].strip():
new_lines.append("")
changed = True
new_lines.append(line)
# 다음 라인이 비어있지 않으면 빈 줄 추가
if i < len(lines) - 1 and lines[i + 1].strip():
new_lines.append("")
changed = True
else:
new_lines.append(line)
if not changed:
return None
return "\n".join(new_lines)
def apply_all_hc(text: str) -> tuple[str, list[str]]:
"""HC 룰 순서대로 적용. (최종 텍스트, 적용된 룰 라벨 리스트)."""
new = text
applied: list[str] = []
# HC-1 outer fence
r = hc1_strip_outer_fence(new)
if r is not None:
new = r
applied.append("HC-1")
# HC-2 escape
r = hc2_unescape(new)
if r is not None:
new = r
applied.append("HC-2")
# HC-3 html entities
r = hc3_html_entities(new)
if r is not None:
new = r
applied.append("HC-3")
# HC-4 whitespace
r = hc4_strip_whitespace(new)
if r is not None:
new = r
applied.append("HC-4")
# HC-5 block math spacing (마지막 — HC-1~4 가 변경한 결과에도 적용)
r = hc5_block_math_spacing(new)
if r is not None:
new = r
applied.append("HC-5")
return new, applied
# ── LC 룰 ──
def lc_check(text: str) -> list[tuple[str, str, str]]:
"""LC 의심 리스트. (룰 라벨, 짧은 설명, snippet)."""
if not text:
return []
issues: list[tuple[str, str, str]] = []
# LC-1 백틱 그룹 홀수 (HC-1 적용 후에도 남음)
bt = text.count("```")
if bt % 2 == 1:
issues.append(("LC-1", f"백틱 그룹 홀수 ({bt}개)", _snippet_around(text, "```")))
# LC-2 $$ 홀수
dd = text.count("$$")
if dd % 2 == 1:
issues.append(("LC-2", f"$$ 짝 안 맞음 ({dd}개)", _snippet_around(text, "$$")))
# LC-3 inline $ 홀수 — $$ 제거 후 단일 $ 카운트
text_no_block = text.replace("$$", "")
sd = text_no_block.count("$")
if sd % 2 == 1:
issues.append(("LC-3", f"inline $ 짝 의심 ({sd}개)", _snippet_around(text, "$")))
# LC-4 ** 홀수
bb = text.count("**")
if bb % 2 == 1:
issues.append(("LC-4", f"** 짝 안 맞음 ({bb}개)", _snippet_around(text, "**")))
# LC-5 표 구분자 누락 — pipe 3개 이상 (헤더|...|...|컬럼) 만 검사. 절대값 |x| 는 무시.
# 한 컬럼 표 |---| 도 정상으로 인정 (`*` 사용).
lines = text.splitlines()
for i, line in enumerate(lines):
if line.count("|") >= 3: # 다컬럼 표 헤더만 (|x|y| 최소)
# 다음 비빈 줄이 ---|--- 형태인지
j = i + 1
while j < len(lines) and not lines[j].strip():
j += 1
if j < len(lines):
nxt = lines[j].strip()
# 헤더 구분자 패턴: |---| 또는 |---|---| (한 컬럼도 OK)
if not re.match(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?$", nxt):
issues.append((
"LC-5",
"표 구분자 누락 의심",
line.strip()[:80],
))
break
else:
issues.append((
"LC-5",
"표 구분자 누락 의심 (마지막 라인)",
line.strip()[:80],
))
break
break
# LC-6 4-space 들여쓰기 시작 (의도 외 코드블록)
for i, line in enumerate(lines):
if line.startswith(" ") and line.strip():
# 이전 줄이 비어있고 그 이전이 list/header 아니면 코드블록으로 인식 가능성
if i > 0 and not lines[i - 1].strip():
issues.append((
"LC-6",
"4-space 들여쓰기 코드블록 의심",
line[:80],
))
break
return issues
def _snippet_around(text: str, pattern: str, ctx: int = 30) -> str:
"""패턴 첫 등장 주변 snippet (newline → \\n 으로 표시)."""
idx = text.find(pattern)
if idx < 0:
return text[:60].replace("\n", "\\n")
start = max(0, idx - ctx)
end = min(len(text), idx + len(pattern) + ctx)
s = text[start:end].replace("\n", "\\n")
prefix = "..." if start > 0 else ""
suffix = "..." if end < len(text) else ""
return f"{prefix}{s}{suffix}"
# ── DB ──
@dataclass
class FieldChange:
qid: int
qnum: int | None
field: str
applied_rules: list[str]
old_len: int
new_len: int
@dataclass
class LCFinding:
qid: int
qnum: int | None
field: str
rule: str
desc: str
snippet: str
FIELDS = ["question_text", "choice_1", "choice_2", "choice_3", "choice_4", "explanation", "ai_explanation"]
async def run(topic_id: int, exam_round: str, apply: bool, abort_threshold: int) -> int:
conn = await asyncpg.connect(
host="postgres",
port=5432,
user="pkm",
password=os.environ["POSTGRES_PASSWORD"],
database="pkm",
)
try:
rows = await conn.fetch(
"""
SELECT id, exam_question_number, question_text,
choice_1, choice_2, choice_3, choice_4,
explanation, ai_explanation
FROM study_questions
WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2
ORDER BY exam_question_number NULLS LAST, id
""",
topic_id,
exam_round,
)
print(f"[{exam_round}] 검사 대상: {len(rows)}문항\n")
# ── HC dry-run ──
hc_changes: list[FieldChange] = []
rule_counts: dict[str, int] = {}
for r in rows:
for fld in FIELDS:
old = r[fld]
if not old:
continue
new, applied = apply_all_hc(old)
if applied:
hc_changes.append(FieldChange(
qid=r["id"],
qnum=r["exam_question_number"],
field=fld,
applied_rules=applied,
old_len=len(old),
new_len=len(new),
))
for rl in applied:
rule_counts[rl] = rule_counts.get(rl, 0) + 1
print("─── HC dry-run ───")
for rl in ["HC-1", "HC-2", "HC-3", "HC-4", "HC-5"]:
print(f" {rl}: {rule_counts.get(rl, 0)}")
print(f" 총 변경 대상 field: {len(hc_changes)}\n")
if hc_changes:
for c in hc_changes[:5]:
print(f" 샘플 — {c.qnum}번 / {c.field}: rules={c.applied_rules} {c.old_len}{c.new_len}")
if len(hc_changes) > 5:
print(f" ... +{len(hc_changes) - 5}건 더")
print()
# 비정상 카운트 abort
if len(hc_changes) >= abort_threshold:
print(f"⚠ HC 변경 대상이 {len(hc_changes)}건 (임계값 {abort_threshold}). abort. --abort-threshold 로 조정 가능.", file=sys.stderr)
return 2
# ── HC apply ──
if apply and hc_changes:
print("─── HC apply ───")
applied_count = 0
async with conn.transaction():
for c in hc_changes:
# 다시 fetch 해서 새 값 계산 (트랜잭션 안 일관성)
row = await conn.fetchrow(
f"SELECT {c.field} AS val FROM study_questions WHERE id=$1",
c.qid,
)
if row is None or row["val"] is None:
continue
new, _ = apply_all_hc(row["val"])
if new != row["val"]:
await conn.execute(
f"UPDATE study_questions SET {c.field}=$1 WHERE id=$2",
new,
c.qid,
)
applied_count += 1
print(f" 적용 완료: {applied_count}\n")
# 재검사
print("─── HC 재검사 ───")
recheck_rows = await conn.fetch(
"""
SELECT id, question_text, choice_1, choice_2, choice_3, choice_4,
explanation, ai_explanation
FROM study_questions
WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2
""",
topic_id, exam_round,
)
recheck_hits = 0
for r in recheck_rows:
for fld in FIELDS:
old = r[fld]
if not old:
continue
_, applied = apply_all_hc(old)
if applied:
recheck_hits += 1
if recheck_hits == 0:
print(f" ✓ 재검사 0건 (apply 효과 검증)\n")
else:
print(f" ⚠ 재검사 {recheck_hits}건 남음 — 추가 조사 필요\n")
# ── LC 리포트 ──
lc_findings: list[LCFinding] = []
# apply 후의 최신 텍스트 기준으로 LC 검사
rows_now = await conn.fetch(
"""
SELECT id, exam_question_number, question_text,
choice_1, choice_2, choice_3, choice_4,
explanation, ai_explanation
FROM study_questions
WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2
ORDER BY exam_question_number NULLS LAST, id
""",
topic_id, exam_round,
)
for r in rows_now:
for fld in FIELDS:
txt = r[fld]
if not txt:
continue
for rule, desc, snip in lc_check(txt):
lc_findings.append(LCFinding(
qid=r["id"],
qnum=r["exam_question_number"],
field=fld,
rule=rule,
desc=desc,
snippet=snip,
))
print("─── LC 리포트 (사람 판단 필요) ───")
if not lc_findings:
print(" ✓ 0건\n")
else:
lc_counts: dict[str, int] = {}
for f in lc_findings:
lc_counts[f.rule] = lc_counts.get(f.rule, 0) + 1
for rl in ["LC-1", "LC-2", "LC-3", "LC-4", "LC-5", "LC-6"]:
if rl in lc_counts:
print(f" {rl}: {lc_counts[rl]}")
print(f" 총: {len(lc_findings)}\n")
print("상세:")
for f in lc_findings:
print(f" [{f.rule}] {f.qnum}번 / {f.field}{f.desc}")
print(f" Snippet: {f.snippet!r}")
print(f" Edit: {SITE_BASE}/study/topics/{topic_id}/questions/{f.qid}/edit")
print()
return 0
finally:
await conn.close()
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--topic-id", type=int, default=4)
p.add_argument("--round", required=True, help="예: 2019년 1회")
p.add_argument("--no-apply", action="store_true", help="HC dry-run 만, apply 안 함")
p.add_argument("--abort-threshold", type=int, default=50, help="HC 변경 대상이 이 값 이상이면 abort")
args = p.parse_args()
import asyncio
code = asyncio.run(run(
topic_id=args.topic_id,
exam_round=args.round,
apply=not args.no_apply,
abort_threshold=args.abort_threshold,
))
sys.exit(code)
if __name__ == "__main__":
main()