"""Gradio app that restores NotebookLM slide PDFs into editable PPTX decks.

Each PDF page is rendered to an image and sent to Gemini twice: once to get a
text-free background image, and once to get a JSON description of the text
blocks. The results are assembled into a PPTX and packaged into a ZIP.
"""

import gradio as gr
import os
import tempfile
import zipfile
import shutil
import base64
import json
import re
import concurrent.futures
import time
import random
from pdf2image import convert_from_path
from PIL import Image
from dotenv import load_dotenv

# PPTX handling package
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.dml.color import RGBColor

# Use Google's new SDK
from google import genai
from google.genai import types

load_dotenv()

# Prompt sent together with the page image to obtain a text-free background.
_CLEAN_PROMPT = (
    " Strictly remove all text, titles, text-boxes, and bullet points from this slide image. "
    "CRITICAL INSTRUCTION: "
    "1. Preserve the original background pattern, colors, logos, and non-text graphics EXACTLY as they are. "
    "2. Do NOT add any new objects, decorations, or hallucinations. "
    "3. Output ONLY the image. "
)

# Prompt sent together with the page image to obtain the text layout as JSON.
_LAYOUT_PROMPT = (
    " Analyze this slide. Return a JSON list of all text blocks. "
    'Each item: {"text": string, "box_2d": [ymin, xmin, ymax, xmax] (0-1000), '
    '"font_size": int, "color": hex, "is_bold": bool} '
)


class NotebookLMTool:
    """Convert a slide PDF into cleaned background images plus an editable PPTX."""

    # Slide size in inches; layout coordinates (0-1000) are scaled onto it.
    _SLIDE_WIDTH_IN = 16
    _SLIDE_HEIGHT_IN = 9
    # Box used when a text block carries no usable "box_2d".
    _DEFAULT_BOX = (0, 0, 100, 100)

    def __init__(self):
        """Read the initial API key from the GEMINI_API_KEY environment variable."""
        self.api_key = os.getenv("GEMINI_API_KEY")
        # No shared client is kept here; one is created per page task instead
        # (see _create_client) so worker threads do not share a client.

    def set_key(self, user_key):
        """Store a user-supplied API key and return a status message for the UI.

        NOTE(review): the key is stored on this instance, and the module creates
        a single instance for the whole app, so every session uses the most
        recently set key.
        """
        if user_key and user_key.strip():
            self.api_key = user_key.strip()
            return "✅ API Key 已更新!"
        return "⚠️ Key 無效"

    def _extract_json(self, text):
        """Extract a JSON list from model output.

        Tries, in order: a fenced code block, the first-to-last ``[{...}]``
        span, and the raw text. The first candidate that parses to a list is
        returned.

        Args:
            text: Raw model output; anything that is not a string yields ``[]``.

        Returns:
            The parsed list, or ``[]`` when nothing parses to a list.
        """
        if not isinstance(text, str):
            return []

        candidates = []
        fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))
        bracketed = re.search(r"\[\s*\{.*\}\s*\]", text, re.DOTALL)
        if bracketed:
            candidates.append(bracketed.group(0))
        candidates.append(text)

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                continue
            # Callers iterate the result as a list of blocks, so reject
            # objects and scalars instead of handing them on.
            if isinstance(parsed, list):
                return parsed
        return []

    def _create_client(self):
        """Create a new Gemini client from the current API key.

        Raises:
            ValueError: If no API key is set.
        """
        if not self.api_key:
            raise ValueError("API Key 未設定")
        return genai.Client(api_key=self.api_key)

    def _call_gemini_with_retry(self, client, model_name, contents, config=None, retries=5):
        """Call ``generate_content`` and retry rate-limit style errors with backoff.

        Args:
            client: Client whose ``models.generate_content`` is called.
            model_name: Model identifier passed as ``model``.
            contents: Request contents.
            config: Optional generation config.
            retries: Maximum number of attempts.

        Returns:
            The response of the first successful call.

        Raises:
            Exception: When every attempt hit a retryable error; the last
                underlying error is attached as ``__cause__``. Non-retryable
                errors propagate unchanged on first occurrence.
        """
        delay = 5  # initial wait in seconds
        last_error = None
        for attempt in range(retries):
            try:
                return client.models.generate_content(
                    model=model_name,
                    contents=contents,
                    config=config,
                )
            except Exception as e:
                error_str = str(e)
                # Only these markers are treated as "busy / rate limited".
                retryable = (
                    "429" in error_str
                    or "RESOURCE_EXHAUSTED" in error_str
                    or "503" in error_str
                )
                if not retryable:
                    raise
                last_error = e
                # Sleeping after the final attempt would only delay the failure.
                if attempt < retries - 1:
                    wait_time = delay + random.uniform(0, 3)
                    print(f"⚠️ API 忙碌 (Attempt {attempt+1}/{retries}),休息 {wait_time:.1f} 秒...", flush=True)
                    time.sleep(wait_time)
                    delay *= 1.5
        raise Exception("API 重試多次失敗,請檢查配額。") from last_error

    @staticmethod
    def _add_usage(result, response):
        """Add the token counts of ``response`` to ``result``, treating missing counts as 0."""
        usage = getattr(response, "usage_metadata", None)
        if usage:
            result["tokens_in"] += getattr(usage, "prompt_token_count", None) or 0
            result["tokens_out"] += getattr(usage, "candidates_token_count", None) or 0

    @staticmethod
    def _extract_image_data(response):
        """Return the first inline image payload found in ``response``, or None."""
        parts = getattr(response, "parts", None)
        if not parts:
            # Fall back to the parts of the first candidate when the response
            # exposes no (or an empty) top-level ``parts``.
            candidates = getattr(response, "candidates", None) or []
            content = getattr(candidates[0], "content", None) if candidates else None
            parts = getattr(content, "parts", None)
        for part in parts or []:
            inline = getattr(part, "inline_data", None)
            if inline and inline.data:
                return inline.data
        return getattr(response, "bytes", None) or None

    # --- Single-page processing ---
    def process_single_page(self, page_index, img, img_output_dir):
        """Process one page: text removal (background) + text analysis (layout).

        Args:
            page_index: Zero-based page number.
            img: Page image; passed to the model and saved as the fallback
                background via ``img.save``.
            img_output_dir: Directory that receives ``slide_NN.png``.

        Returns:
            A dict with keys ``index``, ``bg_path``, ``blocks``, ``log``,
            ``preview``, ``tokens_in`` and ``tokens_out``, or ``None`` when the
            client could not be created.
        """
        print(f"🚀 [Page {page_index+1}] 啟動處理...", flush=True)

        # Create a dedicated client here to avoid conflicts between threads.
        try:
            local_client = self._create_client()
        except Exception as e:
            print(f"❌ [Page {page_index+1}] Client Init Error: {e}")
            return None

        result = {
            "index": page_index,
            "bg_path": None,
            "blocks": [],
            "log": "",
            "preview": None,
            "tokens_in": 0,
            "tokens_out": 0,
        }

        save_name = f"slide_{page_index+1:02d}.png"
        final_bg_path = os.path.join(img_output_dir, save_name)
        bg_success = False

        # ==========================================
        # 1. Background text removal (Image Cleaning)
        # ==========================================
        try:
            # Uses gemini-2.5-flash-image (supports image output).
            resp_img = self._call_gemini_with_retry(
                client=local_client,
                model_name="gemini-2.5-flash-image",
                contents=[_CLEAN_PROMPT, img],
                config=types.GenerateContentConfig(response_modalities=["IMAGE"]),
            )

            # Token statistics. Missing counts are treated as 0 so that a
            # response without them cannot abort the save below.
            self._add_usage(result, resp_img)

            # Save the returned image.
            image_data = self._extract_image_data(resp_img)
            if image_data:
                if isinstance(image_data, str):
                    image_data = base64.b64decode(image_data)
                with open(final_bg_path, "wb") as f:
                    f.write(image_data)
                bg_success = True
                result["bg_path"] = final_bg_path
                result["preview"] = (final_bg_path, f"Page {page_index+1} Cleaned")
            else:
                print(f"⚠️ [Page {page_index+1}] 去字失敗: 模型未回傳圖片", flush=True)
        except Exception as e:
            print(f"❌ [Page {page_index+1}] Clean Error: {e}", flush=True)

        # On failure, fall back to the original page image.
        if not bg_success:
            img.save(final_bg_path)
            result["bg_path"] = final_bg_path
            result["preview"] = (final_bg_path, f"Page {page_index+1} (Original)")
            result["log"] += f"[P{page_index+1}] Warning: Background cleaning failed. Used original image.\n"

        # ==========================================
        # 2. Text and layout analysis (Layout Analysis)
        # ==========================================
        try:
            # Uses the regular flash model for text analysis.
            resp_layout = self._call_gemini_with_retry(
                client=local_client,
                model_name="gemini-2.5-flash",
                contents=[_LAYOUT_PROMPT, img],
                config=types.GenerateContentConfig(response_mime_type="application/json"),
            )
            self._add_usage(result, resp_layout)
            result["blocks"] = self._extract_json(resp_layout.text)
        except Exception as e:
            print(f"❌ [Page {page_index+1}] Layout Error: {e}", flush=True)
            result["log"] += f"[P{page_index+1}] Layout Analysis Failed.\n"

        print(f"✅ [Page {page_index+1}] 完成!", flush=True)
        return result

    @classmethod
    def _normalize_box(cls, box):
        """Return ``(ymin, xmin, ymax, xmax)`` as floats within 0-1000.

        Malformed input (wrong length, non-numeric, NaN) yields the default
        box. Swapped coordinates are reordered so sizes are never negative.
        """
        try:
            ymin, xmin, ymax, xmax = (float(v) for v in box)
        except (TypeError, ValueError):
            return tuple(float(v) for v in cls._DEFAULT_BOX)
        coords = (ymin, xmin, ymax, xmax)
        if any(v != v for v in coords):  # NaN is the only value unequal to itself
            return tuple(float(v) for v in cls._DEFAULT_BOX)
        ymin, xmin, ymax, xmax = (min(max(v, 0.0), 1000.0) for v in coords)
        ymin, ymax = sorted((ymin, ymax))
        xmin, xmax = sorted((xmin, xmax))
        return ymin, xmin, ymax, xmax

    def _add_text_block(self, slide, block):
        """Add one analysed text block to ``slide`` as a textbox; skip unusable blocks."""
        if not isinstance(block, dict):
            return
        text_content = block.get("text", "")
        if not text_content:
            return

        ymin, xmin, ymax, xmax = self._normalize_box(block.get("box_2d", self._DEFAULT_BOX))
        left = Inches((xmin / 1000) * self._SLIDE_WIDTH_IN)
        top = Inches((ymin / 1000) * self._SLIDE_HEIGHT_IN)
        width = Inches(((xmax - xmin) / 1000) * self._SLIDE_WIDTH_IN)
        height = Inches(((ymax - ymin) / 1000) * self._SLIDE_HEIGHT_IN)

        textbox = slide.shapes.add_textbox(left, top, width, height)
        tf = textbox.text_frame
        tf.word_wrap = True
        p = tf.paragraphs[0]
        p.text = str(text_content)

        try:
            p.font.size = Pt(int(block.get("font_size", 18)))
        except (TypeError, ValueError):
            # Missing, non-numeric or rejected size: use the default.
            p.font.size = Pt(18)

        p.font.bold = bool(block.get("is_bold", False))

        try:
            hex_c = block.get("color", "#000000").replace("#", "")
            p.font.color.rgb = RGBColor.from_string(hex_c)
        except (AttributeError, TypeError, ValueError):
            # Unusable colour value: keep the default text colour.
            pass

    def process_pdf(self, pdf_file, progress=gr.Progress()):
        """Convert an uploaded PDF into a ZIP pack, a PPTX, previews and token stats.

        Args:
            pdf_file: Path of the uploaded PDF, or an object exposing the path
                as ``.name``. ``None`` yields empty outputs.
            progress: Gradio progress tracker.

        Returns:
            ``(zip_path, pptx_path, gallery_preview, token_stats)``.

        Raises:
            ValueError: If no API key is set or the PDF cannot be converted.
        """
        if not self.api_key:
            raise ValueError("請先輸入 Google API Key!")

        if pdf_file is None:
            return None, None, None, ""

        # Accept either a path or an upload object that exposes one as .name.
        if isinstance(pdf_file, (str, os.PathLike)):
            pdf_path = pdf_file
        else:
            pdf_path = getattr(pdf_file, "name", pdf_file)

        # Statistics
        total_input_tokens = 0
        total_output_tokens = 0
        full_text_log = ""
        gallery_preview = []

        # 1. Prepare the working directory. It is not removed here because the
        # returned paths point into it.
        temp_dir = tempfile.mkdtemp()
        img_output_dir = os.path.join(temp_dir, "cleaned_images")
        os.makedirs(img_output_dir, exist_ok=True)

        # Initialise the PPTX
        prs = Presentation()
        prs.slide_width = Inches(self._SLIDE_WIDTH_IN)
        prs.slide_height = Inches(self._SLIDE_HEIGHT_IN)

        # 2. PDF to images (lower DPI for speed)
        progress(0.1, desc="正在將 PDF 轉為圖片 (DPI=150)...")
        try:
            images = convert_from_path(pdf_path, dpi=150)
        except Exception as e:
            raise ValueError(f"PDF 轉換失敗: {str(e)}") from e

        # 3. Parallel processing (Parallel Execution)
        max_workers = 2
        results_map = {}

        progress(0.2, desc="🚀 AI 處理中 (已啟用速率保護)...")

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_page = {}
            for i, img in enumerate(images):
                time.sleep(1.5)  # stagger the requests
                future = executor.submit(self.process_single_page, i, img, img_output_dir)
                future_to_page[future] = i

            for future in concurrent.futures.as_completed(future_to_page):
                try:
                    res = future.result()
                    if res:
                        results_map[res["index"]] = res
                        total_input_tokens += res["tokens_in"]
                        total_output_tokens += res["tokens_out"]
                except Exception as exc:
                    print(f"Page processing generated an exception: {exc}")

        # 4. Assemble the PPTX in page order
        progress(0.8, desc="正在組裝 PPTX...")

        cleaned_images_paths = []

        for i in range(len(images)):
            res = results_map.get(i)
            if res is None:
                # Record the gap so a missing slide is visible in the log.
                full_text_log += f"[P{i+1}] Error: Page processing failed. Slide skipped.\n"
                continue

            full_text_log += res["log"]
            if res["preview"]:
                gallery_preview.append(res["preview"])
            if res["bg_path"]:
                cleaned_images_paths.append(res["bg_path"])

            slide = prs.slides.add_slide(prs.slide_layouts[6])

            # A. Place the background
            if res["bg_path"] and os.path.exists(res["bg_path"]):
                try:
                    slide.shapes.add_picture(
                        res["bg_path"], 0, 0,
                        width=prs.slide_width, height=prs.slide_height,
                    )
                except Exception as e:
                    # Best effort: keep the slide (text only) and report why.
                    print(f"⚠️ [Page {i+1}] Background insert failed: {e}", flush=True)

            # B. Place the text
            blocks = res["blocks"] if isinstance(res["blocks"], list) else []
            for block in blocks:
                self._add_text_block(slide, block)

        # 5. Package
        progress(0.9, desc="正在打包檔案...")
        pptx_path = os.path.join(temp_dir, "restored_presentation.pptx")
        prs.save(pptx_path)

        txt_path = os.path.join(temp_dir, "content_log.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(full_text_log)

        zip_path = os.path.join(temp_dir, "notebooklm_restore_pack.zip")
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.write(pptx_path, "restored_slides.pptx")
            zf.write(txt_path, "content_log.txt")
            for img_path in cleaned_images_paths:
                zf.write(img_path, os.path.join("cleaned_backgrounds", os.path.basename(img_path)))

        token_stats = (
            " ### 📊 Token 用量統計 "
            f"- **總輸入:** {total_input_tokens:,} "
            f"- **總輸出:** {total_output_tokens:,} "
            f"- **總計消耗:** {total_input_tokens + total_output_tokens:,} "
        )

        return zip_path, pptx_path, gallery_preview, token_stats


# Init
tool = NotebookLMTool()

# --- Gradio UI ---
with gr.Blocks(title="NotebookLM Slide Restorer,PPT.404", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🛠️ NotebookLM 投影片 PDF 還原神器 (PPT.404)")
    gr.Markdown("""
# 🪄 上傳 PDF,AI 自動:**去字背景** + **版面分析** + **合成可編輯 PPTX** 👉 歡迎 Star [GitHub](https://github.com/Deep-Learning-101/) ⭐ 覺得不錯 👈

🧠 補腦專區:Deep Learning 101

| 🔥 技術傳送門 (Tech Stack) | 📚 必讀心法 (Must Read) | | :--- | :--- | | 🤖 [**大語言模型 (LLM)**](https://deep-learning-101.github.io/Large-Language-Model) | 🏹 [**策略篇:企業入門策略**](https://deep-learning-101.github.io/Blog/AIBeginner) | | 📝 [**自然語言處理 (NLP)**](https://deep-learning-101.github.io/Natural-Language-Processing) | 📊 [**評測篇:臺灣 LLM 分析**](https://deep-learning-101.github.io/Blog/TW-LLM-Benchmark) | | 👁️ [**電腦視覺 (CV)**](https://deep-learning-101.github.io//Computer-Vision) | 🛠️ [**實戰篇:打造高精準 RAG**](https://deep-learning-101.github.io/RAG) | | 🎤 [**語音處理 (Speech)**](https://deep-learning-101.github.io/Speech-Processing) | 🕳️ [**避坑篇:AI Agent 開發陷阱**](https://deep-learning-101.github.io/agent) |
""")

    with gr.Row():
        with gr.Column():
            api_input = gr.Textbox(label="Google API Key", type="password", placeholder="貼上你的 Gemini API Key")
            btn_set_key = gr.Button("設定 Key")
            status_msg = gr.Markdown("")

            gr.Markdown("---")

            pdf_input = gr.File(label="上傳 PDF")
            btn_process = gr.Button("🚀 開始還原 PPTX (穩定版)", variant="primary")

        with gr.Column():
            out_zip = gr.File(label="📦 下載完整包")
            out_pptx = gr.File(label="📊 直接下載 PPTX")
            out_tokens = gr.Markdown("### 📊 等待處理...")

            gr.Markdown("### 🖼️ 背景去字效果預覽")
            out_gallery = gr.Gallery(columns=4)

    btn_set_key.click(tool.set_key, inputs=api_input, outputs=status_msg)

    btn_process.click(
        tool.process_pdf,
        inputs=[pdf_input],
        outputs=[out_zip, out_pptx, out_gallery, out_tokens],
    )

if __name__ == "__main__":
    demo.launch()