ROS Series (8): SVIn2 Sonar Data Flow

Published: 2025-07-07

I. Overall Data Flow

The relationships between the parts are shown in the concise ASCII flow diagram below; the always-running main line (matchingLoop) is marked in bold:

ROS publishes raw sonar data
    │
    ▼
1. sonarCallback callback
    │
    ▼
2. sonarMeasurementsReceived_ (thread-safe queue)
    │
    ├─[sonarConsumerLoop thread]──►3. sonarMeasurements_ (history deque)
    │                              │
    │                              └─ gotSonarData notifies the synchronizer
    │
    ▼
**Main fusion thread matchingLoop (runs continuously)**
    │
    ▼
4. waitForUpToDateSonarData → getSonarMeasurements(begin, end)
    │
    ▼
5. Estimator::addStates takes the latest measurement → builds the SonarError residual block
    │
    ▼
6. Ceres optimization: joint visual and sonar constraints
    │
    └───────────────┐
                    ▼
(back to the main fusion thread for the next frame)
  • Part 1: the ROS callback thread receives the raw data and pushes it into sonarMeasurementsReceived_.
  • Part 2: sonarConsumerLoop consumes from the receive queue, writes into sonarMeasurements_, and notifies the main thread via gotSonarData.
  • Main line: matchingLoop loops forever:
    1. Wait for synchronized sonar data.
    2. Slice out the sonar measurements needed for this frame.
    3. Build the SonarError inside addStates.
    4. Hand it to Ceres together with the visual residuals.
    5. Return to the top of the loop and process the next frame.

II. Key Modules & Code Mapping

| # | Function | Module / Function | File | Description |
|---|----------|-------------------|------|-------------|
| 1 | Sonar callback | Subscriber::sonarCallback | subscriber.cpp | Receives the ROS message, quantizes it, rejects noise, pushes it into the receive queue |
| 2 | Receive-queue buffering | ThreadedKFVio::sonarMeasurementsReceived_ | ThreadedKFVio.h / .cpp | Thread-safe queue that temporarily stores every incoming sonar measurement |
| 3 | Consumption & history cache | ThreadedKFVio::sonarConsumerLoop | ThreadedKFVio.cpp | Pulls data from the receive queue and, under a lock, appends it to the sonarMeasurements_ deque |
| 4 | Synchronization & slicing | sync logic inside matchingLoop | ThreadedKFVio.cpp | Waits until sufficiently new data has arrived → calls getSonarMeasurements(begin, end) |
| 5 | Slicing function | ThreadedKFVio::getSonarMeasurements | ThreadedKFVio.cpp | Iterates over sonarMeasurements_ under a lock, returns the measurements inside the time window |
| 6 | Adding residuals to the optimization | Estimator::addStates(...) | Estimator.cpp | Takes the latest measurement → transforms to world coordinates → matches the visual map → adds a SonarError |

III. Detailed Walkthrough

1. ROS callback: sonarCallback

  void sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg);
// @Sharmin
/**
 * @brief Handles ProcessedRange messages from the Imagenex 831L sonar
 * @param msg  Range/intensity data preprocessed by the driver
 *
 * Main steps:
 *  1. Convert the head angle head_position from degrees to radians;
 *  2. Find the maximum of the intensity array (excluding the last 100
 *     samples to ignore far-range noise);
 *  3. Compute the corresponding absolute distance range;
 *  4. If the target is within 4.5 m and the return is strong enough,
 *     inject it into the VIO/SLAM back end.
 */
void Subscriber::sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg) {  
  // 1. Quantization resolution (real distance per bin, in meters)
  double rangeResolution = msg->max_range / msg->intensity.size();
  // 2. Find the maximum intensity
  int max = 0;
  int maxIndex = 0;
  //
  // Note: max_range was set high during the experiments, so the trailing
  //       samples are dominated by noise; the last 100 samples are
  //       excluded here to improve reliability.
  // @Sharmin: discarding as range was set higher (which introduced some noisy data) during data collection
  for (unsigned int i = 0; i < msg->intensity.size() - 100; i++) {
    if (msg->intensity[i] > max) {
      max = msg->intensity[i];
      maxIndex = i;
    }
  }

  // 3. Compute range and heading
  double range = (maxIndex + 1) * rangeResolution;
  double heading = (msg->head_position * M_PI) / 180;
  // 4. Filter: keep only targets within 4.5 m with intensity > 10
  // No magic no!! within 4.5 meter
  if (range < 4.5 && max > 10) {
    // 5. Wrap the timestamp as okvis::Time and send it to the back end
    vioInterface_->addSonarMeasurement(okvis::Time(msg->header.stamp.sec, msg->header.stamp.nsec), range, heading);
    // The radial distance range and head angle heading are injected into
    // the VIO back end as the sonar measurement.
  }
}
  • Tail-noise rejection: iterate only up to size() - 100.
  • Quantization: range = (maxIndex + 1) * rangeResolution.
  • Enqueueing: two modes (blocking / non-blocking).
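The callback's peak-extraction and quantization step can be tried in isolation. A minimal sketch, assuming a plain intensity vector in place of the ROS message (the helper name `strongestReturnRange` is illustrative, not part of the SVIn2 code):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Sketch of the peak extraction in sonarCallback: returns the range
// (metres) of the strongest return, ignoring the last `tailSkip` noisy
// bins, or -1.0 if the scan is too short to process.
double strongestReturnRange(const std::vector<int>& intensity,
                            double maxRange, std::size_t tailSkip = 100) {
  if (intensity.size() <= tailSkip) return -1.0;
  const double resolution = maxRange / intensity.size();  // metres per bin
  int best = 0;
  std::size_t bestIdx = 0;
  for (std::size_t i = 0; i < intensity.size() - tailSkip; ++i) {
    if (intensity[i] > best) {
      best = intensity[i];
      bestIdx = i;
    }
  }
  return (bestIdx + 1) * resolution;  // bin index -> absolute distance
}
```

Note that a strong return inside the skipped tail is ignored entirely, which is exactly the behavior the callback relies on to reject far-range noise.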

2. addSonarMeasurement

// @Sharmin
// Add a Sonar measurement.
//==============================================================================
//  ThreadedKFVio::addSonarMeasurement()
//
//  Wraps a sonar measurement and pushes it into a thread-safe queue, to be
//  consumed asynchronously by the back-end optimization thread.
//  • stamp   — absolute timestamp (okvis::Time)
//  • range   — radial distance measured by the sonar (meters)
//  • heading — current sonar head angle (radians)
//
//  If the VIO runs in blocking mode (blocking_ == true):
//     - PushBlockingIfFull() is used; when the queue is full, the current
//       thread waits, so no measurement is dropped.
//     - The return value is always true, meaning the push succeeded.
//  Otherwise (non-blocking mode):
//     - PushNonBlockingDroppingIfFull() is used; when the queue is full, the
//       oldest element is dropped to preserve real-time behavior.
//     - The return value is whether the queue holds exactly one element
//       after the push, which can trigger the back-end thread's
//       "just went from empty to non-empty" wake-up logic.
//==============================================================================
bool ThreadedKFVio::addSonarMeasurement(const okvis::Time& stamp, double range, double heading) {
  okvis::SonarMeasurement sonar_measurement;
  sonar_measurement.timeStamp = stamp;
  sonar_measurement.measurement.range = range;
  sonar_measurement.measurement.heading = heading;

  if (blocking_) {
    sonarMeasurementsReceived_.PushBlockingIfFull(sonar_measurement, 1);
    return true;
  } else {
    sonarMeasurementsReceived_.PushNonBlockingDroppingIfFull(sonar_measurement,
                                                             maxImuInputQueueSize_);  // Running same rate as imu
    return sonarMeasurementsReceived_.Size() == 1;
  }
}

3. Consumer thread: sonarConsumerLoop

// ThreadedKFVio.cpp

// If the sonar is used, start the sonarConsumerThread_ thread
if (parameters_.sensorList.isSonarUsed) {
    sonarConsumerThread_ = std::thread(&ThreadedKFVio::sonarConsumerLoop, this);  // @Sharmin
}


// @Sharmin
// Loop to process sonar measurements.
void ThreadedKFVio::sonarConsumerLoop() {
  okvis::SonarMeasurement data;  // sonar measurement struct
  TimerSwitchable processSonarTimer("0 processSonarMeasurements", true);  // timer measuring sonar-processing time

  for (;;) {  // infinite loop, keeps processing sonar data
    // get data and check for termination request
    if (sonarMeasurementsReceived_.PopBlocking(&data) == false) return;  // blocking pop; exit the loop on failure

    processSonarTimer.start();

    okvis::Time start;       // start timestamp
    const okvis::Time* end;  // pointer to the end timestamp; avoids a copy

    {  // local scope so the lock is released promptly
      std::lock_guard<std::mutex> sonarLock(sonarMeasurements_mutex_);  // protect the sonar buffer against data races

      // make sure incoming sonar timestamps are strictly increasing
      OKVIS_ASSERT_TRUE(Exception,
                        sonarMeasurements_.empty() || sonarMeasurements_.back().timeStamp < data.timeStamp,
                        "Sonar measurement from the past received");
      if (sonarMeasurements_.size() > 0)
        start = sonarMeasurements_.back().timeStamp;  // if data already exists, start is the previous timestamp
      else
        start = okvis::Time(0, 0);  // otherwise start at 0

      end = &data.timeStamp;               // end is the current data's timestamp
      sonarMeasurements_.push_back(data);  // append the new sonar measurement
    }  // unlock sonarMeasurements_mutex_

    // notify other threads that new sonar data has arrived
    sonarFrameSynchronizer_.gotSonarData(data.timeStamp);

    processSonarTimer.stop();
  }
}

4. Data synchronization & slicing: matchingLoop

// Loop that matches frames with existing frames.
void ThreadedKFVio::matchingLoop() {
  TimerSwitchable prepareToAddStateTimer("2.1 prepareToAddState", true);
  TimerSwitchable waitForOptimizationTimer("2.2 waitForOptimization", true);
  TimerSwitchable addStateTimer("2.3 addState", true);
  TimerSwitchable matchingTimer("2.4 matching", true);

  for (;;) {
    // get new frame
    std::shared_ptr<okvis::MultiFrame> frame;

    // get data and check for termination request
    if (keypointMeasurements_.PopBlocking(&frame) == false) return;

    prepareToAddStateTimer.start();
    // -- get relevant imu messages for new state
    okvis::Time imuDataEndTime = frame->timestamp() + temporal_imu_data_overlap;
    okvis::Time imuDataBeginTime = lastAddedStateTimestamp_ - temporal_imu_data_overlap;

    OKVIS_ASSERT_TRUE_DBG(Exception,
                          imuDataBeginTime < imuDataEndTime,
                          "imu data end time is smaller than begin time."
                              << "current frametimestamp " << frame->timestamp() << " (id: " << frame->id()
                              << "last timestamp         " << lastAddedStateTimestamp_
                              << " (id: " << estimator_.currentFrameId());

    // wait until all relevant imu messages have arrived and check for termination request
    if (imuFrameSynchronizer_.waitForUpToDateImuData(okvis::Time(imuDataEndTime)) == false) return;
    OKVIS_ASSERT_TRUE_DBG(Exception,
                          imuDataEndTime < imuMeasurements_.back().timeStamp,
                          "Waiting for up to date imu data seems to have failed!");

    // TODO(Sharmin): check if needed to wait until all relevant sonar messages

    okvis::ImuMeasurementDeque imuData = getImuMeasurments(imuDataBeginTime, imuDataEndTime);

    prepareToAddStateTimer.stop();
    // if imu_data is empty, either end_time > begin_time or
    // no measurements in timeframe, should not happen, as we waited for measurements
    if (imuData.size() == 0) continue;

    // @Sharmin
    // Sonar Data
    // deque holding the sonar measurements for this frame
    okvis::SonarMeasurementDeque sonarData;

    // check whether the sonar sensor is enabled in the configuration
    if (parameters_.sensorList.isSonarUsed) {
      // the current frame's timestamp is the end of the sonar window
      okvis::Time sonarDataEndTime = frame->timestamp();
      // the previously added state's timestamp is the start of the window
      okvis::Time sonarDataBeginTime = lastAddedStateTimestamp_;

      // assert that the window's start precedes its end
      OKVIS_ASSERT_TRUE_DBG(
          Exception, sonarDataBeginTime < sonarDataEndTime, "sonar data end time is smaller than begin time.");

      // wait until all relevant sonar messages have arrived, checking for a termination request
      if (sonarFrameSynchronizer_.waitForUpToDateSonarData(okvis::Time(sonarDataEndTime)) == false) {
        // the wait failed, so return
        return;
      }

      // assert again that the window's end precedes the newest cached timestamp
      OKVIS_ASSERT_TRUE_DBG(Exception,
                            sonarDataEndTime < sonarMeasurements_.back().timeStamp,
                            "Waiting for up to date sonar data seems to have failed!");

      // fetch the sonar measurements inside the time window
      sonarData = getSonarMeasurements(sonarDataBeginTime, sonarDataEndTime);

      // if no sonar data was obtained, skip the rest of this iteration
      if (sonarData.size() == 0) continue;
    }

    // Depth data
      
    // necessary
    {
      waitForOptimizationTimer.start();
      std::unique_lock<std::mutex> l(estimator_mutex_);
      while (!optimizationDone_) optimizationNotification_.wait(l);
      waitForOptimizationTimer.stop();
      addStateTimer.start();
      okvis::Time t0Matching = okvis::Time::now();
      bool asKeyframe = false;
      // @Sharmin
      if (estimator_.addStates(frame, imuData, parameters_, sonarData, depthData, firstDepth_, asKeyframe)) {
        lastAddedStateTimestamp_ = frame->timestamp();
        addStateTimer.stop();
      } else {
        LOG(ERROR) << "Failed to add state! will drop multiframe.";
        addStateTimer.stop();
        continue;
      }

      // -- matching keypoints, initialising landmarks etc.
      okvis::kinematics::Transformation T_WS;
      estimator_.get_T_WS(frame->id(), T_WS);
      matchingTimer.start();
      frontend_.dataAssociationAndInitialization(estimator_, T_WS, parameters_, map_, frame, &asKeyframe);
      matchingTimer.stop();
      if (asKeyframe) estimator_.setKeyframe(frame->id(), asKeyframe);
      if (!blocking_) {
        double timeLimit =
            parameters_.optimization.timeLimitForMatchingAndOptimization - (okvis::Time::now() - t0Matching).toSec();
        estimator_.setOptimizationTimeLimit(std::max<double>(0.0, timeLimit), parameters_.optimization.min_iterations);
      }
      optimizationDone_ = false;
    }  // unlock estimator_mutex_

    // use queue size 1 to propagate a congestion to the _matchedFrames queue
    if (matchedFrames_.PushBlockingIfFull(frame, 1) == false) return;
  }
}
  • Wait: waitForUpToDateSonarData blocks until newestSonarDataStamp_ ≥ t_end.
  • Slice: take only the measurements inside the [t_begin, t_end] window.

5. Slicing implementation: getSonarMeasurements

okvis::SonarMeasurementDeque ThreadedKFVio::getSonarMeasurements(okvis::Time& sonarDataBeginTime,
                                                                 okvis::Time& sonarDataEndTime) {
  // sanity checks:
  // if end time is smaller than begin time, return empty queue.
  // if begin time is larger than newest sonar time, return empty queue.
  if (sonarDataEndTime < sonarDataBeginTime || sonarDataBeginTime > sonarMeasurements_.back().timeStamp)
    return okvis::SonarMeasurementDeque();

  std::lock_guard<std::mutex> lock(sonarMeasurements_mutex_);
  // get iterator to sonar data before previous frame
  okvis::SonarMeasurementDeque::iterator first_sonar_package = sonarMeasurements_.begin();
  okvis::SonarMeasurementDeque::iterator last_sonar_package = sonarMeasurements_.end();
  // TODO(sharmin) go backwards through queue. Is probably faster.
  for (auto iter = sonarMeasurements_.begin(); iter != sonarMeasurements_.end(); ++iter) {
    // move first_sonar_package iterator back until iter->timeStamp is higher than requested begintime
    if (iter->timeStamp <= sonarDataBeginTime) first_sonar_package = iter;

    // set last_sonar_package iterator as soon as we hit first timeStamp higher than requested endtime & break
    if (iter->timeStamp >= sonarDataEndTime) {
      last_sonar_package = iter;
      // since we want to include this last sonar measurement in returned Deque we
      // increase last_sonar_package iterator once.
      ++last_sonar_package;
      break;
    }
  }

  // create copy of sonar buffer
  return okvis::SonarMeasurementDeque(first_sonar_package, last_sonar_package);
}

6. Building & adding the residual: addStates

bool Estimator::addStates(okvis::MultiFramePtr multiFrame,
                          const okvis::ImuMeasurementDeque& imuMeasurements,
                          const okvis::VioParameters& params,                    /* @Sharmin */
                          const okvis::SonarMeasurementDeque& sonarMeasurements, /* @Sharmin */
                          const okvis::DepthMeasurementDeque& depthMeasurements,
                          double firstDepth, /* @Sharmin */
                          bool asKeyframe) {
  // Note Sharmin: this is for imu propagation no matter isScaleRefined_ is true/false.
  // TODO(Sharmin): Start actual optimization when isScaleRefined_ = true.

  okvis::kinematics::Transformation T_WS;
  okvis::SpeedAndBias speedAndBias;
  if (statesMap_.empty()) {
    // in case this is the first frame ever, let's initialize the pose:
    bool success0 = initPoseFromImu(imuMeasurements, T_WS);
    OKVIS_ASSERT_TRUE_DBG(Exception, success0, "pose could not be initialized from imu measurements.");
    if (!success0) return false;

    // Sharmin
    if (multiFrame->numKeypoints() > 15) {
      std::cout << "Initialized!! As enough keypoints are found" << std::endl;
      std::cout << "Initial T_WS: " << T_WS.parameters();
    } else {
      return false;
    }
    // End Sharmin

    speedAndBias.setZero();
    speedAndBias.segment<3>(6) = imuParametersVec_.at(0).a0;
  } else {
    // get the previous states
    uint64_t T_WS_id = statesMap_.rbegin()->second.id;  // n-th state
    uint64_t speedAndBias_id =
        statesMap_.rbegin()->second.sensors.at(SensorStates::Imu).at(0).at(ImuSensorStates::SpeedAndBias).id;
    OKVIS_ASSERT_TRUE_DBG(
        Exception, mapPtr_->parameterBlockExists(T_WS_id), "this is an okvis bug. previous pose does not exist.");
    T_WS = std::static_pointer_cast<ceres::PoseParameterBlock>(mapPtr_->parameterBlockPtr(T_WS_id))->estimate();
    // OKVIS_ASSERT_TRUE_DBG(
    //    Exception, speedAndBias_id,
    //    "this is an okvis bug. previous speedAndBias does not exist.");
    speedAndBias =
        std::static_pointer_cast<ceres::SpeedAndBiasParameterBlock>(mapPtr_->parameterBlockPtr(speedAndBias_id))
            ->estimate();

    // propagate pose and speedAndBias
    // @Sharmin: using only imu measurements
    // Modified to add scale
    // Eigen::Matrix<double, 15, 15> covariance; // not used
    // Eigen::Matrix<double, 15, 15> jacobian;  // not used
    Eigen::Vector3d acc_doubleinteg;
    Eigen::Vector3d acc_integ;
    double Del_t;

    int numUsedImuMeasurements = ceres::ImuError::propagation(imuMeasurements,
                                                              imuParametersVec_.at(0),
                                                              T_WS,
                                                              speedAndBias,
                                                              statesMap_.rbegin()->second.timestamp,
                                                              multiFrame->timestamp(),
                                                              0,
                                                              0,
                                                              acc_doubleinteg,
                                                              acc_integ,
                                                              Del_t);

    OKVIS_ASSERT_TRUE_DBG(Exception, numUsedImuMeasurements > 1, "propagation failed");
    if (numUsedImuMeasurements < 1) {
      LOG(INFO) << "numUsedImuMeasurements=" << numUsedImuMeasurements;
      return false;
    }

    // Added by Sharmin
    setImuPreIntegral(multiFrame->id(), acc_doubleinteg, acc_integ, Del_t);
  }

  // create a states object:
  States states(asKeyframe, multiFrame->id(), multiFrame->timestamp());

  // Added by Sharmin
  stateCount_ = stateCount_ + 1;
  // LOG (INFO) << "No. of state created: "<< stateCount_;
  // LOG (INFO) << "statesMap_ size: "<< statesMap_.size();

  // check if id was used before
  OKVIS_ASSERT_TRUE_DBG(
      Exception, statesMap_.find(states.id) == statesMap_.end(), "pose ID" << states.id << " was used before!");

  // create global states
  std::shared_ptr<okvis::ceres::PoseParameterBlock> poseParameterBlock(
      new okvis::ceres::PoseParameterBlock(T_WS, states.id, multiFrame->timestamp()));
  states.global.at(GlobalStates::T_WS).exists = true;
  states.global.at(GlobalStates::T_WS).id = states.id;

  if (statesMap_.empty()) {
    referencePoseId_ = states.id;  // set this as reference pose
    if (!mapPtr_->addParameterBlock(poseParameterBlock, ceres::Map::Pose6d)) {
      return false;
    }
  } else {
    if (!mapPtr_->addParameterBlock(poseParameterBlock, ceres::Map::Pose6d)) {
      return false;
    }
  }

  // End @Sharmin

  // add to buffer
  statesMap_.insert(std::pair<uint64_t, States>(states.id, states));
  multiFramePtrMap_.insert(std::pair<uint64_t, okvis::MultiFramePtr>(states.id, multiFrame));

  // the following will point to the last states:
  std::map<uint64_t, States>::reverse_iterator lastElementIterator = statesMap_.rbegin();
  lastElementIterator++;

  // cameras:
  for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
    SpecificSensorStatesContainer cameraInfos(2);
    cameraInfos.at(CameraSensorStates::T_SCi).exists = true;
    cameraInfos.at(CameraSensorStates::Intrinsics).exists = false;
    if (((extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_translation < 1e-12) ||
         (extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_orientation < 1e-12)) &&
        (statesMap_.size() > 1)) {
      // use the same block...
      cameraInfos.at(CameraSensorStates::T_SCi).id =
          lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id;
    } else {
      const okvis::kinematics::Transformation T_SC = *multiFrame->T_SC(i);
      uint64_t id = IdProvider::instance().newId();
      std::shared_ptr<okvis::ceres::PoseParameterBlock> extrinsicsParameterBlockPtr(
          new okvis::ceres::PoseParameterBlock(T_SC, id, multiFrame->timestamp()));
      if (!mapPtr_->addParameterBlock(extrinsicsParameterBlockPtr, ceres::Map::Pose6d)) {
        return false;
      }
      cameraInfos.at(CameraSensorStates::T_SCi).id = id;
    }
    // update the states info
    statesMap_.rbegin()->second.sensors.at(SensorStates::Camera).push_back(cameraInfos);
    states.sensors.at(SensorStates::Camera).push_back(cameraInfos);
  }

  // IMU states are automatically propagated.
  for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
    SpecificSensorStatesContainer imuInfo(2);
    imuInfo.at(ImuSensorStates::SpeedAndBias).exists = true;
    uint64_t id = IdProvider::instance().newId();
    std::shared_ptr<okvis::ceres::SpeedAndBiasParameterBlock> speedAndBiasParameterBlock(
        new okvis::ceres::SpeedAndBiasParameterBlock(speedAndBias, id, multiFrame->timestamp()));

    if (!mapPtr_->addParameterBlock(speedAndBiasParameterBlock)) {
      return false;
    }
    imuInfo.at(ImuSensorStates::SpeedAndBias).id = id;
    statesMap_.rbegin()->second.sensors.at(SensorStates::Imu).push_back(imuInfo);
    states.sensors.at(SensorStates::Imu).push_back(imuInfo);
  }

  // @Sharmin
  // Depth
  if (depthMeasurements.size() != 0) {
    // Though there should not be more than one depth data
    double mean_depth = 0.0;
    for (auto depthMeasurements_it = depthMeasurements.begin(); depthMeasurements_it != depthMeasurements.end();
         ++depthMeasurements_it) {
      mean_depth += depthMeasurements_it->measurement.depth;
    }
    mean_depth = mean_depth / depthMeasurements.size();

    double information_depth = 5.0;  // TODO(Sharmin) double check with the manual

    std::shared_ptr<ceres::DepthError> depthError(new ceres::DepthError(mean_depth, information_depth, firstDepth));
    mapPtr_->addResidualBlock(depthError, NULL, poseParameterBlock);
    std::cout << "Residual block z: " << (*poseParameterBlock->parameters()) + 2 << std::endl;
  }
  // Sonar
  std::vector<Eigen::Vector3d> landmarkSubset;
  Eigen::Vector3d sonar_landmark;
  double range = 0.0, heading = 0.0;

  // @Sharmin
  if (sonarMeasurements.size() != 0) {
    auto last_sonarMeasurement_it = sonarMeasurements.rbegin();

    // Taking the nearest range value to the n+1 th frame
    range = last_sonarMeasurement_it->measurement.range;
    heading = last_sonarMeasurement_it->measurement.heading;

    okvis::kinematics::Transformation T_WSo = T_WS * params.sonar.T_SSo;

    okvis::kinematics::Transformation sonar_point(Eigen::Vector3d(range * cos(heading), range * sin(heading), 0.0),
                                                  Eigen::Quaterniond(1.0, 0.0, 0.0, 0.0));
    okvis::kinematics::Transformation T_WSo_point = T_WSo * sonar_point;

    sonar_landmark = T_WSo_point.r();
    // std::cout << "T_WSo: " << T_WSo.r() << std::endl;
    // std::cout << "T_WSo_point: " << sonar_landmark << std::endl;

    // may be the reverse searching is faster
    for (PointMap::const_reverse_iterator rit = landmarksMap_.rbegin(); rit != landmarksMap_.rend(); ++rit) {
      Eigen::Vector3d visual_landmark;
      if (fabs(rit->second.point[3]) > 1.0e-8) {
        visual_landmark = (rit->second.point / rit->second.point[3]).head<3>();
        // LOG (INFO) << "Visual Landmark: " << visual_landmark;
      }
      // double distance_from_sonar = (sonar_landmark - visual_landmark).norm();  //Euclidean distance
      if (fabs(sonar_landmark[0] - visual_landmark[0]) < 0.1 && fabs(sonar_landmark[1] - visual_landmark[1]) < 0.1 &&
          fabs(sonar_landmark[2] - visual_landmark[2]) < 0.1) {
        // TODO(sharmin) parameter!!
        // searching around 10 cm of sonar landmark

        landmarkSubset.push_back(visual_landmark);
      }
    }

    // std::cout << "Size of visual patch: "<<landmarkSubset.size() << std::endl;

    if (landmarkSubset.size() > 0) {
      // LOG (INFO) << " Sonar added for ceres optimization";
      // @Sharmin
      // add sonarError and related addResidualBlock
      double information_sonar = 1.0;  // TODO(sharmin) calculate properly?

      std::shared_ptr<ceres::SonarError> sonarError(
          new ceres::SonarError(params, range, heading, information_sonar, landmarkSubset));
      mapPtr_->addResidualBlock(sonarError, NULL, poseParameterBlock);
    }
    // End @Sharmin
  }

  // depending on whether or not this is the very beginning, we will add priors or relative terms to the last state:
  if (statesMap_.size() == 1) {
    // let's add a prior
    Eigen::Matrix<double, 6, 6> information = Eigen::Matrix<double, 6, 6>::Zero();
    information(5, 5) = 1.0e8;
    information(0, 0) = 1.0e8;
    information(1, 1) = 1.0e8;
    information(2, 2) = 1.0e8;
    std::shared_ptr<ceres::PoseError> poseError(new ceres::PoseError(T_WS, information));
    /*auto id2= */ mapPtr_->addResidualBlock(poseError, NULL, poseParameterBlock);
    // mapPtr_->isJacobianCorrect(id2,1.0e-6);

    // sensor states
    for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
      double translationStdev = extrinsicsEstimationParametersVec_.at(i).sigma_absolute_translation;
      double translationVariance = translationStdev * translationStdev;
      double rotationStdev = extrinsicsEstimationParametersVec_.at(i).sigma_absolute_orientation;
      double rotationVariance = rotationStdev * rotationStdev;
      if (translationVariance > 1.0e-16 && rotationVariance > 1.0e-16) {
        const okvis::kinematics::Transformation T_SC = *multiFrame->T_SC(i);
        std::shared_ptr<ceres::PoseError> cameraPoseError(
            new ceres::PoseError(T_SC, translationVariance, rotationVariance));
        // add to map
        mapPtr_->addResidualBlock(
            cameraPoseError,
            NULL,
            mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id));
        // mapPtr_->isJacobianCorrect(id,1.0e-6);
      } else {
        mapPtr_->setParameterBlockConstant(
            states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id);
      }
    }
    for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
      Eigen::Matrix<double, 6, 1> variances;
      // get these from parameter file
      const double sigma_bg = imuParametersVec_.at(0).sigma_bg;
      const double sigma_ba = imuParametersVec_.at(0).sigma_ba;
      std::shared_ptr<ceres::SpeedAndBiasError> speedAndBiasError(
          new ceres::SpeedAndBiasError(speedAndBias, 1.0, sigma_bg * sigma_bg, sigma_ba * sigma_ba));
      // add to map
      mapPtr_->addResidualBlock(
          speedAndBiasError,
          NULL,
          mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id));
      // mapPtr_->isJacobianCorrect(id,1.0e-6);
    }

    // @Sharmin
    // End @Sharmin
  } else {
    // add IMU error terms
    for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
      std::shared_ptr<ceres::ImuError> imuError(new ceres::ImuError(
          imuMeasurements, imuParametersVec_.at(i), lastElementIterator->second.timestamp, states.timestamp));
      /*::ceres::ResidualBlockId id = */ mapPtr_->addResidualBlock(
          imuError,
          NULL,
          mapPtr_->parameterBlockPtr(lastElementIterator->second.id),
          mapPtr_->parameterBlockPtr(
              lastElementIterator->second.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id),
          mapPtr_->parameterBlockPtr(states.id),
          mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id));
      // imuError->setRecomputeInformation(false);
      // mapPtr_->isJacobianCorrect(id,1.0e-9);
      // imuError->setRecomputeInformation(true);
    }

    // add relative sensor state errors
    for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
      if (lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id !=
          states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id) {
        // i.e. they are different estimated variables, so link them with a temporal error term
        double dt = (states.timestamp - lastElementIterator->second.timestamp).toSec();
        double translationSigmaC = extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_translation;
        double translationVariance = translationSigmaC * translationSigmaC * dt;
        double rotationSigmaC = extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_orientation;
        double rotationVariance = rotationSigmaC * rotationSigmaC * dt;
        std::shared_ptr<ceres::RelativePoseError> relativeExtrinsicsError(
            new ceres::RelativePoseError(translationVariance, rotationVariance));
        mapPtr_->addResidualBlock(
            relativeExtrinsicsError,
            NULL,
            mapPtr_->parameterBlockPtr(
                lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id),
            mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id));
        // mapPtr_->isJacobianCorrect(id,1.0e-6);
      }
    }
    // only camera. this is slightly inconsistent, since the IMU error term contains both
    // a term for global states as well as for the sensor-internal ones (i.e. biases).
    // TODO(sharmin): magnetometer, pressure, ...
  }

  return true;
}

IV. Core Code Blocks

Below is a condensed walkthrough of the sonar-related core code, listing each block's main content in call order, focused on the sonar data flow and its processing.


1. sonarCallback: ROS callback & initial filtering

void Subscriber::sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg) {
  // 1. Distance quantization
  double resolution = msg->max_range / msg->intensity.size();
  // 2. Skip the last 100 noisy samples; find the peak intensity and its index
  int maxInt = 0, maxIdx = 0;
  for (unsigned i = 0; i < msg->intensity.size() - 100; ++i) {
    if (msg->intensity[i] > maxInt) {
      maxInt = msg->intensity[i];
      maxIdx = i;
    }
  }
  // 3. Compute range and heading
  double range   = (maxIdx + 1) * resolution;
  double heading = msg->head_position * M_PI / 180.0;
  // 4. Threshold filter: within 4.5 m and intensity > 10
  if (range < 4.5 && maxInt > 10) {
    vioInterface_->addSonarMeasurement(
      okvis::Time(msg->header.stamp.sec, msg->header.stamp.nsec),
      range, heading);
  }
}
  • Purpose: extract the strongest return from the raw intensity array, reject far-range noise, and after thresholding push (timestamp, range, heading) into the back-end queue.

2. addSonarMeasurement: enqueue policy

bool ThreadedKFVio::addSonarMeasurement(
    const okvis::Time& stamp, double range, double heading) {
  okvis::SonarMeasurement m{stamp, {range, heading}};
  if (blocking_) {
    sonarMeasurementsReceived_.PushBlockingIfFull(m, 1);
    return true;
  } else {
    sonarMeasurementsReceived_.PushNonBlockingDroppingIfFull(
      m, maxImuInputQueueSize_);
    return sonarMeasurementsReceived_.Size() == 1;
  }
}
  • Purpose: wrap the sonar measurement and write it into the thread-safe queue sonarMeasurementsReceived_.
  • Blocking mode: wait when the queue is full; non-blocking mode: drop the oldest element.
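The post uses PushBlockingIfFull / PushNonBlockingDroppingIfFull without showing them. A minimal sketch of what the two push policies look like on a deque-based bounded queue (an assumed implementation; the real okvis thread-safe queue differs in detail):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Minimal sketch of the okvis-style queue semantics used by
// addSonarMeasurement; illustrative only.
template <typename T>
class BoundedQueue {
 public:
  // Blocking mode: wait until there is room, so nothing is ever dropped.
  void PushBlockingIfFull(const T& v, std::size_t maxSize) {
    std::unique_lock<std::mutex> lk(m_);
    notFull_.wait(lk, [&] { return q_.size() < maxSize; });
    q_.push_back(v);
    notEmpty_.notify_one();
  }
  // Non-blocking mode: if full, drop the oldest element to stay real-time.
  void PushNonBlockingDroppingIfFull(const T& v, std::size_t maxSize) {
    std::lock_guard<std::mutex> lk(m_);
    if (q_.size() >= maxSize) q_.pop_front();  // drop oldest
    q_.push_back(v);
    notEmpty_.notify_one();
  }
  // Blocking pop, as used by sonarConsumerLoop.
  bool PopBlocking(T* out) {
    std::unique_lock<std::mutex> lk(m_);
    notEmpty_.wait(lk, [&] { return !q_.empty(); });
    *out = q_.front();
    q_.pop_front();
    notFull_.notify_one();
    return true;
  }
  std::size_t Size() {
    std::lock_guard<std::mutex> lk(m_);
    return q_.size();
  }

 private:
  std::deque<T> q_;
  std::mutex m_;
  std::condition_variable notFull_, notEmpty_;
};
```

The drop-oldest policy is what lets the non-blocking path run at sensor rate without ever stalling the ROS callback.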

3. sonarConsumerLoop: background consumption & caching

void ThreadedKFVio::sonarConsumerLoop() {
  okvis::SonarMeasurement data;
  for (;;) {
    if (!sonarMeasurementsReceived_.PopBlocking(&data)) return;
    {
      std::lock_guard<std::mutex> lk(sonarMeasurements_mutex_);
      // enforce strictly increasing timestamps
      OKVIS_ASSERT_TRUE(Exception,
        sonarMeasurements_.empty() ||
        sonarMeasurements_.back().timeStamp < data.timeStamp);
      sonarMeasurements_.push_back(data);
    }
    // notify the main thread: new data has arrived
    sonarFrameSynchronizer_.gotSonarData(data.timeStamp);
  }
}
  • Purpose: keep pulling measurements from the receive queue, append them to the mutex-protected history deque, and wake the main thread via the condition-variable synchronizer.

4. getSonarMeasurements: time-window slicing

okvis::SonarMeasurementDeque
ThreadedKFVio::getSonarMeasurements(
    okvis::Time& begin, okvis::Time& end) {
  if (end < begin || begin > sonarMeasurements_.back().timeStamp)
    return {};
  std::lock_guard<std::mutex> lk(sonarMeasurements_mutex_);
  auto it0 = sonarMeasurements_.begin(), it1 = sonarMeasurements_.end();
  for (auto it = sonarMeasurements_.begin(); it != sonarMeasurements_.end(); ++it) {
    if (it->timeStamp <= begin) it0 = it;
    if (it->timeStamp >= end) { it1 = std::next(it); break; }
  }
  return SonarMeasurementDeque(it0, it1);
}
  • Purpose: cut out the sub-deque of sonar measurements inside the [begin, end] window for the back-end fusion.
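The boundary semantics are easy to miss: the slice deliberately keeps the last measurement at or before begin and the first measurement at or after end, so the window is covered on both sides. A standalone sketch with plain structs (names and the double timestamps are illustrative):

```cpp
#include <cassert>
#include <deque>
#include <iterator>

// Simplified stand-in for okvis::SonarMeasurement.
struct Meas { double timeStamp; };

// Mirrors the getSonarMeasurements window logic: start at the last element
// with timeStamp <= begin, end at the first element with timeStamp >= end
// (inclusive).
std::deque<Meas> slice(const std::deque<Meas>& buf, double begin, double end) {
  if (end < begin || buf.empty() || begin > buf.back().timeStamp) return {};
  auto first = buf.begin(), last = buf.end();
  for (auto it = buf.begin(); it != buf.end(); ++it) {
    if (it->timeStamp <= begin) first = it;
    if (it->timeStamp >= end) { last = std::next(it); break; }
  }
  return std::deque<Meas>(first, last);
}
```

For a buffer with timestamps {1, 2, 3, 4, 5} and a window [2.5, 4.5], the slice is {2, 3, 4, 5}: one measurement before the window start and one after the window end are included.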

5. The sonar snippet inside the main fusion thread matchingLoop

// inside the matchingLoop() loop
if (parameters_.sensorList.isSonarUsed) {
  okvis::Time t_end   = frame->timestamp();
  okvis::Time t_begin = lastAddedStateTimestamp_;
  // wait for the latest sonar data
  if (!sonarFrameSynchronizer_.waitForUpToDateSonarData(t_end))
    return;
  // slice the window
  sonarData = getSonarMeasurements(t_begin, t_end);
  if (sonarData.empty()) continue;
}
  • Purpose: before processing each frame, wait until sonarMeasurements_ holds data at least as new as the frame, then slice by the frame's time window; otherwise skip the frame.

6. Estimator::addStates: building the SonarError

if (!sonarMeasurements.empty()) {
  auto last = sonarMeasurements.rbegin();
  double range   = last->measurement.range;
  double heading = last->measurement.heading;
  // sonar → body → world coordinate transform
  auto T_WSo = T_WS * params.sonar.T_SSo;
  auto sonar_pt = Transformation(
    Vector3d(range*cos(heading), range*sin(heading), 0.0));
  Vector3d landmark = (T_WSo * sonar_pt).r();
  // match against the visual map
  for (auto it = landmarksMap_.rbegin(); it != landmarksMap_.rend(); ++it) {
    Vector3d vis = it->second.point3d();
    if ((vis - landmark).norm() < 0.1)
      subset.push_back(vis);
  }
  if (!subset.empty()) {
    auto err = std::make_shared<ceres::SonarError>(
      params, range, heading, 1.0, subset);
    mapPtr_->addResidualBlock(err, nullptr, poseParameterBlock);
  }
}
  • Purpose: take the latest measurement, transform its coordinates, find nearby points in the visual map, build a SonarError, and add it to the Ceres optimization.
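The polar-to-Cartesian lifting can be checked with plain numbers. A minimal 2-D sketch, collapsing the pose chain T_WS * T_SSo into a single yaw-plus-translation transform (a simplification of the 3-D okvis::kinematics::Transformation used in addStates):

```cpp
#include <cassert>
#include <cmath>

struct Point2 { double x, y; };

// Sketch of the sonar polar-to-Cartesian step: a (range, heading) return
// is lifted into the sonar frame, then moved into the world frame by a
// 2-D rigid transform (yaw + translation).
Point2 sonarToWorld(double range, double heading,
                    double yaw, double tx, double ty) {
  // point in the sonar frame (beam rotated by the head angle)
  const double sx = range * std::cos(heading);
  const double sy = range * std::sin(heading);
  // rigid transform into the world frame
  return {tx + sx * std::cos(yaw) - sy * std::sin(yaw),
          ty + sx * std::sin(yaw) + sy * std::cos(yaw)};
}
```

The resulting point plays the role of sonar_landmark above: it is the world-frame location searched for neighboring visual landmarks.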

The end-to-end main line

  1. Callback → addSonarMeasurement
  2. Consumer thread → sonarMeasurements_ & gotSonarData
  3. matchingLoop:
    • wait for sonar synchronization & slice →
    • build the sonar residual (SonarError) in addStates →
    • hand the visual, IMU, and depth residuals to Ceres together

Focusing on the sonar path, these code blocks close the loop of "real-time reception, caching, time-window slicing, residual construction", fusing the sonar observations seamlessly into the VIO back end.

V. Thread Analysis

The following explains, step by step, why these threads exist and what each stage is for:

(The flow diagram from Part I applies here; the numbered steps below follow it.)

1. The sonarCallback callback

  • Execution context: triggered by ROS's internal thread pool or I/O thread.
  • Purpose:
    • Respond immediately to new hardware messages (lowest latency).
    • Do simple filtering/thresholding to quickly decide whether a measurement is valid.
    • Avoid complex or time-consuming processing here, so ROS communication is never blocked.

2. The sonarMeasurementsReceived_ thread-safe queue

  • Execution context: shared between the callback and the consumer thread.
  • Purpose:
    • Buffer incoming measurements to smooth out traffic peaks.
    • Decouple the differing rates of "reception" and "processing".
    • In blocking mode, wait when full so no measurement is lost; in non-blocking mode, drop the oldest to preserve real-time behavior.

3. The sonarConsumerLoop consumer thread

  • Execution context: an independent background thread started via std::thread.
  • Purpose:
    • Asynchronously pop each measurement from the receive queue, so neither the main fusion thread nor the callback thread has to do this work.
    • Append the measurements to the history cache sonarMeasurements_ (a deque), protected by its own lock.
    • Signal the condition variable via gotSonarData, telling the main thread that new measurements are cached and ready to be sliced.
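The synchronizer's internals are not shown in the post. A minimal sketch of the gotSonarData / waitForUpToDateSonarData handshake built on a condition variable (an assumed implementation: timestamps are simplified to double seconds, while the real SVIn2 synchronizer uses okvis::Time and also handles shutdown requests):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Minimal sketch of the consumer-thread / main-thread handshake.
class SonarSynchronizer {
 public:
  // Called by the consumer loop after appending to the history deque.
  void gotSonarData(double stamp) {
    {
      std::lock_guard<std::mutex> lk(m_);
      newest_ = stamp;
    }
    cv_.notify_all();
  }
  // Called by the main loop: block until data at least as new as `t` exists.
  bool waitForUpToDateSonarData(double t) {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [&] { return newest_ >= t; });
    return true;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  double newest_ = -1.0;  // timestamp of the newest cached measurement
};
```

The predicate form of `wait` handles spurious wakeups, and because the predicate is checked before sleeping, the main thread returns immediately when sufficiently new data is already cached.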

4. The main fusion thread matchingLoop

  • Execution context: the application's core loop thread, continuously processing each frame of visual keypoints.
  • Main purpose:
    1. Wait until the IMU, sonar, and other sensors all have fresh data inside this frame's time window (waitForUpToDateSonarData).
    2. Slice out exactly the sonar measurements covering the current frame's interval (getSonarMeasurements(begin, end)).
    3. Feed the visual, IMU, sonar (and depth) data to the back-end optimization together.

5. Building the SonarError in Estimator::addStates

  • Execution context: runs inside matchingLoop while holding the estimator mutex.
  • Purpose:
    • Take the newest sonar measurement from the slice and transform it into the world frame.
    • Find the corresponding nearby points in the visual map and build a range residual block.
    • Add the SonarError to the Ceres problem, so the optimization enforces sonar-visual consistency.

6. Ceres optimization: joint visual and sonar constraints

  • Execution context: triggered by the back-end optimization thread, or from within addStates.
  • Purpose:
    • Minimize all residuals (visual reprojection, IMU preintegration, sonar range, depth, etc.).
    • Jointly optimize the camera/IMU trajectory and the map so that all sensor observations agree.

7. Why so many threads?

  1. Real-time behavior and low latency
    • The ROS callback must return quickly, or hardware I/O suffers.
    • The main fusion loop must keep running, never stalled by I/O or data processing.
  2. Decoupling and buffering
    • The callback thread does minimal work, the consumer thread checks data in, and the main thread checks it out on demand.
    • Each has its own job, so their differing rates never block one another.
  3. Data consistency and temporal alignment
    • The history deque can be sliced by any frame's time window, so every frame uses exactly the matching measurements.
    • The condition-variable synchronizer prevents the main thread from reading too early or falling behind.

With this design of multiple threads, queues, and a synchronizer, SVIn2 keeps its real-time responsiveness while achieving precise multi-modal data alignment and fusion.

