Concept Definition
Supervised learning is a machine learning paradigm in which a model is trained on labeled data so that it can make accurate predictions on new, unseen inputs. In large language models, supervised learning appears mainly as supervised fine-tuning (SFT), which adapts a pretrained model to specific tasks.

Detailed Explanation
What Is Supervised Learning?
Supervised learning is the most classic and widely used machine learning approach. The model observes input-output pairs (labeled data) to learn a mapping from inputs to outputs, so that it can produce correct outputs for new inputs. The core elements below are illustrated with a minimal sketch after the two lists that follow.

Core elements
- Labeled data: training samples that pair inputs with the correct answers
- Learning objective: minimize the discrepancy between predicted outputs and ground-truth labels
- Generalization: maintain good performance on unseen data
- Evaluation: measure model quality on a validation set
How it appears in large language models
- Pretraining stage: next-token prediction (a self-supervised form)
- Fine-tuning stage: task-specific supervised training
- Instruction tuning: inputs are instructions, outputs are the desired responses
- Dialogue training: supervised learning over multi-turn conversations
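To make these elements concrete, here is a minimal, self-contained sketch (the toy data and hyperparameters are illustrative assumptions, not from any real task): it fits a linear function from labeled pairs by minimizing the prediction-label error, which is supervised learning in its smallest form.

```python
import torch

# Labeled data: inputs x with ground-truth answers y (here y = 2x + 1 plus noise)
x = torch.linspace(-1, 1, 64).unsqueeze(1)
y = 2 * x + 1 + 0.05 * torch.randn_like(x)

model = torch.nn.Linear(1, 1)                   # the mapping to be learned
criterion = torch.nn.MSELoss()                  # discrepancy between prediction and label
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

for step in range(200):
    loss = criterion(model(x), y)               # learning objective: minimize this
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# The learned parameters should approach w = 2, b = 1
print(model.weight.item(), model.bias.item())
```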
An Analogy
Supervised learning works like a student learning under a teacher's guidance:
- Traditional classroom: the teacher gives problems together with model answers, and the student practices repeatedly
- Supervised learning: the algorithm learns patterns from a large number of "question-answer" pairs
- Exam time: facing new problems, it applies the learned patterns to produce answers

In AI terms, the "label" is the "model answer", and "training" is the "practice".
Development History
The traditional supervised learning era (1950s-2010s)
- Linear regression, decision trees, support vector machines
- Reliance on hand-crafted feature engineering
- Relatively small datasets

The deep learning era
- Neural networks and deep learning
- Automatic feature learning
- Training at large data scale

The large language model era
- The pretraining + fine-tuning paradigm
- Instruction following and dialogue ability
- Learning from human feedback and preferences
Technical Principles
Basic Workflow
Core steps
- Data preparation: collect and label training data
- Model design: choose a suitable network architecture
- Loss function: define the optimization objective (formalized below)
- Training: optimize parameters with gradient descent
- Evaluation: test performance on a validation set
- Hyperparameter tuning: tune the learning rate, batch size, etc.
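For causal language models, the objective behind these steps is token-level cross-entropy over the target tokens: given a prompt $x$ and target response $y = (y_1, \dots, y_T)$,

$$\mathcal{L}(\theta) = -\sum_{t=1}^{T} \log p_\theta\!\left(y_t \mid y_{<t},\, x\right),$$

which is exactly the `outputs.loss` that the fine-tuning code below minimizes (positions labeled -100 are excluded from the sum).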
Supervised Learning in Large Language Models
```python
import torch

def supervised_fine_tuning(model, train_data, eval_data, config):
    """Supervised fine-tuning loop (evaluate_model and should_early_stop
    are assumed helpers, not defined here)."""
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.learning_rate,
        weight_decay=config.weight_decay
    )
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=config.num_epochs
    )

    for epoch in range(config.num_epochs):
        model.train()
        total_loss = 0
        for batch in train_data:
            # Forward pass: the model computes the LM loss from the labels
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                labels=batch['labels']
            )
            loss = outputs.loss
            total_loss += loss.item()

            # Backward pass
            loss.backward()

            # Gradient clipping for training stability
            torch.nn.utils.clip_grad_norm_(
                model.parameters(),
                config.max_grad_norm
            )

            # Parameter update
            optimizer.step()
            optimizer.zero_grad()

        # Learning-rate scheduling (once per epoch)
        scheduler.step()

        # Validation
        eval_loss, eval_metrics = evaluate_model(model, eval_data)
        print(f"Epoch {epoch+1}/{config.num_epochs}")
        print(f"Train Loss: {total_loss/len(train_data):.4f}")
        print(f"Eval Loss: {eval_loss:.4f}")
        print(f"Metrics: {eval_metrics}")

        # Early stopping
        if should_early_stop(eval_loss):
            break

    return model
```
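A hypothetical usage sketch; the `TrainingConfig` dataclass and its values are illustrative assumptions, and `evaluate_model` / `should_early_stop` must be supplied by the caller:

```python
from dataclasses import dataclass

@dataclass
class TrainingConfig:
    learning_rate: float = 2e-5   # typically 1-2 orders of magnitude below pretraining
    weight_decay: float = 0.01
    num_epochs: int = 3
    max_grad_norm: float = 1.0

model = supervised_fine_tuning(model, train_loader, eval_loader, TrainingConfig())
```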
Practical Applications
Text Classification
```python
import torch

class TextClassificationTrainer:
    """Supervised learning for text classification (assumes an
    encoder-style model that returns last_hidden_state)."""

    def __init__(self, model, tokenizer, num_classes):
        self.model = model
        self.tokenizer = tokenizer
        self.num_classes = num_classes

    def prepare_data(self, texts, labels):
        """Tokenize texts and build a TensorDataset."""
        encoded = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors='pt'
        )
        # Convert labels to a tensor
        label_tensor = torch.tensor(labels, dtype=torch.long)
        dataset = torch.utils.data.TensorDataset(
            encoded['input_ids'],
            encoded['attention_mask'],
            label_tensor
        )
        return dataset

    def train(self, train_texts, train_labels, val_texts, val_labels):
        """Train the classifier."""
        # Prepare data
        train_dataset = self.prepare_data(train_texts, train_labels)
        val_dataset = self.prepare_data(val_texts, val_labels)
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=16, shuffle=True
        )
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=16, shuffle=False
        )

        # Classification head on top of the encoder
        classifier = torch.nn.Linear(
            self.model.config.hidden_size,
            self.num_classes
        )
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(
            list(self.model.parameters()) + list(classifier.parameters()),
            lr=2e-5
        )

        # Training loop
        for epoch in range(5):
            self.model.train()
            classifier.train()
            for batch in train_loader:
                input_ids, attention_mask, labels = batch

                # Encoder forward pass
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )

                # Classify from the first-token ([CLS]) representation
                logits = classifier(outputs.last_hidden_state[:, 0, :])
                loss = criterion(logits, labels)

                # Backward pass and update
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            # Validation
            val_accuracy = self.evaluate(val_loader, classifier)
            print(f"Epoch {epoch+1}, Val Accuracy: {val_accuracy:.3f}")

        return classifier

    def evaluate(self, dataloader, classifier):
        """Compute accuracy on a dataloader."""
        self.model.eval()
        classifier.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in dataloader:
                input_ids, attention_mask, labels = batch
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )
                logits = classifier(outputs.last_hidden_state[:, 0, :])
                predictions = torch.argmax(logits, dim=1)
                total += labels.size(0)
                correct += (predictions == labels).sum().item()
        return correct / total
```
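A hypothetical usage sketch (the texts and labels are illustrative):

```python
trainer = TextClassificationTrainer(model, tokenizer, num_classes=2)
classifier = trainer.train(
    train_texts=["great product", "terrible service"],
    train_labels=[1, 0],
    val_texts=["really good"],
    val_labels=[1],
)
```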
Instruction Tuning
```python
import torch

class InstructionTuner:
    """Instruction-tuning trainer."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def format_instruction_data(self, instruction, input_text, output_text):
        """Format one instruction example into a prompt and full text."""
        if input_text:
            prompt = f"Instruction: {instruction}\nInput: {input_text}\nOutput: "
        else:
            prompt = f"Instruction: {instruction}\nOutput: "
        full_text = prompt + output_text
        return {
            'prompt': prompt,
            'full_text': full_text,
            'output': output_text
        }

    def create_training_examples(self, instruction_data):
        """Build training examples from raw instruction records."""
        examples = []
        for item in instruction_data:
            formatted = self.format_instruction_data(
                item['instruction'],
                item.get('input', ''),
                item['output']
            )
            examples.append(formatted)
        return examples

    def train_on_instructions(self, instruction_dataset, num_epochs=3):
        """Instruction fine-tuning loop (one example per step, for clarity)."""
        examples = self.create_training_examples(instruction_dataset)
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=1e-5,
            weight_decay=0.01
        )
        for epoch in range(num_epochs):
            total_loss = 0
            for example in examples:
                # Encode the full prompt + response text
                encoded = self.tokenizer(
                    example['full_text'],
                    truncation=True,
                    padding=True,
                    max_length=1024,
                    return_tensors='pt'
                )

                # Build labels: compute loss only on the response tokens
                labels = encoded['input_ids'].clone()
                # Note: assumes encode() adds no extra special tokens,
                # otherwise the prompt offset shifts
                prompt_length = len(self.tokenizer.encode(example['prompt']))
                labels[:, :prompt_length] = -100  # ignore the prompt part

                # Forward pass
                outputs = self.model(
                    input_ids=encoded['input_ids'],
                    attention_mask=encoded['attention_mask'],
                    labels=labels
                )
                loss = outputs.loss
                total_loss += loss.item()

                # Backward pass and update
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            avg_loss = total_loss / len(examples)
            print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")
        return self.model

# Usage example
instruction_data = [
    {
        'instruction': 'Translate the following text into English',
        'input': '你好,世界!',
        'output': 'Hello, world!'
    },
    {
        'instruction': 'Explain what machine learning is',
        'input': '',
        'output': 'Machine learning is an AI technique in which algorithms '
                  'let computers learn patterns from data, so they can make '
                  'predictions or decisions about new data.'
    },
    {
        'instruction': 'Compute the sum of two numbers',
        'input': '25 + 37',
        'output': '25 + 37 = 62'
    }
]

trainer = InstructionTuner(model, tokenizer)
fine_tuned_model = trainer.train_on_instructions(instruction_data)
```
Dialogue System Training
```python
import torch

class DialogueTrainer:
    """Supervised training for dialogue systems."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.special_tokens = {
            'user': '<|user|>',
            'assistant': '<|assistant|>',
            'system': '<|system|>',
            'end': '<|end|>'
        }

    def format_dialogue(self, conversation):
        """Serialize a conversation into a single training string."""
        formatted = ""
        for turn in conversation:
            role = turn['role']
            content = turn['content']
            if role in self.special_tokens:
                formatted += f"{self.special_tokens[role]}{content}{self.special_tokens['end']}"
        return formatted

    def create_dialogue_dataset(self, dialogues):
        """Encode dialogues and mask everything except assistant replies."""
        # Assumes the role markers are registered as single special tokens
        assistant_token = self.tokenizer.encode(
            self.special_tokens['assistant']
        )[0]
        end_token = self.tokenizer.encode(self.special_tokens['end'])[0]

        dataset = []
        for dialogue in dialogues:
            formatted = self.format_dialogue(dialogue)
            encoded = self.tokenizer(
                formatted,
                truncation=True,
                max_length=2048,
                return_tensors='pt'
            )

            # Compute loss only on assistant responses: mask all other
            # positions with -100
            labels = encoded['input_ids'].clone()
            input_ids = encoded['input_ids'][0]
            in_assistant_response = False
            for i, token_id in enumerate(input_ids):
                if token_id == assistant_token:
                    in_assistant_response = True
                elif token_id == end_token:
                    in_assistant_response = False
                if not in_assistant_response:
                    labels[0][i] = -100

            dataset.append({
                'input_ids': encoded['input_ids'],
                'attention_mask': encoded['attention_mask'],
                'labels': labels
            })
        return dataset

    def train_dialogue_model(self, dialogue_data, num_epochs=5):
        """Train the dialogue model."""
        dataset = self.create_dialogue_dataset(dialogue_data)
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=5e-6,
            weight_decay=0.01
        )
        for epoch in range(num_epochs):
            total_loss = 0
            for batch in dataset:
                outputs = self.model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask'],
                    labels=batch['labels']
                )
                loss = outputs.loss
                total_loss += loss.item()
                loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(), 1.0
                )
                optimizer.step()
                optimizer.zero_grad()

            avg_loss = total_loss / len(dataset)
            print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")
        return self.model

# Example dialogue data
dialogue_examples = [
    [
        {'role': 'user', 'content': 'Can you explain what deep learning is?'},
        {'role': 'assistant', 'content': 'Deep learning is a branch of machine '
         'learning that uses multi-layer neural networks to learn complex patterns '
         'in data. It extracts features automatically and excels at tasks such as '
         'image recognition and natural language processing.'}
    ],
    [
        {'role': 'user', 'content': 'I want to learn Python programming. Any advice?'},
        {'role': 'assistant', 'content': 'Start with the basic syntax, then practice '
         'by writing small programs. The official tutorial on Python.org is a good '
         'resource, and working on real projects helps consolidate what you learn.'}
    ]
]
```
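A hypothetical usage sketch continuing the example above:

```python
trainer = DialogueTrainer(model, tokenizer)
chat_model = trainer.train_dialogue_model(dialogue_examples, num_epochs=3)
```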
Recent Techniques (2024)
Parameter-Efficient Fine-Tuning (PEFT)
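LoRA freezes the pretrained weights and learns a low-rank update. For a weight matrix $W_0 \in \mathbb{R}^{d \times k}$, the adapted layer computes

$$W = W_0 + \frac{\alpha}{r} BA, \qquad B \in \mathbb{R}^{d \times r},\; A \in \mathbb{R}^{r \times k},\; r \ll \min(d, k),$$

so only $A$ and $B$ are trained; $r$ and $\alpha$ correspond to `r` and `lora_alpha` in the configuration below.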
```python
from peft import LoraConfig, get_peft_model, TaskType

class ParameterEfficientTrainer:
    """Parameter-efficient fine-tuning (a mainstream approach in 2024)."""

    def __init__(self, base_model, task_type=TaskType.CAUSAL_LM):
        self.base_model = base_model
        self.task_type = task_type

    def setup_lora(self, r=16, lora_alpha=32, lora_dropout=0.05):
        """Configure LoRA."""
        lora_config = LoraConfig(
            task_type=self.task_type,
            inference_mode=False,
            r=r,                        # rank of the low-rank decomposition
            lora_alpha=lora_alpha,      # LoRA scaling factor
            lora_dropout=lora_dropout,  # dropout on the LoRA layers
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
        )
        # Wrap the base model with LoRA adapters
        self.model = get_peft_model(self.base_model, lora_config)
        # Report the trainable-parameter count
        self.model.print_trainable_parameters()
        return self.model

    def setup_adalora(self, init_r=12, target_r=8, beta1=0.85, beta2=0.85):
        """Configure AdaLoRA (adaptive-rank LoRA)."""
        from peft import AdaLoraConfig
        adalora_config = AdaLoraConfig(
            task_type=self.task_type,
            inference_mode=False,
            init_r=init_r,        # initial rank
            target_r=target_r,    # target rank after pruning
            lora_alpha=32,
            lora_dropout=0.05,
            beta1=beta1,          # EMA parameters for importance estimation
            beta2=beta2,
            tinit=200,            # warmup steps before pruning starts
            tfinal=1000,          # step at which pruning ends
            deltaT=10,            # pruning interval
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
        )
        self.model = get_peft_model(self.base_model, adalora_config)
        return self.model

    def setup_ia3(self):
        """Configure IA³ (Infused Adapter by Inhibiting and Amplifying Inner Activations)."""
        from peft import IA3Config
        ia3_config = IA3Config(
            task_type=self.task_type,
            target_modules=["k_proj", "v_proj", "down_proj"],
            feedforward_modules=["down_proj"],
        )
        self.model = get_peft_model(self.base_model, ia3_config)
        return self.model

# Usage example
trainer = ParameterEfficientTrainer(base_model)
# LoRA fine-tuning
lora_model = trainer.setup_lora(r=32, lora_alpha=64)
# AdaLoRA fine-tuning
adalora_model = trainer.setup_adalora(init_r=16, target_r=8)
```
Dual-Stage Mixed Fine-Tuning (DMT)
```python
import copy
import random
import torch
import torch.nn.functional as F

class DualStageMixedFineTuner:
    """Dual-stage mixed fine-tuning (a recent, 2024-era recipe).
    select_best_base_model and train_mixed_skills are assumed helpers,
    not defined here."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def stage1_skill_learning(self, skill_datasets, num_epochs=3):
        """Stage 1: learn each skill independently."""
        print("Stage 1: independent skill learning")
        skill_models = {}
        for skill_name, dataset in skill_datasets.items():
            print(f"Training skill: {skill_name}")
            # Copy the base model
            skill_model = copy.deepcopy(self.model)
            # Train each skill in isolation
            skill_model = self.train_single_skill(
                skill_model, dataset, num_epochs
            )
            skill_models[skill_name] = skill_model
        return skill_models

    def stage2_mixed_training(self, skill_models, mixed_dataset,
                              num_epochs=2, mixing_ratio=0.3):
        """Stage 2: mixed-skill training."""
        print("Stage 2: mixed-skill training")
        # Pick the best-performing base model
        base_model = self.select_best_base_model(skill_models)
        # Build the mixed dataset
        mixed_data = self.create_mixed_dataset(
            mixed_dataset, mixing_ratio
        )
        # Train on the mixture
        final_model = self.train_mixed_skills(
            base_model, mixed_data, num_epochs
        )
        return final_model

    def train_single_skill(self, model, dataset, num_epochs):
        """Train on a single skill dataset."""
        optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=2e-5,
            weight_decay=0.01
        )
        for epoch in range(num_epochs):
            model.train()
            total_loss = 0
            for batch in dataset:
                outputs = model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask'],
                    labels=batch['labels']
                )
                loss = outputs.loss
                total_loss += loss.item()
                loss.backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                optimizer.zero_grad()
            print(f"  Epoch {epoch+1}, Loss: {total_loss/len(dataset):.4f}")
        return model

    def create_mixed_dataset(self, datasets, mixing_ratio):
        """Sample from each skill dataset and shuffle the mixture."""
        mixed_data = []
        # Samples to draw per skill
        total_samples = sum(len(dataset) for dataset in datasets.values())
        samples_per_skill = int(total_samples * mixing_ratio / len(datasets))
        for skill_name, dataset in datasets.items():
            # Random sampling without replacement
            sampled_indices = torch.randperm(len(dataset))[:samples_per_skill]
            for idx in sampled_indices:
                mixed_data.append(dataset[idx])
        # Shuffle the mixture
        random.shuffle(mixed_data)
        return mixed_data

    def knowledge_distillation_training(self, student_model, teacher_models,
                                        dataset, temperature=3.0, alpha=0.5):
        """Knowledge-distillation training against an ensemble of teachers."""
        print("Running knowledge distillation")
        optimizer = torch.optim.AdamW(
            student_model.parameters(),
            lr=1e-5,
            weight_decay=0.01
        )
        kl_loss = torch.nn.KLDivLoss(reduction='batchmean')
        ce_loss = torch.nn.CrossEntropyLoss()
        for epoch in range(3):
            student_model.train()
            for batch in dataset:
                # Student forward pass
                student_outputs = student_model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask']
                )
                student_logits = student_outputs.logits

                # Teacher-ensemble forward passes
                teacher_logits_list = []
                for teacher in teacher_models.values():
                    teacher.eval()
                    with torch.no_grad():
                        teacher_outputs = teacher(
                            input_ids=batch['input_ids'],
                            attention_mask=batch['attention_mask']
                        )
                        teacher_logits_list.append(teacher_outputs.logits)

                # Average the teacher logits
                avg_teacher_logits = torch.stack(teacher_logits_list).mean(dim=0)

                # Distillation loss on softened distributions
                distill_loss = kl_loss(
                    F.log_softmax(student_logits / temperature, dim=-1),
                    F.softmax(avg_teacher_logits / temperature, dim=-1)
                ) * (temperature ** 2)

                # Standard cross-entropy loss against the hard labels
                standard_loss = ce_loss(
                    student_logits.view(-1, student_logits.size(-1)),
                    batch['labels'].view(-1)
                )

                # Weighted total loss
                total_loss = alpha * distill_loss + (1 - alpha) * standard_loss
                total_loss.backward()
                optimizer.step()
                optimizer.zero_grad()
        return student_model
```
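The distillation objective implemented in `knowledge_distillation_training` above is the standard soft-target formulation: with teacher logits $z_t$, student logits $z_s$, and temperature $T$,

$$\mathcal{L} = \alpha\, T^2\, \mathrm{KL}\!\left(\mathrm{softmax}(z_t/T) \,\middle\|\, \mathrm{softmax}(z_s/T)\right) + (1 - \alpha)\, \mathcal{L}_{\mathrm{CE}},$$

where the $T^2$ factor keeps the gradient scale of the softened term comparable as $T$ changes.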
Direct Preference Optimization (DPO)
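DPO optimizes the policy directly on preference pairs, with no separate reward model. The loss implemented below is, as in the original DPO paper,

$$\mathcal{L}_{\mathrm{DPO}} = -\,\mathbb{E}_{(x,\, y_w,\, y_l)}\!\left[\log \sigma\!\left(\beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\mathrm{ref}}(y_w \mid x)} - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\mathrm{ref}}(y_l \mid x)}\right)\right],$$

where $y_w$ and $y_l$ are the chosen and rejected responses, $\pi_{\mathrm{ref}}$ is the frozen reference model, and $\beta$ is the temperature parameter.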
```python
import torch
import torch.nn.functional as F

class DirectPreferenceOptimizer:
    """Direct Preference Optimization (a popular 2024 method)."""

    def __init__(self, model, ref_model, tokenizer, beta=0.1):
        self.model = model
        self.ref_model = ref_model  # frozen reference model
        self.tokenizer = tokenizer
        self.beta = beta            # temperature parameter

    def dpo_loss(self, chosen_logps, rejected_logps,
                 chosen_ref_logps, rejected_ref_logps):
        """DPO loss."""
        # Log-ratios against the reference model
        chosen_relative_logps = chosen_logps - chosen_ref_logps
        rejected_relative_logps = rejected_logps - rejected_ref_logps
        # DPO objective
        loss = -F.logsigmoid(
            self.beta * (chosen_relative_logps - rejected_relative_logps)
        ).mean()
        return loss

    def get_batch_logps(self, model, input_ids, attention_mask, labels):
        """Sequence log-probabilities for a batch."""
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        logits = outputs.logits
        labels = labels[:, 1:].clone()   # shift targets: drop the first token
        logits = logits[:, :-1, :]       # drop the last logit to align

        # Token-level log-probabilities; positions labeled -100 are masked.
        # gather() needs valid indices, so replace -100 with 0 first.
        mask = (labels != -100)
        safe_labels = labels.masked_fill(~mask, 0)
        log_probs = F.log_softmax(logits, dim=-1)
        selected_log_probs = torch.gather(
            log_probs,
            2,
            safe_labels.unsqueeze(-1)
        ).squeeze(-1)

        # Sum over the response tokens only
        sequence_log_probs = (selected_log_probs * mask.float()).sum(dim=1)
        return sequence_log_probs

    def train_dpo(self, preference_dataset, num_epochs=3):
        """DPO training loop."""
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=1e-6,
            weight_decay=0.01
        )
        for epoch in range(num_epochs):
            total_loss = 0
            for batch in preference_dataset:
                # Preference pair
                chosen_inputs = batch['chosen']
                rejected_inputs = batch['rejected']

                # Log-probs under the current policy
                chosen_logps = self.get_batch_logps(
                    self.model,
                    chosen_inputs['input_ids'],
                    chosen_inputs['attention_mask'],
                    chosen_inputs['labels']
                )
                rejected_logps = self.get_batch_logps(
                    self.model,
                    rejected_inputs['input_ids'],
                    rejected_inputs['attention_mask'],
                    rejected_inputs['labels']
                )

                # Log-probs under the frozen reference model
                with torch.no_grad():
                    chosen_ref_logps = self.get_batch_logps(
                        self.ref_model,
                        chosen_inputs['input_ids'],
                        chosen_inputs['attention_mask'],
                        chosen_inputs['labels']
                    )
                    rejected_ref_logps = self.get_batch_logps(
                        self.ref_model,
                        rejected_inputs['input_ids'],
                        rejected_inputs['attention_mask'],
                        rejected_inputs['labels']
                    )

                # DPO loss
                loss = self.dpo_loss(
                    chosen_logps, rejected_logps,
                    chosen_ref_logps, rejected_ref_logps
                )
                total_loss += loss.item()

                # Backward pass
                loss.backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                optimizer.zero_grad()

            avg_loss = total_loss / len(preference_dataset)
            print(f"Epoch {epoch+1}, DPO Loss: {avg_loss:.4f}")
        return self.model

    def create_preference_dataset(self, prompts, responses_list, preferences):
        """Build a preference dataset of chosen/rejected pairs."""
        preference_data = []
        for prompt, responses, preference in zip(prompts, responses_list, preferences):
            chosen_response = responses[preference['chosen']]
            rejected_response = responses[preference['rejected']]

            # Concatenate prompt and response
            chosen_text = f"{prompt}{chosen_response}"
            rejected_text = f"{prompt}{rejected_response}"

            # Encode both sequences
            chosen_encoded = self.tokenizer(
                chosen_text,
                truncation=True,
                max_length=1024,
                return_tensors='pt'
            )
            rejected_encoded = self.tokenizer(
                rejected_text,
                truncation=True,
                max_length=1024,
                return_tensors='pt'
            )

            # Labels: compute loss only on the response tokens
            chosen_labels = chosen_encoded['input_ids'].clone()
            rejected_labels = rejected_encoded['input_ids'].clone()
            prompt_length = len(self.tokenizer.encode(prompt))
            chosen_labels[:, :prompt_length] = -100
            rejected_labels[:, :prompt_length] = -100

            preference_data.append({
                'chosen': {
                    'input_ids': chosen_encoded['input_ids'],
                    'attention_mask': chosen_encoded['attention_mask'],
                    'labels': chosen_labels
                },
                'rejected': {
                    'input_ids': rejected_encoded['input_ids'],
                    'attention_mask': rejected_encoded['attention_mask'],
                    'labels': rejected_labels
                }
            })
        return preference_data
```
Supervised Learning Best Practices
- Data quality first: high-quality labeled data beats large volumes of low-quality data
- Appropriate learning rate: typically 1-2 orders of magnitude lower than pretraining
- Gradient clipping: prevents exploding gradients and stabilizes training
- Early stopping: avoids overfitting and improves generalization
- Data balance: keep the class distribution balanced
- Parameter efficiency: use methods such as LoRA to cut compute costs

A short configuration sketch illustrating these settings follows.
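A minimal sketch tying these practices together (the names `model`, `train_loader`, and `val_loss_of` are illustrative assumptions):

```python
import torch

# Low learning rate relative to pretraining, plus weight decay
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)

best_val, patience, bad_epochs = float('inf'), 3, 0
for epoch in range(20):
    for batch in train_loader:
        loss = model(**batch).loss
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping
        optimizer.step()
        optimizer.zero_grad()
    val_loss = val_loss_of(model)       # validate after every epoch
    if val_loss < best_val:
        best_val, bad_epochs = val_loss, 0
    else:
        bad_epochs += 1
        if bad_epochs >= patience:      # early stopping
            break
```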
Performance Evaluation and Analysis
Evaluation Metrics
```python
import torch

class SupervisedLearningEvaluator:
    """Evaluator for supervised-learning results."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def evaluate_classification(self, test_data):
        """Evaluate a classification task."""
        from sklearn.metrics import accuracy_score, precision_recall_fscore_support
        from sklearn.metrics import confusion_matrix, classification_report

        predictions = []
        true_labels = []
        self.model.eval()
        with torch.no_grad():
            for batch in test_data:
                outputs = self.model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask']
                )
                preds = torch.argmax(outputs.logits, dim=-1)
                predictions.extend(preds.cpu().numpy())
                true_labels.extend(batch['labels'].cpu().numpy())

        # Aggregate metrics
        accuracy = accuracy_score(true_labels, predictions)
        precision, recall, f1, _ = precision_recall_fscore_support(
            true_labels, predictions, average='weighted'
        )
        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'confusion_matrix': confusion_matrix(true_labels, predictions),
            'classification_report': classification_report(true_labels, predictions)
        }

    def evaluate_generation(self, test_prompts, reference_outputs):
        """Evaluate a generation task with ROUGE and BERTScore."""
        from rouge_score import rouge_scorer
        from bert_score import score

        generated_outputs = []
        self.model.eval()
        for prompt in test_prompts:
            encoded = self.tokenizer(
                prompt,
                return_tensors='pt',
                truncation=True,
                max_length=512
            )
            with torch.no_grad():
                outputs = self.model.generate(
                    input_ids=encoded['input_ids'],
                    attention_mask=encoded['attention_mask'],
                    max_new_tokens=256,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            generated = self.tokenizer.decode(
                outputs[0][len(encoded['input_ids'][0]):],
                skip_special_tokens=True
            )
            generated_outputs.append(generated)

        # ROUGE scores
        scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'],
            use_stemmer=True
        )
        rouge_scores = [
            scorer.score(ref, gen)
            for ref, gen in zip(reference_outputs, generated_outputs)
        ]

        # BERTScore ('zh' here; set lang to match the language of your outputs)
        P, R, F1 = score(
            generated_outputs,
            reference_outputs,
            lang='zh',
            verbose=True
        )

        # Average the ROUGE scores
        def avg(metric, field):
            return sum(getattr(s[metric], field) for s in rouge_scores) / len(rouge_scores)

        avg_rouge = {
            metric: {
                'precision': avg(metric, 'precision'),
                'recall': avg(metric, 'recall'),
                'fmeasure': avg(metric, 'fmeasure'),
            }
            for metric in ['rouge1', 'rouge2', 'rougeL']
        }

        return {
            'rouge': avg_rouge,
            'bert_score': {
                'precision': P.mean().item(),
                'recall': R.mean().item(),
                'f1': F1.mean().item()
            },
            'generated_outputs': generated_outputs
        }

    def evaluate_instruction_following(self, instruction_test_set):
        """Evaluate instruction-following ability.
        evaluate_response_quality is an assumed helper (e.g. an
        LLM-as-judge scorer), not defined here."""
        scores = []
        for item in instruction_test_set:
            instruction = item['instruction']
            input_text = item.get('input', '')
            expected_output = item['output']

            # Build the prompt (same template as training)
            if input_text:
                prompt = f"Instruction: {instruction}\nInput: {input_text}\nOutput: "
            else:
                prompt = f"Instruction: {instruction}\nOutput: "

            # Generate a response
            encoded = self.tokenizer(
                prompt,
                return_tensors='pt',
                truncation=True,
                max_length=512
            )
            with torch.no_grad():
                outputs = self.model.generate(
                    input_ids=encoded['input_ids'],
                    max_new_tokens=256,
                    temperature=0.1,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            generated = self.tokenizer.decode(
                outputs[0][len(encoded['input_ids'][0]):],
                skip_special_tokens=True
            ).strip()

            # Score response quality (e.g. with a strong judge model such as GPT-4)
            quality_score = self.evaluate_response_quality(
                instruction, input_text, expected_output, generated
            )
            scores.append({
                'instruction': instruction,
                'expected': expected_output,
                'generated': generated,
                'quality_score': quality_score
            })

        avg_score = sum(item['quality_score'] for item in scores) / len(scores)
        return {
            'average_score': avg_score,
            'detailed_scores': scores
        }
```
Benchmark Results (2024)

Performance of different fine-tuning methods:

| Method | Trainable Params | Training Time | Accuracy | ROUGE-L | Cost |
|---|---|---|---|---|---|
| Full fine-tuning | 100% | 24h | 92.3% | 0.845 | High |
| LoRA (r=16) | 0.6% | 4h | 91.8% | 0.838 | Low |
| AdaLoRA | 0.4% | 5h | 91.9% | 0.841 | Low |
| IA³ | 0.01% | 2h | 89.7% | 0.825 | Very low |
| QLoRA | 0.6% | 6h | 91.5% | 0.836 | Very low |

Fine-tuning gains by model scale:

| Model Scale | Base Performance | After Fine-Tuning | Gain | Optimal Data Size |
|---|---|---|---|---|
| 7B | 78.5% | 89.2% | +10.7% | 10K |
| 13B | 82.1% | 91.8% | +9.7% | 15K |
| 30B | 86.3% | 94.1% | +7.8% | 25K |
| 70B | 89.7% | 95.9% | +6.2% | 50K |
Challenges and Limitations
Main Challenges
1. Catastrophic Forgetting

```python
def measure_catastrophic_forgetting(model, original_tasks, new_task_data):
    """Measure how much fine-tuning degrades earlier tasks.
    evaluate_task is an assumed helper; supervised_fine_tuning is the
    loop defined earlier (config arguments omitted here for brevity)."""
    # Performance before fine-tuning
    before_scores = {}
    for task_name, task_data in original_tasks.items():
        before_scores[task_name] = evaluate_task(model, task_data)

    # Fine-tune on the new task
    fine_tuned_model = supervised_fine_tuning(model, new_task_data)

    # Performance after fine-tuning
    after_scores = {}
    for task_name, task_data in original_tasks.items():
        after_scores[task_name] = evaluate_task(fine_tuned_model, task_data)

    # Forgetting = drop in score per task
    forgetting_scores = {}
    for task_name in original_tasks.keys():
        forgetting = before_scores[task_name] - after_scores[task_name]
        forgetting_scores[task_name] = forgetting
    return forgetting_scores
```
2. Data Bias

```python
import pandas as pd

def detect_data_bias(dataset, protected_attributes):
    """Detect label-distribution bias across protected attributes
    (dataset is assumed to be a pandas DataFrame with a 'label' column)."""
    bias_analysis = {}
    for attribute in protected_attributes:
        # Label distribution per attribute group
        attribute_groups = dataset.groupby(attribute)
        bias_metrics = {}
        for group_name, group_data in attribute_groups:
            label_dist = group_data['label'].value_counts(normalize=True)
            bias_metrics[group_name] = label_dist.to_dict()
        bias_analysis[attribute] = bias_metrics
    return bias_analysis

def mitigate_bias(dataset, strategy='resampling'):
    """Mitigate data bias (data_augmentation is an assumed helper,
    e.g. synonym replacement or back-translation)."""
    if strategy == 'resampling':
        # Downsample every class to the minority-class size
        balanced_data = []
        label_groups = dataset.groupby('label')
        min_size = min(len(group) for _, group in label_groups)
        for label, group in label_groups:
            sampled = group.sample(n=min_size, random_state=42)
            balanced_data.append(sampled)
        return pd.concat(balanced_data, ignore_index=True)

    elif strategy == 'augmentation':
        # Augment minority classes up to the majority-class size
        augmented_data = dataset.copy()
        label_counts = dataset['label'].value_counts()
        majority_count = label_counts.max()
        for label, count in label_counts.items():
            if count < majority_count:
                minority_data = dataset[dataset['label'] == label]
                # Augmentation strategies: synonym replacement, back-translation, etc.
                augmented_samples = data_augmentation(
                    minority_data,
                    target_count=majority_count - count
                )
                augmented_data = pd.concat([
                    augmented_data,
                    augmented_samples
                ], ignore_index=True)
        return augmented_data
```
3. Overfitting

```python
import matplotlib.pyplot as plt

class OverfittingDetector:
    """Overfitting detector with early stopping and curve analysis."""

    def __init__(self, patience=5, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.best_val_loss = float('inf')
        self.patience_counter = 0

    def check_overfitting(self, train_loss, val_loss, epoch):
        """Check whether training should stop."""
        # Validation loss improved
        if val_loss < self.best_val_loss - self.min_delta:
            self.best_val_loss = val_loss
            self.patience_counter = 0
            return False, "Keep training"
        else:
            self.patience_counter += 1
            if self.patience_counter >= self.patience:
                return True, f"Overfitting detected, stopping at epoch {epoch}"
            return False, f"No val-loss improvement ({self.patience_counter}/{self.patience})"

    def analyze_learning_curves(self, train_losses, val_losses):
        """Plot loss curves and diagnose overfitting."""
        plt.figure(figsize=(12, 5))

        # Loss curves
        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label='Train loss')
        plt.plot(val_losses, label='Validation loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Train/Validation Loss')
        plt.legend()
        plt.grid(True)

        # Overfitting gap
        plt.subplot(1, 2, 2)
        gap = [val - train for train, val in zip(train_losses, val_losses)]
        plt.plot(gap, label='Val minus train loss')
        plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
        plt.xlabel('Epoch')
        plt.ylabel('Loss Gap')
        plt.title('Overfitting Check')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        # Diagnosis from the average gap over the last 10 epochs
        final_gap = gap[-10:]
        avg_gap = sum(final_gap) / len(final_gap)
        if avg_gap > 0.1:
            return "Severe overfitting"
        elif avg_gap > 0.05:
            return "Mild overfitting"
        else:
            return "Normal fit"
```
Supervised Learning Caveats
- Data leakage: keep training, validation, and test sets strictly separate (a split sketch follows this list)
- Label noise: clean and verify the quality of annotated data
- Distribution shift: keep the training data consistent with the deployment distribution
- Compute budget: plan GPU memory and training time realistically
- Version control: record model, data, and code versions
- Ethics: avoid training harmful or biased models
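A minimal sketch of a leakage-safe split (illustrative; `load_examples` is a hypothetical loader): shuffle once with a fixed seed, then keep the three sets strictly apart.

```python
import random

random.seed(42)
examples = load_examples()              # hypothetical: list of labeled records
random.shuffle(examples)

n = len(examples)
train = examples[: int(0.8 * n)]
val = examples[int(0.8 * n): int(0.9 * n)]
test = examples[int(0.9 * n):]          # held out until the final evaluation
```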
Future Directions
Automated Supervised Learning
```python
import optuna

class AutoSupervisedLearner:
    """Automated supervised-learning system. Several helpers
    (train_initial_model, retrain_model, train_with_config, evaluate,
    train_single_task, rehearsal_training, select_representative_samples)
    are assumed and not defined here."""

    def __init__(self, model_family="llama"):
        self.model_family = model_family

    def auto_data_labeling(self, unlabeled_data, seed_examples, confidence_threshold=0.9):
        """Self-training: iteratively label high-confidence samples."""
        labeled_data = seed_examples.copy()

        # Train an initial model on the seed data
        initial_model = self.train_initial_model(seed_examples)

        # Iterative labeling
        remaining_data = unlabeled_data.copy()
        while len(remaining_data) > 0:
            # Predict labels with confidence scores
            predictions = []
            for sample in remaining_data:
                pred, confidence = initial_model.predict_with_confidence(sample)
                predictions.append((sample, pred, confidence))

            # Keep only high-confidence predictions
            high_confidence = [
                (sample, pred) for sample, pred, conf in predictions
                if conf >= confidence_threshold
            ]
            if len(high_confidence) == 0:
                break  # nothing confident enough; stop

            # Promote them to labeled data
            for sample, label in high_confidence:
                labeled_data.append({'text': sample, 'label': label})
                remaining_data.remove(sample)

            # Retrain on the grown labeled set
            initial_model = self.retrain_model(initial_model, labeled_data)

            # Gradually relax the confidence threshold
            confidence_threshold *= 0.95

        return labeled_data

    def auto_hyperparameter_optimization(self, model, train_data, val_data):
        """Automatic hyperparameter search with Optuna."""
        def objective(trial):
            # Search space
            lr = trial.suggest_float('learning_rate', 1e-6, 1e-4, log=True)
            batch_size = trial.suggest_categorical('batch_size', [8, 16, 32, 64])
            warmup_steps = trial.suggest_int('warmup_steps', 0, 1000)
            weight_decay = trial.suggest_float('weight_decay', 0.0, 0.3)

            # Training configuration
            config = {
                'learning_rate': lr,
                'batch_size': batch_size,
                'warmup_steps': warmup_steps,
                'weight_decay': weight_decay,
                'num_epochs': 3  # fewer epochs to speed up the search
            }

            # Train and score on the validation set
            trained_model = self.train_with_config(model, train_data, config)
            val_score = self.evaluate(trained_model, val_data)
            return val_score

        # Run the optimization
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=100)

        best_params = study.best_params
        print(f"Best hyperparameters: {best_params}")
        return best_params

    def continual_learning(self, base_model, task_sequence):
        """Continual learning over a sequence of tasks."""
        current_model = base_model
        task_memories = []
        for task_id, task_data in enumerate(task_sequence):
            print(f"Learning task {task_id + 1}")
            # Train on the current task
            current_model = self.train_single_task(current_model, task_data)

            # Store a memory of this task
            task_memory = self.extract_task_memory(current_model, task_data)
            task_memories.append(task_memory)

            # Rehearsal to counter catastrophic forgetting
            if len(task_memories) > 1:
                current_model = self.rehearsal_training(
                    current_model, task_memories
                )
        return current_model, task_memories

    def extract_task_memory(self, model, task_data, memory_size=1000):
        """Extract a compact memory of a task."""
        # Pick representative samples
        representative_samples = self.select_representative_samples(
            task_data, memory_size
        )
        # Snapshot the model state
        model_state = {
            'parameters': model.state_dict(),
            'samples': representative_samples
        }
        return model_state
```
Multimodal Supervised Learning
```python
import torch
import torch.nn.functional as F

class MultimodalSupervisedLearner:
    """Multimodal supervised learning (self.classifier and
    self.fuse_features are assumed components, not defined here)."""

    def __init__(self, vision_model, language_model):
        self.vision_model = vision_model
        self.language_model = language_model

    def joint_training(self, multimodal_data, num_epochs=5):
        """Jointly train the vision and language towers."""
        # One optimizer over both models
        all_params = list(self.vision_model.parameters()) + \
                     list(self.language_model.parameters())
        optimizer = torch.optim.AdamW(all_params, lr=1e-5)

        for epoch in range(num_epochs):
            total_loss = 0
            for batch in multimodal_data:
                images = batch['images']
                texts = batch['texts']
                labels = batch['labels']

                # Extract visual features
                vision_features = self.vision_model(images)
                # Extract text features
                text_features = self.language_model.encode(texts)

                # Fuse the two modalities
                fused_features = self.fuse_features(
                    vision_features, text_features
                )

                # Classification loss
                logits = self.classifier(fused_features)
                loss = F.cross_entropy(logits, labels)
                total_loss += loss.item()

                # Backward pass and update
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            print(f"Epoch {epoch+1}, Loss: {total_loss/len(multimodal_data):.4f}")
        return self.vision_model, self.language_model

    def cross_modal_alignment(self, image_text_pairs):
        """Contrastive alignment of image and text embeddings."""
        contrastive_loss = torch.nn.CosineEmbeddingLoss()
        optimizer = torch.optim.AdamW(
            list(self.vision_model.parameters()) +
            list(self.language_model.parameters()),
            lr=1e-5
        )

        for epoch in range(10):
            total_loss = 0
            for batch in image_text_pairs:
                images = batch['images']
                texts = batch['texts']

                # Embed both modalities
                image_embeds = self.vision_model(images)
                text_embeds = self.language_model.encode(texts)

                # L2-normalize the embeddings
                image_embeds = F.normalize(image_embeds, dim=-1)
                text_embeds = F.normalize(text_embeds, dim=-1)

                # Positive pairs: matched image-text pairs
                positive_loss = contrastive_loss(
                    image_embeds, text_embeds,
                    torch.ones(len(images))
                )

                # Negative pairs: shuffled (mismatched) image-text pairs
                shuffled_text_embeds = text_embeds[torch.randperm(len(texts))]
                negative_loss = contrastive_loss(
                    image_embeds, shuffled_text_embeds,
                    -torch.ones(len(images))
                )

                total_loss_batch = positive_loss + negative_loss
                total_loss += total_loss_batch.item()

                total_loss_batch.backward()
                optimizer.step()
                optimizer.zero_grad()

            print(f"Alignment Epoch {epoch+1}, Loss: {total_loss/len(image_text_pairs):.4f}")
```
Related Concepts
- Fine-tuning: task-specific training on top of a pretrained model
- Pretraining: self-supervised learning on large-scale unlabeled data
- Reinforcement learning: a paradigm that learns from reward signals
- Self-supervised learning: constructing supervision signals from the data itself
Further Reading
Recommended Resources
- Attention Is All You Need - the original Transformer paper
- LoRA: Low-Rank Adaptation of Large Language Models - a parameter-efficient fine-tuning method
- Direct Preference Optimization - the original DPO paper
- The Ultimate Guide to Fine-Tuning LLMs - a 2024 survey of LLM fine-tuning
- Parameter-Efficient Transfer Learning for NLP - a survey of parameter-efficient transfer learning