PyTorch Multiclass Classification

2021. 10. 11. 19:27

PyTorch를 이용한 Multiclass Classification

Custom Data

feature 5개, label 종류 6개로 이루어진 데이터.

각 label마다 정해 둔 base 값을 토대로 무작위로 생성 (총 1200개)

Custom CSV Data

import pandas as pd

dataFrame = pd.read_csv("custom_random_data.csv", delimiter=",");
# Check the standard deviation of each feature per label type.
# NOTE(review): the statement that computes this is not shown in this snippet.

표준편차가 큰 F3 feature를 제외한 나머지 feature(F1, F2, F4, F5)로 학습을 진행

Custom Dataset, Custom Model

class CustomDataset(Dataset):
    """Map-style dataset pairing each feature row with its label.

    Both inputs are copied into plain Python lists on construction, so the
    dataset does not alias the containers passed in.
    """

    def __init__(self, data, label):
        # Iterating yields one entry per sample (rows for a 2-D array).
        self.x = list(data)
        self.y = list(label)

    def __len__(self):
        """Return the number of samples (length of the feature list)."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        The features are returned as a fresh NumPy array; the label is
        returned exactly as it was stored.
        """
        features = np.array(self.x[idx])
        target = self.y[idx]
        return features, target

class CustomModel(nn.Module):
    """Three-layer fully connected classifier.

    Layer widths are ``feature_length -> 128 -> 256 -> label_size`` with a
    ReLU after each of the first two layers. The last layer returns raw
    scores; no softmax is applied inside the model.
    """

    def __init__(self, feature_length, label_size):
        super().__init__()
        self.relu = nn.ReLU()

        # Attribute names are part of the saved state_dict keys; keep them.
        self.fc1 = nn.Linear(feature_length, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, label_size)

    def forward(self, x):
        """Run ``x`` through the two hidden layers and the output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        return self.fc3(hidden)



def train(model : nn.Module, criterion : nn.CrossEntropyLoss, train_loader : DataLoader,
        valid_loader : DataLoader, device, val_every : int, threshold : float):
    """Train ``model`` and checkpoint the best weights.

    The number of epochs, the optimizer and the learning-rate scheduler are
    not parameters: they are read from the module-level names ``epoch``,
    ``optimizer`` and ``lr_scheduler``, which must exist before this
    function is called.

    Two checkpoints are written below ``weights/``:

    * ``min_loss.pt`` - whenever the mean training loss of an epoch is a new
      minimum;
    * ``best.pt`` - whenever a validation run improves the best accuracy
      (see the tie-break note in the loop).

    Args:
        model: Network to optimise; its parameters are updated in place.
        criterion: Loss applied to ``model(x)`` and the integer labels.
        train_loader: Yields ``(x, y)`` training batches.
        valid_loader: Yields ``(x, y)`` validation batches.
        device: Device every batch is moved to.
        val_every: Validation runs after every ``val_every``-th epoch.
        threshold: Confidence threshold forwarded to ``validate``.
    """
    import os

    save_model_location = "weights/"
    save_model_name = "best.pt"
    save_model_min_loss = "min_loss.pt"

    # torch.save does not create missing directories.
    os.makedirs(save_model_location, exist_ok=True)

    min_loss = float("inf")
    min_loss_epoch = 0
    best_accuracy = 0.0

    for i in range(epoch):
        model.train()
        total_loss = 0.0

        for x, y in train_loader:
            x = x.float().to(device)
            y = y.long().to(device)

            # Gradients accumulate across backward() calls, so clear them first.
            optimizer.zero_grad()

            outputs = model(x) # forward
            loss = criterion(outputs, y)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # One scheduler step per epoch, after the optimizer updates.
        lr_scheduler.step()

        calc_loss = total_loss / len(train_loader)
        print('Epoch [{}/{}], Train Loss: {:.4f}, Best Valid Accuracy: {:.4f}'
            .format(i+1, epoch, calc_loss, best_accuracy))

        if calc_loss < min_loss:
            min_loss = calc_loss
            min_loss_epoch = i+1

            torch.save(model.state_dict(), save_model_location + save_model_min_loss)
            print("--" * 25)
            print("Saved | Min Loss: {:.4f}, Min Loss Epoch: {}".format(min_loss, min_loss_epoch))
            print("--" * 25)

        if ((i + 1) % val_every) != 0:
            continue

        average_valid_loss, total_acc, total_count = \
            validate(model, criterion, valid_loader, device, threshold)
        # An empty validation set yields accuracy 0 instead of a ZeroDivisionError.
        accuracy = total_acc / total_count if total_count else 0.0

        # NOTE(review): on equal accuracy the checkpoint is replaced only when
        # this epoch's training loss is NOT the running minimum
        # (min_loss < calc_loss). That condition is kept exactly as it was;
        # confirm whether the opposite comparison was intended.
        improved = accuracy > best_accuracy
        tie_break = accuracy == best_accuracy and min_loss < calc_loss
        if improved or tie_break:
            best_accuracy = accuracy
            saving_best_model_path = save_model_location + save_model_name
            print("--" * 25)
            torch.save(model.state_dict(), saving_best_model_path)
            print(f'Valid Accuracy: {best_accuracy:.4f}, Valid Loss: {average_valid_loss:.4f}')
            print(f"Saving Best Model(Path): {saving_best_model_path}")
            print("--" * 25)



def validate(model : nn.Module, criterion : nn.CrossEntropyLoss, valid_loader : DataLoader, device, threshold : float):
    """Compute the mean loss and thresholded hit count on ``valid_loader``.

    A sample counts as correct only when the most probable class equals the
    label AND its softmax probability is greater than ``threshold``.
    Samples where no class clears the threshold are counted as wrong; they
    are no longer silently mapped to class index 0.

    The model is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards.

    Args:
        model: Network to evaluate.
        criterion: Loss applied to the raw model outputs and the labels.
        valid_loader: Sized iterable of ``(x, y)`` batches.
        device: Device every batch is moved to.
        threshold: Minimum softmax probability for a prediction to count.

    Returns:
        ``(average_valid_loss, total_acc, total_count)``: the loss averaged
        over batches, the number of correct samples, and the number of
        samples seen.
    """
    valid_loss = 0.0
    total_acc, total_count = 0, 0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            # validation loop
            for x, y in valid_loader:
                x = x.float().to(device)
                y = y.long().to(device)

                outputs = model(x)
                valid_loss += criterion(outputs, y).item()

                # Take the top class from the probabilities themselves so an
                # all-below-threshold row cannot collapse to index 0.
                confidence, predicted = torch.softmax(outputs, dim=1).max(dim=1)
                hits = (predicted == y) & (confidence > threshold)
                total_acc += hits.sum().item()
                total_count += len(y)
    finally:
        model.train(was_training)

    average_valid_loss = valid_loss / len(valid_loader)
    return average_valid_loss, total_acc, total_count



def evaluate(model, test_loader, classes, label_numbers, device, threshold=0.7):
    """Print a classification report and per-class hit rates for a test set.

    For each sample the most probable class is taken from the softmax
    output. If that probability is not greater than ``threshold`` the
    prediction is recorded as ``-1`` (rejected), so it matches no label and
    is counted as a miss rather than being attributed to class index 0.

    The model is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards.

    Args:
        model: Trained network to evaluate.
        test_loader: Yields ``(x, y)`` test batches.
        classes: Indexable by label number; gives the display name.
        label_numbers: Label ids to report on.
        device: Device every batch is moved to.
        threshold: Minimum softmax probability for a prediction to count.
    """
    rejected = -1
    y_pred = []
    y_true = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for x, y in test_loader:
                x = x.float().to(device)
                y = y.long().to(device)

                probabilities = torch.softmax(model(x), dim=1)
                confidence, predicted = probabilities.max(dim=1)
                predicted = torch.where(
                    confidence > threshold,
                    predicted,
                    torch.full_like(predicted, rejected),
                )

                y_pred.extend(predicted.cpu().tolist())
                y_true.extend(y.cpu().tolist())
    finally:
        model.train(was_training)

    print("Classification Report:")
    print(classification_report(y_true, y_pred, labels=label_numbers, digits=4))

    # Keyed by label id so the counters do not rely on ids being 0..n-1.
    pred_ans = {ln: 0 for ln in label_numbers}
    true_cnts = {ln: 0 for ln in label_numbers}
    for pred, true in zip(y_pred, y_true):
        if true not in true_cnts:
            continue
        true_cnts[true] += 1
        if pred == true:
            pred_ans[true] += 1

    for ln in label_numbers:
        if true_cnts[ln] == 0:
            continue
        print(f"Name [{classes[ln]}]:",
            f"{pred_ans[ln]/true_cnts[ln]:.4f}", f"({pred_ans[ln]}/{true_cnts[ln]})")



import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

dataFrame = pd.read_csv("custom_random_data.csv", delimiter=",")

# Feature columns used for training, plus the label column.
selected_feautures = ["F1", "F2", "F4", "F5"]
selected_cols = selected_feautures + ["Name"]
dataFrame = dataFrame[selected_cols]

# Number of features; this is the input width of the first linear layer.
feature_length = len(dataFrame.columns) - 1

# Sorted unique label names, used to map string labels to integer ids.
name_list = np.sort(list(set(dataFrame["Name"])))
name_dict = {name: index for index, name in enumerate(name_list)}

x_data = dataFrame[selected_feautures].to_numpy()
labels_np = dataFrame["Name"].to_numpy()
y_data = [name_dict[d] for d in labels_np]

# Inspect the data
# print(x_data, y_data)

# Split into train, test and validation sets
test_ratio = 0.2
valid_ratio = 0.2
random_state = 42

# train / test split
x_train, x_test, y_train, y_test = train_test_split(
    x_data,
    y_data,
    test_size=test_ratio,
    shuffle=True,
    stratify=y_data,
    random_state=random_state,
)
# train / valid split (taken from the remaining training portion)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train,
    y_train,
    test_size=valid_ratio,
    shuffle=True,
    stratify=y_train,
    random_state=random_state,
)

# Build the datasets, data loaders and model
batch_size = 16
train_dataset = CustomDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_dataset = CustomDataset(x_valid, y_valid)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
# CPU is hard-coded; a CUDA fallback expression was left disabled here.
device = torch.device('cpu')
model = CustomModel(feature_length, len(name_list)).to(device)

# loss function, optimizer, scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.01)

# Run parameters
epoch = 50
val_every = 3
threshold = 0.8

# Set to 1 to run training followed by evaluation on the test split
enable_train = 1

if enable_train == 1:
    train(model, criterion, train_loader, valid_loader, device, val_every, threshold)

    label_numbers = list(name_dict.values())

    test_dataset = CustomDataset(x_test, y_test)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    evaluate(model, test_loader, name_list, label_numbers, device, threshold)



Train Result
Evaluate Result


