数据增强函数
import torch
from torchvision.transforms import functional as TVF
def tiling_augmentation(img, bboxes, density_map, resize, jitter, tile_size, hflip_p):
# 定义一个辅助函数,用于根据给定的概率水平翻转张量
def apply_hflip(tensor, apply):
return TVF.hflip(tensor) if apply else tensor
# 定义一个辅助函数,用于生成平铺的图像或密度图
def make_tile(x, num_tiles, hflip, hflip_p, jitter=None):
result = list()
for j in range(num_tiles):
row = list()
for k in range(num_tiles):
t = jitter(x) if jitter is not None else x
if hflip[j, k] < hflip_p: # 根据给定的概率水平翻转
t = apply_hflip(t, True)
row.append(t)
result.append(torch.cat(row, dim=-1)) # 按列连接
return torch.cat(result, dim=-2) # 按行连接
x_tile, y_tile = tile_size # 获取平铺的尺寸
y_target, x_target = resize.size # 获取目标尺寸
num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil())) # 计算平铺的数量
hflip = torch.rand(num_tiles, num_tiles) # 随机生成水平翻转的决策矩阵
# 对图像进行平铺增强
img = make_tile(img, num_tiles, hflip, hflip_p, jitter)
img = resize(img[..., :int(y_tile*y_target), :int(x_tile*x_target)])
# 对密度图进行平铺增强
density_map = make_tile(density_map, num_tiles, hflip, hflip_p)
density_map = density_map[..., :int(y_tile*y_target), :int(x_tile*x_target)]
original_sum = density_map.sum() # 保存原始密度图的总和
density_map = resize(density_map) # 应用缩放
density_map = density_map / density_map.sum() * original_sum # 归一化并恢复原始总和
# 根据第一个平铺的翻转状态调整边界框
if hflip[0, 0] < hflip_p:
bboxes[:, [0, 2]] = x_target - bboxes[:, [2, 0]] # 更新边界框的左右边界
bboxes = bboxes / torch.tensor([x_tile, y_tile, x_tile, y_tile]) # 调整边界框的大小
return img, bboxes, density_map
这段代码定义了一个名为 tiling_augmentation
的函数,它用于对图像、边界框和密度图进行平铺增强处理。平铺增强是一种数据增强技术,通过将图像分割成小块(平铺),对每个小块进行随机变换(如水平翻转、缩放等),然后将它们重新组合,以增加数据集的多样性,提高模型的泛化能力。
功能解释:
tiling_augmentation
函数接收图像img
、边界框bboxes
、密度图density_map
,以及其他几个参数,包括resize
(缩放函数)、jitter
(抖动函数,用于增加图像的多样性)、tile_size
(平铺的尺寸),以及hflip_p
(水平翻转的概率)。- 函数首先定义了两个辅助函数:
apply_hflip
用于水平翻转张量,make_tile
用于生成平铺的图像或密度图。 make_tile
函数通过迭代每个平铺块,应用随机的水平翻转和(如果提供)抖动变换,然后将块按行列连接起来。- 图像和密度图使用
make_tile
函数进行增强,然后使用resize
函数调整大小。 - 密度图在缩放后进行归一化,以保持对象数量的一致性。
- 边界框根据平铺的翻转状态进行调整,以确保它们在增强后的图像中保持正确。
- 函数返回增强后的图像、调整后的边界框和密度图。
整体而言,tiling_augmentation
函数通过平铺增强技术,为图像和相关数据提供了一种有效的数据增强方法,有助于提高模型在面对不同图像变换时的鲁棒性。
FSC147Dataset
类
import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.transforms import functional as TVF
class FSC147Dataset(Dataset):
def __init__(
self, data_path, img_size, split='train', num_objects=3,
tiling_p=0.5, zero_shot=False
):
# 初始化数据集的属性
self.split = split # 数据集的分割类型(训练、验证或测试)
self.data_path = data_path # 数据集的根目录路径
self.horizontal_flip_p = 0.5 # 水平翻转的概率
self.tiling_p = tiling_p # 平铺增强的概率
self.img_size = img_size # 图像的尺寸
self.resize = T.Resize((img_size, img_size)) # 定义图像的缩放操作
self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8) # 定义颜色抖动增强
self.num_objects = num_objects # 每个图像中对象的数量
self.zero_shot = zero_shot # 是否为零样本学习场景
# 加载数据集的分割信息和注释信息
with open(os.path.join(self.data_path, 'Train_Test_Val_FSC_147.json'), 'rb') as file:
splits = json.load(file)
self.image_names = splits[split]
with open(os.path.join(self.data_path, 'annotation_FSC147_384.json'), 'rb') as file:
self.annotations = json.load(file)
def __getitem__(self, idx: int):
# 根据索引 idx 获取一个数据样本
img = Image.open(os.path.join(
self.data_path,
'images_384_VarV2',
self.image_names[idx]
)).convert("RGB") # 加载图像并转换为RGB格式
# 根据数据集的分割类型进行不同的预处理
if self.split != 'train':
img = T.Compose([
T.ToTensor(), # 将图像转换为张量
self.resize, # 缩放图像
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # 标准化
])(img)
else:
img = T.Compose([
T.ToTensor(), # 将图像转换为张量
self.resize, # 缩放图像
])(img)
# 加载并处理边界框
bboxes = torch.tensor(
self.annotations[self.image_names[idx]]['box_examples_coordinates'],
dtype=torch.float32
)[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
bboxes = bboxes / torch.tensor([w, h, w, h]) * self.img_size
# 加载并处理密度图
density_map = torch.from_numpy(np.load(os.path.join(
self.data_path,
'gt_density_map_adaptive_512_512_object_VarV2',
os.path.splitext(self.image_names[idx])[0] + '.npy',
))).unsqueeze(0)
# 如果图像尺寸不是512,则调整密度图的大小
if self.img_size != 512:
original_sum = density_map.sum()
density_map = self.resize(density_map)
density_map = density_map / density_map.sum() * original_sum
# 数据增强
tiled = False
if self.split == 'train' and torch.rand(1) < self.tiling_p:
tiled = True
tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
img, bboxes, density_map = tiling_augmentation(
img, bboxes, density_map, self.resize,
self.jitter, tile_size, self.horizontal_flip_p
)
# 如果是训练集且没有进行平铺增强,则应用颜色抖动
if self.split == 'train':
if not tiled:
img = self.jitter(img)
img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
# 如果是训练集且没有进行平铺增强且随机数小于水平翻转概率,则水平翻转图像
if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
img = TVF.hflip(img)
density_map = TVF.hflip(density_map)
bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
return img, bboxes, density_map
def __len__(self):
# 返回数据集中样本的数量
return len(self.image_names)
功能解释:
FSC147Dataset
类用于创建一个可以用于 PyTorch 数据加载器的对象,它加载 FSC-147 数据集中的图像、边界框和密度图,并可以对它们进行预处理和增强。- 在初始化方法
__init__
中,加载了数据集的分割信息和注释信息,并设置了图像的预处理和增强操作。 __getitem__
方法根据索引idx
加载并返回一个数据样本,包括图像、边界框和密度图。根据数据集的分割类型,应用了不同的预处理操作。对于训练集,还应用了平铺增强和颜色抖动等数据增强技术。tiling_augmentation
函数用于执行平铺增强,它将图像分割成小块,对每个小块进行随机变换,然后重新组合。Normalize
用于将图像的像素值标准化,以便于神经网络处理。ColorJitter
用于颜色抖动增强,它可以随机改变图像的亮度、对比度、饱和度和色调。RandomApply
用于以一定概率应用变换序列。__len__
方法返回数据集中样本的数量,它被用于 PyTorch 数据加载器来确定迭代次数。
整体而言,FSC147Dataset
类提供了一个灵活的方式来加载和处理 FSC-147 数据集,支持多种预处理和数据增强技术,有助于提高模型的性能和泛化能力。
generate_density_maps
这段代码定义了一个名为 generate_density_maps
的函数,它用于生成密度图,这些密度图可以用于对象计数任务。此外,代码中还包含了一个命令行参数解析器,用于指定数据路径和图像尺寸。
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms as T
from torchvision.ops import box_convert
from scipy.ndimage import gaussian_filter
from tqdm import tqdm
import argparse
def generate_density_maps(data_path, target_size=(512, 512)):
# 计算密度图的保存路径
density_map_path = os.path.join(
data_path,
f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
)
# 如果路径不存在,则创建目录
if not os.path.isdir(density_map_path):
os.makedirs(density_map_path)
# 加载注释信息
with open(os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb') as file:
annotations = json.load(file)
# 设置设备为GPU或CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 遍历注释中的所有图像
for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
# 加载图像并获取高度和宽度
img = Image.open(os.path.join(
data_path,
'images_384_VarV2',
image_name
))
_, h, w = T.ToTensor()(img).size()
# 计算缩放比例
h_ratio, w_ratio = target_size[0] / h, target_size[1] / w
# 调整点的坐标
points = (
torch.tensor(ann['points'], device=device) *
torch.tensor([w_ratio, h_ratio], device=device)
).long()
# 限制点的坐标在目标尺寸内
points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)
# 调整边界框的坐标
bboxes = box_convert(torch.tensor(
ann['box_examples_coordinates'],
dtype=torch.float32,
device=device
)[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
# 计算窗口大小
window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]
# 初始化密度图
dmap = torch.zeros(*target_size)
# 根据点的坐标在密度图中增加计数
for p in range(points.size(0)):
dmap[points[p, 1], points[p, 0]] += 1
# 应用高斯滤波器平滑密度图
dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)
# 保存密度图
np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)
if __name__ == '__main__':
# 创建命令行参数解析器
parser = argparse.ArgumentParser("Density map generator", add_help=False)
parser.add_argument(
'--data_path',
default='/home/nikola/master-thesis/data/fsc147/',
type=str
)
parser.add_argument('--image_size', default=512, type=int)
# 解析命令行参数
args = parser.parse_args()
# 调用函数生成密度图
generate_density_maps(args.data_path, (args.image_size, args.image_size))
功能解释:
generate_density_maps
函数接收数据路径和目标尺寸作为参数,用于生成和保存密度图。- 首先,它计算密度图的保存路径,并创建必要的目录。
- 然后,它加载注释信息,并遍历每张图像及其注释。
- 对于每张图像,它调整点和边界框的坐标,以适应目标尺寸。
- 使用
box_convert
函数将边界框的格式从xyxy
转换为xywh
。 - 根据点的坐标在密度图中增加计数,并应用高斯滤波器进行平滑处理。
- 最后,它将生成的密度图保存为
.npy
文件。
在 if __name__ == '__main__':
部分,代码使用 argparse
库处理命令行参数,允许用户指定数据路径和图像尺寸。然后调用 generate_density_maps
函数执行密度图的生成。
整体而言,这段代码提供了一个自动化的工具,用于生成和保存密度图,这些密度图可以用于训练和评估对象计数模型。通过调整点和边界框的坐标以及应用高斯滤波器,生成的密度图为模型提供了丰富的空间信息,有助于提高计数任务的性能。
完整代码
import os
import json
import argparse
from PIL import Image
import numpy as np
from scipy.ndimage import gaussian_filter
import torch
from torch.utils.data import Dataset
from torchvision.ops import box_convert
from torchvision import transforms as T
from torchvision.transforms import functional as TVF
from tqdm import tqdm
def tiling_augmentation(img, bboxes, density_map, resize, jitter, tile_size, hflip_p):
# 定义一个辅助函数,用于根据给定的概率水平翻转张量
def apply_hflip(tensor, apply):
return TVF.hflip(tensor) if apply else tensor
# 定义一个辅助函数,用于生成平铺的图像或密度图
def make_tile(x, num_tiles, hflip, hflip_p, jitter=None):
result = list()
for j in range(num_tiles):
row = list()
for k in range(num_tiles):
t = jitter(x) if jitter is not None else x
if hflip[j, k] < hflip_p:# 根据给定的概率水平翻转
t = TVF.hflip(t)
row.append(t)
result.append(torch.cat(row, dim=-1))# 按列连接
return torch.cat(result, dim=-2)# 按行连接
x_tile, y_tile = tile_size# 获取平铺的尺寸
y_target, x_target = resize.size# 获取目标尺寸
num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))# 计算平铺的数量
# whether to horizontally flip each tile
hflip = torch.rand(num_tiles, num_tiles)# 随机生成水平翻转的决策矩阵
img = make_tile(img, num_tiles, hflip, hflip_p, jitter)# 对图像进行平铺增强
img = resize(img[..., :int(y_tile*y_target), :int(x_tile*x_target)])
density_map = make_tile(density_map, num_tiles, hflip, hflip_p)# 对密度图进行平铺增强
density_map = density_map[..., :int(y_tile*y_target), :int(x_tile*x_target)]
original_sum = density_map.sum()# 保存原始密度图的总和
density_map = resize(density_map)# 应用缩放
density_map = density_map / density_map.sum() * original_sum# 归一化并恢复原始总和
# 根据第一个平铺的翻转状态调整边界框
if hflip[0, 0] < hflip_p:
# 更新边界框的左右边界
bboxes[:, [0, 2]] = x_target - bboxes[:, [2, 0]] # TODO change
# 调整边界框的大小
bboxes = bboxes / torch.tensor([x_tile, y_tile, x_tile, y_tile])
return img, bboxes, density_map
class FSC147Dataset(Dataset):
def __init__(
self, data_path, img_size, split='train', num_objects=3,
tiling_p=0.5, zero_shot=False
):
# 初始化数据集的属性
self.split = split # 数据集的分割类型(训练、验证或测试)
self.data_path = data_path # 数据集的根目录路径
self.horizontal_flip_p = 0.5 # 水平翻转的概率
self.tiling_p = tiling_p # 平铺增强的概率
self.img_size = img_size # 图像的尺寸
self.resize = T.Resize((img_size, img_size)) # 定义图像的缩放操作
self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8) # 定义颜色抖动增强
self.num_objects = num_objects # 每个图像中对象的数量
self.zero_shot = zero_shot # 是否为零样本学习场景
# 加载数据集的分割信息和注释信息
with open(
os.path.join(self.data_path, 'Train_Test_Val_FSC_147.json'), 'rb'
) as file:
splits = json.load(file)
self.image_names = splits[split]
with open(
os.path.join(self.data_path, 'annotation_FSC147_384.json'), 'rb'
) as file:
self.annotations = json.load(file)
def __getitem__(self, idx: int):
# 根据索引 idx 获取一个数据样本
# 加载图像并转换为RGB格式
img = Image.open(os.path.join(
self.data_path,
'images_384_VarV2',
self.image_names[idx]
)).convert("RGB")
w, h = img.size
# 根据数据集的分割类型进行不同的预处理
if self.split != 'train':
img = T.Compose([
T.ToTensor(), # 将图像转换为张量
self.resize, # 缩放图像
# 标准化
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])(img)
else:
img = T.Compose([
T.ToTensor(), # 将图像转换为张量
self.resize, # 缩放图像
])(img)
# 加载并处理边界框
bboxes = torch.tensor(
self.annotations[self.image_names[idx]]['box_examples_coordinates'],
dtype=torch.float32
)[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
bboxes = bboxes / torch.tensor([w, h, w, h]) * self.img_size
# 加载并处理密度图
density_map = torch.from_numpy(np.load(os.path.join(
self.data_path,
'gt_density_map_adaptive_512_512_object_VarV2',
os.path.splitext(self.image_names[idx])[0] + '.npy',
))).unsqueeze(0)
# 如果图像尺寸不是512,则调整密度图的大小
if self.img_size != 512:
original_sum = density_map.sum()
density_map = self.resize(density_map)
density_map = density_map / density_map.sum() * original_sum
# data augmentation 数据增强
tiled = False
if self.split == 'train' and torch.rand(1) < self.tiling_p:
tiled = True
tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
img, bboxes, density_map = tiling_augmentation(
img, bboxes, density_map, self.resize,
self.jitter, tile_size, self.horizontal_flip_p
)
# 如果是训练集且没有进行平铺增强,则应用颜色抖动
if self.split == 'train':
if not tiled:
img = self.jitter(img)
img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
# 如果是训练集且没有进行平铺增强且随机数小于水平翻转概率,则水平翻转图像
if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
img = TVF.hflip(img)
density_map = TVF.hflip(density_map)
bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]
return img, bboxes, density_map
def __len__(self):
# 返回数据集中样本的数量
return len(self.image_names)
def generate_density_maps(data_path, target_size=(512, 512)):
# 计算密度图的保存路径
density_map_path = os.path.join(
data_path,
f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
)
# 如果路径不存在,则创建目录
if not os.path.isdir(density_map_path):
os.makedirs(density_map_path)
# 加载注释信息
with open(
os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb'
) as file:
annotations = json.load(file)
# 设置设备为GPU或CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 遍历注释中的所有图像
# 加载图像并获取高度和宽度
for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
_, h, w = T.ToTensor()(Image.open(os.path.join(
data_path,
'images_384_VarV2',
image_name
))).size()
# 计算缩放比例
h_ratio, w_ratio = target_size[0] / h, target_size[1] / w
# 调整点的坐标
points = (
torch.tensor(ann['points'], device=device) *
torch.tensor([w_ratio, h_ratio], device=device)
).long()
# 限制点的坐标在目标尺寸内
points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)
# 调整边界框的坐标
bboxes = box_convert(torch.tensor(
ann['box_examples_coordinates'],
dtype=torch.float32,
device=device
)[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
# 计算窗口大小
window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]
# 初始化密度图
dmap = torch.zeros(*target_size)
# 根据点的坐标在密度图中增加计数
for p in range(points.size(0)):
dmap[points[p, 1], points[p, 0]] += 1
# 应用高斯滤波器平滑密度图
dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)
# 保存密度图
np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)
if __name__ == '__main__':
# 创建命令行参数解析器
parser = argparse.ArgumentParser("Density map generator", add_help=False)
parser.add_argument(
'--data_path',
default='/home/nikola/master-thesis/data/fsc147/',
type=str
)
parser.add_argument('--image_size', default=512, type=int)
# 解析命令行参数
args = parser.parse_args()
# 调用函数生成密度图
generate_density_maps(args.data_path, (args.image_size, args.image_size))