import gradio as gr
import copy
import re
from threading import Thread
from transformers import TextIteratorStreamer
from qwen_vl_utils import process_vision_info
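
# Convert raw model output into HTML for the Gradio Chatbot: fenced ``` blocks become
# <pre><code> sections, and characters inside them are escaped as HTML entities.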
def _parse_text(text):
    lines = text.split("\n")
    lines = [line for line in lines if line != ""]
    count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            count += 1
            items = line.split("`")
            if count % 2 == 1:
                lines[i] = f'<pre><code class="language-{items[-1]}">'
            else:
                lines[i] = "<br></code></pre>"
        else:
            if i > 0:
                if count % 2 == 1:
                    line = line.replace("`", r"\`")
                    line = line.replace("<", "&lt;")
                    line = line.replace(">", "&gt;")
                    line = line.replace(" ", "&nbsp;")
                    line = line.replace("*", "&ast;")
                    line = line.replace("_", "&lowbar;")
                    line = line.replace("-", "&#45;")
                    line = line.replace(".", "&#46;")
                    line = line.replace("!", "&#33;")
                    line = line.replace("(", "&#40;")
                    line = line.replace(")", "&#41;")
                    line = line.replace("$", "&#36;")
                lines[i] = "<br>" + line
    text = "".join(lines)
    return text
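
# Strip Qwen grounding markup (<ref>/<box> tags) so it is not shown in the chat window.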
def _remove_image_special(text):
    text = text.replace("<ref>", "").replace("</ref>", "")
    return re.sub(r"<box>.*?(</box>|$)", "", text)
def is_video_file(filename):
    video_extensions = [".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg"]
    return any(filename.lower().endswith(ext) for ext in video_extensions)
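
# Normalize the chat history into the typed message format expected by
# processor.apply_chat_template and process_vision_info.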
def transform_messages(original_messages):
    transformed_messages = []
    for message in original_messages:
        new_content = []
        for item in message["content"]:
            if "image" in item:
                new_item = {"type": "image", "image": item["image"]}
            elif "text" in item:
                new_item = {"type": "text", "text": item["text"]}
            elif "video" in item:
                new_item = {"type": "video", "video": item["video"]}
            else:
                continue
            new_content.append(new_item)
        new_message = {"role": message["role"], "content": new_content}
        transformed_messages.append(new_message)
    return transformed_messages
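
# Build the Gradio Blocks demo around an already loaded model and processor.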
def make_demo(model, processor):
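    # call_local_model: apply the chat template, preprocess image/video inputs, and stream
    # generated text from generate() running on a background thread via TextIteratorStreamer.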
    def call_local_model(model, processor, messages):
        messages = transform_messages(messages)
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt").to(model.device)
        tokenizer = processor.tokenizer
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
        gen_kwargs = {"max_new_tokens": 512, "streamer": streamer, **inputs}
        thread = Thread(target=model.generate, kwargs=gen_kwargs)
        thread.start()
        generated_text = ""
        for new_text in streamer:
            generated_text += new_text
            yield generated_text
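
    # create_predict_fn returns predict, which replays the accumulated task history as chat
    # messages and streams the model's reply into the last chatbot entry.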
    def create_predict_fn():
        def predict(_chatbot, task_history):
            chat_query = _chatbot[-1][0]
            query = task_history[-1][0]
            if len(chat_query) == 0:
                _chatbot.pop()
                task_history.pop()
                return _chatbot
            print("User: " + _parse_text(query))
            history_cp = copy.deepcopy(task_history)
            full_response = ""
            messages = []
            content = []
            for q, a in history_cp:
                if isinstance(q, (tuple, list)):
                    if is_video_file(q[0]):
                        content.append({"video": f"file://{q[0]}"})
                    else:
                        content.append({"image": f"file://{q[0]}"})
                else:
                    content.append({"text": q})
                    messages.append({"role": "user", "content": content})
                    messages.append({"role": "assistant", "content": [{"text": a}]})
                    content = []
            messages.pop()
            for response in call_local_model(model, processor, messages):
                _chatbot[-1] = (_parse_text(chat_query), _remove_image_special(_parse_text(response)))
                yield _chatbot
                full_response = _parse_text(response)
            task_history[-1] = (query, full_response)
            print("Qwen2-VL: " + _parse_text(full_response))
            yield _chatbot

        return predict
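
    # create_regenerate_fn returns regenerate, which drops the last assistant reply and
    # re-runs predict for the same user turn.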
    def create_regenerate_fn():
        def regenerate(_chatbot, task_history):
            if not task_history:
                return _chatbot
            item = task_history[-1]
            if item[1] is None:
                return _chatbot
            task_history[-1] = (item[0], None)
            chatbot_item = _chatbot.pop(-1)
            if chatbot_item[0] is None:
                _chatbot[-1] = (_chatbot[-1][0], None)
            else:
                _chatbot.append((chatbot_item[0], None))
            _chatbot_gen = predict(_chatbot, task_history)
            for _chatbot in _chatbot_gen:
                yield _chatbot

        return regenerate

    predict = create_predict_fn()
    regenerate = create_regenerate_fn()
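
    # add_text: append the user's text to both the display history and the task history.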
    def add_text(history, task_history, text):
        task_text = text
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [(_parse_text(text), None)]
        task_history = task_history + [(task_text, None)]
        # the input textbox is cleared by the separate reset_user_input handler
        return history, task_history
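
    # add_file: append an uploaded image or video (as a file-path tuple) to both histories.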
    def add_file(history, task_history, file):
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
        return history, task_history

    def reset_user_input():
        return gr.update(value="")

    def reset_state(task_history):
        task_history.clear()
        return []
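
    # Lay out the UI and wire the buttons to the handlers defined above.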
    with gr.Blocks() as demo:
        gr.Markdown("""<center><font size=8>Qwen2-VL OpenVINO demo</font></center>""")
        chatbot = gr.Chatbot(label="Qwen2-VL", elem_classes="control-height", height=500)
        query = gr.Textbox(lines=2, label="Input")
        task_history = gr.State([])

        with gr.Row():
            addfile_btn = gr.UploadButton("📁 Upload (上传文件)", file_types=["image", "video"])
            submit_btn = gr.Button("🚀 Submit (发送)")
            regen_btn = gr.Button("🤔️ Regenerate (重试)")
            empty_bin = gr.Button("🧹 Clear History (清除历史)")

        submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history]).then(
            predict, [chatbot, task_history], [chatbot], show_progress=True
        )
        submit_btn.click(reset_user_input, [], [query])
        empty_bin.click(reset_state, [task_history], [chatbot], show_progress=True)
        regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
        addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)
        gr.Markdown(
            """\
<font size=2>Note: This demo is governed by the original license of Qwen2-VL. \
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, \
including hate speech, violence, pornography, deception, etc. \
(注:本演示受Qwen2-VL的许可协议限制。我们强烈建议,用户不应传播及不应允许他人传播以下内容,\
包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息。)"""
        )

    return demo