Main MRPT website > C++ reference for MRPT 1.5.6
serialization_zmq.h
Go to the documentation of this file.
1 /* +---------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +---------------------------------------------------------------------------+ */
9 #ifndef SERIALIZATION_ZMQ_H
10 #define SERIALIZATION_ZMQ_H
11 
14 #include <cmath> // ceil()
15 
16 namespace mrpt
17 {
18  namespace utils
19  {
20  /** \addtogroup noncstream_serialization_zmq Serialization functions for ZMQ (v3 or above) (in #include <mrpt/utils/serialization_zmq.h>)
21  * \ingroup noncstream_serialization
22  * @{ */
23 
24  /** Send an MRPT object to a ZMQ socket.
25  * \param[in] obj The object to be serialized and sent to the socket.
26  * \param[in] zmq_socket The zmq socket object.
27  * \param[in] max_packet_len The object will be split into a series of ZMQ "message parts" of this maximum length (in bytes). Default=0, which means do not split in parts.
28  * \note Including `<mrpt/utils/serialization_zmq.h>` requires libzmq to be available in your system and linked
29  * to your user code. This function can be used even if MRPT was built without ZMQ support, thanks to the use of templates.
30  * \exception std::exception If the object finds any critical error during serialization or on ZMQ errors.
31  * \note See examples of usage in https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
32  */
33  template <typename ZMQ_SOCKET_TYPE>
35  ZMQ_SOCKET_TYPE zmq_socket,
37  const size_t max_packet_len = 0)
38  {
40  if (!buf) throw std::bad_alloc();
41 
42  buf->WriteObject(&obj);
43  const size_t nBytes = buf->getTotalBytesCount();
44  if (!nBytes)
45  throw std::runtime_error("[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably means something went wrong...");
46  unsigned int nPkts = (!max_packet_len) ? 1U : static_cast<unsigned int>(ceil(double(nBytes)/max_packet_len));
47  for (unsigned int iPkt=0;iPkt<nPkts;++iPkt)
48  {
49  // Prepare a msg part:
51  if (!fd) throw std::bad_alloc();
52  fd->buf = buf;
53  fd->do_free = iPkt==(nPkts-1); // Free buffer only after the last part is disposed.
54  void *pkt_data = reinterpret_cast<char*>(fd->buf->getRawBufferData()) + max_packet_len*iPkt;
55  size_t nBytesThisPkt = nBytes-max_packet_len*iPkt;
56  if (max_packet_len!=0 && nBytesThisPkt>max_packet_len) nBytesThisPkt=max_packet_len;
57  // Build ZMQ msg:
58  zmq_msg_t message;
59  if (0!=zmq_msg_init_data(&message,pkt_data , nBytesThisPkt, &mrpt::utils::internal::free_fn_for_zmq, fd))
60  throw std::runtime_error("[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
61  // Send:
62  const int sent_size = zmq_msg_send (&message, zmq_socket, fd->do_free ? 0 : ZMQ_SNDMORE);
63  if (0!=zmq_msg_close (&message))
64  throw std::runtime_error("[mrpt_send_to_zmq] Error in zmq_msg_close()");
65  if (sent_size!=static_cast<int>(nBytesThisPkt))
66  throw std::runtime_error("[mrpt_send_to_zmq] Error in zmq_msg_send()");
67  }
68  }
69 
70  /** Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().
71  * This function just stores the received data into a memory buffer without parsing it into an MRPT object.
72  * \return false on any error */
73  template <typename ZMQ_SOCKET_TYPE, typename VECTOR_MSG_T>
75  ZMQ_SOCKET_TYPE zmq_socket,
76  VECTOR_MSG_T &out_lst_msgs,
77  mrpt::utils::CMemoryStream &target_buf,
78  bool dont_wait,
79  size_t * rx_obj_length_in_bytes)
80  {
81  if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
82  out_lst_msgs.clear();
83  target_buf.Clear();
84  int64_t more;
85  size_t more_size = sizeof(more);
86  do
87  {
88  // Init rx msg:
89  zmq_msg_t *msg = new zmq_msg_t();
90  if (0!=zmq_msg_init (msg))
91  return false;
92  out_lst_msgs.push_back(msg);
93  // Recv:
94  int rc = zmq_msg_recv(msg,zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
95  if (rc==-1) return false;
96  // Determine if more message parts are to follow
97  rc = zmq_getsockopt (zmq_socket, ZMQ_RCVMORE, &more, &more_size);
98  if (rc!=0) return false;
99  // Only one part?
100  if (out_lst_msgs.size()==1 && !more) {
101  target_buf.assignMemoryNotOwn(zmq_msg_data(msg), zmq_msg_size(msg));
102  if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = zmq_msg_size(msg);
103  }
104  } while (more);
105  // More than 1 part?
106  if (out_lst_msgs.size()>1) {
107  for (size_t i=0;i<out_lst_msgs.size();i++) {
108  target_buf.WriteBuffer(zmq_msg_data(out_lst_msgs[i]),zmq_msg_size(out_lst_msgs[i]));
109  }
110  if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = target_buf.getTotalBytesCount();
111  target_buf.Seek(0);
112  }
113  return true;
114  }
115 
116  namespace internal {
117  template <typename VECTOR_MSG_T>
118  void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
119  {
120  for (size_t i=0;i<lst_msgs.size();++i) {
121  zmq_msg_close (lst_msgs[i]);
122  delete lst_msgs[i];
123  }
124  }
125  }
126 
127  /** Receives an MRPT object from a ZMQ socket, determining the type of the
128  * object on-the-fly.
129  * \param[in] zmq_socket The zmq socket object.
130  * \param[in] dont_wait If true, will fail if there is no data ready to
131  * be read. If false (default) this function will block until data arrives.
132  * \param[out] rx_obj_length_in_bytes If non-NULL, the object length will be stored here.
133  * \return An empty smart pointer if there was any error. The received
134  * object if all went OK.
135  * \note Including `<mrpt/utils/serialization_zmq.h>` requires libzmq to be
136  * available in your system and linked to your user code. This function
137  * can be used even if MRPT was built without ZMQ support, thanks to the
138  * use of templates.
139  * \exception std::exception If the object finds any critical error during de-serialization.
140  * \sa mrpt_recv_from_zmq_into
141  * \note See examples of usage in https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
142  */
143  template <typename ZMQ_SOCKET_TYPE>
144  mrpt::utils::CSerializablePtr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait = false, size_t * rx_obj_length_in_bytes = NULL)
145  {
146  CMemoryStream target_buf;
147  mrpt::utils::CSerializablePtr obj;
148  std::vector<zmq_msg_t*> lst_msgs_to_close;
149  if (!mrpt_recv_from_zmq_buf(zmq_socket,lst_msgs_to_close, target_buf,dont_wait,rx_obj_length_in_bytes))
150  return obj;
151  // De-serialize:
152  obj = target_buf.ReadObject();
153  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
154  return obj;
155  }
156  /** Like mrpt_recv_from_zmq() but without dynamically allocating the received object,
157  * more efficient to use if the type of the received object is known in advance.
158  * \param[in] target_object The received object will be stored here. An exception will be raised upon type mismatch.
159  * \return true if all was OK, false on any ZMQ error.
160  * \sa mrpt_recv_from_zmq() for details on the rest of parameters.
161  * \note See examples of usage in https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
162  */
163  template <typename ZMQ_SOCKET_TYPE>
164  bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable &target_object, bool dont_wait = false, size_t * rx_obj_length_in_bytes = NULL)
165  {
166  CMemoryStream target_buf;
167  std::vector<zmq_msg_t*> lst_msgs_to_close;
168  if (!mrpt_recv_from_zmq_buf(zmq_socket,lst_msgs_to_close, target_buf,dont_wait,rx_obj_length_in_bytes))
169  return false;
170  // De-serialize:
171  target_buf.ReadObject(&target_object);
172  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
173  return true;
174  }
175 
176  /** @} */
177  } // End of namespace
178 } // End of namespace
179 #endif
CSerializablePtr ReadObject()
Reads an object from stream, its class determined at runtime, and returns a smart pointer to the obje...
Definition: CStream.cpp:486
mrpt::utils::CSerializablePtr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait=false, size_t *rx_obj_length_in_bytes=NULL)
Receives an MRPT object from a ZMQ socket, determining the type of the object on-the-fly.
The virtual base class which provides a unified interface for all persistent objects in MRPT...
Definition: CSerializable.h:39
void WriteBuffer(const void *Buffer, size_t Count)
Writes a block of bytes to the stream from Buffer.
Definition: CStream.cpp:67
GLsizei GLsizei GLuint * obj
Definition: glext.h:3902
void Clear()
Clears the memory buffer.
__int64 int64_t
Definition: rptypes.h:51
This CStream derived class allow using a memory buffer as a CStream.
Definition: CMemoryStream.h:26
void WriteObject(const CSerializable *o)
Writes an object to the stream.
Definition: CStream.cpp:166
void BASE_IMPEXP free_fn_for_zmq(void *data, void *hint)
Used in mrpt_send_to_zmq(). hint points to a TFreeFnDataForZMQ struct, to be freed here...
void * getRawBufferData()
Method for getting a pointer to the raw stored data.
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
void assignMemoryNotOwn(const void *data, const uint64_t nBytesInData)
Initilize the data in the stream from a block of memory which is NEITHER OWNED NOR COPIED by the obje...
void mrpt_send_to_zmq(ZMQ_SOCKET_TYPE zmq_socket, const mrpt::utils::CSerializable &obj, const size_t max_packet_len=0)
Send an MRPT object to a ZMQ socket.
void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
uint64_t getTotalBytesCount() MRPT_OVERRIDE
Returns the total size of the internal buffer.
uint64_t Seek(int64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) MRPT_OVERRIDE
Introduces a pure virtual method for moving to a specified position in the streamed resource...
bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable &target_object, bool dont_wait=false, size_t *rx_obj_length_in_bytes=NULL)
Like mrpt_recv_from_zmq() but without dynamically allocating the received object, more efficient to u...
bool mrpt_recv_from_zmq_buf(ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T &out_lst_msgs, mrpt::utils::CMemoryStream &target_buf, bool dont_wait, size_t *rx_obj_length_in_bytes)
Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().



Page generated by Doxygen 1.8.14 for MRPT 1.5.6 Git: 4c65e8431 Tue Apr 24 08:18:17 2018 +0200 at lun oct 28 01:35:26 CET 2019