MRPT  1.9.9
CNTRIPClient.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2018, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
14 #include <mrpt/comms/net_utils.h>
15 #include <mrpt/math/wrap2pi.h>
16 #include <mrpt/core/format.h>
18 #include <mrpt/core/bits_math.h>
19 #include <iostream>
20 
21 using namespace mrpt;
22 using namespace mrpt::comms;
23 using namespace mrpt::system;
24 using namespace mrpt::hwdrivers;
25 using namespace mrpt::math;
26 using namespace std;
27 
28 /* --------------------------------------------------------
29  CNTRIPClient
30  -------------------------------------------------------- */
31 CNTRIPClient::CNTRIPClient()
32  : m_thread(),
33  m_thread_exit(false),
34  m_thread_do_process(false),
35  m_waiting_answer_connection(false),
36  m_answer_connection(connError),
37  m_args()
38 {
39  m_thread = std::thread(&CNTRIPClient::private_ntrip_thread, this);
40 }
41 
42 /* --------------------------------------------------------
43  ~CNTRIPClient
44  -------------------------------------------------------- */
46 {
47  this->close();
48  if (m_thread.joinable())
49  {
50  m_thread_exit = true;
51  m_thread.join();
52  }
53 }
54 
55 /* --------------------------------------------------------
56  close
57  -------------------------------------------------------- */
59 {
61  if (!m_thread_do_process) return;
62  m_thread_do_process = false;
63  m_sem_sock_closed.get_future().wait_for(500ms);
64 }
65 
66 /* --------------------------------------------------------
67  open
68  -------------------------------------------------------- */
69 bool CNTRIPClient::open(const NTRIPArgs& params, string& out_errmsg)
70 {
71  this->close();
72 
73  if (params.mountpoint.empty())
74  {
75  out_errmsg = "MOUNTPOINT cannot be empty.";
76  return false;
77  }
78  if (params.server.empty())
79  {
80  out_errmsg = "Server address cannot be empty.";
81  return false;
82  }
83 
84  // Try to open it:
87  out_errmsg.clear();
88 
89  m_args = params;
90  m_thread_do_process = true;
91 
92  // Wait until the thread tell us the initial result...
93  if (m_sem_first_connect_done.get_future().wait_for(6s) ==
94  std::future_status::timeout)
95  {
96  out_errmsg = "Timeout waiting thread response";
97  return false;
98  }
99 
100  switch (m_answer_connection)
101  {
102  case connOk:
103  return true;
104  case connError:
105  out_errmsg = format(
106  "Error trying to connect to server '%s'",
107  params.server.c_str());
108  return false;
109  case connUnauthorized:
110  out_errmsg = format(
111  "Authentication failed for server '%s'", params.server.c_str());
112  return false;
113 
114  default:
115  out_errmsg = "UNKNOWN m_answer_connection!!";
116  return false;
117  }
118 }
119 
120 /* --------------------------------------------------------
121  THE WORKING THREAD
122  -------------------------------------------------------- */
124 {
125  try
126  {
127  CClientTCPSocket my_sock;
128 
129  bool last_thread_do_process = m_thread_do_process;
130 
131  while (!m_thread_exit)
132  {
133  if (!m_thread_do_process)
134  {
135  if (my_sock.isConnected())
136  {
137  // Close connection:
138  try
139  {
140  my_sock.close();
141  }
142  catch (...)
143  {
144  }
145  }
146  else
147  {
148  // Nothing to be done... just wait
149  }
150 
151  if (last_thread_do_process) // Let the waiting caller continue
152  // now.
153  m_sem_sock_closed.set_value();
154 
155  last_thread_do_process = m_thread_do_process;
156  std::this_thread::sleep_for(100ms);
157  continue;
158  }
159 
160  last_thread_do_process = m_thread_do_process;
161 
162  // We have a mission to do here... is the channel already open??
163 
164  if (!my_sock.isConnected())
165  {
166  TConnResult connect_res = connError;
167 
168  std::vector<uint8_t> buf;
169  try
170  {
171  // Nope, it's the first time: get params and try open the
172  // connection:
173  stream_data.clear();
174 
175  cout << format(
176  "[CNTRIPClient] Trying to connect to %s:%i\n",
177  m_args.server.c_str(), m_args.port);
178 
179  my_sock.connect(m_args.server, m_args.port);
180  if (m_thread_exit) break;
181 
182  // Prepare HTTP request:
183  // -------------------------------------------
184  string req = format(
185  "GET /%s HTTP/1.0\r\n", m_args.mountpoint.c_str());
186 
187  if (isalpha(m_args.server[0]))
188  req += format("Host: %s\r\n", m_args.server.c_str());
189 
190  req += "User-Agent: NTRIP MRPT Library\r\n";
191  req += "Accept: */*\r\n";
192  req += "Connection: close\r\n";
193 
194  // Implement HTTP Basic authentication:
195  // See:
196  // http://en.wikipedia.org/wiki/Basic_access_authentication
197  if (!m_args.user.empty())
198  {
199  string auth_str =
200  m_args.user + string(":") + m_args.password;
201  std::vector<uint8_t> v(auth_str.size());
202  ::memcpy(&v[0], &auth_str[0], auth_str.size());
203 
204  string encoded_str;
205  mrpt::system::encodeBase64(v, encoded_str);
206 
207  req += "Authorization: Basic ";
208  req += encoded_str;
209  req += "\r\n";
210  }
211 
212  // End:
213  req += "\r\n";
214  // cout << req;
215 
216  // Send:
217  my_sock.sendString(req);
218 
219  // Try to read the header of the response:
220  size_t to_read_now = 30;
221  buf.resize(to_read_now);
222  size_t len =
223  my_sock.readAsync(&buf[0], to_read_now, 4000, 200);
224 
225  buf.resize(len);
226 
227  if ((len != 0) && my_sock.isConnected())
228  connect_res = connOk;
229  }
230  catch (std::exception&)
231  {
232  // cout << e.what() << endl;
233  connect_res = connError;
234  }
235 
236  // We are not disconnected yet, it's a good thing... anyway,
237  // check the answer code:
238  if (!buf.empty())
239  {
240  string resp;
241  resp.resize(buf.size());
242  ::memcpy(&resp[0], &buf[0], buf.size());
243 
244  if (resp.find(" 200 ") == string::npos)
245  {
246  // It's NOT a good response...
247  connect_res = connError;
248 
249  // 401?
250  if (resp.find(" 401 ") != string::npos)
251  connect_res = connUnauthorized;
252  }
253  }
254 
255  // Signal my caller that the connection is established:
256  // ---------------------------------------------------------------
258  {
260 
261  m_answer_connection = connect_res;
262  m_sem_first_connect_done.set_value();
263  }
264 
265  if (connect_res != connOk) my_sock.close();
266  }
267 
268  // Retry if it was a failed connection.
269  if (!my_sock.isConnected())
270  {
271  std::this_thread::sleep_for(500ms);
272  continue;
273  }
274 
275  // Read data from the stream and accumulate it in a buffer:
276  // ----------------------------------------------------------------------
277  std::vector<uint8_t> buf;
278  size_t to_read_now = 1000;
279  buf.resize(to_read_now);
280  size_t len = my_sock.readAsync(&buf[0], to_read_now, 10, 5);
281 
282  buf.resize(len);
283 
284  if (my_sock.isConnected())
285  {
286  // Send data to main buffer:
287  if (stream_data.size() > 1024 * 8)
288  stream_data.clear(); // It seems nobody's reading it...
289 
290  stream_data.appendData(buf);
291  buf.clear();
292  }
293 
294  // Send back data to the server, if so requested:
295  // ------------------------------------------
296  std::vector<uint8_t> upload_data;
297  m_upload_data.readAndClear(upload_data);
298  if (!upload_data.empty())
299  {
300  const size_t N = upload_data.size();
301  const size_t nWritten =
302  my_sock.writeAsync(&upload_data[0], N, 1000);
303  if (nWritten != N)
304  cerr << "*ERROR*: Couldn't write back " << N
305  << " bytes to NTRIP server!.\n";
306  }
307 
308  std::this_thread::sleep_for(10ms);
309  } // end while
310 
311  } // end try
312  catch (exception& e)
313  {
314  cerr << "[CNTRIPClient] Exception in working thread: " << endl
315  << e.what() << endl;
316  }
317  catch (...)
318  {
319  cerr << "[CNTRIPClient] Runtime exception in working thread." << endl;
320  }
321 
322 } // end working thread
323 
324 /* --------------------------------------------------------
325  retrieveListOfMountpoints
326  -------------------------------------------------------- */
328  TListMountPoints& out_list, string& out_errmsg, const string& server,
329  int port, const string& auth_user, const string& auth_pass)
330 {
331  string content;
332  int http_code;
333  TParameters<string> my_headers;
334 
335  out_list.clear();
336 
338  string("http://") + server, content, out_errmsg, port, auth_user,
339  auth_pass, &http_code, &my_headers, nullptr, 6000);
340 
341  // Parse contents:
342  if (ret != net::erOk) return false;
343 
344  std::stringstream ss(content);
345  string lin;
346  while (std::getline(ss, lin, '\n'))
347  {
348  if (lin.size() < 5) continue;
349  if (0 != ::strncmp("STR;", lin.c_str(), 4)) continue;
350 
351  // ok, it's a stream:
352  deque<string> fields;
353  mrpt::system::tokenize(lin, ";", fields);
354 
355  if (fields.size() < 13) continue;
356 
357  TMountPoint mnt;
358 
359  mnt.mountpoint_name = fields[1];
360  mnt.id = fields[2];
361  mnt.format = fields[3];
362  mnt.format_details = fields[4];
363  mnt.carrier = atoi(fields[5].c_str());
364  mnt.nav_system = fields[6];
365  mnt.network = fields[7];
366  mnt.country_code = fields[8];
367  mnt.latitude = atof(fields[9].c_str());
368  mnt.longitude = atof(fields[10].c_str());
369 
370  // Longitude in range: -180,180
372 
373  mnt.needs_nmea = atoi(fields[11].c_str()) != 0;
374  mnt.net_ref_stations = atoi(fields[12].c_str()) != 0;
375 
376  if (fields.size() >= 19) mnt.extra_info = fields[18];
377 
378  out_list.push_back(mnt);
379  }
380 
381  return true;
382 }
383 
384 /** Enqueues a string to be sent back to the NTRIP server (e.g. GGA frames) */
386 {
387  if (data.empty()) return;
388 
389  std::vector<uint8_t> d(data.size());
390  ::memcpy(&d[0], &data[0], data.size());
392 }
A TCP socket that can be connected to a TCP server, implementing MRPT's CStream interface for passing...
void sendString(const std::string &str)
Writes a string to the socket.
bool isConnected()
Returns true if this objects represents a successfully connected socket.
void connect(const std::string &remotePartAddress, unsigned short remotePartTCPPort, unsigned int timeout_ms=0)
Establishes a connection with a remote part.
size_t readAsync(void *Buffer, const size_t Count, const int timeoutStart_ms=-1, const int timeoutBetween_ms=-1)
A method for reading from the socket with an optional timeout.
void close()
Closes the connection.
size_t writeAsync(const void *Buffer, const size_t Count, const int timeout_ms=-1)
A method for writing to the socket with optional timeouts.
void appendData(const std::vector< uint8_t > &d)
Append new data to the stream.
Definition: MT_buffer.h:52
size_t size()
Return the number of available bytes at this moment.
Definition: MT_buffer.h:42
void readAndClear(std::vector< uint8_t > &d)
Read the whole buffer and empty it.
Definition: MT_buffer.h:60
void clear()
Empty the buffer.
Definition: MT_buffer.h:34
std::promise< void > m_sem_first_connect_done
Definition: CNTRIPClient.h:115
bool open(const NTRIPArgs &params, std::string &out_errmsg)
Tries to open a given NTRIP stream and, if successful, launches a thread for continuously reading fro...
void close()
Closes the connection.
mrpt::containers::MT_buffer m_upload_data
Buffer for data to be sent back to the server.
Definition: CNTRIPClient.h:134
NTRIPArgs m_args
All the parameters for the NTRIP connection.
Definition: CNTRIPClient.h:131
void private_ntrip_thread()
The working thread.
mrpt::containers::MT_buffer stream_data
The buffer with all the bytes so-far read from the NTRIP server stream.
Definition: CNTRIPClient.h:161
std::promise< void > m_sem_sock_closed
Definition: CNTRIPClient.h:114
void sendBackToServer(const std::string &data)
Enqueues a string to be sent back to the NTRIP server (e.g.
static bool retrieveListOfMountpoints(TListMountPoints &out_list, std::string &out_errmsg, const std::string &server, int port=2101, const std::string &auth_user=std::string(), const std::string &auth_pass=std::string())
Connect to a given NTRIP caster and get the list of all available mountpoints and their parameters.
bool m_thread_do_process
Will be "true" between "open" and "close".
Definition: CNTRIPClient.h:119
virtual ~CNTRIPClient()
Default destructor.
std::list< TMountPoint > TListMountPoints
Used in CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:85
const GLdouble * v
Definition: glext.h:3678
GLsizei GLsizei GLenum GLenum const GLvoid * data
Definition: glext.h:3547
GLenum GLsizei len
Definition: glext.h:4712
GLdouble s
Definition: glext.h:3676
GLenum const GLfloat * params
Definition: glext.h:3534
GLsizei const GLchar ** string
Definition: glext.h:4101
T wrapToPi(T a)
Modifies the given angle to translate it into the ]-pi,pi] range.
Definition: wrap2pi.h:51
ERRORCODE_HTTP http_get(const string &url, std::vector< uint8_t > &out_content, string &out_errormsg, int port=80, const string &auth_user=string(), const string &auth_pass=string(), int *out_http_responsecode=nullptr, mrpt::system::TParameters< string > *extra_headers=nullptr, mrpt::system::TParameters< string > *out_headers=nullptr, int timeout_ms=1000)
Perform an HTTP GET operation (version for retrieving the data as a std::vector<uint8_t>)
Definition: net_utils.cpp:386
ERRORCODE_HTTP
Possible returns from a HTTP request.
Definition: net_utils.h:32
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:16
void memcpy(void *dest, size_t destSize, const void *src, size_t copyCount) noexcept
An OS and compiler independent version of "memcpy".
Definition: os.cpp:356
void encodeBase64(const std::vector< uint8_t > &inputData, std::string &outString)
Encode a sequence of bytes as a string in base-64.
Definition: base64.cpp:29
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
Serial and networking devices and utilities.
Contains classes for various device interfaces.
This base provides a set of functions for maths stuff.
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
double RAD2DEG(const double x)
Radians to degrees.
double DEG2RAD(const double x)
Degrees to radians.
The arguments for connecting to a NTRIP stream, used in CNTRIPClient::open.
Definition: CNTRIPClient.h:91
A descriptor of one stream in an NTRIP Caster - See CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:44
int carrier
0: No carrier phase, 1: L1, 2: L1+L2
Definition: CNTRIPClient.h:52
std::string country_code
ITA, ESP, DEU,...
Definition: CNTRIPClient.h:58
std::string format
RTCM 2.3, RTCM 3, CMR+, etc...
Definition: CNTRIPClient.h:49
For usage when passing a dynamic number of (numeric) arguments to a function, by name.
Definition: TParameters.h:55



Page generated by Doxygen 1.9.1 for MRPT 1.9.9 Git: 814d80880 Fri Aug 24 01:51:28 2018 +0200 at mar 26 may 2026 12:30:59 CEST