概念定义

无监督学习(Unsupervised Learning)是一种机器学习范式,在没有标签数据的情况下,通过发现数据中的内在结构、模式和关系来学习有用的表示。在大语言模型中,无监督学习是预训练阶段的核心方法。

详细解释

什么是无监督学习?

无监督学习让模型自主发现数据中的隐藏模式,无需人工标注。它通过分析数据的统计特性、分布规律和内在结构,学习到有意义的表示和知识。

核心特征
  • 无需标签:直接从原始数据学习
  • 模式发现:自动识别数据结构
  • 表示学习:学习数据的有效表示
  • 可扩展性:能够利用海量无标签数据
主要任务类型
  • 聚类:将相似数据分组
  • 降维:减少数据维度,保留关键信息
  • 密度估计:学习数据分布(示意代码见本节末尾)
  • 异常检测:识别异常模式
  • 生成建模:学习生成新数据
形象比喻
无监督学习就像一个探索者在未知领域的探索:
  • 监督学习:有导游带领,告诉你什么是什么(有标签)
  • 无监督学习:独自探索,自己发现规律和分类(无标签)
  • 自监督学习:自己制作地图,边探索边学习(自创标签)
就像观察星空,无监督学习自动将星星分组成星座,发现它们的排列规律。
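
针对上面任务类型中的密度估计,下面给出一个最小示意(数据为随机生成的二维数值特征,仅演示调用方式):用scikit-learn的高斯混合模型(GaussianMixture)学习数据分布,再用学到的分布给新样本打分。

import numpy as np
from sklearn.mixture import GaussianMixture

# 构造两簇二维数据作为示例
rng = np.random.RandomState(42)
data = np.vstack([
    rng.normal(loc=0.0, scale=1.0, size=(200, 2)),
    rng.normal(loc=5.0, scale=0.5, size=(200, 2))
])

# 用高斯混合模型拟合数据分布(无监督,不需要标签)
gmm = GaussianMixture(n_components=2, random_state=42)
gmm.fit(data)

# 对新样本计算对数似然:似然越低,越可能位于低密度区域(潜在异常点)
new_points = np.array([[0.0, 0.0], [10.0, 10.0]])
print(gmm.score_samples(new_points))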

在大语言模型中的应用

大语言模型的预训练本质上是一种无监督学习(更准确地说是自监督学习):模型在海量无标注文本上,以预测下一个词或被掩码的词作为训练目标,监督信号来自数据本身,不需要人工标注。
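
下面是一个极简的示意代码(不代表任何具体模型的实现),展示最常见的自回归语言建模目标:把同一段无标注文本错开一个位置,分别作为输入和预测目标,用交叉熵作为损失。

import torch
import torch.nn as nn

# 假设已有分词后的无标注文本token序列(这里用随机id模拟)
vocab_size = 1000
token_ids = torch.randint(0, vocab_size, (4, 33))  # (batch, seq_len + 1)

# 输入与目标只是同一序列错开一位:监督信号来自数据本身,无需人工标注
inputs = token_ids[:, :-1]
targets = token_ids[:, 1:]

# 一个极简"语言模型":词嵌入 + 线性输出层,仅用于演示损失的计算方式
embedding = nn.Embedding(vocab_size, 64)
lm_head = nn.Linear(64, vocab_size)

logits = lm_head(embedding(inputs))  # (batch, seq_len, vocab_size)
loss = nn.functional.cross_entropy(
    logits.reshape(-1, vocab_size),
    targets.reshape(-1)
)
print(loss.item())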

技术原理

聚类算法

import numpy as np
import torch
import torch.nn as nn
from sklearn.cluster import KMeans
from torch.nn import functional as F

class TextClustering:
    """文本聚类实现"""
    
    def __init__(self, encoder_model, n_clusters=10):
        self.encoder = encoder_model
        self.n_clusters = n_clusters
        
    def extract_embeddings(self, texts):
        """提取文本嵌入"""
        embeddings = []
        
        self.encoder.eval()
        with torch.no_grad():
            for text in texts:
                # 编码文本(假设encoder提供tokenize和encode接口)
                inputs = self.encoder.tokenize(text)
                embedding = self.encoder.encode(inputs)
                # squeeze掉batch维,保证堆叠后形状为 (n_samples, dim)
                embeddings.append(embedding.squeeze().cpu().numpy())
        
        return np.array(embeddings)
    
    def kmeans_clustering(self, texts):
        """K-means聚类"""
        # 提取嵌入
        embeddings = self.extract_embeddings(texts)
        
        # 执行K-means
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        clusters = kmeans.fit_predict(embeddings)
        
        # 计算聚类中心
        centers = kmeans.cluster_centers_
        
        # 分析聚类结果
        cluster_info = {}
        for i in range(self.n_clusters):
            cluster_texts = [texts[j] for j in range(len(texts)) if clusters[j] == i]
            cluster_info[i] = {
                'size': len(cluster_texts),
                'samples': cluster_texts[:5],  # 前5个样本
                'center': centers[i]
            }
        
        return clusters, cluster_info
    
    def hierarchical_clustering(self, texts, threshold=0.5):
        """层次聚类"""
        from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
        
        embeddings = self.extract_embeddings(texts)
        
        # 计算链接矩阵
        linkage_matrix = linkage(embeddings, method='ward')
        
        # 根据阈值切分聚类
        clusters = fcluster(linkage_matrix, threshold, criterion='distance')
        
        return clusters, linkage_matrix
    
    def dbscan_clustering(self, texts, eps=0.5, min_samples=5):
        """DBSCAN密度聚类"""
        from sklearn.cluster import DBSCAN
        
        embeddings = self.extract_embeddings(texts)
        
        # 执行DBSCAN
        dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
        clusters = dbscan.fit_predict(embeddings)
        
        # 统计结果
        n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
        n_noise = list(clusters).count(-1)
        
        print(f"发现 {n_clusters} 个聚类")
        print(f"噪声点: {n_noise}")
        
        return clusters

# 使用示例
texts = [
    "机器学习是人工智能的一个分支",
    "深度学习使用神经网络",
    "今天天气很好",
    "明天会下雨",
    "Python是一种编程语言",
    "Java也是编程语言"
]

# encoder_model为预先加载的文本编码器(需提供tokenize/encode接口)
clusterer = TextClustering(encoder_model, n_clusters=3)
clusters, info = clusterer.kmeans_clustering(texts)
print(f"聚类结果: {clusters}")

降维技术

class DimensionalityReduction:
    """降维技术实现"""
    
    def __init__(self, method='pca'):
        self.method = method
        
    def pca_reduction(self, embeddings, n_components=2):
        """主成分分析(PCA)"""
        from sklearn.decomposition import PCA
        
        pca = PCA(n_components=n_components)
        reduced = pca.fit_transform(embeddings)
        
        # 解释方差比
        explained_variance = pca.explained_variance_ratio_
        print(f"解释方差比: {explained_variance}")
        
        return reduced, pca
    
    def tsne_reduction(self, embeddings, n_components=2, perplexity=30):
        """t-SNE降维"""
        from sklearn.manifold import TSNE
        
        tsne = TSNE(
            n_components=n_components,
            perplexity=perplexity,
            random_state=42,
            n_iter=1000
        )
        
        reduced = tsne.fit_transform(embeddings)
        return reduced
    
    def umap_reduction(self, embeddings, n_components=2, n_neighbors=15):
        """UMAP降维"""
        import umap
        
        reducer = umap.UMAP(
            n_components=n_components,
            n_neighbors=n_neighbors,
            min_dist=0.1,
            metric='cosine'
        )
        
        reduced = reducer.fit_transform(embeddings)
        return reduced, reducer
    
    def autoencoder_reduction(self, embeddings, encoding_dim=32):
        """自编码器降维"""
        input_dim = embeddings.shape[1]
        
        class Autoencoder(nn.Module):
            def __init__(self, input_dim, encoding_dim):
                super().__init__()
                # 编码器
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, 128),
                    nn.ReLU(),
                    nn.Linear(128, 64),
                    nn.ReLU(),
                    nn.Linear(64, encoding_dim)
                )
                # 解码器
                self.decoder = nn.Sequential(
                    nn.Linear(encoding_dim, 64),
                    nn.ReLU(),
                    nn.Linear(64, 128),
                    nn.ReLU(),
                    nn.Linear(128, input_dim)
                )
            
            def forward(self, x):
                encoded = self.encoder(x)
                decoded = self.decoder(encoded)
                return decoded, encoded
        
        # 训练自编码器
        model = Autoencoder(input_dim, encoding_dim)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()
        
        data_tensor = torch.FloatTensor(embeddings)
        
        for epoch in range(100):
            decoded, encoded = model(data_tensor)
            loss = criterion(decoded, data_tensor)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            if epoch % 20 == 0:
                print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
        
        # 获取降维结果
        with torch.no_grad():
            _, reduced = model(data_tensor)
        
        return reduced.numpy(), model

# 可视化降维结果
def visualize_reduction(reduced_embeddings, labels=None):
    """可视化降维结果"""
    import matplotlib.pyplot as plt
    
    plt.figure(figsize=(10, 8))
    
    if labels is not None:
        unique_labels = np.unique(labels)
        colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))
        
        for label, color in zip(unique_labels, colors):
            mask = labels == label
            plt.scatter(
                reduced_embeddings[mask, 0],
                reduced_embeddings[mask, 1],
                c=[color],
                label=f'Cluster {label}',
                alpha=0.6
            )
        plt.legend()
    else:
        plt.scatter(
            reduced_embeddings[:, 0],
            reduced_embeddings[:, 1],
            alpha=0.6
        )
    
    plt.xlabel('Component 1')
    plt.ylabel('Component 2')
    plt.title('降维可视化')
    plt.grid(True, alpha=0.3)
    plt.show()
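
下面是一个把上述降维与前文聚类串起来的使用示意(encoder_model为假设已加载的文本编码器,texts沿用前文的示例列表):

# 使用示例:先聚类,再用PCA降到二维并可视化
clusterer = TextClustering(encoder_model, n_clusters=3)
embeddings = clusterer.extract_embeddings(texts)
clusters, _ = clusterer.kmeans_clustering(texts)

reducer = DimensionalityReduction(method='pca')
reduced, _ = reducer.pca_reduction(embeddings, n_components=2)

visualize_reduction(reduced, labels=clusters)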

自编码器与表示学习

class VariationalAutoencoder(nn.Module):
    """变分自编码器(VAE)"""
    
    def __init__(self, input_dim, hidden_dim=256, latent_dim=32):
        super().__init__()
        
        # 编码器
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU()
        )
        
        # 潜在空间参数
        self.fc_mu = nn.Linear(hidden_dim // 2, latent_dim)
        self.fc_var = nn.Linear(hidden_dim // 2, latent_dim)
        
        # 解码器
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
    
    def encode(self, x):
        """编码到潜在空间"""
        h = self.encoder(x)
        mu = self.fc_mu(h)
        log_var = self.fc_var(h)
        return mu, log_var
    
    def reparameterize(self, mu, log_var):
        """重参数化技巧"""
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z):
        """从潜在空间解码"""
        return self.decoder(z)
    
    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        reconstructed = self.decode(z)
        return reconstructed, mu, log_var

def vae_loss(reconstructed, original, mu, log_var):
    """VAE损失函数"""
    # 重构损失
    recon_loss = F.mse_loss(reconstructed, original, reduction='sum')
    
    # KL散度
    kl_divergence = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    
    return recon_loss + kl_divergence

# 训练VAE
def train_vae(model, data_loader, num_epochs=50):
    """训练变分自编码器"""
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    for epoch in range(num_epochs):
        total_loss = 0
        
        for batch in data_loader:
            # 前向传播
            reconstructed, mu, log_var = model(batch)
            loss = vae_loss(reconstructed, batch, mu, log_var)
            
            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        avg_loss = total_loss / len(data_loader)
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Average Loss: {avg_loss:.4f}")
    
    return model
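
下面是一个最小的训练调用示意(假设输入是768维的文本嵌入,这里用随机数据模拟;为了匹配上面train_vae直接对batch前向传播的写法,预先把DataLoader的批次取成张量列表):

from torch.utils.data import DataLoader, TensorDataset

# 用随机数据模拟 (n_samples, 768) 的文本嵌入
embeddings = np.random.randn(1024, 768).astype(np.float32)
dataset = TensorDataset(torch.from_numpy(embeddings))

# TensorDataset的批次是元组,这里只保留张量本身
data_loader = [batch[0] for batch in DataLoader(dataset, batch_size=64, shuffle=True)]

vae = VariationalAutoencoder(input_dim=768, hidden_dim=256, latent_dim=32)
vae = train_vae(vae, data_loader, num_epochs=50)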

2024年最新技术

自组织聚类头(SOCH)

class SelfOrganizingClusteringHeader(nn.Module):
    """自组织聚类头(2024年新方法)"""
    
    def __init__(self, feature_dim, n_clusters, temperature=0.1):
        super().__init__()
        self.n_clusters = n_clusters
        self.temperature = temperature
        
        # 自组织层作为聚类中心
        self.cluster_centers = nn.Parameter(
            torch.randn(n_clusters, feature_dim)
        )
        
        # 初始化聚类中心
        nn.init.xavier_normal_(self.cluster_centers)
    
    def forward(self, features):
        """计算软聚类分配"""
        # 计算特征与聚类中心的相似度
        similarities = torch.matmul(features, self.cluster_centers.T)
        
        # 转换为概率(软分配)
        soft_assignments = F.softmax(similarities / self.temperature, dim=1)
        
        # 转换为硬分配
        hard_assignments = torch.argmax(soft_assignments, dim=1)
        
        return soft_assignments, hard_assignments
    
    def update_centers(self, features, assignments):
        """更新聚类中心"""
        new_centers = []
        
        for i in range(self.n_clusters):
            mask = assignments == i
            if mask.sum() > 0:
                cluster_features = features[mask]
                new_center = cluster_features.mean(dim=0)
                new_centers.append(new_center)
            else:
                # 保持原中心
                new_centers.append(self.cluster_centers[i])
        
        self.cluster_centers.data = torch.stack(new_centers)

class DeepClusteringNetwork(nn.Module):
    """深度聚类网络"""
    
    def __init__(self, input_dim, hidden_dims, n_clusters):
        super().__init__()
        
        # 特征提取器
        layers = []
        prev_dim = input_dim
        
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        
        self.feature_extractor = nn.Sequential(*layers)
        
        # 聚类头
        self.clustering_head = SelfOrganizingClusteringHeader(
            prev_dim, n_clusters
        )
        
        # 辅助任务头(重构)
        self.reconstruction_head = nn.Sequential(
            nn.Linear(prev_dim, hidden_dims[-1]),
            nn.ReLU(),
            nn.Linear(hidden_dims[-1], input_dim)
        )
    
    def forward(self, x):
        # 提取特征
        features = self.feature_extractor(x)
        
        # 聚类
        soft_assignments, hard_assignments = self.clustering_head(features)
        
        # 重构
        reconstructed = self.reconstruction_head(features)
        
        return {
            'features': features,
            'soft_assignments': soft_assignments,
            'hard_assignments': hard_assignments,
            'reconstructed': reconstructed
        }
    
    def cluster_loss(self, soft_assignments):
        """聚类损失:最大化分配熵"""
        # 计算边际分布
        marginal = soft_assignments.mean(dim=0)
        
        # 最大化熵(鼓励均匀分布)
        entropy = -torch.sum(marginal * torch.log(marginal + 1e-8))
        
        # 最小化条件熵(鼓励确定分配)
        conditional_entropy = -torch.mean(
            torch.sum(soft_assignments * torch.log(soft_assignments + 1e-8), dim=1)
        )
        
        return -entropy + conditional_entropy

对比学习方法

class ContrastiveLearning:
    """对比学习实现"""
    
    def __init__(self, encoder, projection_dim=128, temperature=0.07):
        self.encoder = encoder
        self.temperature = temperature
        
        # 投影头
        self.projection_head = nn.Sequential(
            nn.Linear(encoder.output_dim, projection_dim),
            nn.ReLU(),
            nn.Linear(projection_dim, projection_dim)
        )
        
        # 预测头(byol_update中使用的self.predictor)
        self.predictor = nn.Sequential(
            nn.Linear(projection_dim, projection_dim),
            nn.ReLU(),
            nn.Linear(projection_dim, projection_dim)
        )
    
    def simclr_loss(self, z1, z2):
        """SimCLR对比损失"""
        batch_size = z1.shape[0]
        
        # 归一化
        z1 = F.normalize(z1, dim=1)
        z2 = F.normalize(z2, dim=1)
        
        # 拼接正负样本
        representations = torch.cat([z1, z2], dim=0)
        
        # 计算相似度矩阵
        similarity_matrix = torch.matmul(representations, representations.T)
        
        # 创建标签
        labels = torch.cat([torch.arange(batch_size), torch.arange(batch_size)])
        labels = (labels.unsqueeze(0) == labels.unsqueeze(1)).float()
        
        # 掩码:移除对角线
        mask = torch.eye(labels.shape[0], dtype=torch.bool)
        labels = labels[~mask].view(labels.shape[0], -1)
        similarity_matrix = similarity_matrix[~mask].view(
            similarity_matrix.shape[0], -1
        )
        
        # 计算损失
        positives = similarity_matrix[labels.bool()].view(labels.shape[0], -1)
        negatives = similarity_matrix[~labels.bool()].view(
            similarity_matrix.shape[0], -1
        )
        
        logits = torch.cat([positives, negatives], dim=1)
        labels = torch.zeros(logits.shape[0], dtype=torch.long)
        
        logits = logits / self.temperature
        loss = F.cross_entropy(logits, labels)
        
        return loss
    
    def swav_clustering(self, features, n_prototypes=3000):
        """SwAV聚类方法(无需成对比较)"""
        # 原型(聚类中心)
        prototypes = nn.Parameter(torch.randn(n_prototypes, features.shape[1]))
        
        # Sinkhorn-Knopp算法
        def sinkhorn(scores, epsilon=0.05, n_iterations=3):
            Q = torch.exp(scores / epsilon).T
            
            for _ in range(n_iterations):
                # 归一化行
                sum_Q = torch.sum(Q, dim=1, keepdim=True)
                Q /= sum_Q
                
                # 归一化列
                sum_Q = torch.sum(Q, dim=0, keepdim=True)
                Q /= sum_Q
            
            return Q.T
        
        # 计算分配
        scores = torch.matmul(features, prototypes.T)
        assignments = sinkhorn(scores)
        
        return assignments, prototypes
    
    def byol_update(self, online_network, target_network, x, tau=0.99):
        """BYOL更新(无需负样本)"""
        # 在线网络预测
        online_proj = online_network(x)
        online_pred = self.predictor(online_proj)
        
        # 目标网络预测
        with torch.no_grad():
            target_proj = target_network(x)
        
        # 损失:预测误差
        loss = F.mse_loss(online_pred, target_proj)
        
        # 更新目标网络(动量更新)
        for online_params, target_params in zip(
            online_network.parameters(),
            target_network.parameters()
        ):
            target_params.data = tau * target_params.data + \
                               (1 - tau) * online_params.data
        
        return loss

异常检测

class AnomalyDetector:
    """异常检测器"""
    
    def __init__(self, contamination=0.1):
        self.contamination = contamination
        
    def isolation_forest(self, data):
        """孤立森林"""
        from sklearn.ensemble import IsolationForest
        
        detector = IsolationForest(
            contamination=self.contamination,
            random_state=42
        )
        
        predictions = detector.fit_predict(data)
        anomaly_scores = detector.score_samples(data)
        
        return predictions, anomaly_scores
    
    def local_outlier_factor(self, data, n_neighbors=20):
        """局部异常因子(LOF)"""
        from sklearn.neighbors import LocalOutlierFactor
        
        lof = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=self.contamination
        )
        
        predictions = lof.fit_predict(data)
        anomaly_scores = lof.negative_outlier_factor_
        
        return predictions, anomaly_scores
    
    def autoencoder_anomaly(self, model, data, threshold=None):
        """基于自编码器的异常检测"""
        model.eval()
        
        with torch.no_grad():
            # 重构数据(兼容前文Autoencoder返回(decoded, encoded)元组的情况)
            data_tensor = torch.FloatTensor(data)
            output = model(data_tensor)
            reconstructed = output[0] if isinstance(output, tuple) else output
            
            # 计算重构误差
            mse = F.mse_loss(reconstructed, data_tensor, reduction='none')
            reconstruction_errors = mse.mean(dim=1).numpy()
        
        # 确定阈值
        if threshold is None:
            threshold = np.percentile(
                reconstruction_errors, 
                (1 - self.contamination) * 100
            )
        
        # 标记异常
        anomalies = reconstruction_errors > threshold
        
        return anomalies, reconstruction_errors
    
    def one_class_svm(self, data):
        """单类SVM"""
        from sklearn.svm import OneClassSVM
        
        detector = OneClassSVM(
            nu=self.contamination,
            kernel='rbf',
            gamma='auto'
        )
        
        detector.fit(data)
        predictions = detector.predict(data)
        decision_scores = detector.decision_function(data)
        
        return predictions, decision_scores

无监督学习最佳实践
  1. 数据预处理:标准化和归一化对无监督学习至关重要
  2. 特征选择:选择信息量大的特征,去除噪声
  3. 聚类数选择:使用肘部法则、轮廓系数等确定最优聚类数(示意代码见列表之后)
  4. 多方法验证:结合多种无监督方法验证结果
  5. 可视化分析:降维可视化帮助理解数据结构
  6. 评估指标:使用内部指标(如轮廓系数)和外部指标(如纯度)
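
针对第3条,下面给出一个基于轮廓系数选择聚类数的最小示意(假设embeddings是形如(n_samples, dim)的numpy数组,这里用随机数据演示调用方式),与后文ClusteringEvaluator中的肘部法则互为补充:

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

def select_k_by_silhouette(embeddings, k_range=range(2, 11)):
    """遍历候选聚类数,返回轮廓系数最高的k"""
    scores = {}
    for k in k_range:
        labels = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(embeddings)
        scores[k] = silhouette_score(embeddings, labels)
    best_k = max(scores, key=scores.get)
    return best_k, scores

embeddings = np.random.randn(200, 16)
best_k, scores = select_k_by_silhouette(embeddings)
print(f"轮廓系数最高的聚类数: {best_k}")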

实际应用

主题建模

class TopicModeling:
    """主题建模实现"""
    
    def __init__(self, n_topics=10):
        self.n_topics = n_topics
        
    def lda_modeling(self, documents):
        """潜在狄利克雷分配(LDA)"""
        from sklearn.feature_extraction.text import CountVectorizer
        from sklearn.decomposition import LatentDirichletAllocation
        
        # 向量化文档
        vectorizer = CountVectorizer(
            max_features=1000,
            stop_words='english'
        )
        doc_term_matrix = vectorizer.fit_transform(documents)
        
        # LDA模型
        lda = LatentDirichletAllocation(
            n_components=self.n_topics,
            random_state=42,
            learning_method='batch'
        )
        
        # 训练模型
        doc_topics = lda.fit_transform(doc_term_matrix)
        
        # 提取主题词
        feature_names = vectorizer.get_feature_names_out()
        topics = []
        
        for topic_idx, topic in enumerate(lda.components_):
            top_indices = topic.argsort()[-10:][::-1]
            top_words = [feature_names[i] for i in top_indices]
            topics.append({
                'topic_id': topic_idx,
                'words': top_words,
                'weights': topic[top_indices].tolist()
            })
        
        return doc_topics, topics
    
    def neural_topic_model(self, embeddings, hidden_dim=100):
        """神经主题模型"""
        class NeuralTopicModel(nn.Module):
            def __init__(self, vocab_size, n_topics, hidden_dim):
                super().__init__()
                
                # 编码器
                self.encoder = nn.Sequential(
                    nn.Linear(vocab_size, hidden_dim),
                    nn.ReLU(),
                    nn.Dropout(0.2),
                    nn.Linear(hidden_dim, hidden_dim),
                    nn.ReLU()
                )
                
                # 主题层
                self.topic_layer = nn.Linear(hidden_dim, n_topics)
                
                # 解码器
                self.decoder = nn.Linear(n_topics, vocab_size)
                
                # Softmax激活
                self.softmax = nn.Softmax(dim=1)
            
            def forward(self, x):
                # 编码
                hidden = self.encoder(x)
                
                # 主题分布
                topic_dist = self.softmax(self.topic_layer(hidden))
                
                # 重构
                reconstructed = self.decoder(topic_dist)
                
                return reconstructed, topic_dist
        
        vocab_size = embeddings.shape[1]
        model = NeuralTopicModel(vocab_size, self.n_topics, hidden_dim)
        
        # 训练
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()
        
        embeddings_tensor = torch.FloatTensor(embeddings)
        
        for epoch in range(100):
            reconstructed, topic_dist = model(embeddings_tensor)
            loss = criterion(reconstructed, embeddings_tensor)
            
            # 稀疏性约束:最小化主题分布的熵,鼓励每篇文档集中在少数主题上
            sparsity_loss = -torch.mean(
                torch.sum(topic_dist * torch.log(topic_dist + 1e-8), dim=1)
            )
            total_loss = loss + 0.1 * sparsity_loss
            
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
        
        return model, topic_dist.detach().numpy()

表示学习应用

class RepresentationLearning:
    """表示学习应用"""
    
    def __init__(self, encoder_model):
        self.encoder = encoder_model
        
    def semantic_similarity(self, text1, text2):
        """语义相似度计算"""
        # 编码文本
        embed1 = self.encoder.encode(text1)
        embed2 = self.encoder.encode(text2)
        
        # 余弦相似度
        similarity = F.cosine_similarity(embed1, embed2, dim=0)
        
        return similarity.item()
    
    def document_retrieval(self, query, documents, top_k=5):
        """文档检索"""
        # 编码查询
        query_embed = self.encoder.encode(query)
        
        # 编码文档
        doc_embeds = []
        for doc in documents:
            doc_embed = self.encoder.encode(doc)
            doc_embeds.append(doc_embed)
        
        doc_embeds = torch.stack(doc_embeds)
        
        # 计算相似度
        similarities = F.cosine_similarity(
            query_embed.unsqueeze(0),
            doc_embeds,
            dim=1
        )
        
        # 获取top-k
        top_scores, top_indices = torch.topk(similarities, top_k)
        
        results = []
        for idx, score in zip(top_indices, top_scores):
            results.append({
                'document': documents[idx],
                'score': score.item()
            })
        
        return results
    
    def zero_shot_classification(self, text, labels):
        """零样本分类(基于表示学习)"""
        # 编码文本
        text_embed = self.encoder.encode(text)
        
        # 编码标签
        label_embeds = []
        for label in labels:
            # 将标签转换为描述
            label_desc = f"This text is about {label}"
            label_embed = self.encoder.encode(label_desc)
            label_embeds.append(label_embed)
        
        label_embeds = torch.stack(label_embeds)
        
        # 计算相似度
        similarities = F.cosine_similarity(
            text_embed.unsqueeze(0),
            label_embeds,
            dim=1
        )
        
        # 获取最相似的标签
        best_idx = torch.argmax(similarities)
        
        return labels[best_idx], similarities[best_idx].item()

数据增强与生成

class UnsupervisedDataAugmentation:
    """无监督数据增强"""
    
    def __init__(self, model):
        self.model = model
        
    def back_translation(self, text, intermediate_lang='en'):
        """回译增强"""
        # 翻译到中间语言
        translated = self.model.translate(text, target_lang=intermediate_lang)
        
        # 翻译回原语言
        back_translated = self.model.translate(
            translated, 
            target_lang='zh'
        )
        
        return back_translated
    
    def paraphrase_generation(self, text, num_paraphrases=3):
        """释义生成"""
        paraphrases = []
        
        for _ in range(num_paraphrases):
            prompt = f"请改写以下句子,保持原意:\n{text}\n改写:"
            paraphrase = self.model.generate(prompt, temperature=0.8)
            paraphrases.append(paraphrase)
        
        return paraphrases
    
    def contextual_augmentation(self, text, mask_ratio=0.15):
        """上下文增强(类似BERT的MLM)"""
        words = text.split()
        n_mask = int(len(words) * mask_ratio)
        
        # 随机选择要掩码的位置
        mask_positions = np.random.choice(
            len(words), 
            n_mask, 
            replace=False
        )
        
        augmented_texts = []
        
        for pos in mask_positions:
            masked_words = words.copy()
            original_word = masked_words[pos]
            masked_words[pos] = '[MASK]'
            
            masked_text = ' '.join(masked_words)
            
            # 预测掩码位置的词
            predicted_word = self.model.predict_mask(masked_text)
            
            if predicted_word != original_word:
                masked_words[pos] = predicted_word
                augmented_text = ' '.join(masked_words)
                augmented_texts.append(augmented_text)
        
        return augmented_texts
    
    def mixup_augmentation(self, embeddings, alpha=0.2):
        """Mixup数据增强"""
        batch_size = embeddings.shape[0]
        
        # 生成混合系数(转换为张量,避免numpy数组与torch张量直接相乘)
        lam = np.random.beta(alpha, alpha, batch_size)
        lam = np.maximum(lam, 1 - lam)
        lam = torch.from_numpy(lam).float().reshape(-1, 1)
        
        # 随机排列索引
        index = torch.randperm(batch_size)
        
        # 混合嵌入
        mixed_embeddings = lam * embeddings + (1 - lam) * embeddings[index]
        
        return mixed_embeddings, lam, index

评估与分析

聚类评估指标

class ClusteringEvaluator:
    """聚类评估器"""
    
    def __init__(self):
        pass
    
    def silhouette_score(self, data, labels):
        """轮廓系数"""
        from sklearn.metrics import silhouette_score, silhouette_samples
        
        # 整体轮廓系数
        overall_score = silhouette_score(data, labels)
        
        # 每个样本的轮廓系数
        sample_scores = silhouette_samples(data, labels)
        
        # 每个聚类的平均轮廓系数
        cluster_scores = {}
        for label in np.unique(labels):
            mask = labels == label
            cluster_scores[label] = sample_scores[mask].mean()
        
        return {
            'overall': overall_score,
            'by_cluster': cluster_scores,
            'samples': sample_scores
        }
    
    def davies_bouldin_score(self, data, labels):
        """Davies-Bouldin指数(越小越好)"""
        from sklearn.metrics import davies_bouldin_score
        
        score = davies_bouldin_score(data, labels)
        return score
    
    def calinski_harabasz_score(self, data, labels):
        """Calinski-Harabasz指数(越大越好)"""
        from sklearn.metrics import calinski_harabasz_score
        
        score = calinski_harabasz_score(data, labels)
        return score
    
    def evaluate_with_ground_truth(self, predicted_labels, true_labels):
        """有真实标签时的评估"""
        from sklearn.metrics import (
            adjusted_rand_score,
            normalized_mutual_info_score,
            homogeneity_score,
            completeness_score,
            v_measure_score
        )
        
        return {
            'adjusted_rand_index': adjusted_rand_score(true_labels, predicted_labels),
            'normalized_mutual_info': normalized_mutual_info_score(
                true_labels, predicted_labels
            ),
            'homogeneity': homogeneity_score(true_labels, predicted_labels),
            'completeness': completeness_score(true_labels, predicted_labels),
            'v_measure': v_measure_score(true_labels, predicted_labels)
        }
    
    def optimal_clusters_elbow(self, data, max_k=10):
        """肘部法则确定最优聚类数"""
        from sklearn.cluster import KMeans
        
        inertias = []
        K_range = range(2, max_k + 1)
        
        for k in K_range:
            kmeans = KMeans(n_clusters=k, random_state=42)
            kmeans.fit(data)
            inertias.append(kmeans.inertia_)
        
        # 计算肘部点
        deltas = np.diff(inertias)
        delta_deltas = np.diff(deltas)
        
        # 找到变化率最大的点
        elbow_point = np.argmax(delta_deltas) + 2  # +2因为从k=2开始
        
        return {
            'k_values': list(K_range),
            'inertias': inertias,
            'optimal_k': elbow_point
        }

表示质量评估

class RepresentationQualityEvaluator:
    """表示质量评估"""
    
    def __init__(self):
        pass
    
    def intrinsic_dimension(self, embeddings):
        """估计内在维度"""
        from sklearn.decomposition import PCA
        
        pca = PCA()
        pca.fit(embeddings)
        
        # 累积解释方差
        cumsum_var = np.cumsum(pca.explained_variance_ratio_)
        
        # 找到解释90%方差所需的维度
        n_components_90 = np.argmax(cumsum_var >= 0.9) + 1
        
        # 找到解释95%方差所需的维度
        n_components_95 = np.argmax(cumsum_var >= 0.95) + 1
        
        return {
            'n_components_90': n_components_90,
            'n_components_95': n_components_95,
            'explained_variance_ratio': pca.explained_variance_ratio_,
            'cumulative_variance': cumsum_var
        }
    
    def neighborhood_preservation(self, original_data, reduced_data, k=10):
        """邻域保持度评估"""
        from sklearn.neighbors import NearestNeighbors
        
        # 原始空间的k近邻
        nn_original = NearestNeighbors(n_neighbors=k+1)
        nn_original.fit(original_data)
        neighbors_original = nn_original.kneighbors(
            original_data, 
            return_distance=False
        )[:, 1:]  # 排除自己
        
        # 降维空间的k近邻
        nn_reduced = NearestNeighbors(n_neighbors=k+1)
        nn_reduced.fit(reduced_data)
        neighbors_reduced = nn_reduced.kneighbors(
            reduced_data,
            return_distance=False
        )[:, 1:]
        
        # 计算保持度
        preservation_scores = []
        for i in range(len(original_data)):
            # 计算交集
            intersection = np.intersect1d(
                neighbors_original[i],
                neighbors_reduced[i]
            )
            preservation = len(intersection) / k
            preservation_scores.append(preservation)
        
        return {
            'mean_preservation': np.mean(preservation_scores),
            'std_preservation': np.std(preservation_scores),
            'scores': preservation_scores
        }
    
    def reconstruction_quality(self, model, test_data):
        """重构质量评估(用于自编码器)"""
        model.eval()
        
        with torch.no_grad():
            test_tensor = torch.FloatTensor(test_data)
            
            # 如果是VAE
            if hasattr(model, 'encode'):
                mu, log_var = model.encode(test_tensor)
                z = model.reparameterize(mu, log_var)
                reconstructed = model.decode(z)
            else:
                reconstructed = model(test_tensor)
            
            # 计算重构误差
            mse = F.mse_loss(reconstructed, test_tensor).item()
            
            # 计算PSNR(峰值信噪比)
            max_val = test_tensor.max().item()
            psnr = 20 * np.log10(max_val) - 10 * np.log10(mse)
            
            # 计算结构相似性(如果是图像)
            if len(test_tensor.shape) == 4:  # 图像数据
                from skimage.metrics import structural_similarity as ssim
                
                original_np = test_tensor.numpy()
                reconstructed_np = reconstructed.numpy()
                
                ssim_scores = []
                for i in range(len(original_np)):
                    score = ssim(
                        original_np[i], 
                        reconstructed_np[i],
                        multichannel=True
                    )
                    ssim_scores.append(score)
                
                mean_ssim = np.mean(ssim_scores)
            else:
                mean_ssim = None
        
        return {
            'mse': mse,
            'psnr': psnr,
            'ssim': mean_ssim
        }

无监督学习注意事项
  1. 数据质量:无监督学习对噪声和异常值敏感
  2. 特征尺度:不同尺度的特征需要标准化(示意代码见列表之后)
  3. 维度诅咒:高维数据可能需要降维处理
  4. 评估困难:缺少标签使得评估更具挑战性
  5. 参数敏感:聚类数、降维维度等参数选择重要
  6. 计算成本:某些方法(如t-SNE)计算成本高
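
针对第2条,下面给出一个标准化预处理的最小示意(features为随机生成的数值特征矩阵,仅演示流程):先把各特征缩放到同一尺度,再做聚类,避免大尺度特征主导距离计算:

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

# 两列特征尺度相差悬殊(例如文档字数与某个归一化得分)
features = np.column_stack([
    np.random.uniform(0, 10000, 300),  # 大尺度特征
    np.random.uniform(0, 1, 300)       # 小尺度特征
])

# 标准化:每个特征零均值、单位方差
features_scaled = StandardScaler().fit_transform(features)

# 在标准化后的特征上聚类
labels = KMeans(n_clusters=3, random_state=42, n_init=10).fit_predict(features_scaled)
print(np.bincount(labels))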

未来发展趋势

自监督与无监督结合

class HybridUnsupervisedLearning:
    """混合无监督学习方法"""
    
    def __init__(self):
        pass
    
    def masked_autoencoding(self, model, data, mask_ratio=0.75):
        """掩码自编码(MAE)"""
        # 随机掩码
        batch_size, seq_len, feature_dim = data.shape
        len_keep = int(seq_len * (1 - mask_ratio))
        
        noise = torch.rand(batch_size, seq_len)
        ids_shuffle = torch.argsort(noise, dim=1)
        ids_restore = torch.argsort(ids_shuffle, dim=1)
        
        # 保留的token
        ids_keep = ids_shuffle[:, :len_keep]
        x_masked = torch.gather(
            data, 
            dim=1,
            index=ids_keep.unsqueeze(-1).repeat(1, 1, feature_dim)
        )
        
        # 生成掩码token(实际实现中mask_token应注册为模型的可学习参数)
        mask_token = nn.Parameter(torch.zeros(1, 1, feature_dim))
        mask_tokens = mask_token.repeat(batch_size, seq_len - len_keep, 1)
        
        # 重构
        x_with_mask = torch.cat([x_masked, mask_tokens], dim=1)
        x_with_mask = torch.gather(
            x_with_mask,
            dim=1,
            index=ids_restore.unsqueeze(-1).repeat(1, 1, feature_dim)
        )
        
        # 编码和解码
        reconstructed = model(x_with_mask)
        
        # 只计算掩码位置的损失
        mask = torch.ones_like(data)
        mask[:, :len_keep, :] = 0
        mask = torch.gather(
            mask,
            dim=1,
            index=ids_restore.unsqueeze(-1).repeat(1, 1, feature_dim)
        )
        
        loss = (reconstructed - data) ** 2
        loss = (loss * mask).sum() / mask.sum()
        
        return loss
    
    def momentum_contrast(self, query_encoder, key_encoder, 
                         queries, keys, queue, temperature=0.07):
        """动量对比(MoCo)"""
        # Query特征
        q = query_encoder(queries)
        q = F.normalize(q, dim=1)
        
        # Key特征(不计算梯度)
        with torch.no_grad():
            k = key_encoder(keys)
            k = F.normalize(k, dim=1)
        
        # 正样本
        l_pos = torch.einsum('nc,nc->n', [q, k]).unsqueeze(-1)
        
        # 负样本(从队列中)
        l_neg = torch.einsum('nc,ck->nk', [q, queue.T])
        
        # 对比损失
        logits = torch.cat([l_pos, l_neg], dim=1)
        logits /= temperature
        
        labels = torch.zeros(logits.shape[0], dtype=torch.long)
        loss = F.cross_entropy(logits, labels)
        
        # 更新队列
        queue = torch.cat([k.T, queue[:, :-k.shape[0]]], dim=1)
        
        return loss, queue

生成式无监督学习

class GenerativeUnsupervisedLearning:
    """生成式无监督学习"""
    
    def __init__(self):
        pass
    
    def diffusion_model(self, model, data, timesteps=1000):
        """扩散模型"""
        # 预先计算噪声调度(采用DDPM的线性beta调度)
        betas = torch.linspace(1e-4, 0.02, timesteps)
        alphas = 1.0 - betas
        alpha_bars = torch.cumprod(alphas, dim=0)
        
        # 前向扩散过程
        def forward_diffusion(x_0, t, noise=None):
            if noise is None:
                noise = torch.randn_like(x_0)
            
            # 取出每个样本对应时间步的累积alpha,并扩展到数据的形状
            alpha_bar_t = alpha_bars[t].view(-1, *([1] * (x_0.dim() - 1)))
            
            # 添加噪声
            x_t = torch.sqrt(alpha_bar_t) * x_0 + torch.sqrt(1 - alpha_bar_t) * noise
            
            return x_t, noise
        
        # 反向去噪过程(单步,t为标量时间步)
        def reverse_diffusion(model, x_t, t):
            # 预测噪声
            predicted_noise = model(x_t, t)
            
            # 取出该时间步的调度参数
            beta_t = betas[t]
            alpha_t = alphas[t]
            alpha_bar_t = alpha_bars[t]
            
            # 计算去噪后的均值
            mean = (x_t - beta_t / torch.sqrt(1 - alpha_bar_t) * predicted_noise) / torch.sqrt(alpha_t)
            
            # 添加噪声(除了最后一步)
            if t > 1:
                noise = torch.randn_like(x_t)
                x_t_minus_1 = mean + torch.sqrt(beta_t) * noise
            else:
                x_t_minus_1 = mean
            
            return x_t_minus_1
        
        # 训练循环
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
        
        for epoch in range(100):
            # 随机时间步
            t = torch.randint(1, timesteps, (data.shape[0],))
            
            # 前向扩散
            x_t, noise = forward_diffusion(data, t)
            
            # 预测噪声
            predicted_noise = model(x_t, t)
            
            # 损失
            loss = F.mse_loss(predicted_noise, noise)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        return model
    
    def flow_matching(self, model, source_data, target_data):
        """流匹配(Flow Matching)"""
        # 定义概率路径
        def interpolate(x0, x1, t):
            return (1 - t) * x0 + t * x1
        
        # 线性插值路径对应的条件向量场是常量 x1 - x0
        def conditional_vector_field(x0, x1, xt, t):
            return x1 - x0
        
        # 训练
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
        
        for epoch in range(100):
            # 随机时间
            t = torch.rand(source_data.shape[0], 1)
            
            # 插值
            xt = interpolate(source_data, target_data, t)
            
            # 真实向量场
            true_vt = conditional_vector_field(source_data, target_data, xt, t)
            
            # 预测向量场
            pred_vt = model(xt, t)
            
            # 损失
            loss = F.mse_loss(pred_vt, true_vt)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        return model

相关概念

延伸阅读