引航计划Day8

发布于:2023-01-22 ⋅ 阅读:(8) ⋅ 点赞:(0) ⋅ 评论:(0)

引航计划第八日。今天的主要任务是路径规划与A*算法、路径跟踪与Pure Pursuit算法、路径规划局部追踪总流程以及运用曙光智算训练模型。

路径规划与A*算法

Introduction to the A* Algorithm

A*算法与广度优先搜索和迪杰斯特拉算法相比的区别如下:

广度首先搜索在各个方向上都同样探索。这是一种非常有用的算法,不仅用于常规路径查找,而且用于程序地图生成,平滑途径,距离图和其他类型的地图分析。
Dijkstra的算法(也称为统一的成本搜索)使我们可以优先考虑要探索的路径。它没有平等地探索所有可能的路径,而是有利于较低的成本路径。 我们可以分配较低的成本,以鼓励在道路上移动,较高的成本以避免森林,更高的成本阻止敌人接近敌人等等。 当运动成本各不相同时,我们使用它而不是广度首次搜索。
A*是针对单个目的地优化的Dijkstra算法的修改。 Dijkstra的算法可以找到通往所有位置的路径; A*找到通往一个位置或最接近几个位置的路径。 它优先考虑似乎更接近目标的道路,可以在尽可能靠近目标的同时考虑离起点和终点的距离。

核心代码如下:

frontier = PriorityQueue()
frontier.put(start, 0)
came_from = dict()
cost_so_far = dict()
came_from[start] = None
cost_so_far[start] = 0

while not frontier.empty():
   current = frontier.get()

   if current == goal:
      break
   
   for next in graph.neighbors(current):
      new_cost = cost_so_far[current] + graph.cost(current, next)
      if next not in cost_so_far or new_cost < cost_so_far[next]:
         cost_so_far[next] = new_cost
         priority = new_cost + heuristic(goal, next)
         frontier.put(next, priority)
         came_from[next] = current

A*算法演示小实验

输入图像大小,起终点位置,生成可视化最佳路径与每个点坐标:

路径跟踪与Pure Pursuit算法

https://thomasfermi.github.io/Algorithms-for-Automated-Driving/Control/BicycleModel.html

https://thomasfermi.github.io/Algorithms-for-Automated-Driving/Control/PurePursuit.html

Pure Pursuit是一种几何跟踪控制算法,也被称为纯跟踪控制算法。该算法最早由R. Wallace在1985年提出,其思想是基于当前车辆的后轮中心位置(车辆质心),在参考路径上向(称为前视距离)的距离匹配一个预瞄点,假设车辆后轮中心可以按照一定的转弯半径𝑅行驶至该预瞄点,然后根据前视距离、转弯半径𝑅、车辆坐标系下预瞄点的朝向角𝛼之间的几何关系来计算前轮转角。

 

 

 

程序运行结果如下:

路径规划局部追踪总流程

核心代码(含注释)如下:

'''
A*算法
'''
class Node():
    """A node class for A* Pathfinding"""

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position

        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        return self.position == other.position

def astar(maze, start, end, time_limit=1):

    astar_start_time = time.time()

    if maze[end[0]][end[1]] != 0:
        return (None, 0)

    # Create start and end node
    start_node = Node(None, start)
    start_node.g = start_node.h = start_node.f = 0
    end_node = Node(None, end)
    end_node.g = end_node.h = end_node.f = 0

    # Initialize both open and closed list
    open_list = []
    closed_list = []

    # Add the start node
    open_list.append(start_node)

    # Loop until you find the end
    while len(open_list) > 0:

        if time.time() - astar_start_time >= time_limit:
            return (None, 1)

        # Get the current node
        current_node = open_list[0]
        current_index = 0
        for index, item in enumerate(open_list):
            if item.f < current_node.f:
                current_node = item
                current_index = index

        # Pop current off open list, add to closed list
        open_list.pop(current_index)
        closed_list.append(current_node)

        # Found the goal
        if current_node == end_node:
            path = []
            current = current_node
            while current is not None:
                path.append(current.position)
                current = current.parent
            path = path[::-1] # Return reversed path
            if len(path) == 0:
                return (None, 2)
            else:
                return (path, -1)

        # Generate children
        children = []
        for new_position in [(0, -1), (0, 1), (-1, 0), (1, 0), (-1, -1), (-1, 1), (1, -1), (1, 1)]: # Adjacent squares

            # Get node position
            # node_position = (current_node.position[0] + new_position[0], current_node.position[1] + new_position[1])
            node_position = [current_node.position[0] + new_position[0], current_node.position[1] + new_position[1]]

            # Make sure within range
            if node_position[0] > (len(maze) - 1) or node_position[0] < 0 or node_position[1] > (len(maze[len(maze)-1]) -1) or node_position[1] < 0:
                continue

            # Make sure walkable terrain
            if maze[node_position[0]][node_position[1]] != 0:
                continue

            # Create new node
            new_node = Node(current_node, node_position)

            # Append
            children.append(new_node)

        # Loop through children
        for child in children:

            # Child is on the closed list
            for closed_child in closed_list:
                if child == closed_child:
                    continue

            # Create the f, g, and h values
            child.g = current_node.g + 1
            child.h = ((child.position[0] - end_node.position[0]) ** 2) + ((child.position[1] - end_node.position[1]) ** 2)
            child.f = child.g + child.h

            # Child is already in the open list
            for open_node in open_list:
                if child == open_node and child.g > open_node.g:
                    continue

            # Add the child to the open list
            open_list.append(child)

'''
根据图像分割结果, 规划行驶路线
'''
def find_path(img, time_limit=1):
    
    '''
    开始计时
    '''
    start_time = time.time()

    if SHOW_IMAGE:
        plt.imshow(img)
        plt.show()

    '''
    从网络预测大小放缩到拍摄大小
    '''
    img = cv2.resize(img, DIM, interpolation=cv2.INTER_NEAREST)
    img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)       # 统一颜色

    '''
    矫正图片去畸变
    '''
    K = np.loadtxt('K_SVGA.csv', dtype=np.float32, delimiter=',')
    D = np.loadtxt('D_SVGA.csv', dtype=np.float32, delimiter=',')

    nK = K.copy()
    nK[0,0]=K[0,0]
    nK[1,1]=K[1,1]
    map1, map2 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), nK, DIM, cv2.CV_16SC2)

    def undistort(img):
        img += 1
        undistorted_img = cv2.remap(img, map1, map2, interpolation=cv2.INTER_NEAREST, borderMode=cv2.BORDER_CONSTANT)

        return undistorted_img

    undistorted_img = undistort(img)
    undistorted_img = undistorted_img[:, :, 0]

    if SHOW_IMAGE:
        plt.imshow(undistorted_img)
        plt.show()

    '''
    鸟瞰图
    '''
    # 1.读取透视变换矩阵
    H = np.loadtxt('H_SVGA.csv', dtype=np.float32, delimiter=',')

    # 2.执行透视变换
    birdview_img = cv2.warpPerspective(undistorted_img, H, (DIM[1], DIM[0]), flags=cv2.INTER_NEAREST)
    birdview_img = birdview_img.swapaxes(1, 0)

    if SHOW_IMAGE:
        plt.imshow(birdview_img)
        plt.show()

    '''
    获取可行驶区域图
    实际尺寸与鸟瞰图像素换算比例为 2mm = 1px
    车身半宽为 7.5 cm, 即 37.5 px
    '''
    # 设置形态学操作的核
    kernel_1 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))#定义结构元素的形状和大小
    kernel_2 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (39, 39))
    kernel_3 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (37, 37))

    # 获得 grass 标签
    grass = birdview_img == 2

    # 去除 grass 中误识别出的小区域
    if SHOW_IMAGE:
        plt.subplot(1, 2, 1)
        plt.imshow(grass)
    grass = morphology.remove_small_objects(grass, min_size=60, connectivity=1)
    grass = grass*np.uint8(2)
    # print('back datatype:', grass.dtype)
    if SHOW_IMAGE:
        plt.subplot(1, 2, 2)
        plt.imshow(grass)
        plt.show()

    # 获得背景标签
    back = birdview_img == 3
    # 去除背景中误识别出的小区域
    if SHOW_IMAGE:
        plt.subplot(1, 2, 1)
        plt.imshow(back)
    back = morphology.remove_small_objects(back, min_size=4000, connectivity=1)
    back = back*np.uint8(3)
    
    if SHOW_IMAGE:
        plt.subplot(1, 2, 2)
        plt.imshow(back)
        plt.show()

    # 可视化
    if SHOW_IMAGE:
        plt.subplot(1, 3, 1)
        plt.imshow(birdview_img, 'gray')

    # 初始化 drivable_img
    drivable_img = np.zeros(birdview_img.shape, dtype=np.uint8)
    drivable_img[birdview_img == 1] = 1

    # 先扩展 grass 部分
    dst = cv2.erode(grass, kernel_1)
    dst = cv2.dilate(dst, kernel_2)
    drivable_img = np.maximum(drivable_img, dst)

    if SHOW_IMAGE:
        plt.subplot(1, 3, 2)
        plt.imshow(drivable_img, 'gray')

    # 再扩展 back 部分
    dst = cv2.dilate(back, kernel_3)
    drivable_img = np.maximum(drivable_img, dst)

    if SHOW_IMAGE:
        plt.subplot(1, 3, 3)
        plt.imshow(drivable_img, 'gray')
        plt.show()

    (h, w) = drivable_img.shape
    maze = np.ones((h, w), dtype=np.uint8)
    maze[drivable_img==1] = 0

    print(cv2.resize(maze, (20, 20), interpolation=cv2.INTER_NEAREST))

    k = 20

    (maze_h, maze_w) = (h//k, w//k)
    print('maze_size:', (maze_w, maze_h))
    maze = cv2.resize(maze, (maze_w, maze_h), interpolation=cv2.INTER_NEAREST)
    
    '''
    补全车前的盲区
    '''

    maze[-60//k:, 320//k:480//k] = 0
    if SHOW_IMAGE:
        plt.imshow(maze)
        plt.show()

    (h, w) = maze.shape
    print('maze shape:', h, w)

    '''
    选取终点
    设定候选目标半径为 80 cm, 即 400 px, 400//k
    '''
    img = np.ones((h, w), dtype=np.uint8)
    cv2.circle(img, center=(w//2, h-1), radius=400//k, color=0, thickness=1)
    img = img*2
    if SHOW_IMAGE:
        plt.imshow(img)
        plt.show()

    final_candidate = (maze == img).astype(np.uint8)
    if SHOW_IMAGE:
        plt.imshow(final_candidate)
        plt.show()

    _, contours, _ = cv2.findContours(final_candidate, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    num_contour = len(contours)
    print('num of contours:', num_contour)
    print('contours:', contours)

    index = random.randint(0, num_contour-1)
    contour = contours[index]

    print('contour:', contour)

    num = contour.shape[0]
    contour = np.reshape(contour, (num, 2))

    index = random.randint(0, num-1)
    end = [contour[index, 1], contour[index,0]]

    if SHOW_IMAGE:
        plt.imshow(maze, 'gray')
        plt.scatter(end[1], end[0], s=10, c='red')
        plt.show()

    '''
    路径规划
    目标
    '''
    # start = (h-1, w//2)
    start = [h-1, w//2]

    maze = maze.tolist()
    (path, flag) = astar(maze, start, end, time_limit=time_limit)

    if path is None:
        if flag == 0:
            print('end point is not drivable!')
        elif flag == 1:
            print('exceed time limit!')
        else:
            print('no valid path!')

    if SHOW_IMAGE:
        from matplotlib.patches import Circle
        fig = plt.figure()
        axes = fig.subplots(1, 2)

        ax = axes[0]
        ax.imshow(maze)

        ax = axes[1]
        ax.imshow(maze)
        if path is not None:
            for i in range(0, len(path), 1):
                plt.scatter(path[i][1], path[i][0], s=5)
        circle = Circle(xy = [w//2, h-1], radius=400 // k)
        ax.add_patch(p=circle)
        circle.set(lw=3, facecolor='green', alpha=0.3)

        plt.show()
        
    return path

'''
路径平滑
'''

from copy import deepcopy

def printpaths(path, newpath):
    for old, new in zip(path, newpath):
        print('[' + ', '.join('%.3f' % x for x in old) +
              '] -> [' + ', '.join('%.3f' % x for x in new) + ']')


def smooth(path, weight_data=0.5, weight_smooth=0.1, tolerance=0.000001):
    new = deepcopy(path)
    dims = len(path[0])
    change = tolerance

    while change >= tolerance:
        change = 0.0
        for i in range(1, len(new) - 1):
            for j in range(dims):

                x_i = path[i][j]
                y_i, y_prev, y_next = new[i][j], new[i - 1][j], new[i + 1][j]

                y_i_saved = y_i
                y_i += weight_data * (x_i - y_i) + weight_smooth * (y_next + y_prev - (2 * y_i))
                new[i][j] = y_i

                change += abs(y_i - y_i_saved)
    return new

'''
主程序
'''
if __name__ == '__main__':

    SHOW_IMAGE = True
    DIM = (800, 600)

    input_data = cv2.imread('./test_image/train_3502_OUT.png', cv2.COLOR_BAYER_BG2GRAY)

    path = find_path(input_data, time_limit=5)
    smooth_path = smooth(path, weight_data=0.5, weight_smooth=0.3)
    
    path_array = np.array(path)
    smooth_path_array = np.array(smooth_path)

    plt.figure()
    plt.scatter(path_array[:, 1], path_array[:, 0], s=20, c='red', alpha=0.6)
    plt.scatter(smooth_path_array[:, 1], smooth_path_array[:, 0], s=20, c='green', alpha=0.6)
    plt.show()

总结为以下五个步骤:

  1. 分割结果去畸变,转鸟瞰图;
  2. 可行驶区域可视化;
  3. 补全车前盲区,路径平滑处理;
  4. 选取终点
  5. 路径规划

输出可视化结果如下:

运用曙光智算训练模型

1.登录ac.sugon.com曙光网站,修改submit.sh文件如下(各参数含义见另一篇博客):

 2.进入shell输入如下代码向服务器申请GPU资源:

salloc -p kshdtest -N 1 --gres=dcu:4 --cpus-per-task=18

 成功申请到用户名为j17r2n12的账号

3.输入如下代码进入申请到的环境:

ssh j17r2n12

4.激活自己的虚拟环境并进入unet_torch文件夹:

conda activate torch_1_12
cd 1_segmentation_sugon/unet_torch

5.运行1步骤修改过后的submit.sh文件:

./submit.sh

结果如下:

 就开始在服务器运行程序了,运行成功结果如下:

 6.权重文件保存在checkpoints文件夹中:

 

今天的任务成功结束啦!