#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cha_json.py - convert a single CLAN .cha transcript to JSON
(with %mor / %wor / %gra alignment).

Usage:
    # CLI
    python3 cha_json.py --input /path/to/input.cha --output /path/to/output.json

    # Programmatic use (e.g. from a pipeline)
    from cha_json import cha_to_json_file, cha_to_dict
    out_path, data = cha_to_json_file("/path/in.cha", "/path/out.json")
    data2 = cha_to_dict("/path/in.cha")
"""
from __future__ import annotations

import re
import json
import sys
import argparse
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional

# Historical stop markers for multi-line tier merging. Kept as a public
# constant for code that imports it; the parser itself now stops on any
# line that opens a new tier or header (see _TIER_STARTS).
TAG_PREFIXES = ("*PAR", "*INV", "%mor:", "%gra:", "%wor:", "@")

# A line whose first non-blank character is one of these starts a new main
# tier (*), dependent tier (%) or header (@), and therefore ends the
# continuation lines of the tier being collected.
_TIER_STARTS = ("*", "%", "@")

# Dependent tiers this module parses.
_PARSED_TIERS = ("%mor:", "%wor:", "%gra:")

WORD_RE = re.compile(r"[A-Za-z0-9]+")

# Participant role inside an @ID header: PAR / PAR0 / PAR1 / ...
ID_PAR_RE = re.compile(r"\|PAR\d*\|")

# Utterance lines this module records: *INV:, *PAR:, *PAR0:, *PAR1:, ...
UTTER_RE = re.compile(r"^\*(INV|PAR\d*):")

# Any main-tier line, whatever the speaker code.
_SPEAKER_RE = re.compile(r"^\*[^:\s]+:")

# "<word> <start>_<end>" pairs of a %wor tier (control characters removed).
_WOR_PAIR_RE = re.compile(r"(\S+)\s+(\d+)_(\d+)")

# ────────── Synonym sets (tolerate inflection when aligning) ──────────
SYN_SETS = [
    {"be", "am", "is", "are", "was", "were", "been", "being"},
    {"have", "has", "had"},
    {"do", "does", "did", "done", "doing"},
    {"go", "goes", "going", "went", "gone"},
    {"run", "runs", "running", "ran"},
    {"see", "sees", "seeing", "saw", "seen"},
    {"get", "gets", "getting", "got", "gotten"},
    {"drop", "drops", "dropping", "dropped"},
    {"swim", "swims", "swimming", "swam", "swum"},
]


def same_syn(a: str, b: str) -> bool:
    """Return True when *a* and *b* both belong to one set of SYN_SETS.

    Empty strings never match.
    """
    if not a or not b:
        return False
    return any(a in group and b in group for group in SYN_SETS)


def canonical(txt: str) -> str:
    """Return the comparison form of a token or word.

    Everything from the first ``~``, ``-``, ``&`` or ``|`` onwards is
    dropped, then the first run of ASCII letters/digits is returned in
    lower case. Returns "" when no such run exists (e.g. for "&-um").
    """
    head = re.split(r"[~\-\&|]", txt, maxsplit=1)[0]
    m = WORD_RE.search(head)
    return m.group(0).lower() if m else ""


def merge_multiline(block_lines: List[str]) -> str:
    """Merge %-tiers that are wrapped over several lines.

    A line whose first non-blank character is ``%`` and that contains ``:``
    starts a tier. Following non-blank lines are appended to it, separated
    by a single space, until the next such line. Lines seen before any tier
    start, and blank lines, are emitted unchanged. The CLAN control
    character ``\\x15`` is removed from every line.

    Returns:
        The resulting lines joined with "\\n".
    """
    merged: List[str] = []
    buf: Optional[str] = None
    for raw in block_lines:
        ln = raw.rstrip("\n").replace("\x15", "")  # strip CLAN control char
        if ln.lstrip().startswith("%") and ":" in ln:
            if buf is not None:
                merged.append(buf)
            buf = ln
        elif buf is not None and ln.strip():
            buf += " " + ln.strip()
        else:
            merged.append(ln)
    if buf is not None:
        merged.append(buf)
    return "\n".join(merged)


def _new_utterance() -> Dict[str, Any]:
    """Return an empty utterance record; dependent tiers fill it in later."""
    return {
        "tokens": [],
        "word_pos_ids": [],
        "word_grammar_ids": [],
        "word_durations": [],
        "utterance_text": "",
    }


def _read_tier(lines: List[str], start: int, tag: str) -> Tuple[str, int]:
    """Collect the tier at ``lines[start]`` together with its continuations.

    Returns:
        (content, next_index): the merged tier text without its leading
        *tag*, and the index of the first line after the tier.
    """
    end = start + 1
    while end < len(lines) and not lines[end].lstrip().startswith(_TIER_STARTS):
        end += 1
    merged = merge_multiline(lines[start:end])
    return merged.replace(tag, "", 1).strip(), end


def _parse_mor(content: str, pos_map: Dict[str, int]) -> Tuple[List[str], List[int]]:
    """Split a %mor tier into (tokens, pos_ids).

    Units without ``|`` (such as a bare ".") are ignored. *pos_map* is
    expected to assign an id to an unseen POS tag on first lookup.
    """
    tokens: List[str] = []
    pos_ids: List[int] = []
    for unit in content.split():
        if "|" not in unit:
            continue
        pos, rest = unit.split("|", 1)
        tokens.append(rest.split("|", 1)[0])
        pos_ids.append(pos_map[pos])
    return tokens, pos_ids


def _parse_gra(content: str, gra_map: Dict[str, int]) -> List[List[int]]:
    """Turn ``index|head|RELATION`` units into ``[index, head, relation_id]``.

    Units that do not have exactly three fields, or whose first two fields
    are not numeric, are skipped. *gra_map* is expected to assign an id to
    an unseen relation on first lookup.
    """
    triples: List[List[int]] = []
    for unit in content.split():
        parts = unit.split("|")
        if len(parts) != 3:
            continue
        idx, head, rel = parts
        if idx.isdigit() and head.isdigit():
            triples.append([int(idx), int(head), gra_map[rel]])
    return triples


def _tokens_match(c_tok: str, c_w: str) -> bool:
    """Decide whether two canonical forms refer to the same word.

    An empty form never matches: ``"x".startswith("")`` is always true, so
    without this guard a filler such as "&-um" (canonical form "") would
    claim the timing of whichever token is looked up next.
    """
    if not c_tok or not c_w:
        return False
    return (
        c_tok == c_w
        or c_w.startswith(c_tok)
        or c_tok.startswith(c_w)
        or same_syn(c_tok, c_w)
    )


def _align_durations(tokens: List[str], wor: List[Tuple[str, int, int]]) -> List[List[Any]]:
    """Pair every %mor token with the duration of its %wor word.

    The %wor words are scanned left to right; a matched word and everything
    before it are not considered again. Tokens without a match get 0.

    Returns:
        ``[token, end - start]`` pairs, one per token, in token order.
    """
    wor_forms = [canonical(word) for word, _, _ in wor]
    aligned: List[List[Any]] = []
    nxt = 0
    for tok in tokens:
        c_tok = canonical(tok)
        dur = 0
        for k in range(nxt, len(wor)):
            if _tokens_match(c_tok, wor_forms[k]):
                dur = wor[k][2] - wor[k][1]
                nxt = k + 1
                break
        aligned.append([tok, dur])
    return aligned


def _close_sentence(
    sent: Optional[Dict[str, Any]],
    data: List[Dict[str, Any]],
    aphasia_map: Dict[str, int],
) -> None:
    """Append *sent* to *data* if it holds at least one dialogue turn.

    A sentence that never received an aphasia type is labelled "UNKNOWN",
    and that label is registered in *aphasia_map*.
    """
    if not sent or not sent["dialogues"]:
        return
    if not sent.get("aphasia_type"):
        sent["aphasia_type"] = "UNKNOWN"
        aphasia_map.setdefault("UNKNOWN", len(aphasia_map))
    data.append(sent)


def cha_to_json(lines: List[str]) -> Dict[str, Any]:
    """Convert the lines of a .cha file into a JSON-serialisable dict.

    Only ``*INV`` and ``*PAR`` / ``*PAR<n>`` utterances are recorded. A
    %mor / %wor / %gra tier is attached to the utterance it follows; tiers
    that follow another speaker, or no utterance at all, are skipped.

    Args:
        lines: the file content, one entry per line (trailing "\\n" allowed).

    Returns:
        {
            "sentences": [...],
            "pos_mapping": {...},      # POS tag -> id, ids start at 1
            "grammar_mapping": {...},  # relation -> id, ids start at 1
            "aphasia_types": {...},    # aphasia type -> id, ids start at 0
            "text_all": "..."          # PAR utterance texts joined by "\\n"
        }
    """
    # The factories run before the missing key is stored, so len() is the
    # number of ids handed out so far.
    pos_map: Dict[str, int] = defaultdict(lambda: len(pos_map) + 1)
    gra_map: Dict[str, int] = defaultdict(lambda: len(gra_map) + 1)
    aphasia_map: Dict[str, int] = {}

    data: List[Dict[str, Any]] = []
    sent: Optional[Dict[str, Any]] = None
    # Utterance that the next dependent tier belongs to (None = skip tier).
    current: Optional[Dict[str, Any]] = None

    i = 0
    while i < len(lines):
        line = lines[i].rstrip("\n")

        # Start of a transcript section.
        if line.startswith("@Begin"):
            # A section left open (missing @End) is kept, exactly as it is
            # when the file ends without @End.
            _close_sentence(sent, data, aphasia_map)
            sent = {
                "sentence_id": f"S{len(data)+1}",
                "sentence_pid": None,
                "aphasia_type": None,  # becomes "UNKNOWN" if never set
                "dialogues": [],  # [ { "INV": [...], "PAR": [...] }, ... ]
            }
            current = None
            i += 1
            continue

        # End of a transcript section.
        if line.startswith("@End"):
            _close_sentence(sent, data, aphasia_map)
            sent = None
            current = None
            i += 1
            continue

        # Everything below needs an open section.
        if sent is None:
            i += 1
            continue

        # Section attributes.
        if line.startswith("@PID:"):
            parts = line.split("\t")
            if len(parts) > 1:
                sent["sentence_pid"] = parts[1].strip()
            i += 1
            continue

        if line.startswith("@ID:"):
            # Only the participant's @ID line sets the aphasia type.
            if ID_PAR_RE.search(line):
                # NOTE: the type is always "UNKNOWN" for now. If the @ID
                # line carries a label, this is the place to extract it.
                aph = "UNKNOWN"
                aphasia_map.setdefault(aph, len(aphasia_map))
                sent["aphasia_type"] = aph
            i += 1
            continue

        # Utterance line: *INV: or *PAR<n>:
        speaker = UTTER_RE.match(line)
        if speaker:
            role = "INV" if speaker.group(1) == "INV" else "PAR"
            dialogues = sent["dialogues"]
            # A new turn starts with the first utterance, and whenever INV
            # speaks after PAR has already spoken in the current turn.
            if not dialogues or (role == "INV" and dialogues[-1]["PAR"]):
                dialogues.append({"INV": [], "PAR": []})
            current = _new_utterance()
            dialogues[-1][role].append(current)
            i += 1
            continue

        # Utterance of a speaker we do not record: its dependent tiers must
        # not overwrite the previous INV/PAR utterance.
        if _SPEAKER_RE.match(line):
            current = None
            i += 1
            continue

        # %mor / %wor / %gra
        if line.startswith(_PARSED_TIERS):
            tag = line[:5]
            content, i = _read_tier(lines, i, tag)
            if current is None:
                continue
            if tag == "%mor:":
                tokens, pos_ids = _parse_mor(content, pos_map)
                current["tokens"] = tokens
                current["word_pos_ids"] = pos_ids
                # Plain text kept for downstream models.
                current["utterance_text"] = " ".join(tokens).strip()
            elif tag == "%wor:":
                wor = [
                    (word, int(start), int(end))
                    for word, start, end in _WOR_PAIR_RE.findall(content)
                ]
                # duration = end - start, aligned to the %mor tokens
                current["word_durations"] = _align_durations(
                    current.get("tokens", []), wor
                )
            else:
                current["word_grammar_ids"] = _parse_gra(content, gra_map)
            continue

        # Any other line.
        i += 1

    # The file ended without @End.
    _close_sentence(sent, data, aphasia_map)

    # text_all: every non-empty PAR utterance_text, one per line.
    par_texts: List[str] = []
    for s in data:
        for turn in s.get("dialogues", []):
            for par_ut in turn.get("PAR", []):
                if par_ut.get("utterance_text"):
                    par_texts.append(par_ut["utterance_text"])
    text_all = "\n".join(par_texts).strip()

    return {
        "sentences": data,
        "pos_mapping": dict(pos_map),
        "grammar_mapping": dict(gra_map),
        "aphasia_types": dict(aphasia_map),
        "text_all": text_all,
    }


# ────────── Wrappers: file → dict / file → file ──────────
def cha_to_dict(cha_path: str) -> Dict[str, Any]:
    """Read a .cha file and return the converted dict (nothing is written).

    Raises:
        FileNotFoundError: if *cha_path* does not exist.
    """
    p = Path(cha_path)
    if not p.exists():
        raise FileNotFoundError(f"找不到檔案: {cha_path}")
    # utf-8-sig reads plain UTF-8 and also drops a leading BOM if present.
    with p.open("r", encoding="utf-8-sig") as fh:
        lines = fh.readlines()
    return cha_to_json(lines)


def cha_to_json_file(cha_path: str, output_json: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """Convert a .cha file and write the result as JSON.

    Args:
        cha_path: input .cha file.
        output_json: output path; defaults to *cha_path* with a ".json"
            suffix. Missing parent directories are created.

    Returns:
        (output_json_path, data_dict)
    """
    data = cha_to_dict(cha_path)
    out_path = Path(output_json) if output_json else Path(cha_path).with_suffix(".json")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=4)
    return str(out_path), data


# ────────── CLI ──────────
def parse_args():
    """Parse the command-line arguments (--input/-i and --output/-o)."""
    p = argparse.ArgumentParser()
    p.add_argument("--input", "-i", type=str, required=True, help="輸入 .cha 檔")
    p.add_argument("--output", "-o", type=str, required=True, help="輸出 .json 檔")
    return p.parse_args()


def cha_to_json_path(cha_path: str, output_json: Optional[str] = None) -> str:
    """Backward-compatible alias for old code: return only the output path."""
    out, _ = cha_to_json_file(cha_path, output_json=output_json)
    return out


def main():
    """CLI entry point: convert --input to --output and print a summary."""
    args = parse_args()
    in_path = Path(args.input)

    if not in_path.exists():
        sys.exit(f"❌ 找不到檔案: {in_path}")

    out_path, dataset = cha_to_json_file(str(in_path), args.output)

    print(
        f"✅ 轉換完成 → {out_path}(句數 {len(dataset['sentences'])},"
        f"pos={len(dataset['pos_mapping'])},gra={len(dataset['grammar_mapping'])},"
        f"類型鍵={list(dataset['aphasia_types'].keys())})"
    )


if __name__ == "__main__":
    main()