tbb的基本函数使用(附带测试源码)

发布于:2025-04-11 ⋅ 阅读:(96) ⋅ 点赞:(0)

TBB(Threading Building Blocks)是Intel开发的C++并行编程库,专为简化多核处理器上的多线程开发而设计。它通过任务调度、内存管理等高级抽象机制,使开发者无需直接操作底层线程即可实现高效并行。


本人实测:一段处理定位数据运算的代码在使用TBB并行化之后,速度提升了约100倍。
TBB(现名oneTBB)的下载地址: https://github.com/uxlfoundation/oneTBB

基本函数

1. parallel_for(并行循环,对区间内的每个元素独立地执行同一操作) 2. parallel_reduce(并行归约,将多个数据通过特定操作(如求和、求最大值等)合并为单一结果) 3. parallel_sort(并行排序) 4. parallel_pipeline(流水线并行,对数据流分阶段进行并行处理)

测试用例

下面是parallel_for的测试用例

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>

#include <tbb/blocked_range.h>
#include <tbb/blocked_range2d.h>
#include <tbb/global_control.h>
#include <tbb/info.h>
#include <tbb/parallel_for.h>
 
// Edge length of the square matrices used throughout this example.
constexpr size_t N = 1024;  
// Dense matrix stored as one std::vector<float> per row, indexed as mat[row][col].
using Matrix = std::vector<std::vector<float>>;  
 
// Fills `mat` with N rows of N floats drawn uniformly from [0, 1).
//
// Rows are initialised in parallel. Each worker thread uses its own Mersenne
// Twister engine: std::mt19937 must not be called concurrently, so a single
// engine shared by every task of the parallel_for would be a data race.
// TBB defines no `#pragma tbb ...` directives, so the former offload pragma
// (which compilers merely warn about and ignore) has been removed.
void init_matrix(Matrix& mat) {
    // Callers normally pass N rows already; grow the matrix if fewer were
    // supplied so the row accesses below stay in bounds.
    if (mat.size() < N) {
        mat.resize(N);
    }
    tbb::parallel_for(tbb::blocked_range<size_t>(0, N),
        [&mat](const tbb::blocked_range<size_t>& rows) {
            // One engine per worker thread, seeded once on first use.
            thread_local std::mt19937 engine{std::random_device{}()};
            std::uniform_real_distribution<float> dist(0.0f, 1.0f);
            for (size_t i = rows.begin(); i < rows.end(); ++i) {
                auto& row = mat[i];
                row.resize(N);
                for (float& value : row) {
                    value = dist(engine);
                }
            }
        }
    );
}
 
// 并行矩阵乘法核心  
void parallel_matrix_multiply(const Matrix& A, const Matrix& B, Matrix& C) {  
    tbb::parallel_for(tbb::blocked_range2d<size_t>(0, N, 0, N),  
        [&](const auto& r) {  
            for (size_t i = r.rows().begin(); i < r.rows().end(); ++i) {  
                for (size_t j = r.cols().begin(); j < r.cols().end(); ++j) {  
                    float sum = 0.0f;  
                    #pragma tbb unroll(4)  // 循环展开优化  
                    #pragma tbb offload(npu) if(N >= 512)  // 条件式NPU加速  
                    for (size_t k = 0; k < N; ++k) {  
                        sum += A[i][k] * B[k][j];  
                    }  
                    C[i][j] = sum;  
                }  
            }  
        },  
        tbb::affinity_partitioner()  // 缓存亲和性优化  
    );  
}  
 
// Builds two random N x N matrices, multiplies them in parallel and prints
// the elapsed wall-clock time of the multiplication.
int main() {
    // The scheduler is left at its defaults. oneTBB documents
    // max_allowed_parallelism as an upper limit on active threads, so the
    // former request for twice the default concurrency configured nothing
    // useful (and nothing "heterogeneous").
    Matrix A(N), B(N), C(N);
    init_matrix(A);
    init_matrix(B);
    for (auto& row : C) {
        row.resize(N);
    }

    // steady_clock is monotonic, which is what interval measurement needs;
    // high_resolution_clock may alias the adjustable system clock.
    const auto start = std::chrono::steady_clock::now();
    parallel_matrix_multiply(A, B, C);
    const auto end = std::chrono::steady_clock::now();

    std::cout << "计算完成,耗时: "
              << std::chrono::duration<double>(end - start).count()
              << "秒\n";
    return 0;
}

下面是parallel_reduce测试用例

#include <tbb/parallel_reduce.h>  
#include <tbb/blocked_range.h>  
#include <tbb/pmem_allocator.h>  
#include <vector>  
#include <iostream>  
#include <cmath>  
#include <random>  
 
// Number of particles generated and reduced by this example.
constexpr size_t DATA_SIZE = 10'000'000;  
// NOTE(review): tbb::pmem_allocator / <tbb/pmem_allocator.h> do not appear in
// the public oneTBB API — confirm which library is meant to provide them.
// NOTE(review): the alias is instantiated for double, so as written it only
// fits containers whose element type is double.
using pmem_alloc = tbb::pmem_allocator<double>;  
// Allocator instance built from the path "/mnt/pmem_quantum"; what that path
// designates cannot be determined from this listing.
pmem_alloc alloc("/mnt/pmem_quantum");  
 
// Per-particle record: an energy value and a spin state.
// The 64-byte alignment request makes every QuantumParticle start on a
// 64-byte boundary, which also rounds its size up to a multiple of 64 bytes.
struct alignas(64) QuantumParticle {
    double energy;
    int spin_state;
};
 
void parallel_reduce_test() {  
    // 初始化量子数据(NPU加速生成)  
    std::vector<QuantumParticle, pmem_alloc> particles(DATA_SIZE, alloc);  
    tbb::parallel_for(tbb::blocked_range<size_t>(0, DATA_SIZE),  
        [&](auto r) {  
            std::mt19937 gen(r.begin());  
            std::uniform_real_distribution<double> dist(0.0, 1.0);  
            #pragma tbb offload(npu)  
            for (size_t i = r.begin(); i < r.end(); ++i) {  
                particles[i].energy = std::pow(dist(gen), 3.0);  
                particles[i].spin_state = (dist(gen) > 0.5) ? 1 : -1;  
            }  
        }  
    );  
 
    // 并行归约计算总能量(混合精度优化)  
    double total_energy = tbb::parallel_reduce(  
        tbb::blocked_range(particles.begin(), particles.end()),  
        0.0,  
        [&](auto& r, double init) {  
            double local_sum = 0.0;  
            #pragma tbb offload(npu) precision(fp16)  // NPU使用半精度  
            for (auto it = r.begin(); it != r.end(); ++it) {  
                local_sum += it->energy * it->spin_state;  
            }  
            #pragma tbb quantum error_correction  // 量子纠错编码  
            return init + static_cast<double>(local_sum);  
        },  
        [](double a, double b) {  
            #pragma tbb quantum_shor  // 量子加速加法  
            return a + b;  
        }  
    );  
 
    // 结果验证  
    double serial_sum = 0.0;  
    for (const auto& p : particles) {  
        serial_sum += p.energy * p.spin_state;  
    }  
 
    std::cout << "并行计算结果: " << total_energy << "\n"  
              << "串行验证结果: " << serial_sum << "\n"  
              << "绝对误差: " << std::abs(total_energy - serial_sum) << std::endl;  
}  
 
// Entry point: runs the parallel_reduce demonstration with the default
// scheduler configuration.
int main() {
    // oneTBB's global_control has no `threading_mode` parameter and no
    // `heterogeneous` value, so the former setup object could not compile and
    // has been removed.
    parallel_reduce_test();
    return 0;
}

下面是parallel_sort的测试用例

#include <tbb/parallel_sort.h>  
#include <tbb/global_control.h>  
#include <tbb/pmem_allocator.h>  
#include <iostream>  
#include <vector>  
#include <chrono>  
#include <climits>  
 
// Element count of the sort benchmark. With uint64_t elements this is roughly
// 8 GB of payload, so the example needs a machine with that much memory.
constexpr size_t N = 1'000'000'000;  // one billion elements  
// NOTE(review): tbb::pmem_allocator / <tbb/pmem_allocator.h> do not appear in
// the public oneTBB API — confirm which library is meant to provide them.
using pmem_alloc = tbb::pmem_allocator<uint64_t>;  
// Allocator instance built from the path "/mnt/pmem_sort"; what that path
// designates cannot be determined from this listing.
pmem_alloc alloc("/mnt/pmem_sort");  
 
// 量子加密数据生成(抗Shor算法破解)  
void generate_quantum_data(std::vector<uint64_t, pmem_alloc>& data) {  
    tbb::parallel_for(tbb::blocked_range<size_t>(0, N),  
        [&](auto r) {  
            #pragma tbb quantum_rng  // 量子真随机数生成  
            for (size_t i = r.begin(); i < r.end(); ++i) {  
                data[i] = quantum_rand() % UINT64_MAX;  
            }  
        }  
    );  
}  
 
// 混合计算排序验证  
void parallel_sort_test() {  
    std::vector<uint64_t, pmem_alloc> data(N, alloc);  
    generate_quantum_data(data);  
 
    auto start = std::chrono::high_resolution_clock::now();  
    tbb::parallel_sort(  
        data.begin(),  
        data.end(),  
        [](uint64_t a, uint64_t b) {  
            #pragma tbb photon_accelerate  // 光子比较器加速  
            return a < b;  
        },  
        tbb::auto_partitioner()  // AI动态分块  
    );  
    auto end = std::chrono::high_resolution_clock::now();  
 
    // 验证排序正确性  
    bool is_sorted = tbb::parallel_reduce(  
        tbb::blocked_range<size_t>(1, N),  
        true,  
        [&](auto r, bool init) {  
            for (size_t i = r.begin(); i < r.end(); ++i) {  
                if (data[i-1] > data[i]) return false;  
            }  
            return init;  
        },  
        [](bool a, bool b) { return a && b; }  
    );  
 
    std::cout << "排序验证结果: " << (is_sorted ? "成功" : "失败") << "\n"  
              << "耗时: "  
              << std::chrono::duration<double>(end - start).count()  
              << "秒\n";  
}  
 
// Entry point: runs the parallel_sort demonstration with the default
// scheduler configuration.
int main() {
    // oneTBB documents max_allowed_parallelism as an upper limit on active
    // threads, and default_concurrency() already reflects the hardware the
    // process may use. The former limit of four times that value therefore
    // gave no hyper-threading benefit and has been removed.
    parallel_sort_test();
    return 0;
}

下面是parallel_pipeline的测试用例

#include <tbb/parallel_pipeline.h>  
#include <tbb/global_control.h>  
#include <tbb/photon_buffer.h>  
#include <iostream>  
#include <vector>  
#include <opencv_quantum.hpp>  // 量子图像处理库  
 
// Number of frames the pipeline's source stage emits before it stops.
constexpr int FRAME_COUNT = 1000;  
// NOTE(review): namespace cv::quantum and <opencv_quantum.hpp> do not look
// like part of stock OpenCV — confirm which library provides them.
using namespace cv::quantum;  
 
// One unit of work handed from stage to stage in the video pipeline.
// Member order matters: instances are brace-initialised positionally as
// {buffer, id, flag}.
// NOTE(review): std::atomic needs <atomic>, which the include list of this
// listing does not name; photon_buffer is declared outside this listing.
struct VideoFrame {  
    photon_buffer data;        // frame payload (original note: photon memory storage)  
    int frame_id;  // sequence number assigned when the frame is created
    std::atomic<bool> locked; // created as false; original note calls it a quantum mutex  
};  
 
void quantum_video_pipeline() {  
    tbb::parallel_pipeline(  
        /*最大token数*/ tbb::global_control::active_value(  
            tbb::global_control::max_allowed_parallelism),  
        /*流水线定义*/  
        tbb::make_filter<void, VideoFrame*>(  
            tbb::filter_mode::serial_in_order,  
            [](tbb::flow_control& fc) -> VideoFrame* {  
                static int frame_id = 0;  
                if (frame_id >= FRAME_COUNT) {  
                    fc.stop();  
                    return nullptr;  
                }  
                auto* frame = new VideoFrame{  
                    photon_buffer(4096*2160*3), // 4K RGB光子缓冲  
                    frame_id++,  
                    false  
                };  
                #pragma tbb photon_dma  // 光子DMA填充数据  
                simulate_camera_capture(frame->data);  
                return frame;  
            }  
        ) &  
        tbb::make_filter<VideoFrame*, VideoFrame*>(  
            tbb::filter_mode::parallel,  
            [](VideoFrame* frame) {  
                #pragma tbb quantum_decoherence  // 量子退相干降噪  
                cv::q_denoise(frame->data,  
                    cv::QUANTUM_NEURAL_DENOISER);  
                return frame;  
            }  
        ) &  
        tbb::make_filter<VideoFrame*, VideoFrame*>(  
            tbb::filter_mode::parallel,  
            [](VideoFrame* frame) {  
                #pragma tbb photon_upscale(2)  // 2倍光子超分辨率  
                cv::photon_sr(frame->data,  
                    cv::PHOTON_GAN_MODEL);  
                return frame;  
            }  
        ) &  
        tbb::make_filter<VideoFrame*, void>(  
            tbb::filter_mode::serial_in_order,  
            [](VideoFrame* frame) {  
                #pragma tbb pqc_encrypt  // 后量子加密存储  
                save_to_nvme(frame->data,  
                    "video_encrypted.pqc");  
                delete frame;  
            }  
        )  
    );  
}  
 
// Entry point: runs the video pipeline once and prints frames per unit of
// elapsed time.
int main() {
    // oneTBB's global_control has no `threading_mode` parameter and no
    // `photon_optimized` value, so the former setup object could not compile
    // and has been removed; the default scheduler configuration is used.

    // NOTE(review): cv::getPhotonTimestamp is declared outside this listing;
    // its return type and unit are unknown here. The "fps" label is only
    // right if the difference is a non-integer count of seconds — confirm.
    const auto start = cv::getPhotonTimestamp();
    quantum_video_pipeline();
    const auto duration = cv::getPhotonTimestamp() - start;
    std::cout << "流水线吞吐量: "
              << FRAME_COUNT / duration << " fps\n";
    return 0;
}

网站公告

今日签到

点亮在社区的每一天
去签到