code-appendix
Char*.txt 完整代码附录
本页把随书 Char01.txt 到 Char10.txt 中的主要代码示例按章节整理出来,方便直接复制、学习和改造。
说明:原书章节中既有解释文字,也有多段代码。这里优先收录能够直接运行或便于改造的核心代码,并保留原始注释风格。
Char01:Transformer 与微调基础
1. Transformer 编码器层
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
# Sinusoidal positional encoding
class PositionalEncoding(nn.Module):
    """Add a fixed sine/cosine position table to a batch of embeddings.

    Args:
        d_model: Size of the last (embedding) dimension. Odd values are supported.
        max_len: Number of positions the table is precomputed for.

    The table is stored as the non-persistent buffer ``encoding`` with shape
    ``(1, max_len, d_model)``: it follows ``model.to(device)`` but is not
    written to ``state_dict``.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model)
        )
        angles = position * div_term
        encoding = torch.zeros(max_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the cosine block to fit.
        encoding[:, 1::2] = torch.cos(angles)[:, : d_model // 2]
        # Leading batch dimension lets the table broadcast over the batch.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        seq_len = x.size(1)
        return x + self.encoding[:, :seq_len, :].to(x.device)
# Multi-head attention
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V/output projections.

    Args:
        d_model: Embedding size; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``(output, attention_weights)`` for already-split heads.

        ``mask`` is additive: positions where it is 1 receive a -1e9 logit.
        """
        scale = torch.sqrt(torch.tensor(K.size(-1), dtype=torch.float32))
        logits = torch.matmul(Q, K.transpose(-2, -1)) / scale
        if mask is not None:
            logits += mask * -1e9
        weights = F.softmax(logits, dim=-1)
        return torch.matmul(weights, V), weights

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, depth)``."""
        per_head = x.view(batch_size, -1, self.num_heads, self.depth)
        return per_head.transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, merge heads and project out."""
        batch_size = Q.size(0)
        q_heads = self.split_heads(self.q_linear(Q), batch_size)
        k_heads = self.split_heads(self.k_linear(K), batch_size)
        v_heads = self.split_heads(self.v_linear(V), batch_size)
        context, _ = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        # (batch, heads, seq, depth) -> (batch, seq, d_model)
        merged = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.fc_out(merged)
# Position-wise feed-forward network
class FeedForwardNetwork(nn.Module):
    """Two linear layers with a ReLU in between: ``d_model -> d_ff -> d_model``."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply expand -> ReLU -> project to the last dimension of ``x``."""
        hidden = self.linear1(x)
        activated = F.relu(hidden)
        return self.linear2(activated)
# Transformer encoder layer
class TransformerEncoderLayer(nn.Module):
    """Self-attention followed by a feed-forward network.

    Each sub-layer output goes through dropout, is added to its input, and is
    then layer-normalised.

    Args:
        d_model: Embedding size.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return the encoded sequence, same shape as ``x``."""
        attended = self.dropout(self.mha(x, x, x, mask))
        after_attention = self.layernorm1(x + attended)
        transformed = self.dropout(self.ffn(after_attention))
        return self.layernorm2(after_attention + transformed)
# Smoke test for the Transformer encoder layer
if __name__ == "__main__":
    # Model hyper-parameters
    d_model, num_heads, d_ff = 512, 8, 2048
    seq_len, batch_size = 10, 2

    # Random stand-in for a batch of embeddings; no attention mask for now
    sample_input = torch.rand(batch_size, seq_len, d_model)
    mask = None

    # Build the layer and the positional encoding
    encoder_layer = TransformerEncoderLayer(d_model, num_heads, d_ff)
    positional_encoding = PositionalEncoding(d_model)

    # Add position information, then run the encoder layer
    input_with_pos = positional_encoding(sample_input)
    output = encoder_layer(input_with_pos, mask)

    for label, tensor in (("输入形状:", sample_input), ("输出形状:", output)):
        print(label, tensor.shape)
2. 编码器-解码器
import torch
import torch.nn as nn
import torch.nn.functional as F
# Sinusoidal positional encoding
class PositionalEncoding(nn.Module):
    """Add a fixed sine/cosine position table to a batch of embeddings.

    Args:
        d_model: Size of the last (embedding) dimension. Odd values are supported.
        max_len: Number of positions the table is precomputed for.

    The table is stored as the non-persistent buffer ``encoding`` with shape
    ``(1, max_len, d_model)``: it follows ``model.to(device)`` but is not
    written to ``state_dict``.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * -(torch.log(torch.tensor(10000.0)) / d_model)
        )
        angles = position * div_term
        encoding = torch.zeros(max_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the cosine block to fit.
        encoding[:, 1::2] = torch.cos(angles)[:, : d_model // 2]
        # Leading batch dimension lets the table broadcast over the batch.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        return x + self.encoding[:, : x.size(1), :].to(x.device)
# Multi-head attention
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention; ``forward`` returns only the output.

    Args:
        d_model: Embedding size; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``(output, attention_weights)``; ``mask`` is additive (1 = blocked)."""
        scale = torch.sqrt(torch.tensor(self.depth, dtype=torch.float))
        scores = torch.matmul(Q, K.transpose(-2, -1)) / scale
        if mask is not None:
            scores += mask * -1e9
        attention_weights = F.softmax(scores, dim=-1)
        return torch.matmul(attention_weights, V), attention_weights

    def _to_heads(self, tensor, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, depth)``."""
        return tensor.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        """Project, split into heads, attend, merge heads and project out."""
        batch_size = Q.size(0)
        q_heads = self._to_heads(self.q_linear(Q), batch_size)
        k_heads = self._to_heads(self.k_linear(K), batch_size)
        v_heads = self._to_heads(self.v_linear(V), batch_size)
        context, _ = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        merged = context.transpose(1, 2).contiguous().view(
            batch_size, -1, self.num_heads * self.depth
        )
        return self.fc_out(merged)
# Encoder
class Encoder(nn.Module):
    """Token embedding + positional encoding + a stack of encoder layers.

    Args:
        input_dim: Source vocabulary size.
        d_model: Embedding size.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked layers.
        d_ff: Hidden size of each feed-forward network.
        max_len: Maximum sequence length for the positional table.
    """

    def __init__(self, input_dim, d_model, num_heads, num_layers, d_ff, max_len):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # Each nn.Sequential is used only as a container holding
        # (attention, norm, feed-forward, norm); it is never called directly.
        stacks = []
        for _ in range(num_layers):
            stacks.append(
                nn.Sequential(
                    MultiHeadAttention(d_model, num_heads),
                    nn.LayerNorm(d_model),
                    FeedForward(d_model, d_ff),
                    nn.LayerNorm(d_model),
                )
            )
        self.layers = nn.ModuleList(stacks)

    def forward(self, x, mask=None):
        """Encode token ids ``x``; returns one ``d_model`` vector per position."""
        hidden = self.pos_encoding(self.embedding(x))
        for block in self.layers:
            mha, norm1, ffn, norm2 = block
            hidden = norm1(hidden + mha(hidden, hidden, hidden, mask))
            hidden = norm2(hidden + ffn(hidden))
        return hidden
# Position-wise feed-forward network
class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between: ``d_model -> d_ff -> d_model``."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply expand -> ReLU -> project to the last dimension of ``x``."""
        expanded = F.relu(self.linear1(x))
        return self.linear2(expanded)
# Decoder
class Decoder(nn.Module):
    """Token embedding + positional encoding + a stack of decoder layers.

    Each layer runs self-attention, cross-attention over the encoder output,
    and a feed-forward network, each followed by a residual add and LayerNorm.

    Args:
        output_dim: Target vocabulary size.
        d_model: Embedding size.
        num_heads: Attention heads per attention block.
        num_layers: Number of stacked layers.
        d_ff: Hidden size of each feed-forward network.
        max_len: Maximum sequence length for the positional table.
    """

    def __init__(self, output_dim, d_model, num_heads, num_layers, d_ff, max_len):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # nn.Sequential is used only as a six-slot container per layer.
        stacks = []
        for _ in range(num_layers):
            stacks.append(
                nn.Sequential(
                    MultiHeadAttention(d_model, num_heads),
                    nn.LayerNorm(d_model),
                    MultiHeadAttention(d_model, num_heads),
                    nn.LayerNorm(d_model),
                    FeedForward(d_model, d_ff),
                    nn.LayerNorm(d_model),
                )
            )
        self.layers = nn.ModuleList(stacks)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        """Decode target ids ``x`` while attending to ``enc_output``."""
        hidden = self.pos_encoding(self.embedding(x))
        for block in self.layers:
            self_attn, norm1, cross_attn, norm2, ffn, norm3 = block
            hidden = norm1(hidden + self_attn(hidden, hidden, hidden, tgt_mask))
            hidden = norm2(hidden + cross_attn(hidden, enc_output, enc_output, src_mask))
            hidden = norm3(hidden + ffn(hidden))
        return hidden
# Smoke test for the encoder-decoder pair
if __name__ == "__main__":
    input_dim = output_dim = 1000  # source / target vocabulary sizes
    d_model, num_heads, num_layers = 512, 8, 2
    d_ff, max_len = 2048, 100

    # Random token ids: batch of 2 sequences, 10 tokens each
    src_seq = torch.randint(0, input_dim, (2, 10))
    tgt_seq = torch.randint(0, output_dim, (2, 10))

    encoder = Encoder(input_dim, d_model, num_heads, num_layers, d_ff, max_len)
    decoder = Decoder(output_dim, d_model, num_heads, num_layers, d_ff, max_len)

    enc_output = encoder(src_seq)
    dec_output = decoder(tgt_seq, enc_output)

    print("Encoder output shape:", enc_output.shape)
    print("Decoder output shape:", dec_output.shape)
3. 多头注意力机制
import torch
import torch.nn as nn
class ScaledDotProductAttention(nn.Module):
    """Parameter-free scaled dot-product attention."""

    def forward(self, Q, K, V, mask=None):
        """Return ``(output, attention_weights)``.

        Positions where ``mask == 0`` are excluded by setting their score to -1e9
        before the softmax over the last dimension.
        """
        scale = torch.sqrt(torch.tensor(K.size(-1), dtype=torch.float32))
        scores = torch.matmul(Q, K.transpose(-2, -1)) / scale
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(scores, dim=-1)
        return torch.matmul(attention_weights, V), attention_weights
class MultiHeadAttention(nn.Module):
    """Multi-head attention that also returns the per-head attention weights.

    Args:
        d_model: Embedding size; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        self.num_heads = num_heads  # number of attention heads
        self.depth = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """Split the ``d_model`` axis into ``num_heads`` heads of size ``depth``.

        Returns a tensor shaped ``(batch, num_heads, seq, depth)``.
        """
        return x.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        """Return ``(output, attention_weights)`` for the given Q/K/V inputs."""
        batch_size = Q.size(0)

        # Linear projections, then split into heads
        q_heads = self.split_heads(self.q_linear(Q), batch_size)
        k_heads = self.split_heads(self.k_linear(K), batch_size)
        v_heads = self.split_heads(self.v_linear(V), batch_size)

        # Attention over each head
        attend = ScaledDotProductAttention()
        context, weights = attend(q_heads, k_heads, v_heads, mask)

        # Concatenate the heads back into a single d_model axis
        merged = context.transpose(1, 2).contiguous().view(
            batch_size, -1, self.num_heads * self.depth
        )

        # Final output projection
        return self.fc_out(merged), weights
if __name__ == "__main__":
    # Settings
    d_model, num_heads = 512, 8   # embedding size, attention heads
    seq_len, batch_size = 10, 2   # sequence length, batch size

    # Random stand-ins for the query / key / value inputs
    shape = (batch_size, seq_len, d_model)
    Q = torch.rand(*shape)
    K = torch.rand(*shape)
    V = torch.rand(*shape)

    # Build the attention module and run it
    multi_head_attention = MultiHeadAttention(d_model, num_heads)
    output, attention_weights = multi_head_attention(Q, K, V)

    print("输出形状:", output.shape)
    print("注意力权重形状:", attention_weights.shape)
4. LoRA 微调
import torch
import torch.nn as nn
from transformers import (AutoModelForCausalLM, AutoTokenizer,
Trainer, TrainingArguments)
from datasets import Dataset
# LoRA adapter
class LoRAModule(nn.Module):
    """Low-rank adapter computing ``B(A(x))`` with rank ``rank``.

    Args:
        input_dim: Size of the adapter input (last dimension of ``x``).
        output_dim: Size of the adapter output.
        rank: Inner dimension of the low-rank factorisation.

    ``B`` starts at zero so a freshly created adapter contributes exactly
    nothing; the wrapped model therefore starts from its pretrained behaviour.
    """

    def __init__(self, input_dim, output_dim, rank=4):
        super().__init__()
        self.A = nn.Linear(input_dim, rank, bias=False)   # down-projection
        self.B = nn.Linear(rank, output_dim, bias=False)  # up-projection
        nn.init.zeros_(self.B.weight)

    def forward(self, x):
        """Return the low-rank update for ``x``."""
        return self.B(self.A(x))
# Inject LoRA adapters into the linear layers of a base model
class GPTWithLoRA(nn.Module):
    """Wrap a causal language model and add a LoRA adapter to every ``nn.Linear``.

    Each adapter is attached with a forward hook, so the wrapped layer returns
    ``linear(x) + adapter(x)`` wherever the base model calls it.

    Args:
        base_model: Model called as ``base_model(input_ids=..., attention_mask=...,
            labels=...)`` whose output exposes ``.loss`` and ``.logits``.
        rank: Rank passed to each ``LoRAModule``.
        freeze_base: When true (default), base parameters stop receiving
            gradients so only the adapters train.

    Attributes:
        lora_modules: ``nn.ModuleDict`` of adapters keyed by the layer's module
            name with ``"."`` replaced by ``"__"``.
    """

    def __init__(self, base_model, rank=4, freeze_base=True):
        super().__init__()
        self.base_model = base_model
        self.lora_modules = nn.ModuleDict()  # holds the adapters so they are trained/saved

        if freeze_base:
            for param in base_model.parameters():
                param.requires_grad = False

        for name, module in base_model.named_modules():
            if not isinstance(module, nn.Linear):  # only linear layers get an adapter
                continue
            adapter = LoRAModule(module.in_features, module.out_features, rank)
            # Keep the adapter on the same device/dtype as the layer it augments.
            adapter.to(device=module.weight.device, dtype=module.weight.dtype)
            # ModuleDict keys may not contain "." and may not be empty.
            key = name.replace(".", "__") or "root"
            self.lora_modules[key] = adapter
            module.register_forward_hook(self._make_hook(adapter))

    @staticmethod
    def _make_hook(adapter):
        """Build a forward hook that adds ``adapter(input)`` to a layer's output."""

        def _add_low_rank_update(_module, inputs, output):
            return output + adapter(inputs[0])

        return _add_low_rank_update

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Run the adapted model.

        Returns:
            ``(loss, logits)``. ``loss`` is whatever the base model reports for
            ``labels`` (``None`` when no labels are given); ``logits`` already
            include the adapter contributions.
        """
        outputs = self.base_model(
            input_ids=input_ids, attention_mask=attention_mask, labels=labels
        )
        return outputs.loss, outputs.logits
# Load the pretrained model and tokenizer
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# padding="max_length" below needs a pad token; fall back to EOS when the
# tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
base_model = AutoModelForCausalLM.from_pretrained("gpt2")

# Wrap the base model with LoRA adapters
lora_model = GPTWithLoRA(base_model, rank=4)

# Show the model structure
print(lora_model)

# Training data
data = {
    "text": [
        "今天是个好天气。",
        "我喜欢用GPT模型学习。",
        "微调技术让模型更加灵活。",
        "LoRA 技术是一种高效的微调方法。",
        "通过低秩矩阵分解减少参数量。"
    ]
}

# Convert to a Hugging Face dataset
dataset = Dataset.from_dict(data)


def preprocess_function(examples):
    """Tokenize a batch of texts and attach causal-LM labels.

    Labels copy ``input_ids``; padded positions become -100 so they are
    ignored by the loss. Without labels the model returns no loss and the
    Trainer cannot train.
    """
    encoded = tokenizer(examples["text"], truncation=True,
                        padding="max_length", max_length=64)
    encoded["labels"] = [
        [token if keep else -100 for token, keep in zip(ids, mask)]
        for ids, mask in zip(encoded["input_ids"], encoded["attention_mask"])
    ]
    return encoded


tokenized_dataset = dataset.map(preprocess_function, batched=True)

# Training arguments
# NOTE(review): save_strategy="epoch" checkpoints the wrapper, a plain
# nn.Module; confirm the installed transformers version can serialize it.
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=4,
    num_train_epochs=3,
    logging_dir="./logs",
    save_strategy="epoch",
    logging_steps=10
)

# Train with the Hugging Face Trainer
trainer = Trainer(
    model=lora_model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)

# Start training
print("开始训练 LoRA 微调模型...")
trainer.train()

# Save the result. The wrapper is a plain nn.Module with no save_pretrained(),
# so save the wrapped Hugging Face model and store the adapters next to it.
print("保存模型...")
lora_model.base_model.save_pretrained("./lora_finetuned_model")
torch.save(lora_model.lora_modules.state_dict(),
           "./lora_finetuned_model/lora_adapters.pt")
tokenizer.save_pretrained("./lora_finetuned_model")

# Reload for inference
# NOTE: from_pretrained restores only the base weights; the adapter weights in
# lora_adapters.pt are not applied to this reloaded model.
print("加载微调模型进行推理...")
finetuned_model = AutoModelForCausalLM.from_pretrained(
    "./lora_finetuned_model")
finetuned_tokenizer = AutoTokenizer.from_pretrained(
    "./lora_finetuned_model")

# Inference test
test_text = "GPT 模型的优点是"
input_ids = finetuned_tokenizer(test_text, return_tensors="pt").input_ids
output = finetuned_model.generate(input_ids, max_length=50)
result = finetuned_tokenizer.decode(output[0], skip_special_tokens=True)
print("输入文本:", test_text)
print("生成结果:", result)
Char02:CUDA 与 PyTorch 加速
1. 矩阵运算与优化
import torch
import time
# Pick the compute device: CUDA when available, otherwise CPU
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(f"当前设备: {device}")


# Matrix initialisation
def initialize_matrices(size):
    """Return two random ``size x size`` matrices allocated on ``device``."""
    shape = (size, size)
    A = torch.randn(*shape, device=device)
    B = torch.randn(*shape, device=device)
    return A, B
# Matrix multiplication
def matrix_multiplication(A, B):
    """Multiply ``A @ B`` with ``torch.mm`` and time it.

    Returns:
        ``(C, seconds)`` where ``seconds`` is the wall-clock duration.

    CUDA synchronisation runs only when ``A`` lives on a CUDA device, so the
    function also works on the CPU fallback.
    """
    on_gpu = A.is_cuda
    if on_gpu:
        # Drain work queued earlier so it is not billed to this call.
        torch.cuda.synchronize()
    start_time = time.perf_counter()
    C = torch.mm(A, B)
    if on_gpu:
        # Kernels launch asynchronously; wait for the result before stopping the clock.
        torch.cuda.synchronize()
    return C, time.perf_counter() - start_time
# Blocked (tiled) matrix multiplication
def optimized_matrix_multiplication(A, B, block_size):
    """Multiply ``A @ B`` tile by tile and time it.

    Args:
        A: Matrix of shape ``(m, k)``.
        B: Matrix of shape ``(k, n)``.
        block_size: Positive tile edge length; edge tiles may be smaller.

    Returns:
        ``(C, seconds)`` with ``C`` of shape ``(m, n)`` on ``A``'s device and dtype.

    Raises:
        ValueError: If ``block_size`` is not positive.
    """
    if block_size <= 0:
        raise ValueError("block_size must be a positive integer")

    on_gpu = A.is_cuda
    if on_gpu:
        torch.cuda.synchronize()  # do not time work queued before this call
    start_time = time.perf_counter()

    rows, inner = A.shape
    cols = B.size(1)
    # Allocate next to the inputs instead of relying on a module-level device.
    C = torch.zeros(rows, cols, device=A.device, dtype=A.dtype)
    for i in range(0, rows, block_size):
        for j in range(0, cols, block_size):
            for k in range(0, inner, block_size):
                # Slicing past the end is clipped, so ragged edge tiles just work.
                A_block = A[i:i + block_size, k:k + block_size]
                B_block = B[k:k + block_size, j:j + block_size]
                C[i:i + block_size, j:j + block_size] += torch.mm(A_block, B_block)

    if on_gpu:
        torch.cuda.synchronize()  # wait for all queued kernels before stopping the clock
    return C, time.perf_counter() - start_time
# Result check
def verify_results(C1, C2, atol=1e-5, rtol=1e-5):
    """Return True when ``C1`` and ``C2`` agree within the given tolerances.

    Args:
        C1, C2: Tensors to compare.
        atol: Absolute tolerance passed to ``torch.allclose``.
        rtol: Relative tolerance passed to ``torch.allclose``.

    The defaults reproduce the previous fixed tolerance. Callers comparing
    float32 products computed in different summation orders can loosen them.
    """
    return torch.allclose(C1, C2, rtol=rtol, atol=atol)
# Entry point: compare plain and blocked matrix multiplication
if __name__ == "__main__":
    matrix_size = 1024  # matrix edge length
    block_size = 128    # tile edge length

    print(f"初始化{matrix_size}x{matrix_size}矩阵...")
    A, B = initialize_matrices(matrix_size)

    print("开始标准矩阵乘法...")
    C_standard, time_standard = matrix_multiplication(A, B)
    print(f"标准矩阵乘法耗时: {time_standard:.4f} 秒")

    print("开始优化矩阵乘法...")
    C_optimized, time_optimized = optimized_matrix_multiplication(A, B, block_size)
    print(f"优化矩阵乘法耗时: {time_optimized:.4f} 秒")

    print("验证两种方法的结果是否一致...")
    matches = verify_results(C_standard, C_optimized)
    print("结果一致!" if matches else "结果不一致!")

    print("矩阵乘法性能对比:")
    print(f"标准方法耗时: {time_standard:.4f} 秒")
    print(f"优化方法耗时: {time_optimized:.4f} 秒")
    print(f"加速比: {time_standard / time_optimized:.2f} 倍")
2. CUDA 内核性能调优与工具使用
import torch
import time
# Pick the compute device: CUDA when available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"当前设备: {device}")


# Tiled matrix addition
def cuda_matrix_addition(A, B, block_size):
    """Add two equally shaped matrices tile by tile.

    Args:
        A, B: 2-D tensors of the same shape (square or rectangular).
        block_size: Tile edge length; edge tiles may be smaller.

    Returns:
        ``A + B`` as a new tensor on ``A``'s device with ``A``'s dtype.
    """
    rows, cols = A.shape
    # Allocate next to the input rather than on the module-level device, so the
    # result never ends up on a different device than its operands.
    C = torch.zeros_like(A)
    for i in range(0, rows, block_size):
        for j in range(0, cols, block_size):
            C[i:i + block_size, j:j + block_size] = (
                A[i:i + block_size, j:j + block_size]
                + B[i:i + block_size, j:j + block_size]
            )
    return C
# Variant that stages each tile in local variables first
def optimized_matrix_addition(A, B, block_size):
    """Add two equally shaped matrices tile by tile, naming each tile first.

    The tiles are bound to local variables before the add, which the original
    describes as imitating shared-memory staging.

    Args:
        A, B: 2-D tensors of the same shape (square or rectangular).
        block_size: Tile edge length; edge tiles may be smaller.

    Returns:
        ``A + B`` as a new tensor on ``A``'s device with ``A``'s dtype.
    """
    rows, cols = A.shape
    # Allocate next to the input rather than on the module-level device.
    C = torch.zeros_like(A)
    for i in range(0, rows, block_size):
        for j in range(0, cols, block_size):
            # Stage the two tiles, then add them
            A_block = A[i:i + block_size, j:j + block_size]
            B_block = B[i:i + block_size, j:j + block_size]
            C[i:i + block_size, j:j + block_size] = A_block + B_block
    return C
# Timing helper
def measure_performance(func, *args):
    """Call ``func(*args)`` and return ``(result, elapsed_seconds)``.

    When CUDA is available the device is synchronised before and after the
    call so asynchronous kernels are included in the measurement. On CPU-only
    machines the synchronisation is skipped instead of raising.
    """
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        torch.cuda.synchronize()  # finish earlier work before starting the clock
    start_time = time.perf_counter()
    result = func(*args)
    if use_cuda:
        torch.cuda.synchronize()  # wait for kernels launched by func
    return result, time.perf_counter() - start_time
# Entry point: time both tiled-addition variants
if __name__ == "__main__":
    # Problem setup
    size, block_size = 2048, 128  # matrix edge length, tile edge length
    A = torch.randn(size, size, device=device)
    B = torch.randn(size, size, device=device)
    print(f"初始化{size}x{size}矩阵完成,使用分块大小: {block_size}")

    # Baseline variant
    print("开始标准CUDA加法...")
    result_standard, time_standard = measure_performance(
        cuda_matrix_addition, A, B, block_size
    )
    print(f"标准CUDA加法耗时: {time_standard:.4f} 秒")

    # Staged variant
    print("开始优化CUDA加法...")
    result_optimized, time_optimized = measure_performance(
        optimized_matrix_addition, A, B, block_size
    )
    print(f"优化CUDA加法耗时: {time_optimized:.4f} 秒")

    # Both variants must agree
    print("验证结果一致性...")
    same = torch.allclose(result_standard, result_optimized)
    print("结果一致!" if same else "结果不一致!")
Char03:微调任务与数据准备
1. 分类、生成与问答
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
Trainer, TrainingArguments)
from datasets import load_dataset
import torch
# Load the pretrained model and tokenizer
# NOTE(review): confirm that "facebook/llama-3b" resolves to a real checkpoint
# on the Hub before running; it is kept as written.
model_name = "facebook/llama-3b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=2)
# padding="max_length" and batched classification both need a pad token;
# fall back to EOS when the checkpoint does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
if model.config.pad_token_id is None:
    model.config.pad_token_id = tokenizer.pad_token_id

# Load and shuffle the dataset
dataset = load_dataset("imdb")
dataset = dataset.shuffle(seed=42)


# Preprocessing
def preprocess_function(examples):
    """Tokenize a batch of reviews to a fixed length of 128 tokens."""
    return tokenizer(examples["text"], truncation=True,
                     padding="max_length", max_length=128)


encoded_dataset = dataset.map(preprocess_function, batched=True)
encoded_dataset = encoded_dataset.rename_column("label", "labels")
encoded_dataset.set_format("torch")

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    # load_best_model_at_end requires the save strategy to match the
    # evaluation strategy; the default ("steps") does not.
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    save_total_limit=2,
    load_best_model_at_end=True,
)

# Trainer on small subsets of the train / test splits
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=encoded_dataset["train"].select(range(1000)),
    eval_dataset=encoded_dataset["test"].select(range(500)),
    tokenizer=tokenizer,
)

# Train
trainer.train()

# Inference test
text = "This movie is fantastic!"
inputs = tokenizer(text, return_tensors="pt", truncation=True,
                   padding="max_length", max_length=128)
# The Trainer may have moved the model to an accelerator; keep inputs with it.
inputs = inputs.to(model.device)
model.eval()
with torch.no_grad():
    outputs = model(**inputs)
logits = outputs.logits
prediction = torch.argmax(logits, dim=1).item()
labels = ["negative", "positive"]
print(f"分类结果: {labels[prediction]}")
from transformers import AutoTokenizer, AutoModelForCausalLM
# Load the causal LM and its tokenizer (reuses model_name defined above)
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Text generation with beam search
prompt = "Once upon a time in a distant galaxy,"
inputs = tokenizer(prompt, return_tensors="pt")
generation_options = {
    "max_length": 50,
    "num_beams": 5,
    "no_repeat_ngram_size": 2,
    "early_stopping": True,
}
outputs = model.generate(inputs["input_ids"], **generation_options)
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"生成结果: {generated_text}")
from transformers import pipeline
# Build the question-answering pipeline
# NOTE(review): model_name was loaded above as a causal LM; confirm the
# checkpoint actually provides a question-answering head.
qa_pipeline = pipeline(
    "question-answering",
    model=model_name,
    tokenizer=tokenizer,
)

# Context passage and question
context = """
机器学习是人工智能的一个分支,主要研究如何让计算机从数据中学习。近年来,深度学习成为机器学习的一个重要方向。
"""
question = "深度学习属于哪个领域?"

# Run extractive QA and show the answer span
result = qa_pipeline(question=question, context=context)
answer = result["answer"]
print(f"问答结果: {answer}")
2. 数据准备与预处理
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer
import torch
# 1. Load the data
print("加载IMDb数据集...")
raw_dataset = load_dataset("imdb")
print(f"原始数据集结构: {raw_dataset}")

# 2. Cleaning: drop samples with empty text (optional)
print("筛选样本...")
raw_dataset = raw_dataset.filter(lambda x: len(x["text"]) > 0)
print(f"清洗后数据集大小: {raw_dataset['train'].num_rows}")

# 3. Split the training data into train / validation sets
print("拆分数据集...")
raw_dataset = raw_dataset["train"].train_test_split(test_size=0.1)
train_dataset = raw_dataset["train"]
val_dataset = raw_dataset["test"]
print(f"训练集大小: {len(train_dataset)}, 验证集大小: {len(val_dataset)}")

# 4. Load the tokenizer
# NOTE(review): confirm that "facebook/llama-3b" resolves to a real checkpoint.
model_name = "facebook/llama-3b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# padding="max_length" below needs a pad token; fall back to EOS when the
# tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token


# 5. Preprocessing function
def preprocess_function(examples):
    """Tokenize a batch of texts, truncated/padded to 128 tokens."""
    return tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=128,
    )


# 6. Tokenize and encode both splits
print("对训练集进行分词和编码...")
train_dataset = train_dataset.map(preprocess_function, batched=True)
val_dataset = val_dataset.map(preprocess_function, batched=True)

# 7. Drop the raw text column
train_dataset = train_dataset.remove_columns(["text"])
val_dataset = val_dataset.remove_columns(["text"])

# 8. Return torch tensors when indexing
train_dataset.set_format("torch")
val_dataset.set_format("torch")
print("数据准备完成")
Char04:量化、部署与性能测试
1. FastAPI 部署微调模型
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# 1. Create the FastAPI application
app = FastAPI()

# 2. Load the fine-tuned model and its tokenizer once, at import time
model_name = "gemma/gpt-7b-finetuned"  # assumed path of the fine-tuned model
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
# 3. Request / response schemas
class RequestData(BaseModel):
    """Request body: the text to analyse."""

    text: str
class ResponseData(BaseModel):
    """Response body: predicted sentiment label and its probability."""

    sentiment: str
    confidence: float
# 4. Sentiment-analysis endpoint
@app.post("/analyze", response_model=ResponseData)
def analyze_sentiment(data: RequestData):
    """Classify ``data.text`` as positive or negative.

    Declared with plain ``def`` so FastAPI runs the blocking model call in its
    worker thread pool instead of stalling the event loop.

    Returns:
        ``ResponseData`` with the label ("positive" when class index 1 wins,
        otherwise "negative") and the highest softmax probability.

    Raises:
        HTTPException: Status 500 with the error text when tokenization or
            inference fails.
    """
    try:
        # Tokenize and encode the text
        inputs = tokenizer(
            data.text,
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=128,
        )
        # Inference without gradient tracking
        with torch.no_grad():
            outputs = model(**inputs)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=1).squeeze()
        sentiment = "positive" if torch.argmax(logits).item() == 1 else "negative"
        confidence = probabilities.max().item()
        return ResponseData(sentiment=sentiment, confidence=confidence)
    except Exception as e:
        # Service boundary: report the failure as HTTP 500, keeping the cause chained.
        raise HTTPException(status_code=500, detail=str(e)) from e
# 启动服务:uvicorn app:app --reload
2. 接口性能测试
# test_performance.py
import requests
import time
# API endpoint
url = "http://127.0.0.1:8000/analyze"

# Sample texts
test_texts = [
    "这款商品非常不错,我很满意!",
    "物流速度太慢了,真让人失望。",
    "客服态度很好,帮助解决了问题,谢谢!",
    "质量一般,不值这个价钱。",
    "整体体验不错,值得推荐。",
] * 100  # 500 requests in total

# Sequential load test
start_time = time.perf_counter()
responses = []
# One session reuses the TCP connection, so the figures reflect the service
# rather than connection setup.
with requests.Session() as session:
    for text in test_texts:
        try:
            # A timeout keeps a hung server from blocking the run forever.
            response = session.post(url, json={"text": text}, timeout=10)
        except requests.RequestException as exc:
            # Count connection errors / timeouts as failures and keep going.
            print(f"请求失败: {exc}")
            continue
        if response.status_code == 200:
            responses.append(response.json())
        else:
            print(f"请求失败: {response.status_code}")
end_time = time.perf_counter()

# Summary
print(f"总请求数: {len(test_texts)}")
print(f"成功响应数: {len(responses)}")
print(f"总耗时: {end_time - start_time:.2f} 秒")
print(f"平均响应时间: {(end_time-start_time) / len(test_texts):.2f} 秒/请求")
Char05:智能客服
1. 对话生成、FAQ 匹配、情感检测
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, pipeline
# Dialogue generation with a pretrained causal language model (GPT family)
model_name = "gpt2"  # swap in a model better suited to dialogue if needed
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval()


def generate_response(input_text):
    """Generate a continuation of ``input_text`` and return the decoded text.

    The prompt is truncated to 128 tokens and up to 50 new tokens are
    generated. The returned string is the decoded full sequence, prompt
    included.
    """
    # Encode the prompt
    inputs = tokenizer(input_text, return_tensors="pt",
                       max_length=128, truncation=True)
    # Generate the reply
    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            # Budget new tokens only: a total max_length of 50 would leave no
            # room to generate once the prompt reaches 50 tokens.
            max_new_tokens=50,
            num_return_sequences=1,
            pad_token_id=tokenizer.eos_token_id
        )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response


# Example
input_text = "这款商品支持退货吗?"
response = generate_response(input_text)
print(f"输入: {input_text}")
print(f"生成的响应: {response}")
# Question matching uses sentence embeddings and cosine similarity
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
embedding_model = AutoModel.from_pretrained(embedding_model_name)
embedding_model.eval()


# Sentence embedding
def generate_embedding(text):
    """Return mean-pooled token embeddings for ``text`` as a NumPy array.

    Args:
        text: A string or a list of strings.

    Returns:
        Array with one row per input text.

    Pooling is weighted by the attention mask, so padding added when several
    texts of different length are encoded together does not dilute the mean.
    """
    inputs = embedding_tokenizer(text, return_tensors="pt",
                                 padding=True, truncation=True, max_length=128)
    with torch.no_grad():
        outputs = embedding_model(**inputs)
    hidden = outputs.last_hidden_state
    mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
    token_sum = (hidden * mask).sum(dim=1)
    token_count = mask.sum(dim=1).clamp(min=1e-9)  # guard against division by zero
    return (token_sum / token_count).numpy()
# FAQ knowledge base: questions and their answers, index-aligned
faq_questions = [
    "这款商品支持退货吗?",
    "支付方式有哪些?",
    "订单多久发货?"
]
faq_answers = [
    "支持7天无理由退货。",
    "支持微信、支付宝和信用卡支付。",
    "订单通常1-2天内发货。"
]


# FAQ lookup
def match_question(user_question):
    """Return the answer of the FAQ question most similar to ``user_question``.

    Similarity is the cosine between the two sentence embeddings; on a tie the
    earliest FAQ entry wins.
    """
    query_vector = generate_embedding(user_question)
    scores = []
    for question in faq_questions:
        candidate_vector = generate_embedding(question)
        scores.append(cosine_similarity(query_vector, candidate_vector)[0][0])
    best_match_index = max(range(len(scores)), key=scores.__getitem__)
    return faq_answers[best_match_index]


# Example lookup
user_question = "如何付款?"
matched_answer = match_question(user_question)
print(f"用户问题: {user_question}")
print(f"匹配答案: {matched_answer}")
# User emotion detection
# NOTE(review): no model is named, so the pipeline's default checkpoint is
# used; confirm it handles Chinese input such as the example below.
sentiment_analyzer = pipeline("sentiment-analysis")


def detect_emotion(user_input):
    """Return ``(label, score)`` of the top sentiment prediction for ``user_input``."""
    top_prediction = sentiment_analyzer(user_input)[0]
    return top_prediction["label"], top_prediction["score"]


user_input = "这服务太差了!"
emotion, confidence = detect_emotion(user_input)
print(emotion, confidence)
Char06:代码助手
1. 代码生成任务微调
from transformers import (AutoTokenizer, AutoModelForCausalLM,
Trainer, TrainingArguments)
from datasets import Dataset
import torch
# 1. Load the pretrained model and tokenizer
model_name = "Salesforce/codegen-350M-multi"  # code-generation model
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
# padding="max_length" below needs a pad token; fall back to EOS when the
# tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# 2. Training data
train_data = {
    "text": [
        "编写一个函数判断一个数是否是素数。",
        "实现一个冒泡排序算法。"
    ]
}
dataset = Dataset.from_dict(train_data)


# 3. Tokenization: format the data for the model
def preprocess_function(examples):
    """Tokenize a batch of prompts and attach causal-LM labels.

    Labels copy ``input_ids``; padded positions become -100 so the loss
    ignores them. Without labels the model returns no loss to train on.
    """
    encoded = tokenizer(examples["text"], truncation=True,
                        padding="max_length", max_length=128)
    encoded["labels"] = [
        [token if keep else -100 for token, keep in zip(ids, mask)]
        for ids, mask in zip(encoded["input_ids"], encoded["attention_mask"])
    ]
    return encoded


tokenized_dataset = dataset.map(preprocess_function, batched=True)

# 4. Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=2,
    num_train_epochs=1,
    logging_steps=10,
)

# 5. Fine-tune
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)
trainer.train()

# 6. Try the model
test_prompt = "编写一个函数判断一个数是否是素数。"
# The Trainer may have moved the model to an accelerator; keep inputs with it.
inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
outputs = model.generate(inputs["input_ids"], max_length=128)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))
2. 错误检测与修复模型实现
from transformers import (AutoTokenizer, AutoModelForSeq2SeqLM,
Trainer, TrainingArguments)
from datasets import Dataset
# 1. Load the pretrained model and tokenizer
model_name = "t5-small"  # T5 suits sequence-to-sequence tasks such as code repair
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# 2. Dataset: broken code -> repaired code
train_data = {
    "input": ["def multiply(a, b return a * b"],
    "target": ["def multiply(a, b):\n return a * b"]
}
dataset = Dataset.from_dict(train_data)


# 3. Tokenization: encode inputs and targets for the model
def preprocess_function(examples):
    """Tokenize inputs and targets; targets become ``labels``.

    Padding positions in the labels are replaced by -100 so the loss ignores
    them instead of training the model to emit pad tokens.
    """
    inputs = tokenizer(examples["input"], truncation=True,
                       padding="max_length", max_length=128)
    targets = tokenizer(examples["target"], truncation=True,
                        padding="max_length", max_length=128)
    pad_id = tokenizer.pad_token_id
    inputs["labels"] = [
        [token if token != pad_id else -100 for token in sequence]
        for sequence in targets["input_ids"]
    ]
    return inputs


tokenized_dataset = dataset.map(preprocess_function, batched=True)

# 4. Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=2,
    num_train_epochs=1,
)

# 5. Fine-tune
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)
trainer.train()

# 6. Check the repair ability
test_code = "def multiply(a, b return a * b"  # missing parenthesis
# The Trainer may have moved the model to an accelerator; keep inputs with it.
inputs = tokenizer(test_code, return_tensors="pt").to(model.device)
outputs = model.generate(inputs["input_ids"], max_length=128)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))
Char07:向量数据库与语义搜索
1. Sentence-BERT 生成向量
from sentence_transformers import SentenceTransformer
import numpy as np
# Load a pretrained Sentence-BERT model
model_name = "paraphrase-MiniLM-L6-v2"  # model id; change as needed
model = SentenceTransformer(model_name)

# Encode a few sample sentences and show the embedding matrix shape
texts = ["今天天气很好", "我喜欢机器学习", "向量数据库很有用"]
embeddings = model.encode(texts)
embedding_shape = embeddings.shape
print(embedding_shape)
2. 语义搜索
from sentence_transformers import SentenceTransformer
import numpy as np
# Load the Sentence-BERT model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Sample corpus and query
documents = [
    "Jetson Nano 是一款边缘计算开发板。",
    "Transformer 依赖自注意力机制。",
    "Milvus 可以做向量检索。",
]
query = "什么是向量检索?"

# Encode the corpus and the query (encode takes a list, so unwrap the single row)
corpus_embeddings = model.encode(documents)
query_batch = model.encode([query])
query_embedding = query_batch[0]
# Similarity measure
def cosine_similarity(a, b):
    """Return the cosine similarity between vectors ``a`` and ``b``.

    Args:
        a, b: Array-likes of equal length.

    Returns:
        ``dot(a, b) / (|a| * |b|)``, or ``0.0`` when either vector has zero
        norm (the ratio is undefined there and would otherwise be NaN).
    """
    a = np.asarray(a)
    b = np.asarray(b)
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator
# Score every document against the query and print the best match
scores = []
for doc_embedding in corpus_embeddings:
    scores.append(cosine_similarity(query_embedding, doc_embedding))
best_index = int(np.argmax(scores))
print(documents[best_index])
Char08:硬件开发助手
1. 硬件领域微调
from transformers import AutoModelForMaskedLM, Trainer, TrainingArguments
# Load the pretrained model
model = AutoModelForMaskedLM.from_pretrained("bert-base-uncased")

# Training arguments
training_args = TrainingArguments(
    output_dir="./hardware_model",
    per_device_train_batch_size=4,
    num_train_epochs=1,
)

# Trainer
# NOTE(review): train_dataset=None looks like a placeholder left by the
# excerpt; supply a tokenized dataset here — training is expected to fail
# without one (confirm).
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=None,
)

print("开始微调模型...")
trainer.train()
print("模型微调完成")

# Save the fine-tuned model
model.save_pretrained("./hardware_model")
print("微调后的模型已保存")
Char09:提示词优化
1. T5 提示词优化
from transformers import (T5Tokenizer, T5ForConditionalGeneration,
Trainer, TrainingArguments)
# Load the T5 model and tokenizer
model_name = "t5-small"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Fine-tuning arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    num_train_epochs=2,
)

# Training
# NOTE(review): train_dataset=None looks like a placeholder, and per-epoch
# evaluation is requested without an eval_dataset; supply both before running
# (confirm).
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=None,
)
trainer.train()
2. 对比学习
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import DataLoader
import torch
# Load the tokenizer and a sequence-classification model from the same checkpoint
model_name = "t5-small"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
# Training loop skeleton
def train_contrastive_learning(model, dataloader, tokenizer):
    """Placeholder training loop: walks ``dataloader`` without doing any work.

    The forward / loss / backward steps are still to be filled in. ``model``
    and ``tokenizer`` are currently unused. Returns ``None``.
    """
    for _batch in dataloader:
        # forward / loss / backward
        continue
# Evaluation skeleton for the contrastive model
def test_prompt_similarity(model, tokenizer, prompt_1, prompt_2):
    """Placeholder: not implemented yet; ignores its arguments and returns ``None``."""
    return None
Char10:文档翻译与翻译 Agent
1. 翻译模型加载
from transformers import MarianTokenizer, MarianMTModel, pipeline
# Translation model loader
def load_translation_model():
    """Load the MarianMT English-to-Chinese model and wrap it in a pipeline.

    Returns:
        translator: A ``translation`` pipeline built from the loaded model and
        tokenizer.
    """
    model_name = "Helsinki-NLP/opus-mt-en-zh"  # English -> Chinese model
    marian_tokenizer = MarianTokenizer.from_pretrained(model_name)
    marian_model = MarianMTModel.from_pretrained(model_name)
    translator = pipeline(
        "translation", model=marian_model, tokenizer=marian_tokenizer
    )
    print("翻译模型加载成功")
    return translator
2. 划词翻译
from transformers import MarianTokenizer, MarianMTModel, pipeline
# Checkpoint used for the translation model (the excerpt stops after this line)
model_name = "Helsinki-NLP/opus-mt-en-zh"
3. 翻译结果增强
from sentence_transformers import SentenceTransformer
# Multilingual sentence encoder for comparing a source text with its translation
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# NOTE(review): `original` is not defined anywhere in this excerpt; it must be
# set to the source text before this line runs.
source_batch = [original]
original_embedding = model.encode(source_batch)
4. BLEU / ROUGE 评估
from nltk.translate.bleu_score import sentence_bleu
# Translation model loader (stub)
def load_model():
    """Placeholder: not implemented in this excerpt; returns ``None``."""
    return None
Jetson Nano 适配代码
1. tiny Transformer 编码器参数
# On a Jetson Nano, first shrink the Char01 example that used d_model=512.
# This keeps the Transformer structure while cutting memory use and compute.
d_model, num_heads, d_ff = 64, 4, 256
seq_len, batch_size = 16, 1

sample_input = torch.rand(batch_size, seq_len, d_model)

encoder_layer = TransformerEncoderLayer(d_model, num_heads, d_ff)
positional_encoding = PositionalEncoding(d_model)

input_with_pos = positional_encoding(sample_input)
output = encoder_layer(input_with_pos)
output_shape = output.shape
print(output_shape)
2. tiny GPT-2 推理
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# Greedy generation with a tiny GPT-2 checkpoint
model_name = "sshleifer/tiny-gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval()

prompt = "Transformer is"
inputs = tokenizer(prompt, return_tensors="pt")

generation_options = {
    "max_new_tokens": 32,
    "do_sample": False,  # deterministic (greedy) decoding
    "pad_token_id": tokenizer.eos_token_id,
}
with torch.no_grad():
    outputs = model.generate(**inputs, **generation_options)

decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(decoded)
3. 用 tiny BERT 替换 LLaMA 分类模型
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# Two-class prediction with a tiny BERT checkpoint
model_name = "prajjwal1/bert-tiny"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

text = "This movie is fantastic!"
inputs = tokenizer(
    text,
    return_tensors="pt",
    truncation=True,
    padding=True,
    max_length=64,
)

with torch.no_grad():
    outputs = model(**inputs)

# Index of the highest-scoring class
pred = torch.argmax(outputs.logits, dim=-1).item()
print(pred)
4. Qwen2.5-0.5B 使用建议
# Qwen2.5-0.5B 对 Jetson Nano 仍然偏大。
# 建议用 Ollama / llama.cpp / 远端服务做量化推理,Nano 只做调用端。
# Python 侧可以用 requests 调本地或局域网模型服务。
import requests
# Ask a local model server for a non-streamed completion
response = requests.post(
    "http://127.0.0.1:11434/api/generate",
    json={
        "model": "qwen2.5:0.5b",
        "prompt": "用三句话解释 Transformer 的自注意力机制。",
        "stream": False,
    },
    timeout=120,
)
# Fail loudly on HTTP errors (for example an unknown model) instead of
# silently printing None.
response.raise_for_status()
print(response.json().get("response"))
适合 Jetson Nano 的改法
- 所有 `batch_size=4/8` 改成 `1`。
- 所有大模型如 `llama-3b`、`gpt2`、`codegen-350M`、`gemma/gpt-7b-finetuned` 改成 tiny 模型或远端 API。
- 先做前向推理,再做极小数据训练。
- 用 `Qwen2.5-0.5B` 时优先做量化推理,不在 Nano 上训练。
复习建议
- 先读 Char01 / Char02,理解 attention 和 CUDA。
- 再读 Char03 / Char09,理解微调和 prompt。
- 再读 Char05 / Char07 / Char08 / Char10,理解系统设计。
- 最后把这些代码改成 Jetson Nano 的 tiny 版本。
Page Source