一、整体数据流概览
下面用一个简洁的 ASCII 流程图来表达各部分的关系,其中用 粗体 标出一直在执行的主线(matchingLoop
):
ROS 发布声纳原始数据
│
▼
1. sonarCallback 回调
│
▼
2. sonarMeasurementsReceived_ (线程安全队列)
│
├─[sonarConsumerLoop 线程]──►3. sonarMeasurements_ (历史缓存 Deque)
│ │
│ └─ gotSonarData 通知同步器
│
▼
**主融合线程 matchingLoop(循环执行)**
│
▼
4. waitForUpToDateSonarData → getSonarMeasurements(begin, end)
│
▼
5. Estimator::addStates 中取最新测量 → 构造 SonarError 残差块
│
▼
6. Ceres 优化:联合视觉与声纳约束
│
└───────────────┐
▼
(返回主融合线程,继续下一帧循环)
- 第一部分:ROS 回调线程接收原始数据并推入
sonarMeasurementsReceived_
。 - 第二部分:
sonarConsumerLoop
从接收队列消费,写入sonarMeasurements_
,并通过gotSonarData
通知主线程。 - 主线:
matchingLoop
永远在循环:- 等待同步完的声纳数据
- 切片取出本帧所需的声纳测量
- 在
addStates
中构造SonarError
- 与视觉残差一起交给 Ceres 优化
- 回到循环开头,处理下一帧。
二、关键模块 & 代码对应
功能 | 模块/函数 | 文件位置 | 作用描述 |
---|---|---|---|
1. 声纳回调 | Subscriber::sonarCallback |
subscriber.cpp |
接收 ROS 消息,量化、剔除噪声后推入接收队列 |
2. 接收队列缓冲 | ThreadedKFVio::sonarMeasurementsReceived_ |
ThreadedKFVio.h / .cpp |
线程安全队列,临时存储所有进来的 sonar 测量 |
3. 消费 & 历史缓存 | ThreadedKFVio::sonarConsumerLoop |
ThreadedKFVio.cpp |
从接收队列拉取数据,加锁后写入 sonarMeasurements_ deque |
4. 数据同步 & 切片 | matchingLoop 中的同步逻辑 |
ThreadedKFVio.cpp |
等待到足够新数据 → 调用 getSonarMeasurements(begin,end) |
5. 数据切片函数 | ThreadedKFVio::getSonarMeasurements |
ThreadedKFVio.cpp |
对 sonarMeasurements_ 加锁遍历,返回时间窗内测量队列 |
6. 添加残差到优化 | Estimator::addStates(...) |
Estimator.cpp |
取最新一条测量 → 转世界坐标 → 匹配视觉地图 → 添加 SonarError |
三、各部分详解
1. ROS 回调:sonarCallback
void sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg);
// @Sharmin
/**
* @brief 处理 Imagenex 831L 声呐的 ProcessedRange 消息
* @param msg 经过驱动预处理后的声呐距离强度数据
*
* 主要流程:
* 1. 将极化角度 head_position 从度转弧度;
* 2. 找到强度数组中的最大值(排除最后 100 个采样点,以忽略远距离噪声);
* 3. 计算对应的绝对距离 range;
* 4. 若目标在 4.5 m 内且反射强度足够大,则注入 VIO/SLAM 后端。
*/
void Subscriber::sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg) {
// 1. 量化分辨率(每个 bin 对应的实际距离,单位: 米)
double rangeResolution = msg->max_range / msg->intensity.size();
// 2. 寻找强度最大值
int max = 0;
int maxIndex = 0;
//
// 说明:实验时将 max_range 设置得较高,导致末端采样点中充满噪声;
// 这里排除末尾 100 个采样点,提高可靠性。
// @Sharmin: discarding as range was set higher (which introduced some noisy data) during data collection
for (unsigned int i = 0; i < msg->intensity.size() - 100; i++) {
if (msg->intensity[i] > max) {
max = msg->intensity[i];
maxIndex = i;
}
}
double range = (maxIndex + 1) * rangeResolution;
double heading = (msg->head_position * M_PI) / 180;
// 5. 过滤:只保留 4.5 m 内、强度 > 10 的目标
// No magic no!! within 4.5 meter
if (range < 4.5 && max > 10) {
// 6. 将时间戳封装为 okvis::Time,送入后端做状态更新
vioInterface_->addSonarMeasurement(okvis::Time(msg->header.stamp.sec, msg->header.stamp.nsec), range, heading);
// 径向距离 range 和极化角度 heading 作为声呐测量值,注入 VIO 后端
}
}
- 剔除末端噪声:遍历到
size()-100
- 量化:
range = index * resolution
- 入队:两种模式(Blocking/NonBlocking)
2. addSonarMeasurement
// @Sharmin
// Add a Sonar measurement.
//==============================================================================
// ThreadedKFVio::addSonarMeasurement()
//
// 将声呐量测封装后送入线程安全队列,供后端优化线程异步消费。
// • 参数 stamp —— 绝对时间戳(okvis::Time)
// • 参数 range —— 声呐测得的径向距离(米)
// • 参数 heading —— 声呐当前方位角(弧度)
//
// 若 VIO 处于阻塞模式 (blocking_ == true) :
// - 使用 PushBlockingIfFull();队列满时当前线程会等待,确保不丢帧。
// - 返回值固定 true 表示已成功压入。
// 否则(非阻塞模式):
// - 使用 PushNonBlockingDroppingIfFull();队列满则丢弃最旧元素,保持实时性。
// - 返回值为队列在压入后是否恰好只剩 1 个元素,
// 可用于触发后端线程“刚从空变非空”的唤醒逻辑。
//==============================================================================
bool ThreadedKFVio::addSonarMeasurement(const okvis::Time& stamp, double range, double heading) {
okvis::SonarMeasurement sonar_measurement;
sonar_measurement.timeStamp = stamp;
sonar_measurement.measurement.range = range;
sonar_measurement.measurement.heading = heading;
if (blocking_) {
sonarMeasurementsReceived_.PushBlockingIfFull(sonar_measurement, 1);
return true;
} else {
sonarMeasurementsReceived_.PushNonBlockingDroppingIfFull(sonar_measurement,
maxImuInputQueueSize_); // Running same rate as imu
return sonarMeasurementsReceived_.Size() == 1;
}
}
2. 消费线程:sonarConsumerLoop
# ThreadedKFVio.cpp
// 如果 sonar被使用,启动sonarConsumerThread_线程
if (parameters_.sensorList.isSonarUsed) {
sonarConsumerThread_ = std::thread(&ThreadedKFVio::sonarConsumerLoop, this); // @Sharmin
}
// @Sharmin
// Loop to process sonar measurements.
void ThreadedKFVio::sonarConsumerLoop() {
okvis::SonarMeasurement data; // 声纳测量数据结构
TimerSwitchable processSonarTimer("0 processSonarMeasurements", true); // 创建一个计时器,用于测量处理声纳数据的时间
for (;;) { // 无限循环,持续处理声纳数据
// get data and check for termination request
if (sonarMeasurementsReceived_.PopBlocking(&data) == false) return; // 阻塞式从队列中获取声纳数据,如果无法获取,则退出循环
processSonarTimer.start(); // 开始计时处理声纳数据
okvis::Time start; // 开始时间戳
const okvis::Time* end; // 结束时间戳的指针,不需要复制数据
{ // 使用大括号定义一个局部作用域,确保锁的及时释放
std::lock_guard<std::mutex> sonarLock(sonarMeasurements_mutex_); // 使用互斥锁保护声纳数据结构,防止数据竞争
// 确保接收的声纳测量数据的时间戳是递增的
OKVIS_ASSERT_TRUE(Exception,
sonarMeasurements_.empty() || sonarMeasurements_.back().timeStamp < data.timeStamp,
"Sonar measurement from the past received");
if (sonarMeasurements_.size() > 0)
start = sonarMeasurements_.back().timeStamp; // 如果已有数据,则开始时间戳为上一个数据的时间戳
else
start = okvis::Time(0, 0); // 如果没有数据,则开始时间戳设置为0
end = &data.timeStamp; // 设置结束时间戳为当前数据的时间戳
sonarMeasurements_.push_back(data); // 将新的声纳数据加入列表
} // 解锁 sonarMeasurements_mutex_
// 通知其他线程新的声纳数据已经到达
sonarFrameSynchronizer_.gotSonarData(data.timeStamp);
processSonarTimer.stop(); // 停止计时器
}
}
3. 数据同步与切片:matchingLoop
// Loop that matches frames with existing frames.
void ThreadedKFVio::matchingLoop() {
TimerSwitchable prepareToAddStateTimer("2.1 prepareToAddState", true);
TimerSwitchable waitForOptimizationTimer("2.2 waitForOptimization", true);
TimerSwitchable addStateTimer("2.3 addState", true);
TimerSwitchable matchingTimer("2.4 matching", true);
for (;;) {
// get new frame
std::shared_ptr<okvis::MultiFrame> frame;
// get data and check for termination request
if (keypointMeasurements_.PopBlocking(&frame) == false) return;
prepareToAddStateTimer.start();
// -- get relevant imu messages for new state
okvis::Time imuDataEndTime = frame->timestamp() + temporal_imu_data_overlap;
okvis::Time imuDataBeginTime = lastAddedStateTimestamp_ - temporal_imu_data_overlap;
OKVIS_ASSERT_TRUE_DBG(Exception,
imuDataBeginTime < imuDataEndTime,
"imu data end time is smaller than begin time."
<< "current frametimestamp " << frame->timestamp() << " (id: " << frame->id()
<< "last timestamp " << lastAddedStateTimestamp_
<< " (id: " << estimator_.currentFrameId());
// wait until all relevant imu messages have arrived and check for termination request
if (imuFrameSynchronizer_.waitForUpToDateImuData(okvis::Time(imuDataEndTime)) == false) return;
OKVIS_ASSERT_TRUE_DBG(Exception,
imuDataEndTime < imuMeasurements_.back().timeStamp,
"Waiting for up to date imu data seems to have failed!");
// TODO(Sharmin): check if needed to wait until all relevant sonar messages
okvis::ImuMeasurementDeque imuData = getImuMeasurments(imuDataBeginTime, imuDataEndTime);
prepareToAddStateTimer.stop();
// if imu_data is empty, either end_time > begin_time or
// no measurements in timeframe, should not happen, as we waited for measurements
if (imuData.size() == 0) continue;
// @Sharmin
// Sonar Data
// 初始化存储声纳测量数据的双端队列
okvis::SonarMeasurementDeque sonarData;
// 检查系统配置是否启用了声纳传感器
if (parameters_.sensorList.isSonarUsed) {
// 获取当前帧的时间戳作为声纳数据获取的结束时间
okvis::Time sonarDataEndTime = frame->timestamp();
// 获取上一个添加状态的时间戳作为声纳数据获取的开始时间
okvis::Time sonarDataBeginTime = lastAddedStateTimestamp_;
// 断言:确保声纳数据的开始时间小于结束时间
OKVIS_ASSERT_TRUE_DBG(
Exception, sonarDataBeginTime < sonarDataEndTime, "sonar data end time is smaller than begin time.");
// 等待直到所有相关的声纳消息都已到达,并检查是否有终止请求
if (sonarFrameSynchronizer_.waitForUpToDateSonarData(okvis::Time(sonarDataEndTime)) == false) {
// 如果等待失败,则返回
return;
}
// 再次断言:确保等待后获取的声纳数据的时间戳小于当前的最新时间戳
OKVIS_ASSERT_TRUE_DBG(Exception,
sonarDataEndTime < sonarMeasurements_.back().timeStamp,
"Waiting for up to date sonar data seems to have failed!");
// 获取指定时间范围内的声纳测量数据
sonarData = getSonarMeasurements(sonarDataBeginTime, sonarDataEndTime);
// 停止状态添加准备计时器(此处代码被注释)
// 如果获取的声纳数据为空,则跳过当前循环的后续步骤
if (sonarData.size() == 0) continue;
}
// Depth data
// necessary
{
waitForOptimizationTimer.start();
std::unique_lock<std::mutex> l(estimator_mutex_);
while (!optimizationDone_) optimizationNotification_.wait(l);
waitForOptimizationTimer.stop();
addStateTimer.start();
okvis::Time t0Matching = okvis::Time::now();
bool asKeyframe = false;
// @Sharmin
if (estimator_.addStates(frame, imuData, parameters_, sonarData, depthData, firstDepth_, asKeyframe)) {
lastAddedStateTimestamp_ = frame->timestamp();
addStateTimer.stop();
} else {
LOG(ERROR) << "Failed to add state! will drop multiframe.";
addStateTimer.stop();
continue;
}
// -- matching keypoints, initialising landmarks etc.
okvis::kinematics::Transformation T_WS;
estimator_.get_T_WS(frame->id(), T_WS);
matchingTimer.start();
frontend_.dataAssociationAndInitialization(estimator_, T_WS, parameters_, map_, frame, &asKeyframe);
matchingTimer.stop();
if (asKeyframe) estimator_.setKeyframe(frame->id(), asKeyframe);
if (!blocking_) {
double timeLimit =
parameters_.optimization.timeLimitForMatchingAndOptimization - (okvis::Time::now() - t0Matching).toSec();
estimator_.setOptimizationTimeLimit(std::max<double>(0.0, timeLimit), parameters_.optimization.min_iterations);
}
optimizationDone_ = false;
} // unlock estimator_mutex_
// use queue size 1 to propagate a congestion to the _matchedFrames queue
if (matchedFrames_.PushBlockingIfFull(frame, 1) == false) return;
}
}
- 等待:
waitForUpToDateSonarData
阻塞直到newestSonarDataStamp_ ≥ t_end
- 切片:仅取
[t_begin, t_end]
时间窗内的测量
4. 切片实现:getSonarMeasurements
okvis::SonarMeasurementDeque ThreadedKFVio::getSonarMeasurements(okvis::Time& sonarDataBeginTime,
okvis::Time& sonarDataEndTime) {
// sanity checks:
// if end time is smaller than begin time, return empty queue.
// if begin time is larger than newest sonar time, return empty queue.
if (sonarDataEndTime < sonarDataBeginTime || sonarDataBeginTime > sonarMeasurements_.back().timeStamp)
return okvis::SonarMeasurementDeque();
std::lock_guard<std::mutex> lock(sonarMeasurements_mutex_);
// get iterator to sonar data before previous frame
okvis::SonarMeasurementDeque::iterator first_sonar_package = sonarMeasurements_.begin();
okvis::SonarMeasurementDeque::iterator last_sonar_package = sonarMeasurements_.end();
// TODO(sharmin) go backwards through queue. Is probably faster.
for (auto iter = sonarMeasurements_.begin(); iter != sonarMeasurements_.end(); ++iter) {
// move first_sonar_package iterator back until iter->timeStamp is higher than requested begintime
if (iter->timeStamp <= sonarDataBeginTime) first_sonar_package = iter;
// set last_sonar_package iterator as soon as we hit first timeStamp higher than requested endtime & break
if (iter->timeStamp >= sonarDataEndTime) {
last_sonar_package = iter;
// since we want to include this last sonar measurement in returned Deque we
// increase last_sonar_package iterator once.
++last_sonar_package;
break;
}
}
// create copy of sonar buffer
return okvis::SonarMeasurementDeque(first_sonar_package, last_sonar_package);
}
5. 构造 & 添加残差:addStates
bool Estimator::addStates(okvis::MultiFramePtr multiFrame,
const okvis::ImuMeasurementDeque& imuMeasurements,
const okvis::VioParameters& params, /* @Sharmin */
const okvis::SonarMeasurementDeque& sonarMeasurements, /* @Sharmin */
const okvis::DepthMeasurementDeque& depthMeasurements,
double firstDepth, /* @Sharmin */
bool asKeyframe) {
// Note Sharmin: this is for imu propagation no matter isScaleRefined_ is true/false.
// TODO(Sharmin): Start actual optimization when isScaleRefined_ = true.
okvis::kinematics::Transformation T_WS;
okvis::SpeedAndBias speedAndBias;
if (statesMap_.empty()) {
// in case this is the first frame ever, let's initialize the pose:
bool success0 = initPoseFromImu(imuMeasurements, T_WS);
OKVIS_ASSERT_TRUE_DBG(Exception, success0, "pose could not be initialized from imu measurements.");
if (!success0) return false;
// Sharmin
if (multiFrame->numKeypoints() > 15) {
std::cout << "Initialized!! As enough keypoints are found" << std::endl;
std::cout << "Initial T_WS: " << T_WS.parameters();
} else {
return false;
}
// End Sharmin
speedAndBias.setZero();
speedAndBias.segment<3>(6) = imuParametersVec_.at(0).a0;
} else {
// get the previous states
uint64_t T_WS_id = statesMap_.rbegin()->second.id; // n-th state
uint64_t speedAndBias_id =
statesMap_.rbegin()->second.sensors.at(SensorStates::Imu).at(0).at(ImuSensorStates::SpeedAndBias).id;
OKVIS_ASSERT_TRUE_DBG(
Exception, mapPtr_->parameterBlockExists(T_WS_id), "this is an okvis bug. previous pose does not exist.");
T_WS = std::static_pointer_cast<ceres::PoseParameterBlock>(mapPtr_->parameterBlockPtr(T_WS_id))->estimate();
// OKVIS_ASSERT_TRUE_DBG(
// Exception, speedAndBias_id,
// "this is an okvis bug. previous speedAndBias does not exist.");
speedAndBias =
std::static_pointer_cast<ceres::SpeedAndBiasParameterBlock>(mapPtr_->parameterBlockPtr(speedAndBias_id))
->estimate();
// propagate pose and speedAndBias
// @Sharmin: using only imu measurements
// Modified to add scale
// Eigen::Matrix<double, 15, 15> covariance; // not used
// Eigen::Matrix<double, 15, 15> jacobian; // not used
Eigen::Vector3d acc_doubleinteg;
Eigen::Vector3d acc_integ;
double Del_t;
int numUsedImuMeasurements = ceres::ImuError::propagation(imuMeasurements,
imuParametersVec_.at(0),
T_WS,
speedAndBias,
statesMap_.rbegin()->second.timestamp,
multiFrame->timestamp(),
0,
0,
acc_doubleinteg,
acc_integ,
Del_t);
OKVIS_ASSERT_TRUE_DBG(Exception, numUsedImuMeasurements > 1, "propagation failed");
if (numUsedImuMeasurements < 1) {
LOG(INFO) << "numUsedImuMeasurements=" << numUsedImuMeasurements;
return false;
}
// Added by Sharmin
setImuPreIntegral(multiFrame->id(), acc_doubleinteg, acc_integ, Del_t);
}
// create a states object:
States states(asKeyframe, multiFrame->id(), multiFrame->timestamp());
// Added by Sharmin
stateCount_ = stateCount_ + 1;
// LOG (INFO) << "No. of state created: "<< stateCount_;
// LOG (INFO) << "statesMap_ size: "<< statesMap_.size();
// check if id was used before
OKVIS_ASSERT_TRUE_DBG(
Exception, statesMap_.find(states.id) == statesMap_.end(), "pose ID" << states.id << " was used before!");
// create global states
std::shared_ptr<okvis::ceres::PoseParameterBlock> poseParameterBlock(
new okvis::ceres::PoseParameterBlock(T_WS, states.id, multiFrame->timestamp()));
states.global.at(GlobalStates::T_WS).exists = true;
states.global.at(GlobalStates::T_WS).id = states.id;
if (statesMap_.empty()) {
referencePoseId_ = states.id; // set this as reference pose
if (!mapPtr_->addParameterBlock(poseParameterBlock, ceres::Map::Pose6d)) {
return false;
}
} else {
if (!mapPtr_->addParameterBlock(poseParameterBlock, ceres::Map::Pose6d)) {
return false;
}
}
// End @Sharmin
// add to buffer
statesMap_.insert(std::pair<uint64_t, States>(states.id, states));
multiFramePtrMap_.insert(std::pair<uint64_t, okvis::MultiFramePtr>(states.id, multiFrame));
// the following will point to the last states:
std::map<uint64_t, States>::reverse_iterator lastElementIterator = statesMap_.rbegin();
lastElementIterator++;
// cameras:
for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
SpecificSensorStatesContainer cameraInfos(2);
cameraInfos.at(CameraSensorStates::T_SCi).exists = true;
cameraInfos.at(CameraSensorStates::Intrinsics).exists = false;
if (((extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_translation < 1e-12) ||
(extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_orientation < 1e-12)) &&
(statesMap_.size() > 1)) {
// use the same block...
cameraInfos.at(CameraSensorStates::T_SCi).id =
lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id;
} else {
const okvis::kinematics::Transformation T_SC = *multiFrame->T_SC(i);
uint64_t id = IdProvider::instance().newId();
std::shared_ptr<okvis::ceres::PoseParameterBlock> extrinsicsParameterBlockPtr(
new okvis::ceres::PoseParameterBlock(T_SC, id, multiFrame->timestamp()));
if (!mapPtr_->addParameterBlock(extrinsicsParameterBlockPtr, ceres::Map::Pose6d)) {
return false;
}
cameraInfos.at(CameraSensorStates::T_SCi).id = id;
}
// update the states info
statesMap_.rbegin()->second.sensors.at(SensorStates::Camera).push_back(cameraInfos);
states.sensors.at(SensorStates::Camera).push_back(cameraInfos);
}
// IMU states are automatically propagated.
for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
SpecificSensorStatesContainer imuInfo(2);
imuInfo.at(ImuSensorStates::SpeedAndBias).exists = true;
uint64_t id = IdProvider::instance().newId();
std::shared_ptr<okvis::ceres::SpeedAndBiasParameterBlock> speedAndBiasParameterBlock(
new okvis::ceres::SpeedAndBiasParameterBlock(speedAndBias, id, multiFrame->timestamp()));
if (!mapPtr_->addParameterBlock(speedAndBiasParameterBlock)) {
return false;
}
imuInfo.at(ImuSensorStates::SpeedAndBias).id = id;
statesMap_.rbegin()->second.sensors.at(SensorStates::Imu).push_back(imuInfo);
states.sensors.at(SensorStates::Imu).push_back(imuInfo);
}
// @Sharmin
// Depth
if (depthMeasurements.size() != 0) {
// Though there should not be more than one depth data
double mean_depth = 0.0;
for (auto depthMeasurements_it = depthMeasurements.begin(); depthMeasurements_it != depthMeasurements.end();
++depthMeasurements_it) {
mean_depth += depthMeasurements_it->measurement.depth;
}
mean_depth = mean_depth / depthMeasurements.size();
double information_depth = 5.0; // TODO(Sharmin) doublre check with the manual
std::shared_ptr<ceres::DepthError> depthError(new ceres::DepthError(mean_depth, information_depth, firstDepth));
mapPtr_->addResidualBlock(depthError, NULL, poseParameterBlock);
std::cout << "Residual block z: " << (*poseParameterBlock->parameters()) + 2 << std::endl;
}
// Sonar
std::vector<Eigen::Vector3d> landmarkSubset;
Eigen::Vector3d sonar_landmark;
double range = 0.0, heading = 0.0;
// @Sharmin
if (sonarMeasurements.size() != 0) {
auto last_sonarMeasurement_it = sonarMeasurements.rbegin();
// Taking the nearest range value to the n+1 th frame
range = last_sonarMeasurement_it->measurement.range;
heading = last_sonarMeasurement_it->measurement.heading;
okvis::kinematics::Transformation T_WSo = T_WS * params.sonar.T_SSo;
okvis::kinematics::Transformation sonar_point(Eigen::Vector3d(range * cos(heading), range * sin(heading), 0.0),
Eigen::Quaterniond(1.0, 0.0, 0.0, 0.0));
okvis::kinematics::Transformation T_WSo_point = T_WSo * sonar_point;
sonar_landmark = T_WSo_point.r();
// std::cout << "T_WSo: " << T_WSo.r() << std::endl;
// std::cout << "T_WSo_point: " << sonar_landmark << std::endl;
// may be the reverse searching is faster
for (PointMap::const_reverse_iterator rit = landmarksMap_.rbegin(); rit != landmarksMap_.rend(); ++rit) {
Eigen::Vector3d visual_landmark;
if (fabs(rit->second.point[3]) > 1.0e-8) {
visual_landmark = (rit->second.point / rit->second.point[3]).head<3>();
// LOG (INFO) << "Visual Landmark: " << visual_landmark;
}
// double distance_from_sonar = (sonar_landmark - visual_landmark).norm(); //Euclidean distance
if (fabs(sonar_landmark[0] - visual_landmark[0]) < 0.1 && fabs(sonar_landmark[1] - visual_landmark[1]) < 0.1 &&
fabs(sonar_landmark[2] - visual_landmark[2]) < 0.1) {
// TODO(sharmin) parameter!!
// searching around 10 cm of sonar landmark
landmarkSubset.push_back(visual_landmark);
}
}
// std::cout << "Size of visual patch: "<<landmarkSubset.size() << std::endl;
if (landmarkSubset.size() > 0) {
// LOG (INFO) << " Sonar added for ceres optimization";
// @Sharmin
// add sonarError and related addResidualBlock
double information_sonar = 1.0; // TODO(sharmin) calculate properly?
std::shared_ptr<ceres::SonarError> sonarError(
new ceres::SonarError(params, range, heading, information_sonar, landmarkSubset));
mapPtr_->addResidualBlock(sonarError, NULL, poseParameterBlock);
}
// End @Sharmin
}
// depending on whether or not this is the very beginning, we will add priors or relative terms to the last state:
if (statesMap_.size() == 1) {
// let's add a prior
Eigen::Matrix<double, 6, 6> information = Eigen::Matrix<double, 6, 6>::Zero();
information(5, 5) = 1.0e8;
information(0, 0) = 1.0e8;
information(1, 1) = 1.0e8;
information(2, 2) = 1.0e8;
std::shared_ptr<ceres::PoseError> poseError(new ceres::PoseError(T_WS, information));
/*auto id2= */ mapPtr_->addResidualBlock(poseError, NULL, poseParameterBlock);
// mapPtr_->isJacobianCorrect(id2,1.0e-6);
// sensor states
for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
double translationStdev = extrinsicsEstimationParametersVec_.at(i).sigma_absolute_translation;
double translationVariance = translationStdev * translationStdev;
double rotationStdev = extrinsicsEstimationParametersVec_.at(i).sigma_absolute_orientation;
double rotationVariance = rotationStdev * rotationStdev;
if (translationVariance > 1.0e-16 && rotationVariance > 1.0e-16) {
const okvis::kinematics::Transformation T_SC = *multiFrame->T_SC(i);
std::shared_ptr<ceres::PoseError> cameraPoseError(
new ceres::PoseError(T_SC, translationVariance, rotationVariance));
// add to map
mapPtr_->addResidualBlock(
cameraPoseError,
NULL,
mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id));
// mapPtr_->isJacobianCorrect(id,1.0e-6);
} else {
mapPtr_->setParameterBlockConstant(
states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id);
}
}
for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
Eigen::Matrix<double, 6, 1> variances;
// get these from parameter file
const double sigma_bg = imuParametersVec_.at(0).sigma_bg;
const double sigma_ba = imuParametersVec_.at(0).sigma_ba;
std::shared_ptr<ceres::SpeedAndBiasError> speedAndBiasError(
new ceres::SpeedAndBiasError(speedAndBias, 1.0, sigma_bg * sigma_bg, sigma_ba * sigma_ba));
// add to map
mapPtr_->addResidualBlock(
speedAndBiasError,
NULL,
mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id));
// mapPtr_->isJacobianCorrect(id,1.0e-6);
}
// @Sharmin
// End @Sharmin
} else {
// add IMU error terms
for (size_t i = 0; i < imuParametersVec_.size(); ++i) {
std::shared_ptr<ceres::ImuError> imuError(new ceres::ImuError(
imuMeasurements, imuParametersVec_.at(i), lastElementIterator->second.timestamp, states.timestamp));
/*::ceres::ResidualBlockId id = */ mapPtr_->addResidualBlock(
imuError,
NULL,
mapPtr_->parameterBlockPtr(lastElementIterator->second.id),
mapPtr_->parameterBlockPtr(
lastElementIterator->second.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id),
mapPtr_->parameterBlockPtr(states.id),
mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Imu).at(i).at(ImuSensorStates::SpeedAndBias).id));
// imuError->setRecomputeInformation(false);
// mapPtr_->isJacobianCorrect(id,1.0e-9);
// imuError->setRecomputeInformation(true);
}
// add relative sensor state errors
for (size_t i = 0; i < extrinsicsEstimationParametersVec_.size(); ++i) {
if (lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id !=
states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id) {
// i.e. they are different estimated variables, so link them with a temporal error term
double dt = (states.timestamp - lastElementIterator->second.timestamp).toSec();
double translationSigmaC = extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_translation;
double translationVariance = translationSigmaC * translationSigmaC * dt;
double rotationSigmaC = extrinsicsEstimationParametersVec_.at(i).sigma_c_relative_orientation;
double rotationVariance = rotationSigmaC * rotationSigmaC * dt;
std::shared_ptr<ceres::RelativePoseError> relativeExtrinsicsError(
new ceres::RelativePoseError(translationVariance, rotationVariance));
mapPtr_->addResidualBlock(
relativeExtrinsicsError,
NULL,
mapPtr_->parameterBlockPtr(
lastElementIterator->second.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id),
mapPtr_->parameterBlockPtr(states.sensors.at(SensorStates::Camera).at(i).at(CameraSensorStates::T_SCi).id));
// mapPtr_->isJacobianCorrect(id,1.0e-6);
}
}
// only camera. this is slightly inconsistent, since the IMU error term contains both
// a term for global states as well as for the sensor-internal ones (i.e. biases).
// TODO(sharmin): magnetometer, pressure, ...
}
return true;
}
四、核心代码块
下面是针对声纳核心部分的精简说明,按调用顺序列出每个代码块的主要内容,重点聚焦声纳数据流与处理。
1. sonarCallback
—— ROS 回调 & 初步滤波
void Subscriber::sonarCallback(const imagenex831l::ProcessedRange::ConstPtr& msg) {
// 1. 量化距离
double resolution = msg->max_range / msg->intensity.size();
// 2. 排除末尾 100 点噪声,找到最大强度及索引
int maxInt = 0, maxIdx = 0;
for (unsigned i = 0; i < msg->intensity.size() - 100; ++i) {
if (msg->intensity[i] > maxInt) {
maxInt = msg->intensity[i];
maxIdx = i;
}
}
// 3. 计算 range 与 heading
double range = (maxIdx + 1) * resolution;
double heading = msg->head_position * M_PI / 180.0;
// 4. 阈值滤波:4.5m 内且强度>10
if (range < 4.5 && maxInt > 10) {
vioInterface_->addSonarMeasurement(
okvis::Time(msg->header.stamp.sec, msg->header.stamp.nsec),
range, heading);
}
}
- 功能:从原始强度数组提取最强反射点;滤除远端噪声;阈值后,将
(timestamp, range, heading)
送入后端队列。
2. addSonarMeasurement
—— 入队策略
bool ThreadedKFVio::addSonarMeasurement(
const okvis::Time& stamp, double range, double heading) {
okvis::SonarMeasurement m{stamp, {range, heading}};
if (blocking_) {
sonarMeasurementsReceived_.PushBlockingIfFull(m, 1);
return true;
} else {
sonarMeasurementsReceived_.PushNonBlockingDroppingIfFull(
m, maxImuInputQueueSize_);
return sonarMeasurementsReceived_.Size() == 1;
}
}
- 功能:将声纳测量封装后写入
sonarMeasurementsReceived_
线程安全队列。 - Blocking 模式:队列满时等待;NonBlocking:丢弃最旧。
3. sonarConsumerLoop
—— 后台消费 & 缓存
void ThreadedKFVio::sonarConsumerLoop() {
okvis::SonarMeasurement data;
for (;;) {
if (!sonarMeasurementsReceived_.PopBlocking(&data)) return;
{
std::lock_guard<std::mutex> lk(sonarMeasurements_mutex_);
// 保证时间戳递增
OKVIS_ASSERT_TRUE(Exception,
sonarMeasurements_.empty() ||
sonarMeasurements_.back().timeStamp < data.timeStamp);
sonarMeasurements_.push_back(data);
}
// 通知主线程:新数据已到达
sonarFrameSynchronizer_.gotSonarData(data.timeStamp);
}
}
- 功能:持续从接收队列取测量,写入带互斥锁的
deque
历史缓冲,并通过条件变量唤醒主线程。
4. getSonarMeasurements
—— 时间窗切片
okvis::SonarMeasurementDeque
ThreadedKFVio::getSonarMeasurements(
okvis::Time& begin, okvis::Time& end) {
if (end < begin || begin > sonarMeasurements_.back().timeStamp)
return {};
std::lock_guard<std::mutex> lk(sonarMeasurements_mutex_);
auto it0 = sonarMeasurements_.begin(), it1 = sonarMeasurements_.end();
for (auto it = sonarMeasurements_.begin(); it != sonarMeasurements_.end(); ++it) {
if (it->timeStamp <= begin) it0 = it;
if (it->timeStamp >= end) { it1 = std::next(it); break; }
}
return SonarMeasurementDeque(it0, it1);
}
- 功能:在
[begin, end]
时间范围内截取声纳测量子队列,供后端融合使用。
5. 主融合线程 matchingLoop
中的声纳片段
// 在 matchingLoop() 循环内
if (parameters_.sensorList.isSonarUsed) {
okvis::Time t_end = frame->timestamp();
okvis::Time t_begin = lastAddedStateTimestamp_;
// 等待最新声纳数据
if (!sonarFrameSynchronizer_.waitForUpToDateSonarData(t_end))
return;
// 切片取值
sonarData = getSonarMeasurements(t_begin, t_end);
if (sonarData.empty()) continue;
}
- 功能:在每帧处理时,先等待直到
sonarMeasurements_
中已有最新的声纳数据,再按帧时间窗切片,否则跳过。
6. Estimator::addStates
—— 构造 SonarError
if (!sonarMeasurements.empty()) {
auto last = sonarMeasurements.rbegin();
double range = last->measurement.range;
double heading = last->measurement.heading;
// 声纳→机体→世界坐标变换
auto T_WSo = T_WS * params.sonar.T_SSo;
auto sonar_pt = Transformation(
Vector3d(range*cos(heading), range*sin(heading), 0.0));
Vector3d landmark = (T_WSo * sonar_pt).r();
// 在视觉地图中匹配
for (auto it = landmarksMap_.rbegin(); it != landmarksMap_.rend(); ++it) {
Vector3d vis = it->second.point3d();
if ((vis - landmark).norm() < 0.1)
subset.push_back(vis);
}
if (!subset.empty()) {
auto err = std::make_shared<ceres::SonarError>(
params, range, heading, 1.0, subset);
mapPtr_->addResidualBlock(err, nullptr, poseParameterBlock);
}
}
- 功能:取最新测量、坐标转换、在视觉地图中找近邻点集,生成
SonarError
并添加到 Ceres 优化。
全流程主线
- 回调 →
addSonarMeasurement
→ - 消费线程 →
sonarMeasurements_
&gotSonarData
→ - matchingLoop
- 等待声纳同步 & 切片 →
addStates
中构造声纳残差(SonarError
) →- 联合视觉、IMU、深度残差交给 Ceres 优化
聚焦声纳部分,这几段代码共同保障了“实时接收、缓存、时序切片、残差构造”的闭环,将声纳观测无缝融合进 VIO 后端。
五、相关线程分析
下面逐步说明为什么要用这些线程,以及每一步的具体目的:
ROS 发布声纳原始数据
│
▼
1. sonarCallback 回调
│
▼
2. sonarMeasurementsReceived_ (线程安全队列)
│
├─[sonarConsumerLoop 线程]──►3. sonarMeasurements_ (历史缓存 Deque)
│ │
│ └─ gotSonarData 通知同步器
│
▼
**主融合线程 matchingLoop(循环执行)**
│
▼
4. waitForUpToDateSonarData → getSonarMeasurements(begin, end)
│
▼
5. Estimator::addStates 中取最新测量 → 构造 SonarError 残差块
│
▼
6. Ceres 优化:联合视觉与声纳约束
│
└───────────────┐
▼
(返回主融合线程,继续下一帧循环)
1. sonarCallback
回调
- 执行环境:由 ROS 的内部线程池或 I/O 线程触发。
- 目的:
- 立刻响应新到的硬件消息(最低延迟)。
- 做简单的滤波/阈值判断,快速决定“这条测量是否有效”。
- 不在此处做复杂或耗时的处理,避免阻塞 ROS 通讯。
2. sonarMeasurementsReceived_
线程安全队列
- 执行环境:共享于回调和消费线程。
- 目的:
- 将“中短期传入的测量”先缓冲起来,平滑峰值流量。
- 解耦“接收”与“处理”两者的速率差异。
- 保证在非阻塞模式下,不丢帧地接收;在阻塞模式下,可按需等待。
3. sonarConsumerLoop
消费线程
- 执行环境:独立后台线程,由
std::thread
启动。 - 目的:
- 异步从接收队列里取出每条测量,避免主融合线程或回调线程做 I/O 操作。
- 将测量追加到 历史缓存
sonarMeasurements_
(deque
),并单独加锁保护。 - 触发条件变量
gotSonarData
,告知主线程“新的测量已经入库,可以切片了”。
4. 主融合线程 matchingLoop
- 执行环境:应用的核心循环线程,永不停歇地处理每一帧视觉关键点。
- 主要目的:
- 等待直到 IMU、声纳等所有传感器在本帧时间窗内都有新数据(
waitForUpToDateSonarData
)。 - 切片出恰好对应当前帧时段的声纳测量(
getSonarMeasurements(begin,end)
)。 - 将视觉、IMU、声纳(和深度)一起喂给后端优化。
- 等待直到 IMU、声纳等所有传感器在本帧时间窗内都有新数据(
5. Estimator::addStates
中构造 SonarError
- 执行环境:
matchingLoop
内部加了互斥锁后执行。 - 目的:
- 从切片结果里取最新一条声纳测量,转到世界坐标系。
- 在视觉地图点集中找彼此对应的邻近点,生成测距残差块。
- 将
SonarError
加入到 Ceres 问题中,让优化同时考虑声纳与视觉一致性。
6. Ceres 优化:联合视觉与声纳约束
- 执行环境:由后端优化线程或在
addStates
内触发。 - 目的:
- 最小化所有残差(视觉投影差、IMU 预积分差、声纳测距差、深度差等)。
- 共同优化相机/IMU 轨迹与地图,使得各传感器观测一致。
7、为什么要这么多线程?
- 实时性与低延迟
- ROS 回调要极快返回,否则会影响硬件 IO。
- 主融合循环要持续跑,不能被 I/O 或数据处理偶尔卡住。
- 解耦与缓冲
- 回调线程只做最少处理,消费线程把数据“入库”,主线程按需“出库”。
- 三者各司其职,速率不同也不会相互阻塞。
- 数据一致性 & 时序对齐
- 历史缓存
deque
可以任意按照帧的时间窗切片,保证每帧用到的测量精确对应。 - 条件变量同步器避免主线程提早拿不到或落后拿到数据。
- 历史缓存
通过这种多线程+队列+同步器的设计,SVln2 能在保证实时响应的同时,又能做到多模态数据精确对齐与融合。