"""scripts/audit_study_question_markdown.py — study_questions DB 텍스트 정합성 audit. 사용: docker compose exec fastapi python /app/scripts/audit_study_question_markdown.py \\ --round "2019년 1회" 기본 동작 (한 번에 끝): 1. HC dry-run: 자동 fix 가능한 포맷 찌꺼기 detect. - HC-1 outer fence wrap (전체 ``` ... ``` 감싸짐) - HC-2 raw \\n \\t \\r 이스케이프 - HC-3 HTML 엔티티 (< > & ") - HC-4 앞뒤 불필요 공백 / 빈 줄 / 빈 fence 2. HC apply: 자동 적용 (비정상 카운트 시 abort). 3. HC 재검사: 0건 확인. 4. LC 리포트: 사람 판단 필요 (백틱 홀수 / $$ 홀수 / ** 홀수 / 표 / 4-space 들여쓰기). 옵션: --round (필수) --topic-id (default 4) --no-apply : HC dry-run 만, apply 안 함. --abort-threshold (default 50) : HC dry-run 카운트가 이 값 이상이면 abort. """ from __future__ import annotations import argparse import os import re import sys from dataclasses import dataclass import asyncpg SITE_BASE = os.environ.get("STUDY_SITE_BASE", "https://document.hyungi.net") # ── HC 룰 ── TERM_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*?)\n```$") UNTERM_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*)$") # HC-2: JSON re-serialize 잔재 \n / \t / \r 를 실제 문자로 변환. # KaTeX 명령어 (\nabla, \rho, \text, \rangle 등) false positive 회피 — 백슬래시-r/n/t 다음이 # 영문자가 아닐 때만 매칭. lookahead (?![A-Za-z]). ESCAPE_PATTERNS = [ (re.compile(r"\\n(?![A-Za-z])"), "\n"), (re.compile(r"\\t(?![A-Za-z])"), "\t"), (re.compile(r"\\r(?![A-Za-z])"), ""), ] HTML_ENTITIES = [ ("<", "<"), (">", ">"), ("&", "&"), (""", '"'), ("'", "'"), ] def hc1_strip_outer_fence(text: str) -> str | None: """HC-1: 전체 텍스트가 단일 fenced block 으로 감싸진 경우 unwrap. 변경 시 새 텍스트, 아니면 None.""" if not text: return None trimmed = text.strip() m = TERM_FENCE_RE.match(trimmed) if m: inner = m.group(1) if "```" not in inner: return inner return None if trimmed.count("```") == 1: m2 = UNTERM_FENCE_RE.match(trimmed) if m2: return m2.group(1) return None def hc2_unescape(text: str) -> str | None: """HC-2: raw \\n \\t \\r 이스케이프 → 실제 문자.""" if not text: return None new = text for pat, repl in ESCAPE_PATTERNS: new = pat.sub(repl, new) return new if new != text else None def hc3_html_entities(text: str) -> str | None: """HC-3: HTML 엔티티 → 정상 문자.""" if not text: return None new = text for ent, ch in HTML_ENTITIES: new = new.replace(ent, ch) return new if new != text else None def hc4_strip_whitespace(text: str) -> str | None: """HC-4: 앞뒤 공백/빈 줄 정리. 본문 내부는 유지.""" if not text: return None stripped = text.strip() # 빈 fence ``` ``` 제거 (앞뒤 fence 가 빈 본문이면) stripped = re.sub(r"^```[A-Za-z0-9_-]*[ \t]*\n[\s]*\n```\s*", "", stripped) stripped = re.sub(r"\s*```[A-Za-z0-9_-]*[ \t]*\n[\s]*\n```$", "", stripped) return stripped if stripped != text else None def hc5_block_math_spacing(text: str) -> str | None: """HC-5: 자체 줄 block math (^$$...$$$ 단독 라인) 의 앞뒤로 빈 줄 보장. KaTeX block math 가 인라인 텍스트와 같은 단락에 묶여 렌더 실패하는 케이스 fix. 빈 줄 삽입만 — 본문 내용 보존. """ if not text or "$$" not in text: return None lines = text.split("\n") new_lines: list[str] = [] changed = False for i, line in enumerate(lines): s = line.strip() # 자체 줄 block math: ^$$...$$$ (한 줄 안 시작·종료, 내용 1자 이상) is_block = ( s.startswith("$$") and s.endswith("$$") and len(s) >= 4 and s != "$$" ) if is_block: # 앞에 비어있지 않은 라인 있으면 빈 줄 추가 if new_lines and new_lines[-1].strip(): new_lines.append("") changed = True new_lines.append(line) # 다음 라인이 비어있지 않으면 빈 줄 추가 if i < len(lines) - 1 and lines[i + 1].strip(): new_lines.append("") changed = True else: new_lines.append(line) if not changed: return None return "\n".join(new_lines) def apply_all_hc(text: str) -> tuple[str, list[str]]: """HC 룰 순서대로 적용. (최종 텍스트, 적용된 룰 라벨 리스트).""" new = text applied: list[str] = [] # HC-1 outer fence r = hc1_strip_outer_fence(new) if r is not None: new = r applied.append("HC-1") # HC-2 escape r = hc2_unescape(new) if r is not None: new = r applied.append("HC-2") # HC-3 html entities r = hc3_html_entities(new) if r is not None: new = r applied.append("HC-3") # HC-4 whitespace r = hc4_strip_whitespace(new) if r is not None: new = r applied.append("HC-4") # HC-5 block math spacing (마지막 — HC-1~4 가 변경한 결과에도 적용) r = hc5_block_math_spacing(new) if r is not None: new = r applied.append("HC-5") return new, applied # ── LC 룰 ── def lc_check(text: str) -> list[tuple[str, str, str]]: """LC 의심 리스트. (룰 라벨, 짧은 설명, snippet).""" if not text: return [] issues: list[tuple[str, str, str]] = [] # LC-1 백틱 그룹 홀수 (HC-1 적용 후에도 남음) bt = text.count("```") if bt % 2 == 1: issues.append(("LC-1", f"백틱 그룹 홀수 ({bt}개)", _snippet_around(text, "```"))) # LC-2 $$ 홀수 dd = text.count("$$") if dd % 2 == 1: issues.append(("LC-2", f"$$ 짝 안 맞음 ({dd}개)", _snippet_around(text, "$$"))) # LC-3 inline $ 홀수 — $$ 제거 후 단일 $ 카운트 text_no_block = text.replace("$$", "") sd = text_no_block.count("$") if sd % 2 == 1: issues.append(("LC-3", f"inline $ 짝 의심 ({sd}개)", _snippet_around(text, "$"))) # LC-4 ** 홀수 bb = text.count("**") if bb % 2 == 1: issues.append(("LC-4", f"** 짝 안 맞음 ({bb}개)", _snippet_around(text, "**"))) # LC-5 표 구분자 누락 — pipe 3개 이상 (헤더|...|...|컬럼) 만 검사. 절대값 |x| 는 무시. # 한 컬럼 표 |---| 도 정상으로 인정 (`*` 사용). lines = text.splitlines() for i, line in enumerate(lines): if line.count("|") >= 3: # 다컬럼 표 헤더만 (|x|y| 최소) # 다음 비빈 줄이 ---|--- 형태인지 j = i + 1 while j < len(lines) and not lines[j].strip(): j += 1 if j < len(lines): nxt = lines[j].strip() # 헤더 구분자 패턴: |---| 또는 |---|---| (한 컬럼도 OK) if not re.match(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?$", nxt): issues.append(( "LC-5", "표 구분자 누락 의심", line.strip()[:80], )) break else: issues.append(( "LC-5", "표 구분자 누락 의심 (마지막 라인)", line.strip()[:80], )) break break # LC-6 4-space 들여쓰기 시작 (의도 외 코드블록) for i, line in enumerate(lines): if line.startswith(" ") and line.strip(): # 이전 줄이 비어있고 그 이전이 list/header 아니면 코드블록으로 인식 가능성 if i > 0 and not lines[i - 1].strip(): issues.append(( "LC-6", "4-space 들여쓰기 코드블록 의심", line[:80], )) break return issues def _snippet_around(text: str, pattern: str, ctx: int = 30) -> str: """패턴 첫 등장 주변 snippet (newline → \\n 으로 표시).""" idx = text.find(pattern) if idx < 0: return text[:60].replace("\n", "\\n") start = max(0, idx - ctx) end = min(len(text), idx + len(pattern) + ctx) s = text[start:end].replace("\n", "\\n") prefix = "..." if start > 0 else "" suffix = "..." if end < len(text) else "" return f"{prefix}{s}{suffix}" # ── DB ── @dataclass class FieldChange: qid: int qnum: int | None field: str applied_rules: list[str] old_len: int new_len: int @dataclass class LCFinding: qid: int qnum: int | None field: str rule: str desc: str snippet: str FIELDS = ["question_text", "choice_1", "choice_2", "choice_3", "choice_4", "explanation", "ai_explanation"] async def run(topic_id: int, exam_round: str, apply: bool, abort_threshold: int) -> int: conn = await asyncpg.connect( host="postgres", port=5432, user="pkm", password=os.environ["POSTGRES_PASSWORD"], database="pkm", ) try: rows = await conn.fetch( """ SELECT id, exam_question_number, question_text, choice_1, choice_2, choice_3, choice_4, explanation, ai_explanation FROM study_questions WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2 ORDER BY exam_question_number NULLS LAST, id """, topic_id, exam_round, ) print(f"[{exam_round}] 검사 대상: {len(rows)}문항\n") # ── HC dry-run ── hc_changes: list[FieldChange] = [] rule_counts: dict[str, int] = {} for r in rows: for fld in FIELDS: old = r[fld] if not old: continue new, applied = apply_all_hc(old) if applied: hc_changes.append(FieldChange( qid=r["id"], qnum=r["exam_question_number"], field=fld, applied_rules=applied, old_len=len(old), new_len=len(new), )) for rl in applied: rule_counts[rl] = rule_counts.get(rl, 0) + 1 print("─── HC dry-run ───") for rl in ["HC-1", "HC-2", "HC-3", "HC-4", "HC-5"]: print(f" {rl}: {rule_counts.get(rl, 0)}건") print(f" 총 변경 대상 field: {len(hc_changes)}건\n") if hc_changes: for c in hc_changes[:5]: print(f" 샘플 — {c.qnum}번 / {c.field}: rules={c.applied_rules} {c.old_len} → {c.new_len}") if len(hc_changes) > 5: print(f" ... +{len(hc_changes) - 5}건 더") print() # 비정상 카운트 abort if len(hc_changes) >= abort_threshold: print(f"⚠ HC 변경 대상이 {len(hc_changes)}건 (임계값 {abort_threshold}). abort. --abort-threshold 로 조정 가능.", file=sys.stderr) return 2 # ── HC apply ── if apply and hc_changes: print("─── HC apply ───") applied_count = 0 async with conn.transaction(): for c in hc_changes: # 다시 fetch 해서 새 값 계산 (트랜잭션 안 일관성) row = await conn.fetchrow( f"SELECT {c.field} AS val FROM study_questions WHERE id=$1", c.qid, ) if row is None or row["val"] is None: continue new, _ = apply_all_hc(row["val"]) if new != row["val"]: await conn.execute( f"UPDATE study_questions SET {c.field}=$1 WHERE id=$2", new, c.qid, ) applied_count += 1 print(f" 적용 완료: {applied_count}건\n") # 재검사 print("─── HC 재검사 ───") recheck_rows = await conn.fetch( """ SELECT id, question_text, choice_1, choice_2, choice_3, choice_4, explanation, ai_explanation FROM study_questions WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2 """, topic_id, exam_round, ) recheck_hits = 0 for r in recheck_rows: for fld in FIELDS: old = r[fld] if not old: continue _, applied = apply_all_hc(old) if applied: recheck_hits += 1 if recheck_hits == 0: print(f" ✓ 재검사 0건 (apply 효과 검증)\n") else: print(f" ⚠ 재검사 {recheck_hits}건 남음 — 추가 조사 필요\n") # ── LC 리포트 ── lc_findings: list[LCFinding] = [] # apply 후의 최신 텍스트 기준으로 LC 검사 rows_now = await conn.fetch( """ SELECT id, exam_question_number, question_text, choice_1, choice_2, choice_3, choice_4, explanation, ai_explanation FROM study_questions WHERE study_topic_id=$1 AND deleted_at IS NULL AND exam_round=$2 ORDER BY exam_question_number NULLS LAST, id """, topic_id, exam_round, ) for r in rows_now: for fld in FIELDS: txt = r[fld] if not txt: continue for rule, desc, snip in lc_check(txt): lc_findings.append(LCFinding( qid=r["id"], qnum=r["exam_question_number"], field=fld, rule=rule, desc=desc, snippet=snip, )) print("─── LC 리포트 (사람 판단 필요) ───") if not lc_findings: print(" ✓ 0건\n") else: lc_counts: dict[str, int] = {} for f in lc_findings: lc_counts[f.rule] = lc_counts.get(f.rule, 0) + 1 for rl in ["LC-1", "LC-2", "LC-3", "LC-4", "LC-5", "LC-6"]: if rl in lc_counts: print(f" {rl}: {lc_counts[rl]}건") print(f" 총: {len(lc_findings)}건\n") print("상세:") for f in lc_findings: print(f" [{f.rule}] {f.qnum}번 / {f.field} — {f.desc}") print(f" Snippet: {f.snippet!r}") print(f" Edit: {SITE_BASE}/study/topics/{topic_id}/questions/{f.qid}/edit") print() return 0 finally: await conn.close() def main() -> None: p = argparse.ArgumentParser() p.add_argument("--topic-id", type=int, default=4) p.add_argument("--round", required=True, help="예: 2019년 1회") p.add_argument("--no-apply", action="store_true", help="HC dry-run 만, apply 안 함") p.add_argument("--abort-threshold", type=int, default=50, help="HC 변경 대상이 이 값 이상이면 abort") args = p.parse_args() import asyncio code = asyncio.run(run( topic_id=args.topic_id, exam_round=args.round, apply=not args.no_apply, abort_threshold=args.abort_threshold, )) sys.exit(code) if __name__ == "__main__": main()