GuoXin Li's Blog

高光谱分类-Hyperspectral Classification

字数统计: 3.5k阅读时长: 17 min
2021/01/04 Share

HybridSN

阅读论文:

HybridSN: Exploring 3D-2D CNN Feature Hierarchy for Hyperspectral Image Classification

背景知识

高光谱图像 hyperspectral image:

高光谱遥感指具有高光谱分辨率的遥感数据获取、处理、分析和应用的科学与技术,通常采用覆盖一定波谱范围的成像光谱仪和非成像光谱仪两种传感器获取数据,利用大量窄波段电磁波获取感兴趣目标的理化信息,其基础是光谱学(Spectroscopy)

高光谱图像分类:

分类是高光谱遥感影像处理和应用的一项重要内容,其最终目标是给影像中的每个像元赋以唯一的类别标识。然而,高光谱遥感影像的高维特性、波段间高度相关性、光谱混合等使得高光谱遥感影像分类面临巨大挑战.

作者提出 Hybrid-CNN模型的原因

  • 单纯的2D-CNN并不能从光谱维度中提取出良好的判别特征图。
  • 同样,深层的3D-CNN在计算上更加复杂,单独的3D-CNN对于在许多光谱波段上具有相似纹理的类似乎表现更差。
  • 利用3D和2D混合能够充分地利用光谱特征和空间特征来提高分类精度。

获取数据,并引入基本函数库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
! wget http://www.ehu.eus/ccwintco/uploads/6/67/Indian_pines_corrected.mat
! wget http://www.ehu.eus/ccwintco/uploads/c/c4/Indian_pines_gt.mat
! pip install spectral

import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
import spectral
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

定义 hybridSN类

模型的网络结构如下:

Screen Shot 2020-12-31 at 16.53.52

三维卷积部分

  • conv1:(1, 30, 25, 25), 8个 7x3x3 的卷积核 ==>(8, 24, 23, 23)
  • conv2:(8, 24, 23, 23), 16个 5x3x3 的卷积核 ==>(16, 20, 21, 21)
  • conv3:(16, 20, 21, 21),32个 3x3x3 的卷积核 ==>(32, 18, 19, 19)

二维卷积

将前面的 32*18 reshape 得到 (576,19,19)

  • (576, 19, 19) 64个 3x3 的卷积核,==> (64, 17, 17)

  • flatten 操作,变为 18496 维的向量

  • 使用 256,128 的全连接层,都是用0.4的 Dropout
  • 最后输出为 16 个节点,是最终的分类类别数

HybridSN 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class_num = 16

class HybridSN(nn.Module):
def __init__(self):
super(HybridSN,self).__init__()
# 3D
self.conv3d_1=nn.Sequential(
nn.Conv3d(1,8,kernel_size=(7, 3, 3),stride=1, padding=0),
nn.BatchNorm3d(8),
nn.ReLU(inplace=True),
)
self.conv3d_2 = nn.Sequential(
nn.Conv3d(8, 16, kernel_size=(5, 3, 3), stride=1, padding=0),
nn.BatchNorm3d(16),
nn.ReLU(inplace = True),
)
self.conv3d_3 = nn.Sequential(
nn.Conv3d(16, 32, kernel_size=(3, 3, 3), stride=1, padding=0),
nn.BatchNorm3d(32),
nn.ReLU(inplace = True)
)
# 2D
self.conv2d = nn.Sequential(
nn.Conv2d(576, 64, kernel_size=(3, 3), stride=1, padding=0),
nn.BatchNorm2d(64),
nn.ReLU(inplace = True),
)
self.fc1 = nn.Linear(18496,256)
self.fc2 = nn.Linear(256,128)
self.fc3 = nn.Linear(128,16)
self.dropout = nn.Dropout(p = 0.4)

def forward(self,x):
out = self.conv3d_1(x)
out = self.conv3d_2(out)
out = self.conv3d_3(out)
out = self.conv2d(out.reshape(out.shape[0],-1,19,19))
out = out.reshape(out.shape[0],-1)
out = F.relu(self.dropout(self.fc1(out)))
out = F.relu(self.dropout(self.fc2(out)))
out = self.fc3(out)
return out

# 随机输入,测试网络结构是否通
# x = torch.randn(1, 1, 30, 25, 25)
# net = HybridSN()
# y = net(x)
# print(y.shape)

创建数据集

首先对高光谱数据实施PCA降维;然后创建 keras 方便处理的数据格式;然后随机抽取 10% 数据做为训练集,剩余的做为测试集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 对高光谱数据 X 应用 PCA 变换
def applyPCA(X, numComponents):
newX = np.reshape(X, (-1, X.shape[2]))
pca = PCA(n_components=numComponents, whiten=True)
newX = pca.fit_transform(newX)
newX = np.reshape(newX, (X.shape[0], X.shape[1], numComponents))
return newX

# 对单个像素周围提取 patch 时,边缘像素就无法取了,因此,给这部分像素进行 padding 操作
def padWithZeros(X, margin=2):
newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2* margin, X.shape[2]))
x_offset = margin
y_offset = margin
newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X
return newX

# 在每个像素周围提取 patch ,然后创建成符合 keras 处理的格式
def createImageCubes(X, y, windowSize=5, removeZeroLabels = True):
# 给 X 做 padding
margin = int((windowSize - 1) / 2)
zeroPaddedX = padWithZeros(X, margin=margin)
# split patches
patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]))
patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
patchIndex = 0
for r in range(margin, zeroPaddedX.shape[0] - margin):
for c in range(margin, zeroPaddedX.shape[1] - margin):
patch = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]
patchesData[patchIndex, :, :, :] = patch
patchesLabels[patchIndex] = y[r-margin, c-margin]
patchIndex = patchIndex + 1
if removeZeroLabels:
patchesData = patchesData[patchesLabels>0,:,:,:]
patchesLabels = patchesLabels[patchesLabels>0]
patchesLabels -= 1
return patchesData, patchesLabels

def splitTrainTestSet(X, y, testRatio, randomState=345):
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testRatio, random_state=randomState, stratify=y)
return X_train, X_test, y_train, y_test

读取并创建数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# 地物类别
class_num = 16
X = sio.loadmat('Indian_pines_corrected.mat')['indian_pines_corrected']
y = sio.loadmat('Indian_pines_gt.mat')['indian_pines_gt']

# 用于测试样本的比例
test_ratio = 0.90
# 每个像素周围提取 patch 的尺寸
patch_size = 25
# 使用 PCA 降维,得到主成分的数量
pca_components = 30

print('Hyperspectral data shape: ', X.shape)
print('Label shape: ', y.shape)

print('\n... ... PCA tranformation ... ...')
X_pca = applyPCA(X, numComponents=pca_components)
print('Data shape after PCA: ', X_pca.shape)

print('\n... ... create data cubes ... ...')
X_pca, y = createImageCubes(X_pca, y, windowSize=patch_size)
print('Data cube X shape: ', X_pca.shape)
print('Data cube y shape: ', y.shape)

print('\n... ... create train & test data ... ...')
Xtrain, Xtest, ytrain, ytest = splitTrainTestSet(X_pca, y, test_ratio)
print('Xtrain shape: ', Xtrain.shape)
print('Xtest shape: ', Xtest.shape)

# 改变 Xtrain, Ytrain 的形状,以符合 keras 的要求
Xtrain = Xtrain.reshape(-1, patch_size, patch_size, pca_components, 1)
Xtest = Xtest.reshape(-1, patch_size, patch_size, pca_components, 1)
print('before transpose: Xtrain shape: ', Xtrain.shape)
print('before transpose: Xtest shape: ', Xtest.shape)

# 为了适应 pytorch 结构,数据要做 transpose
Xtrain = Xtrain.transpose(0, 4, 3, 1, 2)
Xtest = Xtest.transpose(0, 4, 3, 1, 2)
print('after transpose: Xtrain shape: ', Xtrain.shape)
print('after transpose: Xtest shape: ', Xtest.shape)


""" Training dataset"""
class TrainDS(torch.utils.data.Dataset):
def __init__(self):
self.len = Xtrain.shape[0]
self.x_data = torch.FloatTensor(Xtrain)
self.y_data = torch.LongTensor(ytrain)
def __getitem__(self, index):
# 根据索引返回数据和对应的标签
return self.x_data[index], self.y_data[index]
def __len__(self):
# 返回文件数据的数目
return self.len

""" Testing dataset"""
class TestDS(torch.utils.data.Dataset):
def __init__(self):
self.len = Xtest.shape[0]
self.x_data = torch.FloatTensor(Xtest)
self.y_data = torch.LongTensor(ytest)
def __getitem__(self, index):
# 根据索引返回数据和对应的标签
return self.x_data[index], self.y_data[index]
def __len__(self):
# 返回文件数据的数目
return self.len

# 创建 trainloader 和 testloader
trainset = TrainDS()
testset = TestDS()
train_loader = torch.utils.data.DataLoader(dataset=trainset, batch_size=128, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(dataset=testset, batch_size=128, shuffle=False, num_workers=2)

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Hyperspectral data shape:  (145, 145, 200)
Label shape: (145, 145)

... ... PCA tranformation ... ...
Data shape after PCA: (145, 145, 30)

... ... create data cubes ... ...
Data cube X shape: (10249, 25, 25, 30)
Data cube y shape: (10249,)

... ... create train & test data ... ...
Xtrain shape: (1024, 25, 25, 30)
Xtest shape: (9225, 25, 25, 30)
before transpose: Xtrain shape: (1024, 25, 25, 30, 1)
before transpose: Xtest shape: (9225, 25, 25, 30, 1)
after transpose: Xtrain shape: (1024, 1, 30, 25, 25)
after transpose: Xtest shape: (9225, 1, 30, 25, 25)

开始训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 使用GPU训练,可以在菜单 "代码执行工具" -> "更改运行时类型" 里进行设置
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 网络放到GPU上
net = HybridSN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

# 开始训练
total_loss = 0
for epoch in range(100):
for i, (inputs, labels) in enumerate(train_loader):
inputs = inputs.to(device)
labels = labels.to(device)
# 优化器梯度归零
optimizer.zero_grad()
# 正向传播 + 反向传播 + 优化
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print('[Epoch: %d] [loss avg: %.4f] [current loss: %.4f]' %(epoch + 1, total_loss/(epoch+1), loss.item()))

print('Finished Training')

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[Epoch: 1]   [loss avg: 20.8664]   [current loss: 2.5130]
[Epoch: 2] [loss avg: 20.1485] [current loss: 2.3768]
[Epoch: 3] [loss avg: 19.5493] [current loss: 2.2176]
[Epoch: 4] [loss avg: 18.9607] [current loss: 2.0665]
[Epoch: 5] [loss avg: 18.4340] [current loss: 1.9149]
...

[Epoch: 70] [loss avg: 2.4159] [current loss: 0.0473]
[Epoch: 71] [loss avg: 2.3844] [current loss: 0.0155]
[Epoch: 72] [loss avg: 2.3523] [current loss: 0.0012]
[Epoch: 73] [loss avg: 2.3204] [current loss: 0.0043]
[Epoch: 74] [loss avg: 2.2897] [current loss: 0.0007]
[Epoch: 75] [loss avg: 2.2593] [current loss: 0.0004]
...
[Epoch: 95] [loss avg: 1.8108] [current loss: 0.0410]
[Epoch: 96] [loss avg: 1.7933] [current loss: 0.0426]
[Epoch: 97] [loss avg: 1.7755] [current loss: 0.0059]
[Epoch: 98] [loss avg: 1.7581] [current loss: 0.0113]
[Epoch: 99] [loss avg: 1.7415] [current loss: 0.0016]
[Epoch: 100] [loss avg: 1.7261] [current loss: 0.0187]
Finished Training

模型测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
count = 0
# 模型测试
for inputs, _ in test_loader:
inputs = inputs.to(device)
outputs = net(inputs)
outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
if count == 0:
y_pred_test = outputs
count = 1
else:
y_pred_test = np.concatenate( (y_pred_test, outputs) )

# 生成分类报告
classification = classification_report(ytest, y_pred_test, digits=4)
print(classification)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
              precision    recall  f1-score   support

0.0 0.9487 0.9024 0.9250 41
1.0 0.9540 0.9518 0.9529 1285
2.0 0.9495 0.9572 0.9533 747
3.0 1.0000 0.8028 0.8906 213
4.0 0.8747 0.9954 0.9312 435
5.0 0.9847 0.9787 0.9817 657
6.0 1.0000 0.9200 0.9583 25
7.0 0.9534 1.0000 0.9762 430
8.0 0.8571 0.6667 0.7500 18
9.0 0.9939 0.9371 0.9647 875
10.0 0.9695 0.9941 0.9817 2210
11.0 0.9393 0.9270 0.9331 534
12.0 0.9432 0.8973 0.9197 185
13.0 0.9806 0.9781 0.9793 1139
14.0 0.9647 0.9452 0.9549 347
15.0 0.9459 0.8333 0.8861 84

accuracy 0.9623 9225
macro avg 0.9537 0.9179 0.9337 9225
weighted avg 0.9631 0.9623 0.9620 9225

显示分类结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#显示分类结果
# load the original image
X = sio.loadmat('Indian_pines_corrected.mat')['indian_pines_corrected']
y = sio.loadmat('Indian_pines_gt.mat')['indian_pines_gt']

height = y.shape[0]
width = y.shape[1]

X = applyPCA(X, numComponents= pca_components)
X = padWithZeros(X, patch_size//2)

# 逐像素预测类别
outputs = np.zeros((height,width))
for i in range(height):
for j in range(width):
if int(y[i,j]) == 0:
continue
else :
image_patch = X[i:i+patch_size, j:j+patch_size, :]
image_patch = image_patch.reshape(1,image_patch.shape[0],image_patch.shape[1], image_patch.shape[2], 1)
X_test_image = torch.FloatTensor(image_patch.transpose(0, 4, 3, 1, 2)).to(device)
prediction = net(X_test_image)
prediction = np.argmax(prediction.detach().cpu().numpy(), axis=1)
outputs[i][j] = prediction+1
if i % 20 == 0:
print('... ... row ', i, ' handling ... ...')

predict_image = spectral.imshow(classes = outputs.astype(int),figsize =(5,5))

image-20210104131557618

加入注意力机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# 通道注意力机制
class ChannelAttention(nn.Module):
def __init__(self, in_planes, ratio=16):
super(ChannelAttention, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.max_pool = nn.AdaptiveMaxPool2d(1)

self.fc1 = nn.Conv2d(in_planes, in_planes // 16, 1, bias=False)
self.relu1 = nn.ReLU()
self.fc2 = nn.Conv2d(in_planes // 16, in_planes, 1, bias=False)

self.sigmoid = nn.Sigmoid()

def forward(self, x):
avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
out = avg_out + max_out
return self.sigmoid(out)
# 空间注意力机制
class SpatialAttention(nn.Module):
def __init__(self, kernel_size=7):
super(SpatialAttention, self).__init__()

assert kernel_size in (3, 7), 'kernel size must be 3 or 7'
padding = 3 if kernel_size == 7 else 1

self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
self.sigmoid = nn.Sigmoid()

def forward(self, x):
avg_out = torch.mean(x, dim=1, keepdim=True)
max_out, _ = torch.max(x, dim=1, keepdim=True)
x = torch.cat([avg_out, max_out], dim=1)
x = self.conv1(x)
return self.sigmoid(x)
class HybridSN(nn.Module):
def __init__(self, num_classes=16, self_attention=True):
super(HybridSN, self).__init__()
self.self_attention = self_attention

# 3D
self.block_1_3D = nn.Sequential(
nn.Conv3d(
in_channels=1,
out_channels=8,
kernel_size=(7, 3, 3),
stride=1,
padding=0
),
nn.ReLU(inplace=True),
nn.Conv3d(
in_channels=8,
out_channels=16,
kernel_size=(5, 3, 3),
stride=1,
padding=0
),
nn.ReLU(inplace=True),
nn.Conv3d(
in_channels=16,
out_channels=32,
kernel_size=(3, 3, 3),
stride=1,
padding=0
),
nn.ReLU(inplace=True)
)

if self_attention:
self.channel_attention_1 = ChannelAttention(576)
self.spatial_attention_1 = SpatialAttention(kernel_size=7)

# 2D
self.block_2_2D = nn.Sequential(
nn.Conv2d(
in_channels=576,
out_channels=64,
kernel_size=(3, 3)
),
nn.ReLU(inplace=True)
)

if self_attention:
self.channel_attention_2 = ChannelAttention(64)
self.spatial_attention_2 = SpatialAttention(kernel_size=7)

# full connect
self.classifier = nn.Sequential(
nn.Linear(
in_features=18496,
out_features=256
),
nn.Dropout(p=0.4),
nn.Linear(
in_features=256,
out_features=128
),
nn.Dropout(p=0.4),
nn.Linear(
in_features=128,
out_features=num_classes
)
)
def forward(self, x):
y = self.block_1_3D(x)
y = y.view(-1, y.shape[1] * y.shape[2], y.shape[3], y.shape[4])
if self.self_attention:
y = self.channel_attention_1(y) * y
y = self.spatial_attention_1(y) * y
y = self.block_2_2D(y)
if self.self_attention:
y = self.channel_attention_2(y) * y
y = self.spatial_attention_2(y) * y

y = y.view(y.size(0), -1)

y = self.classifier(y)
return y

Screen Shot at 00.29.32

模型检测

测试次数 测试结果
测试1 Screen Shot 2021-01-03 at 17.12.28
测试2 image-20210103171324381
测试3 Screen Shot 2021-01-03 at 17.13.46

思考题🤔

一:3D卷机与2D卷积的区别

  • 卷积的方向和输出的形状很重要
一维卷积 二维卷积 三维卷积
Screen Shot 2021-01-03 at 15.29.45 image-20210103153053452 image-20210103153105101
  • 卷积核的区别:2D 3D
二维卷积 三维卷积
v2-8a6695c2e086525ac5a61610348739b2_b.webp v2-86e2bd970d07f9d6e1d921b248e45a3a_b.webp
  • 3D 卷积多了一个深度的通道,但是这跟单纯的2D卷积的多通道卷积区别在于,3D卷积由于卷积核本身就是3D的,所以权重共享。
  • 3D卷积多了一个深度通道,找个深度可能是视频上的连续帧,也可能是立体图像中的不同切片。

二:训练网络,然后多测试几次,会发现每次分类的结果都不一样,请思考为什么?

这是因为在训练model之后再进行测试样本时,需要在 model 之前加上 model.eval() 这个函数调用。否则即使不训练,也会去改变权值,这是 model 中含有的 batch normalization 即 BN 所导致的。

在模型检测 model.eval() 后,Pytorch 会自动把 BN 和 Dropout 固定住,不会取平均,而是用训练好的值,一旦测试的的batch_size过小,很容易就会被BN层影响结果。

所以需要对源代码的训练和测试分别加上 model.train() 和 model.eval()来区分开来,这样多次测试的结果就一致了。

三:如果想要进一步提升高光谱图像的分类性能,可以如何使用注意力机制?

我们可以粗略地把神经注意机制类比成一个可以专注于输入内容的某一子集(或特征)的神经网络. 注意力机制最早是由 DeepMind 为图像分类提出的,这让「神经网络在执行预测任务时可以更多关注输入中的相关部分,更少关注不相关的部分」。

注意力机制像人眼观察事物的模式,使网络更加有侧重的学习,以此提高网络的学习能力。

可以通过构建不同的注意力模块(通道注意力模块、光谱注意力模块、空间注意力模块)然后将其并入到原有的卷积网络中,所构建的注意力子网络模块能够分别关注到通道、光谱域和空间域中更多的信息。

  • 通道注意力机制(Channel Attention,CA)是对同一个特征图的不同通道进行选择 优化,获取重校订的通道信息;
  • 空间注意力机制(Spatial Attention,SA)则是对同一个特征图的所有空间位置重新分配权重,然后通过 Sigmoid 函数来激活得到非线性的重校订上下文信息。

参考:

CATALOG
  1. 1. HybridSN
    1. 1.1. 背景知识
    2. 1.2. 获取数据,并引入基本函数库
    3. 1.3. 定义 hybridSN类
    4. 1.4. 创建数据集
    5. 1.5. 开始训练
    6. 1.6. 模型测试
    7. 1.7. 显示分类结果
    8. 1.8. 加入注意力机制
    9. 1.9. 思考题🤔