Design and Implementation of a PassGAN-Based Password Training System
1. Project Overview and Background
Password security has long been a core concern in network security. As attack techniques evolve, traditional password-strength checking is showing its limits: rule-based strength checkers often fail to recognize complex password patterns, while blacklist-based approaches struggle against novel attacks. Against this backdrop, applying artificial intelligence, and generative adversarial networks (GANs) in particular, to analyze and generate password patterns opens a new direction for password-security research.
PassGAN is a password-generation model based on GANs. By learning the statistical regularities of real password datasets, it can produce samples that resemble real passwords. Unlike traditional approaches, PassGAN needs no hand-written generation rules; it discovers the distributional features of passwords through unsupervised learning. This lets it generate diverse, realistic password samples useful for password-strength evaluation, password-cracking tests, and password-policy improvement.
This project builds a complete PassGAN-based password training system covering data preprocessing, model training, password generation, and evaluation, providing a practical tool for password-security research.
2. System Architecture Design
2.1 Overall Architecture
The system uses a modular design with the following core components:
- Data preprocessing module: cleans, formats, and vectorizes password data
- Model definition module: implements the PassGAN generator and discriminator networks
- Training module: manages the GAN training loop, including loss computation and parameter optimization
- Generation module: produces password samples with the trained model
- Evaluation module: assesses the generated passwords quantitatively and qualitatively
- User interface module: provides a command-line interface and visualizations for user interaction
2.2 Technology Stack
- Deep learning framework: PyTorch (flexible; its dynamic-graph design suits research)
- Data processing: Pandas + NumPy
- Visualization: Matplotlib + Seaborn
- Progress display: tqdm
- Parallelism: multiprocessing/multithreading (optional)
2.3 Data Flow Design
Data flows through the system as follows (a minimal end-to-end sketch follows the pipeline below):
raw password data → data cleaning → character encoding → batched training → model saving → password generation → result evaluation
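The sketch below strings these stages together using the classes defined in Sections 4 through 7. It is illustrative only, and assumes those classes are importable and that a config file shaped like Section 3.2 exists at configs/default.yaml.
# Illustrative end-to-end sketch (assumes the classes from Sections 4-7)
import torch
import yaml

with open('configs/default.yaml') as f:
    config = yaml.safe_load(f)

loader = PasswordDataLoader(config)                      # Section 4.1
passwords = loader.clean_data(loader.load_raw_data(config['data']['input_path']))
encoder = PasswordEncoder(config)                        # Section 4.2
train_loader, _ = create_data_loaders(encoder.batch_encode(passwords), config)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = PassGAN(config, encoder, device)                 # Section 5.3
PassGANTrainer(model, train_loader, config, device).train()  # Section 6.1
print(model.sample(5, temperature=0.8))                  # Section 7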
3. Environment Configuration and Dependency Management
3.1 Python Environment Setup
# Create a conda environment
conda create -n passgan python=3.8
conda activate passgan
# Install core dependencies
pip install torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
pip install pandas numpy matplotlib seaborn tqdm scikit-learn
# Project directory structure
"""
passgan-system/
├── data/
│   ├── raw/           # raw data
│   ├── processed/     # processed data
│   └── generated/     # generated passwords
├── models/            # saved models
├── src/
│   ├── preprocessing/ # data preprocessing
│   ├── models/        # model definitions
│   ├── training/      # training logic
│   ├── generation/    # password generation
│   ├── evaluation/    # evaluation module
│   └── utils/         # utility functions
├── configs/           # configuration files
├── tests/             # test code
└── docs/              # documentation
"""
3.2 Configuration File Design
# configs/default.yaml
data:
  input_path: "data/raw/rockyou.txt"
  output_path: "data/processed/encoded_passwords.npy"
  min_length: 4
  max_length: 16
  train_ratio: 0.8
  vocab_size: 256  # ASCII character set
model:
  latent_dim: 100
  gen_hidden_dim: 512
  disc_hidden_dim: 512
  seq_length: 16
training:
  batch_size: 64
  epochs: 1000
  gen_learning_rate: 0.0002
  disc_learning_rate: 0.0002
  beta1: 0.5
  beta2: 0.999
  disc_iterations: 1  # discriminator updates per generator update
  sample_interval: 100
generation:
  num_samples: 10000
  temperature: 0.8
evaluation:
  test_size: 1000
  metrics: ["unique_ratio", "novelty", "similarity"]
4. Data Preprocessing Module
4.1 Data Loading and Cleaning
# src/preprocessing/data_loader.py
import logging
import numpy as np
from tqdm import tqdm

class PasswordDataLoader:
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def load_raw_data(self, file_path):
        """Load raw password data."""
        self.logger.info(f"Loading data from {file_path}")
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                passwords = [line.strip() for line in f]
            return passwords
        except Exception as e:
            self.logger.error(f"Error loading data: {e}")
            raise

    def clean_data(self, passwords):
        """Clean the data, filtering out invalid passwords."""
        self.logger.info("Cleaning data...")
        cleaned = []
        for pwd in tqdm(passwords):
            # Filter empty, too-short, and too-long passwords
            if not pwd or len(pwd) < self.config['data']['min_length']:
                continue
            if len(pwd) > self.config['data']['max_length']:
                continue
            # Optionally filter out non-ASCII characters
            if self.config['data'].get('ascii_only', True):
                try:
                    pwd.encode('ascii')
                except UnicodeEncodeError:
                    continue
            cleaned.append(pwd)
        self.logger.info(f"Original: {len(passwords)}, Cleaned: {len(cleaned)}")
        return cleaned

    def analyze_dataset(self, passwords):
        """Analyze dataset characteristics."""
        self.logger.info("Analyzing dataset...")
        lengths = [len(p) for p in passwords]
        stats = {
            'total_count': len(passwords),
            'avg_length': np.mean(lengths),
            'min_length': np.min(lengths),
            'max_length': np.max(lengths),
            'unique_ratio': len(set(passwords)) / len(passwords)
        }
        self.logger.info(f"Dataset stats: {stats}")
        return stats
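A minimal usage sketch for the loader, assuming a config dict shaped like Section 3.2 and a small plaintext wordlist at the hypothetical path data/raw/sample.txt:
# Hypothetical usage sketch for PasswordDataLoader
config = {'data': {'min_length': 4, 'max_length': 16, 'ascii_only': True}}
loader = PasswordDataLoader(config)
raw = loader.load_raw_data('data/raw/sample.txt')  # hypothetical path
clean = loader.clean_data(raw)
loader.analyze_dataset(clean)  # logs count, length stats, and unique ratio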
4.2 Character Encoding and Vectorization
# src/preprocessing/encoder.py
import numpy as np

class PasswordEncoder:
    def __init__(self, config):
        self.config = config
        self.char_to_idx = {}
        self.idx_to_char = {}
        self.vocab_size = config['data']['vocab_size']
        self.seq_length = config['model']['seq_length']
        self.build_vocab()

    def build_vocab(self):
        """Build the character vocabulary."""
        # ASCII character set
        for i in range(self.vocab_size):
            self.char_to_idx[chr(i)] = i
            self.idx_to_char[i] = chr(i)
        # Add special tokens after the character range
        self.pad_token = '<PAD>'
        self.start_token = '<START>'
        self.end_token = '<END>'
        self.unk_token = '<UNK>'
        special_tokens = [self.pad_token, self.start_token, self.end_token, self.unk_token]
        for idx, token in enumerate(special_tokens):
            actual_idx = self.vocab_size + idx
            self.char_to_idx[token] = actual_idx
            self.idx_to_char[actual_idx] = token

    def encode(self, password):
        """Encode a password as a sequence of integers."""
        encoded = [self.char_to_idx[self.start_token]]
        for char in password:
            encoded.append(self.char_to_idx.get(char, self.char_to_idx[self.unk_token]))
        encoded.append(self.char_to_idx[self.end_token])
        # Pad or truncate to seq_length
        if len(encoded) < self.seq_length:
            encoded.extend([self.char_to_idx[self.pad_token]] * (self.seq_length - len(encoded)))
        elif len(encoded) > self.seq_length:
            # Truncate, but keep the end token as the final symbol
            encoded = encoded[:self.seq_length - 1] + [self.char_to_idx[self.end_token]]
        return encoded

    def decode(self, encoded_seq):
        """Decode an integer sequence back into a password string."""
        password = []
        for idx in encoded_seq:
            if idx in self.idx_to_char:
                char = self.idx_to_char[idx]
                if char == self.end_token:
                    break
                if char not in (self.pad_token, self.start_token):
                    password.append(char)
            else:
                password.append(self.unk_token)
        return ''.join(password)

    def batch_encode(self, passwords):
        """Encode a batch of passwords."""
        encoded = np.zeros((len(passwords), self.seq_length), dtype=np.int64)
        for i, pwd in enumerate(passwords):
            encoded[i] = self.encode(pwd)
        return encoded
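A quick round-trip check, assuming the config values of Section 3.2. Since every sequence carries a start and an end token, the round trip is exact only for passwords of up to seq_length - 2 characters:
# Round-trip sketch for PasswordEncoder
config = {'data': {'vocab_size': 256}, 'model': {'seq_length': 16}}
enc = PasswordEncoder(config)
seq = enc.encode("test@123")   # [<START>, 't', 'e', ..., <END>, <PAD>, ...]
assert enc.decode(seq) == "test@123"
batch = enc.batch_encode(["abcd", "p4ssw0rd"])  # shape (2, 16), dtype int64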
4.3 Dataset Class Implementation
# src/preprocessing/dataset.py
import torch
from torch.utils.data import Dataset, DataLoader

class PasswordDataset(Dataset):
    def __init__(self, encoded_passwords):
        self.data = encoded_passwords

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.long)

def create_data_loaders(encoded_data, config):
    """Create training and test data loaders."""
    dataset = PasswordDataset(encoded_data)
    # Train/test split
    train_size = int(config['data']['train_ratio'] * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, test_size]
    )
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=True,
        num_workers=4,
        pin_memory=True
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=False,
        num_workers=4,
        pin_memory=True
    )
    return train_loader, test_loader
5. PassGAN Model Implementation
5.1 Generator Network
# src/models/generator.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class Generator(nn.Module):
    def __init__(self, config, encoder):
        super(Generator, self).__init__()
        self.config = config
        self.encoder = encoder
        self.vocab_size = encoder.vocab_size + 4  # including special tokens
        self.seq_length = config['model']['seq_length']
        self.latent_dim = config['model']['latent_dim']
        self.hidden_dim = config['model']['gen_hidden_dim']
        # LSTM layers; the noise vector is fed at every timestep
        self.lstm = nn.LSTM(
            input_size=self.latent_dim,
            hidden_size=self.hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=0.2,
            bidirectional=False
        )
        # Output layer
        self.fc = nn.Linear(self.hidden_dim, self.vocab_size)
        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.LSTM):
            for name, param in module.named_parameters():
                if 'weight_ih' in name:
                    nn.init.xavier_uniform_(param.data)
                elif 'weight_hh' in name:
                    nn.init.orthogonal_(param.data)
                elif 'bias' in name:
                    nn.init.constant_(param.data, 0)

    def forward(self, z, temperature=1.0):
        """
        Forward pass.
        z: noise vector [batch_size, latent_dim]
        temperature: controls sampling diversity
        """
        batch_size = z.size(0)
        # Repeat the noise vector across the sequence dimension
        z_expanded = z.unsqueeze(1).repeat(1, self.seq_length, 1)
        # LSTM forward pass
        lstm_out, _ = self.lstm(z_expanded)
        # Fully connected layer produces the output logits
        logits = self.fc(lstm_out)  # [batch_size, seq_length, vocab_size]
        # Apply the temperature exactly once, here
        logits = logits / temperature
        if self.training:
            # Training: straight-through Gumbel-Softmax keeps gradients
            # flowing through the discrete sample (one-hot float tensor)
            samples = F.gumbel_softmax(logits, tau=1.0, hard=True, dim=-1)
        else:
            # Inference: sample token indices directly
            probs = F.softmax(logits, dim=-1)
            samples = torch.multinomial(probs.view(-1, self.vocab_size), 1)
            samples = samples.view(batch_size, self.seq_length)
        return samples, logits

    def generate(self, num_samples, temperature=1.0, device='cpu'):
        """Generate password samples."""
        self.eval()
        with torch.no_grad():
            # Draw random noise
            z = torch.randn(num_samples, self.latent_dim).to(device)
            # Generate samples (index tensors, since we are in eval mode)
            samples, _ = self.forward(z, temperature)
            # Convert to password strings
            generated_passwords = []
            for i in range(num_samples):
                seq = samples[i].cpu().numpy()
                password = self.encoder.decode(seq)
                generated_passwords.append(password)
        return generated_passwords
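Note that forward returns one-hot float tensors in training mode but integer index tensors in eval mode; the discriminator in the next subsection accepts both. A quick smoke test, under small hypothetical config values and an encoder from Section 4.2:
# Hypothetical smoke test for the Generator
config = {'model': {'latent_dim': 10, 'gen_hidden_dim': 32, 'seq_length': 16}}
gen = Generator(config, encoder)  # encoder: PasswordEncoder from Section 4.2
gen.eval()
z = torch.randn(4, 10)
samples, logits = gen(z)          # samples: (4, 16) Long indices in eval mode
print([encoder.decode(s.numpy()) for s in samples])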
5.2 Discriminator Network
# src/models/discriminator.py
import torch
import torch.nn as nn

class Discriminator(nn.Module):
    def __init__(self, config, encoder):
        super(Discriminator, self).__init__()
        self.config = config
        self.encoder = encoder
        self.vocab_size = encoder.vocab_size + 4
        self.seq_length = config['model']['seq_length']
        self.hidden_dim = config['model']['disc_hidden_dim']
        # Embedding layer
        self.embedding = nn.Embedding(self.vocab_size, 128)
        # Convolutional network
        self.conv_net = nn.Sequential(
            # Input: [batch_size, 128, seq_length]
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.Conv1d(256, 512, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.Conv1d(512, 1024, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.AdaptiveMaxPool1d(1)
        )
        # Fully connected layers
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Conv1d)):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0, std=0.02)

    def forward(self, x):
        """
        Forward pass.
        x: either token indices [batch_size, seq_length] (real data, or
           generator output in eval mode), or one-hot floats
           [batch_size, seq_length, vocab_size] from the straight-through
           Gumbel-Softmax, which must remain differentiable.
        """
        if x.dim() == 3:
            # One-hot input: multiply by the embedding matrix to keep gradients
            embedded = x @ self.embedding.weight      # [batch, seq, embedding_dim]
        else:
            # Index input: ordinary embedding lookup
            embedded = self.embedding(x)              # [batch, seq, embedding_dim]
        embedded = embedded.transpose(1, 2)           # [batch, embedding_dim, seq]
        # Convolutional feature extraction
        features = self.conv_net(embedded)            # [batch, 1024, 1]
        features = features.squeeze(2)                # [batch, 1024]
        # Classification head
        validity = self.fc(features)                  # [batch, 1]
        return validity
5.3 Complete PassGAN Model
# src/models/passgan.py
import torch
import torch.nn as nn
from .generator import Generator
from .discriminator import Discriminator

class PassGAN:
    def __init__(self, config, encoder, device='cpu'):
        self.config = config
        self.encoder = encoder
        self.device = device
        # Initialize the generator and discriminator
        self.generator = Generator(config, encoder).to(device)
        self.discriminator = Discriminator(config, encoder).to(device)
        # Optimizers
        self.optimizer_G = torch.optim.Adam(
            self.generator.parameters(),
            lr=config['training']['gen_learning_rate'],
            betas=(config['training']['beta1'], config['training']['beta2'])
        )
        self.optimizer_D = torch.optim.Adam(
            self.discriminator.parameters(),
            lr=config['training']['disc_learning_rate'],
            betas=(config['training']['beta1'], config['training']['beta2'])
        )
        # Loss function
        self.adversarial_loss = nn.BCELoss()
        # Training history
        self.history = {
            'd_loss': [], 'g_loss': [], 'd_real': [], 'd_fake': []
        }

    def train_step(self, real_passwords):
        """One training step."""
        batch_size = real_passwords.size(0)
        # Real and fake labels
        real_labels = torch.ones(batch_size, 1).to(self.device)
        fake_labels = torch.zeros(batch_size, 1).to(self.device)
        # ---------------------
        # Train the discriminator
        # ---------------------
        self.discriminator.train()
        self.generator.eval()  # eval mode: generator emits token indices
        self.optimizer_D.zero_grad()
        # Loss on real samples
        real_validity = self.discriminator(real_passwords)
        d_real_loss = self.adversarial_loss(real_validity, real_labels)
        # Generate fake samples
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        fake_passwords, _ = self.generator(z)
        # Loss on fake samples
        fake_validity = self.discriminator(fake_passwords.detach())
        d_fake_loss = self.adversarial_loss(fake_validity, fake_labels)
        # Total discriminator loss
        d_loss = d_real_loss + d_fake_loss
        d_loss.backward()
        self.optimizer_D.step()
        # ---------------------
        # Train the generator
        # ---------------------
        self.discriminator.eval()
        self.generator.train()  # train mode: Gumbel-Softmax keeps gradients
        self.optimizer_G.zero_grad()
        # Generate a fresh batch
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        gen_passwords, _ = self.generator(z)
        # Generator loss: make the discriminator believe the samples are real
        gen_validity = self.discriminator(gen_passwords)
        g_loss = self.adversarial_loss(gen_validity, real_labels)
        g_loss.backward()
        self.optimizer_G.step()
        # Return the training metrics
        return {
            'd_loss': d_loss.item(),
            'g_loss': g_loss.item(),
            'd_real': real_validity.mean().item(),
            'd_fake': fake_validity.mean().item()
        }

    def sample(self, num_samples, temperature=1.0):
        """Generate password samples."""
        return self.generator.generate(num_samples, temperature, self.device)

    def save_models(self, path):
        """Save the models."""
        torch.save({
            'generator': self.generator.state_dict(),
            'discriminator': self.discriminator.state_dict(),
            'optimizer_G': self.optimizer_G.state_dict(),
            'optimizer_D': self.optimizer_D.state_dict(),
            'history': self.history
        }, path)

    def load_models(self, path):
        """Load the models."""
        checkpoint = torch.load(path, map_location=self.device)
        self.generator.load_state_dict(checkpoint['generator'])
        self.discriminator.load_state_dict(checkpoint['discriminator'])
        self.optimizer_G.load_state_dict(checkpoint['optimizer_G'])
        self.optimizer_D.load_state_dict(checkpoint['optimizer_D'])
        self.history = checkpoint['history']
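A minimal smoke test of one training step on random data, under the small hypothetical config from the integration tests in Section 9.2:
# Hypothetical single-step smoke test for PassGAN
model = PassGAN(config, encoder, device='cpu')  # config as in Section 9.2
real_batch = torch.randint(0, encoder.vocab_size,
                           (8, config['model']['seq_length']))
metrics = model.train_step(real_batch)
print(metrics)  # {'d_loss': ..., 'g_loss': ..., 'd_real': ..., 'd_fake': ...}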
6. Training Module Implementation
6.1 Training Loop
# src/training/trainer.py
import os
import logging
from tqdm import tqdm
import matplotlib.pyplot as plt

class PassGANTrainer:
    def __init__(self, model, train_loader, config, device='cpu'):
        self.model = model
        self.train_loader = train_loader
        self.config = config
        self.device = device
        self.logger = logging.getLogger(__name__)
        # Create output directories
        os.makedirs('models', exist_ok=True)
        os.makedirs('results', exist_ok=True)

    def train(self):
        """Full training loop."""
        num_epochs = self.config['training']['epochs']
        sample_interval = self.config['training']['sample_interval']
        self.logger.info("Starting training...")
        for epoch in range(num_epochs):
            epoch_d_loss = 0
            epoch_g_loss = 0
            epoch_d_real = 0
            epoch_d_fake = 0
            batch_count = 0
            with tqdm(self.train_loader, desc=f'Epoch {epoch+1}/{num_epochs}') as pbar:
                for batch_idx, real_passwords in enumerate(pbar):
                    real_passwords = real_passwords.to(self.device)
                    # Train on one batch
                    metrics = self.model.train_step(real_passwords)
                    # Accumulate metrics
                    epoch_d_loss += metrics['d_loss']
                    epoch_g_loss += metrics['g_loss']
                    epoch_d_real += metrics['d_real']
                    epoch_d_fake += metrics['d_fake']
                    batch_count += 1
                    # Update the progress bar
                    pbar.set_postfix({
                        'D Loss': f"{metrics['d_loss']:.4f}",
                        'G Loss': f"{metrics['g_loss']:.4f}",
                        'D Real': f"{metrics['d_real']:.4f}",
                        'D Fake': f"{metrics['d_fake']:.4f}"
                    })
            # Per-epoch averages
            avg_d_loss = epoch_d_loss / batch_count
            avg_g_loss = epoch_g_loss / batch_count
            avg_d_real = epoch_d_real / batch_count
            avg_d_fake = epoch_d_fake / batch_count
            # Record history
            self.model.history['d_loss'].append(avg_d_loss)
            self.model.history['g_loss'].append(avg_g_loss)
            self.model.history['d_real'].append(avg_d_real)
            self.model.history['d_fake'].append(avg_d_fake)
            self.logger.info(
                f"Epoch {epoch+1}/{num_epochs} | "
                f"D Loss: {avg_d_loss:.4f} | G Loss: {avg_g_loss:.4f} | "
                f"D Real: {avg_d_real:.4f} | D Fake: {avg_d_fake:.4f}"
            )
            # Periodic sampling, checkpointing, and plotting
            if (epoch + 1) % sample_interval == 0:
                self._sample_and_save(epoch + 1)
                self._save_checkpoint(epoch + 1)
                self._plot_training_history()

    def _sample_and_save(self, epoch):
        """Generate samples and save them."""
        samples = self.model.sample(100, temperature=0.8)
        sample_file = f"results/samples_epoch_{epoch}.txt"
        with open(sample_file, 'w') as f:
            for pwd in samples:
                f.write(f"{pwd}\n")
        self.logger.info(f"Saved samples to {sample_file}")

    def _save_checkpoint(self, epoch):
        """Save a checkpoint."""
        checkpoint_path = f"models/checkpoint_epoch_{epoch}.pth"
        self.model.save_models(checkpoint_path)
        self.logger.info(f"Saved checkpoint to {checkpoint_path}")

    def _plot_training_history(self):
        """Plot the training history."""
        plt.figure(figsize=(12, 8))
        # Loss curves
        plt.subplot(2, 2, 1)
        plt.plot(self.model.history['d_loss'], label='Discriminator Loss')
        plt.plot(self.model.history['g_loss'], label='Generator Loss')
        plt.title('Training Losses')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        # Discriminator outputs
        plt.subplot(2, 2, 2)
        plt.plot(self.model.history['d_real'], label='D(x)')
        plt.plot(self.model.history['d_fake'], label='D(G(z))')
        plt.title('Discriminator Outputs')
        plt.xlabel('Epoch')
        plt.ylabel('Probability')
        plt.legend()
        # Loss ratio
        plt.subplot(2, 2, 3)
        ratio = [d / g if g != 0 else 0 for d, g in
                 zip(self.model.history['d_loss'], self.model.history['g_loss'])]
        plt.plot(ratio)
        plt.title('D Loss / G Loss Ratio')
        plt.xlabel('Epoch')
        plt.ylabel('Ratio')
        plt.tight_layout()
        plt.savefig('results/training_history.png')
        plt.close()
6.2 Learning-Rate Scheduling and Early Stopping
# src/training/scheduler.py
import numpy as np

class LearningRateScheduler:
    def __init__(self, optimizer, mode='step', **kwargs):
        self.optimizer = optimizer
        self.mode = mode
        self.config = kwargs
        self.min_lr = kwargs.get('min_lr', 1e-6)
        if mode == 'step':
            self.step_size = kwargs.get('step_size', 30)
            self.gamma = kwargs.get('gamma', 0.1)
            self.epoch = 0
        elif mode == 'plateau':
            self.patience = kwargs.get('patience', 10)
            self.factor = kwargs.get('factor', 0.5)
            self.best_loss = np.inf
            self.counter = 0
        self.initial_lr = self.optimizer.param_groups[0]['lr']

    def step(self, current_loss=None):
        """Update the learning rate."""
        if self.mode == 'step':
            # Step decay: multiply by gamma every step_size epochs
            self.epoch += 1
            if self.epoch % self.step_size == 0:
                for group in self.optimizer.param_groups:
                    group['lr'] = max(group['lr'] * self.gamma, self.min_lr)
        elif self.mode == 'plateau' and current_loss is not None:
            # Decay when the monitored loss stops improving
            if current_loss < self.best_loss:
                self.best_loss = current_loss
                self.counter = 0
            else:
                self.counter += 1
                if self.counter >= self.patience:
                    for group in self.optimizer.param_groups:
                        group['lr'] = max(group['lr'] * self.factor, self.min_lr)
                    self.counter = 0

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False

    def __call__(self, current_loss):
        if current_loss < self.best_loss - self.min_delta:
            self.best_loss = current_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        return self.early_stop
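A sketch of how these helpers might be wired into the epoch loop of Section 6.1 (the trainer above does not call them itself); run_one_epoch is a hypothetical helper standing in for the per-epoch loop body:
# Hypothetical wiring of scheduler and early stopping into an epoch loop
scheduler = LearningRateScheduler(model.optimizer_G, mode='plateau', patience=5)
stopper = EarlyStopping(patience=20, min_delta=1e-4)
for epoch in range(1000):
    # hypothetical helper returning the epoch's mean generator loss
    avg_g_loss = run_one_epoch(model, train_loader)
    scheduler.step(avg_g_loss)  # halve the LR after 5 epochs without improvement
    if stopper(avg_g_loss):     # stop after 20 epochs without improvement
        break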
7. Password Generation and Evaluation Module
7.1 Batch Generator
# src/generation/batch_generator.py
from tqdm import tqdm

class BatchPasswordGenerator:
    def __init__(self, model, encoder, device='cpu'):
        self.model = model
        self.encoder = encoder
        self.device = device

    def generate_batch(self, num_samples, batch_size=1000, temperature=1.0):
        """Generate passwords in batches."""
        all_passwords = []
        num_batches = (num_samples + batch_size - 1) // batch_size
        for i in tqdm(range(num_batches)):
            current_batch_size = min(batch_size, num_samples - i * batch_size)
            # Generate one batch (the model already knows its own device)
            batch_passwords = self.model.sample(current_batch_size, temperature)
            all_passwords.extend(batch_passwords)
        return all_passwords

    def generate_with_constraints(self, num_samples, constraints, temperature=1.0):
        """Generate passwords satisfying the given constraints (rejection sampling)."""
        valid_passwords = []
        attempts = 0
        max_attempts = num_samples * 10  # cap on sampling rounds
        with tqdm(total=num_samples) as pbar:
            while len(valid_passwords) < num_samples and attempts < max_attempts:
                # Generate a candidate batch
                batch = self.model.sample(100, temperature)
                for pwd in batch:
                    if self._check_constraints(pwd, constraints):
                        valid_passwords.append(pwd)
                        pbar.update(1)
                        if len(valid_passwords) >= num_samples:
                            break
                attempts += 1
        return valid_passwords

    def _check_constraints(self, password, constraints):
        """Check whether a password satisfies the constraints."""
        # Length constraints
        min_len = constraints.get('min_length', 0)
        max_len = constraints.get('max_length', float('inf'))
        if not (min_len <= len(password) <= max_len):
            return False
        # Character-class constraints
        if constraints.get('require_digit') and not any(c.isdigit() for c in password):
            return False
        if constraints.get('require_upper') and not any(c.isupper() for c in password):
            return False
        if constraints.get('require_lower') and not any(c.islower() for c in password):
            return False
        if constraints.get('require_special'):
            special_chars = "!@#$%^&*()_-+=[]{}|;:,.<>?/"
            if not any(c in special_chars for c in password):
                return False
        return True
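For example, to ask for 1,000 passwords resembling a typical corporate policy (the policy values here are illustrative):
# Example usage of generate_with_constraints
generator = BatchPasswordGenerator(model, encoder)
policy = {
    'min_length': 8,
    'max_length': 16,
    'require_digit': True,
    'require_upper': True,
    'require_special': True,
}
candidates = generator.generate_with_constraints(1000, policy, temperature=0.8)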
7.2 Evaluation Metrics
# src/evaluation/metrics.py
import math
from collections import Counter
import numpy as np
from scipy.spatial.distance import cosine

class PasswordMetrics:
    @staticmethod
    def uniqueness(generated_passwords):
        """Fraction of generated passwords that are unique."""
        unique_count = len(set(generated_passwords))
        total_count = len(generated_passwords)
        return unique_count / total_count, unique_count

    @staticmethod
    def novelty(generated_passwords, training_passwords):
        """Fraction of generated passwords not present in the training set."""
        training_set = set(training_passwords)
        novel_count = sum(1 for pwd in generated_passwords if pwd not in training_set)
        return novel_count / len(generated_passwords), novel_count

    @staticmethod
    def entropy(password):
        """Shannon entropy of a single password's character distribution."""
        if not password:
            return 0
        freq = Counter(password)
        probs = [count / len(password) for count in freq.values()]
        return -sum(p * math.log2(p) for p in probs)

    @staticmethod
    def average_entropy(passwords):
        """Mean Shannon entropy over a set of passwords."""
        entropies = [PasswordMetrics.entropy(pwd) for pwd in passwords]
        return np.mean(entropies) if entropies else 0

    @staticmethod
    def length_distribution(passwords):
        """Summary statistics of password lengths."""
        lengths = [len(pwd) for pwd in passwords]
        return {
            'mean': np.mean(lengths),
            'std': np.std(lengths),
            'min': np.min(lengths),
            'max': np.max(lengths),
            'histogram': np.bincount(lengths)
        }

    @staticmethod
    def character_distribution(passwords):
        """Relative frequency of each character."""
        all_chars = ''.join(passwords)
        char_count = Counter(all_chars)
        total_chars = len(all_chars)
        return {
            char: count / total_chars for char, count in char_count.items()
        }

    @staticmethod
    def similarity_to_training(generated_passwords, training_passwords, sample_size=1000):
        """Cosine similarity of character-frequency vectors vs. the training set."""
        if len(generated_passwords) > sample_size:
            gen_sample = np.random.choice(generated_passwords, sample_size, replace=False)
        else:
            gen_sample = generated_passwords
        if len(training_passwords) > sample_size:
            train_sample = np.random.choice(training_passwords, sample_size, replace=False)
        else:
            train_sample = training_passwords

        # Build a normalized character-frequency vector
        def create_char_vector(passwords, charset):
            vector = np.zeros(len(charset))
            all_chars = ''.join(passwords)
            counter = Counter(all_chars)
            for char, count in counter.items():
                if char in charset:
                    idx = charset.index(char)
                    vector[idx] = count
            return vector / len(all_chars) if len(all_chars) > 0 else vector

        # Union of characters across both samples
        all_chars = sorted(set(''.join(gen_sample) + ''.join(train_sample)))
        gen_vector = create_char_vector(gen_sample, all_chars)
        train_vector = create_char_vector(train_sample, all_chars)
        # Cosine similarity
        similarity = 1 - cosine(gen_vector, train_vector)
        return similarity
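A quick illustration of the headline metrics on toy data:
# Toy illustration of the metrics
gen = ["abc123", "abc123", "letmein", "qwerty1"]
train = ["letmein", "password", "qwerty1"]
print(PasswordMetrics.uniqueness(gen))              # (0.75, 3)
print(PasswordMetrics.novelty(gen, train))          # (0.5, 2)
print(round(PasswordMetrics.entropy("abc123"), 3))  # 2.585 = log2(6), six distinct chars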
7.3 Comprehensive Evaluator
# src/evaluation/evaluator.py
import json
from collections import Counter
import matplotlib.pyplot as plt
from .metrics import PasswordMetrics

class PasswordEvaluator:
    def __init__(self, training_passwords):
        self.training_passwords = training_passwords

    def comprehensive_evaluation(self, generated_passwords, output_file=None):
        """Run the full evaluation over the generated passwords."""
        results = {}
        # Uniqueness
        uniqueness, unique_count = PasswordMetrics.uniqueness(generated_passwords)
        results['uniqueness'] = uniqueness
        results['unique_count'] = unique_count
        # Novelty
        novelty, novel_count = PasswordMetrics.novelty(generated_passwords, self.training_passwords)
        results['novelty'] = novelty
        results['novel_count'] = novel_count
        # Average entropy
        results['average_entropy'] = PasswordMetrics.average_entropy(generated_passwords)
        # Length distribution
        results['length_distribution'] = PasswordMetrics.length_distribution(generated_passwords)
        # Character distribution
        results['character_distribution'] = PasswordMetrics.character_distribution(generated_passwords)
        # Similarity to the training set
        results['similarity_to_training'] = PasswordMetrics.similarity_to_training(
            generated_passwords, self.training_passwords)
        # Save results and generate plots
        if output_file:
            with open(output_file, 'w') as f:
                # default=str serializes NumPy scalars and arrays
                json.dump(results, f, indent=2, default=str)
            self._generate_visualizations(generated_passwords, output_file)
        return results

    def _generate_visualizations(self, passwords, output_file):
        """Generate visualization charts."""
        base_name = output_file.replace('.json', '')
        # Length histogram
        lengths = [len(pwd) for pwd in passwords]
        plt.figure(figsize=(10, 6))
        plt.hist(lengths, bins=range(min(lengths), max(lengths) + 2), alpha=0.7)
        plt.title('Password Length Distribution')
        plt.xlabel('Length')
        plt.ylabel('Frequency')
        plt.savefig(f'{base_name}_length_dist.png')
        plt.close()
        # Character-class distribution
        char_types = {'digit': 0, 'lower': 0, 'upper': 0, 'special': 0}
        special_chars = "!@#$%^&*()_-+=[]{}|;:,.<>?/"
        for pwd in passwords:
            for char in pwd:
                if char.isdigit():
                    char_types['digit'] += 1
                elif char.islower():
                    char_types['lower'] += 1
                elif char.isupper():
                    char_types['upper'] += 1
                elif char in special_chars:
                    char_types['special'] += 1
        total_chars = sum(char_types.values())
        if total_chars > 0:
            char_types = {k: v / total_chars for k, v in char_types.items()}
        plt.figure(figsize=(8, 6))
        plt.bar(char_types.keys(), char_types.values())
        plt.title('Character Type Distribution')
        plt.ylabel('Proportion')
        plt.savefig(f'{base_name}_char_type_dist.png')
        plt.close()
        # Top-20 character frequencies
        all_chars = ''.join(passwords)
        char_freq = Counter(all_chars)
        top_chars = dict(sorted(char_freq.items(), key=lambda x: x[1], reverse=True)[:20])
        plt.figure(figsize=(12, 6))
        plt.bar(top_chars.keys(), top_chars.values())
        plt.title('Top 20 Character Frequency')
        plt.xlabel('Character')
        plt.ylabel('Frequency')
        plt.savefig(f'{base_name}_top_chars.png')
        plt.close()
8. System Integration and User Interface
8.1 Command-Line Interface
# src/cli/main.py
import argparse
import logging
import torch
import yaml
from preprocessing.data_loader import PasswordDataLoader
from preprocessing.encoder import PasswordEncoder
from preprocessing.dataset import create_data_loaders
from models.passgan import PassGAN
from training.trainer import PassGANTrainer
from generation.batch_generator import BatchPasswordGenerator
from evaluation.evaluator import PasswordEvaluator

def setup_logging():
    """Configure logging."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('passgan_system.log'),
            logging.StreamHandler()
        ]
    )

def load_config(config_path):
    """Load the configuration file."""
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='PassGAN Password Generation System')
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')
    # Train command
    train_parser = subparsers.add_parser('train', help='Train the PassGAN model')
    train_parser.add_argument('--config', type=str, required=True, help='Path to config file')
    train_parser.add_argument('--resume', type=str, help='Path to checkpoint to resume from')
    # Generate command
    generate_parser = subparsers.add_parser('generate', help='Generate passwords')
    generate_parser.add_argument('--config', type=str, required=True, help='Path to config file')
    generate_parser.add_argument('--model', type=str, required=True, help='Path to trained model')
    generate_parser.add_argument('--num-samples', type=int, default=10000, help='Number of passwords to generate')
    generate_parser.add_argument('--output', type=str, required=True, help='Output file path')
    # Evaluate command
    eval_parser = subparsers.add_parser('evaluate', help='Evaluate generated passwords')
    eval_parser.add_argument('--generated', type=str, required=True, help='Path to generated passwords')
    eval_parser.add_argument('--training', type=str, required=True, help='Path to training passwords')
    eval_parser.add_argument('--output', type=str, required=True, help='Output evaluation file')
    args = parser.parse_args()
    setup_logging()
    logger = logging.getLogger(__name__)

    if args.command == 'train':
        # Training mode
        config = load_config(args.config)
        logger.info("Starting training process...")
        # Load and preprocess the data
        data_loader = PasswordDataLoader(config)
        raw_passwords = data_loader.load_raw_data(config['data']['input_path'])
        cleaned_passwords = data_loader.clean_data(raw_passwords)
        # Encoder
        encoder = PasswordEncoder(config)
        encoded_data = encoder.batch_encode(cleaned_passwords)
        # Data loaders
        train_loader, _ = create_data_loaders(encoded_data, config)
        # Model
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = PassGAN(config, encoder, device)
        # Resume if requested
        if args.resume:
            model.load_models(args.resume)
            logger.info(f"Resumed training from {args.resume}")
        # Train
        trainer = PassGANTrainer(model, train_loader, config, device)
        trainer.train()
        logger.info("Training completed!")
    elif args.command == 'generate':
        # Generation mode
        config = load_config(args.config)
        logger.info("Starting password generation...")
        # Encoder
        encoder = PasswordEncoder(config)
        # Model
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = PassGAN(config, encoder, device)
        model.load_models(args.model)
        # Generate passwords
        generator = BatchPasswordGenerator(model, encoder, device)
        passwords = generator.generate_batch(args.num_samples)
        # Save the generated passwords
        with open(args.output, 'w') as f:
            for pwd in passwords:
                f.write(f"{pwd}\n")
        logger.info(f"Generated {len(passwords)} passwords to {args.output}")
    elif args.command == 'evaluate':
        # Evaluation mode
        logger.info("Starting evaluation...")
        # Load the generated passwords
        with open(args.generated, 'r') as f:
            generated_passwords = [line.strip() for line in f]
        # Load the training passwords
        with open(args.training, 'r') as f:
            training_passwords = [line.strip() for line in f]
        # Evaluate
        evaluator = PasswordEvaluator(training_passwords)
        results = evaluator.comprehensive_evaluation(generated_passwords, args.output)
        logger.info(f"Evaluation completed. Results saved to {args.output}")
        logger.info(f"Uniqueness: {results['uniqueness']:.4f}")
        logger.info(f"Novelty: {results['novelty']:.4f}")
        logger.info(f"Average Entropy: {results['average_entropy']:.4f}")
    else:
        parser.print_help()

if __name__ == "__main__":
    main()
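Typical invocations might look like the following (all paths are illustrative):
# Example invocations (illustrative paths)
python src/cli/main.py train --config configs/train_config.yaml
python src/cli/main.py generate --config configs/train_config.yaml \
    --model models/checkpoint_epoch_1000.pth --num-samples 10000 \
    --output data/generated/passwords.txt
python src/cli/main.py evaluate --generated data/generated/passwords.txt \
    --training data/raw/rockyou.txt --output results/eval.json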
8.2 Example Configuration File
# configs/train_config.yaml
data:
  input_path: "data/raw/rockyou.txt"
  output_path: "data/processed/encoded_passwords.npy"
  min_length: 4
  max_length: 16
  train_ratio: 0.8
  vocab_size: 256
  ascii_only: true
model:
  latent_dim: 100
  gen_hidden_dim: 512
  disc_hidden_dim: 512
  seq_length: 16
training:
  batch_size: 64
  epochs: 1000
  gen_learning_rate: 0.0002
  disc_learning_rate: 0.0002
  beta1: 0.5
  beta2: 0.999
  disc_iterations: 1
  sample_interval: 100
generation:
  num_samples: 10000
  temperature: 0.8
evaluation:
  test_size: 1000
  metrics: ["unique_ratio", "novelty", "similarity"]
9. System Testing and Validation
9.1 Unit Tests
# tests/test_preprocessing.py
import unittest
from src.preprocessing.data_loader import PasswordDataLoader
from src.preprocessing.encoder import PasswordEncoder

class TestPreprocessing(unittest.TestCase):
    def setUp(self):
        self.config = {
            'data': {
                'min_length': 4,
                'max_length': 16,
                'vocab_size': 256
            },
            'model': {
                'seq_length': 16
            }
        }
        self.sample_passwords = [
            "password123",
            "hello world",
            "test@123",
            "abc",  # below min_length
            "verylongpasswordthatexceedslimit"
        ]

    def test_data_cleaning(self):
        loader = PasswordDataLoader(self.config)
        cleaned = loader.clean_data(self.sample_passwords)
        # Too-short and too-long passwords should be filtered out
        self.assertEqual(len(cleaned), 3)
        self.assertNotIn("abc", cleaned)
        self.assertNotIn("verylongpasswordthatexceedslimit", cleaned)

    def test_encoder_decoder(self):
        encoder = PasswordEncoder(self.config)
        # Round-trip encode/decode
        test_password = "test@123"
        encoded = encoder.encode(test_password)
        decoded = encoder.decode(encoded)
        # The password should be recovered exactly
        self.assertEqual(decoded, test_password)

    def test_batch_encoding(self):
        encoder = PasswordEncoder(self.config)
        encoded_batch = encoder.batch_encode(self.sample_passwords[:3])
        # Batch encoding should return an array of the right shape
        self.assertEqual(encoded_batch.shape, (3, self.config['model']['seq_length']))

if __name__ == '__main__':
    unittest.main()
9.2 Integration Tests
# tests/test_integration.py
import os
import tempfile
import unittest
import torch
from src.models.passgan import PassGAN
from src.preprocessing.encoder import PasswordEncoder

class TestIntegration(unittest.TestCase):
    def setUp(self):
        self.config = {
            'data': {
                'vocab_size': 256
            },
            'model': {
                'latent_dim': 10,
                'gen_hidden_dim': 32,
                'disc_hidden_dim': 32,
                'seq_length': 16
            },
            'training': {
                'gen_learning_rate': 0.0002,
                'disc_learning_rate': 0.0002,
                'beta1': 0.5,
                'beta2': 0.999
            }
        }
        self.encoder = PasswordEncoder(self.config)
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def test_model_creation(self):
        """Test model creation and basic behavior."""
        model = PassGAN(self.config, self.encoder, self.device)
        # Generator forward pass (eval mode yields index tensors)
        batch_size = 5
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        model.generator.eval()
        samples, logits = model.generator(z)
        self.assertEqual(samples.shape, (batch_size, self.config['model']['seq_length']))
        self.assertEqual(logits.shape, (batch_size, self.config['model']['seq_length'],
                                        self.encoder.vocab_size + 4))
        # Discriminator forward pass
        validity = model.discriminator(samples)
        self.assertEqual(validity.shape, (batch_size, 1))

    def test_password_generation(self):
        """Test password generation."""
        model = PassGAN(self.config, self.encoder, self.device)
        # Generate a handful of samples
        passwords = model.sample(10, temperature=0.8)
        self.assertEqual(len(passwords), 10)
        for pwd in passwords:
            self.assertIsInstance(pwd, str)
            # Decoding strips special tokens, so at most seq_length characters remain
            self.assertLessEqual(len(pwd), self.config['model']['seq_length'])

    def test_model_save_load(self):
        """Test model saving and loading."""
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            model_path = tmp.name
        try:
            model = PassGAN(self.config, self.encoder, self.device)
            # Save the model
            model.save_models(model_path)
            # Load into a fresh model
            new_model = PassGAN(self.config, self.encoder, self.device)
            new_model.load_models(model_path)
            # The loaded model should still generate
            passwords = new_model.sample(5)
            self.assertEqual(len(passwords), 5)
        finally:
            if os.path.exists(model_path):
                os.unlink(model_path)

if __name__ == '__main__':
    unittest.main()
10. Performance Optimization and Deployment
10.1 Performance Optimization Strategies
# src/utils/optimization.py
import time
from contextlib import contextmanager
import torch

@contextmanager
def torch_timing(description):
    """Context manager for timing PyTorch operations."""
    if torch.cuda.is_available():
        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)
        start.record()
    else:
        start = time.time()
    yield
    if torch.cuda.is_available():
        end.record()
        torch.cuda.synchronize()
        elapsed = start.elapsed_time(end) / 1000  # milliseconds to seconds
    else:
        elapsed = time.time() - start
    print(f"{description}: {elapsed:.4f} seconds")

def optimize_model_performance(model, config):
    """Apply optional performance optimizations to the model."""
    # Mixed-precision training
    if config.get('use_amp', False) and torch.cuda.is_available():
        from torch.cuda.amp import GradScaler
        model.scaler = GradScaler()
        model.use_amp = True
    else:
        model.use_amp = False
    # Data parallelism (multi-GPU)
    if torch.cuda.device_count() > 1 and config.get('data_parallel', False):
        model.generator = torch.nn.DataParallel(model.generator)
        model.discriminator = torch.nn.DataParallel(model.discriminator)
    return model

def memory_optimization_hooks():
    """Memory optimization hook factory."""
    # Clear cached intermediate values after the forward pass
    def clear_memory_hook(module, input, output):
        if hasattr(module, 'intermediate_values'):
            del module.intermediate_values
    return clear_memory_hook
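Note that optimize_model_performance only attaches the scaler; the training step itself must opt in. Below is a sketch of an AMP-enabled discriminator update under that assumption. One caveat: BCELoss is not autocast-safe, so under AMP the discriminator would need to return raw logits (dropping the final Sigmoid) and use BCEWithLogitsLoss, as shown here.
# Hypothetical AMP-enabled discriminator update (sketch)
from torch.cuda.amp import autocast
import torch.nn as nn

bce_logits = nn.BCEWithLogitsLoss()  # autocast-safe, expects raw logits
model.optimizer_D.zero_grad()
with autocast(enabled=model.use_amp):
    real_logits = model.discriminator(real_passwords)  # assumed logits here
    d_loss = bce_logits(real_logits, real_labels)
if model.use_amp:
    model.scaler.scale(d_loss).backward()  # scale to avoid fp16 gradient underflow
    model.scaler.step(model.optimizer_D)
    model.scaler.update()
else:
    d_loss.backward()
    model.optimizer_D.step()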
10.2 Model Quantization and Pruning
# src/utils/quantization.py
import torch
import torch.nn.utils.prune as prune

def quantize_model(model, quantization_bits=8):
    """Quantize the model."""
    if quantization_bits == 8:
        # Dynamic quantization via PyTorch
        model = torch.quantization.quantize_dynamic(
            model, {torch.nn.Linear, torch.nn.LSTM}, dtype=torch.qint8
        )
    elif quantization_bits == 16 and torch.cuda.is_available():
        # Half-precision floats
        model = model.half()
    return model

def prune_model(model, pruning_amount=0.2):
    """Prune the model."""
    parameters_to_prune = []
    # Collect all prunable parameters
    for name, module in model.named_modules():
        if isinstance(module, (torch.nn.Linear, torch.nn.Conv1d)):
            parameters_to_prune.append((module, 'weight'))
    # Global magnitude pruning
    prune.global_unstructured(
        parameters_to_prune,
        pruning_method=prune.L1Unstructured,
        amount=pruning_amount,
    )
    # Make the pruning permanent (remove the masks)
    for module, param_name in parameters_to_prune:
        prune.remove(module, param_name)
    return model

def optimize_for_inference(model, example_input):
    """Optimize a model for inference."""
    # Prune the model
    model = prune_model(model, pruning_amount=0.1)
    # Quantize the model
    model = quantize_model(model, quantization_bits=8)
    # Compile with TorchScript (tracing bakes in the control-flow path
    # taken for example_input, so trace in eval mode)
    model_scripted = torch.jit.trace(model, example_input)
    return model_scripted
11. Security and Ethical Considerations
11.1 Secure Usage Guidelines
# docs/security_guidelines.md
# PassGAN System Secure Usage Guidelines
## 1. Legal Use
- This system is for security research and education only
- Obtain all necessary authorizations and permissions before use
- Any illegal or malicious use is prohibited
## 2. Data Protection
- Apply appropriate safeguards when handling real password data
- Delete or securely store sensitive data once training is complete
- Encrypt data at rest and in transit
## 3. Model Security
- Update models regularly so they do not become stale
- Verify model integrity with digital signatures
- Restrict access to trained models
## 4. Output Management
- Keep generated password samples under proper control
- Avoid exposing generated passwords in public environments
- Regularly clean up generated temporary files
## 5. Compliance
- Comply with local laws and regulations
- Follow industry best practices and standards
- Conduct regular security audits
# Example security configuration
security_config = {
    'data_encryption': True,
    'model_signature_verification': True,
    'access_control': {
        'require_authentication': True,
        'role_based_access': True
    },
    'audit_logging': True,
    'automatic_data_purging': True
}
11.2 Ethical Considerations
# docs/ethics_considerations.md
# PassGAN System Ethical Considerations
## 1. Privacy Protection
- Anonymize all training data
- Follow the data-minimization principle
- Provide a data-deletion mechanism
## 2. Bias and Fairness
- Detect and mitigate bias in the model
- Ensure generated passwords contain no sensitive information
- Conduct regular fairness assessments
## 3. Transparency
- State the system's capabilities and limitations clearly
- Provide explainable evaluation results
- Disclose the algorithms and methods used
## 4. Responsibility and Accountability
- Identify the parties responsible for system use
- Establish an issue-reporting and response mechanism
- Keep the development process transparent
## 5. Social Impact
- Assess the system's potential impact on society
- Participate actively in industry ethics discussions
- Follow the principles of responsible innovation
12. Summary and Future Work
This project implements a complete PassGAN-based password training system covering the full pipeline from data preprocessing to model training, password generation, and evaluation. Its modular design keeps the system extensible and maintainable.
12.1 Technical Summary
- Data preprocessing: efficient cleaning and encoding that scales to large password datasets
- Model architecture: an LSTM-based generator and a CNN-based discriminator that learn the distributional features of passwords
- Training optimization: a stable GAN training loop with several optimization strategies and monitoring facilities
- Evaluation framework: a comprehensive set of quality metrics, including uniqueness, novelty, and entropy
- System integration: a complete command-line interface and configuration system for practical deployment
12.2 Future Improvements
- Model architecture:
  - Explore newer architectures such as Transformers
  - Add attention mechanisms to improve long-sequence generation
  - Try conditional generation models for constraint-aware password generation
- Training efficiency:
  - Add distributed-training support
  - Optimize memory usage to handle larger datasets
  - Explore more stable GAN training techniques
- Feature extensions:
  - Add real-time password-strength evaluation
  - Support multimodal password generation (e.g., graphical passwords)
  - Develop a web interface and API service
- Security hardening:
  - Implement differentially private training
  - Add model watermarking and provenance tracking
  - Strengthen access control and auditing
12.3 Application Outlook
The system has broad application prospects in the following areas:
- Security assessment: helping organizations evaluate the effectiveness of their password policies
- Cryptography research: providing tooling for password-strength analysis and cracking-resistance studies
- User education: generating example passwords that illustrate what makes a password strong
- System development: integrating real-time password-strength checks into authentication systems
With continued technical improvement and responsible use, a PassGAN-based password training system can become a valuable tool in network security and contribute to stronger digital identity protection.