MRPT  1.9.9
CPipe.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2018, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "io-precomp.h" // Precompiled headers
11 
12 #include <mrpt/io/CPipe.h>
13 #include <mrpt/core/exceptions.h>
14 
15 #ifdef _WIN32
16 #include <windows.h>
17 #else
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #endif
23 
24 using namespace mrpt::io;
25 
26 // ------------------ CPipe ------------------
27 
28 /** Creates a new pipe and returns the read & write end-points as newly
29  * allocated objects. */
31  CPipeReadEndPoint& outReadPipe, CPipeWriteEndPoint& outWritePipe)
32 {
33 #ifdef _WIN32
34  // Win32 pipes
35  HANDLE hRead, hWrite;
36  if (!CreatePipe(&hRead, &hWrite, nullptr, 0))
37  THROW_EXCEPTION("Win32 error creating pipe endpoints!");
38 
39  outReadPipe.m_pipe_file = hRead;
40  outWritePipe.m_pipe_file = hWrite;
41 #else
42  // UNIX pipes
43  int fds[2];
44  if (::pipe(fds)) THROW_EXCEPTION("Unix error creating pipe endpoints!");
45 
46  outReadPipe.m_pipe_file = fds[0];
47  outWritePipe.m_pipe_file = fds[1];
48 #endif
49 }
50 
51 // ------------------ CPipeBaseEndPoint ------------------
53  : timeout_read_start_us(0), timeout_read_between_us(0), m_pipe_file(0)
54 {
55 }
56 
58 // Close:
60 {
61  if (m_pipe_file)
62  {
63 #ifdef _WIN32
64  // Win32 pipes
65 
66  // Flush the pipe to allow the client to read the pipe's contents
67  // before disconnecting.
68  FlushFileBuffers((HANDLE)m_pipe_file);
69 
70  DisconnectNamedPipe((HANDLE)m_pipe_file);
71  CloseHandle((HANDLE)m_pipe_file);
72 #else
73  // UNIX pipes
74  ::fsync(m_pipe_file);
76 #endif
77  }
78  m_pipe_file = 0;
79 }
80 
81 /** De-serializes one end-point description, for example, from a parent process.
82  */
84 {
85  try
86  {
87 #ifdef _WIN32
88  uint64_t val = std::stoull(serialized);
89  m_pipe_file = reinterpret_cast<void*>(val);
90 #else
91  m_pipe_file = std::stoi(serialized);
92 #endif
93  }
94  catch (std::invalid_argument&)
95  {
96  THROW_EXCEPTION("Error parsing PIPE handle!");
97  }
98 }
99 
100 /** Converts the end-point into a string suitable for reconstruction at a child
101 * process.
102 * This *invalidates* this object, since only one real end-point can exist at
103 * once.
104 */
106 {
107  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't serialize!");
108 #ifdef _WIN32
109  return std::to_string(reinterpret_cast<uint64_t>(m_pipe_file));
110 #else
111  return std::to_string(m_pipe_file);
112 #endif
113 }
114 
115 // Methods that don't make sense in pipes:
119 /** Introduces a pure virtual method responsible for reading from the stream */
120 size_t CPipeBaseEndPoint::Read(void* Buffer, size_t Count)
121 {
122  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't read!");
123 
124 #if defined(_WIN32)
125  // Win32 pipes
126  DWORD nActuallyRead;
127  if (!ReadFile((HANDLE)m_pipe_file, Buffer, Count, &nActuallyRead, nullptr))
128  return 0;
129  else
130  return static_cast<size_t>(nActuallyRead);
131 #else
132  // UNIX pipes
134  {
135  // Read without timeout:
136  return ::read(m_pipe_file, Buffer, Count);
137  }
138  else
139  {
140  // Use timeouts:
141  size_t alreadyRead = 0;
142  bool timeoutExpired = false;
143 
144  struct timeval timeoutSelect;
145  struct timeval* ptrTimeout;
146 
147  // Init fd_set structure & add our socket to it:
148  fd_set read_fds;
149  FD_ZERO(&read_fds);
150  FD_SET(m_pipe_file, &read_fds);
151 
152  // Loop until timeout expires or the socket is closed.
153  while (alreadyRead < Count && !timeoutExpired)
154  {
155  // Use the "first" or "between" timeouts:
156  unsigned int curTimeout_us = alreadyRead == 0
159 
160  if (curTimeout_us == 0)
161  ptrTimeout = nullptr;
162  else
163  {
164  timeoutSelect.tv_sec = curTimeout_us / 1000000;
165  timeoutSelect.tv_usec = (curTimeout_us % 1000000);
166  ptrTimeout = &timeoutSelect;
167  }
168 
169  // Wait for received data
170  if (::select(
171  m_pipe_file + 1, // __nfds
172  &read_fds, // Wait for read
173  nullptr, // Wait for write
174  nullptr, // Wait for except.
175  ptrTimeout) // Timeout
176  != 1)
177  { // Timeout:
178  timeoutExpired = true;
179  }
180  else
181  {
182  // Compute remaining part:
183  const size_t remainToRead = Count - alreadyRead;
184 
185  // Receive bytes:
186  const size_t readNow = ::read(
187  m_pipe_file, ((char*)Buffer) + alreadyRead,
188  (int)remainToRead);
189 
190  if (readNow != static_cast<size_t>(-1))
191  {
192  // Accumulate the received length:
193  alreadyRead += readNow;
194  }
195  else
196  {
197  // Error:
198  this->close();
199  return alreadyRead;
200  }
201  if (readNow == 0 && remainToRead != 0)
202  {
203  // We had an event of data available, so if we have now a
204  // zero,
205  // the socket has been gracefully closed:
206  timeoutExpired = true;
207  close();
208  }
209  }
210  } // end while
211  return alreadyRead;
212  }
213 #endif
214 }
215 
216 /** Introduces a pure virtual method responsible for writing to the stream.
217  * Write attempts to write up to Count bytes to Buffer, and returns the
218  * number of bytes actually written. */
219 size_t CPipeBaseEndPoint::Write(const void* Buffer, size_t Count)
220 {
221  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't write!");
222 
223 #ifdef _WIN32
224  // Win32 pipes
225  DWORD nActuallyWritten;
226  if (!WriteFile(
227  (HANDLE)m_pipe_file, Buffer, Count, &nActuallyWritten, nullptr))
228  return 0;
229  else
230  return static_cast<size_t>(nActuallyWritten);
231 #else
232  // UNIX pipes
233  return ::write(m_pipe_file, Buffer, Count);
234 #endif
235 }
236 
237 // ------------- CPipeReadEndPoint -------------
240  : CPipeBaseEndPoint(serialized)
241 {
242 }
243 
244 // ------------- CPipeWriteEndPoint -------------
247  : CPipeBaseEndPoint(serialized)
248 {
249 }
TSeekOrigin
Used in CStream::Seek.
Definition: io/CStream.h:32
virtual size_t Read(void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for reading from the stream.
Definition: CPipe.cpp:120
virtual ~CPipeBaseEndPoint()
Definition: CPipe.cpp:57
#define THROW_EXCEPTION(msg)
Definition: exceptions.h:41
virtual uint64_t getPosition() const override
Without effect in this class.
Definition: CPipe.cpp:118
void close()
Closes the pipe (normally not needed to be called by users, automatically done at destructor) ...
Definition: CPipe.cpp:59
virtual size_t Write(const void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for writing to the stream.
Definition: CPipe.cpp:219
std::string serialize()
Converts the end-point into a string suitable for reconstruction at a child process.
Definition: CPipe.cpp:105
__int64 int64_t
Definition: rptypes.h:49
virtual uint64_t Seek(int64_t of, CStream::TSeekOrigin o=sFromBeginning) override
Without effect in this class.
Definition: CPipe.cpp:116
int val
Definition: mrpt_jpeglib.h:955
#define ASSERTMSG_(f, __ERROR_MSG)
Defines an assertion mechanism.
Definition: exceptions.h:101
GLsizei const GLchar ** string
Definition: glext.h:4101
The write end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:149
unsigned __int64 uint64_t
Definition: rptypes.h:50
unsigned int timeout_read_start_us
(Default=0) Timeout for read operations: microseconds (us) to wait for the first byte.
Definition: CPipe.h:86
virtual uint64_t getTotalBytesCount() const override
Without effect in this class.
Definition: CPipe.cpp:117
The read end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:126
std::string std::string to_string(T v)
Just like std::to_string(), but with an overloaded version for std::string arguments.
Definition: format.h:30
unsigned int timeout_read_between_us
(Default=0) Timeout between burst reads operations: microseconds (us) to wait between two partial rea...
Definition: CPipe.h:90
static void initializePipe(CPipeReadEndPoint &outReadPipe, CPipeWriteEndPoint &outWritePipe)
Creates a new pipe and returns the read & write end-points as newly allocated objects.
Definition: CPipe.cpp:30
Common interface of read & write pipe end-points.
Definition: CPipe.h:61



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: 7d5e6d718 Fri Aug 24 01:51:28 2018 +0200 at lun nov 2 08:35:50 CET 2020