【扒代码】data.py

发布于:2024-08-09 ⋅ 阅读:(145) ⋅ 点赞:(0)

数据增强函数 

import torch
from torchvision.transforms import functional as TVF

def tiling_augmentation(img, bboxes, density_map, resize, jitter, tile_size, hflip_p):
    # 定义一个辅助函数,用于根据给定的概率水平翻转张量
    def apply_hflip(tensor, apply):
        return TVF.hflip(tensor) if apply else tensor

    # 定义一个辅助函数,用于生成平铺的图像或密度图
    def make_tile(x, num_tiles, hflip, hflip_p, jitter=None):
        result = list()
        for j in range(num_tiles):
            row = list()
            for k in range(num_tiles):
                t = jitter(x) if jitter is not None else x
                if hflip[j, k] < hflip_p:  # 根据给定的概率水平翻转
                    t = apply_hflip(t, True)
                row.append(t)
            result.append(torch.cat(row, dim=-1))  # 按列连接
        return torch.cat(result, dim=-2)  # 按行连接

    x_tile, y_tile = tile_size  # 获取平铺的尺寸
    y_target, x_target = resize.size  # 获取目标尺寸
    num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))  # 计算平铺的数量
    hflip = torch.rand(num_tiles, num_tiles)  # 随机生成水平翻转的决策矩阵

    # 对图像进行平铺增强
    img = make_tile(img, num_tiles, hflip, hflip_p, jitter)
    img = resize(img[..., :int(y_tile*y_target), :int(x_tile*x_target)])

    # 对密度图进行平铺增强
    density_map = make_tile(density_map, num_tiles, hflip, hflip_p)
    density_map = density_map[..., :int(y_tile*y_target), :int(x_tile*x_target)]
    original_sum = density_map.sum()  # 保存原始密度图的总和
    density_map = resize(density_map)  # 应用缩放
    density_map = density_map / density_map.sum() * original_sum  # 归一化并恢复原始总和

    # 根据第一个平铺的翻转状态调整边界框
    if hflip[0, 0] < hflip_p:
        bboxes[:, [0, 2]] = x_target - bboxes[:, [2, 0]]  # 更新边界框的左右边界
    bboxes = bboxes / torch.tensor([x_tile, y_tile, x_tile, y_tile])  # 调整边界框的大小

    return img, bboxes, density_map

这段代码定义了一个名为 tiling_augmentation 的函数,它用于对图像、边界框和密度图进行平铺增强处理。平铺增强是一种数据增强技术,通过将图像分割成小块(平铺),对每个小块进行随机变换(如水平翻转、缩放等),然后将它们重新组合,以增加数据集的多样性,提高模型的泛化能力。

功能解释

  • tiling_augmentation 函数接收图像 img、边界框 bboxes、密度图 density_map,以及其他几个参数,包括 resize(缩放函数)、jitter(抖动函数,用于增加图像的多样性)、tile_size(平铺的尺寸),以及 hflip_p(水平翻转的概率)
  • 函数首先定义了两个辅助函数:apply_hflip 用于水平翻转张量,make_tile 用于生成平铺的图像或密度图
  • make_tile 函数通过迭代每个平铺块,应用随机的水平翻转和(如果提供)抖动变换,然后将块按行列连接起来。
  • 图像和密度图使用 make_tile 函数进行增强,然后使用 resize 函数调整大小。
  • 密度图在缩放后进行归一化,以保持对象数量的一致性。
  • 边界框根据平铺的翻转状态进行调整,以确保它们在增强后的图像中保持正确。
  • 函数返回增强后的图像、调整后的边界框和密度图。

整体而言,tiling_augmentation 函数通过平铺增强技术,为图像和相关数据提供了一种有效的数据增强方法,有助于提高模型在面对不同图像变换时的鲁棒性。

FSC147Dataset 类

import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms as T
from torchvision.transforms import functional as TVF

class FSC147Dataset(Dataset):
    def __init__(
        self, data_path, img_size, split='train', num_objects=3,
        tiling_p=0.5, zero_shot=False
    ):
        # 初始化数据集的属性
        self.split = split  # 数据集的分割类型(训练、验证或测试)
        self.data_path = data_path  # 数据集的根目录路径
        self.horizontal_flip_p = 0.5  # 水平翻转的概率
        self.tiling_p = tiling_p  # 平铺增强的概率
        self.img_size = img_size  # 图像的尺寸
        self.resize = T.Resize((img_size, img_size))  # 定义图像的缩放操作
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8)  # 定义颜色抖动增强
        self.num_objects = num_objects  # 每个图像中对象的数量
        self.zero_shot = zero_shot  # 是否为零样本学习场景

        # 加载数据集的分割信息和注释信息
        with open(os.path.join(self.data_path, 'Train_Test_Val_FSC_147.json'), 'rb') as file:
            splits = json.load(file)
            self.image_names = splits[split]
        with open(os.path.join(self.data_path, 'annotation_FSC147_384.json'), 'rb') as file:
            self.annotations = json.load(file)

    def __getitem__(self, idx: int):
        # 根据索引 idx 获取一个数据样本
        img = Image.open(os.path.join(
            self.data_path,
            'images_384_VarV2',
            self.image_names[idx]
        )).convert("RGB")  # 加载图像并转换为RGB格式

        # 根据数据集的分割类型进行不同的预处理
        if self.split != 'train':
            img = T.Compose([
                T.ToTensor(),  # 将图像转换为张量
                self.resize,  # 缩放图像
                T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 标准化
            ])(img)
        else:
            img = T.Compose([
                T.ToTensor(),  # 将图像转换为张量
                self.resize,  # 缩放图像
            ])(img)

        # 加载并处理边界框
        bboxes = torch.tensor(
            self.annotations[self.image_names[idx]]['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
        bboxes = bboxes / torch.tensor([w, h, w, h]) * self.img_size

        # 加载并处理密度图
        density_map = torch.from_numpy(np.load(os.path.join(
            self.data_path,
            'gt_density_map_adaptive_512_512_object_VarV2',
            os.path.splitext(self.image_names[idx])[0] + '.npy',
        ))).unsqueeze(0)

        # 如果图像尺寸不是512,则调整密度图的大小
        if self.img_size != 512:
            original_sum = density_map.sum()
            density_map = self.resize(density_map)
            density_map = density_map / density_map.sum() * original_sum

        # 数据增强
        tiled = False
        if self.split == 'train' and torch.rand(1) < self.tiling_p:
            tiled = True
            tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            img, bboxes, density_map = tiling_augmentation(
                img, bboxes, density_map, self.resize,
                self.jitter, tile_size, self.horizontal_flip_p
            )

        # 如果是训练集且没有进行平铺增强,则应用颜色抖动
        if self.split == 'train':
            if not tiled:
                img = self.jitter(img)
            img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)

        # 如果是训练集且没有进行平铺增强且随机数小于水平翻转概率,则水平翻转图像
        if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
            img = TVF.hflip(img)
            density_map = TVF.hflip(density_map)
            bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]

        return img, bboxes, density_map

    def __len__(self):
        # 返回数据集中样本的数量
        return len(self.image_names)

功能解释

  • FSC147Dataset 类用于创建一个可以用于 PyTorch 数据加载器的对象,它加载 FSC-147 数据集中的图像、边界框和密度图,并可以对它们进行预处理和增强
  • 在初始化方法 __init__ 中,加载了数据集的分割信息和注释信息,并设置了图像的预处理和增强操作。
  • __getitem__ 方法根据索引 idx 加载并返回一个数据样本,包括图像、边界框和密度图。根据数据集的分割类型,应用了不同的预处理操作。对于训练集,还应用了平铺增强和颜色抖动等数据增强技术。
  • tiling_augmentation 函数用于执行平铺增强,它将图像分割成小块,对每个小块进行随机变换,然后重新组合。
  • Normalize 用于将图像的像素值标准化,以便于神经网络处理。
  • ColorJitter 用于颜色抖动增强,它可以随机改变图像的亮度、对比度、饱和度和色调
  • RandomApply 用于以一定概率应用变换序列。
  • __len__ 方法返回数据集中样本的数量,它被用于 PyTorch 数据加载器来确定迭代次数。

整体而言,FSC147Dataset 类提供了一个灵活的方式来加载和处理 FSC-147 数据集,支持多种预处理和数据增强技术,有助于提高模型的性能和泛化能力。

generate_density_maps

这段代码定义了一个名为 generate_density_maps 的函数,它用于生成密度图,这些密度图可以用于对象计数任务。此外,代码中还包含了一个命令行参数解析器,用于指定数据路径和图像尺寸。

import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms as T
from torchvision.ops import box_convert
from scipy.ndimage import gaussian_filter
from tqdm import tqdm
import argparse

def generate_density_maps(data_path, target_size=(512, 512)):
    # 计算密度图的保存路径
    density_map_path = os.path.join(
        data_path,
        f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
    )
    # 如果路径不存在,则创建目录
    if not os.path.isdir(density_map_path):
        os.makedirs(density_map_path)

    # 加载注释信息
    with open(os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb') as file:
        annotations = json.load(file)

    # 设置设备为GPU或CPU
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    # 遍历注释中的所有图像
    for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
        # 加载图像并获取高度和宽度
        img = Image.open(os.path.join(
            data_path,
            'images_384_VarV2',
            image_name
        ))
        _, h, w = T.ToTensor()(img).size()

        # 计算缩放比例
        h_ratio, w_ratio = target_size[0] / h, target_size[1] / w

        # 调整点的坐标
        points = (
            torch.tensor(ann['points'], device=device) *
            torch.tensor([w_ratio, h_ratio], device=device)
        ).long()
        # 限制点的坐标在目标尺寸内
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)

        # 调整边界框的坐标
        bboxes = box_convert(torch.tensor(
            ann['box_examples_coordinates'],
            dtype=torch.float32,
            device=device
        )[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
        bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)

        # 计算窗口大小
        window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]

        # 初始化密度图
        dmap = torch.zeros(*target_size)
        # 根据点的坐标在密度图中增加计数
        for p in range(points.size(0)):
            dmap[points[p, 1], points[p, 0]] += 1
        # 应用高斯滤波器平滑密度图
        dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)

        # 保存密度图
        np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)

if __name__ == '__main__':
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser("Density map generator", add_help=False)
    parser.add_argument(
        '--data_path',
        default='/home/nikola/master-thesis/data/fsc147/',
        type=str
    )
    parser.add_argument('--image_size', default=512, type=int)
    # 解析命令行参数
    args = parser.parse_args()
    # 调用函数生成密度图
    generate_density_maps(args.data_path, (args.image_size, args.image_size))

功能解释

  • generate_density_maps 函数接收数据路径和目标尺寸作为参数,用于生成和保存密度图。
  • 首先,它计算密度图的保存路径,并创建必要的目录。
  • 然后,它加载注释信息,并遍历每张图像及其注释。
  • 对于每张图像,它调整点和边界框的坐标,以适应目标尺寸。
  • 使用 box_convert 函数将边界框的格式从 xyxy 转换为 xywh
  • 根据点的坐标在密度图中增加计数,并应用高斯滤波器进行平滑处理。
  • 最后,它将生成的密度图保存为 .npy 文件。

if __name__ == '__main__': 部分,代码使用 argparse 库处理命令行参数,允许用户指定数据路径和图像尺寸。然后调用 generate_density_maps 函数执行密度图的生成。

整体而言,这段代码提供了一个自动化的工具,用于生成和保存密度图,这些密度图可以用于训练和评估对象计数模型。通过调整点和边界框的坐标以及应用高斯滤波器,生成的密度图为模型提供了丰富的空间信息,有助于提高计数任务的性能。

完整代码

import os
import json
import argparse

from PIL import Image
import numpy as np
from scipy.ndimage import gaussian_filter

import torch
from torch.utils.data import Dataset
from torchvision.ops import box_convert
from torchvision import transforms as T
from torchvision.transforms import functional as TVF

from tqdm import tqdm


def tiling_augmentation(img, bboxes, density_map, resize, jitter, tile_size, hflip_p):

    # 定义一个辅助函数,用于根据给定的概率水平翻转张量
    def apply_hflip(tensor, apply):
        return TVF.hflip(tensor) if apply else tensor

    # 定义一个辅助函数,用于生成平铺的图像或密度图
    def make_tile(x, num_tiles, hflip, hflip_p, jitter=None):
        result = list()
        for j in range(num_tiles):
            row = list()
            for k in range(num_tiles):
                t = jitter(x) if jitter is not None else x
                if hflip[j, k] < hflip_p:# 根据给定的概率水平翻转
                    t = TVF.hflip(t)
                row.append(t)
            result.append(torch.cat(row, dim=-1))# 按列连接
        return torch.cat(result, dim=-2)# 按行连接

    x_tile, y_tile = tile_size# 获取平铺的尺寸
    y_target, x_target = resize.size# 获取目标尺寸
    num_tiles = max(int(x_tile.ceil()), int(y_tile.ceil()))# 计算平铺的数量
    # whether to horizontally flip each tile
    hflip = torch.rand(num_tiles, num_tiles)# 随机生成水平翻转的决策矩阵

    img = make_tile(img, num_tiles, hflip, hflip_p, jitter)# 对图像进行平铺增强
    img = resize(img[..., :int(y_tile*y_target), :int(x_tile*x_target)])

    density_map = make_tile(density_map, num_tiles, hflip, hflip_p)# 对密度图进行平铺增强
    density_map = density_map[..., :int(y_tile*y_target), :int(x_tile*x_target)]
    original_sum = density_map.sum()# 保存原始密度图的总和
    density_map = resize(density_map)# 应用缩放
    density_map = density_map / density_map.sum() * original_sum# 归一化并恢复原始总和

    # 根据第一个平铺的翻转状态调整边界框
    if hflip[0, 0] < hflip_p:
        # 更新边界框的左右边界
        bboxes[:, [0, 2]] = x_target - bboxes[:, [2, 0]]  # TODO change
    # 调整边界框的大小
    bboxes = bboxes / torch.tensor([x_tile, y_tile, x_tile, y_tile])
    return img, bboxes, density_map


class FSC147Dataset(Dataset):

    def __init__(
        self, data_path, img_size, split='train', num_objects=3,
        tiling_p=0.5, zero_shot=False
    ):
         # 初始化数据集的属性
        self.split = split  # 数据集的分割类型(训练、验证或测试)
        self.data_path = data_path  # 数据集的根目录路径
        self.horizontal_flip_p = 0.5    # 水平翻转的概率
        self.tiling_p = tiling_p    # 平铺增强的概率 
        self.img_size = img_size    # 图像的尺寸
        self.resize = T.Resize((img_size, img_size))    # 定义图像的缩放操作
        self.jitter = T.RandomApply([T.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8) # 定义颜色抖动增强
        self.num_objects = num_objects  # 每个图像中对象的数量
        self.zero_shot = zero_shot  # 是否为零样本学习场景

        # 加载数据集的分割信息和注释信息
        with open(
            os.path.join(self.data_path, 'Train_Test_Val_FSC_147.json'), 'rb'
        ) as file:
            splits = json.load(file)
            self.image_names = splits[split]
        with open(
            os.path.join(self.data_path, 'annotation_FSC147_384.json'), 'rb'
        ) as file:
            self.annotations = json.load(file)

    def __getitem__(self, idx: int):
        # 根据索引 idx 获取一个数据样本

        # 加载图像并转换为RGB格式
        img = Image.open(os.path.join(
            self.data_path,
            'images_384_VarV2',
            self.image_names[idx]
        )).convert("RGB")
        w, h = img.size

        # 根据数据集的分割类型进行不同的预处理
        if self.split != 'train':
            img = T.Compose([
                T.ToTensor(),   # 将图像转换为张量
                self.resize,    # 缩放图像
                # 标准化
                T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            ])(img)
        else:
            img = T.Compose([
                T.ToTensor(),   # 将图像转换为张量
                self.resize,    # 缩放图像
            ])(img)

        # 加载并处理边界框
        bboxes = torch.tensor(
            self.annotations[self.image_names[idx]]['box_examples_coordinates'],
            dtype=torch.float32
        )[:3, [0, 2], :].reshape(-1, 4)[:self.num_objects, ...]
        bboxes = bboxes / torch.tensor([w, h, w, h]) * self.img_size

        # 加载并处理密度图
        density_map = torch.from_numpy(np.load(os.path.join(
            self.data_path,
            'gt_density_map_adaptive_512_512_object_VarV2',
            os.path.splitext(self.image_names[idx])[0] + '.npy',
        ))).unsqueeze(0)

        # 如果图像尺寸不是512,则调整密度图的大小
        if self.img_size != 512:
            original_sum = density_map.sum()
            density_map = self.resize(density_map)
            density_map = density_map / density_map.sum() * original_sum

        # data augmentation  数据增强
        tiled = False
        if self.split == 'train' and torch.rand(1) < self.tiling_p:
            tiled = True
            tile_size = (torch.rand(1) + 1, torch.rand(1) + 1)
            img, bboxes, density_map = tiling_augmentation(
                img, bboxes, density_map, self.resize,
                self.jitter, tile_size, self.horizontal_flip_p
            )

        # 如果是训练集且没有进行平铺增强,则应用颜色抖动
        if self.split == 'train':
            if not tiled:
                img = self.jitter(img)
            img = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)

        # 如果是训练集且没有进行平铺增强且随机数小于水平翻转概率,则水平翻转图像
        if self.split == 'train' and not tiled and torch.rand(1) < self.horizontal_flip_p:
            img = TVF.hflip(img)
            density_map = TVF.hflip(density_map)
            bboxes[:, [0, 2]] = self.img_size - bboxes[:, [2, 0]]

        return img, bboxes, density_map

    def __len__(self):
        # 返回数据集中样本的数量
        return len(self.image_names)


def generate_density_maps(data_path, target_size=(512, 512)):
    # 计算密度图的保存路径
    density_map_path = os.path.join(
        data_path,
        f'gt_density_map_adaptive_{target_size[0]}_{target_size[1]}_object_VarV2'
    )
    # 如果路径不存在,则创建目录
    if not os.path.isdir(density_map_path):
        os.makedirs(density_map_path)
    # 加载注释信息
    with open(
        os.path.join(data_path, 'annotation_FSC147_384.json'), 'rb'
    ) as file:
        annotations = json.load(file)

    # 设置设备为GPU或CPU
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    # 遍历注释中的所有图像
    # 加载图像并获取高度和宽度
    for i, (image_name, ann) in enumerate(tqdm(annotations.items())):
        _, h, w = T.ToTensor()(Image.open(os.path.join(
            data_path,
            'images_384_VarV2',
            image_name
        ))).size()

        # 计算缩放比例
        h_ratio, w_ratio = target_size[0] / h, target_size[1] / w

        # 调整点的坐标
        points = (
            torch.tensor(ann['points'], device=device) *
            torch.tensor([w_ratio, h_ratio], device=device)
        ).long()

        # 限制点的坐标在目标尺寸内
        points[:, 0] = points[:, 0].clip(0, target_size[1] - 1)
        points[:, 1] = points[:, 1].clip(0, target_size[0] - 1)
        # 调整边界框的坐标
        bboxes = box_convert(torch.tensor(
            ann['box_examples_coordinates'],
            dtype=torch.float32,
            device=device
        )[:3, [0, 2], :].reshape(-1, 4), in_fmt='xyxy', out_fmt='xywh')
        bboxes = bboxes * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio], device=device)
        # 计算窗口大小
        window_size = bboxes.mean(dim=0)[2:].cpu().numpy()[::-1]

        # 初始化密度图
        dmap = torch.zeros(*target_size)
         # 根据点的坐标在密度图中增加计数
        for p in range(points.size(0)):
            dmap[points[p, 1], points[p, 0]] += 1
        # 应用高斯滤波器平滑密度图
        dmap = gaussian_filter(dmap.cpu().numpy(), window_size / 8)

        # 保存密度图
        np.save(os.path.join(density_map_path, os.path.splitext(image_name)[0] + '.npy'), dmap)


if __name__ == '__main__':
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser("Density map generator", add_help=False)
    parser.add_argument(
        '--data_path',
        default='/home/nikola/master-thesis/data/fsc147/',
        type=str
    )
    parser.add_argument('--image_size', default=512, type=int)
    # 解析命令行参数
    args = parser.parse_args()
    # 调用函数生成密度图
    generate_density_maps(args.data_path, (args.image_size, args.image_size))


网站公告

今日签到

点亮在社区的每一天
去签到