import torch, datetime, time
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split

def get_char_set():
    """Return the vocabulary of characters used by the two cipher-text files.

    Reads ``AES.txt`` and ``DES.txt`` from the current working directory and
    collects every distinct character that occurs in either file.  The files
    are read whole, so line terminators are part of the vocabulary as well;
    they are kept on purpose so that the vocabulary size (and therefore the
    width of the one-hot encoding built from it) stays the same.

    Returns:
        list[str]: The distinct characters in sorted order.  Sorting makes
        the character -> index mapping reproducible between interpreter
        runs; iterating a plain ``set`` of strings does not guarantee that
        because string hashing is randomized per process.
    """
    chars = set()
    for path in ('AES.txt', 'DES.txt'):
        with open(path) as f:
            chars.update(f.read())

    return sorted(chars)

def _one_hot_lines(path, char_index, max_samples):
    """One-hot encode the leading lines of a text file.

    Args:
        path: File to read; each line is one sample.
        char_index: Mapping from character to its column in the one-hot
            vocabulary.
        max_samples: Maximum number of lines to encode (``None`` = all).

    Returns:
        list[np.ndarray]: One flat float vector per line, of length
        ``len(stripped_line) * len(char_index)``; the one-hot vectors of the
        line's characters laid out back to back.
    """
    vocab_size = len(char_index)
    with open(path) as f:
        lines = f.readlines()[:max_samples]

    encoded = []
    for line in lines:
        text = line.strip()
        # Explicit integer dtype keeps the fancy index valid for empty lines.
        columns = np.array([char_index[ch] for ch in text], dtype=np.intp)
        one_hot = np.zeros(shape=(len(text), vocab_size))
        one_hot[np.arange(len(text)), columns] = 1
        # Row-major ravel == concatenating the per-character vectors.
        encoded.append(one_hot.ravel())
    return encoded


def get_data(max_samples=10000):
    """Build the train/test split for the AES-vs-DES classifier.

    Each line of ``AES.txt`` (label 0) and ``DES.txt`` (label 1) is stripped
    of surrounding whitespace and one-hot encoded character by character
    against the vocabulary returned by ``get_char_set()``.  All lines must
    have the same stripped length, otherwise the samples cannot be stacked
    into one array and ``np.stack`` raises ``ValueError``.

    Args:
        max_samples: Maximum number of lines taken from each file
            (default 10000, the value that used to be hard-coded).

    Returns:
        tuple: ``(x_train, x_test, y_train, y_test)`` where the ``x`` arrays
        have shape ``(n_samples, 1, line_length * vocab_size)`` and the ``y``
        arrays hold the integer labels.  The split is 80/20, shuffled with a
        fixed ``random_state`` of 0.
    """
    char_set = get_char_set()
    # Dict lookup instead of list.index: O(1) per character.
    char_index = {ch: pos for pos, ch in enumerate(char_set)}

    aes_rows = _one_hot_lines('AES.txt', char_index, max_samples)
    des_rows = _one_hot_lines('DES.txt', char_index, max_samples)

    x = np.stack(aes_rows + des_rows)
    y = np.array([0] * len(aes_rows) + [1] * len(des_rows))

    # Insert the single input channel expected by Conv1d: (N, 1, L).
    x = np.expand_dims(x, axis=1)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle=True, random_state=0)
    return x_train, x_test, y_train, y_test

class CNN(nn.Module):
    """1-D convolutional classifier over a single-channel sequence.

    The feature extractor is three stages, each made of two
    ``Conv1d(kernel_size=7) -> BatchNorm1d -> ReLU`` groups followed by a
    ``MaxPool1d``; the channel widths are 32, 64 and 128 and the pooling
    factors are 10, 4 and 4.  The flattened features feed one linear layer
    that produces the class logits.

    Args:
        num_classes: Number of output logits.
        fc_in_features: Width of the flattened conv output, i.e.
            ``128 * remaining_sequence_length``.  The default ``32 * 20``
            (= 640 = 128 channels * 5 positions) is the value that used to
            be hard-coded; pass a different value to use the network with
            inputs of another length.
    """

    def __init__(self, num_classes, fc_in_features=32 * 20):
        super().__init__()

        # (in_channels, out_channels, pool_size) for each stage.
        stages = ((1, 32, 10), (32, 64, 4), (64, 128, 4))

        # Layers are appended to one flat list so that the Sequential indices
        # (and therefore the state-dict keys) match the hand-written layout.
        layers = []
        for in_channels, out_channels, pool_size in stages:
            layers += [
                nn.Conv1d(in_channels, out_channels, kernel_size=7, stride=1, padding=0),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv1d(out_channels, out_channels, kernel_size=7, stride=1, padding=0),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(inplace=True),
                nn.MaxPool1d(kernel_size=pool_size, stride=pool_size),
            ]
        layers.append(nn.Flatten())

        self.conv = nn.Sequential(*layers)
        self.fc = nn.Linear(fc_in_features, num_classes)

    def forward(self, x):
        """Return class logits for ``x`` of shape ``(batch, 1, length)``."""
        x = self.conv(x)
        x = self.fc(x)
        return x

def _run_epoch(model, data_iter, loss_fn, device, optimizer=None):
    """Run one full pass over ``data_iter`` and report loss and accuracy.

    Args:
        model: Network to train or evaluate.
        data_iter: Iterable yielding ``(inputs, labels)`` batches.
        loss_fn: Criterion returning the mean loss of a batch.
        device: Device the batches are moved to before the forward pass.
        optimizer: When given, the model is put in training mode and updated
            after every batch; when ``None`` the model is only evaluated
            (eval mode, gradients disabled).

    Returns:
        tuple[float, float]: ``(mean loss per sample, accuracy)``, both
        normalised by the number of samples actually seen so that a partial
        final batch does not skew the result.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for x, y in data_iter:
            x, y = x.to(device), y.to(device)

            pred = model(x)
            batch_loss = loss_fn(pred, y)
            if training:
                optimizer.zero_grad()
                batch_loss.backward()
                optimizer.step()

            batch_size = y.size(0)
            # The criterion returns a batch mean; weight it by the batch size
            # to obtain a true per-sample average at the end.
            loss_sum += batch_loss.item() * batch_size
            correct += (pred.argmax(dim=1) == y).sum().item()
            seen += batch_size

    return loss_sum / seen, correct / seen


if __name__ == '__main__':
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(DEVICE)

    BATCH_SIZE = 32
    EPOCH = 50

    x_train, x_test, y_train, y_test = get_data()
    model = CNN(num_classes=2)

    # Cast once here instead of on every batch inside the training loop.
    train_dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_train).float(), torch.from_numpy(y_train).long())
    train_iter = torch.utils.data.DataLoader(train_dataset, BATCH_SIZE, shuffle=True)

    test_dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_test).float(), torch.from_numpy(y_test).long())
    # Evaluation metrics do not depend on sample order, so no shuffling.
    test_iter = torch.utils.data.DataLoader(test_dataset, BATCH_SIZE, shuffle=False)

    optimizer = torch.optim.Adam(params=model.parameters(), lr=0.0005, weight_decay=0.0005)
    loss = torch.nn.CrossEntropyLoss().to(DEVICE)

    best_acc = 0
    print('{} begin train on {}!'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), DEVICE))
    # Header and rows are each terminated by exactly one newline so the log
    # is a plain CSV without blank lines between records.
    with open('train.log', 'w+') as f:
        f.write('loss,val_loss,acc,val_acc\n')

    for epoch in range(EPOCH):
        # The model may have been moved to the CPU for saving at the end of
        # the previous epoch, so move it back before every epoch.
        model.to(DEVICE)
        begin = time.time()

        train_loss, train_acc = _run_epoch(model, train_iter, loss, DEVICE, optimizer)
        test_loss, test_acc = _run_epoch(model, test_iter, loss, DEVICE)

        if test_acc > best_acc:
            best_acc = test_acc
            # Saved from the CPU so the checkpoint does not hold CUDA tensors.
            # NOTE: this pickles the whole module, so loading it requires the
            # CNN class to be importable under the same name.
            model.to('cpu')
            torch.save(model, 'model.pht')
        with open('train.log', 'a+') as f:
            f.write('{:.5f},{:.5f},{:.4f},{:.4f}\n'.format(train_loss, test_loss, train_acc, test_acc))
        print('{} epoch:{}, time:{:.2f}s, train_loss:{:.5f}, val_loss:{:.5f}, train_acc:{:.4f}, val_acc:{:.4f}'.format(
            datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            epoch + 1, time.time() - begin, train_loss, test_loss, train_acc, test_acc
        ))

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐