code-appendix

Char*.txt 完整代码附录

本页把随书 Char01.txtChar10.txt 中的主要代码示例按章节整理出来,方便直接复制、学习和改造。

说明:原书章节中既有解释文字,也有多段代码。这里优先收录能够直接运行或便于改造的核心代码,并保留原始注释风格。

Char01:Transformer 与微调基础

1. Transformer 编码器层

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# 定义位置编码模块
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)  # 增加批次维度

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.encoding[:, :seq_len, :].to(x.device)

# 定义多头注意力机制
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        matmul_qk = torch.matmul(Q, K.transpose(-2, -1))
        dk = K.size(-1)
        scaled_attention_logits = matmul_qk / torch.sqrt(torch.tensor(dk, dtype=torch.float32))
        if mask is not None:
            scaled_attention_logits += (mask * -1e9)
        attention_weights = F.softmax(scaled_attention_logits, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def split_heads(self, x, batch_size):
        return x.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        Q = self.q_linear(Q)
        K = self.k_linear(K)
        V = self.v_linear(V)
        Q = self.split_heads(Q, batch_size)
        K = self.split_heads(K, batch_size)
        V = self.split_heads(V, batch_size)
        attention, weights = self.scaled_dot_product_attention(Q, K, V, mask)
        attention = attention.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.fc_out(attention)

# 定义前馈神经网络
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))

# 定义Transformer编码器层
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output = self.mha(x, x, x, mask)
        out1 = self.layernorm1(x + self.dropout(attn_output))
        ffn_output = self.ffn(out1)
        out2 = self.layernorm2(out1 + self.dropout(ffn_output))
        return out2

# 测试Transformer编码器层
if __name__ == "__main__":
    # 设置模型参数
    d_model = 512
    num_heads = 8
    d_ff = 2048
    seq_len = 10
    batch_size = 2
    # 模拟输入数据
    sample_input = torch.rand(batch_size, seq_len, d_model)
    mask = None  # 暂不设置掩码
    # 初始化模型
    encoder_layer = TransformerEncoderLayer(d_model, num_heads, d_ff)
    positional_encoding = PositionalEncoding(d_model)
    # 加入位置编码并传入编码器
    input_with_pos = positional_encoding(sample_input)
    output = encoder_layer(input_with_pos, mask)
    print("输入形状:", sample_input.shape)
    print("输出形状:", output.shape)

2. 编码器-解码器

import torch
import torch.nn as nn
import torch.nn.functional as F

# 定义位置编码
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(
                                torch.log(torch.tensor(10000.0)) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)  # 扩展为批次维度

    def forward(self, x):
        return x + self.encoding[:, :x.size(1), :].to(x.device)

# 定义多头注意力机制
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(
                                torch.tensor(self.depth, dtype=torch.float))
        if mask is not None:
            scores += mask * -1e9
        attention_weights = F.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        Q = self.q_linear(Q)
        K = self.k_linear(K)
        V = self.v_linear(V)

        # 分多头
        Q = Q.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        attention, _ = self.scaled_dot_product_attention(Q, K, V, mask)
        attention = attention.transpose(1, 2).contiguous().view(
                                batch_size, -1, self.num_heads * self.depth)
        return self.fc_out(attention)

# 定义编码器
class Encoder(nn.Module):
    def __init__(self, input_dim, d_model, num_heads, num_layers, d_ff, max_len):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([
            nn.Sequential(
                MultiHeadAttention(d_model, num_heads),
                nn.LayerNorm(d_model),
                FeedForward(d_model, d_ff),
                nn.LayerNorm(d_model)
            )
            for _ in range(num_layers)
        ])

    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for mha, norm1, ffn, norm2 in self.layers:
            attn_output = mha(x, x, x, mask)
            x = norm1(x + attn_output)
            ffn_output = ffn(x)
            x = norm2(x + ffn_output)
        return x

# 定义前馈神经网络
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))

# 定义解码器
class Decoder(nn.Module):
    def __init__(self, output_dim, d_model, num_heads, num_layers, d_ff, max_len):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([
            nn.Sequential(
                MultiHeadAttention(d_model, num_heads),
                nn.LayerNorm(d_model),
                MultiHeadAttention(d_model, num_heads),
                nn.LayerNorm(d_model),
                FeedForward(d_model, d_ff),
                nn.LayerNorm(d_model)
            )
            for _ in range(num_layers)
        ])

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for mha1, norm1, mha2, norm2, ffn, norm3 in self.layers:
            attn1 = mha1(x, x, x, tgt_mask)
            x = norm1(x + attn1)
            attn2 = mha2(x, enc_output, enc_output, src_mask)
            x = norm2(x + attn2)
            ffn_output = ffn(x)
            x = norm3(x + ffn_output)
        return x

# 编码器-解码器模型测试
if __name__ == "__main__":
    input_dim = 1000  # 输入词汇量
    output_dim = 1000  # 输出词汇量
    d_model = 512
    num_heads = 8
    num_layers = 2
    d_ff = 2048
    max_len = 100
    src_seq = torch.randint(0, input_dim, (2, 10))
    tgt_seq = torch.randint(0, output_dim, (2, 10))
    encoder = Encoder(input_dim, d_model, num_heads, num_layers, d_ff, max_len)
    decoder = Decoder(output_dim, d_model, num_heads, num_layers, d_ff, max_len)
    enc_output = encoder(src_seq)
    dec_output = decoder(tgt_seq, enc_output)
    print("Encoder output shape:", enc_output.shape)
    print("Decoder output shape:", dec_output.shape)

3. 多头注意力机制

import torch
import torch.nn as nn

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, mask=None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(K.size(-1), dtype=torch.float32))
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.num_heads = num_heads  # 注意力头数量
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """
        将 d_model 维度拆分为 num_heads 个 head,每个 head 的维度为 depth
        """
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        # 线性变换并拆分为多头
        Q = self.split_heads(self.q_linear(Q), batch_size)
        K = self.split_heads(self.k_linear(K), batch_size)
        V = self.split_heads(self.v_linear(V), batch_size)
        # 计算注意力输出
        attention, weights = ScaledDotProductAttention()(Q, K, V, mask)
        # 将多头结果拼接回去
        attention = attention.transpose(1, 2).contiguous().view(
                                batch_size, -1, self.num_heads * self.depth)
        # 通过最终线性层
        output = self.fc_out(attention)
        return output, weights

if __name__ == "__main__":
    # 设置参数
    d_model = 512  # 嵌入维度
    num_heads = 8  # 注意力头数量
    seq_len = 10   # 序列长度
    batch_size = 2 # 批次大小
    # 模拟输入数据
    Q = torch.rand(batch_size, seq_len, d_model)
    K = torch.rand(batch_size, seq_len, d_model)
    V = torch.rand(batch_size, seq_len, d_model)
    # 初始化多头注意力机制
    multi_head_attention = MultiHeadAttention(d_model, num_heads)
    # 传入数据
    output, attention_weights = multi_head_attention(Q, K, V)
    print("输出形状:", output.shape)
    print("注意力权重形状:", attention_weights.shape)

4. LoRA 微调

import torch
import torch.nn as nn
from transformers import (AutoModelForCausalLM, AutoTokenizer, 
                              Trainer, TrainingArguments)
from datasets import Dataset

# 定义 LoRA 模块
class LoRAModule(nn.Module):
    def __init__(self, input_dim, output_dim, rank=4):
        super(LoRAModule, self).__init__()
        self.A = nn.Linear(input_dim, rank, bias=False)  # 低秩矩阵 A
        self.B = nn.Linear(rank, output_dim, bias=False)  # 低秩矩阵 B

    def forward(self, x):
        return self.B(self.A(x))

# 将 LoRA 注入到模型的线性层中
class GPTWithLoRA(nn.Module):
    def __init__(self, base_model, rank=4):
        super(GPTWithLoRA, self).__init__()
        self.base_model = base_model
        self.lora_modules = nn.ModuleDict()  # 用于存储 LoRA 模块
        for name, module in base_model.named_modules():
            if isinstance(module, nn.Linear):  # 仅在线性层中插入 LoRA
                input_dim, output_dim = module.in_features, module.out_features
                self.lora_modules[name] = LoRAModule(input_dim, output_dim, rank)

    def forward(self, input_ids, attention_mask=None, labels=None):
        outputs = self.base_model(input_ids=input_ids,
                                 attention_mask=attention_mask, labels=labels)
        lora_output = 0
        for name, lora in self.lora_modules.items():
            base_output = dict(self.base_model.named_modules())[name](
                                outputs.last_hidden_state)
            lora_output += lora(base_output)
        return outputs.loss, lora_output

# 加载预训练模型和分词器
tokenizer = AutoTokenizer.from_pretrained("gpt2")
base_model = AutoModelForCausalLM.from_pretrained("gpt2")
# 初始化带 LoRA 的模型
lora_model = GPTWithLoRA(base_model, rank=4)
# 打印模型结构
print(lora_model)
# 准备训练数据
data = {
    "text": [
        "今天是个好天气。",
        "我喜欢用GPT模型学习。",
        "微调技术让模型更加灵活。",
        "LoRA 技术是一种高效的微调方法。",
        "通过低秩矩阵分解减少参数量。"
    ]
}
# 转换为 Hugging Face 数据集
dataset = Dataset.from_dict(data)

def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True,
                         padding="max_length", max_length=64)

tokenized_dataset = dataset.map(preprocess_function, batched=True)
# 定义训练参数
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=4,
    num_train_epochs=3,
    logging_dir="./logs",
    save_strategy="epoch",
    logging_steps=10
)
# 使用 Hugging Face Trainer 进行训练
trainer = Trainer(
    model=lora_model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)
# 开始训练
print("开始训练 LoRA 微调模型...")
trainer.train()
# 保存微调后的模型
print("保存模型...")
lora_model.save_pretrained("./lora_finetuned_model")
tokenizer.save_pretrained("./lora_finetuned_model")
# 加载微调后的模型进行推理
print("加载微调模型进行推理...")
finetuned_model = AutoModelForCausalLM.from_pretrained(
                                                "./lora_finetuned_model")
finetuned_tokenizer = AutoTokenizer.from_pretrained(
                                                "./lora_finetuned_model")
# 测试推理
test_text = "GPT 模型的优点是"
input_ids = finetuned_tokenizer(test_text, return_tensors="pt").input_ids
output = finetuned_model.generate(input_ids, max_length=50)
result = finetuned_tokenizer.decode(output[0], skip_special_tokens=True)
print("输入文本:", test_text)
print("生成结果:", result)

Char02:CUDA 与 PyTorch 加速

1. 矩阵运算与优化

import torch
import time

# 检查CUDA设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"当前设备: {device}")

# 初始化矩阵
def initialize_matrices(size):
    # 随机生成两个矩阵
    A = torch.randn(size, size, device=device)
    B = torch.randn(size, size, device=device)
    return A, B

# 矩阵乘法
def matrix_multiplication(A, B):
    start_time = time.time()
    C = torch.mm(A, B)  # 使用CUDA加速的矩阵乘法
    torch.cuda.synchronize()  # 等待所有CUDA操作完成
    end_time = time.time()
    return C, end_time - start_time

# 优化矩阵乘法: 分块计算
def optimized_matrix_multiplication(A, B, block_size):
    start_time = time.time()
    size = A.size(0)
    C = torch.zeros(size, size, device=device)  # 初始化结果矩阵
    for i in range(0, size, block_size):
        for j in range(0, size, block_size):
            for k in range(0, size, block_size):
                # 提取块并进行计算
                A_block = A[i:i+block_size, k:k+block_size]
                B_block = B[k:k+block_size, j:j+block_size]
                C[i:i+block_size, j:j+block_size] += torch.mm(A_block, B_block)
    torch.cuda.synchronize()  # 等待所有CUDA操作完成
    end_time = time.time()
    return C, end_time - start_time

# 验证结果
def verify_results(C1, C2):
    return torch.allclose(C1, C2, atol=1e-5)

# 主函数
if __name__ == "__main__":
    matrix_size = 1024  # 定义矩阵大小
    block_size = 128  # 定义分块大小
    print(f"初始化{matrix_size}x{matrix_size}矩阵...")
    A, B = initialize_matrices(matrix_size)
    print("开始标准矩阵乘法...")
    C_standard, time_standard = matrix_multiplication(A, B)
    print(f"标准矩阵乘法耗时: {time_standard:.4f}")
    print("开始优化矩阵乘法...")
    C_optimized, time_optimized = optimized_matrix_multiplication(A, B, block_size)
    print(f"优化矩阵乘法耗时: {time_optimized:.4f}")
    print("验证两种方法的结果是否一致...")
    if verify_results(C_standard, C_optimized):
        print("结果一致!")
    else:
        print("结果不一致!")
    print("矩阵乘法性能对比:")
    print(f"标准方法耗时: {time_standard:.4f}")
    print(f"优化方法耗时: {time_optimized:.4f}")
    print(f"加速比: {time_standard / time_optimized:.2f}")

2. CUDA 内核性能调优与工具使用

import torch
import time

# 检查CUDA是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"当前设备: {device}")

# CUDA内核实现矩阵加法
def cuda_matrix_addition(A, B, block_size):
    """
    使用CUDA加速的矩阵加法
    """
    # 获取矩阵大小
    size = A.size(0)
    # 创建结果矩阵
    C = torch.zeros(size, size, device=device)
    # 使用分块方式执行加法操作
    for i in range(0, size, block_size):
        for j in range(0, size, block_size):
            C[i:i+block_size, j:j+block_size] = A[i:i+block_size,
                         j:j+block_size] + B[i:i+block_size, j:j+block_size]
    return C

# 优化版本:引入共享内存模拟
def optimized_matrix_addition(A, B, block_size):
    """
    优化的矩阵加法,模拟共享内存优化
    """
    size = A.size(0)
    C = torch.zeros(size, size, device=device)
    # 以分块方式进行加法
    for i in range(0, size, block_size):
        for j in range(0, size, block_size):
            # 加载块到局部变量,模拟共享内存的作用
            A_block = A[i:i+block_size, j:j+block_size]
            B_block = B[i:i+block_size, j:j+block_size]
            # 进行加法操作
            C[i:i+block_size, j:j+block_size] = A_block + B_block
    return C

# 性能测试函数
def measure_performance(func, *args):
    torch.cuda.synchronize()  # 同步CUDA操作
    start_time = time.time()
    result = func(*args)
    torch.cuda.synchronize()  # 同步完成所有操作
    end_time = time.time()
    return result, end_time - start_time

# 主函数
if __name__ == "__main__":
    # 初始化矩阵
    size = 2048  # 矩阵大小
    block_size = 128  # 分块大小
    A = torch.randn(size, size, device=device)
    B = torch.randn(size, size, device=device)
    print(f"初始化{size}x{size}矩阵完成,使用分块大小: {block_size}")
    # 测试标准CUDA加法
    print("开始标准CUDA加法...")
    result_standard, time_standard = measure_performance(cuda_matrix_addition, A, B, block_size)
    print(f"标准CUDA加法耗时: {time_standard:.4f}")
    # 测试优化版本
    print("开始优化CUDA加法...")
    result_optimized, time_optimized = measure_performance(optimized_matrix_addition, A, B, block_size)
    print(f"优化CUDA加法耗时: {time_optimized:.4f}")
    # 验证结果
    print("验证结果一致性...")
    if torch.allclose(result_standard, result_optimized):
        print("结果一致!")
    else:
        print("结果不一致!")

Char03:微调任务与数据准备

1. 分类、生成与问答

from transformers import (AutoTokenizer, AutoModelForSequenceClassification, 
                                Trainer, TrainingArguments)
from datasets import load_dataset
import torch

# 加载预训练模型和分词器
model_name = "facebook/llama-3b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
                                                        model_name, num_labels=2)
# 加载数据集
dataset = load_dataset("imdb")
dataset = dataset.shuffle(seed=42)
# 数据预处理
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True,
                         padding="max_length", max_length=128)
encoded_dataset = dataset.map(preprocess_function, batched=True)
encoded_dataset = encoded_dataset.rename_column("label", "labels")
encoded_dataset.set_format("torch")
# 定义训练参数
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    save_total_limit=2,
    load_best_model_at_end=True,
)
# 定义Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=encoded_dataset["train"].select(range(1000)),
    eval_dataset=encoded_dataset["test"].select(range(500)),
    tokenizer=tokenizer,
)
# 开始训练
trainer.train()
# 测试推理
text = "This movie is fantastic!"
inputs = tokenizer(text, return_tensors="pt", truncation=True,
                     padding="max_length", max_length=128)
outputs = model(**inputs)
logits = outputs.logits
prediction = torch.argmax(logits, dim=1).item()
labels = ["negative", "positive"]
print(f"分类结果: {labels[prediction]}")

from transformers import AutoTokenizer, AutoModelForCausalLM
# 加载模型和分词器
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 文本生成
prompt = "Once upon a time in a distant galaxy,"
inputs = tokenizer(prompt, return_tensors="pt")
outputs = model.generate(inputs["input_ids"], max_length=50, num_beams=5,
                           no_repeat_ngram_size=2, early_stopping=True)
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"生成结果: {generated_text}")

from transformers import pipeline
# 加载问答管道
qa_pipeline = pipeline("question-answering", model=model_name, 
                          tokenizer=tokenizer)
# 定义上下文和问题
context = """
机器学习是人工智能的一个分支,主要研究如何让计算机从数据中学习。近年来,深度学习成为机器学习的一个重要方向。
"""
question = "深度学习属于哪个领域?"
# 问答推理
result = qa_pipeline(question=question, context=context)
print(f"问答结果: {result['answer']}")

2. 数据准备与预处理

from datasets import load_dataset, Dataset
from transformers import AutoTokenizer
import torch

# 1. 数据加载
print("加载IMDb数据集...")
raw_dataset = load_dataset("imdb")
print(f"原始数据集结构: {raw_dataset}")
# 2. 数据清洗:筛选有意义的样本(可选)
print("筛选样本...")
raw_dataset = raw_dataset.filter(lambda x: len(x["text"]) > 0)
print(f"清洗后数据集大小: {raw_dataset['train'].num_rows}")
# 3. 数据集拆分(训练集和验证集)
print("拆分数据集...")
raw_dataset = raw_dataset["train"].train_test_split(test_size=0.1)
train_dataset = raw_dataset["train"]
val_dataset = raw_dataset["test"]
print(f"训练集大小: {len(train_dataset)}, 验证集大小: {len(val_dataset)}")
# 4. 加载分词器
model_name = "facebook/llama-3b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 5. 定义数据预处理函数
def preprocess_function(examples):
    return tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=128,
    )
# 6. 应用分词和编码
print("对训练集进行分词和编码...")
train_dataset = train_dataset.map(preprocess_function, batched=True)
val_dataset = val_dataset.map(preprocess_function, batched=True)
# 7. 删除原始文本列
train_dataset = train_dataset.remove_columns(["text"])
val_dataset = val_dataset.remove_columns(["text"])
# 8. 设置张量格式
train_dataset.set_format("torch")
val_dataset.set_format("torch")
print("数据准备完成")

Char04:量化、部署与性能测试

1. FastAPI 部署微调模型

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# 1. 初始化FastAPI应用
app = FastAPI()

# 2. 加载微调后的模型和分词器
model_name = "gemma/gpt-7b-finetuned"  # 假设这是微调后的模型路径
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# 3. 定义请求体和响应体的数据模型
class RequestData(BaseModel):
    text: str

class ResponseData(BaseModel):
    sentiment: str
    confidence: float

# 4. 定义情感分析接口
@app.post("/analyze", response_model=ResponseData)
async def analyze_sentiment(data: RequestData):
    try:
        # 文本分词与编码
        inputs = tokenizer(
            data.text,
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=128,
        )
        # 模型推理
        with torch.no_grad():
            outputs = model(**inputs)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=1).squeeze()
        sentiment = "positive" if torch.argmax(logits).item() == 1 else "negative"
        confidence = probabilities.max().item()
        return ResponseData(sentiment=sentiment, confidence=confidence)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# 启动服务:uvicorn app:app --reload

2. 接口性能测试

# test_performance.py
import requests
import time

# API地址
url = "http://127.0.0.1:8000/analyze"
# 测试样本
test_texts = [
    "这款商品非常不错,我很满意!",
    "物流速度太慢了,真让人失望。",
    "客服态度很好,帮助解决了问题,谢谢!",
    "质量一般,不值这个价钱。",
    "整体体验不错,值得推荐。",
] * 100  # 模拟500次请求

# 性能测试
start_time = time.time()
responses = []
for text in test_texts:
    response = requests.post(url, json={"text": text})
    if response.status_code == 200:
        responses.append(response.json())
    else:
        print(f"请求失败: {response.status_code}")
end_time = time.time()

# 打印结果统计
print(f"总请求数: {len(test_texts)}")
print(f"成功响应数: {len(responses)}")
print(f"总耗时: {end_time - start_time:.2f}")
print(f"平均响应时间: {(end_time-start_time) / len(test_texts):.2f} 秒/请求")

Char05:智能客服

1. 对话生成、FAQ 匹配、情感检测

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, pipeline

# 对话生成通过预训练语言模型(如GPT系列)实现
model_name = "gpt2"  # 可替换为适合对话的模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval()

def generate_response(input_text):
    # 编码输入
    inputs = tokenizer(input_text, return_tensors="pt", 
                        max_length=128, truncation=True)
    # 生成响应
    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            max_length=50,
            num_return_sequences=1,
            pad_token_id=tokenizer.eos_token_id
        )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response

# 示例对话生成
input_text = "这款商品支持退货吗?"
response = generate_response(input_text)
print(f"输入: {input_text}")
print(f"生成的响应: {response}")

# 问题匹配通过嵌入向量和余弦相似度实现
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
embedding_model = AutoModel.from_pretrained(embedding_model_name)
embedding_model.eval()

# 生成嵌入向量
def generate_embedding(text):
    inputs = embedding_tokenizer(text, return_tensors="pt", 
                               padding=True, truncation=True, max_length=128)
    with torch.no_grad():
        outputs = embedding_model(**inputs)
    return outputs.last_hidden_state.mean(dim=1).numpy()

# FAQ 数据库
faq_questions = [
    "这款商品支持退货吗?",
    "支付方式有哪些?",
    "订单多久发货?"
]
faq_answers = [
    "支持7天无理由退货。",
    "支持微信、支付宝和信用卡支付。",
    "订单通常1-2天内发货。"
]

# 查询匹配
def match_question(user_question):
    user_embedding = generate_embedding(user_question)
    faq_embeddings = [generate_embedding(q) for q in faq_questions]
    similarities = [cosine_similarity(user_embedding, 
                                faq_emb)[0][0] for faq_emb in faq_embeddings]
    best_match_index = similarities.index(max(similarities))
    return faq_answers[best_match_index]

# 示例问题匹配
user_question = "如何付款?"
matched_answer = match_question(user_question)
print(f"用户问题: {user_question}")
print(f"匹配答案: {matched_answer}")

# 用户情绪检测
sentiment_analyzer = pipeline("sentiment-analysis")
def detect_emotion(user_input):
    analysis = sentiment_analyzer(user_input)
    return analysis[0]["label"], analysis[0]["score"]

user_input = "这服务太差了!"
emotion, confidence = detect_emotion(user_input)
print(emotion, confidence)

Char06:代码助手

1. 代码生成任务微调

from transformers import (AutoTokenizer, AutoModelForCausalLM, 
                         Trainer, TrainingArguments)
from datasets import Dataset
import torch

# 1. 加载预训练模型和分词器
model_name = "Salesforce/codegen-350M-multi"  # 用于代码生成的模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# 2. 准备训练数据
train_data = {
    "text": [
        "编写一个函数判断一个数是否是素数。",
        "实现一个冒泡排序算法。"
    ]
}
dataset = Dataset.from_dict(train_data)

# 3. Tokenization:为模型输入格式化数据
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=128)

tokenized_dataset = dataset.map(preprocess_function, batched=True)

# 4. 定义训练参数
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=2,
    num_train_epochs=1,
    logging_steps=10,
)

# 5. 开始微调训练
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)
trainer.train()

# 6. 测试模型
test_prompt = "编写一个函数判断一个数是否是素数。"
inputs = tokenizer(test_prompt, return_tensors="pt")
outputs = model.generate(inputs["input_ids"], max_length=128)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))

2. 错误检测与修复模型实现

from transformers import (AutoTokenizer, AutoModelForSeq2SeqLM, 
                          Trainer, TrainingArguments)
from datasets import Dataset

# 1. 加载预训练模型和分词器
model_name = "t5-small"  # T5 模型适合序列到序列任务,包括代码修复
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# 2. 准备数据集
train_data = {
    "input": ["def multiply(a, b return a * b"],
    "target": ["def multiply(a, b):\n    return a * b"]
}
dataset = Dataset.from_dict(train_data)

# 3. Tokenization:将输入和输出编码为模型可处理的格式
def preprocess_function(examples):
    inputs = tokenizer(examples["input"], truncation=True, padding="max_length", max_length=128)
    targets = tokenizer(examples["target"], truncation=True, padding="max_length", max_length=128)
    inputs["labels"] = targets["input_ids"]
    return inputs

tokenized_dataset = dataset.map(preprocess_function, batched=True)

# 4. 训练参数
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=2,
    num_train_epochs=1,
)

# 5. 开始微调训练
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)
trainer.train()

# 6. 测试模型修复能力
test_code = "def multiply(a, b return a * b"  # 缺少括号
inputs = tokenizer(test_code, return_tensors="pt")
outputs = model.generate(inputs["input_ids"], max_length=128)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))

Char07:向量数据库与语义搜索

1. Sentence-BERT 生成向量

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载预训练的 Sentence-BERT 模型
model_name = "paraphrase-MiniLM-L6-v2"  # 模型名称,可根据需要更换
model = SentenceTransformer(model_name)

# 示例文本数据
texts = ["今天天气很好", "我喜欢机器学习", "向量数据库很有用"]
embeddings = model.encode(texts)
print(embeddings.shape)

2. 语义搜索

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载 Sentence-BERT 模型
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# 示例数据
documents = ["Jetson Nano 是一款边缘计算开发板。", "Transformer 依赖自注意力机制。", "Milvus 可以做向量检索。"]
query = "什么是向量检索?"

# 编码
corpus_embeddings = model.encode(documents)
query_embedding = model.encode([query])[0]

# 计算相似度
def cosine_similarity(a, b):
    a = np.array(a)
    b = np.array(b)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

scores = [cosine_similarity(query_embedding, doc) for doc in corpus_embeddings]
best_index = int(np.argmax(scores))
print(documents[best_index])

Char08:硬件开发助手

1. 硬件领域微调

from transformers import AutoModelForMaskedLM, Trainer, TrainingArguments

# 加载预训练模型
model = AutoModelForMaskedLM.from_pretrained("bert-base-uncased")

# 训练参数
training_args = TrainingArguments(
    output_dir="./hardware_model",
    per_device_train_batch_size=4,
    num_train_epochs=1,
)

# 训练模型
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=None,
)
print("开始微调模型...")
trainer.train()
print("模型微调完成")

# 保存微调后的模型
model.save_pretrained("./hardware_model")
print("微调后的模型已保存")

Char09:提示词优化

1. T5 提示词优化

from transformers import (T5Tokenizer, T5ForConditionalGeneration, 
                          Trainer, TrainingArguments)

# 加载T5模型和分词器
model_name = "t5-small"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# 微调训练参数
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    num_train_epochs=2,
)

# 模型训练
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=None,
)
trainer.train()

2. 对比学习

from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import DataLoader
import torch

# 模型加载
model_name = "t5-small"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# 训练模型
def train_contrastive_learning(model, dataloader, tokenizer):
    for batch in dataloader:
        # forward / loss / backward
        pass

# 测试对比学习模型效果
def test_prompt_similarity(model, tokenizer, prompt_1, prompt_2):
    pass

Char10:文档翻译与翻译 Agent

1. 翻译模型加载

from transformers import MarianTokenizer, MarianMTModel, pipeline

# 加载翻译模型
def load_translation_model():
    """
    加载MarianMT模型和分词器
    返回:
        translator: 翻译模型管道
    """
    model_name = "Helsinki-NLP/opus-mt-en-zh"  # 英文到中文翻译模型
    tokenizer = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    translator = pipeline("translation", model=model, tokenizer=tokenizer)
    print("翻译模型加载成功")
    return translator

2. 划词翻译

from transformers import MarianTokenizer, MarianMTModel, pipeline

# 加载翻译模型
model_name = "Helsinki-NLP/opus-mt-en-zh"

3. 翻译结果增强

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
original_embedding = model.encode([original])

4. BLEU / ROUGE 评估

from nltk.translate.bleu_score import sentence_bleu

# 加载翻译模型
def load_model():
    pass

Jetson Nano 适配代码

1. tiny Transformer 编码器参数

# Jetson Nano 建议先把 Char01 中 d_model=512 的示例缩小
# 这样可以保留 Transformer 结构,但显著减少内存占用和计算量

d_model = 64
num_heads = 4
d_ff = 256
seq_len = 16
batch_size = 1

sample_input = torch.rand(batch_size, seq_len, d_model)
encoder_layer = TransformerEncoderLayer(d_model, num_heads, d_ff)
positional_encoding = PositionalEncoding(d_model)
input_with_pos = positional_encoding(sample_input)
output = encoder_layer(input_with_pos)
print(output.shape)

2. tiny GPT-2 推理

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

model_name = "sshleifer/tiny-gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval()

prompt = "Transformer is"
inputs = tokenizer(prompt, return_tensors="pt")
with torch.no_grad():
    outputs = model.generate(
        **inputs,
        max_new_tokens=32,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id,
    )
print(tokenizer.decode(outputs[0], skip_special_tokens=True))

3. 用 tiny BERT 替换 LLaMA 分类模型

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

model_name = "prajjwal1/bert-tiny"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

text = "This movie is fantastic!"
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=64)
with torch.no_grad():
    outputs = model(**inputs)
pred = torch.argmax(outputs.logits, dim=-1).item()
print(pred)

4. Qwen2.5-0.5B 使用建议

# Qwen2.5-0.5B 对 Jetson Nano 仍然偏大。
# 建议用 Ollama / llama.cpp / 远端服务做量化推理,Nano 只做调用端。
# Python 侧可以用 requests 调本地或局域网模型服务。

import requests

response = requests.post(
    "http://127.0.0.1:11434/api/generate",
    json={
        "model": "qwen2.5:0.5b",
        "prompt": "用三句话解释 Transformer 的自注意力机制。",
        "stream": False,
    },
    timeout=120,
)
print(response.json().get("response"))

适合 Jetson Nano 的改法

复习建议

  1. 先读 Char01 / Char02,理解 attention 和 CUDA。
  2. 再读 Char03 / Char09,理解微调和 prompt。
  3. 再读 Char05 / Char07 / Char08 / Char10,理解系统设计。
  4. 最后把这些代码改成 Jetson Nano 的 tiny 版本。

Page Source