Main MRPT website > C++ reference for MRPT 1.5.6
CPipe.cpp
Go to the documentation of this file.
1 /* +---------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +---------------------------------------------------------------------------+ */
9 
10 #include "base-precomp.h" // Precompiled headers
11 #include <mrpt/utils/mrpt_inttypes.h> // For PRIu64
12 
13 #include <mrpt/synch/CPipe.h>
14 
15 #ifdef MRPT_OS_WINDOWS
16 # include <windows.h>
17 #else
18 # include <sys/types.h>
19 # include <unistd.h>
20 # include <stdio.h>
21 # include <stdlib.h>
22 #endif
23 
24 using namespace mrpt::utils;
25 using namespace mrpt::synch;
26 
27 // ------------------ CPipe ------------------
28 
29 /** Creates a new pipe and returns the read & write end-points as newly allocated objects. */
30 void CPipe::initializePipe(CPipeReadEndPoint& outReadPipe, CPipeWriteEndPoint& outWritePipe)
31 {
32 # ifdef MRPT_OS_WINDOWS
33  // Win32 pipes
34  HANDLE hRead, hWrite;
35  if (!CreatePipe(&hRead, &hWrite, NULL, 0))
36  THROW_EXCEPTION("Win32 error creating pipe endpoints!")
37 
38  outReadPipe.m_pipe_file = hRead;
39  outWritePipe.m_pipe_file = hWrite;
40 #else
41  // UNIX pipes
42  int fds[2];
43  if (::pipe(fds))
44  THROW_EXCEPTION("Unix error creating pipe endpoints!")
45 
46  outReadPipe.m_pipe_file = fds[0];
47  outWritePipe.m_pipe_file = fds[1];
48 #endif
49 }
50 
51 
52 // ------------------ CPipeBaseEndPoint ------------------
53 CPipeBaseEndPoint::CPipeBaseEndPoint() :
54  timeout_read_start_us(0),
55  timeout_read_between_us(0),
56  m_pipe_file(0)
57 {
58 }
59 
61 {
62  this->close();
63 }
64 
65 // Close: releases the OS handle stored in m_pipe_file (if non-zero) and resets it to 0.
// NOTE(review): the signature line (listing line 66) is absent from this extract; the
// page index attributes that line to `void close()`.
67 {
68  if (m_pipe_file)
69  {
70 # ifdef MRPT_OS_WINDOWS
71  // Win32 pipes
72 
73  // Flush the pipe to allow the client to read the pipe's contents
74  // before disconnecting.
75  FlushFileBuffers((HANDLE)m_pipe_file);
76 
  // NOTE(review): initializePipe() creates the handles with CreatePipe(); confirm
  // that DisconnectNamedPipe() is meaningful for them. Its result is ignored here.
77  DisconnectNamedPipe((HANDLE)m_pipe_file);
78  CloseHandle((HANDLE)m_pipe_file);
79 #else
80  // UNIX pipes
  // The return value of fsync() is not checked.
81  ::fsync(m_pipe_file);
  // NOTE(review): listing line 82 is absent from this extract.
83 #endif
84  }
  // Mark the end-point as closed; 0 is the value the other methods test for.
85  m_pipe_file = 0;
86 }
87 
88 /** De-serializes one end-point description, for example, from a parent process.
 *
 * Parses the text held in `serialized` and stores the resulting handle value in
 * m_pipe_file. On Win32 the text is read as a uint64_t and converted to a
 * pointer-sized handle; a parse failure raises THROW_EXCEPTION. On UNIX the value
 * is extracted straight into m_pipe_file and the extraction result is not checked.
 *
 * NOTE(review): `serialized` is presumably the function's string argument; the
 * signature line (listing line 89) is not present in this extract.
 * NOTE(review): a handle already stored in m_pipe_file is overwritten here
 * without being closed first.
 */
90 {
91  std::istringstream ss;
92  ss.str(serialized);
93 
94 #ifdef MRPT_OS_WINDOWS
95  // Win32 pipes
96  uint64_t val;
97  if (!(ss >> val))
98  { THROW_EXCEPTION("Error parsing PIPE handle!") }
99  m_pipe_file = reinterpret_cast<void*>(val);
100 #else
101  // UNIX pipes
  // No error check: the stream state after extraction is not inspected.
102  ss >> m_pipe_file;
103 #endif
104 }
105 
106 /** Converts the end-point into a string suitable for reconstruction at a child process.
107 * This *invalidates* this object, since only one real end-point can exist at once.
 *
 * The handle value is streamed as text (on Win32 after conversion to uint64_t),
 * then m_pipe_file is reset to 0 so this object no longer refers to the pipe.
 * The handle itself is not closed by this function.
 * The ASSERTMSG_ check fails if the pipe is already closed (m_pipe_file==0).
 *
 * \return The textual form of the handle value.
 * NOTE(review): the signature line (listing line 109) is absent from this extract;
 * the page index attributes it to `std::string serialize()`.
108 */
110 {
111  ASSERTMSG_(m_pipe_file!=0, "Pipe is closed, can't serialize!")
112  std::stringstream ss;
113 #ifdef MRPT_OS_WINDOWS
114  // Win32 pipes
115  ss << reinterpret_cast<uint64_t>(m_pipe_file);
116 #else
117  // UNIX pipes
118  ss << m_pipe_file;
119 #endif
120  m_pipe_file=0; // We don't own this file anymore...
121  return ss.str();
122 }
123 
124 // Methods that don't make sense in pipes:
128 
129 /** Reads up to Count bytes from the pipe into Buffer (the stream read operation for pipe end-points).
 *
 * \param Buffer Destination buffer.
 * \param Count  Maximum number of bytes to read.
 * \return Number of bytes stored in Buffer. On Win32, 0 is returned if ReadFile() fails.
 *
 * The pipe must be open (checked with ASSERTMSG_).
 * In the UNIX timeout path, select() waits with timeout_read_start_us before the
 * first byte has arrived and with timeout_read_between_us afterwards; a value of 0
 * passes a NULL timeout to select(). That path closes the pipe on a read error or
 * when a zero-length read follows a readiness event, and returns the bytes
 * accumulated so far.
 */
130 size_t CPipeBaseEndPoint::Read(void *Buffer, size_t Count)
131 {
132  ASSERTMSG_(m_pipe_file!=0, "Pipe is closed, can't read!")
133 
134 #if defined(MRPT_OS_WINDOWS)
135  // Win32 pipes
136  DWORD nActuallyRead;
137  if (!ReadFile((HANDLE)m_pipe_file, Buffer, Count,&nActuallyRead, NULL ))
138  return 0;
139  else return static_cast<size_t>(nActuallyRead);
140 #else
141  // UNIX pipes
  // NOTE(review): the opening `if (...) {` of this branch (listing line 142) is
  // missing from this extract; its condition cannot be confirmed from here.
143  // Read without timeout:
  // NOTE(review): ::read() reports errors as -1, which this return converts to a
  // very large size_t, unlike the Win32 branch which returns 0. Confirm callers cope.
144  return ::read(m_pipe_file,Buffer,Count);
145  }
146  else
147  {
148  // Use timeouts:
149  size_t alreadyRead = 0;
150  bool timeoutExpired = false;
151 
152  struct timeval timeoutSelect;
153  struct timeval *ptrTimeout;
154 
155  // Init fd_set structure & add our pipe descriptor to it:
156  fd_set read_fds;
157  FD_ZERO(&read_fds);
158  FD_SET(m_pipe_file, &read_fds);
159 
160  // Loop until all bytes arrived, a timeout expires or the pipe is closed.
161  while ( alreadyRead<Count && !timeoutExpired )
162  {
163  // Use the "first" or "between" timeouts:
164  unsigned int curTimeout_us = alreadyRead==0 ? timeout_read_start_us : timeout_read_between_us;
165 
  // A zero timeout value means: block in select() with no time limit.
166  if (curTimeout_us==0)
167  ptrTimeout = NULL;
168  else
169  {
  // Split the microsecond count into whole seconds + remaining microseconds.
170  timeoutSelect.tv_sec = curTimeout_us / 1000000;
171  timeoutSelect.tv_usec = (curTimeout_us % 1000000);
172  ptrTimeout = &timeoutSelect;
173  }
174 
175  // Wait for received data
  // Any result other than exactly one ready descriptor ends the loop.
  // NOTE(review): that includes select() errors (-1), not only a timeout (0).
176  if (::select(
177  m_pipe_file+1, // __nfds
178  &read_fds, // Wait for read
179  NULL, // Wait for write
180  NULL, // Wait for except.
181  ptrTimeout) // Timeout
182  != 1)
183  { // Timeout:
184  timeoutExpired = true;
185  }
186  else
187  {
188  // Compute remaining part:
189  const size_t remainToRead = Count - alreadyRead;
190 
191  // Receive bytes:
  // NOTE(review): the (int) cast narrows remainToRead; confirm that very large
  // Count values are not expected here.
192  const size_t readNow = ::read(m_pipe_file,((char*)Buffer) + alreadyRead, (int)remainToRead);
193 
  // ::read() signals failure with -1; it is detected after conversion to size_t.
194  if (readNow != static_cast<size_t>(-1))
195  {
196  // Accumulate the received length:
197  alreadyRead += readNow;
198  }
199  else
200  {
201  // Error:
202  this->close();
203  return alreadyRead;
204  }
205  if (readNow==0 && remainToRead!=0)
206  {
207  // We had an event of data available, so if we have now a zero,
208  // the other end has been gracefully closed:
209  timeoutExpired = true;
210  close();
211  }
212  }
213  } // end while
214  return alreadyRead;
215  }
216 #endif
217 }
218 
219 /** Introduces a pure virtual method responsible for writing to the stream.
220  * Write attempts to write up to Count bytes to Buffer, and returns the number of bytes actually written. */
221 size_t CPipeBaseEndPoint::Write(const void *Buffer, size_t Count)
222 {
223  ASSERTMSG_(m_pipe_file!=0, "Pipe is closed, can't write!")
224 
225 #ifdef MRPT_OS_WINDOWS
226  // Win32 pipes
227  DWORD nActuallyWritten;
228  if (!WriteFile((HANDLE)m_pipe_file, Buffer, Count,&nActuallyWritten, NULL ))
229  return 0;
230  else return static_cast<size_t>(nActuallyWritten);
231 #else
232  // UNIX pipes
233  return ::write(m_pipe_file,Buffer,Count);
234 #endif
235 }
236 
237 
238 // ------------- CPipeReadEndPoint -------------
240 {
241 }
242 
244 {
245 }
246 
247 // ------------- CPipeWriteEndPoint -------------
249 {
250 }
251 
253 {
254 }
Classes for serialization, sockets, ini-file manipulation, streams, list of properties-values, timewatch, extensions to STL.
Definition: zip.h:16
virtual uint64_t Seek(int64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) MRPT_OVERRIDE
Without effect in this class.
Definition: CPipe.cpp:125
TSeekOrigin
Used in CStream::Seek.
Definition: CStream.h:42
virtual uint64_t getTotalBytesCount() MRPT_OVERRIDE
Without effect in this class.
Definition: CPipe.cpp:126
#define THROW_EXCEPTION(msg)
std::string serialize()
Converts the end-point into a string suitable for reconstruction at a child process.
Definition: CPipe.cpp:109
The write end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:117
unsigned int timeout_read_start_us
(Default=0) Timeout for read operations: microseconds (us) to wait for the first byte. 0 means infinite timeout.
Definition: CPipe.h:77
__int64 int64_t
Definition: rptypes.h:51
The read end-point in a pipe created with mrpt::synch::CPipe.
Definition: CPipe.h:102
int val
Definition: mrpt_jpeglib.h:953
This namespace provides multitask, synchronization utilities.
Definition: atomic_incr.h:29
GLsizei const GLchar ** string
Definition: glext.h:3919
virtual size_t Read(void *Buffer, size_t Count) MRPT_OVERRIDE
Introduces a pure virtual method responsible for reading from the stream.
Definition: CPipe.cpp:130
unsigned __int64 uint64_t
Definition: rptypes.h:52
Common interface of read & write pipe end-points.
Definition: CPipe.h:60
virtual uint64_t getPosition() MRPT_OVERRIDE
Without effect in this class.
Definition: CPipe.cpp:127
void close()
Closes the pipe (normally not needed to be called by users, automatically done at destructor) ...
Definition: CPipe.cpp:66
unsigned int timeout_read_between_us
(Default=0) Timeout between burst reads operations: microseconds (us) to wait between two partial rea...
Definition: CPipe.h:78
virtual size_t Write(const void *Buffer, size_t Count) MRPT_OVERRIDE
Introduces a pure virtual method responsible for writing to the stream.
Definition: CPipe.cpp:221
#define ASSERTMSG_(f, __ERROR_MSG)



Page generated by Doxygen 1.8.14 for MRPT 1.5.6 Git: 4c65e8431 Tue Apr 24 08:18:17 2018 +0200 at lun oct 28 01:35:26 CET 2019