DDS最早应用在美国海军系统,目前已广泛应用于军工、机器人、自动驾驶等领域,国际主流军工厂商、ROS2操作系统等均将DDS作为其控制系统的核心中间件,为各类战舰、机器人等提供可靠、高实时的分布式通信功能。今天就带大家一起了解一下。
DDS(Data Distribution Service),即数据分发服务,是OMG(Object Management Group)对象管理组织发布的分布式通信规范,采用订阅发布模型,以中间件的形式提供通信服务,并提供QoS(Quality of Service)策略,保障数据实时、高效、灵活的分发。
DDS在网络栈中处于传输层的上面,以TCP/UDP为基础:

DDS的相关标准包括核心协议(DDSI-RTPS,DDS-XTypes,DDS-Security,Interface Definition Language (IDL)…),API(DDS C++ API,DDS Java API),拓展协议(DDS-RPC,DDS-XML…)等13份协议集合。在分布式系统中,DDS位于操作系统和应用程序之间,支持多种编程语言以及多种底层协议。

DDS采用以数据为中心的发布-订阅模型DCPS(Data-Centric Publish-Subscribe):

与常见数据通信方式的对比:

- Client-Server:比较常见的TCP、WS、REST等均为这种请求响应模式;
- Broker:消息队列,消息的生产和消费统一连接到broker,常见的MQTT、Kafka消息队列均属于这种模式;
- 广播:每个人都可以在通道上广播和接受消息,通信双方没有直接连接,而是统一接到bus总线上收发,有点像买卖双方都要到跳蚤市场一样稍微有点乱,汽车上常见的CANbus即为这种模式;
- DDS:与广播模型类似,但它可以在通信时只关心自己的消息,而不用看到那些不关心的嘈杂消息;
- DDS 规范 :DDS 规范描述了用于分布式应用程序通信和集成的以数据为中心的发布-订阅 (DCPS) 模型。该规范定义了应用程序接口 (API) 和通信语义(行为和服务质量),它们能够有效地将信息从信息生产者传递到匹配的消费者。DDS 规范的目的可以概括为“在正确的时间将正确的信息高效、稳健地传递到正确的地点”。
- DDSI-RTPS :RTPS(Real Time Publish Subscribe Protocol)协议通过TCP/UDP等实现最大努力可靠的发布-订阅。该规范定义了 DDS 的互操作性协议,其目的和范围是确保基于不同供应商的 DDS 实现的应用程序可以互操作。

DDS本身是OMG维护的标准,具体有很多开源和商业产品实现了这个标准,比如FastRTPS、OpenDDS、OpenSplice DDS、Cyclone DDS等。
DDS最早应用在美国海军系统,用于解决军舰系统复杂网络环境中大量软件间的兼容性问题。目前DDS在欧美已广泛应用于国防军工领域,THALES泰雷兹、EADS欧洲宇航防务、Raytheon雷神、Lockheed Martin洛克希德·马丁等国际主流军工厂商均将DDS作为其各种作战指挥与控制系统的核心中间件,为F124/宙斯盾DDG1000护卫舰、MQ-9捕食者无人机、波音预警机、MK-41舰载导弹发射系统、SSDS舰艇自卫系统、美国和欧洲宇航局发射控制系统等提供可靠、高实时的分布式通信功能。

在机器人领域,传统单机作业模式已发展为群体分布式协同,对于生产作业等环节中的机器人集群,存在相互协作的可靠、灵活、实时、安全的连接需求,在ROS2中也将DDS确定为通信层标准,促进了机器人产业的发展。


自动驾驶汽车是复杂的分布式系统,它结合了视觉、雷达、GPS、导航、规划和控制等组件,这些模块必须组成安全可靠的系统,实时分析复杂的环境并对其作出正确反应。多家自动驾驶平台均采用DDS集成多种复杂易购传感器收集的信息,以支撑其作出及时正确的控制决策。

物联网是一个物-物相连的互联网,需要持久、可靠、高性能网络架构支持。GE多种医疗设备之间已采用DDS进行通信,西门子风电机的巨型涡轮机内部控制、外部通过无线/卫星与发电厂数据传输也已广泛应用。
eprosima Fast DDS(以前称为 Fast-RTPS)是对OMG DDS标准的C++实现,是一个免费开源软件,遵循 Apache License 2.0,被ROS2设定为默认的消息中间件。
Fast-DDS 遵循 RTPS(实时发布订阅)协议,该协议通过不可靠传输(如 UDP)提供高效稳定的发布者-订阅者通信。RTPS 也是为数据分发服务 (DDS) 标准定义的互操作性协议。Fast-DDS公开了一个 API 以直接访问 RTPS 协议,使用户可以完全访问协议内部。
- 实时性:提供可配置的实时、可靠的发布-订阅通信策略,支持同步和异步数据发布模式。
- 服务发现:内置对发布者和订阅者的动态发现机制,即插即用连接,加入或离开网络无需配置。
- 流量控制:支持可配置的吞吐量控制,可限制在特定条件下发送的数据量;
- 传输方式:同时支持UDP/TCP/SHM(共享内存)传输;
- 安全性:提供可配置的安全通信,包括远程参与者的身份验证、实体的访问控制和数据加密等;
- 分布式:通过分布式网络可支持大量实体接入;
- 可移植性:遵循DDS规范、IDL定义等,应用程序只需重新编译就可以在不同DDS实现之间切换;
- 高性能:允许预分配资源,减少动态资源分配,避免无限制使用资源,最小化数据复制的需要;基于eProsima的Fast Buffers序列化,官网说其性能要高于Protocol Buffers和Thrift:

- 跨平台:支持Linux、Mac OS、QNX、Windows、VxWorks、iOS、Android、Raspbian等平台。
- 免费和开源:Fast DDS库、Fast RTPS库、Fast DDS-Gen、内部依赖项(如Fast CDR)和外部依赖项(如foonathan库)都是免费和开源的。

- 应用层:使用Fast DDS API 在分布式系统中实现通信的用户应用程序。
- DDS 层:DDS 通信中间件的实现。它允许部署一个或多个 DDS 域,其中同一域内的域参与者通过在域主题下发布/订阅来交换消息。
- RTPS 层:实现实时发布-订阅 (RTPS) 协议以支持与 DDS 应用程序的互操作性。该层充当传输层的抽象层。
- 传输层:快速 DDS可用于各种传输协议,例如不可靠传输协议 (UDP)、可靠传输协议 (TCP) 或共享内存传输协议 (SHM)。
2.1.Entity实体
Entity是所有 DDS 实体的抽象基类,DDS域下的所有对象都继承自Entity。包括以下类型:
- DomainParticipant:该实体是服务的入口点,充当发布者、订阅者和主题的工厂。
- Publisher:它充当可以创建任意数量的 DataWriters 的工厂。
- Subscriber:它充当可以创建任意数量的 DataReader 的工厂。
- Topic:该实体适合发布和订阅实体之间,并充当渠道。
- DataWriter:是负责数据分发的对象。
- DataReader:是用来访问接收到的数据的对象。
DDS 实体之间的层次结构:

所有实体共享的特征:
- 实体唯一ID:该 ID 在 DDS 实体及其对应的 RTPS 实体之间共享。ID存储在 Entity 基类上声明的 Instance Handle 对象中,可以使用 getter 函数get_instance_handle()访问该对象。
- QoS策略:每个实体都可以使用策略配置,对于每一个实体类型都有对应的QoS类。在实体创建或使用中可以通过::set_qos()方法进行策略配置。
- Listener监听器:所有实体类型都定义了一个抽象侦听器接口,其中包含将实体状态变化触发传递给应用的回调函数。用户可以重写接口实现自己的侦听器回调。
- Status与StatusCondition:每个实体都与一组状态对象相关联,这些状态对象的值表示该实体的通信状态 。这些状态值的更改会触发适当的侦听器回调的调用以异步通知应用程序。详见 、 。
监听器继承关系:

2.2.Policy策略
列几个常见的策略吧,相机的大家可以去看下官方文档:
- DeadlineQosPolicy:有效性策略,主要用于监控新数据的发布/接受频率,低于设定阈值时将发出报警。
- DestinationOrderQosPolicy:保序策略,用于同一个Topic的消息在reader时确保顺序。
- DurabilityQosPolicy:持久性策略,用于控制在没有reader的情况下,writer能否发送消息,以及后加入的reader能否读取加入前topic里的消息。
- EntityFactoryQosPolicy:实体工厂策略,用于disabled工具类的实体类。
- GroupDataQosPolicy:组策略,用于组合实现类似partition分布式的效果。
- HistoryQosPolicy:历史消息丢弃策略,控制是否只保留最新值以及保留的消息数量。
- LatencyBudgetQosPolicy:延迟策略,制定从写入数据到插入DataReader历史记录并通知到回调的最大可接受延迟。
- LivelinessQosPolicy:活跃策略,通过监控末次活跃后的等待时长来确定实体是否还活着。
- OwnershipQosPolicy:所有者策略,允许指定实例只能由一个DataWriter更新。
- PartitionQosPolicy:逻辑分区策略,DataReader只能接收Topic+指定Partition逻辑分区的消息。
- ReliabilityQosPolicy:可靠性策略,是否确保一定送达,并收到DataReader的确认。
2.3.Domain域
可以把域看作是一个虚拟网络,连接同域上运行的所有应用程序,并将它们与运行在不同域上的应用程序隔离开来。这样,多个独立的分布式应用程序可以在同一个物理网络中共存,而不会相互干扰,甚至不会相互感知。
每个域都有一个唯一的标识符,称为 domainId,它被实现为一个值。同 domainId 的应用程序属于同一个域并且能够通信。
2.3.1.Partitions分区
分区在域中引入了另一个实体隔离级别。虽然 DomainParticipant 如果它们在同一个域中将能够相互通信,但仍然可以隔离它们的和 并将它们分配给不同的。
- 与域和主题不同,分区可以在端点的生命周期内以很少的成本动态更改。具体来说,不启动新线程,不分配新内存,不影响更改历史。
- 与 Domain 和 Topic 不同,一个端点可以同时属于多个 Partition。
或对象的 数据成员上配置分区。分区名为字符串,可以使用通配符,但需要注意*通配不匹配默认的无名partition,即同topic尽可能要么pub/sub终端都用partition,要么都不用,尽量不混着用。

2.4.Publisher发布者
消息发布由Publisher和DataWriter共同完成。Publisher通过DataWriter向Topic发布消息,两者通过Listener监听进行异步回调处理。

2.5.Subscriber订阅者
订阅由Subscriber和DataReader共同完成。Subscriber创建DataReader,DataReader绑定Topic,当接收到数据时,通过Listener通知应用进行回调处理。

2.6.Topic主题
Topic用来连接同一类数据的发布方和订阅方。

单个Topic绑定到单个数据类型,所以每条数据可以理解为一个数据类型的实例:

Subscriber在订阅topic时,除了基本的订阅方式外,还可以选择提供数据筛选能力的contentFelteredTopic,它提供类似SQL的过滤参数配置:
你也可以自定义过滤策略,或结合业务场景在DataWriter端进行过滤。
3.1.并发和多线程
Fast DDS实现了一个并发多线程系统。每个DomainParticipant生成一组线程来处理后台任务,例如日志记录、消息接收和异步通信。这不会影响使用该库的方式,即Fast DDS API是线程安全的,因此可以从不同的线程安全的调用同一DomainParticipant上的任何方法。但是,当外部函数访问由库内运行线程修改的资源时,必须考虑这种多线程实现。这方面的一个示例是应用Listener侦听器回调中的修改资源。
Fast DDS多线程调度工作方式:
- 主线程:由应用程序管理。
- 事件线程:每个 DomainParticipant 拥有其中之一。它处理周期性和触发的时间事件。
- 异步写入线程:该线程管理所有 DomainParticipants 的异步写入。即使对于同步写,某些形式的通信也必须在后台启动。
- 接收线程:域参与者为每个接收通道生成一个线程,其中通道的概念取决于传输层(例如 UDP 端口)。
3.2.事件驱动
FastDDS通过一个时间事件系统使来响应某些条件并安排定期执行。由于大多数与 DDS 和 RTPS 元数据有关,因此用户很少看到它们。但是,用户可以通过在他们的应用程序中继承类实现周期性事件。
4.1.自发现协议
自发现协议定义了相同 Topic 下的 DataWriters 和 DataReader 相匹配的机制,以便它们可以共享数据。Fast DDS提供以下发现机制:
- 简单发现:这是默认的发现机制,在 RTPS 标准中定义并提供与其他 DDS 实现的兼容性。在这里,DomainParticipants 是在早期单独发现的,以便随后匹配它们实现的 DataWriter 和 DataReader。
- 发现服务器:这种发现机制使用集中式发现架构,其中服务器充当元流量发现的中心。
- 静态发现:这实现了 DomainParticipant 彼此之间的发现,但如果远程 DomainParticipant 事先知道这些实体,则可以跳过每个 DomainParticipant (DataReader/DataWriter) 中包含的实体的发现。
- 手动发现:此机制仅与 RTPS 层兼容。它允许用户使用其选择的任何外部元信息通道手动匹配和取消匹配 RTPSParticipants、RTPSWriters 和 RTPSReaders。
*注:详细解释和配置可以在查看。
4.2.安全
Fast DDS可通过在三个级别配置来提供安全通信:
- 远程域参与者的身份验证:DDS:Auth:PKI-DH插件使用受信任的证书颁发机构 (CA) 和 ECDSA 数字签名算法提供身份验证,以执行相互身份验证。它还使用椭圆曲线 Diffie-Hellman (ECDH) 密钥协议协议建立共享秘密。
- 实体的访问控制:DDS:Access:Permissions插件在DDS 域和主题级别为 DomainParticipants 提供访问控制。
- 数据加密:DDS:Crypto:AES-GCM-GMAC插件使用伽罗瓦计数器模式 (AES-GCM) 中的高级加密标准 (AES) 提供经过身份验证的加密。
*注:详见 。
4.3.日志
Fast DDS提供了一个可扩展的日志系统。Log类公开了三个宏定义以简化其使用logInfo、logWarning和logError。它使用正则表达式提供按类别过滤,以控制日志的详细程度。日志系统配置的详细信息详见。
如果反复多次尝试先启动再启动时,会发现偶发第1条消息sub端没收到的情况。其原因为subscriber的matched回调可能晚于publisher的matched约70ms,此时如果没有qos策略保障,pub时,sub端可能还没matched,会导致sub端丢失数据。验证pub/sub matched时间差及解决丢数据问题的方法如下:
5.3.使用idl生成应用层代码
代码太长不在这里贴了,简单来说、主要做了这么两件事:
- 定义participant成员、subscriber订阅/publisher发布、data reader读/writer写、topic、type(idl类型)、listener监听器,并在init()时初始化(关系链与fastdds架构图相同),在run()时进行消息的pub/sub处理。
- 重写data reader/writer listener监听器的on_publication_matched、on_subscription_matched、on_data_available回调方法,在发布或新消息到来时,进行相应的处理。
传输的消息格式类似protobuf,在.idl内定义后,通过fastddsgen自动生成对应类,在应用中使用即可。
DDS在服务质量(QoS)上提供非常多的保障途径,这也是它适用于国防军事、工业控制这些高可靠性、可安全性应用领域的原因。但这些应用都工作在有线网络下,在无线网络,特别是资源受限的情况下应用还比较少见。作为一个跨域通信中间件协议,其非常适合需要同时支持进程内、进程间、跨域通信 并对实时性和可靠性要求较高的场景。
好了,今天先到这里,接下来我们将继续深入的研究下FastDDS的底层实现机制。
参考:
一文读懂“数据分发服务DDS”(Data Distribution Service,RTPS,OMG)
DDS与FastRTPS
eProsima/Fast-DDS
Fast RTPS Installation from Sources
mac下cmake安装、配置和使用
Fast RTPS与Cyclone DDS与OpenSplice DDS对比测试
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.mushiming.com/mjsbk/4873.html
