# Recovered from a whitespace-collapsed git patch:
#   commit  21494068857a9608997b2640908fc885e8638fbe
#   author  Yadunand Prem
#   date    Sun, 28 Apr 2024 16:18:51 +0800
#   subject feat: 0.43 on coursemo
#   file    cs2109s/labs/final/final.py (new file)
"""Six-class video classifier built on a small 3D CNN.

Importing this module only defines the model and helpers; loading
``data.npy``, training and scoring happen in :func:`main`, which runs when
the file is executed as a script.
"""
import os

import numpy as np
import torch
from torch import nn

# Number of target classes (labels are expected to be 0..NUM_CLASSES-1).
NUM_CLASSES = 6
# Only the first NUM_FRAMES frames of every video are fed to the network.
NUM_FRAMES = 6
# Number of samples drawn (with replacement) per class when rebalancing.
SAMPLES_PER_CLASS = 300


class CNN3D(nn.Module):
    """One Conv3d -> AvgPool3d -> LeakyReLU stage followed by a linear head.

    ``fc1`` expects 3888 flattened features. With the layer settings below
    that corresponds to a ``(batch, 1, 6, 16, 16)`` input:
    conv -> 12 x 9 x 19 x 19, pool -> 12 x 4 x 9 x 9 = 3888.
    """

    def __init__(self):
        super().__init__()
        # Positional args: in_channels=1, out_channels=12, kernel_size=2,
        # stride=1, padding=2.
        self.conv1 = nn.Conv3d(1, 12, 2, 1, 2)
        self.mp = nn.AvgPool3d(2)
        self.relu = nn.LeakyReLU()
        self.fc1 = nn.Linear(3888, 6)
        # fc2 is not used by forward(); it is kept so the module's parameter
        # names (and any saved state_dict) stay unchanged.
        self.fc2 = nn.Linear(128, 6)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Return raw class logits of shape ``(batch, 6)``."""
        x = self.conv1(x)
        x = self.mp(x)
        x = self.relu(x)
        # Flatten per sample. Unlike ``view(-1, 3888)`` this cannot silently
        # fold a wrongly sized input into a different batch size; a mismatch
        # surfaces as a shape error in fc1 instead.
        x = self.flatten(x)
        return self.fc1(x)


def train(model, criterion, optimizer, loader, epochs=10):
    """Run a plain supervised training loop and return ``model``.

    Args:
        model: Module to optimise.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        epochs: Number of passes over ``loader``.

    After every epoch the loss of the last batch is printed. Nothing is
    printed for an epoch in which ``loader`` yielded no batches.
    """
    # predict() switches the model to eval mode, so make sure a later fit()
    # trains in training mode again.
    model.train()
    for epoch in range(epochs):
        loss = None
        for inputs, labels in loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        if loss is not None:
            print(f'Epoch {epoch}, Loss: {loss.item()}')
    return model


def _prepare_videos(X):
    """Turn raw videos into a clean ``(N, 1, NUM_FRAMES, H, W)`` float tensor.

    Steps: keep the first ``NUM_FRAMES`` frames of each video, clamp pixel
    values to ``[0, 255]``, and replace NaN pixels with the mean of the
    non-NaN pixels of the same frame (0 if the whole frame is NaN).

    The original per-frame NaN imputation was, per the author's note,
    generated with GPT; this is a vectorised equivalent.

    Raises:
        ValueError: If the stacked videos do not form a 4-D array
            ``(N, frames, H, W)``.
    """
    clips = np.array([video[:NUM_FRAMES] for video in X])
    videos = torch.tensor(clips, dtype=torch.float32)
    if videos.dim() != 4:
        raise ValueError(
            "expected videos shaped (N, frames, H, W), got "
            f"{tuple(videos.shape)}"
        )
    # clamp() propagates NaN, so missing pixels are still NaN afterwards.
    videos = torch.clamp(videos, 0, 255)
    missing = torch.isnan(videos)
    if missing.any():
        frame_means = torch.nanmean(videos, dim=(-2, -1), keepdim=True)
        # A frame with no valid pixel has a NaN mean; fall back to 0 so NaN
        # never reaches the network.
        frame_means = torch.nan_to_num(frame_means, nan=0.0)
        videos = torch.where(missing, frame_means, videos)
    # Add the single input channel expected by Conv3d.
    return videos.unsqueeze(1)


def process_data(X, y):
    """Preprocess videos and rebalance the classes for training.

    Every class in ``0..NUM_CLASSES-1`` that has at least one sample
    contributes exactly ``SAMPLES_PER_CLASS`` samples, drawn with replacement
    (large classes are undersampled, small ones oversampled). Samples whose
    label matches no class (for example NaN labels) are dropped.

    The resampling approach was, per the author's note, written with the
    assistance of ChatGPT with some modifications.

    Args:
        X: Sequence of videos, each indexable as ``video[:NUM_FRAMES]``.
        y: Sequence of labels aligned with ``X``.

    Returns:
        ``(videos, labels)``: a float32 tensor ``(M, 1, NUM_FRAMES, H, W)``
        and an int64 tensor ``(M,)``.

    Raises:
        ValueError: If no sample carries a label in ``0..NUM_CLASSES-1``.
    """
    y = np.array(y)
    videos = _prepare_videos(X)

    per_class = [np.flatnonzero(y == label) for label in range(NUM_CLASSES)]
    # np.random.choice raises on an empty population, so skip absent classes.
    picked = [
        np.random.choice(members, SAMPLES_PER_CLASS, replace=True)
        for members in per_class
        if members.size > 0
    ]
    if not picked:
        raise ValueError(
            f"no samples with a label in 0..{NUM_CLASSES - 1} were found"
        )
    chosen = np.concatenate(picked)

    labels = torch.from_numpy(y[chosen].astype(np.int64))
    return videos[torch.from_numpy(chosen)], labels


class Model():
    """Wrapper exposing ``fit`` / ``predict`` around :class:`CNN3D`."""

    def __init__(self):
        self.model = CNN3D()
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    def fit(self, X, y):
        """Train the network on raw videos ``X`` with labels ``y``."""
        X, y = process_data(X, y)
        train_dataset = torch.utils.data.TensorDataset(X, y)
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=32, shuffle=True
        )
        train(self.model, self.criterion, self.optimizer, train_loader)

    def predict(self, X):
        """Return the predicted class index for every video in ``X``.

        Returns:
            1-D integer NumPy array with one entry per video (empty when
            ``X`` is empty).
        """
        self.model.eval()
        if len(X) == 0:
            return np.empty(0, dtype=np.int64)
        videos = _prepare_videos(X)
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            logits = self.model(videos)
        return np.argmax(logits.numpy(), axis=1)


def main():
    """Load ``data.npy``, train on 90% of it and print the macro F1 score."""
    # scikit-learn is only needed by this driver, so import it here and keep
    # the model definitions importable without it.
    from sklearn.metrics import f1_score
    from sklearn.model_selection import train_test_split

    with open('data.npy', 'rb') as f:
        # NOTE(security): allow_pickle=True unpickles arbitrary objects and
        # can execute code; only load data files from a trusted source.
        data = np.load(f, allow_pickle=True).item()
    X = data['data']
    y = data['label']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

    # Drop test samples without a label. flatnonzero always yields a 1-D
    # index array, even when only a single label is valid.
    not_nan_indices = np.flatnonzero(~np.isnan(np.array(y_test)))
    y_test = [y_test[i] for i in not_nan_indices]
    X_test = [X_test[i] for i in not_nan_indices]

    model = Model()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    print("F1 Score (macro): {0:.2f}".format(f1_score(y_test, y_pred, average='macro')))  # You may encounter errors, you are expected to figure out what's the issue.


if __name__ == "__main__":
    main()