(02)Cartographer源码无死角解析-(25) 阻塞队列BlockingQueue,与OrderedMultiQueue成员函数
admin
2024-02-04 04:52:43
0

讲解关于slam一系列文章汇总链接:史上最全slam从零开始,针对于本栏目讲解(02)Cartographer源码无死角解析-链接如下:
(02)Cartographer源码无死角解析- (00)目录_最新无死角讲解:https://blog.csdn.net/weixin_43013761/article/details/127350885
 
文末正下方中心提供了本人联系方式,点击本人照片即可显示WX→官方认证{\color{blue}{文末正下方中心}提供了本人 \color{red} 联系方式,\color{blue}点击本人照片即可显示WX→官方认证}文末正下方中心提供了本人联系方式,点击本人照片即可显示WX→官方认证
 

一、前言

在上一篇文章中,对 src/cartographer/cartographer/sensor/internal/ordered_multi_queue.cc 中的 class OrderedMultiQueue 还有好些函数没有进行讲解。回顾一下 ordered_multi_queue.h 这个头文件,可以看到之前提及到的结构体

  struct Queue {common::BlockingQueue> queue;   // 存储数据的队列Callback callback;                                    // 本数据队列对应的回调函数bool finished = false;                                // 这个queue是否finished};

那么先来看看 BlockingQueue 的实现,位于 src/cartographer/cartographer/common/internal/blocking_queue.h 文件中。在 src 文件中存在 src/cartographer 与 src/cartographer_ros 两个文件夹,其实 src/cartographer 个独立的工程, src/cartographer_ros 是数据格式进行转换,然后加载到 src/cartographer 的数据队列中即可。也就是说 src/cartographer_ros 类似一个中转站,把传感器数据整理好之后交给 src/cartographer,src/cartographer 就可以正常运转了。

BlockingQueue 其就是一个阻塞队列,也就是生产者与消费者的红线,通过他把 生产者(src/cartographer_ros) 与 消费者( src/cartographer) 联系到了一起,下面简单描述一下:

// A thread-safe blocking queue that is useful for producer/consumer patterns.
// 'T' must be movable.
// 一个线程安全的阻塞队列, 对 生产者/消费者模式 很有用./**为什么要使用生产者消费者模式, 顺序执行不就可以了吗?生产者消费者到底有什么意义?解耦生产者和消费者之间不直接依赖, 通过缓冲区通讯, 将两个类之间的耦合度降到最低。并发 (异步)生产者直接调用消费者, 两者是同步(阻塞)的, 如果消费者吞吐数据很慢, 这时候生产者白白浪费大好时光。而使用这种模式之后, 生产者将数据丢到缓冲区, 继续生产, 完全不依赖消费者, 程序执行效率会大大提高。复用:通过将生产者类和消费者类独立开来, 可以对生产者类和消费者类进行独立的复用与扩展*/

 

二、class BlockingQueue 函数分析

(1)\color{blue}(1)(1) BlockingQueue 是一个模板类,模板参数就是 std::deque 队列存储的数据类类型,其存在成员变量 std::deque deque_ GUARDED_BY(mutex_),表示该队列存储的数据类型为T,且如果 mutex_ 上锁则会阻塞,也就是众多消费者与生产者之中,在一个时刻有且只能有一个线程进行队列操作,从而达到数据保护的目的。

(2)\color{blue}(2)(2) BlockingQueue::Push(T t)的功能是往队列中添加一个数据。当然在添加之前会调用一个 lambda 表示式函数 predicate,判断一下当前队列是否已满,如果满了则会进入阻塞状态。该函数是在 OrderedMultiQueue::Add() 中被调用的。

(3)\color{blue}(3)(3) BlockingQueue::QueueNotFullCondition() 的功能就是判断判断队列的数据是否满了,需要注意的是,如果创建 BlockingQueue 实例时,不传入参数,则 queue_size_ = kInfiniteQueueSize = 0。那么 BlockingQueue::QueueNotFullCondition() 永远都返回 true,也就是说队列永远没有满,无限制大小,默许默认就是这样设计的。

(4)\color{blue}(4)(4)BlockingQueue::Pop() 的功能是取出数据,其先上锁,然后调用 lambda 表达式函数判断数据是否为空,为空则阻塞等待,否则取出数据进行消费。该函数在 OrderedMultiQueue::Dispatch() 中被调用,下面会进行详细的讲解。

(5)\color{blue}(5)(5) BlockingQueue::PopWithTimeout 其功能与 BlockingQueue::Pop() 比较类似,只是他的阻塞是有时间限定的,如果超过了限定的时间,队列中依旧没有数据可以取出,则放回一个 nullptr 指针。
 

三、class BlockingQueue 代码注释

代码于 src/cartographer/cartographer/common/internal/blocking_queue.h 中实现

template 
class BlockingQueue {public:static constexpr size_t kInfiniteQueueSize = 0;// Constructs a blocking queue with infinite queue size.// 构造一个具有无限队列大小的阻塞队列//注意这里的默认构造函数调用了带一个参数的构造函数BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}BlockingQueue(const BlockingQueue&) = delete; //禁用拷贝构造函数BlockingQueue& operator=(const BlockingQueue&) = delete; //禁用赋值符重载函数// Constructs a blocking queue with a size of 'queue_size'.// 构造一个大小为 queue_size 的阻塞队列,explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}// Pushes a value onto the queue. Blocks if the queue is full.// 将值压入队列. 如果队列已满, 则阻塞void Push(T t) {// 首先定义判断函数// 因为QueueNotFullCondition()函数会判断队列大小//所以执行该函数时需要上锁,使用 EXCLUSIVE_LOCKS_REQUIRED 声明//如果调用该函数时没有上锁,则会报错const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return QueueNotFullCondition();};// absl::Mutex的更多信息可看: https://www.jianshu.com/p/d2834abd6796// absl官网: https://abseil.io/about/// 如果数据满了, 就进行等待absl::MutexLock lock(&mutex_); //上锁mutex_.Await(absl::Condition(&predicate)); //阻塞等待// 将数据加入队列, 移动而非拷贝deque_.push_back(std::move(t));}// Like push, but returns false if 'timeout' is reached.// 与Push()类似, 但是超时后返回falsebool PushWithTimeout(T t, const common::Duration timeout) {const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return QueueNotFullCondition();};absl::MutexLock lock(&mutex_);if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),absl::FromChrono(timeout))) {return false;}deque_.push_back(std::move(t));return true;}// Pops the next value from the queue. Blocks until a value is available.// 取出数据, 如果数据队列为空则进行等待T Pop() {const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return !QueueEmptyCondition();};// 等待直到数据队列中至少有一个数据absl::MutexLock lock(&mutex_);mutex_.Await(absl::Condition(&predicate));T t = std::move(deque_.front());deque_.pop_front();return t;}// Like Pop, but can timeout. Returns nullptr in this case.// 与Pop()类似, 但是超时后返回nullptrT PopWithTimeout(const common::Duration timeout) {const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return !QueueEmptyCondition();};absl::MutexLock lock(&mutex_);if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),absl::FromChrono(timeout))) {return nullptr;}T t = std::move(deque_.front());deque_.pop_front();return t;}// Like Peek, but can timeout. Returns nullptr in this case.// 与Peek()类似, 但是超时后返回nullptrtemplate R* PeekWithTimeout(const common::Duration timeout) {const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return !QueueEmptyCondition();};absl::MutexLock lock(&mutex_);if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),absl::FromChrono(timeout))) {return nullptr;}return deque_.front().get();}// Returns the next value in the queue or nullptr if the queue is empty.// Maintains ownership. This assumes a member function get() that returns// a pointer to the given type R.// 返回第一个数据的指针, 如果队列为空则返回nullptrtemplate const R* Peek() {absl::MutexLock lock(&mutex_);if (deque_.empty()) {return nullptr;}return deque_.front().get();}// Returns the number of items currently in the queue.// 返回当前队列中的项目数size_t Size() {absl::MutexLock lock(&mutex_);return deque_.size();}// Blocks until the queue is empty.// 等待直到队列为空void WaitUntilEmpty() {const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return QueueEmptyCondition();};absl::MutexLock lock(&mutex_);mutex_.Await(absl::Condition(&predicate));}private:// Returns true iff the queue is empty.// 如果队列为空, 则返回truebool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return deque_.empty();}// Returns true iff the queue is not full.// 如果队列未满, 则返回truebool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;}absl::Mutex mutex_;const size_t queue_size_ GUARDED_BY(mutex_);std::deque deque_ GUARDED_BY(mutex_);
};

 

四、OrderedMultiQueue 函数分析

了解了 BlockingQueue() 之后,在继续上一篇的博客,对 OrderedMultiQueue 进行分析。在 OrderedMultiQueue::Add() 函数中,其会调用 it->second.queue.Push(std::move(data)) ,把数据 data 添加到与之对应的 trajectory_id topic 队列之中,然后调用 OrderedMultiQueue::Dispatch() 进行数据的分发。在讲解该函数之前,先来了解如下几个函数:

1、CannotMakeProgress()

// 标记queue_key为阻塞者,并按条件发布log,等等这个数据
void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {// 标记queue_key为阻塞者blocker_ = queue_key;for (auto& entry : queues_) {// queue_key对应的数据队列为空,而某一个传感器数据队列的数据已经大于kMaxQueueSize了// 有问题, 进行报错if (entry.second.queue.Size() > kMaxQueueSize) {// 在该语句第1、61、121……次被执行的时候, 记录日志信息LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;// [ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)// [ WARN] [1628516439.089736487, 1606808659.869309184]: W0809 21:40:39.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)return;}}
}

该函数一般是在队列为空的时候对用,假若队列 queue_key 为空,则会对其他所有队列进行遍历,查看一下有没有是否存在数据超过 kMaxQueueSize=500,如果有,则会信息打印进行警告,类似如下:

[ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)

一般情况下,如果订阅的话题错了,或者话题发布者没有正常发送消息,则会出现该类打印。

2、CannotMakeProgress()

// 返回阻塞的队列的QueueKey
QueueKey OrderedMultiQueue::GetBlocker() const {CHECK(!queues_.empty());return blocker_;
}

如果目前有数据队列阻塞了,则该队列的 QueueKey 就会被保存在 OrderedMultiQueue::blocker_ 之中,调用该函数即可查看。

3、GetCommonStartTime()

 

五、结语

通过该篇博客,了解到了 BlockingQueue 与 OrderedMultiQueue 的成员函数,但是他们是怎么串联起来的,如何被调用的,或许不是很清楚。不过,没有关系,下一篇博客就会进行详细讲解,其主要设计的函数就是 OrderedMultiQueue::Dispatch()。

 
 
 

相关内容

热门资讯

谁能给我介绍点英文歌? 谁能给我介绍点英文歌?on the floor、halo、rooling in the deep、i...
玛莎给许哲佩《气球》写的推荐文... 玛莎给许哲佩《气球》写的推荐文案玛莎写的真好哈
求一个故事的原文 求一个故事的原文故事没看过,听你一说就知道这故事是教育人:通过自己双手劳动得来的东西才是最好的!名字...
张艺凡拜托粉丝别走,却被质疑卖... 张艺凡拜托粉丝别走,却被质疑卖惨,难道是有什么秘密吗?我觉得其实也没有什么秘密,她可能只是想要去挽回...
洛克王国天赋的影响,真正懂的请... 洛克王国天赋的影响,真正懂的请进!性格影响宠物某些天赋属性(±10%)天赋值是直接加到宠物属性上的,...
取个有创意的餐厅名。 取个有创意的餐厅名。有家餐厅。。宫保鸡丁,以菜名作为店名一家菜馆聚缘庄一级棒餐厅1、新世纪大酒店2、...
关于紫水晶的消磁方法 关于紫水晶的消磁方法可不可以用薰衣草精油来为紫水晶消磁 可以的话该怎么做 如果不可以又是...
曹国藩是一个怎样的人,请详细说... 曹国藩是一个怎样的人,请详细说一下.谢谢.是一个有野心的人 不过又胆小 为人聪明
王鹤棣在新版《流星花园》里饰演... 王鹤棣在新版《流星花园》里饰演的是谁呢?应该饰演的是道明寺,是一个最重要的男主角,其实他的演技还是比...
曾领兵大胜秦国的信陵君,他的军... 曾领兵大胜秦国的信陵君,他的军事水平如何?中上,战国四大君子之一,名气是很高的,但是军事水平来说,战...
给儿童看恐怖片会产生心理阴影吗... 给儿童看恐怖片会产生心理阴影吗?该怎么办?给儿童看恐怖片会产生一定的心理阴影,所以也不要给孩子看这种...
鲁班通过观察想象试验发明了锯,... 鲁班通过观察想象试验发明了锯,从故事中得到了什么启示从故事中得到的启示是:凡事要用心观察。关心生活中...
2-8人去张家界旅游团五天四晚... 最近在计划一次家庭出游,我们一家加上几个朋友一共八个人,准备去张家界来个五天四晚的纯玩团。一开始还在...
甘南旅行宝典,甘南玩法全解析,... 甘南旅行宝典:甘南玩法全解析,99%游客没试过 在甘南,每一片草原都是通向天空的阶梯,每一座寺院都是...
暑假去哪玩丨大王山三大乐园开启... 三湘都市报7月6日讯(全媒体记者 丁鹏志)杂技演员从空中翻腾而下、绸吊仙子在丝带上翩跹起舞、摩托车手...
怎么给孩子数学启蒙?数学启蒙从... 怎么给孩子数学启蒙?数学启蒙从几岁开始最好?我认为6~7岁启蒙最好,这个时候可以给孩子报数学启蒙班来...
7月新疆10天环线游,解锁北疆... 7月新疆10天环线游,解锁北疆绝美风光 7月,当全国各地逐渐被暑气笼罩,新疆北疆却迎来了它一年中最美...
逊克县奇克镇:点燃盛夏狂欢 续... 7月6日,黑龙江省逊克县奇克镇黎明湾界江草原成功举办了一场精彩纷呈的夏日主题活动,以多元的文化体验、...
小学生越来越优秀,需要培养良好... 小学生越来越优秀,需要培养良好的学习习惯,哪几个方面需要重视呢?家长需要重视家庭环境的建设,要让孩子...
黄山5天4晚行程推荐,黄山旅游... 黄山,这座屹立于皖南大地的奇峰峻岭,宛如一幅流动的山水画卷,自古以来便吸引着无数文人墨客为之倾倒。它...