Main MRPT website > C++ reference for MRPT 1.9.9
serialization_zmq.h
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 #ifndef SERIALIZATION_ZMQ_H
10 #define SERIALIZATION_ZMQ_H
11 
14 #include <cmath> // ceil()
15 
16 namespace mrpt
17 {
18 namespace utils
19 {
20 /** \addtogroup noncstream_serialization_zmq Serialization functions for ZMQ (v3
21  * or above) (in #include <mrpt/utils/serialization_zmq.h>)
22  * \ingroup noncstream_serialization
23  * @{ */
24 
25 /** Send an MRPT object to a ZMQ socket.
26  * \param[in] obj The object to be serialized and sent to the socket.
27  * \param[in] zmq_socket The zmq socket object.
28  * \param[in] max_packet_len The object will be split into a series of ZMQ
29  * "message parts" of this maximum length (in bytes). Default=0, which means do
30  * not split in parts.
31  * \note Including `<mrpt/utils/serialization_zmq.h>` requires libzmq to be
32  * available in your system and linked
33  * to your user code. This function can be used even if MRPT was built without
34  * ZMQ support, thanks to the use of templates.
35  * \exception std::exception If the object finds any critical error during
36  * serialization or on ZMQ errors.
37  * \note See examples of usage in
38  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
39  */
40 template <typename ZMQ_SOCKET_TYPE>
42  ZMQ_SOCKET_TYPE zmq_socket, const mrpt::utils::CSerializable& obj,
43  const size_t max_packet_len = 0)
44 {
// NOTE(review): this listing skips its own lines 41, 45, 61-62 and 77, i.e.
// the line naming the function, the statement that creates `buf`, the
// statement that creates `fd`, and the trailing arguments of
// zmq_msg_init_data(). Restore them from the upstream header before trying
// to compile this text.
// NOTE(review): nothing in the visible code releases `buf` (or `fd`) when
// one of the exceptions below is thrown -- confirm against the full source.
46  if (!buf) throw std::bad_alloc();
47 
// Serialize the whole object once; every message part below points into
// this single buffer.
48  buf->WriteObject(&obj);
49  const size_t nBytes = buf->getTotalBytesCount();
50  if (!nBytes)
51  throw std::runtime_error(
52  "[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably "
53  "means something went wrong...");
// Number of parts: 1 when splitting is disabled (max_packet_len == 0),
// otherwise nBytes / max_packet_len rounded up so the tail gets its own part.
54  unsigned int nPkts =
55  (!max_packet_len)
56  ? 1U
57  : static_cast<unsigned int>(ceil(double(nBytes) / max_packet_len));
58  for (unsigned int iPkt = 0; iPkt < nPkts; ++iPkt)
59  {
60  // Prepare a msg part:
63  if (!fd) throw std::bad_alloc();
64  fd->buf = buf;
65  fd->do_free =
66  iPkt ==
67  (nPkts - 1); // Free buffer only after the last part is disposed.
// Start of this part inside the serialized buffer. With max_packet_len == 0
// the offset is always 0 and the single part covers the whole buffer.
68  void* pkt_data = reinterpret_cast<char*>(fd->buf->getRawBufferData()) +
69  max_packet_len * iPkt;
// Bytes left from this offset, clamped to max_packet_len when splitting.
70  size_t nBytesThisPkt = nBytes - max_packet_len * iPkt;
71  if (max_packet_len != 0 && nBytesThisPkt > max_packet_len)
72  nBytesThisPkt = max_packet_len;
73  // Build ZMQ msg:
74  zmq_msg_t message;
75  if (0 != zmq_msg_init_data(
76  &message, pkt_data, nBytesThisPkt,
78  throw std::runtime_error(
79  "[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
80  // Send:
// Every part except the last one (the one with do_free set) is sent with
// ZMQ_SNDMORE.
81  const int sent_size =
82  zmq_msg_send(&message, zmq_socket, fd->do_free ? 0 : ZMQ_SNDMORE);
83  if (0 != zmq_msg_close(&message))
84  throw std::runtime_error(
85  "[mrpt_send_to_zmq] Error in zmq_msg_close()");
// zmq_msg_send() result is compared against the part size; any mismatch
// (including an error return) is reported as a send failure.
86  if (sent_size != static_cast<int>(nBytesThisPkt))
87  throw std::runtime_error(
88  "[mrpt_send_to_zmq] Error in zmq_msg_send()");
89  }
90 }
91 
92 /** Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().
93  * This function just stores the received data into a memory buffer without
94  * parsing it into an MRPT object.
95  * \return false on any error */
96 template <typename ZMQ_SOCKET_TYPE, typename VECTOR_MSG_T>
98  ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T& out_lst_msgs,
99  mrpt::utils::CMemoryStream& target_buf, bool dont_wait,
100  size_t* rx_obj_length_in_bytes)
101 {
102  if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
103  out_lst_msgs.clear();
104  target_buf.Clear();
105  int64_t more;
106  size_t more_size = sizeof(more);
107  do
108  {
109  // Init rx msg:
110  zmq_msg_t* msg = new zmq_msg_t();
111  if (0 != zmq_msg_init(msg)) return false;
112  out_lst_msgs.push_back(msg);
113  // Recv:
114  int rc = zmq_msg_recv(msg, zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
115  if (rc == -1) return false;
116  // Determine if more message parts are to follow
117  rc = zmq_getsockopt(zmq_socket, ZMQ_RCVMORE, &more, &more_size);
118  if (rc != 0) return false;
119  // Only one part?
120  if (out_lst_msgs.size() == 1 && !more)
121  {
122  target_buf.assignMemoryNotOwn(zmq_msg_data(msg), zmq_msg_size(msg));
123  if (rx_obj_length_in_bytes)
124  *rx_obj_length_in_bytes = zmq_msg_size(msg);
125  }
126  } while (more);
127  // More than 1 part?
128  if (out_lst_msgs.size() > 1)
129  {
130  for (size_t i = 0; i < out_lst_msgs.size(); i++)
131  {
132  target_buf.WriteBuffer(
133  zmq_msg_data(out_lst_msgs[i]), zmq_msg_size(out_lst_msgs[i]));
134  }
135  if (rx_obj_length_in_bytes)
136  *rx_obj_length_in_bytes = target_buf.getTotalBytesCount();
137  target_buf.Seek(0);
138  }
139  return true;
140 }
141 
142 namespace internal
143 {
/** Closes (zmq_msg_close) and deletes every `zmq_msg_t*` held in
 * \a lst_msgs, then empties the list so it keeps no pointers to the
 * messages that were just released; a second call on the same list is
 * therefore a no-op instead of a double free.
 * \param[in,out] lst_msgs Container of `new`-allocated, initialized ZMQ
 * messages, e.g. as filled in by mrpt_recv_from_zmq_buf(). */
template <typename VECTOR_MSG_T>
void free_zmq_msg_lst(VECTOR_MSG_T& lst_msgs)
{
    for (size_t i = 0; i < lst_msgs.size(); ++i)
    {
        zmq_msg_close(lst_msgs[i]);
        delete lst_msgs[i];
    }
    lst_msgs.clear();
}
153 }
154 
155 /** Receives an MRPT object from a ZMQ socket, determining the type of the
156  * object on-the-fly.
157  * \param[in] zmq_socket The zmq socket object.
158  * \param[in] dont_wait If true, will fail if there is no data ready to
159  * be read. If false (default) this function will block until data arrives.
160  * \param[out] rx_obj_length_in_bytes If non-nullptr, the object length will be
161  * stored here.
162  * \return An empty smart pointer if there was any error. The received
163  * object if all went OK.
164  * \note Including `<mrpt/utils/serialization_zmq.h>` requires libzmq to be
165  * available in your system and linked to your user code. This function
166  * can be used even if MRPT was built without ZMQ support, thanks to the
167  * use of templates.
168  * \exception std::exception If the object finds any critical error during
169  * de-serialization.
170  * \sa mrpt_recv_from_zmq_into
171  * \note See examples of usage in
172  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
173  */
174 template <typename ZMQ_SOCKET_TYPE>
176  ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait = false,
177  size_t* rx_obj_length_in_bytes = nullptr)
178 {
179  CMemoryStream target_buf;
181  std::vector<zmq_msg_t*> lst_msgs_to_close;
183  zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
184  rx_obj_length_in_bytes))
185  return obj;
186  // De-serialize:
187  obj = target_buf.ReadObject();
188  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
189  return obj;
190 }
191 /** Like mrpt_recv_from_zmq() but without dynamically allocating the received
192  * object,
193  * more efficient to use if the type of the received object is known in
194  * advance.
195  * \param[in] target_object The received object will be stored here. An
196  * exception will be raised upon type mismatch.
197  * \return true if all was OK, false on any ZMQ error.
198  * \sa mrpt_recv_from_zmq() for details on the rest of parameters.
199  * \note See examples of usage in
200  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
201  */
202 template <typename ZMQ_SOCKET_TYPE>
204  ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable& target_object,
205  bool dont_wait = false, size_t* rx_obj_length_in_bytes = nullptr)
206 {
207  CMemoryStream target_buf;
208  std::vector<zmq_msg_t*> lst_msgs_to_close;
210  zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
211  rx_obj_length_in_bytes))
212  return false;
213  // De-serialize:
214  target_buf.ReadObject(&target_object);
215  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
216  return true;
217 }
218 
219 /** @} */
220 } // End of namespace
221 } // End of namespace
222 #endif
mrpt::utils::CSerializable::Ptr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Receives an MRPT object from a ZMQ socket, determining the type of the object on-the-fly.
The virtual base class which provides a unified interface for all persistent objects in MRPT...
Definition: CSerializable.h:44
uint64_t Seek(uint64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) override
Introduces a pure virtual method for moving to a specified position in the streamed resource...
void WriteBuffer(const void *Buffer, size_t Count)
Writes a block of bytes to the stream from Buffer.
Definition: CStream.cpp:64
GLsizei GLsizei GLuint * obj
Definition: glext.h:4070
CSerializable::Ptr ReadObject()
Reads an object from stream, its class determined at runtime, and returns a smart pointer to the obje...
Definition: CStream.h:207
void Clear()
Clears the memory buffer.
__int64 int64_t
Definition: rptypes.h:49
This CStream-derived class allows using a memory buffer as a CStream.
Definition: CMemoryStream.h:27
void WriteObject(const CSerializable *o)
Writes an object to the stream.
Definition: CStream.cpp:158
void free_fn_for_zmq(void *data, void *hint)
Used in mrpt_send_to_zmq().
std::shared_ptr< CSerializable > Ptr
Definition: CSerializable.h:47
void * getRawBufferData()
Method for getting a pointer to the raw stored data.
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
void assignMemoryNotOwn(const void *data, const uint64_t nBytesInData)
Initialize the data in the stream from a block of memory which is NEITHER OWNED NOR COPIED by the obje...
void mrpt_send_to_zmq(ZMQ_SOCKET_TYPE zmq_socket, const mrpt::utils::CSerializable &obj, const size_t max_packet_len=0)
Send an MRPT object to a ZMQ socket.
void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable &target_object, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Like mrpt_recv_from_zmq() but without dynamically allocating the received object, more efficient to u...
uint64_t getTotalBytesCount() override
Returns the total size of the internal buffer.
bool mrpt_recv_from_zmq_buf(ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T &out_lst_msgs, mrpt::utils::CMemoryStream &target_buf, bool dont_wait, size_t *rx_obj_length_in_bytes)
Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: ae4571287 Thu Nov 23 00:06:53 2017 +0100 at dom oct 27 23:51:55 CET 2019