Main MRPT website > C++ reference for MRPT 1.9.9
CNationalInstrumentsDAQ.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2018, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <iterator> // advance()
14 #include <iostream>
16 
17 // If we have both, DAQmx & DAQmxBase, prefer DAQmx:
18 #define MRPT_HAS_SOME_NIDAQMX (MRPT_HAS_NIDAQMXBASE || MRPT_HAS_NIDAQMX)
19 
20 #define MRPT_USE_NIDAQMXBASE (MRPT_HAS_NIDAQMXBASE && !MRPT_HAS_NIDAQMX)
21 #define MRPT_USE_NIDAQMX (MRPT_HAS_NIDAQMX)
22 
23 #if MRPT_USE_NIDAQMXBASE
24 #include "NIDAQmxBase.h" // Include file for NI-DAQmx Base API
25 #endif
26 #if MRPT_USE_NIDAQMX
27 #include "NIDAQmx.h" // Include file for NI-DAQmx API
28 #endif
29 
30 // Macros to use either DAQmx or DAQmx Base automatically, depending on the
31 // installed libraries:
32 #if MRPT_USE_NIDAQMXBASE
33 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxBaseGetExtendedErrorInfo
34 #define MRPT_DAQmxCreateTask DAQmxBaseCreateTask
35 #define MRPT_DAQmxCreateAIVoltageChan DAQmxBaseCreateAIVoltageChan
36 #define MRPT_DAQmxCreateAOVoltageChan DAQmxBaseCreateAOVoltageChan
37 #define MRPT_DAQmxCreateDIChan DAQmxBaseCreateDIChan
38 #define MRPT_DAQmxCreateDOChan DAQmxBaseCreateDOChan
39 #define MRPT_DAQmxCreateCIPeriodChan DAQmxBaseCreateCIPeriodChan
40 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxBaseCreateCICountEdgesChan
41 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxBaseCreateCIPulseWidthChan
42 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxBaseCreateCILinEncoderChan
43 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxBaseCreateCIAngEncoderChan
44 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxBaseCreateCOPulseChanFreq
45 #define MRPT_DAQmxCfgSampClkTiming DAQmxBaseCfgSampClkTiming
46 #define MRPT_DAQmxCfgInputBuffer DAQmxBaseCfgInputBuffer
47 #define MRPT_DAQmxCfgOutputBuffer DAQmxBaseCfgOutputBuffer
48 #define MRPT_DAQmxStartTask DAQmxBaseStartTask
49 #define MRPT_DAQmxStopTask DAQmxBaseStopTask
50 #define MRPT_DAQmxClearTask DAQmxBaseClearTask
51 #define MRPT_DAQmxReadAnalogF64 DAQmxBaseReadAnalogF64
52 #define MRPT_DAQmxReadCounterF64 DAQmxBaseReadCounterF64
53 #define MRPT_DAQmxReadDigitalU8 DAQmxBaseReadDigitalU8
54 #define MRPT_DAQmxWriteAnalogF64 DAQmxBaseWriteAnalogF64
55 #define MRPT_DAQmxWriteDigitalU32 DAQmxBaseWriteDigitalU32
56 #define MRPT_DAQmxWriteDigitalLines DAQmxBaseWriteDigitalLines
57 #else
58 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxGetExtendedErrorInfo
59 #define MRPT_DAQmxCreateTask DAQmxCreateTask
60 #define MRPT_DAQmxCreateAIVoltageChan DAQmxCreateAIVoltageChan
61 #define MRPT_DAQmxCreateAOVoltageChan DAQmxCreateAOVoltageChan
62 #define MRPT_DAQmxCreateDIChan DAQmxCreateDIChan
63 #define MRPT_DAQmxCreateDOChan DAQmxCreateDOChan
64 #define MRPT_DAQmxCreateCIPeriodChan DAQmxCreateCIPeriodChan
65 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxCreateCICountEdgesChan
66 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxCreateCIPulseWidthChan
67 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxCreateCILinEncoderChan
68 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxCreateCIAngEncoderChan
69 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxCreateCOPulseChanFreq
70 #define MRPT_DAQmxCfgSampClkTiming DAQmxCfgSampClkTiming
71 #define MRPT_DAQmxCfgInputBuffer DAQmxCfgInputBuffer
72 #define MRPT_DAQmxCfgOutputBuffer DAQmxCfgOutputBuffer
73 #define MRPT_DAQmxStartTask DAQmxStartTask
74 #define MRPT_DAQmxStopTask DAQmxStopTask
75 #define MRPT_DAQmxClearTask DAQmxClearTask
76 #define MRPT_DAQmxReadAnalogF64 DAQmxReadAnalogF64
77 #define MRPT_DAQmxReadCounterF64 DAQmxReadCounterF64
78 #define MRPT_DAQmxReadDigitalU8 DAQmxReadDigitalU8
79 #define MRPT_DAQmxWriteAnalogF64 DAQmxWriteAnalogF64
80 #define MRPT_DAQmxWriteDigitalU32 DAQmxWriteDigitalU32
81 #define MRPT_DAQmxWriteDigitalLines DAQmxWriteDigitalLines
82 #endif
83 
/** Evaluates a DAQmx call exactly once and, when it returns a negative status
 * code, throws (through THROW_EXCEPTION) a message made of the driver's
 * extended error text plus the stringified call expression.
 *
 * Kept as a plain `if { }` block rather than `do { } while (0)` on purpose:
 * several invocations in this file are written without a trailing semicolon.
 * The buffer is zero-initialised so that the message still contains a valid
 * (empty) C string should the extended-error query not write anything to it.
 */
#define MRPT_DAQmx_ErrChk(functionCall)                                     \
    if ((functionCall) < 0)                                                 \
    {                                                                       \
        char errBuff[2048] = {0};                                           \
        MRPT_DAQmxGetExtendedErrorInfo(errBuff, sizeof(errBuff));           \
        std::string sErr = mrpt::format(                                    \
            "DAQ error: '%s'\nCalling: '%s'", errBuff, #functionCall);      \
        THROW_EXCEPTION(sErr);                                              \
    }
95 
96 using namespace mrpt::hwdrivers;
97 using namespace mrpt::obs;
98 using namespace mrpt::system;
99 using namespace std;
100 
102 
103 // ------------- CNationalInstrumentsDAQ::TInfoPerTask -----------
104 // Default ctor:
106  : taskHandle(0),
107  must_close(false),
108  is_closed(false),
109  new_obs_available(0),
110  task()
111 {
112 }
113 
114 /* -----------------------------------------------------
115  Constructor
116  ----------------------------------------------------- */
118  : mrpt::system::COutputLogger("CNationalInstrumentsDAQ")
119 {
120  m_sensorLabel = "NIDAQ";
121 }
122 
/** Optional configuration read, modelled on "MRPT_LOAD_HERE_CONFIG_VAR".
 *
 * Assigns to `targetVariable` the result of
 * `configFileObject.read_<variableType>(sectionNameStr, variableName,
 * targetVariable, false)`: the variable's current value is handed in as the
 * default and the last argument (failIfNotFound) is false, so an absent key is
 * presumably answered with that default, leaving the variable as it was.
 * `variableName` is forwarded as an ordinary expression (it is not
 * stringified), so it may be built at run time.
 *
 * Expands to a complete statement including the `;`, since the invocations
 * visible in this file are written without a trailing semicolon.
 */
#define MY_LOAD_HERE_CONFIG_VAR(                                            \
    variableName, variableType, targetVariable, configFileObject,           \
    sectionNameStr)                                                         \
    (targetVariable) = (configFileObject).read_##variableType(              \
        (sectionNameStr), (variableName), (targetVariable), false);
129 
/** Mandatory configuration read.
 *
 * Same call as MY_LOAD_HERE_CONFIG_VAR but with `true` as the last
 * (failIfNotFound) argument. Any std::exception raised by the read is replaced
 * by a THROW_EXCEPTION carrying "Value for '<variableName>' not found in
 * config file"; `targetVariable` is not assigned in that case.
 *
 * `mrpt::format` is spelled out in full because this file has a file-scope
 * `using namespace std;`, which would otherwise expose an unqualified
 * `format` to `std::format` on C++20 toolchains.
 * Note that `variableName` is evaluated a second time on the failure path.
 */
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT(                                 \
    variableName, variableType, targetVariable, configFileObject,           \
    sectionNameStr)                                                         \
    {                                                                       \
        try                                                                 \
        {                                                                   \
            (targetVariable) = (configFileObject).read_##variableType(      \
                (sectionNameStr), (variableName), (targetVariable), true);  \
        }                                                                   \
        catch (const std::exception&)                                       \
        {                                                                   \
            THROW_EXCEPTION(mrpt::format(                                   \
                "Value for '%s' not found in config file",                  \
                std::string(variableName).c_str()));                        \
        }                                                                   \
    }
147 
148 /* -----------------------------------------------------
149  loadConfig_sensorSpecific
150  ----------------------------------------------------- */
152  const mrpt::config::CConfigFileBase& cfg, const std::string& sect)
153 {
154  // std::vector<TaskDescription> task_definitions;
155  task_definitions.clear();
156 
157  const unsigned int nTasks = cfg.read_uint64_t(sect, "num_tasks", 0, true);
158  if (!nTasks)
159  {
160  std::cerr << "[CNationalInstrumentsDAQ] Warning: Number of tasks is "
161  "zero. No datalogging will be done.\n";
162  }
163 
164  task_definitions.resize(nTasks);
165  for (unsigned int i = 0; i < nTasks; i++)
166  {
168  const string sTask = mrpt::format("task%u", i);
169 
170  // Read general settings for this task:
171  // ---------------------------------------
172  const string sChanns =
173  cfg.read_string(sect, sTask + string(".channels"), "", true);
174  vector<string> lstStrChanns;
175  mrpt::system::tokenize(sChanns, " \t,", lstStrChanns);
176  if (lstStrChanns.empty())
177  THROW_EXCEPTION_FMT("List of channels for task %u is empty!", i)
178 
180  sTask + string(".samplesPerSecond"), double, t.samplesPerSecond,
181  cfg, sect)
183  sTask + string(".samplesPerChannelToRead"), double,
184  t.samplesPerChannelToRead, cfg, sect)
186  sTask + string(".sampleClkSource"), string, t.sampleClkSource, cfg,
187  sect)
189  sTask + string(".bufferSamplesPerChannel"), double,
190  t.bufferSamplesPerChannel, cfg, sect)
191  t.taskLabel =
192  cfg.read_string(sect, sTask + string(".taskLabel"), sTask, false);
193 
194  for (size_t j = 0; j < lstStrChanns.size(); j++)
195  {
196  if (strCmpI(lstStrChanns[j], "ai"))
197  {
198  t.has_ai = true;
200  sTask + string(".ai.physicalChannel"), string,
201  t.ai.physicalChannel, cfg, sect)
203  sTask + string(".ai.physicalChannelCount"), uint64_t,
204  t.ai.physicalChannelCount, cfg, sect)
206  sTask + string(".ai.terminalConfig"), string,
207  t.ai.terminalConfig, cfg, sect)
209  sTask + string(".ai.minVal"), double, t.ai.minVal, cfg,
210  sect)
212  sTask + string(".ai.maxVal"), double, t.ai.maxVal, cfg,
213  sect)
214  }
215  else if (strCmpI(lstStrChanns[j], "ao"))
216  {
217  t.has_ao = true;
219  sTask + string(".ao.physicalChannel"), string,
220  t.ao.physicalChannel, cfg, sect)
222  sTask + string(".ao.physicalChannelCount"), uint64_t,
223  t.ao.physicalChannelCount, cfg, sect)
225  sTask + string(".ao.minVal"), double, t.ao.minVal, cfg,
226  sect)
228  sTask + string(".ao.maxVal"), double, t.ao.maxVal, cfg,
229  sect)
230  }
231  else if (strCmpI(lstStrChanns[j], "di"))
232  {
233  t.has_di = true;
235  sTask + string(".di.line"), string, t.di.line, cfg, sect)
236  }
237  else if (strCmpI(lstStrChanns[j], "do"))
238  {
239  t.has_do = true;
241  sTask + string(".do.line"), string, t.douts.line, cfg, sect)
242  }
243  else if (strCmpI(lstStrChanns[j], "ci_period"))
244  {
245  t.has_ci_period = true;
247  sTask + string(".ci_period.counter"), string,
248  t.ci_period.counter, cfg, sect)
250  sTask + string(".ci_period.minVal"), double,
251  t.ci_period.minVal, cfg, sect)
253  sTask + string(".ci_period.maxVal"), double,
254  t.ci_period.maxVal, cfg, sect)
256  sTask + string(".ci_period.units"), string,
257  t.ci_period.units, cfg, sect)
259  sTask + string(".ci_period.edge"), string, t.ci_period.edge,
260  cfg, sect)
262  sTask + string(".ci_period.measTime"), double,
263  t.ci_period.measTime, cfg, sect)
265  sTask + string(".ci_period.divisor"), int,
266  t.ci_period.divisor, cfg, sect)
267  }
268  else if (strCmpI(lstStrChanns[j], "ci_count_edges"))
269  {
270  t.has_ci_count_edges = true;
272  sTask + string(".ci_count_edges.counter"), string,
273  t.ci_count_edges.counter, cfg, sect)
275  sTask + string(".ci_count_edges.edge"), string,
276  t.ci_count_edges.edge, cfg, sect)
278  sTask + string(".ci_count_edges.initialCount"), int,
279  t.ci_count_edges.initialCount, cfg, sect)
281  sTask + string(".ci_count_edges.countDirection"), string,
282  t.ci_count_edges.countDirection, cfg, sect)
283  }
284  else if (strCmpI(lstStrChanns[j], "ci_pulse_width"))
285  {
286  t.has_ci_pulse_width = true;
288  sTask + string(".ci_pulse_width.counter"), string,
289  t.ci_pulse_width.counter, cfg, sect)
291  sTask + string(".ci_pulse_width.minVal"), double,
292  t.ci_pulse_width.minVal, cfg, sect)
294  sTask + string(".ci_pulse_width.maxVal"), double,
295  t.ci_pulse_width.maxVal, cfg, sect)
297  sTask + string(".ci_pulse_width.units"), string,
298  t.ci_pulse_width.units, cfg, sect)
300  sTask + string(".ci_pulse_width.startingEdge"), string,
301  t.ci_pulse_width.startingEdge, cfg, sect)
302  }
303  else if (strCmpI(lstStrChanns[j], "ci_lin_encoder"))
304  {
305  t.has_ci_lin_encoder = true;
307  sTask + string(".ci_lin_encoder.counter"), string,
308  t.ci_lin_encoder.counter, cfg, sect)
310  sTask + string(".ci_lin_encoder.decodingType"), string,
311  t.ci_lin_encoder.decodingType, cfg, sect)
313  sTask + string(".ci_lin_encoder.ZidxEnable"), bool,
314  t.ci_lin_encoder.ZidxEnable, cfg, sect)
316  sTask + string(".ci_lin_encoder.ZidxVal"), double,
317  t.ci_lin_encoder.ZidxVal, cfg, sect)
319  sTask + string(".ci_lin_encoder.ZidxPhase"), string,
320  t.ci_lin_encoder.ZidxPhase, cfg, sect)
322  sTask + string(".ci_lin_encoder.units"), string,
323  t.ci_lin_encoder.units, cfg, sect)
325  sTask + string(".ci_lin_encoder.distPerPulse"), double,
326  t.ci_lin_encoder.distPerPulse, cfg, sect)
328  sTask + string(".ci_lin_encoder.initialPos"), double,
329  t.ci_lin_encoder.initialPos, cfg, sect)
330  }
331  else if (strCmpI(lstStrChanns[j], "ci_ang_encoder"))
332  {
333  t.has_ci_ang_encoder = true;
335  sTask + string(".ci_ang_encoder.counter"), string,
336  t.ci_ang_encoder.counter, cfg, sect)
338  sTask + string(".ci_ang_encoder.decodingType"), string,
339  t.ci_ang_encoder.decodingType, cfg, sect)
341  sTask + string(".ci_ang_encoder.ZidxEnable"), bool,
342  t.ci_ang_encoder.ZidxEnable, cfg, sect)
344  sTask + string(".ci_ang_encoder.ZidxVal"), double,
345  t.ci_ang_encoder.ZidxVal, cfg, sect)
347  sTask + string(".ci_ang_encoder.ZidxPhase"), string,
348  t.ci_ang_encoder.ZidxPhase, cfg, sect)
350  sTask + string(".ci_ang_encoder.units"), string,
351  t.ci_ang_encoder.units, cfg, sect)
353  sTask + string(".ci_ang_encoder.pulsesPerRev"), int,
354  t.ci_ang_encoder.pulsesPerRev, cfg, sect)
356  sTask + string(".ci_ang_encoder.initialAngle"), double,
357  t.ci_ang_encoder.initialAngle, cfg, sect)
359  sTask + string(".ci_ang_encoder.decimate"), int,
360  t.ci_ang_encoder.decimate, cfg, sect)
361  }
362  else if (strCmpI(lstStrChanns[j], "co_pulses"))
363  {
364  t.has_co_pulses = true;
366  sTask + string(".co_pulses.counter"), string,
367  t.co_pulses.counter, cfg, sect)
369  sTask + string(".co_pulses.idleState"), string,
370  t.co_pulses.idleState, cfg, sect)
372  sTask + string(".co_pulses.initialDelay"), double,
373  t.co_pulses.initialDelay, cfg, sect)
375  sTask + string(".co_pulses.freq"), double, t.co_pulses.freq,
376  cfg, sect)
378  sTask + string(".co_pulses.dutyCycle"), double,
379  t.co_pulses.dutyCycle, cfg, sect)
380  }
381  else
382  {
384  "Unknown channel type '%s'! See the docs of "
385  "CNationalInstrumentsDAQ",
386  lstStrChanns[j].c_str())
387  }
388  } // end for each "k" channel in channel "i"
389  } // end for "i", each task
390 }
391 
392 /* -----------------------------------------------------
393  Destructor
394  ----------------------------------------------------- */
396 #if MRPT_HAS_SOME_NIDAQMX
397 // Declare a table to convert strings to their DAQmx #define values:
398 struct daqmx_str_val
399 {
400  const char* str;
401  int val;
402 };
403 
404 const daqmx_str_val daqmx_vals[] = {
405  {"DAQmx_Val_Cfg_Default", DAQmx_Val_Cfg_Default},
406  {"DAQmx_Val_RSE", DAQmx_Val_RSE},
407  {"DAQmx_Val_NRSE", DAQmx_Val_NRSE},
408  {"DAQmx_Val_Diff", DAQmx_Val_Diff},
409  {"DAQmx_Val_Seconds", DAQmx_Val_Seconds},
410  {"DAQmx_Val_Rising", DAQmx_Val_Rising},
411  {"DAQmx_Val_Falling", DAQmx_Val_Falling},
412  {"DAQmx_Val_CountUp", DAQmx_Val_CountUp},
413  {"DAQmx_Val_CountDown", DAQmx_Val_CountDown},
414  {"DAQmx_Val_ExtControlled", DAQmx_Val_ExtControlled},
415  {"DAQmx_Val_AHighBHigh", DAQmx_Val_AHighBHigh},
416  {"DAQmx_Val_AHighBLow", DAQmx_Val_AHighBLow},
417  {"DAQmx_Val_ALowBHigh", DAQmx_Val_ALowBHigh},
418  {"DAQmx_Val_ALowBLow", DAQmx_Val_ALowBLow},
419  {"DAQmx_Val_X1", DAQmx_Val_X1},
420  {"DAQmx_Val_X2", DAQmx_Val_X2},
421  {"DAQmx_Val_X4", DAQmx_Val_X4},
422  {"DAQmx_Val_Meters", DAQmx_Val_Meters},
423  {"DAQmx_Val_Inches", DAQmx_Val_Inches},
424  {"DAQmx_Val_Ticks", DAQmx_Val_Ticks},
425  {"DAQmx_Val_Degrees", DAQmx_Val_Degrees},
426  {"DAQmx_Val_Radians", DAQmx_Val_Radians},
427  {"DAQmx_Val_High", DAQmx_Val_High},
428  {"DAQmx_Val_Low", DAQmx_Val_Low}};
429 
430 int daqmx_defstr2num(const std::string& str)
431 {
432  const std::string s = mrpt::system::trim(str);
433 
434  for (unsigned int i = 0; i < sizeof(daqmx_vals) / sizeof(daqmx_vals[0]);
435  i++)
436  {
437  if (strCmpI(daqmx_vals[i].str, s.c_str())) return daqmx_vals[i].val;
438  }
439  THROW_EXCEPTION_FMT("Error: Unknown DAQmx constant: %s", s.c_str())
440 }
441 #endif
442 
443 /* -----------------------------------------------------
444  initialize
445 ----------------------------------------------------- */
447 {
448 #if MRPT_HAS_SOME_NIDAQMX
449  this->stop();
450 
451  for (size_t i = 0; i < task_definitions.size(); i++)
452  {
453  const TaskDescription& tf = task_definitions[i];
454 
455  // Try to create a new task:
456  m_running_tasks.push_back(TInfoPerTask());
457  TInfoPerTask& ipt = m_running_tasks.back();
458  ipt.task = tf; // Save a copy of the task info for the thread to have
459  // all the needed info
460 
461  try
462  {
463  TaskHandle& taskHandle =
464  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
465 
466  MRPT_DAQmx_ErrChk(MRPT_DAQmxCreateTask("", &taskHandle));
467 
468  if (tf.has_ai)
469  {
470  ASSERTMSG_(
471  tf.ai.physicalChannelCount > 0,
472  "ai.physicalChannelCount is zero! Please, define it "
473  "correctly.")
474 
477  taskHandle, tf.ai.physicalChannel.c_str(), nullptr,
478  daqmx_defstr2num(tf.ai.terminalConfig), tf.ai.minVal,
479  tf.ai.maxVal, DAQmx_Val_Volts, nullptr));
480  }
481  if (tf.has_ao)
482  {
483  ASSERTMSG_(
484  tf.ao.physicalChannelCount > 0,
485  "ai.physicalChannelCount is zero! Please, define it "
486  "correctly.")
487 
490  taskHandle, tf.ao.physicalChannel.c_str(), nullptr,
491  tf.ao.minVal, tf.ao.maxVal, DAQmx_Val_Volts, nullptr));
492  }
493  if (tf.has_di)
494  {
497  taskHandle, tf.di.line.c_str(), nullptr,
498  DAQmx_Val_ChanPerLine));
499  }
500  if (tf.has_do)
501  {
504  taskHandle, tf.douts.line.c_str(), nullptr,
505  DAQmx_Val_ChanPerLine));
506  }
507  if (tf.has_ci_period)
508  {
511  taskHandle, tf.ci_period.counter.c_str(), nullptr,
513  daqmx_defstr2num(tf.ci_period.units),
514  daqmx_defstr2num(tf.ci_period.edge),
515  DAQmx_Val_LowFreq1Ctr, tf.ci_period.measTime,
516  tf.ci_period.divisor, nullptr));
517  }
518  if (tf.has_ci_count_edges)
519  {
522  taskHandle, tf.ci_count_edges.counter.c_str(), nullptr,
523  daqmx_defstr2num(tf.ci_count_edges.edge),
525  daqmx_defstr2num(tf.ci_count_edges.countDirection)));
526  }
527  if (tf.has_ci_pulse_width)
528  {
531  taskHandle, tf.ci_pulse_width.counter.c_str(), nullptr,
533  daqmx_defstr2num(tf.ci_pulse_width.units),
534  daqmx_defstr2num(tf.ci_pulse_width.startingEdge),
535  nullptr));
536  }
537  if (tf.has_ci_lin_encoder)
538  {
541  taskHandle, tf.ci_lin_encoder.counter.c_str(), nullptr,
542  daqmx_defstr2num(tf.ci_lin_encoder.decodingType),
544  daqmx_defstr2num(tf.ci_lin_encoder.ZidxPhase),
545  daqmx_defstr2num(tf.ci_lin_encoder.units),
547  tf.ci_lin_encoder.initialPos, nullptr));
548  }
549  if (tf.has_ci_ang_encoder)
550  {
553  taskHandle, tf.ci_ang_encoder.counter.c_str(), nullptr,
554  daqmx_defstr2num(tf.ci_ang_encoder.decodingType),
556  daqmx_defstr2num(tf.ci_ang_encoder.ZidxPhase),
557  daqmx_defstr2num(tf.ci_ang_encoder.units),
559  tf.ci_ang_encoder.initialAngle, nullptr));
560  }
561  if (tf.has_co_pulses)
562  {
565  taskHandle, tf.co_pulses.counter.c_str(), nullptr,
566  DAQmx_Val_Hz, daqmx_defstr2num(tf.co_pulses.idleState),
568  tf.co_pulses.dutyCycle));
569  }
570 
571  // Seems to be needed to avoid errors like:
572  // " Onboard device memory overflow. Because of system and/or
573  // bus-bandwidth limitations, the driver could not read data from
574  // the device fast enough to keep up with the device throughput."
575  if (tf.has_ai || tf.has_di || tf.has_ci_period ||
578  {
579  // sample rate:
583  taskHandle, tf.sampleClkSource.c_str(),
584  tf.samplesPerSecond, DAQmx_Val_Rising,
585  DAQmx_Val_ContSamps, tf.samplesPerChannelToRead));
586 
589  taskHandle, tf.bufferSamplesPerChannel));
590  }
591 
592  if (tf.has_ao)
593  {
594  // Nothing to do as long as we only need "on demand" outputs.
595  // MRPT_DAQmx_ErrChk (MRPT_DAQmxCfgOutputBuffer(taskHandle,2
596  ///*tf.bufferSamplesPerChannel*/ ));
597  // // Output buffer MUST have some data before starting the
598  // task: write 0s:
599  // vector<double> d;
600  // d.assign(tf.ao.physicalChannelCount*2, 0.0);
601  // this->writeAnalogOutputTask(i,1 /* samples per channel */,
602  //&d[0], 0.10 /*timeout*/, false);
603  }
604 
605  // Create pipe:
606  mrpt::synch::CPipe::createPipe(ipt.read_pipe, ipt.write_pipe);
607 
608  // Add a large timeout, just in case the writing thread dies
609  // unexpectedly so the reader doesn't hang on:
610  ipt.read_pipe->timeout_read_start_us = 100000; // 100ms
611  ipt.read_pipe->timeout_read_between_us = 100000; // 100ms
612 
614 
615  ipt.hThread = std::thread(
617  }
618  catch (std::exception const& e)
619  {
620  std::cerr << "[CNationalInstrumentsDAQ] Error:" << std::endl
621  << e.what() << std::endl;
622  if (ipt.taskHandle != nullptr)
623  {
624  TaskHandle& taskHandle =
625  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
626  MRPT_DAQmxStopTask(taskHandle);
627  MRPT_DAQmxClearTask(taskHandle);
628  }
629 
630  // Stop thread:
631  if (ipt.hThread.joinable())
632  {
633  ipt.must_close = true;
634  cerr << "[CNationalInstrumentsDAQ::initialize] Waiting for the "
635  "grabbing thread to end due to exception...\n";
636  ipt.hThread.join();
637  cerr << "[CNationalInstrumentsDAQ::initialize] Grabbing thread "
638  "ended.\n";
639  }
640 
641  // Remove from list:
642  m_running_tasks.erase(--m_running_tasks.end());
643 
644  std::cerr << "[CNationalInstrumentsDAQ] Error while creating "
645  "tasks. Closing other tasks before returning...\n";
646  this->stop();
647  std::cerr << "[CNationalInstrumentsDAQ] Closing tasks done.\n";
648 
649  throw; // Rethrow
650  }
651  } // end for each task_definitions[i]
652 
653 #else
654  THROW_EXCEPTION("MRPT was compiled without support for NI DAQmx!!");
655 #endif
656 }
657 
658 /** Stop the grabbing threads for DAQ tasks. It is automatically called at
659  * destruction. */
661 {
662  // Stop all threads:
664  it != m_running_tasks.end(); ++it)
665  {
666  it->must_close = true;
667  }
668  if (m_verbose)
669  cout << "[CNationalInstrumentsDAQ::stop] Waiting for grabbing threads "
670  "to end...\n";
672  it != m_running_tasks.end(); ++it)
673  {
674  // For some reason, join doesn't work...
675  if (it->hThread.joinable()) it->hThread.join();
676  // Polling:
677  // for (size_t tim=0;tim<250 && !it->is_closed;tim++) {
678  // std::this_thread::sleep_for(1ms); }
679  // it->hThread.clear();
680  }
681  if (m_verbose)
682  cout << "[CNationalInstrumentsDAQ::stop] All threads ended.\n";
683 
684 // Stop all NI tasks:
685 #if MRPT_HAS_SOME_NIDAQMX
687  it != m_running_tasks.end(); ++it)
688  {
689  TaskHandle& taskHandle =
690  *reinterpret_cast<TaskHandle*>(&it->taskHandle);
691 
692  MRPT_DAQmxStopTask(taskHandle);
693  MRPT_DAQmxClearTask(taskHandle);
694  taskHandle = nullptr;
695  }
696 #endif
697 }
698 
699 /** Returns true if initialize() was called successfully. */
701 {
702  return (!m_running_tasks.empty() && !m_running_tasks.begin()->is_closed);
703 }
704 
705 /*-------------------------------------------------------------
706  readFromDAQ
707 -------------------------------------------------------------*/
709  std::vector<mrpt::obs::CObservationRawDAQ::Ptr>& outObservations,
710  bool& hardwareError)
711 {
712  hardwareError = false;
713  outObservations.clear();
714 
715  if (!checkDAQIsWorking())
716  {
717  hardwareError = true;
718  return;
719  }
720 
721  // Read from the pipe:
722  m_state = ssWorking;
723 
725  it != m_running_tasks.end(); ++it)
726  {
727  CObservationRawDAQ tmp_obs;
728  try
729  {
730  if (it->new_obs_available != 0)
731  {
732  auto arch = mrpt::serialization::archiveFrom(*it->read_pipe);
733  arch.ReadObject(&tmp_obs);
734  --(it->new_obs_available);
735 
736  // Yes, a valid block of samples was acquired:
737  outObservations.push_back(
739  }
740  }
741  catch (...)
742  {
743  // Timeout...
744  }
745  }
746 }
747 
748 /* -----------------------------------------------------
749  doProcess
750 ----------------------------------------------------- */
752 {
753  bool hwError;
755 
756  if (hwError)
757  {
758  m_state = ssError;
759  THROW_EXCEPTION("Couldn't start DAQ task!");
760  }
761 
762  if (!m_nextObservations.empty())
763  {
764  m_state = ssWorking;
765 
766  std::vector<mrpt::serialization::CSerializable::Ptr> new_obs;
767  new_obs.resize(m_nextObservations.size());
768 
769  for (size_t i = 0; i < m_nextObservations.size(); i++)
770  new_obs[i] = m_nextObservations[i];
771 
772  appendObservations(new_obs);
773  }
774 }
775 
776 /* -----------------------------------------------------
777  grabbing_thread
778 ----------------------------------------------------- */
780 {
781 #if MRPT_HAS_SOME_NIDAQMX
782  try
783  {
784  TaskHandle& taskHandle =
785  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
786  if (m_verbose)
787  cout << "[CNationalInstrumentsDAQ::grabbing_thread] Starting "
788  "thread for task "
789  << ipt.taskHandle << "\n";
790 
791  MRPT_TODO("Add write timeout")
792  // ipt.write_pipe->timeout_read_between_us
793 
794  const float timeout =
796 
797  int err = 0;
798  vector<double> dBuf;
799  vector<uint8_t> u8Buf;
800 
801  const mrpt::obs::CObservationRawDAQ clean_obs;
803 
804  while (!ipt.must_close)
805  {
806  obs = clean_obs; // Start with an empty observation
807 
808  // Common stuff:
811  obs.sensorLabel = m_sensorLabel + string(".") + ipt.task.taskLabel;
812 
813  bool there_are_data = false; // At least one channel?
814 
815  // Read from each channel in this task:
816  // -----------------------------------------------
817  if (ipt.task.has_ai)
818  {
820  obs.AIN_interleaved = true;
821 
822  const uint32_t totalSamplesToRead =
825  dBuf.resize(totalSamplesToRead);
826  int32 pointsReadPerChan = -1;
827  if ((err = MRPT_DAQmxReadAnalogF64(
828  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
829  obs.AIN_interleaved ? DAQmx_Val_GroupByScanNumber
830  : DAQmx_Val_GroupByChannel,
831  &dBuf[0], dBuf.size(), &pointsReadPerChan, nullptr)) <
832  0 &&
833  err != DAQmxErrorSamplesNotYetAvailable)
834  {
835  MRPT_DAQmx_ErrChk(err)
836  }
837  else if (pointsReadPerChan > 0)
838  {
840  totalSamplesToRead,
841  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
842  obs.AIN_double = dBuf;
843  there_are_data = true;
844  if (m_verbose)
845  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
846  << pointsReadPerChan << " analog samples read.\n";
847  }
848  } // end AI
849  if (ipt.task.has_di)
850  {
851  const uint32_t totalSamplesToRead =
853  u8Buf.resize(totalSamplesToRead);
854 
855  int32 pointsReadPerChan = -1;
856  if ((err = MRPT_DAQmxReadDigitalU8(
857  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
858  DAQmx_Val_GroupByChannel, &u8Buf[0], u8Buf.size(),
859  &pointsReadPerChan, nullptr)) < 0 &&
860  err != DAQmxErrorSamplesNotYetAvailable)
861  {
862  MRPT_DAQmx_ErrChk(err)
863  }
864  else if (pointsReadPerChan > 0)
865  {
867  totalSamplesToRead,
868  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
869  obs.DIN = u8Buf;
870  there_are_data = true;
871  if (m_verbose)
872  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
873  << pointsReadPerChan << " digital samples read.\n";
874  }
875  } // end DI
877  {
878  const int32 totalSamplesToRead =
880  dBuf.resize(totalSamplesToRead);
881  int32 pointsReadPerChan = -1;
882  if ((err = MRPT_DAQmxReadCounterF64(
883  taskHandle, totalSamplesToRead, timeout, &dBuf[0],
884  dBuf.size(), &pointsReadPerChan, nullptr)) < 0 &&
885  err != DAQmxErrorSamplesNotYetAvailable)
886  {
887  MRPT_DAQmx_ErrChk(err)
888  }
889  else if (pointsReadPerChan > 0)
890  {
891  ASSERT_EQUAL_(totalSamplesToRead, pointsReadPerChan);
892  // Decimate?
893  if (++ipt.task.ci_ang_encoder.decimate_cnt >=
895  {
897 
898  obs.CNTRIN_double = dBuf;
899  there_are_data = true;
900  if (m_verbose && !obs.CNTRIN_double.empty())
901  {
902  static int decim = 0;
903  if (!decim)
904  cout << "[CNationalInstrumentsDAQ::grabbing_"
905  "thread] "
906  << pointsReadPerChan
907  << " counter samples read ([0]="
908  << obs.CNTRIN_double[0] << ").\n";
909  if (++decim > 100) decim = 0;
910  }
911  }
912  }
913  } // end COUNTERS
914 
915  // Send the observation to the main thread:
916  if (there_are_data)
917  {
918  ++(ipt.new_obs_available);
919  ipt.write_pipe->WriteObject(&obs);
920  // std::this_thread::sleep_for(1ms); // This seems to be needed
921  // to allow all objs to be sent to the recv thread
922  }
923  else
924  {
925  std::this_thread::sleep_for(1ms);
926  }
927 
928  } // end of main thread loop
929  }
930  catch (std::exception& e)
931  {
932  std::cerr << "[CNationalInstrumentsDAQ::grabbing_thread] Exception:\n"
933  << e.what() << std::endl;
934  }
935 #endif // MRPT_HAS_SOME_NIDAQMX
936 
937  ipt.is_closed = true;
938 }
939 
941  size_t task_index, size_t nSamplesPerChannel, const double* volt_values,
942  double timeout, bool groupedByChannel)
943 {
944 #if MRPT_HAS_SOME_NIDAQMX
945  ASSERT_(task_index < m_running_tasks.size());
947  std::advance(it, task_index);
948  TInfoPerTask& ipt = *it;
949  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
950 
951  int32 samplesWritten = 0;
952  int err = 0;
953  if (err = MRPT_DAQmxWriteAnalogF64(
954  taskHandle, nSamplesPerChannel, FALSE, timeout,
955  groupedByChannel ? DAQmx_Val_GroupByChannel
956  : DAQmx_Val_GroupByScanNumber,
957  const_cast<float64*>(volt_values), &samplesWritten, nullptr))
958  {
959  MRPT_DAQmx_ErrChk(err)
960  }
961 #else
962  MRPT_UNUSED_PARAM(task_index);
963  MRPT_UNUSED_PARAM(nSamplesPerChannel);
964  MRPT_UNUSED_PARAM(volt_values);
965  MRPT_UNUSED_PARAM(timeout);
966  MRPT_UNUSED_PARAM(groupedByChannel);
967 #endif
968 }
969 
971  size_t task_index, bool line_value, double timeout)
972 {
973 #if MRPT_HAS_SOME_NIDAQMX
974  ASSERT_(task_index < m_running_tasks.size());
976  std::advance(it, task_index);
977  TInfoPerTask& ipt = *it;
978  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
979 
980  uInt8 dat = line_value ? 1 : 0;
981 
982  int32 samplesWritten = 0;
983  int32 nSamplesPerChannel = 1;
984  int err = 0;
985  if (err = MRPT_DAQmxWriteDigitalLines(
986  taskHandle, nSamplesPerChannel, FALSE, timeout,
987  DAQmx_Val_GroupByScanNumber, &dat, &samplesWritten, nullptr))
988  {
989  MRPT_DAQmx_ErrChk(err)
990  }
991 
992 #else
993  MRPT_UNUSED_PARAM(task_index);
994  MRPT_UNUSED_PARAM(line_value);
995  MRPT_UNUSED_PARAM(timeout);
996 #endif
997 }
998 
999 // Ctor:
1001  : has_ai(false),
1002  has_ao(false),
1003  has_di(false),
1004  has_do(false),
1005  has_ci_period(false),
1006  has_ci_count_edges(false),
1007  has_ci_pulse_width(false),
1008  has_ci_lin_encoder(false),
1009  has_ci_ang_encoder(false),
1010  has_co_pulses(false),
1011  samplesPerSecond(1000.0),
1012  bufferSamplesPerChannel(200000),
1013  samplesPerChannelToRead(1000)
1014 {
1015 }
uint16_t AIN_channel_count
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
#define MRPT_DAQmxCreateCILinEncoderChan
Scalar * iterator
Definition: eigen_plugins.h:26
double sample_rate
Readings from ticks counters, such as quadrature encoders.
Each of the tasks to create in CNationalInstrumentsDAQ::initialize().
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_ang_encoder_t ci_ang_encoder
Counter: uses an angular encoder to measure angular position.
GLdouble GLdouble t
Definition: glext.h:3689
std::string read_string(const std::string &section, const std::string &name, const std::string &defaultValue, bool failIfNotFound=false) const
virtual void initialize()
Setup and launch the DAQ tasks, in parallel threads.
#define MRPT_DAQmxCreateDOChan
std::string line
The digital line (for example "Dev1/port0/line1")
This namespace provides a OS-independent interface to many useful functions: filenames manipulation...
Definition: math_frwds.h:25
#define THROW_EXCEPTION(msg)
Definition: exceptions.h:41
#define MRPT_DAQmxCreateCIPulseWidthChan
std::string m_sensorLabel
See CGenericSensor.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_co_pulses_t co_pulses
Output counter: digital pulses output.
#define MRPT_DAQmxStartTask
#define MRPT_DAQmxCreateAOVoltageChan
#define MRPT_DAQmxCreateDIChan
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_do_t douts
Digital outs (do)
mrpt::system::TTimeStamp now()
A shortcut for system::getCurrentTime.
Definition: datetime.h:75
Contains classes for various device interfaces.
#define MRPT_DAQmxCreateCIPeriodChan
STL namespace.
bool checkDAQIsWorking() const
Returns true if initialize() was called and at least one task is running.
TaskDescription task
A copy of the original task description that generated this thread.
GLdouble s
Definition: glext.h:3676
#define MRPT_DAQmxWriteDigitalLines
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
#define MRPT_DAQmxReadAnalogF64
void appendObservations(const std::vector< mrpt::serialization::CSerializable::Ptr > &obj)
This method must be called by derived classes to enqueue a new observation in the list to be returned...
CArchiveStreamBase< STREAM > archiveFrom(STREAM &s)
Helper function to create a templatized wrapper CArchive object for a: MRPT's CStream, std::istream, std::ostream, std::stringstream.
Definition: CArchive.h:561
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_count_edges_t ci_count_edges
Counter: counts the edges of a digital signal.
#define ASSERT_(f)
Defines an assertion mechanism.
Definition: exceptions.h:113
This class allows loading and storing values and vectors of different types from a configuration text...
#define MRPT_DAQmxWriteAnalogF64
#define ASSERT_EQUAL_(__A, __B)
Assert comparing two values, reporting their actual values upon failure.
Definition: exceptions.h:153
std::string sampleClkSource
Sample clock source: may be empty (default value) for some channels.
#define MRPT_DAQmxCreateCIAngEncoderChan
void writeDigitalOutputTask(size_t task_index, bool line_value, double timeout)
Changes the boolean state of one digital output line.
#define MRPT_DAQmxCfgInputBuffer
Versatile class for consistent logging and management of output messages.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_di_t di
Digital inputs (di)
#define MRPT_DAQmxReadCounterF64
This namespace contains representation of robot actions and observations.
int val
Definition: mrpt_jpeglib.h:955
uint64_t read_uint64_t(const std::string &section, const std::string &name, uint64_t defaultValue, bool failIfNotFound=false) const
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:16
void stop()
Stop the grabbing threads for DAQ tasks.
void grabbing_thread(TInfoPerTask &ipt)
Method to be executed in each parallel thread.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_pulse_width_t ci_pulse_width
Counter: measure the width of a digital pulse.
An interface to read from data acquisition boards compatible with National Instruments "DAQmx Base" o...
#define MY_LOAD_HERE_CONFIG_VAR( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define ASSERTMSG_(f, __ERROR_MSG)
Defines an assertion mechanism.
Definition: exceptions.h:101
bool AIN_interleaved
Whether the channels are interleaved (A0 A1 A2 A0 A1 A2...) or not (A0 A0 A0 A1 A1 A1 A2 A2 A2...).
GLsizei const GLchar ** string
Definition: glext.h:4101
#define MRPT_DAQmxStopTask
std::vector< mrpt::obs::CObservationRawDAQ::Ptr > m_nextObservations
A buffer for doProcess.
Store raw data from a Data Acquisition (DAQ) device, such that input or output analog and digital cha...
#define IMPLEMENTS_GENERIC_SENSOR(class_name, NameSpace)
This must be inserted in all CGenericSensor classes implementation files:
void writeAnalogOutputTask(size_t task_index, size_t nSamplesPerChannel, const double *volt_values, double timeout, bool groupedByChannel)
Set voltage outputs to all the outputs in an AOUT task. For the meaning of the parameters, refer to the NI DAQmx docs for DAQmxBaseWriteAnalogF64().
std::vector< double > CNTRIN_double
Readings from ticks counters, such as quadrature encoders.
uint32_t bufferSamplesPerChannel
(Default=0) From NI's docs: The number of samples the buffer can hold for each channel in the task...
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_lin_encoder_t ci_lin_encoder
Counter: uses a linear encoder to measure linear position.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ai_t ai
Analog inputs.
void readFromDAQ(std::vector< mrpt::obs::CObservationRawDAQ::Ptr > &outObservations, bool &hardwareError)
Receives data from the DAQ thread(s).
std::string line
The digital line (for example "Dev1/port0/line1")
std::string sensorLabel
An arbitrary label that can be used to identify the sensor.
Definition: CObservation.h:62
unsigned __int64 uint64_t
Definition: rptypes.h:50
#define MRPT_DAQmxCreateAIVoltageChan
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::vector< double > AIN_double
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
mrpt::system::TTimeStamp timestamp
The associated UTC time-stamp.
Definition: CObservation.h:60
#define MRPT_TODO(x)
Definition: common.h:129
uint32_t samplesPerChannelToRead
(Default=1000) The number of samples to grab at once from each channel.
std::vector< TaskDescription > task_definitions
Publicly accessible vector with the list of tasks to be launched upon call to CNationalInstrumentsDAQ...
void doProcess()
This method will be invoked at a minimum rate of "process_rate" (Hz)
void loadConfig_sensorSpecific(const mrpt::config::CConfigFileBase &configSource, const std::string &iniSection)
See the class documentation at the top for expected parameters.
#define FALSE
Definition: xmlParser.h:231
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
#define ASSERT_ABOVE_(__A, __B)
Definition: exceptions.h:171
std::unique_ptr< mrpt::io::CPipeWriteEndPoint > write_pipe
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ao_t ao
Analog outputs.
#define MRPT_DAQmxCreateCOPulseChanFreq
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_period_t ci_period
Counter: period of a digital signal.
#define MRPT_DAQmx_ErrChk(functionCall)
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
std::string trim(const std::string &str)
Removes leading and trailing spaces.
#define MRPT_DAQmxClearTask
double samplesPerSecond
Sample clock config: samples per second.
std::vector< uint8_t > DIN
Readings from digital inputs; each byte stores 8 digital inputs, or an 8-bit port.
std::unique_ptr< mrpt::io::CPipeReadEndPoint > read_pipe
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
Definition: exceptions.h:43
bool strCmpI(const std::string &s1, const std::string &s2)
Return true if the two strings are equal (case insensitive)
unsigned __int32 uint32_t
Definition: rptypes.h:47
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmxReadDigitalU8
#define MRPT_DAQmxCfgSampClkTiming
#define MRPT_DAQmxCreateCICountEdgesChan
#define MRPT_DAQmxCreateTask
#define MRPT_UNUSED_PARAM(a)
Determines whether this is an X86 or AMD64 platform.
Definition: common.h:186



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: ad3a9d8ae Tue May 1 23:10:22 2018 -0700 at lun oct 28 00:14:14 CET 2019