机器学习Web界面框架,快速构建AI模型演示和聊天机器人
import gradio as gr
import time
import random
from typing import List, Dict, Any, Optional
def simple_text_processor(text: str, temperature: float = 0.7) -> str:
"""简单文本处理函数"""
time.sleep(1) # 模拟处理时间
processed = f"处理结果: {text.upper()} (温度: {temperature})"
return processed
def image_classifier(image):
"""图像分类示例"""
time.sleep(2) # 模拟推理时间
classes = ["猫", "狗", "鸟", "汽车", "飞机"]
confidence = random.uniform(0.7, 0.99)
predicted_class = random.choice(classes)
return f"预测类别: {predicted_class}\n置信度: {confidence:.2%}"
def audio_transcriber(audio_file):
"""音频转录示例"""
if audio_file is None:
return "请上传音频文件"
# 模拟音频转录
time.sleep(3)
return "这是音频转录的结果文本示例。"
# 创建基础界面
def create_basic_interfaces():
"""创建基础界面集合"""
# 文本处理界面
text_interface = gr.Interface(
fn=simple_text_processor,
inputs=[
gr.Textbox(
label="输入文本",
placeholder="在这里输入文本...",
lines=3
),
gr.Slider(
minimum=0.1,
maximum=2.0,
value=0.7,
step=0.1,
label="温度参数"
)
],
outputs=gr.Textbox(label="处理结果"),
title="文本处理器",
description="一个简单的文本处理演示界面",
examples=[
["Hello World", 0.7],
["Gradio is awesome", 1.0],
["机器学习很有趣", 0.5]
]
)
# 图像分类界面
image_interface = gr.Interface(
fn=image_classifier,
inputs=gr.Image(
label="上传图像",
type="pil"
),
outputs=gr.Textbox(label="分类结果"),
title="图像分类器",
description="上传图像进行分类",
examples=["https://example.com/cat.jpg"] # 可以提供示例图片URL
)
# 音频转录界面
audio_interface = gr.Interface(
fn=audio_transcriber,
inputs=gr.Audio(
label="上传音频",
type="filepath"
),
outputs=gr.Textbox(label="转录结果"),
title="音频转录器",
description="上传音频文件进行语音转录"
)
return text_interface, image_interface, audio_interface
# 组合多个界面
def create_tabbed_interface():
"""创建选项卡界面"""
text_iface, image_iface, audio_iface = create_basic_interfaces()
demo = gr.TabbedInterface(
[text_iface, image_iface, audio_iface],
["文本处理", "图像分类", "音频转录"],
title="AI模型演示平台"
)
return demo
if __name__ == "__main__":
demo = create_tabbed_interface()
demo.launch(
share=True, # 创建公共链接
debug=True, # 启用调试模式
server_name="0.0.0.0", # 允许外部访问
server_port=7860
)
import openai
import anthropic
from typing import Generator, List, Dict, Any
class LLMChatBot:
"""LLM聊天机器人"""
def __init__(self):
self.openai_client = openai.OpenAI()
self.anthropic_client = anthropic.Anthropic()
self.conversation_history = []
def chat_with_openai(self,
message: str,
history: List[List[str]],
model: str = "gpt-3.5-turbo",
temperature: float = 0.7) -> Generator[str, None, None]:
"""OpenAI聊天生成器"""
# 构建消息历史
messages = []
for human, assistant in history:
messages.append({"role": "user", "content": human})
messages.append({"role": "assistant", "content": assistant})
messages.append({"role": "user", "content": message})
try:
# 流式响应
stream = self.openai_client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
stream=True
)
response = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
response += content
yield response
except Exception as e:
yield f"错误: {str(e)}"
def chat_with_anthropic(self,
message: str,
history: List[List[str]]) -> str:
"""Anthropic聊天"""
# 构建消息历史
messages = []
for human, assistant in history:
messages.append({"role": "user", "content": human})
messages.append({"role": "assistant", "content": assistant})
messages.append({"role": "user", "content": message})
try:
response = self.anthropic_client.messages.create(
model="claude-3-sonnet-20240229",
max_tokens=2000,
messages=messages
)
return response.content[0].text
except Exception as e:
return f"错误: {str(e)}"
# 创建聊天机器人实例
chatbot = LLMChatBot()
def create_simple_chatbot():
"""创建简单聊天机器人"""
def respond(message, history):
"""聊天响应函数"""
# 使用生成器进行流式响应
return chatbot.chat_with_openai(message, history)
# 使用ChatInterface快速创建
demo = gr.ChatInterface(
fn=respond,
title="AI聊天助手",
description="与AI进行对话",
examples=[
"你好,请介绍一下自己",
"解释一下量子计算的基本概念",
"写一个Python排序算法",
"今天天气怎么样?"
],
cache_examples=False,
retry_btn="重试",
undo_btn="撤销",
clear_btn="清除对话"
)
return demo
def create_advanced_chatbot():
"""创建高级聊天机器人"""
with gr.Blocks(title="高级AI聊天机器人") as demo:
gr.Markdown("# 🤖 高级AI聊天机器人")
gr.Markdown("支持多模型对比和自定义参数")
with gr.Row():
with gr.Column(scale=1):
# 控制面板
model_choice = gr.Dropdown(
choices=["gpt-3.5-turbo", "gpt-4", "claude-3-sonnet"],
value="gpt-3.5-turbo",
label="选择模型"
)
temperature = gr.Slider(
minimum=0.1,
maximum=2.0,
value=0.7,
step=0.1,
label="温度参数"
)
max_tokens = gr.Slider(
minimum=100,
maximum=4000,
value=1000,
step=100,
label="最大Token数"
)
system_prompt = gr.Textbox(
label="系统提示词",
value="你是一个有用的AI助手。",
lines=3
)
# 对话统计
stats_display = gr.JSON(
label="对话统计",
value={"消息数": 0, "总Token": 0}
)
with gr.Column(scale=2):
# 聊天界面
chatbot_display = gr.Chatbot(
label="对话历史",
height=500,
show_label=True,
container=True,
bubble_full_width=False
)
with gr.Row():
msg_input = gr.Textbox(
label="输入消息",
placeholder="在这里输入你的消息...",
lines=2,
scale=4
)
with gr.Column(scale=1):
send_btn = gr.Button("发送", variant="primary")
clear_btn = gr.Button("清除")
# 文件上传区域
with gr.Row():
image_input = gr.Image(
label="上传图片(多模态对话)",
type="pil"
)
file_input = gr.File(
label="上传文件",
file_types=[".txt", ".pdf", ".docx"]
)
# 事件处理
def respond_with_params(message, history, model, temp, max_tok, sys_prompt, image, file):
"""带参数的响应函数"""
if not message.strip():
return history, ""
# 处理图片
if image is not None:
message += "\n[用户上传了一张图片]"
# 处理文件
if file is not None:
message += f"\n[用户上传了文件: {file.name}]"
# 调用相应的模型
if model.startswith("gpt"):
response_gen = chatbot.chat_with_openai(
message, history, model, temp
)
# 流式更新
partial_response = ""
for response in response_gen:
partial_response = response
yield history + [[message, partial_response]], ""
else: # Claude
response = chatbot.chat_with_anthropic(message, history)
yield history + [[message, response]], ""
def clear_conversation():
"""清除对话"""
return [], ""
# 绑定事件
send_btn.click(
respond_with_params,
inputs=[msg_input, chatbot_display, model_choice, temperature,
max_tokens, system_prompt, image_input, file_input],
outputs=[chatbot_display, msg_input]
)
msg_input.submit(
respond_with_params,
inputs=[msg_input, chatbot_display, model_choice, temperature,
max_tokens, system_prompt, image_input, file_input],
outputs=[chatbot_display, msg_input]
)
clear_btn.click(
clear_conversation,
outputs=[chatbot_display, msg_input]
)
return demo
import PIL.Image
import io
import base64
class MultiModalProcessor:
"""多模态处理器"""
def __init__(self):
# 这里可以初始化各种模型
pass
def process_text_and_image(self,
text: str,
image: PIL.Image.Image,
task: str = "describe") -> Dict[str, Any]:
"""处理文本和图像"""
if image is None:
return {"error": "请上传图像"}
# 模拟多模态处理
if task == "describe":
result = f"图像描述: 这是一张{image.size}像素的图片。用户询问: {text}"
elif task == "question":
result = f"图像问答: 关于这张图片的问题'{text}'的答案是..."
else:
result = f"其他任务: {task}"
return {
"result": result,
"image_info": {
"size": image.size,
"format": image.format,
"mode": image.mode
}
}
def process_audio_and_text(self,
audio_file,
text_instruction: str) -> str:
"""处理音频和文本"""
if audio_file is None:
return "请上传音频文件"
# 模拟音频处理
return f"音频处理结果: 根据指令'{text_instruction}'处理了音频文件"
multimodal_processor = MultiModalProcessor()
def create_multimodal_interface():
"""创建多模态界面"""
with gr.Blocks(title="多模态AI助手") as demo:
gr.Markdown("# 🌟 多模态AI助手")
gr.Markdown("支持文本、图像、音频的多模态交互")
with gr.Tab("图像理解"):
with gr.Row():
with gr.Column():
image_input = gr.Image(
label="上传图像",
type="pil"
)
text_input = gr.Textbox(
label="文本输入",
placeholder="询问关于图像的问题...",
lines=3
)
task_choice = gr.Radio(
choices=["describe", "question", "analyze"],
value="describe",
label="任务类型"
)
process_btn = gr.Button("处理", variant="primary")
with gr.Column():
result_output = gr.JSON(label="处理结果")
result_text = gr.Textbox(
label="文本结果",
lines=5
)
# 事件绑定
process_btn.click(
multimodal_processor.process_text_and_image,
inputs=[text_input, image_input, task_choice],
outputs=[result_output]
)
with gr.Tab("音频处理"):
with gr.Row():
with gr.Column():
audio_input = gr.Audio(
label="上传音频",
type="filepath"
)
instruction_input = gr.Textbox(
label="处理指令",
placeholder="描述你想对音频做什么...",
lines=2
)
audio_process_btn = gr.Button("处理音频")
with gr.Column():
audio_result = gr.Textbox(
label="处理结果",
lines=5
)
audio_process_btn.click(
multimodal_processor.process_audio_and_text,
inputs=[audio_input, instruction_input],
outputs=[audio_result]
)
with gr.Tab("批量处理"):
with gr.Row():
with gr.Column():
file_list = gr.File(
label="批量上传文件",
file_count="multiple",
file_types=["image", "audio", "text"]
)
batch_instruction = gr.Textbox(
label="批处理指令",
lines=3
)
batch_btn = gr.Button("开始批处理")
with gr.Column():
batch_progress = gr.Progress()
batch_results = gr.Dataframe(
headers=["文件名", "类型", "结果", "状态"],
label="批处理结果"
)
def batch_process(files, instruction):
"""批量处理文件"""
if not files:
return "没有选择文件"
results = []
for i, file in enumerate(files):
# 更新进度
progress = (i + 1) / len(files)
batch_progress(progress, f"处理文件 {i+1}/{len(files)}")
# 模拟处理
time.sleep(0.5)
file_type = file.name.split('.')[-1] if '.' in file.name else "unknown"
result = f"已处理: {instruction}"
status = "成功"
results.append([file.name, file_type, result, status])
return results
batch_btn.click(
batch_process,
inputs=[file_list, batch_instruction],
outputs=[batch_results]
)
return demo
import threading
import queue
import datetime
class RealTimeDataManager:
"""实时数据管理器"""
def __init__(self):
self.data_queue = queue.Queue()
self.is_running = False
self.data_history = []
def start_data_stream(self):
"""开始数据流"""
self.is_running = True
def generate_data():
while self.is_running:
# 模拟生成实时数据
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
value = random.randint(1, 100)
data_point = {
"timestamp": timestamp,
"value": value,
"status": "正常" if value < 80 else "异常"
}
self.data_queue.put(data_point)
self.data_history.append(data_point)
# 保持最近100个数据点
if len(self.data_history) > 100:
self.data_history.pop(0)
time.sleep(1)
thread = threading.Thread(target=generate_data, daemon=True)
thread.start()
def stop_data_stream(self):
"""停止数据流"""
self.is_running = False
def get_latest_data(self):
"""获取最新数据"""
try:
return self.data_queue.get_nowait()
except queue.Empty:
return None
def get_history_data(self):
"""获取历史数据"""
return self.data_history.copy()
real_time_manager = RealTimeDataManager()
def create_realtime_interface():
"""创建实时界面"""
with gr.Blocks(title="实时数据监控") as demo:
gr.Markdown("# 📊 实时数据监控")
with gr.Row():
start_btn = gr.Button("开始监控", variant="primary")
stop_btn = gr.Button("停止监控", variant="secondary")
refresh_btn = gr.Button("刷新", variant="secondary")
with gr.Row():
with gr.Column():
# 实时数据显示
current_data = gr.JSON(
label="当前数据",
value={}
)
status_indicator = gr.Textbox(
label="系统状态",
value="未启动"
)
with gr.Column():
# 历史数据图表
history_plot = gr.Plot(label="历史趋势")
# 数据表格
data_table = gr.Dataframe(
headers=["时间", "数值", "状态"],
label="数据历史"
)
# 状态变量
monitoring_state = gr.State(value=False)
def start_monitoring(state):
"""开始监控"""
if not state:
real_time_manager.start_data_stream()
return True, "监控中...", "系统正在运行"
return state, "已经在监控中", "系统正在运行"
def stop_monitoring(state):
"""停止监控"""
if state:
real_time_manager.stop_data_stream()
return False, "监控已停止", "系统已停止"
return state, "监控未启动", "系统已停止"
def update_data(state):
"""更新数据显示"""
if not state:
return {}, None, []
latest = real_time_manager.get_latest_data()
history = real_time_manager.get_history_data()
# 更新图表
if history:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
timestamps = [d["timestamp"] for d in history[-20:]] # 最近20个点
values = [d["value"] for d in history[-20:]]
ax.plot(timestamps, values, 'b-o')
ax.set_title("实时数据趋势")
ax.set_xlabel("时间")
ax.set_ylabel("数值")
ax.tick_params(axis='x', rotation=45)
plt.tight_layout()
else:
fig = None
# 更新表格
table_data = []
for item in history[-10:]: # 最近10条记录
table_data.append([
item["timestamp"],
item["value"],
item["status"]
])
return latest or {}, fig, table_data
# 事件绑定
start_btn.click(
start_monitoring,
inputs=[monitoring_state],
outputs=[monitoring_state, status_indicator, status_indicator]
)
stop_btn.click(
stop_monitoring,
inputs=[monitoring_state],
outputs=[monitoring_state, status_indicator, status_indicator]
)
refresh_btn.click(
update_data,
inputs=[monitoring_state],
outputs=[current_data, history_plot, data_table]
)
# 自动刷新(每5秒)
demo.load(
update_data,
inputs=[monitoring_state],
outputs=[current_data, history_plot, data_table],
every=5
)
return demo
# MCP客户端集成示例
class MCPGradioClient:
"""MCP Gradio客户端"""
def __init__(self):
self.mcp_tools = {}
self.setup_mcp_tools()
def setup_mcp_tools(self):
"""设置MCP工具"""
self.mcp_tools = {
"weather": {
"description": "获取天气信息",
"function": self.get_weather
},
"calculator": {
"description": "执行数学计算",
"function": self.calculate
},
"file_manager": {
"description": "管理文件系统",
"function": self.manage_files
}
}
def get_weather(self, city: str) -> str:
"""获取天气信息"""
return f"{city}今天天气晴朗,温度25°C"
def calculate(self, expression: str) -> str:
"""计算表达式"""
try:
result = eval(expression) # 实际应用中需要安全处理
return f"{expression} = {result}"
except:
return "计算错误"
def manage_files(self, action: str, path: str = "") -> str:
"""文件管理"""
if action == "list":
return f"列出目录 {path} 的文件..."
elif action == "create":
return f"创建文件 {path}"
else:
return f"未知操作: {action}"
def create_mcp_interface():
"""创建MCP集成界面"""
mcp_client = MCPGradioClient()
with gr.Blocks(title="MCP工具集成") as demo:
gr.Markdown("# 🔧 MCP工具集成演示")
with gr.Row():
tool_choice = gr.Dropdown(
choices=list(mcp_client.mcp_tools.keys()),
label="选择工具",
value="weather"
)
tool_input = gr.Textbox(
label="工具参数",
placeholder="输入工具参数..."
)
execute_btn = gr.Button("执行", variant="primary")
tool_output = gr.Textbox(
label="执行结果",
lines=5
)
def execute_tool(tool_name, parameters):
"""执行MCP工具"""
if tool_name in mcp_client.mcp_tools:
tool_func = mcp_client.mcp_tools[tool_name]["function"]
try:
result = tool_func(parameters)
return f"✅ 执行成功:\n{result}"
except Exception as e:
return f"❌ 执行失败: {str(e)}"
else:
return "❌ 工具不存在"
execute_btn.click(
execute_tool,
inputs=[tool_choice, tool_input],
outputs=[tool_output]
)
return demo
def deploy_gradio_app():
"""部署Gradio应用的多种方式"""
# 选择要部署的应用
demo_choice = input("选择要部署的应用 (1-简单聊天, 2-多模态, 3-实时监控, 4-MCP工具): ")
if demo_choice == "1":
demo = create_simple_chatbot()
elif demo_choice == "2":
demo = create_multimodal_interface()
elif demo_choice == "3":
demo = create_realtime_interface()
elif demo_choice == "4":
demo = create_mcp_interface()
else:
demo = create_simple_chatbot()
# 部署选项
deployment_options = {
"local": {
"share": False,
"server_name": "127.0.0.1",
"server_port": 7860
},
"public": {
"share": True, # 创建公共Gradio链接
"server_name": "0.0.0.0",
"server_port": 7860
},
"huggingface": {
# 部署到Hugging Face Spaces
"share": True,
"server_name": "0.0.0.0",
"server_port": 7860
}
}
# 选择部署方式
deploy_type = input("部署方式 (local/public/huggingface): ") or "local"
options = deployment_options.get(deploy_type, deployment_options["local"])
print(f"启动应用,部署方式: {deploy_type}")
demo.launch(**options)
# Docker部署配置
def create_docker_files():
"""创建Docker部署文件"""
dockerfile_content = """
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 7860
# 启动命令
CMD ["python", "app.py"]
"""
requirements_content = """
gradio>=4.0.0
openai
anthropic
Pillow
matplotlib
pandas
numpy
"""
docker_compose_content = """
version: '3.8'
services:
gradio-app:
build: .
ports:
- "7860:7860"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
volumes:
- ./data:/app/data
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- gradio-app
restart: unless-stopped
"""
return {
"Dockerfile": dockerfile_content,
"requirements.txt": requirements_content,
"docker-compose.yml": docker_compose_content
}