Understand the principles and applications of unsupervised learning, and master core techniques such as clustering, dimensionality reduction, and representation learning as they apply to large language models.
import numpy as np
import torch
import torch.nn as nn
from sklearn.cluster import KMeans
from torch.nn import functional as F
class TextClustering:
"""文本聚类实现"""
def __init__(self, encoder_model, n_clusters=10):
self.encoder = encoder_model
self.n_clusters = n_clusters
def extract_embeddings(self, texts):
"""提取文本嵌入"""
embeddings = []
self.encoder.eval()
with torch.no_grad():
for text in texts:
                # Encode the text
inputs = self.encoder.tokenize(text)
embedding = self.encoder.encode(inputs)
embeddings.append(embedding.cpu().numpy())
return np.array(embeddings)
def kmeans_clustering(self, texts):
"""K-means聚类"""
# 提取嵌入
embeddings = self.extract_embeddings(texts)
# 执行K-means
kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
clusters = kmeans.fit_predict(embeddings)
        # Cluster centers
        centers = kmeans.cluster_centers_
        # Analyze the clustering result
cluster_info = {}
for i in range(self.n_clusters):
cluster_texts = [texts[j] for j in range(len(texts)) if clusters[j] == i]
cluster_info[i] = {
'size': len(cluster_texts),
                'samples': cluster_texts[:5],  # first 5 sample texts
'center': centers[i]
}
return clusters, cluster_info
def hierarchical_clustering(self, texts, threshold=0.5):
"""层次聚类"""
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
embeddings = self.extract_embeddings(texts)
# 计算链接矩阵
linkage_matrix = linkage(embeddings, method='ward')
# 根据阈值切分聚类
clusters = fcluster(linkage_matrix, threshold, criterion='distance')
return clusters, linkage_matrix
def dbscan_clustering(self, texts, eps=0.5, min_samples=5):
"""DBSCAN密度聚类"""
from sklearn.cluster import DBSCAN
embeddings = self.extract_embeddings(texts)
# 执行DBSCAN
dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
clusters = dbscan.fit_predict(embeddings)
        # Summarize the result
        n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
        n_noise = list(clusters).count(-1)
        print(f"Found {n_clusters} clusters")
        print(f"Noise points: {n_noise}")
return clusters
# Usage example
texts = [
"机器学习是人工智能的一个分支",
"深度学习使用神经网络",
"今天天气很好",
"明天会下雨",
"Python是一种编程语言",
"Java也是编程语言"
]
# encoder_model is assumed to be a pretrained sentence encoder defined elsewhere
clusterer = TextClustering(encoder_model, n_clusters=3)
clusters, info = clusterer.kmeans_clustering(texts)
print(f"Cluster assignments: {clusters}")
class DimensionalityReduction:
"""降维技术实现"""
def __init__(self, method='pca'):
self.method = method
def pca_reduction(self, embeddings, n_components=2):
"""主成分分析(PCA)"""
from sklearn.decomposition import PCA
pca = PCA(n_components=n_components)
reduced = pca.fit_transform(embeddings)
        # Explained variance ratio
        explained_variance = pca.explained_variance_ratio_
        print(f"Explained variance ratio: {explained_variance}")
return reduced, pca
def tsne_reduction(self, embeddings, n_components=2, perplexity=30):
"""t-SNE降维"""
from sklearn.manifold import TSNE
tsne = TSNE(
n_components=n_components,
perplexity=perplexity,
random_state=42,
n_iter=1000
)
reduced = tsne.fit_transform(embeddings)
return reduced
def umap_reduction(self, embeddings, n_components=2, n_neighbors=15):
"""UMAP降维"""
import umap
reducer = umap.UMAP(
n_components=n_components,
n_neighbors=n_neighbors,
min_dist=0.1,
metric='cosine'
)
reduced = reducer.fit_transform(embeddings)
return reduced, reducer
def autoencoder_reduction(self, embeddings, encoding_dim=32):
"""自编码器降维"""
input_dim = embeddings.shape[1]
class Autoencoder(nn.Module):
def __init__(self, input_dim, encoding_dim):
super().__init__()
# 编码器
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, encoding_dim)
)
                # Decoder
self.decoder = nn.Sequential(
nn.Linear(encoding_dim, 64),
nn.ReLU(),
nn.Linear(64, 128),
nn.ReLU(),
nn.Linear(128, input_dim)
)
def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded, encoded
        # Train the autoencoder
model = Autoencoder(input_dim, encoding_dim)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
data_tensor = torch.FloatTensor(embeddings)
for epoch in range(100):
decoded, encoded = model(data_tensor)
loss = criterion(decoded, data_tensor)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch % 20 == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
        # Get the reduced representation
with torch.no_grad():
_, reduced = model(data_tensor)
return reduced.numpy(), model
# Visualize the reduced embeddings
def visualize_reduction(reduced_embeddings, labels=None):
    """Visualize a 2-D reduction, optionally colored by cluster label."""
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 8))
if labels is not None:
unique_labels = np.unique(labels)
colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))
for label, color in zip(unique_labels, colors):
mask = labels == label
plt.scatter(
reduced_embeddings[mask, 0],
reduced_embeddings[mask, 1],
c=[color],
label=f'Cluster {label}',
alpha=0.6
)
plt.legend()
else:
plt.scatter(
reduced_embeddings[:, 0],
reduced_embeddings[:, 1],
alpha=0.6
)
plt.xlabel('Component 1')
plt.ylabel('Component 2')
    plt.title('Dimensionality Reduction Visualization')
plt.grid(True, alpha=0.3)
plt.show()
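A minimal usage sketch tying the pieces above together. The embeddings are synthetic stand-ins for real encoder outputs, so the dimensions and cluster count here are illustrative assumptions only.

# Usage sketch (synthetic embeddings stand in for real encoder outputs)
reducer = DimensionalityReduction(method='pca')
fake_embeddings = np.random.randn(200, 768)            # e.g. 200 texts, 768-d embeddings
reduced_2d, fitted_pca = reducer.pca_reduction(fake_embeddings, n_components=2)

# Cluster in the original space, then visualize in the reduced space
kmeans_labels = KMeans(n_clusters=4, random_state=42).fit_predict(fake_embeddings)
visualize_reduction(reduced_2d, labels=kmeans_labels)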
class VariationalAutoencoder(nn.Module):
"""变分自编码器(VAE)"""
def __init__(self, input_dim, hidden_dim=256, latent_dim=32):
super().__init__()
# 编码器
self.encoder = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU()
)
        # Latent-space parameters (mean and log-variance)
        self.fc_mu = nn.Linear(hidden_dim // 2, latent_dim)
        self.fc_var = nn.Linear(hidden_dim // 2, latent_dim)
        # Decoder
self.decoder = nn.Sequential(
nn.Linear(latent_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, input_dim)
)
def encode(self, x):
"""编码到潜在空间"""
h = self.encoder(x)
mu = self.fc_mu(h)
log_var = self.fc_var(h)
return mu, log_var
def reparameterize(self, mu, log_var):
"""重参数化技巧"""
std = torch.exp(0.5 * log_var)
eps = torch.randn_like(std)
return mu + eps * std
def decode(self, z):
"""从潜在空间解码"""
return self.decoder(z)
def forward(self, x):
mu, log_var = self.encode(x)
z = self.reparameterize(mu, log_var)
reconstructed = self.decode(z)
return reconstructed, mu, log_var
def vae_loss(reconstructed, original, mu, log_var):
"""VAE损失函数"""
# 重构损失
recon_loss = F.mse_loss(reconstructed, original, reduction='sum')
# KL散度
kl_divergence = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
return recon_loss + kl_divergence
# Train the VAE
def train_vae(model, data_loader, num_epochs=50):
    """Train the variational autoencoder."""
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(num_epochs):
total_loss = 0
for batch in data_loader:
            # Forward pass
reconstructed, mu, log_var = model(batch)
loss = vae_loss(reconstructed, batch, mu, log_var)
            # Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(data_loader)
if epoch % 10 == 0:
print(f"Epoch {epoch}, Average Loss: {avg_loss:.4f}")
return model
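A short training sketch, assuming the inputs come from some text encoder; here random tensors and the dimensions are placeholders. train_vae iterates over whatever iterable of batches it is given, so the DataLoader batches are unwrapped into plain tensors first.

# Usage sketch: train the VAE on synthetic embeddings via a DataLoader
from torch.utils.data import DataLoader, TensorDataset

synthetic_embeddings = torch.randn(512, 768)            # placeholder for real encoder outputs
loader = DataLoader(TensorDataset(synthetic_embeddings), batch_size=64, shuffle=True)

vae = VariationalAutoencoder(input_dim=768, hidden_dim=256, latent_dim=32)
# train_vae consumes batches directly, so unwrap the single-tensor batches first
vae = train_vae(vae, [batch for (batch,) in loader], num_epochs=20)

# Latent representations for downstream clustering
with torch.no_grad():
    latent_mu, _ = vae.encode(synthetic_embeddings)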
class SelfOrganizingClusteringHeader(nn.Module):
"""自组织聚类头(2024年新方法)"""
def __init__(self, feature_dim, n_clusters, temperature=0.1):
super().__init__()
self.n_clusters = n_clusters
self.temperature = temperature
        # Learnable cluster centers
        self.cluster_centers = nn.Parameter(
            torch.randn(n_clusters, feature_dim)
        )
        # Initialize the cluster centers
nn.init.xavier_normal_(self.cluster_centers)
def forward(self, features):
"""计算软聚类分配"""
# 计算特征与聚类中心的相似度
similarities = torch.matmul(features, self.cluster_centers.T)
# 转换为概率(软分配)
soft_assignments = F.softmax(similarities / self.temperature, dim=1)
# 转换为硬分配
hard_assignments = torch.argmax(soft_assignments, dim=1)
return soft_assignments, hard_assignments
def update_centers(self, features, assignments):
"""更新聚类中心"""
new_centers = []
for i in range(self.n_clusters):
mask = assignments == i
if mask.sum() > 0:
cluster_features = features[mask]
new_center = cluster_features.mean(dim=0)
new_centers.append(new_center)
else:
                # Keep the previous center if the cluster is empty
new_centers.append(self.cluster_centers[i])
self.cluster_centers.data = torch.stack(new_centers)
class DeepClusteringNetwork(nn.Module):
"""深度聚类网络"""
def __init__(self, input_dim, hidden_dims, n_clusters):
super().__init__()
# 特征提取器
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(0.2)
])
prev_dim = hidden_dim
self.feature_extractor = nn.Sequential(*layers)
        # Clustering head
        self.clustering_head = SelfOrganizingClusteringHeader(
            prev_dim, n_clusters
        )
        # Auxiliary task head (reconstruction)
self.reconstruction_head = nn.Sequential(
nn.Linear(prev_dim, hidden_dims[-1]),
nn.ReLU(),
nn.Linear(hidden_dims[-1], input_dim)
)
def forward(self, x):
        # Extract features
        features = self.feature_extractor(x)
        # Cluster
        soft_assignments, hard_assignments = self.clustering_head(features)
        # Reconstruct
reconstructed = self.reconstruction_head(features)
return {
'features': features,
'soft_assignments': soft_assignments,
'hard_assignments': hard_assignments,
'reconstructed': reconstructed
}
def cluster_loss(self, soft_assignments):
"""聚类损失:最大化分配熵"""
# 计算边际分布
marginal = soft_assignments.mean(dim=0)
# 最大化熵(鼓励均匀分布)
entropy = -torch.sum(marginal * torch.log(marginal + 1e-8))
# 最小化条件熵(鼓励确定分配)
conditional_entropy = -torch.mean(
torch.sum(soft_assignments * torch.log(soft_assignments + 1e-8), dim=1)
)
return -entropy + conditional_entropy
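A minimal joint-training sketch for the network above. The input dimension, hidden sizes, and the 0.1 weighting between reconstruction and clustering losses are illustrative assumptions, and the random batch stands in for real features.

# Usage sketch: joint training of the reconstruction and clustering objectives
deep_clusterer = DeepClusteringNetwork(input_dim=768, hidden_dims=[256, 128], n_clusters=10)
dcn_optimizer = torch.optim.Adam(deep_clusterer.parameters(), lr=1e-3)
fake_batch = torch.randn(64, 768)                      # placeholder for encoder outputs

for step in range(100):
    outputs = deep_clusterer(fake_batch)
    recon_loss = F.mse_loss(outputs['reconstructed'], fake_batch)
    clu_loss = deep_clusterer.cluster_loss(outputs['soft_assignments'])
    loss = recon_loss + 0.1 * clu_loss                 # the weighting is a tunable assumption
    dcn_optimizer.zero_grad()
    loss.backward()
    dcn_optimizer.step()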
class ContrastiveLearning:
"""对比学习实现"""
    def __init__(self, encoder, projection_dim=128, temperature=0.07):
        self.encoder = encoder
        self.temperature = temperature
        # Projection head
        self.projection_head = nn.Sequential(
            nn.Linear(encoder.output_dim, projection_dim),
            nn.ReLU(),
            nn.Linear(projection_dim, projection_dim)
        )
        # Predictor head used by BYOL (byol_update below assumes it exists)
        self.predictor = nn.Sequential(
            nn.Linear(projection_dim, projection_dim),
            nn.ReLU(),
            nn.Linear(projection_dim, projection_dim)
        )
def simclr_loss(self, z1, z2):
"""SimCLR对比损失"""
batch_size = z1.shape[0]
        # L2-normalize the projections
        z1 = F.normalize(z1, dim=1)
        z2 = F.normalize(z2, dim=1)
        # Concatenate the two augmented views
        representations = torch.cat([z1, z2], dim=0)
        # Pairwise similarity matrix
        similarity_matrix = torch.matmul(representations, representations.T)
        # Labels: the positive for each sample is its other augmented view
labels = torch.cat([torch.arange(batch_size), torch.arange(batch_size)])
labels = (labels.unsqueeze(0) == labels.unsqueeze(1)).float()
        # Mask: remove the diagonal (self-similarity)
        mask = torch.eye(labels.shape[0], dtype=torch.bool)
        labels = labels[~mask].view(labels.shape[0], -1)
        similarity_matrix = similarity_matrix[~mask].view(
            similarity_matrix.shape[0], -1
        )
        # Cross-entropy over [positive, negatives]
positives = similarity_matrix[labels.bool()].view(labels.shape[0], -1)
negatives = similarity_matrix[~labels.bool()].view(
similarity_matrix.shape[0], -1
)
logits = torch.cat([positives, negatives], dim=1)
labels = torch.zeros(logits.shape[0], dtype=torch.long)
logits = logits / self.temperature
loss = F.cross_entropy(logits, labels)
return loss
def swav_clustering(self, features, n_prototypes=3000):
"""SwAV聚类方法(无需成对比较)"""
# 原型(聚类中心)
prototypes = nn.Parameter(torch.randn(n_prototypes, features.shape[1]))
# Sinkhorn-Knopp算法
def sinkhorn(scores, epsilon=0.05, n_iterations=3):
Q = torch.exp(scores / epsilon).T
for _ in range(n_iterations):
                # Normalize rows
                sum_Q = torch.sum(Q, dim=1, keepdim=True)
                Q /= sum_Q
                # Normalize columns
sum_Q = torch.sum(Q, dim=0, keepdim=True)
Q /= sum_Q
return Q.T
        # Compute the assignments
scores = torch.matmul(features, prototypes.T)
assignments = sinkhorn(scores)
return assignments, prototypes
    def byol_update(self, online_network, target_network, x, tau=0.99):
        """BYOL-style update (no negatives; in practice the two networks see different augmented views)."""
        # Online network projection, followed by the predictor head
        online_proj = online_network(x)
        online_pred = self.predictor(online_proj)
        # Target network projection (no gradients)
        with torch.no_grad():
            target_proj = target_network(x)
        # Loss: prediction error against the target projection
        loss = F.mse_loss(online_pred, target_proj)
        # Momentum (EMA) update of the target network
        for online_params, target_params in zip(
            online_network.parameters(),
            target_network.parameters()
        ):
            target_params.data = tau * target_params.data + \
                (1 - tau) * online_params.data
        return loss
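A small end-to-end sketch of the SimCLR loss. ToyEncoder and the Gaussian "augmentations" are stand-ins invented purely for illustration; in practice the two views would come from real text augmentations such as the unsupervised augmentation methods later in this section.

# Usage sketch: SimCLR loss on two augmented views from a toy encoder
class ToyEncoder(nn.Module):
    def __init__(self, input_dim=768, output_dim=256):
        super().__init__()
        self.output_dim = output_dim
        self.net = nn.Linear(input_dim, output_dim)
    def forward(self, x):
        return self.net(x)

toy_encoder = ToyEncoder()
contrastive = ContrastiveLearning(toy_encoder, projection_dim=128, temperature=0.07)

x = torch.randn(32, 768)                                # a batch of inputs as raw features
view1 = x + 0.1 * torch.randn_like(x)                   # stand-ins for two augmentations
view2 = x + 0.1 * torch.randn_like(x)
z1 = contrastive.projection_head(toy_encoder(view1))
z2 = contrastive.projection_head(toy_encoder(view2))
loss = contrastive.simclr_loss(z1, z2)
print(f"SimCLR loss: {loss.item():.4f}")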
class AnomalyDetector:
"""异常检测器"""
def __init__(self, contamination=0.1):
self.contamination = contamination
def isolation_forest(self, data):
"""孤立森林"""
from sklearn.ensemble import IsolationForest
detector = IsolationForest(
contamination=self.contamination,
random_state=42
)
predictions = detector.fit_predict(data)
anomaly_scores = detector.score_samples(data)
return predictions, anomaly_scores
def local_outlier_factor(self, data, n_neighbors=20):
"""局部异常因子(LOF)"""
from sklearn.neighbors import LocalOutlierFactor
lof = LocalOutlierFactor(
n_neighbors=n_neighbors,
contamination=self.contamination
)
predictions = lof.fit_predict(data)
anomaly_scores = lof.negative_outlier_factor_
return predictions, anomaly_scores
def autoencoder_anomaly(self, model, data, threshold=None):
"""基于自编码器的异常检测"""
model.eval()
with torch.no_grad():
# 重构数据
data_tensor = torch.FloatTensor(data)
reconstructed = model(data_tensor)
# 计算重构误差
mse = F.mse_loss(reconstructed, data_tensor, reduction='none')
reconstruction_errors = mse.mean(dim=1).numpy()
        # Determine the threshold from the expected contamination rate
if threshold is None:
threshold = np.percentile(
reconstruction_errors,
(1 - self.contamination) * 100
)
        # Flag anomalies
anomalies = reconstruction_errors > threshold
return anomalies, reconstruction_errors
def one_class_svm(self, data):
"""单类SVM"""
from sklearn.svm import OneClassSVM
detector = OneClassSVM(
nu=self.contamination,
kernel='rbf',
gamma='auto'
)
detector.fit(data)
predictions = detector.predict(data)
decision_scores = detector.decision_function(data)
return predictions, decision_scores
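A usage sketch on synthetic data: most points are drawn from a standard normal and a small shifted cluster plays the role of anomalies. The contamination rate is an assumption you would tune on real data.

# Usage sketch: flag outliers in synthetic embeddings with Isolation Forest
rng = np.random.default_rng(42)
normal_points = rng.normal(0, 1, size=(200, 32))
outlier_points = rng.normal(6, 1, size=(10, 32))        # a small shifted cluster
points = np.vstack([normal_points, outlier_points])

anomaly_detector = AnomalyDetector(contamination=0.05)
predictions, scores = anomaly_detector.isolation_forest(points)  # -1 marks anomalies
print(f"Flagged {np.sum(predictions == -1)} anomalies out of {len(points)} points")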
class TopicModeling:
"""主题建模实现"""
def __init__(self, n_topics=10):
self.n_topics = n_topics
def lda_modeling(self, documents):
"""潜在狄利克雷分配(LDA)"""
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
# 向量化文档
vectorizer = CountVectorizer(
max_features=1000,
stop_words='english'
)
doc_term_matrix = vectorizer.fit_transform(documents)
        # LDA model
lda = LatentDirichletAllocation(
n_components=self.n_topics,
random_state=42,
learning_method='batch'
)
        # Fit the model
        doc_topics = lda.fit_transform(doc_term_matrix)
        # Extract the top words for each topic
feature_names = vectorizer.get_feature_names_out()
topics = []
for topic_idx, topic in enumerate(lda.components_):
top_indices = topic.argsort()[-10:][::-1]
top_words = [feature_names[i] for i in top_indices]
topics.append({
'topic_id': topic_idx,
'words': top_words,
'weights': topic[top_indices].tolist()
})
return doc_topics, topics
def neural_topic_model(self, embeddings, hidden_dim=100):
"""神经主题模型"""
class NeuralTopicModel(nn.Module):
def __init__(self, vocab_size, n_topics, hidden_dim):
super().__init__()
# 编码器
self.encoder = nn.Sequential(
nn.Linear(vocab_size, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
                # Topic layer
                self.topic_layer = nn.Linear(hidden_dim, n_topics)
                # Decoder
                self.decoder = nn.Linear(n_topics, vocab_size)
                # Softmax over topics
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
                # Encode
                hidden = self.encoder(x)
                # Topic distribution
                topic_dist = self.softmax(self.topic_layer(hidden))
                # Reconstruct
                reconstructed = self.decoder(topic_dist)
return reconstructed, topic_dist
vocab_size = embeddings.shape[1]
model = NeuralTopicModel(vocab_size, self.n_topics, hidden_dim)
        # Training
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
embeddings_tensor = torch.FloatTensor(embeddings)
for epoch in range(100):
reconstructed, topic_dist = model(embeddings_tensor)
loss = criterion(reconstructed, embeddings_tensor)
            # Sparsity constraint: penalize the entropy of the topic distribution
            sparsity_loss = -torch.mean(torch.sum(topic_dist * torch.log(topic_dist + 1e-8), dim=1))
            total_loss = loss + 0.1 * sparsity_loss
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
return model, topic_dist.detach().numpy()
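A small LDA usage sketch over a handful of English sentences (the CountVectorizer above uses English stop words); with this little text the discovered topics are only illustrative.

# Usage sketch: LDA over a handful of English documents
sample_docs = [
    "machine learning models learn patterns from data",
    "deep neural networks are trained with gradient descent",
    "the stock market fell sharply amid inflation fears",
    "central banks raised interest rates again this quarter",
    "transformers are the dominant architecture for language models",
    "investors worry about rising bond yields",
]
topic_modeler = TopicModeling(n_topics=2)
doc_topics, topics = topic_modeler.lda_modeling(sample_docs)
for topic in topics:
    print(topic['topic_id'], topic['words'][:5])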
class RepresentationLearning:
"""表示学习应用"""
def __init__(self, encoder_model):
self.encoder = encoder_model
def semantic_similarity(self, text1, text2):
"""语义相似度计算"""
# 编码文本
embed1 = self.encoder.encode(text1)
embed2 = self.encoder.encode(text2)
# 余弦相似度
similarity = F.cosine_similarity(embed1, embed2, dim=0)
return similarity.item()
def document_retrieval(self, query, documents, top_k=5):
"""文档检索"""
# 编码查询
query_embed = self.encoder.encode(query)
# 编码文档
doc_embeds = []
for doc in documents:
doc_embed = self.encoder.encode(doc)
doc_embeds.append(doc_embed)
doc_embeds = torch.stack(doc_embeds)
        # Similarity between the query and each document
similarities = F.cosine_similarity(
query_embed.unsqueeze(0),
doc_embeds,
dim=1
)
        # Top-k most similar documents
        top_scores, top_indices = torch.topk(similarities, min(top_k, len(documents)))
results = []
for idx, score in zip(top_indices, top_scores):
results.append({
'document': documents[idx],
'score': score.item()
})
return results
def zero_shot_classification(self, text, labels):
"""零样本分类(基于表示学习)"""
# 编码文本
text_embed = self.encoder.encode(text)
# 编码标签
label_embeds = []
for label in labels:
            # Turn the label into a natural-language description
label_desc = f"This text is about {label}"
label_embed = self.encoder.encode(label_desc)
label_embeds.append(label_embed)
label_embeds = torch.stack(label_embeds)
        # Similarity between the text and each label description
similarities = F.cosine_similarity(
text_embed.unsqueeze(0),
label_embeds,
dim=1
)
        # Pick the most similar label
best_idx = torch.argmax(similarities)
return labels[best_idx], similarities[best_idx].item()
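A usage sketch assuming the sentence-transformers package; any object exposing encode(text) -> torch.Tensor would work equally well, and the model name and wrapper class below are just examples.

# Usage sketch: zero-shot classification with a sentence-transformers encoder
from sentence_transformers import SentenceTransformer

class SentenceEncoder:
    def __init__(self, model_name='all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)
    def encode(self, text):
        return self.model.encode(text, convert_to_tensor=True)

rep_learner = RepresentationLearning(SentenceEncoder())
label, score = rep_learner.zero_shot_classification(
    "The GPU shortage is slowing down model training",
    labels=["technology", "sports", "cooking"]
)
print(f"Predicted label: {label} (similarity {score:.3f})")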
class UnsupervisedDataAugmentation:
"""无监督数据增强"""
def __init__(self, model):
self.model = model
def back_translation(self, text, intermediate_lang='en'):
"""回译增强"""
# 翻译到中间语言
translated = self.model.translate(text, target_lang=intermediate_lang)
# 翻译回原语言
back_translated = self.model.translate(
translated,
target_lang='zh'
)
return back_translated
def paraphrase_generation(self, text, num_paraphrases=3):
"""释义生成"""
paraphrases = []
for _ in range(num_paraphrases):
prompt = f"请改写以下句子,保持原意:\n{text}\n改写:"
paraphrase = self.model.generate(prompt, temperature=0.8)
paraphrases.append(paraphrase)
return paraphrases
def contextual_augmentation(self, text, mask_ratio=0.15):
"""上下文增强(类似BERT的MLM)"""
words = text.split()
n_mask = int(len(words) * mask_ratio)
# 随机选择要掩码的位置
mask_positions = np.random.choice(
len(words),
n_mask,
replace=False
)
augmented_texts = []
for pos in mask_positions:
masked_words = words.copy()
original_word = masked_words[pos]
masked_words[pos] = '[MASK]'
masked_text = ' '.join(masked_words)
            # Predict a replacement for the masked position
predicted_word = self.model.predict_mask(masked_text)
if predicted_word != original_word:
masked_words[pos] = predicted_word
augmented_text = ' '.join(masked_words)
augmented_texts.append(augmented_text)
return augmented_texts
def mixup_augmentation(self, embeddings, alpha=0.2):
"""Mixup数据增强"""
batch_size = embeddings.shape[0]
# 生成混合系数
lam = np.random.beta(alpha, alpha, batch_size)
lam = np.maximum(lam, 1 - lam)
# 随机排列索引
index = torch.randperm(batch_size)
# 混合嵌入
mixed_embeddings = lam.reshape(-1, 1) * embeddings + \
(1 - lam.reshape(-1, 1)) * embeddings[index]
return mixed_embeddings, lam, index
class ClusteringEvaluator:
"""聚类评估器"""
def __init__(self):
pass
def silhouette_score(self, data, labels):
"""轮廓系数"""
from sklearn.metrics import silhouette_score, silhouette_samples
# 整体轮廓系数
overall_score = silhouette_score(data, labels)
# 每个样本的轮廓系数
sample_scores = silhouette_samples(data, labels)
# 每个聚类的平均轮廓系数
cluster_scores = {}
for label in np.unique(labels):
mask = labels == label
cluster_scores[label] = sample_scores[mask].mean()
return {
'overall': overall_score,
'by_cluster': cluster_scores,
'samples': sample_scores
}
def davies_bouldin_score(self, data, labels):
"""Davies-Bouldin指数(越小越好)"""
from sklearn.metrics import davies_bouldin_score
score = davies_bouldin_score(data, labels)
return score
def calinski_harabasz_score(self, data, labels):
"""Calinski-Harabasz指数(越大越好)"""
from sklearn.metrics import calinski_harabasz_score
score = calinski_harabasz_score(data, labels)
return score
def evaluate_with_ground_truth(self, predicted_labels, true_labels):
"""有真实标签时的评估"""
from sklearn.metrics import (
adjusted_rand_score,
normalized_mutual_info_score,
homogeneity_score,
completeness_score,
v_measure_score
)
return {
'adjusted_rand_index': adjusted_rand_score(true_labels, predicted_labels),
'normalized_mutual_info': normalized_mutual_info_score(
true_labels, predicted_labels
),
'homogeneity': homogeneity_score(true_labels, predicted_labels),
'completeness': completeness_score(true_labels, predicted_labels),
'v_measure': v_measure_score(true_labels, predicted_labels)
}
def optimal_clusters_elbow(self, data, max_k=10):
"""肘部法则确定最优聚类数"""
from sklearn.cluster import KMeans
inertias = []
K_range = range(2, max_k + 1)
for k in K_range:
kmeans = KMeans(n_clusters=k, random_state=42)
kmeans.fit(data)
inertias.append(kmeans.inertia_)
        # Locate the elbow via the second difference of the inertia curve
        deltas = np.diff(inertias)
        delta_deltas = np.diff(deltas)
        # The second difference at index i is centered on k = i + 3 (since K starts at 2)
        elbow_point = np.argmax(delta_deltas) + 3
return {
'k_values': list(K_range),
'inertias': inertias,
'optimal_k': elbow_point
}
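A usage sketch on synthetic blobs: the elbow heuristic, the silhouette coefficient, and the ground-truth metrics applied to the same K-means result.

# Usage sketch: evaluate K-means clusterings on synthetic data
from sklearn.datasets import make_blobs

X, y_true = make_blobs(n_samples=300, centers=4, n_features=16, random_state=42)
evaluator = ClusteringEvaluator()

elbow = evaluator.optimal_clusters_elbow(X, max_k=8)
print(f"Elbow suggestion: k = {elbow['optimal_k']}")

pred_labels = KMeans(n_clusters=4, random_state=42).fit_predict(X)
print(f"Silhouette: {evaluator.silhouette_score(X, pred_labels)['overall']:.3f}")
print(f"ARI vs. ground truth: {evaluator.evaluate_with_ground_truth(pred_labels, y_true)['adjusted_rand_index']:.3f}")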
class RepresentationQualityEvaluator:
"""表示质量评估"""
def __init__(self):
pass
def intrinsic_dimension(self, embeddings):
"""估计内在维度"""
from sklearn.decomposition import PCA
pca = PCA()
pca.fit(embeddings)
        # Cumulative explained variance
        cumsum_var = np.cumsum(pca.explained_variance_ratio_)
        # Number of components needed to explain 90% of the variance
        n_components_90 = np.argmax(cumsum_var >= 0.9) + 1
        # Number of components needed to explain 95% of the variance
        n_components_95 = np.argmax(cumsum_var >= 0.95) + 1
return {
'n_components_90': n_components_90,
'n_components_95': n_components_95,
'explained_variance_ratio': pca.explained_variance_ratio_,
'cumulative_variance': cumsum_var
}
def neighborhood_preservation(self, original_data, reduced_data, k=10):
"""邻域保持度评估"""
from sklearn.neighbors import NearestNeighbors
# 原始空间的k近邻
nn_original = NearestNeighbors(n_neighbors=k+1)
nn_original.fit(original_data)
neighbors_original = nn_original.kneighbors(
original_data,
return_distance=False
        )[:, 1:]  # exclude the point itself
        # k nearest neighbors in the reduced space
nn_reduced = NearestNeighbors(n_neighbors=k+1)
nn_reduced.fit(reduced_data)
neighbors_reduced = nn_reduced.kneighbors(
reduced_data,
return_distance=False
)[:, 1:]
        # Fraction of neighbors preserved for each point
        preservation_scores = []
        for i in range(len(original_data)):
            # Overlap between the two neighbor sets
intersection = np.intersect1d(
neighbors_original[i],
neighbors_reduced[i]
)
preservation = len(intersection) / k
preservation_scores.append(preservation)
return {
'mean_preservation': np.mean(preservation_scores),
'std_preservation': np.std(preservation_scores),
'scores': preservation_scores
}
def reconstruction_quality(self, model, test_data):
"""重构质量评估(用于自编码器)"""
model.eval()
with torch.no_grad():
test_tensor = torch.FloatTensor(test_data)
# 如果是VAE
if hasattr(model, 'encode'):
mu, log_var = model.encode(test_tensor)
z = model.reparameterize(mu, log_var)
reconstructed = model.decode(z)
else:
reconstructed = model(test_tensor)
        # Reconstruction error
        mse = F.mse_loss(reconstructed, test_tensor).item()
        # PSNR (peak signal-to-noise ratio)
        max_val = test_tensor.max().item()
        psnr = 20 * np.log10(max_val) - 10 * np.log10(mse)
        # Structural similarity (only meaningful for image-shaped data)
        if len(test_tensor.shape) == 4:  # image data
from skimage.metrics import structural_similarity as ssim
original_np = test_tensor.numpy()
reconstructed_np = reconstructed.numpy()
ssim_scores = []
for i in range(len(original_np)):
score = ssim(
original_np[i],
reconstructed_np[i],
multichannel=True
)
ssim_scores.append(score)
mean_ssim = np.mean(ssim_scores)
else:
mean_ssim = None
return {
'mse': mse,
'psnr': psnr,
'ssim': mean_ssim
}
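A usage sketch: estimate the intrinsic dimension of random features and check how well a 2-D PCA projection preserves local neighborhoods. Random data is a deliberately pessimistic stand-in, so the preservation score will be low on purpose.

# Usage sketch: intrinsic dimension and neighborhood preservation
quality = RepresentationQualityEvaluator()
high_dim = np.random.randn(300, 128)

dims = quality.intrinsic_dimension(high_dim)
print(f"Components for 90% variance: {dims['n_components_90']}")

pca_reducer = DimensionalityReduction()
low_dim, _ = pca_reducer.pca_reduction(high_dim, n_components=2)
preservation = quality.neighborhood_preservation(high_dim, low_dim, k=10)
print(f"Mean neighborhood preservation: {preservation['mean_preservation']:.3f}")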
class HybridUnsupervisedLearning:
"""混合无监督学习方法"""
def __init__(self):
pass
def masked_autoencoding(self, model, data, mask_ratio=0.75):
"""掩码自编码(MAE)"""
# 随机掩码
batch_size, seq_len, feature_dim = data.shape
len_keep = int(seq_len * (1 - mask_ratio))
noise = torch.rand(batch_size, seq_len)
ids_shuffle = torch.argsort(noise, dim=1)
ids_restore = torch.argsort(ids_shuffle, dim=1)
        # Tokens that are kept
ids_keep = ids_shuffle[:, :len_keep]
x_masked = torch.gather(
data,
dim=1,
index=ids_keep.unsqueeze(-1).repeat(1, 1, feature_dim)
)
        # Learnable mask token for the removed positions
        mask_token = nn.Parameter(torch.zeros(1, 1, feature_dim))
        mask_tokens = mask_token.repeat(batch_size, seq_len - len_keep, 1)
        # Restore the original ordering, filling removed positions with mask tokens
x_with_mask = torch.cat([x_masked, mask_tokens], dim=1)
x_with_mask = torch.gather(
x_with_mask,
dim=1,
index=ids_restore.unsqueeze(-1).repeat(1, 1, feature_dim)
)
        # Encode and decode
        reconstructed = model(x_with_mask)
        # Compute the loss only on the masked positions
mask = torch.ones_like(data)
mask[:, :len_keep, :] = 0
mask = torch.gather(
mask,
dim=1,
index=ids_restore.unsqueeze(-1).repeat(1, 1, feature_dim)
)
loss = (reconstructed - data) ** 2
loss = (loss * mask).sum() / mask.sum()
return loss
    def momentum_contrast(self, query_encoder, key_encoder,
                          queries, keys, queue, temperature=0.07):
        """Momentum contrast (MoCo); `queue` holds negative keys with shape [C, K]."""
        # Query features
        q = query_encoder(queries)
        q = F.normalize(q, dim=1)
        # Key features (no gradients)
        with torch.no_grad():
            k = key_encoder(keys)
            k = F.normalize(k, dim=1)
        # Positive logits: one per query
        l_pos = torch.einsum('nc,nc->n', [q, k]).unsqueeze(-1)
        # Negative logits against the queue of shape [C, K]
        l_neg = torch.einsum('nc,ck->nk', [q, queue])
        # Contrastive loss
        logits = torch.cat([l_pos, l_neg], dim=1)
        logits /= temperature
        labels = torch.zeros(logits.shape[0], dtype=torch.long)
        loss = F.cross_entropy(logits, labels)
        # Update the queue: enqueue the new keys, dequeue the oldest
        queue = torch.cat([k.T, queue[:, :-k.shape[0]]], dim=1)
        return loss, queue
class GenerativeUnsupervisedLearning:
"""生成式无监督学习"""
def __init__(self):
pass
    def diffusion_model(self, model, data, timesteps=1000):
        """Denoising diffusion (DDPM-style) training sketch."""
        # Linear noise schedule
        betas = torch.linspace(1e-4, 0.02, timesteps)
        alphas = 1.0 - betas
        alpha_bars = torch.cumprod(alphas, dim=0)
        # Forward diffusion process q(x_t | x_0)
        def forward_diffusion(x_0, t, noise=None):
            if noise is None:
                noise = torch.randn_like(x_0)
            # Look up the cumulative noise level for each sample's timestep
            alpha_bar_t = alpha_bars[t].reshape(-1, *([1] * (x_0.dim() - 1)))
            # Add noise
            sqrt_alpha_bar = torch.sqrt(alpha_bar_t)
            sqrt_one_minus_alpha_bar = torch.sqrt(1 - alpha_bar_t)
            x_t = sqrt_alpha_bar * x_0 + sqrt_one_minus_alpha_bar * noise
            return x_t, noise
        # Reverse denoising step p(x_{t-1} | x_t) for a scalar timestep t
        def reverse_diffusion(model, x_t, t):
            # Predict the injected noise
            predicted_noise = model(x_t, t)
            alpha_t = alphas[t]
            alpha_bar_t = alpha_bars[t]
            beta_t = betas[t]
            # Posterior mean
            mean = (1 / torch.sqrt(alpha_t)) * (
                x_t - beta_t / torch.sqrt(1 - alpha_bar_t) * predicted_noise
            )
            # Add noise, except at the final step
            if t > 0:
                noise = torch.randn_like(x_t)
                x_t_minus_1 = mean + torch.sqrt(beta_t) * noise
            else:
                x_t_minus_1 = mean
            return x_t_minus_1
        # Training loop: teach the model to predict the injected noise
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
        for epoch in range(100):
            # Random timestep for each sample
            t = torch.randint(0, timesteps, (data.shape[0],))
            # Forward diffusion
            x_t, noise = forward_diffusion(data, t)
            # Predict the noise
            predicted_noise = model(x_t, t)
            # Loss
            loss = F.mse_loss(predicted_noise, noise)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        return model
def flow_matching(self, model, source_data, target_data):
"""流匹配(Flow Matching)"""
# 定义概率路径
def interpolate(x0, x1, t):
return (1 - t) * x0 + t * x1
# 计算条件向量场
def conditional_vector_field(x0, x1, xt, t):
return (x1 - x0) / (1 - t + 1e-8)
        # Training loop
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
        for epoch in range(100):
            # Random time in [0, 1)
            t = torch.rand(source_data.shape[0], 1)
            # Interpolated points along the path
            xt = interpolate(source_data, target_data, t)
            # Target vector field
            true_vt = conditional_vector_field(source_data, target_data, xt, t)
            # Predicted vector field
            pred_vt = model(xt, t)
            # Regression loss on the vector field
            loss = F.mse_loss(pred_vt, true_vt)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return model
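A closing usage sketch for flow matching: a tiny velocity-field MLP (invented here purely for illustration) is trained to transport a standard Gaussian toward a shifted Gaussian.

# Usage sketch: fit a velocity field that transports Gaussian noise to a shifted Gaussian
class VelocityField(nn.Module):
    """Tiny MLP v(x, t); the architecture is only illustrative."""
    def __init__(self, dim=2, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim + 1, hidden),
            nn.ReLU(),
            nn.Linear(hidden, dim)
        )
    def forward(self, x, t):
        return self.net(torch.cat([x, t], dim=1))

source = torch.randn(256, 2)                 # source distribution: standard Gaussian
target = torch.randn(256, 2) + 4.0           # target distribution: shifted Gaussian
generative = GenerativeUnsupervisedLearning()
flow = generative.flow_matching(VelocityField(dim=2), source, target)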