开源AI模型平台，100万+预训练模型的社区生态系统
from transformers import AutoTokenizer, AutoModelForCausalLM
# Quickly load any pretrained model: the tokenizer and the causal-LM weights
# are both resolved from the same `model_name` via the Auto* factory classes.
model_name = "microsoft/DialoGPT-medium"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
# Mainstream model families supported in 2025, grouped by task category.
# The keys are user-facing category labels and are kept verbatim.
models_2025 = {
    # The 8x7B mixture-of-experts release is named "Mixtral", not "Mistral".
    "文本生成": ["Llama-3.1", "Mixtral-8x7B", "DeepSeek-R1"],
    "多模态": ["GPT-4V", "LLaVA-1.6", "MiniCPM-V"],
    "代码生成": ["CodeLlama-34B", "StarCoder2", "DeepSeek-Coder"],
    "嵌入模型": ["BGE-M3", "E5-Instruct", "GTE-Large"],
    "视觉模型": ["CLIP-ViT-L", "DINOv2", "SAM-2"],
}
# Model search and filtering: query the Hub for text-generation models,
# sorted by download count, and print the first ten results.
from huggingface_hub import HfApi

api = HfApi()
models = api.list_models(
    filter="text-generation",
    sort="downloads",
    direction=-1,
    limit=10,
)
# Use a dedicated loop variable: iterating as `model` would overwrite the
# module-level `model` that was loaded from `model_name` earlier in the file.
for model_info in models:
    print(f"模型: {model_info.id}, 下载量: {model_info.downloads}")
from huggingface_hub import snapshot_download, HfFolder
import os
class ModelVersionManager:
    """Download and inspect specific revisions of one Hugging Face Hub model."""

    def __init__(self, model_id):
        """Store the repo id and derive the local download directory from it.

        Args:
            model_id: Hub repository id, e.g. ``"org/name"``.
        """
        self.model_id = model_id
        # Flatten "/" into "_" so the repo id maps to a single directory level.
        self.local_dir = f"./models/{model_id.replace('/', '_')}"

    def download_specific_version(self, revision="main"):
        """Download one revision of the model into ``self.local_dir``.

        Args:
            revision: Revision identifier forwarded to ``snapshot_download``.

        Returns:
            A status message naming the model, revision and target directory.
        """
        snapshot_download(
            repo_id=self.model_id,
            revision=revision,
            local_dir=self.local_dir,
            # NOTE(review): newer huggingface_hub releases deprecate this
            # flag - confirm against the pinned version before removing it.
            local_dir_use_symlinks=False,
        )
        return f"模型 {self.model_id} (版本: {revision}) 已下载到 {self.local_dir}"

    def list_available_versions(self):
        """List the branches and tags that exist for the model repository.

        Returns:
            A dict with ``"branches"`` and ``"tags"`` (lists of ref names) and
            ``"default_branch"`` (a branch name, or ``None`` if no branches).
        """
        api = HfApi()
        # Git refs come from the dedicated refs endpoint; the object returned
        # by repo_info() does not carry branch/tag listings.
        refs = api.list_repo_refs(self.model_id)
        branches = [ref.name for ref in refs.branches]
        tags = [ref.name for ref in refs.tags]

        # The refs listing does not name a default branch.
        # NOTE(review): assumes the conventional "main" - confirm.
        if "main" in branches:
            default_branch = "main"
        elif branches:
            default_branch = branches[0]
        else:
            default_branch = None

        return {
            "branches": branches,
            "tags": tags,
            "default_branch": default_branch,
        }

    def compare_versions(self, version1, version2):
        """Compare two revisions of the model repository.

        Args:
            version1: First revision identifier.
            version2: Second revision identifier.

        Returns:
            A dict with the commit sha / last-modified time of each revision
            and a ``"files_changed"`` summary (see ``_get_file_differences``).
        """
        api = HfApi()
        # files_metadata=True asks the Hub to include per-file blob ids, which
        # lets content changes be detected without downloading any file.
        v1_info = api.repo_info(self.model_id, revision=version1, files_metadata=True)
        v2_info = api.repo_info(self.model_id, revision=version2, files_metadata=True)

        v1_files = {s.rfilename: s.blob_id for s in (v1_info.siblings or [])}
        v2_files = {s.rfilename: s.blob_id for s in (v2_info.siblings or [])}

        return {
            "version1": {"sha": v1_info.sha, "last_modified": v1_info.last_modified},
            "version2": {"sha": v2_info.sha, "last_modified": v2_info.last_modified},
            "files_changed": self._get_file_differences(v1_files, v2_files),
        }

    @staticmethod
    def _get_file_differences(old_files, new_files):
        """Diff two ``{filename: content_id}`` mappings.

        Args:
            old_files: Mapping for the first revision.
            new_files: Mapping for the second revision.

        Returns:
            A dict with sorted filename lists under ``"added"``, ``"removed"``
            and ``"modified"`` (present in both with a different content id).
        """
        old_names = set(old_files)
        new_names = set(new_files)
        return {
            "added": sorted(new_names - old_names),
            "removed": sorted(old_names - new_names),
            "modified": sorted(
                name
                for name in old_names & new_names
                if old_files[name] != new_files[name]
            ),
        }
from transformers import pipeline
class HuggingFacePipelines:
    """Registry of ready-to-use ``transformers`` pipelines keyed by task name."""

    def __init__(self):
        """Start with an empty registry; call ``setup_common_pipelines`` to fill it."""
        self.pipelines = {}

    def setup_common_pipelines(self):
        """Build the common pipelines and store them in ``self.pipelines``."""
        # Imported locally so this method does not depend on where (or
        # whether) the surrounding module imports torch.
        import torch

        text_device = 0 if torch.cuda.is_available() else -1

        # Main tasks supported in 2025.
        self.pipelines = {
            # Text task
            "text_generation": pipeline(
                "text-generation",
                model="microsoft/DialoGPT-medium",
                device=text_device,
            ),
            # Multimodal task
            "visual_question_answering": pipeline(
                "visual-question-answering",
                model="dandelin/vilt-b32-finetuned-vqa",
            ),
            # Audio task
            "automatic_speech_recognition": pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-large-v3",
            ),
            # Document understanding
            "document_question_answering": pipeline(
                "document-question-answering",
                model="impira/layoutlm-document-qa",
            ),
        }

    def batch_inference(self, task, inputs, batch_size=8):
        """Run a registered pipeline over ``inputs`` in fixed-size chunks.

        Args:
            task: Key into ``self.pipelines``.
            inputs: Sliceable sequence of pipeline inputs.
            batch_size: Number of inputs handed to the pipeline per call.

        Returns:
            A flat list with the pipeline results for every chunk, in order.

        Raises:
            KeyError: If ``task`` is not registered.
            ValueError: If ``batch_size`` is smaller than 1.
        """
        pipeline_obj = self.pipelines[task]
        # A negative step would make range() yield nothing and silently drop
        # every input, so reject invalid sizes up front.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        results = []
        for start in range(0, len(inputs), batch_size):
            results.extend(pipeline_obj(inputs[start:start + batch_size]))
        return results

    def stream_generation(self, text, max_length=100):
        """Generate text while streaming tokens through a ``TextStreamer``.

        Args:
            text: Prompt passed to the ``"text_generation"`` pipeline.
            max_length: Forwarded to the pipeline as ``max_length``.

        Returns:
            Whatever the text-generation pipeline call returns.
        """
        # TextStreamer is not imported at module level, so bring it in here.
        from transformers import TextStreamer

        generator = self.pipelines["text_generation"]
        streamer = TextStreamer(
            generator.tokenizer,
            skip_prompt=True,
            skip_special_tokens=True,
        )
        return generator(
            text,
            max_length=max_length,
            do_sample=True,
            temperature=0.7,
            streamer=streamer,
        )
from transformers import TrainingArguments, Trainer
from datasets import Dataset
import torch
class HuggingFaceTrainer:
    """Fine-tune a Hub model with the ``transformers`` ``Trainer`` API."""

    def __init__(self, model_name, task_type="text-classification"):
        """Store the configuration and load the model and tokenizer.

        Args:
            model_name: Model identifier passed to ``from_pretrained``.
            task_type: Task selector; only ``"text-classification"`` loads a model.
        """
        self.model_name = model_name
        self.task_type = task_type
        self.setup_model_and_tokenizer()

    def setup_model_and_tokenizer(self):
        """Load the tokenizer and, for supported task types, the model."""
        # Always define the attribute so later code can test for a missing
        # model instead of failing with AttributeError.
        self.model = None
        if self.task_type == "text-classification":
            from transformers import AutoModelForSequenceClassification

            self.model = AutoModelForSequenceClassification.from_pretrained(
                self.model_name,
                num_labels=2,
            )
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def prepare_dataset(self, texts, labels):
        """Build a tokenized ``Dataset`` from parallel text and label lists.

        Args:
            texts: Input strings, stored under the ``"text"`` column.
            labels: Labels, stored under the ``"label"`` column.

        Returns:
            The tokenized dataset with the raw ``"text"`` column removed.
        """
        def tokenize_function(examples):
            return self.tokenizer(
                examples["text"],
                truncation=True,
                padding=True,
                max_length=512,
            )

        dataset = Dataset.from_dict({"text": texts, "label": labels})
        return dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"],
        )

    def fine_tune(self, train_dataset, eval_dataset=None, output_dir="./results"):
        """Run fine-tuning and save the resulting model.

        Args:
            train_dataset: Dataset used for training.
            eval_dataset: Optional dataset; when given, evaluation runs every
                ``eval_steps`` and the best checkpoint is restored at the end.
            output_dir: Directory for checkpoints and the saved model.

        Returns:
            The ``Trainer`` instance after training.

        Raises:
            ValueError: If no model was loaded for ``self.task_type``.
        """
        if self.model is None:
            raise ValueError(
                f"No model loaded for task_type={self.task_type!r}; "
                "only 'text-classification' is supported"
            )

        import inspect

        has_eval = bool(eval_dataset)

        # The evaluation-strategy keyword was renamed across transformers
        # releases; pick whichever name the installed version accepts.
        args_params = inspect.signature(TrainingArguments.__init__).parameters
        strategy_key = (
            "eval_strategy" if "eval_strategy" in args_params else "evaluation_strategy"
        )

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=100,
            eval_steps=500,
            save_steps=1000,
            # Restoring the best checkpoint needs evaluation results, so it
            # is only enabled when an eval dataset is supplied.
            load_best_model_at_end=has_eval,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            # Mixed precision is only requested when a CUDA device exists.
            fp16=torch.cuda.is_available(),
            dataloader_num_workers=4,
            gradient_checkpointing=True,
            torch_compile=True,
            **{strategy_key: "steps" if has_eval else "no"},
        )

        # Same version tolerance for the Trainer's tokenizer keyword.
        trainer_params = inspect.signature(Trainer.__init__).parameters
        tokenizer_key = (
            "processing_class" if "processing_class" in trainer_params else "tokenizer"
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            compute_metrics=self.compute_metrics,
            **{tokenizer_key: self.tokenizer},
        )

        trainer.train()
        trainer.save_model()
        return trainer

    def compute_metrics(self, eval_pred):
        """Compute accuracy and weighted F1 from a ``(predictions, labels)`` pair.

        Args:
            eval_pred: Two-item sequence; the first item is reduced with
                ``argmax`` over its last axis to obtain predicted class ids.

        Returns:
            A dict with ``"accuracy"`` and ``"f1"`` scores.
        """
        from sklearn.metrics import accuracy_score, f1_score

        predictions, labels = eval_pred
        predictions = predictions.argmax(axis=-1)
        return {
            "accuracy": accuracy_score(labels, predictions),
            "f1": f1_score(labels, predictions, average="weighted"),
        }
from datasets import load_dataset, Dataset, DatasetDict
from datasets import Audio, Image, Value, Features
class HuggingFaceDatasets:
    """Helpers for loading, building, preprocessing and publishing datasets."""

    def __init__(self):
        """Record the list of format names this helper advertises."""
        self.supported_formats = [
            "csv", "json", "parquet", "arrow",
            "audio", "image", "text",
        ]

    def load_popular_datasets(self):
        """Load small slices of several Hub datasets.

        Returns:
            A dict mapping a short label to the loaded dataset slice.
        """
        datasets_2025 = {
            # Text datasets
            "chinese_text": load_dataset("liwu/MNBVC", split="train[:1000]"),
            "code_dataset": load_dataset("bigcode/the-stack-dedup", split="train[:1000]"),
            # Multimodal datasets
            "vision_language": load_dataset("nlphuji/flickr30k", split="test[:100]"),
            "audio_text": load_dataset("mozilla-foundation/common_voice_11_0", "zh-CN", split="test[:100]"),
            # Instruction dataset
            "instruction_tuning": load_dataset("tatsu-lab/alpaca", split="train[:1000]"),
        }
        return datasets_2025

    def create_custom_dataset(self, data_path, data_type="text",
                              image_paths=None, texts=None, labels=None):
        """Create a dataset from local text files or in-memory multimodal rows.

        Args:
            data_path: File path(s) for ``data_type="text"``; not used by the
                multimodal branch.
            data_type: ``"text"`` or ``"multimodal"``.
            image_paths: Image column values (multimodal only).
            texts: Text column values (multimodal only).
            labels: Integer label column values (multimodal only).

        Returns:
            The created dataset object.

        Raises:
            ValueError: If ``data_type`` is unsupported, a multimodal column
                is missing, or the multimodal columns differ in length.
        """
        if data_type == "text":
            return load_dataset("text", data_files=data_path)

        if data_type == "multimodal":
            columns = {"image": image_paths, "text": texts, "label": labels}
            missing = [name for name, values in columns.items() if values is None]
            if missing:
                raise ValueError(
                    f"multimodal datasets require values for: {', '.join(missing)}"
                )
            columns = {name: list(values) for name, values in columns.items()}
            if len({len(values) for values in columns.values()}) > 1:
                raise ValueError("image_paths, texts and labels must have the same length")

            features = Features({
                "image": Image(),
                "text": Value("string"),
                "label": Value("int64"),
            })
            return Dataset.from_dict(columns, features=features)

        raise ValueError(f"Unsupported data_type: {data_type!r}")

    def preprocess_dataset(self, dataset, tokenizer, max_length=512):
        """Tokenize the ``"text"`` column and attach ``"label"`` as ``"labels"``.

        Args:
            dataset: Dataset exposing ``"text"`` and ``"label"`` columns.
            tokenizer: Callable tokenizer applied to each batch of texts.
            max_length: Truncation length passed to the tokenizer.

        Returns:
            The mapped dataset with all original columns removed.
        """
        def preprocess_function(examples):
            model_inputs = tokenizer(
                examples["text"],
                max_length=max_length,
                truncation=True,
                padding=True,
            )
            model_inputs["labels"] = examples["label"]
            return model_inputs

        return dataset.map(
            preprocess_function,
            batched=True,
            num_proc=4,  # process with four worker processes
            remove_columns=dataset.column_names,
        )

    def upload_to_hub(self, dataset, repo_id, private=False):
        """Push ``dataset`` to the Hub and return a message with its URL.

        Args:
            dataset: Object exposing ``push_to_hub``.
            repo_id: Target dataset repository id.
            private: Forwarded to ``push_to_hub``.
        """
        dataset.push_to_hub(
            repo_id,
            private=private,
            commit_message="Upload custom dataset",
        )
        return f"数据集已上传到: https://huggingface.co/datasets/{repo_id}"
import requests
from huggingface_hub import InferenceClient
class HuggingFaceInference:
    """Thin wrapper around ``InferenceClient`` plus a raw-HTTP batch helper."""

    def __init__(self, api_token):
        """Create the client and remember the token for raw HTTP calls.

        Args:
            api_token: Token passed to ``InferenceClient`` and sent as a
                bearer token by ``batch_inference``.
        """
        self.client = InferenceClient(token=api_token)
        self.api_token = api_token
        self.base_url = "https://api-inference.huggingface.co/models"

    def text_generation(self, model_id, prompt, **kwargs):
        """Call the text-generation endpoint.

        Args:
            model_id: Model to query.
            prompt: Prompt text.
            **kwargs: Generation options. Defaults are supplied for
                ``max_new_tokens`` (100), ``temperature`` (0.7), ``top_p``
                (0.9) and ``stream`` (False); every other option is forwarded
                unchanged instead of being silently dropped.

        Returns:
            The client's ``text_generation`` result.
        """
        options = {
            "max_new_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9,
            "stream": False,
        }
        options.update(kwargs)
        return self.client.text_generation(model=model_id, prompt=prompt, **options)

    def image_classification(self, model_id, image_path):
        """Classify the image stored at ``image_path`` with ``model_id``."""
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        return self.client.image_classification(model=model_id, image=image_bytes)

    def speech_to_text(self, model_id, audio_path):
        """Transcribe the audio file at ``audio_path`` with ``model_id``."""
        with open(audio_path, "rb") as f:
            audio_bytes = f.read()
        return self.client.automatic_speech_recognition(model=model_id, audio=audio_bytes)

    def batch_inference(self, requests_list):
        """POST several inference requests concurrently.

        Args:
            requests_list: Sequence of dicts with a ``"model"`` id and an
                ``"inputs"`` payload that is sent as the JSON body.

        Returns:
            The decoded JSON responses, in the same order as the requests.
        """
        # Nothing to send: skip the event loop and the optional aiohttp import.
        if not requests_list:
            return []

        import asyncio
        import aiohttp

        headers = {"Authorization": f"Bearer {self.api_token}"}

        async def make_request(session, request_data):
            # NOTE(review): the body is exactly request_data['inputs']; confirm
            # callers pass the full payload shape the endpoint expects.
            async with session.post(
                f"{self.base_url}/{request_data['model']}",
                json=request_data['inputs'],
                headers=headers,
            ) as response:
                return await response.json()

        async def batch_requests():
            async with aiohttp.ClientSession() as session:
                tasks = [make_request(session, req) for req in requests_list]
                return await asyncio.gather(*tasks)

        return asyncio.run(batch_requests())
# gradio_app.py - example Hugging Face Spaces application
import gradio as gr
from transformers import pipeline
class ChatbotApp:
    """Gradio chat UI backed by a ``transformers`` conversational pipeline."""

    def __init__(self):
        """Create the pipeline and build the Gradio interface."""
        # NOTE(review): the "conversational" task is not available in every
        # transformers release - confirm the pinned version supports it.
        self.chatbot = pipeline(
            "conversational",
            model="microsoft/DialoGPT-medium"
        )
        self.setup_interface()

    @staticmethod
    def _split_history(history):
        """Split ``[user, bot]`` pairs into two parallel lists.

        Pairs with a missing side are skipped so both lists stay aligned.
        A ``None`` history is treated as empty.
        """
        user_turns, bot_turns = [], []
        for user_text, bot_text in (history or []):
            if user_text is None or bot_text is None:
                continue
            user_turns.append(user_text)
            bot_turns.append(bot_text)
        return user_turns, bot_turns

    def build_conversation(self, history, message):
        """Build the pipeline input from past turns plus the new user message.

        Args:
            history: List of ``[user, bot]`` pairs from the chat widget.
            message: The new user message to answer.

        Returns:
            A ``transformers.Conversation`` holding the past turns and ``message``.
        """
        from transformers import Conversation

        past_inputs, past_replies = self._split_history(history)
        return Conversation(
            message,
            past_user_inputs=past_inputs,
            generated_responses=past_replies,
        )

    def setup_interface(self):
        """Build the Gradio Blocks interface and store it on ``self.interface``."""
        def chat_response(message, history):
            history = list(history or [])
            # The new message must be part of the conversation; otherwise the
            # model would only ever see earlier turns.
            conversation = self.build_conversation(history, message)
            response = self.chatbot(conversation)
            history.append([message, response.generated_responses[-1]])
            # Empty string clears the textbox; history refreshes the chat view.
            return "", history

        with gr.Blocks(title="AI聊天助手") as self.interface:
            gr.Markdown("# 🤗 AI聊天助手")
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(
                placeholder="输入你的消息...",
                label="消息"
            )
            clear = gr.Button("清除对话")
            # Wire up events
            msg.submit(chat_response, [msg, chatbot], [msg, chatbot])
            clear.click(lambda: ([], ""), outputs=[chatbot, msg])

    def launch(self):
        """Start the Gradio server on 0.0.0.0:7860 with sharing enabled."""
        self.interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True  # request a public share link
        )
# Script entry point: build the chatbot app and start serving it.
if __name__ == "__main__":
    app = ChatbotApp()
    app.launch()
from huggingface_hub import HfApi, create_repo
from huggingface_hub.utils import RepositoryNotFoundError
class HuggingFaceOrganization:
    """Manage repositories that live under one Hub organization."""

    def __init__(self, org_name, token):
        """Store the organization name and create an authenticated API client.

        Args:
            org_name: Organization namespace used as the repo-id prefix.
            token: Access token handed to ``HfApi``.
        """
        self.org_name = org_name
        self.api = HfApi(token=token)

    def create_team_repo(self, repo_name, repo_type="model", private=False):
        """Create ``<org>/<repo_name>`` and report the outcome as a message.

        Args:
            repo_name: Repository name without the organization prefix.
            repo_type: Repository type forwarded to ``create_repo``.
            private: Whether the repository is created as private.

        Returns:
            A success message with the repository URL, or a failure message
            containing the error text. Errors are reported, not raised.
        """
        repo_id = f"{self.org_name}/{repo_name}"
        try:
            # Go through self.api so the token given to __init__ is used.
            self.api.create_repo(
                repo_id=repo_id,
                repo_type=repo_type,
                private=private,
                exist_ok=False,
            )
        except Exception as e:
            return f"创建失败: {str(e)}"
        return f"仓库创建成功: https://huggingface.co/{repo_id}"

    def manage_team_access(self, repo_name, username, role="read"):
        """Record a role for ``username`` on ``<org>/<repo_name>``.

        Returns:
            A success or failure message; errors are reported, not raised.
        """
        repo_id = f"{self.org_name}/{repo_name}"
        try:
            # NOTE(review): this only stores a Space secret named
            # USER_<NAME>_ROLE; nothing here changes Hub permissions.
            # Confirm whether real access control is expected.
            self.api.add_space_secret(
                repo_id=repo_id,
                key=f"USER_{username.upper()}_ROLE",
                value=role,
            )
        except Exception as e:
            return f"权限设置失败: {str(e)}"
        return f"用户 {username} 已获得 {role} 权限"

    def copy_model_to_org(self, source_repo_id, target_repo_id):
        """Copy the files of ``source_repo_id`` into ``target_repo_id``.

        The target repository is created if needed; the source snapshot is
        staged in a temporary directory and uploaded from there.

        Returns:
            ``target_repo_id``.
        """
        import tempfile

        self.api.create_repo(repo_id=target_repo_id, repo_type="model", exist_ok=True)
        with tempfile.TemporaryDirectory() as staging_dir:
            self.api.snapshot_download(repo_id=source_repo_id, local_dir=staging_dir)
            self.api.upload_folder(
                repo_id=target_repo_id,
                folder_path=staging_dir,
                commit_message=f"Sync from {source_repo_id}",
                # Keep download bookkeeping out of the uploaded copy.
                ignore_patterns=[".cache/**"],
            )
        return target_repo_id

    def sync_models_to_org(self, model_list):
        """Copy each model in ``model_list`` into the organization.

        A failure for one model is printed and does not stop the others.

        Args:
            model_list: Source repo ids; the part after the last ``/``
                becomes the repository name inside the organization.

        Returns:
            The organization repo ids that were copied successfully.
        """
        synced_models = []
        for model_id in model_list:
            new_repo_id = f"{self.org_name}/{model_id.split('/')[-1]}"
            try:
                self.copy_model_to_org(model_id, new_repo_id)
            except Exception as e:
                print(f"同步 {model_id} 失败: {e}")
            else:
                synced_models.append(new_repo_id)
        return synced_models
import onnxruntime as ort
from optimum.onnxruntime import ORTModelForSequenceClassification
from optimum.onnxruntime import ORTQuantizer, ORTConfig
class EdgeDeployment:
    """Export, quantize, optimize and benchmark ONNX Runtime models."""

    def __init__(self):
        """Choose execution providers: CUDA first when available, else CPU only."""
        available = ort.get_available_providers()
        if 'CUDAExecutionProvider' in available:
            self.providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        else:
            self.providers = ['CPUExecutionProvider']

    def optimize_for_edge(self, model_id, optimization_level="all"):
        """Export ``model_id`` to ONNX and quantize and/or graph-optimize it.

        Args:
            model_id: Model identifier handed to ``from_pretrained``.
            optimization_level: ``"quantization"``, ``"optimization"`` or
                ``"all"`` (both steps).

        Returns:
            ``"./optimized_model"`` when graph optimization ran, otherwise
            ``"./quantized_model"``.

        Raises:
            ValueError: If ``optimization_level`` is not a supported value.
        """
        if optimization_level not in ("all", "quantization", "optimization"):
            raise ValueError(
                f"Unsupported optimization_level: {optimization_level!r}"
            )

        from onnxruntime.quantization import QuantFormat, QuantizationMode, QuantType
        from optimum.onnxruntime import ORTOptimizer
        from optimum.onnxruntime.configuration import (
            OptimizationConfig,
            QuantizationConfig,
        )

        ort_model = ORTModelForSequenceClassification.from_pretrained(
            model_id,
            export=True,
        )
        output_dir = None

        if optimization_level in ("all", "quantization"):
            quantizer = ORTQuantizer.from_pretrained(ort_model)
            # Dynamic quantization: integer ops with unsigned 8-bit
            # activations and signed 8-bit weights.
            quantization_config = QuantizationConfig(
                is_static=False,
                format=QuantFormat.QOperator,
                mode=QuantizationMode.IntegerOps,
                activations_dtype=QuantType.QUInt8,
                weights_dtype=QuantType.QInt8,
            )
            quantizer.quantize(
                quantization_config=quantization_config,
                save_dir="./quantized_model",
                # No suffix, so the file keeps the name benchmark_performance loads.
                file_suffix=None,
            )
            output_dir = "./quantized_model"

        if optimization_level in ("all", "optimization"):
            optimizer = ORTOptimizer.from_pretrained(ort_model)
            optimizer.optimize(
                optimization_config=OptimizationConfig(optimization_level=99),
                save_dir="./optimized_model",
                file_suffix=None,
            )
            output_dir = "./optimized_model"

        return output_dir

    def benchmark_performance(self, model_path, test_inputs):
        """Time sequential inference over ``test_inputs``.

        Args:
            model_path: Directory containing ``model.onnx``.
            test_inputs: Non-empty sequence of input feeds for ``session.run``.

        Returns:
            A dict with ``total_time`` (seconds), ``throughput`` (inputs per
            second), ``avg_latency`` (seconds per input) and ``providers``.

        Raises:
            ValueError: If ``test_inputs`` is empty.
        """
        import time

        if not test_inputs:
            raise ValueError("test_inputs must contain at least one input feed")

        session = ort.InferenceSession(
            f"{model_path}/model.onnx",
            providers=self.providers,
        )

        # Warm-up: ten untimed runs on the first input.
        warmup_feed = test_inputs[0]
        for _ in range(10):
            session.run(None, warmup_feed)

        # perf_counter is monotonic, so the interval cannot go negative.
        started = time.perf_counter()
        for feed in test_inputs:
            session.run(None, feed)
        total_time = time.perf_counter() - started

        run_count = len(test_inputs)
        return {
            "total_time": total_time,
            "throughput": run_count / total_time if total_time > 0 else float("inf"),
            "avg_latency": total_time / run_count,
            "providers": self.providers,
        }
from evaluate import load
from datasets import load_dataset
class ModelEvaluator:
    """Score a model with several ``evaluate`` metrics and publish the results."""

    def __init__(self):
        """Load the accuracy, f1, bleu and rouge metrics."""
        self.metrics = {
            name: load(name) for name in ("accuracy", "f1", "bleu", "rouge")
        }

    def evaluate_model_on_dataset(self, model, dataset_name, split="test"):
        """Run ``model`` over a dataset split and compute every loaded metric.

        Args:
            model: Callable applied to each example's ``"input"`` field.
            dataset_name: Dataset identifier passed to ``load_dataset``.
            split: Split to evaluate on.

        Returns:
            A dict mapping metric name to that metric's ``compute`` result.
        """
        eval_dataset = load_dataset(dataset_name, split=split)

        predictions = []
        references = []
        for example in eval_dataset:
            predictions.append(model(example["input"]))
            references.append(example["target"])

        # Every metric receives the same predictions/references pair.
        return {
            metric_name: metric.compute(
                predictions=predictions,
                references=references,
            )
            for metric_name, metric in self.metrics.items()
        }

    @staticmethod
    def _flatten_scores(results):
        """Flatten metric results into ``(metric_type, numeric_value)`` pairs.

        Each value in ``results`` is either a number or a dict of sub-scores.
        Non-numeric sub-scores (lists, strings, booleans) are skipped.
        """
        import numbers

        flat = []
        for metric_name, score in results.items():
            entries = score.items() if isinstance(score, dict) else [(metric_name, score)]
            for metric_type, value in entries:
                if isinstance(value, bool) or not isinstance(value, numbers.Real):
                    continue
                # Convert to built-in numbers so the card metadata serializes
                # cleanly whatever numeric type the metric returned.
                plain = int(value) if isinstance(value, numbers.Integral) else float(value)
                flat.append((metric_type, plain))
        return flat

    def upload_evaluation_results(self, model_id, results, dataset_name,
                                  template_path=None):
        """Publish evaluation results in the model card of ``model_id``.

        Args:
            model_id: Hub repository id that receives the card.
            results: Metric results, e.g. from ``evaluate_model_on_dataset``.
            dataset_name: Dataset the results were computed on.
            template_path: Optional card template file; the library's
                default template is used when omitted.

        Raises:
            ValueError: If ``results`` contains no numeric score.
        """
        scores = self._flatten_scores(results)
        if not scores:
            raise ValueError("results contains no numeric metric values to upload")

        from huggingface_hub import EvalResult, ModelCard, ModelCardData

        card_data = ModelCardData(
            model_name=model_id.split("/")[-1],
            eval_results=[
                EvalResult(
                    task_type="text-classification",
                    dataset_type=dataset_name,
                    dataset_name=dataset_name,
                    metric_type=metric_type,
                    metric_value=metric_value,
                )
                for metric_type, metric_value in scores
            ],
        )

        template_kwargs = {}
        if template_path is not None:
            template_kwargs["template_path"] = template_path
        card = ModelCard.from_template(card_data, **template_kwargs)

        # NOTE(review): the card is built from the template alone - confirm
        # that replacing any existing card content is intended.
        card.push_to_hub(model_id)