9 #ifndef  SERIALIZATION_ZMQ_H    10 #define  SERIALIZATION_ZMQ_H    33                 template <
typename ZMQ_SOCKET_TYPE>
    35                         ZMQ_SOCKET_TYPE zmq_socket,
    37                         const size_t max_packet_len = 0)
    40                         if (!buf) 
throw std::bad_alloc();
    45                                 throw std::runtime_error(
"[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably means something went wrong...");
    46                         unsigned int nPkts = (!max_packet_len) ? 1U : static_cast<unsigned int>(ceil(
double(nBytes)/max_packet_len));
    47                         for (
unsigned int iPkt=0;iPkt<nPkts;++iPkt)
    51                                 if (!fd) 
throw std::bad_alloc();
    54                                 void *pkt_data = 
reinterpret_cast<char*
>(fd->
buf->
getRawBufferData()) + max_packet_len*iPkt;
    55                                 size_t nBytesThisPkt = nBytes-max_packet_len*iPkt;
    56                                 if (max_packet_len!=0 && nBytesThisPkt>max_packet_len) nBytesThisPkt=max_packet_len;
    60                                         throw std::runtime_error(
"[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
    62                                 const int sent_size = zmq_msg_send (&message, zmq_socket, fd->
do_free ? 0 : ZMQ_SNDMORE);
    63                                 if (0!=zmq_msg_close (&message))
    64                                         throw std::runtime_error(
"[mrpt_send_to_zmq] Error in zmq_msg_close()");
    65                                 if (sent_size!=static_cast<int>(nBytesThisPkt))
    66                                         throw std::runtime_error(
"[mrpt_send_to_zmq] Error in zmq_msg_send()");
    73                 template <
typename ZMQ_SOCKET_TYPE, 
typename VECTOR_MSG_T>
    75                         ZMQ_SOCKET_TYPE zmq_socket,
    76                         VECTOR_MSG_T &out_lst_msgs,
    79                         size_t * rx_obj_length_in_bytes)
    81                         if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
    85                         size_t more_size = 
sizeof(more);
    89                                 zmq_msg_t *msg = 
new zmq_msg_t();
    90                                 if (0!=zmq_msg_init (msg))
    92                                 out_lst_msgs.push_back(msg);
    94                                 int rc = zmq_msg_recv(msg,zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
    95                                 if (rc==-1) 
return false;
    97                                 rc = zmq_getsockopt (zmq_socket, ZMQ_RCVMORE, &more, &more_size);
    98                                 if (rc!=0) 
return false;
   100                                 if (out_lst_msgs.size()==1 && !more) {
   102                                         if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = zmq_msg_size(msg);
   106                         if (out_lst_msgs.size()>1) {
   107                                 for (
size_t i=0;i<out_lst_msgs.size();i++) {
   108                                         target_buf.
WriteBuffer(zmq_msg_data(out_lst_msgs[i]),zmq_msg_size(out_lst_msgs[i]));
   110                                 if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = target_buf.
getTotalBytesCount();
   117                         template <
typename VECTOR_MSG_T>
   120                                 for (
size_t i=0;i<lst_msgs.size();++i) {
   121                                         zmq_msg_close (lst_msgs[i]);
   143                 template <
typename ZMQ_SOCKET_TYPE>
   144                 mrpt::utils::CSerializablePtr 
mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, 
bool dont_wait = 
false, 
size_t * rx_obj_length_in_bytes = NULL)
   147                         mrpt::utils::CSerializablePtr 
obj;
   148                         std::vector<zmq_msg_t*> lst_msgs_to_close;
   163                 template <
typename ZMQ_SOCKET_TYPE>
   167                         std::vector<zmq_msg_t*> lst_msgs_to_close;
 
CSerializablePtr ReadObject()
Reads an object from stream, its class determined at runtime, and returns a smart pointer to the obje...
 
mrpt::utils::CSerializablePtr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait=false, size_t *rx_obj_length_in_bytes=NULL)
Receives an MRPT object from a ZMQ socket, determining the type of the object on-the-fly. 
 
The virtual base class which provides a unified interface for all persistent objects in MRPT...
 
void WriteBuffer(const void *Buffer, size_t Count)
Writes a block of bytes to the stream from Buffer. 
 
GLsizei GLsizei GLuint * obj
 
void Clear()
Clears the memory buffer. 
 
This CStream-derived class allows using a memory buffer as a CStream. 
 
void WriteObject(const CSerializable *o)
Writes an object to the stream. 
 
void BASE_IMPEXP free_fn_for_zmq(void *data, void *hint)
Used in mrpt_send_to_zmq(). hint points to a TFreeFnDataForZMQ struct, to be freed here...
 
void * getRawBufferData()
Method for getting a pointer to the raw stored data. 
 
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries. 
 
void assignMemoryNotOwn(const void *data, const uint64_t nBytesInData)
Initialize the data in the stream from a block of memory which is NEITHER OWNED NOR COPIED by the obje...
 
void mrpt_send_to_zmq(ZMQ_SOCKET_TYPE zmq_socket, const mrpt::utils::CSerializable &obj, const size_t max_packet_len=0)
Send an MRPT object to a ZMQ socket. 
 
void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
 
uint64_t getTotalBytesCount() MRPT_OVERRIDE
Returns the total size of the internal buffer. 
 
uint64_t Seek(int64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) MRPT_OVERRIDE
Introduces a pure virtual method for moving to a specified position in the streamed resource...
 
bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable &target_object, bool dont_wait=false, size_t *rx_obj_length_in_bytes=NULL)
Like mrpt_recv_from_zmq() but without dynamically allocating the received object, more efficient to u...
 
bool mrpt_recv_from_zmq_buf(ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T &out_lst_msgs, mrpt::utils::CMemoryStream &target_buf, bool dont_wait, size_t *rx_obj_length_in_bytes)
Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().