# 123 lines | 4.2 KiB | Python
import numpy as np
|
|
import torch
|
|
import os
|
|
|
|
from torch import nn
|
|
|
|
# Load the dataset: 'data.npy' stores a pickled dict, so .item() is needed to
# unwrap the 0-d object array that np.load returns.
# NOTE(review): allow_pickle=True executes arbitrary pickled code on load --
# only use this with a trusted data file.
with open('data.npy', 'rb') as f:
    data = np.load(f, allow_pickle=True).item()

# Videos under 'data', their class labels under 'label'.
X, y = data['data'], data['label']
|
|
|
|
from torch import nn
|
|
from sklearn.model_selection import train_test_split
|
|
|
|
from torch import nn
|
|
import numpy as np
|
|
import torch
|
|
import os
|
|
|
|
|
|
class CNN3D(nn.Module):
    """Single-layer 3D-convolutional classifier that outputs 6 class logits.

    The classifier head is hard-coded to 3888 features per sample. With the
    layers below that is what a ``(N, 1, 6, H, W)`` input produces when the
    pooled frame is 9x9 (e.g. 16x16 frames): 12 channels * 4 * 9 * 9.
    """

    def __init__(self):
        super().__init__()
        # Positional arguments: in_channels=1, out_channels=12,
        # kernel_size=2, stride=1, padding=2.
        self.conv1 = nn.Conv3d(1, 12, 2, 1, 2)
        # Named ``mp`` but this is an average pool, not a max pool.
        self.mp = nn.AvgPool3d(2)
        self.relu = nn.LeakyReLU()
        self.fc1 = nn.Linear(3888, 6)
        # Not used by forward(); kept so the parameter set and the
        # state_dict keys stay identical to the original module.
        self.fc2 = nn.Linear(128, 6)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Return logits of shape ``(batch, 6)`` for a batch of videos.

        Args:
            x: Float tensor of shape ``(batch, 1, depth, height, width)``.
                An unbatched ``(1, depth, height, width)`` tensor is also
                accepted and treated as a batch of one.

        Raises:
            RuntimeError: If the flattened feature size per sample is not
                3888 (raised by the linear layer).
        """
        if x.dim() == 4:
            # Unbatched input: add the batch axis so the result is (1, 6).
            x = x.unsqueeze(0)

        x = self.conv1(x)
        x = self.mp(x)
        x = self.relu(x)

        # Flatten per sample (dim 0 is preserved). The previous
        # ``x.view(-1, 3888)`` let the batch axis absorb any size mismatch,
        # silently merging samples when the input had the wrong shape.
        x = self.flatten(x)
        return self.fc1(x)
|
|
|
|
|
|
def train(model, criterion, optimizer, loader, epochs=10):
    """Run a plain supervised training loop and return the trained model.

    Args:
        model: The ``nn.Module`` to optimise (updated in place).
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        epochs: Number of passes over ``loader``.

    Returns:
        The same ``model`` object that was passed in.

    After every epoch the loss of the last batch is printed. If ``loader``
    yields no batches there is nothing to learn from, so training stops
    immediately instead of failing on an undefined loss.
    """
    # The model may have been left in eval mode by an earlier prediction.
    model.train()

    for epoch in range(epochs):
        last_loss = None
        for inputs, labels in loader:
            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()
            last_loss = loss.item()

        if last_loss is None:
            # Empty loader: further epochs would be empty as well.
            break
        print(f'Epoch {epoch}, Loss: {last_loss}')

    return model
|
|
|
|
|
|
def _clean_videos(videos):
    """Clamp pixels to [0, 255] and replace NaNs with their frame's mean.

    The first two axes of ``videos`` index (video, frame); every remaining
    axis is treated as the pixels of one frame. The mean is taken over the
    clamped, non-NaN pixels of that frame; a frame with no valid pixel at
    all is filled with 0 so that no NaN can reach the network.
    """
    frames = videos.reshape(videos.shape[0], videos.shape[1], -1)
    # Record NaN positions before clamping so the result does not depend on
    # how clamp treats NaN.
    nan_mask = torch.isnan(frames)
    frames = torch.clamp(frames, 0, 255)

    if nan_mask.any():
        valid = ~nan_mask
        # clamp(min=1) avoids 0/0 for frames that are entirely NaN.
        valid_count = valid.sum(dim=2, keepdim=True).clamp(min=1)
        valid_sum = torch.where(
            valid, frames, torch.zeros_like(frames)
        ).sum(dim=2, keepdim=True)
        frames = torch.where(nan_mask, valid_sum / valid_count, frames)

    return frames.reshape(videos.shape)


def process_data(X, y, num_classes=6, samples_per_class=300):
    """Turn raw videos and labels into a class-balanced training set.

    Steps: keep the first 6 frames of every video, clamp pixel values to
    [0, 255], replace NaN pixels with the mean of their frame, then draw
    ``samples_per_class`` samples (with replacement) from every class so
    that all classes are equally represented.

    NOTE: the NaN-replacement and resampling logic were originally written
    with ChatGPT assistance (per the author's comments).

    Args:
        X: Sequence of videos, each indexable as ``video[:6]`` and all of
            the same frame shape.
        y: Sequence of labels aligned with ``X``. Entries that equal none
            of ``0 .. num_classes - 1`` (for example NaN) are never drawn.
        num_classes: Number of class labels to balance over.
        samples_per_class: Number of samples drawn for each class.

    Returns:
        ``(videos, labels)``: a float32 tensor with a channel axis inserted
        at dim 1, and an int64 label tensor, ordered class by class.

    Raises:
        ValueError: If no sample carries any of the expected labels.
    """
    y = np.array(y)
    X = np.array([video[:6] for video in X])
    tensor_videos = _clean_videos(torch.tensor(X, dtype=torch.float32))

    # Sampling is done with replacement, so small classes are oversampled
    # and large ones undersampled. Classes without any sample are skipped
    # because np.random.choice cannot draw from an empty pool.
    per_class = [np.flatnonzero(y == label) for label in range(num_classes)]
    chosen = [
        np.random.choice(members, samples_per_class, replace=True)
        for members in per_class
        if members.size > 0
    ]
    if not chosen:
        raise ValueError(
            f"no samples with a label in range(0, {num_classes}) were found"
        )
    indices_to_take = np.concatenate(chosen)

    tensor_videos = tensor_videos[indices_to_take].unsqueeze(1)
    y = y[indices_to_take]
    return tensor_videos, torch.as_tensor(y).long()
|
|
|
|
|
|
class Model:
    """fit/predict wrapper bundling the CNN3D network, its loss and optimizer."""

    def __init__(self):
        self.model = CNN3D()
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    def fit(self, X, y):
        """Preprocess ``(X, y)`` with ``process_data`` and train the network.

        Args:
            X: Sequence of raw videos.
            y: Sequence of labels aligned with ``X``.
        """
        X, y = process_data(X, y)
        train_dataset = torch.utils.data.TensorDataset(X, y)
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=32, shuffle=True
        )
        # predict() leaves the network in eval mode; switch back before
        # training again.
        self.model.train()
        train(self.model, self.criterion, self.optimizer, train_loader)

    @staticmethod
    def _prepare_inputs(X):
        """Convert raw videos into the float tensor the network consumes.

        Keeps the first 6 frames of each video, clamps pixels to [0, 255],
        replaces NaN pixels with the mean of the valid (clamped) pixels of
        their frame (0 if the frame has none), and inserts a channel axis
        at dim 1.
        """
        clips = np.array([video[:6] for video in X])
        videos = torch.tensor(clips, dtype=torch.float32)

        frames = videos.reshape(videos.shape[0], videos.shape[1], -1)
        # Record NaN positions before clamping so the result does not
        # depend on how clamp treats NaN.
        nan_mask = torch.isnan(frames)
        frames = torch.clamp(frames, 0, 255)
        if nan_mask.any():
            valid = ~nan_mask
            # clamp(min=1) avoids 0/0 for frames that are entirely NaN.
            valid_count = valid.sum(dim=2, keepdim=True).clamp(min=1)
            valid_sum = torch.where(
                valid, frames, torch.zeros_like(frames)
            ).sum(dim=2, keepdim=True)
            frames = torch.where(nan_mask, valid_sum / valid_count, frames)

        return frames.reshape(videos.shape).unsqueeze(1)

    def predict(self, X):
        """Return the predicted class index for every video in ``X``.

        Args:
            X: Sequence of raw videos (same format as given to ``fit``).

        Returns:
            1-D NumPy integer array with one class index per video; empty
            when ``X`` is empty.
        """
        self.model.eval()

        if len(X) == 0:
            return np.empty(0, dtype=np.int64)

        inputs = self._prepare_inputs(X)
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            logits = self.model(inputs)
        return np.argmax(logits.numpy(), axis=1)
|
|
|
|
from sklearn.metrics import f1_score

# Hold out 10% of the samples for evaluation.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

# Drop test samples whose label is NaN: they cannot be scored.
# flatnonzero always returns a 1-D index array; the previous
# argwhere(...).squeeze() collapsed to a 0-d array when exactly one label was
# valid, which cannot be iterated.
not_nan_indices = np.flatnonzero(~np.isnan(np.array(y_test)))
y_test = [y_test[i] for i in not_nan_indices]
X_test = [X_test[i] for i in not_nan_indices]

model = Model()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
# You may encounter errors, you are expected to figure out what's the issue.
print("F1 Score (macro): {0:.2f}".format(f1_score(y_test, y_pred, average='macro')))
|