page index
Introduction
The enhanced communication abstraction layer (eCAL) is a middleware that enables scalable, high performance interprocess communication on a single computer node or between different nodes in a computer network. The design is inspired by known Data Distribution Service for Real-Time Systems (see Data distribution service on wikipedia). The current eCAL implementation realizes a subset of such a DDS system, there is no support for Quality of Service (QoS) driven data transport. eCAL is designed for typical cloud computing scenarios where different processes exchange there I/O’s using a publisher/subscriber pattern. The data exchange is based on so called topics. A topic wraps the payload that should be exchanged with additional informations like a unique name, a type and a description. A topic can be connected to more then one publisher and/or subscriber. These are the basic elements of the eCAL API.
-
Topic: The most basic description of the data to be published and subscribed.
-
Publisher: A Publisher is the object responsible for the actual dissemination of publications.
-
Subscriber: A Subscriber is the object responsible for the actual reception of the data resulting from its subscriptions.
-
Callback: A Callback can be used to react on incoming messages.
eCAL is simplifying the data transport as much as possible, It uses different mechanism to transport a topic from a publisher to a connected subscriber. On the same computer node the data are exchanged by using memory mapped files. Between different computer nodes UDP multicast is used to spread the data as fast as possible.
A simple hello world example
Now lets see a minimal publisher/subscriber example exchanging the famous “hello world” message :-). First the publisher ..
1#include <ecal/ecal.h>2
3int main(int argc, char **argv)4{5 // initialize eCAL API6 eCAL::Initialize(argc, argv, "minimal_pub");7
8 // create a publisher (topic name "foo", type "std::string")9 eCAL::CPublisher pub("foo", "std::string");10
11 // send the content12 pub.Send("hello world");13
14 // finalize eCAL API15 eCAL::Finalize();16}
and the minimal subscriber ..
1#include <ecal/ecal.h>2
3int main(int argc, char **argv)4{5 // initialize eCAL API6 eCAL::Initialize(argc, argv, "minimal_sub");7
8 // create a subscriber (topic name "foo", type "std::string")9 eCAL::CSubscriber sub("foo", "std::string");10
11 // receive content12 std::string msg;13 sub.Receive(msg);14
15 // finalize eCAL API16 eCAL::Finalize();17}
Trigger on incoming topics
A callback function can be used to react on events of multiple subscribers. An extended “hello world” example could look like this. First the publisher part
1// create 2 publishers2eCAL::CPublisher pub1("foo1", "std::string");3eCAL::CPublisher pub2("foo2", "std::string");4
5// sending "hello world" on 2 different topics6while(eCAL::Ok())7{8 pub1.Send("hello");9 eCAL::Process::SleepMS(1000);10 pub2.Send("world");11}
We use a callback function to react on incoming topics (with “old school” function pointer)
1// define a subscriber callback function2void OnReceive(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_, const void* par_)3{4 printf("We received %s on topic %s\n.", (char*)data_->buf, topic_name_.c_str());5}6
7// create 2 subscriber8eCAL::CSubscriber sub1("foo1", "std::string");9eCAL::CSubscriber sub2("foo2", "std::string");10
11// register subscriber callback function12sub1.AddReceiveCallback(OnReceive);13sub2.AddReceiveCallback(OnReceive);14
15// idle main thread16while(eCAL::Ok())17{18 // sleep 100 ms19 std::this_thread::sleep_for(std::chrono::milliseconds(100));20}
or we can connect to a callback function using std::bind and a specialized string message subscriber
1// define a subscriber callback function2void OnReceive(const char* topic_name_, const std::string& message_)3{4 printf("We received %s on topic %s\n.", message_.c_str(), topic_name_.c_str());5}6
7// create 2 subscriber8eCAL::string::CSubscriber sub1("foo1");9eCAL::string::CSubscriber sub2("foo2");10
11// register subscriber callback function12auto callback = std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2);13sub1.AddReceiveCallback(callback);14sub2.AddReceiveCallback(callback);15
16// idle main thread17while(eCAL::Ok())18{19 // sleep 100 ms20 std::this_thread::sleep_for(std::chrono::milliseconds(100));21}
(De)Serialization of objects
The eCAL middleware does not provide a mechanism to serialize or deserialize message objects. This has to be implemented in a higher level API. See google flatbuffers, cap’n proto or google protobuf for more details. Here a typical example using google protobuf. First you have to define a so called .proto file for the structure you want to serialize and deserialize. A simple Shape struct will be defined as follows ..
1message Shape2{3 enum ShapeType4 {5 CIRCLE = 0;6 TRIANGLE = 1;7 RECTANGLE = 2;8 }9 required ShapeType type = 1 [default = CIRCLE];10 required int32 size = 2;11}
After compiling that .proto file using the protobuf compiler you will receive a shape.pb.cc and shape.pb.h file that you have to include into your project. The serialization and deserialization is then quite simple ..
1// create a publisher (topic name "shape")2eCAL::protobuf::CPublisher<Shape> pub("shape");3
4// and generate a class instance of Shape5class Shape shape;6
7// modify type and size of the shape object8shape.set_type(Shape_ShapeType_CIRCLE);9shape.set_size(42);10
11// send the shape object12pub.Send(shape);
And last not least the matching deserialization ..
1// create a subscriber (topic name "shape")2eCAL::protobuf::CSubscriber<Shape> sub("shape");3
4// and generate a class instance of Shape5class Shape shape;6
7// receive the shape object8sub.Receive(shape);
A similar way can be used to work with google flatbuffers. The flatbuffer message schema looks like this
1namespace Game.Sample;2
3enum Color:byte { Red = 0, Green, Blue = 2 }4
5union Any { Monster } // add more elements..6
7struct Vec38{9 x:float;10 y:float;11 z:float;12}13
14table Monster15{16 pos:Vec3;17 mana:short = 150;18 hp:short = 100;19 name:string;20 friendly:bool = false (deprecated);21 inventory:[ubyte];22 color:Color = Blue;23}24
25root_type Monster;
After compiling the flatbuffer message we can send it the same way like a protocol buffer.
1// create a publisher (topic name "monster")2eCAL::flatbuffers::CPublisher<flatbuffers::FlatBufferBuilder> pub("monster");3
4// the generic builder instance5flatbuffers::FlatBufferBuilder builder;6
7// generate a class instance of Monster8auto vec = Game::Sample::Vec3(1, 2, 3);9
10auto name = builder.CreateString("Monster");11
12unsigned char inv_data[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };13auto inventory = builder.CreateVector(inv_data, 10);14
15// shortcut for creating monster with all fields set:16auto mloc = Game::Sample::CreateMonster(builder, &vec, 150, 80, name, inventory, Game::Sample::Color_Blue);17
18// finalize the message19builder.Finish(mloc);20
21// and send the monster22pub.Send(builder, -1);
Finally let’s receive it via receive callback. So first we define a callback function for the flatbuffer message.
1void OnMonster(const char* topic_name_, const flatbuffers::FlatBufferBuilder& msg_, const long long time_)2{3 // create monster4 auto monster = Game::Sample::GetMonster(msg_.GetBufferPointer());5
6 // print content7 std::cout << "monster pos x : " << monster->pos()->x() << std::endl;8 std::cout << "monster pos y : " << monster->pos()->y() << std::endl;9 std::cout << "monster pos z : " << monster->pos()->z() << std::endl;10 std::cout << "monster name : " << monster->name()->c_str() << std::endl;11}
and implement the flatbuffer subscriber.
1// create a subscriber (topic name "monster")2eCAL::flatbuffers::CSubscriber<flatbuffers::FlatBufferBuilder> sub("monster");3
4// add receive callback function (_1 = topic_name, _2 = msg, _3 = time)5auto callback = std::bind(OnMonster, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);6sub.AddReceiveCallback(callback);
Catching eCAL events
Beside the message receive callbacks eCAL provides an API to get informed if publisher / subscriber internal events get fired. The following example shows how to catch the connect and disconnect events of an eCAL publisher. First we define the event callback function ..
1void OnEvent(const char* topic_name_, const struct eCAL::SPubEventCallbackData* data_)2{3 std::cout << "topic name : " << topic_name_ << std::endl;4 switch (data_->type)5 {6 case pub_event_connected:7 std::cout << "event : " << "pub_event_connected" << std::endl;8 break;9 case pub_event_disconnected:10 std::cout << "event : " << "pub_event_disconnected" << std::endl;11 break;12 default:13 std::cout << "event : " << "unknown" << std::endl;14 break;15 }16 std::cout << std::endl;17}
Then we only have to bind the event we are interested in to the event callback.
1// create a publisher (topic name "person")2eCAL::protobuf::CPublisher<class pb::People::Person> pub("person");3
4// add event callback function (_1 = topic_name, _2 = event data struct)5auto evt_callback = std::bind(OnEvent, std::placeholders::_1, std::placeholders::_2);6pub.AddEventCallback(pub_event_connected, evt_callback);7pub.AddEventCallback(pub_event_disconnected, evt_callback);