Main MRPT website > C++ reference for MRPT 1.9.9
CPipe.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "base-precomp.h" // Precompiled headers
11 
12 #include <mrpt/synch/CPipe.h>
13 
14 #ifdef MRPT_OS_WINDOWS
15 #include <windows.h>
16 #else
17 #include <sys/types.h>
18 #include <unistd.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #endif
22 
23 using namespace mrpt::synch;
24 using namespace mrpt::utils;
25 
26 // ------------------ CPipe ------------------
27 
28 /** Creates a new pipe and returns the read & write end-points as newly
29  * allocated objects. */
31  CPipeReadEndPoint& outReadPipe, CPipeWriteEndPoint& outWritePipe)
32 {
33 #ifdef MRPT_OS_WINDOWS
34  // Win32 pipes
35  HANDLE hRead, hWrite;
36  if (!CreatePipe(&hRead, &hWrite, nullptr, 0))
37  THROW_EXCEPTION("Win32 error creating pipe endpoints!")
38 
39  outReadPipe.m_pipe_file = hRead;
40  outWritePipe.m_pipe_file = hWrite;
41 #else
42  // UNIX pipes
43  int fds[2];
44  if (::pipe(fds)) THROW_EXCEPTION("Unix error creating pipe endpoints!")
45 
46  outReadPipe.m_pipe_file = fds[0];
47  outWritePipe.m_pipe_file = fds[1];
48 #endif
49 }
50 
51 // ------------------ CPipeBaseEndPoint ------------------
53  : timeout_read_start_us(0), timeout_read_between_us(0), m_pipe_file(0)
54 {
55 }
56 
58 // Close:
60 {
61  if (m_pipe_file)
62  {
63 #ifdef MRPT_OS_WINDOWS
64  // Win32 pipes
65 
66  // Flush the pipe to allow the client to read the pipe's contents
67  // before disconnecting.
68  FlushFileBuffers((HANDLE)m_pipe_file);
69 
70  DisconnectNamedPipe((HANDLE)m_pipe_file);
71  CloseHandle((HANDLE)m_pipe_file);
72 #else
73  // UNIX pipes
74  ::fsync(m_pipe_file);
76 #endif
77  }
78  m_pipe_file = 0;
79 }
80 
81 /** De-serializes one end-point description, for example, from a parent process.
82  */
84 {
85  std::istringstream ss;
86  ss.str(serialized);
87 
88 #ifdef MRPT_OS_WINDOWS
89  // Win32 pipes
90  uint64_t val;
91  if (!(ss >> val))
92  {
93  THROW_EXCEPTION("Error parsing PIPE handle!")
94  }
95  m_pipe_file = reinterpret_cast<void*>(val);
96 #else
97  // UNIX pipes
98  ss >> m_pipe_file;
99 #endif
100 }
101 
102 /** Converts the end-point into a string suitable for reconstruction at a child
103 * process.
104 * This *invalidates* this object, since only one real end-point can exist at
105 * once.
106 */
108 {
109  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't serialize!")
110  std::stringstream ss;
111 #ifdef MRPT_OS_WINDOWS
112  // Win32 pipes
113  ss << reinterpret_cast<uint64_t>(m_pipe_file);
114 #else
115  // UNIX pipes
116  ss << m_pipe_file;
117 #endif
118  m_pipe_file = 0; // We don't own this file anymore...
119  return ss.str();
120 }
121 
122 // Methods that don't make sense in pipes:
126 /** Introduces a pure virtual method responsible for reading from the stream */
127 size_t CPipeBaseEndPoint::Read(void* Buffer, size_t Count)
128 {
129  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't read!")
130 
131 #if defined(MRPT_OS_WINDOWS)
132  // Win32 pipes
133  DWORD nActuallyRead;
134  if (!ReadFile((HANDLE)m_pipe_file, Buffer, Count, &nActuallyRead, nullptr))
135  return 0;
136  else
137  return static_cast<size_t>(nActuallyRead);
138 #else
139  // UNIX pipes
141  {
142  // Read without timeout:
143  return ::read(m_pipe_file, Buffer, Count);
144  }
145  else
146  {
147  // Use timeouts:
148  size_t alreadyRead = 0;
149  bool timeoutExpired = false;
150 
151  struct timeval timeoutSelect;
152  struct timeval* ptrTimeout;
153 
154  // Init fd_set structure & add our socket to it:
155  fd_set read_fds;
156  FD_ZERO(&read_fds);
157  FD_SET(m_pipe_file, &read_fds);
158 
159  // Loop until timeout expires or the socket is closed.
160  while (alreadyRead < Count && !timeoutExpired)
161  {
162  // Use the "first" or "between" timeouts:
163  unsigned int curTimeout_us = alreadyRead == 0
166 
167  if (curTimeout_us == 0)
168  ptrTimeout = nullptr;
169  else
170  {
171  timeoutSelect.tv_sec = curTimeout_us / 1000000;
172  timeoutSelect.tv_usec = (curTimeout_us % 1000000);
173  ptrTimeout = &timeoutSelect;
174  }
175 
176  // Wait for received data
177  if (::select(
178  m_pipe_file + 1, // __nfds
179  &read_fds, // Wait for read
180  nullptr, // Wait for write
181  nullptr, // Wait for except.
182  ptrTimeout) // Timeout
183  != 1)
184  { // Timeout:
185  timeoutExpired = true;
186  }
187  else
188  {
189  // Compute remaining part:
190  const size_t remainToRead = Count - alreadyRead;
191 
192  // Receive bytes:
193  const size_t readNow = ::read(
194  m_pipe_file, ((char*)Buffer) + alreadyRead,
195  (int)remainToRead);
196 
197  if (readNow != static_cast<size_t>(-1))
198  {
199  // Accumulate the received length:
200  alreadyRead += readNow;
201  }
202  else
203  {
204  // Error:
205  this->close();
206  return alreadyRead;
207  }
208  if (readNow == 0 && remainToRead != 0)
209  {
210  // We had an event of data available, so if we have now a
211  // zero,
212  // the socket has been gracefully closed:
213  timeoutExpired = true;
214  close();
215  }
216  }
217  } // end while
218  return alreadyRead;
219  }
220 #endif
221 }
222 
223 /** Introduces a pure virtual method responsible for writing to the stream.
224  * Write attempts to write up to Count bytes to Buffer, and returns the
225  * number of bytes actually written. */
226 size_t CPipeBaseEndPoint::Write(const void* Buffer, size_t Count)
227 {
228  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't write!")
229 
230 #ifdef MRPT_OS_WINDOWS
231  // Win32 pipes
232  DWORD nActuallyWritten;
233  if (!WriteFile(
234  (HANDLE)m_pipe_file, Buffer, Count, &nActuallyWritten, nullptr))
235  return 0;
236  else
237  return static_cast<size_t>(nActuallyWritten);
238 #else
239  // UNIX pipes
240  return ::write(m_pipe_file, Buffer, Count);
241 #endif
242 }
243 
244 // ------------- CPipeReadEndPoint -------------
247  : CPipeBaseEndPoint(serialized)
248 {
249 }
250 
251 // ------------- CPipeWriteEndPoint -------------
254  : CPipeBaseEndPoint(serialized)
255 {
256 }
Classes for serialization, sockets, ini-file manipulation, streams, list of properties-values, timewatch, extensions to STL.
TSeekOrigin
Used in CStream::Seek.
Definition: CStream.h:45
#define THROW_EXCEPTION(msg)
std::string serialize()
Converts the end-point into a string suitable for reconstruction at a child process.
Definition: CPipe.cpp:107
The write end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:144
virtual uint64_t getPosition() override
Without effect in this class.
Definition: CPipe.cpp:125
virtual size_t Write(const void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for writing to the stream.
Definition: CPipe.cpp:226
unsigned int timeout_read_start_us
(Default=0) Timeout for read operations: microseconds (us) to wait for the first byte.
Definition: CPipe.h:88
static void initializePipe(CPipeReadEndPoint &outReadPipe, CPipeWriteEndPoint &outWritePipe)
Creates a new pipe and returns the read & write end-points as newly allocated objects.
Definition: CPipe.cpp:30
The read end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:125
virtual uint64_t Seek(uint64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) override
Without effect in this class.
Definition: CPipe.cpp:123
int val
Definition: mrpt_jpeglib.h:955
virtual size_t Read(void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for reading from the stream.
Definition: CPipe.cpp:127
virtual uint64_t getTotalBytesCount() override
Without effect in this class.
Definition: CPipe.cpp:124
GLsizei const GLchar ** string
Definition: glext.h:4101
unsigned __int64 uint64_t
Definition: rptypes.h:50
Common interface of read & write pipe end-points.
Definition: CPipe.h:63
void close()
Closes the pipe (normally not needed to be called by users, automatically done at destructor) ...
Definition: CPipe.cpp:59
unsigned int timeout_read_between_us
(Default=0) Timeout between burst reads operations: microseconds (us) to wait between two partial rea...
Definition: CPipe.h:92
#define ASSERTMSG_(f, __ERROR_MSG)



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: ae4571287 Thu Nov 23 00:06:53 2017 +0100 at dom oct 27 23:51:55 CET 2019