数据结构与算法代码实战讲解之:并行算法

发布于:2023-10-25 ⋅ 阅读:(74) ⋅ 点赞:(0)

作者:禅与计算机程序设计艺术

1.背景介绍

随着科技的不断进步、人们对生活的需求日益增长、社会生产力的迅速提升等诸多因素的影响,电子信息领域的软件系统正在飞速发展。与此同时,基于计算机系统的软件系统也在呈现爆炸性增长态势。如何有效地处理海量数据、快速响应用户请求、降低计算资源消耗,成为了当今各类软件系统需要面临的问题。

大数据、云计算、移动互联网、物联网、机器学习、图像识别、智能视频分析、语音识别、智能机器人等热门技术的广泛应用给软件系统带来的压力越来越大。为应对这种挑战,并行算法的研究逐渐成为热门话题。并行算法是指采用多个处理器或线程进行运算以解决大规模并行计算问题的算法。并行算法的研究发展历史经历了串行算法到指令级并行算法再到微核级别并行算法等多个阶段,目前已然成为一种重要的研究方向。

对于并行算法相关知识的掌握,能够更好地理解并行算法的设计及其运行机制,帮助开发人员更快地开发出高效率、可扩展性强的软件系统。因此,数据结构与算法代码实战讲解之:并行算法就是为了帮助读者了解并行算法的基本原理、基本方法、基本工具和实现技巧,以期达到以下目的:

  1. 熟悉并行算法的基本概念、特点和发展过程;
  2. 掌握并行算法的设计原则、基本框架和基本操作方法;
  3. 具备独立编写并行算法的能力;
  4. 通过阅读并行算法的代码示例,加深对并行算法的理解;
  5. 提升自己解决并行算法问题的能力。

本书中所有实例均采用C++语言编程。希望通过本书的学习,读者能够充分理解并行算法的原理,掌握并行算法的基本技能,并在实际项目中运用其知识解决实际问题。另外,本书亦适用于所有对并行算法感兴趣的读者,无论是否具有相关经验。

2.核心概念与联系

并行计算模型

并行计算模型(Parallel Computing Model)描述了执行并行任务所需的硬件资源和软件环境。主要包括以下几种模型:

  • 分布式系统模型:分布式系统由多台处理器(CPU、GPU等)组成,不同的处理器之间可以通信传输数据。因此,分布式系统模型可以充分利用并行处理机的资源。常见的分布式系统模型有并行/串行分布模型、共享内存模型、分布式文件系统模型等。
  • 集群系统模型:集群系统由多台计算节点(CPU、主存、存储设备等)组成,每个节点上都运行一个进程,它们共同工作协作完成任务。这种模型最大的优势是通过网络互连可以实现不同节点之间的通信。常见的集群系统模型有共享存储系统模型、网格计算系统模型、超标量系统模型等。
  • 多处理机模型:多处理机系统由多台计算机主板组成,每块主板上都集成若干个处理器,可以并行处理任务。多处理机模型也可以采用分布式系统模型或集群系统模型的某种形式,但由于各种复杂原因,多处理机模型往往比其他两种模型要简单一些。常见的多处理机模型有MPP模型、SMP模型、NUMA模型等。

并行算法

并行算法(Parallel Algorithm)是一种用于处理具有多个数据项的数据集合的通用计算模型。它通常由并行化的基本算法、数据划分和调度策略三部分构成。其中,基本算法即为串行算法中的常见算法,例如排序算法、矩阵乘法算法、树搜索算法等。数据划分即将整个数据集合划分成若干个数据子集,并将每个数据子集分配到不同的处理器上执行。调度策略则定义如何根据处理器的空闲时间安排任务的执行顺序。

OpenMP

OpenMP(Open Multi-Processing)是一个并行编程接口,它提供了C/C++/Fortran、Java、Python等多种编程语言的多平台支持。OpenMP通过提供多种并行语句来简化并行算法的编程难度,包括共享变量的隐式并行化、手动并行化、同步语句、Worksharing构造、Loop Construct、Tasking Construct等。OpenMP API被包括GNU GCC、Intel compiler、PGI compiler等主流编译器集成。

CUDA

CUDA(Compute Unified Device Architecture)是由NVIDIA公司推出的基于图形处理单元(Graphics Processing Unit, GPU)的并行编程接口。CUDA提供了专门的GPU函数库,可以用来编写高性能的并行算法。CUDA API被包括NVIDIA CUDA Toolkit、AMD APP SDK、英伟达 CUDA Toolkit、第三方编译器等主流软件集成。

MPI

MPI(Message Passing Interface)是一套消息传递标准,它定义了应用程序之间的数据交换方式。最初设计为高性能、可移植性和易用性,目前已经成为众多科研机构和工程界应用的标准。MPI提供了一致的编程模型和API接口,并通过一系列的标准操作符定义了通信原语。MPI可以用来编写高度并行化的并行算法。

3.核心算法原理和具体操作步骤以及数学模型公式详细讲解

1. 并行排序算法——基数排序

原理

基数排序(Radix sort)是桶排序的一个变种,其基本思想是按照低位先排序,然后收集;再按照高位排序,再收集;依次类推,直到最高位。

基数排序是属于“多关键字排序”算法,其核心是“分配”和“收集”两个步骤。首先,从低到高扫描待排序序列,取出第i位数字作为基准值,把该位相同的记录放入同一队列。然后,在这个队列上进行插入排序。在第二轮排序中,取得新的基准值,把该位相同的记录放入新的队列。依此类推,直到按最高位排序完毕。最后,所有队列合并成为一个有序的序列。

该算法的时间复杂度为O(d*(n+b)), n为元素个数,d为待排序的最大数位数,b为排序范围的大小。当然,如果基数排序只针对整数排序的话,d=log10(b)。因为基数排序的时间复杂度依赖于待排序数据的最高位。但是,一般情况下基数排序的时间复杂度会大于O(n log n),所以基数排序并不是一种稳定的排序算法。

操作步骤

  1. 设待排序元素的个数为n,最大数位数为d,则得出序列长度为max_num^d。
  2. 初始化计数数组count[0...d]和输出数组output[0...n-1]。
  3. 从低位到高位对待排序数组进行排序,对每个数位,用计数数组统计各个数位上的数字个数。
  4. 将计数数组中的元素累加至前一位元素,如count[i] = count[i]+count[i-1], i=1 to d。
  5. 生成输出数组,先扫描第一位数字,得到count[0]个数的最小值,将其输出至output数组中,并减去该值。然后,向后扫描,将每个位置上的值输出至output数组中,并减去该值。直至输出数组为空。
  6. 对生成的输出数组进行倒序排序。
  7. 输出排序后的结果。

数学模型公式

基数排序算法中,元素有k个关键码,关键码的位数是d,那么序列的长度max_num^d。其中,max_num是待排序元素中的最大数。假设基数排序是k路归并排序,则下列数学模型给出了它的平均时间复杂度。

T(n)=(n+b)(k/(k-1))log2(max_num)/((2^k)dlog10(e))

其中,n为输入元素的个数,b为最大数的位数。k为元素的关键码个数,d为关键码的位数,log2为2的对数,log10为以10为底的对数。e为常数10^(-9),表示计算机浮点误差。

基数排序的时间复杂度取决于关键码的分布情况。如果关键码各项之间的差距很小,比如都是0~9之间的,那么基数排序的时间复杂度约为O(kn)。而如果关键码之间差距很大,比如有很多的关键码都是1、2、3,那么基数排序的时间复杂度可能非常高。

2. 并行搜索算法——分治法

原理

分治法(Divide and Conquer)是一种递归算法,它将待求解的大问题分割成几个较小的相同问题,然后分别求解这些小问题,最后将各自的解组合在一起,就得到了原问题的一个解。分治法有如下特点:

  1. 把一个复杂的问题分解成两个或更多的相同或相似的问题。
  2. 求解两个或更多的子问题,之后再合并子问题的解。
  3. 每个子问题都可以递归求解。

分治法有很多优点,其中最重要的是它可以在多处理机环境中有效地并行化。而且,它还可以保证正确性和简单性,这在很多问题中都显得尤为重要。

分治法可以划分大问题的多个小问题,每个小问题又可以继续分解,并最终合并。在这个过程中,我们遵循分治模式,如重复地应用分治模式,就可以将大问题的解一步步缩小到足够小的程度,使之易于处理。

分治法可以用到很多地方,其中最常见的就是排序和搜索。在数据库管理系统中,对记录排序通常可以使用分治算法。例如,排序一百万条记录需要花费很长的时间,但是通过使用分治算法,就可以把它拆分成两千五百条左右的子问题,每个子问题只需要处理十万条记录,这样就可以在短时间内处理完这百万条记录。

操作步骤

  1. 用分治模式将大问题分解为多个小问题。
  2. 为每个小问题寻找一个解。
  3. 使用合并算法合并各个小问题的解。
  4. 测试并修改方案,直到达到要求的精度。

数学模型公式

分治算法的数学模型比较复杂。为了估算复杂度,我们假设把大问题分解为两部分,每个部分包含n/2个元素。设M为每个小问题的最坏情况运行时间,则T(n)=2T(n/2)+M(n)。由于时间复杂度是对数级别的,因此用一个常数系数a=2^(log2(n)-floor(log2(n)))代替,得到的算法时间复杂度为Θ(nlogn)。分治算法的空间复杂度为Θ(n), 只需要额外的n个元素保存中间结果。

3. 并行并行算法——快速排序

原理

快速排序(QuickSort)是对冒泡排序的一种改进,其基本思想是选定一个元素作为枢轴(pivot),将数组分成两个子数组,左边的子数组的元素都小于等于枢轴,右边的子数组的元素都大于等于枢轴。然后,对两个子数组重复以上步骤,直至子数组的长度为1。

快速排序的时间复杂度为O(nlogn)最好、平均、最坏三个情况下。并且,它是不稳定的排序算法。快速排序的空间复杂度为O(logn)。

操作步骤

  1. 在数组中选择一个元素作为枢轴。
  2. 以枢轴为中心,将数组分成两个子数组,左边的子数组的元素都小于等于枢轴,右边的子ARRAY的元素都大于等于枢AXIS。
  3. 对两个子数组重复以上步骤,直至子数组的长度为1。

数学模型公式

快速排序的平均时间复杂度为O(nlogn)。但在最坏情况下,快速排序的速度可能会很慢,比如在所有元素都是唯一的情况下,时间复杂度为O(n^2)。然而,最坏情况下仍然比平均情况要好。

快速排序的空间复杂度为O(logn),这是由于栈空间的限制。

4.具体代码实例和详细解释说明

1. 并行排序算法——基数排序

void RadixSort(int arr[], int n){
    int max_num = INT_MIN; // find the maximum number in the array

    for (int i = 0; i < n; ++i) {
        if (arr[i] > max_num)
            max_num = arr[i];
    }

    int exp = 1;
    while ((max_num / exp)!= 0) {
        CountingSort(exp);
        exp *= 10; // move to next digit
    }
}

void CountingSort(int exp) {
    int output[MAX_NUM];
    memset(output, 0, sizeof(output));

    int freq[10] = { 0 };

    // count frequency of each digit
    for (int i = 0; i < n; ++i) {
        int index = (arr[i]/exp)%10;
        freq[index]++;
    }

    // compute cumulative sum of frequencies
    for (int i = 1; i < 10; ++i)
        freq[i] += freq[i - 1];

    // fill output array with sorted numbers
    for (int i = n - 1; i >= 0; --i) {
        int index = (arr[i] / exp) % 10;
        output[freq[index]] = arr[i];
        freq[index]--;
    }

    memcpy(arr, output, sizeof(arr));
}

2. 并行搜索算法——分治法

// recursive binary search function
int BinarySearchRec(int arr[], int left, int right, int x) {
    if (right >= left) {
        int mid = left + (right - left) / 2;

        // If element is present at the middle itself
        if (arr[mid] == x)
            return mid;

        // If element is smaller than mid, then it can only be present in left subarray
        if (arr[mid] > x)
            return BinarySearchRec(arr, left, mid - 1, x);

        // Else the element can only be present in right subarray
        else
            return BinarySearchRec(arr, mid + 1, right, x);
    }

    // Element is not present in array
    return -1;
}


// parallel binary search function using multiple threads
int BinarySearchPar(int arr[], int n, int x) {
    static thread threads[MAX_THREADS];
    static int tids[MAX_THREADS];
    static bool running[MAX_THREADS];

    const int numThreads = min(n, MAX_THREADS);

    int partitionSize = ceil(float(n) / float(numThreads));

    // start a new thread for each partition
    for (int tid = 0; tid < numThreads; ++tid) {
        running[tid] = true;
        tids[tid] = tid;
        threads[tid] = thread(&BinarySearchRecursiveHelper, ref(running[tid]),
                               ref(tids[tid]), ref(x), arr + tid * partitionSize,
                               partitionSize);
    }

    // wait until all threads are done
    for (int tid = 0; tid < numThreads; ++tid) {
        threads[tid].join();
    }

    // merge results from different partitions
    int resultIndex = lowerBound(tids, 0, numThreads, x);
    if (resultIndex < numThreads && tids[resultIndex] == x)
        return resultIndex * partitionSize + bsearch(arr + tids[resultIndex] * partitionSize,
                                                    partitionSize, x);

    return -1;
}


// helper function for parallelized binary search algorithm
void BinarySearchRecursiveHelper(bool &isRunning, int &tid, int x,
                                  vector<int>::iterator arrBegin, size_t len) {
    while (isRunning) {
        lock_guard<mutex> lck(mtx);

        // check if we still have work to do on this partition
        if (!q.empty()) {
            pair<vector<int>::iterator, vector<int>::iterator> p = q.front();
            q.pop();

            auto begin = upperBound(p.first, p.second, x);
            auto end = lowerBound(p.first, p.second, *(begin - 1) + 1);

            // perform local binary search on current range
            auto res = bsearch(begin, end, x);

            // add resulting index to global queue
            if (res!= end)
                indexes.push({tid, res});
        }

        // signal other threads that they should stop working on their partitions
        if (stop.load() ||!isRunning || q.empty()) {
            conditionVariable.notify_all();
        }
    }
}


// initialize the worker threads and create queues of work items for them
void initWorkers(size_t n) {
    const int numThreads = min(n, MAX_THREADS);

    for (int tid = 0; tid < numThreads; ++tid) {
        threads.emplace_back([&, tid]() {
            taskQueue.emplace(tid, qSize / numThreads);

            while (true) {
                unique_lock<mutex> lck(taskMutex);

                conditionVariable.wait(lck, [&]() -> bool {
                    return!taskQueue.empty() || stopSignal.load();
                });

                if (stopSignal.load())
                    break;

                pair<vector<int>::iterator, vector<int>::iterator> p = taskQueue.top();
                taskQueue.pop();
                lck.unlock();

                // perform local binary search on current range
                auto begin = upperBound(p.first, p.second, x);
                auto end = lowerBound(p.first, p.second, *(begin - 1) + 1);

                auto res = bsearch(begin, end, x);

                if (res!= end) {
                    indexes.push({tid, res});
                }
            }
        });
    }
}
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到