MRPT  1.9.9
CPipe.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2020, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "io-precomp.h" // Precompiled headers
11 
12 #include <mrpt/core/exceptions.h>
13 #include <mrpt/io/CPipe.h>
14 
15 #ifdef _WIN32
16 #include <windows.h>
17 #else
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <cstdio>
21 #include <cstdlib>
22 #endif
23 
24 using namespace mrpt::io;
25 
26 // ------------------ CPipe ------------------
27 
28 /** Creates a new pipe and returns the read & write end-points as newly
29  * allocated objects. */
31  CPipeReadEndPoint& outReadPipe, CPipeWriteEndPoint& outWritePipe)
32 {
33 #ifdef _WIN32
34  // Win32 pipes
35  HANDLE hRead, hWrite;
36  if (!CreatePipe(&hRead, &hWrite, nullptr, 0))
37  THROW_EXCEPTION("Win32 error creating pipe endpoints!");
38 
39  outReadPipe.m_pipe_file = hRead;
40  outWritePipe.m_pipe_file = hWrite;
41 #else
42  // UNIX pipes
43  int fds[2];
44  if (::pipe(fds)) THROW_EXCEPTION("Unix error creating pipe endpoints!");
45 
46  outReadPipe.m_pipe_file = fds[0];
47  outWritePipe.m_pipe_file = fds[1];
48 #endif
49 }
50 
51 // ------------------ CPipeBaseEndPoint ------------------
53 
55 // Close:
57 {
58  if (m_pipe_file)
59  {
60 #ifdef _WIN32
61  // Win32 pipes
62 
63  // Flush the pipe to allow the client to read the pipe's contents
64  // before disconnecting.
65  FlushFileBuffers((HANDLE)m_pipe_file);
66 
67  DisconnectNamedPipe((HANDLE)m_pipe_file);
68  CloseHandle((HANDLE)m_pipe_file);
69 #else
70  // UNIX pipes
71  ::fsync(m_pipe_file);
73 #endif
74  }
75  m_pipe_file = 0;
76 }
77 
78 /** De-serializes one end-point description, for example, from a parent process.
79  */
80 CPipeBaseEndPoint::CPipeBaseEndPoint(const std::string& serialized)
81 {
82  try
83  {
84 #ifdef _WIN32
85  uint64_t val = std::stoull(serialized);
86  m_pipe_file = reinterpret_cast<void*>(val);
87 #else
88  m_pipe_file = std::stoi(serialized);
89 #endif
90  }
91  catch (std::invalid_argument&)
92  {
93  THROW_EXCEPTION("Error parsing PIPE handle!");
94  }
95 }
96 
97 /** Converts the end-point into a string suitable for reconstruction at a child
98  * process.
99  * This *invalidates* this object, since only one real end-point can exist at
100  * once.
101  */
103 {
104  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't serialize!");
105 #ifdef _WIN32
106  return std::to_string(reinterpret_cast<uint64_t>(m_pipe_file));
107 #else
108  return std::to_string(m_pipe_file);
109 #endif
110 }
111 
112 // Methods that don't make sense in pipes:
113 uint64_t CPipeBaseEndPoint::Seek(int64_t, CStream::TSeekOrigin) { return 0; }
114 uint64_t CPipeBaseEndPoint::getTotalBytesCount() const { return 0; }
115 uint64_t CPipeBaseEndPoint::getPosition() const { return 0; }
116 /** Introduces a pure virtual method responsible for reading from the stream */
117 size_t CPipeBaseEndPoint::Read(void* Buffer, size_t Count)
118 {
119  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't read!");
120 
121 #if defined(_WIN32)
122  // Win32 pipes
123  DWORD nActuallyRead;
124  if (!ReadFile((HANDLE)m_pipe_file, Buffer, Count, &nActuallyRead, nullptr))
125  return 0;
126  else
127  return static_cast<size_t>(nActuallyRead);
128 #else
129  // UNIX pipes
131  {
132  // Read without timeout:
133  return ::read(m_pipe_file, Buffer, Count);
134  }
135  else
136  {
137  // Use timeouts:
138  size_t alreadyRead = 0;
139  bool timeoutExpired = false;
140 
141  struct timeval timeoutSelect
142  {
143  };
144  struct timeval* ptrTimeout{nullptr};
145 
146  // Init fd_set structure & add our socket to it:
147  fd_set read_fds;
148  FD_ZERO(&read_fds);
149  FD_SET(m_pipe_file, &read_fds);
150 
151  // Loop until timeout expires or the socket is closed.
152  while (alreadyRead < Count && !timeoutExpired)
153  {
154  // Use the "first" or "between" timeouts:
155  unsigned int curTimeout_us = alreadyRead == 0
158 
159  if (curTimeout_us == 0)
160  ptrTimeout = nullptr;
161  else
162  {
163  timeoutSelect.tv_sec = curTimeout_us / 1000000;
164  timeoutSelect.tv_usec = (curTimeout_us % 1000000);
165  ptrTimeout = &timeoutSelect;
166  }
167 
168  // Wait for received data
169  if (::select(
170  m_pipe_file + 1, // __nfds
171  &read_fds, // Wait for read
172  nullptr, // Wait for write
173  nullptr, // Wait for except.
174  ptrTimeout) // Timeout
175  != 1)
176  { // Timeout:
177  timeoutExpired = true;
178  }
179  else
180  {
181  // Compute remaining part:
182  const size_t remainToRead = Count - alreadyRead;
183 
184  // Receive bytes:
185  const size_t readNow = ::read(
186  m_pipe_file, reinterpret_cast<char*>(Buffer) + alreadyRead,
187  (int)remainToRead);
188 
189  if (readNow != static_cast<size_t>(-1))
190  {
191  // Accumulate the received length:
192  alreadyRead += readNow;
193  }
194  else
195  {
196  // Error:
197  this->close();
198  return alreadyRead;
199  }
200  if (readNow == 0 && remainToRead != 0)
201  {
202  // We had an event of data available, so if we have now a
203  // zero,
204  // the socket has been gracefully closed:
205  timeoutExpired = true;
206  close();
207  }
208  }
209  } // end while
210  return alreadyRead;
211  }
212 #endif
213 }
214 
215 /** Introduces a pure virtual method responsible for writing to the stream.
216  * Write attempts to write up to Count bytes to Buffer, and returns the
217  * number of bytes actually written. */
218 size_t CPipeBaseEndPoint::Write(const void* Buffer, size_t Count)
219 {
220  ASSERTMSG_(m_pipe_file != 0, "Pipe is closed, can't write!");
221 
222 #ifdef _WIN32
223  // Win32 pipes
224  DWORD nActuallyWritten;
225  if (!WriteFile(
226  (HANDLE)m_pipe_file, Buffer, Count, &nActuallyWritten, nullptr))
227  return 0;
228  else
229  return static_cast<size_t>(nActuallyWritten);
230 #else
231  // UNIX pipes
232  return ::write(m_pipe_file, Buffer, Count);
233 #endif
234 }
235 
236 // ------------- CPipeReadEndPoint -------------
238 CPipeReadEndPoint::CPipeReadEndPoint(const std::string& serialized)
239  : CPipeBaseEndPoint(serialized)
240 {
241 }
242 
243 // ------------- CPipeWriteEndPoint -------------
245 CPipeWriteEndPoint::CPipeWriteEndPoint(const std::string& serialized)
246  : CPipeBaseEndPoint(serialized)
247 {
248 }
TSeekOrigin
Used in CStream::Seek.
Definition: io/CStream.h:32
size_t Read(void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for reading from the stream.
Definition: CPipe.cpp:117
std::string to_string(T v)
Just like std::to_string(), but with an overloaded version for std::string arguments.
Definition: format.h:36
#define THROW_EXCEPTION(msg)
Definition: exceptions.h:67
uint64_t getPosition() const override
Without effect in this class.
Definition: CPipe.cpp:115
void close()
Closes the pipe (normally not needed to be called by users, automatically done at destructor) ...
Definition: CPipe.cpp:56
size_t Write(const void *Buffer, size_t Count) override
Introduces a pure virtual method responsible for writing to the stream.
Definition: CPipe.cpp:218
std::string serialize()
Converts the end-point into a string suitable for reconstruction at a child process.
Definition: CPipe.cpp:102
uint64_t Seek(int64_t of, CStream::TSeekOrigin o=sFromBeginning) override
Without effect in this class.
Definition: CPipe.cpp:113
int val
Definition: mrpt_jpeglib.h:957
#define ASSERTMSG_(f, __ERROR_MSG)
Defines an assertion mechanism.
Definition: exceptions.h:108
The write end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:148
~CPipeBaseEndPoint() override
Definition: CPipe.cpp:54
unsigned int timeout_read_start_us
(Default=0) Timeout for read operations: microseconds (us) to wait for the first byte.
Definition: CPipe.h:86
uint64_t getTotalBytesCount() const override
Without effect in this class.
Definition: CPipe.cpp:114
The read end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:125
unsigned int timeout_read_between_us
(Default=0) Timeout between burst reads operations: microseconds (us) to wait between two partial rea...
Definition: CPipe.h:90
static void initializePipe(CPipeReadEndPoint &outReadPipe, CPipeWriteEndPoint &outWritePipe)
Creates a new pipe and returns the read & write end-points as newly allocated objects.
Definition: CPipe.cpp:30
Common interface of read & write pipe end-points.
Definition: CPipe.h:61



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: c7a3bec24 Sun Mar 29 18:33:13 2020 +0200 at dom mar 29 18:50:38 CEST 2020