Main MRPT website > C++ reference for MRPT 1.9.9
CNationalInstrumentsDAQ.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <iterator> // advance()
14 
15 // If we have both, DAQmx & DAQmxBase, prefer DAQmx:
16 #define MRPT_HAS_SOME_NIDAQMX (MRPT_HAS_NIDAQMXBASE || MRPT_HAS_NIDAQMX)
17 
18 #define MRPT_USE_NIDAQMXBASE (MRPT_HAS_NIDAQMXBASE && !MRPT_HAS_NIDAQMX)
19 #define MRPT_USE_NIDAQMX (MRPT_HAS_NIDAQMX)
20 
21 #if MRPT_USE_NIDAQMXBASE
22 #include "NIDAQmxBase.h" // Include file for NI-DAQmx Base API
23 #endif
24 #if MRPT_USE_NIDAQMX
25 #include "NIDAQmx.h" // Include file for NI-DAQmx API
26 #endif
27 
28 // Macros to use either DAQmx or DAQmx Base automatically, depending on the
29 // installed libraries:
30 #if MRPT_USE_NIDAQMXBASE
31 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxBaseGetExtendedErrorInfo
32 #define MRPT_DAQmxCreateTask DAQmxBaseCreateTask
33 #define MRPT_DAQmxCreateAIVoltageChan DAQmxBaseCreateAIVoltageChan
34 #define MRPT_DAQmxCreateAOVoltageChan DAQmxBaseCreateAOVoltageChan
35 #define MRPT_DAQmxCreateDIChan DAQmxBaseCreateDIChan
36 #define MRPT_DAQmxCreateDOChan DAQmxBaseCreateDOChan
37 #define MRPT_DAQmxCreateCIPeriodChan DAQmxBaseCreateCIPeriodChan
38 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxBaseCreateCICountEdgesChan
39 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxBaseCreateCIPulseWidthChan
40 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxBaseCreateCILinEncoderChan
41 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxBaseCreateCIAngEncoderChan
42 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxBaseCreateCOPulseChanFreq
43 #define MRPT_DAQmxCfgSampClkTiming DAQmxBaseCfgSampClkTiming
44 #define MRPT_DAQmxCfgInputBuffer DAQmxBaseCfgInputBuffer
45 #define MRPT_DAQmxCfgOutputBuffer DAQmxBaseCfgOutputBuffer
46 #define MRPT_DAQmxStartTask DAQmxBaseStartTask
47 #define MRPT_DAQmxStopTask DAQmxBaseStopTask
48 #define MRPT_DAQmxClearTask DAQmxBaseClearTask
49 #define MRPT_DAQmxReadAnalogF64 DAQmxBaseReadAnalogF64
50 #define MRPT_DAQmxReadCounterF64 DAQmxBaseReadCounterF64
51 #define MRPT_DAQmxReadDigitalU8 DAQmxBaseReadDigitalU8
52 #define MRPT_DAQmxWriteAnalogF64 DAQmxBaseWriteAnalogF64
53 #define MRPT_DAQmxWriteDigitalU32 DAQmxBaseWriteDigitalU32
54 #define MRPT_DAQmxWriteDigitalLines DAQmxBaseWriteDigitalLines
55 #else
56 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxGetExtendedErrorInfo
57 #define MRPT_DAQmxCreateTask DAQmxCreateTask
58 #define MRPT_DAQmxCreateAIVoltageChan DAQmxCreateAIVoltageChan
59 #define MRPT_DAQmxCreateAOVoltageChan DAQmxCreateAOVoltageChan
60 #define MRPT_DAQmxCreateDIChan DAQmxCreateDIChan
61 #define MRPT_DAQmxCreateDOChan DAQmxCreateDOChan
62 #define MRPT_DAQmxCreateCIPeriodChan DAQmxCreateCIPeriodChan
63 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxCreateCICountEdgesChan
64 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxCreateCIPulseWidthChan
65 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxCreateCILinEncoderChan
66 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxCreateCIAngEncoderChan
67 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxCreateCOPulseChanFreq
68 #define MRPT_DAQmxCfgSampClkTiming DAQmxCfgSampClkTiming
69 #define MRPT_DAQmxCfgInputBuffer DAQmxCfgInputBuffer
70 #define MRPT_DAQmxCfgOutputBuffer DAQmxCfgOutputBuffer
71 #define MRPT_DAQmxStartTask DAQmxStartTask
72 #define MRPT_DAQmxStopTask DAQmxStopTask
73 #define MRPT_DAQmxClearTask DAQmxClearTask
74 #define MRPT_DAQmxReadAnalogF64 DAQmxReadAnalogF64
75 #define MRPT_DAQmxReadCounterF64 DAQmxReadCounterF64
76 #define MRPT_DAQmxReadDigitalU8 DAQmxReadDigitalU8
77 #define MRPT_DAQmxWriteAnalogF64 DAQmxWriteAnalogF64
78 #define MRPT_DAQmxWriteDigitalU32 DAQmxWriteDigitalU32
79 #define MRPT_DAQmxWriteDigitalLines DAQmxWriteDigitalLines
80 #endif
81 
82 // An auxiliary macro to check and report errors in the DAQmx library as
83 // exceptions with a well-explained message.
84 #define MRPT_DAQmx_ErrChk(functionCall) \
85  if ((functionCall) < 0) \
86  { \
87  char errBuff[2048]; \
88  MRPT_DAQmxGetExtendedErrorInfo(errBuff, 2048); \
89  std::string sErr = mrpt::format( \
90  "DAQ error: '%s'\nCalling: '%s'", errBuff, #functionCall); \
91  THROW_EXCEPTION(sErr) \
92  }
93 
94 using namespace mrpt::hwdrivers;
95 using namespace mrpt::obs;
96 using namespace mrpt::system;
97 using namespace std;
98 
100 
101 // ------------- CNationalInstrumentsDAQ::TInfoPerTask -----------
102 // Default ctor:
104  : taskHandle(0),
105  must_close(false),
106  is_closed(false),
107  new_obs_available(0),
108  task()
109 {
110 }
111 
112 /* -----------------------------------------------------
113  Constructor
114  ----------------------------------------------------- */
116  : mrpt::utils::COutputLogger("CNationalInstrumentsDAQ")
117 {
118  m_sensorLabel = "NIDAQ";
119 }
120 
121 // Just like "MRPT_LOAD_HERE_CONFIG_VAR" but...
122 #define MY_LOAD_HERE_CONFIG_VAR( \
123  variableName, variableType, targetVariable, configFileObject, \
124  sectionNameStr) \
125  targetVariable = configFileObject.read_##variableType( \
126  sectionNameStr, variableName, targetVariable, false);
127 
128 #define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( \
129  variableName, variableType, targetVariable, configFileObject, \
130  sectionNameStr) \
131  { \
132  try \
133  { \
134  targetVariable = configFileObject.read_##variableType( \
135  sectionNameStr, variableName, targetVariable, true); \
136  } \
137  catch (std::exception&) \
138  { \
139  THROW_EXCEPTION( \
140  format( \
141  "Value for '%s' not found in config file", \
142  std::string(variableName).c_str())); \
143  } \
144  }
145 
146 /* -----------------------------------------------------
147  loadConfig_sensorSpecific
148  ----------------------------------------------------- */
150  const mrpt::utils::CConfigFileBase& cfg, const std::string& sect)
151 {
152  // std::vector<TaskDescription> task_definitions;
153  task_definitions.clear();
154 
155  const unsigned int nTasks = cfg.read_uint64_t(sect, "num_tasks", 0, true);
156  if (!nTasks)
157  {
158  std::cerr << "[CNationalInstrumentsDAQ] Warning: Number of tasks is "
159  "zero. No datalogging will be done.\n";
160  }
161 
162  task_definitions.resize(nTasks);
163  for (unsigned int i = 0; i < nTasks; i++)
164  {
166  const string sTask = mrpt::format("task%u", i);
167 
168  // Read general settings for this task:
169  // ---------------------------------------
170  const string sChanns =
171  cfg.read_string(sect, sTask + string(".channels"), "", true);
172  vector<string> lstStrChanns;
173  mrpt::system::tokenize(sChanns, " \t,", lstStrChanns);
174  if (lstStrChanns.empty())
175  THROW_EXCEPTION_FMT("List of channels for task %u is empty!", i)
176 
178  sTask + string(".samplesPerSecond"), double, t.samplesPerSecond,
179  cfg, sect)
181  sTask + string(".samplesPerChannelToRead"), double,
182  t.samplesPerChannelToRead, cfg, sect)
184  sTask + string(".sampleClkSource"), string, t.sampleClkSource, cfg,
185  sect)
187  sTask + string(".bufferSamplesPerChannel"), double,
188  t.bufferSamplesPerChannel, cfg, sect)
189  t.taskLabel =
190  cfg.read_string(sect, sTask + string(".taskLabel"), sTask, false);
191 
192  for (size_t j = 0; j < lstStrChanns.size(); j++)
193  {
194  if (strCmpI(lstStrChanns[j], "ai"))
195  {
196  t.has_ai = true;
198  sTask + string(".ai.physicalChannel"), string,
199  t.ai.physicalChannel, cfg, sect)
201  sTask + string(".ai.physicalChannelCount"), uint64_t,
202  t.ai.physicalChannelCount, cfg, sect)
204  sTask + string(".ai.terminalConfig"), string,
205  t.ai.terminalConfig, cfg, sect)
207  sTask + string(".ai.minVal"), double, t.ai.minVal, cfg,
208  sect)
210  sTask + string(".ai.maxVal"), double, t.ai.maxVal, cfg,
211  sect)
212  }
213  else if (strCmpI(lstStrChanns[j], "ao"))
214  {
215  t.has_ao = true;
217  sTask + string(".ao.physicalChannel"), string,
218  t.ao.physicalChannel, cfg, sect)
220  sTask + string(".ao.physicalChannelCount"), uint64_t,
221  t.ao.physicalChannelCount, cfg, sect)
223  sTask + string(".ao.minVal"), double, t.ao.minVal, cfg,
224  sect)
226  sTask + string(".ao.maxVal"), double, t.ao.maxVal, cfg,
227  sect)
228  }
229  else if (strCmpI(lstStrChanns[j], "di"))
230  {
231  t.has_di = true;
233  sTask + string(".di.line"), string, t.di.line, cfg, sect)
234  }
235  else if (strCmpI(lstStrChanns[j], "do"))
236  {
237  t.has_do = true;
239  sTask + string(".do.line"), string, t.douts.line, cfg, sect)
240  }
241  else if (strCmpI(lstStrChanns[j], "ci_period"))
242  {
243  t.has_ci_period = true;
245  sTask + string(".ci_period.counter"), string,
246  t.ci_period.counter, cfg, sect)
248  sTask + string(".ci_period.minVal"), double,
249  t.ci_period.minVal, cfg, sect)
251  sTask + string(".ci_period.maxVal"), double,
252  t.ci_period.maxVal, cfg, sect)
254  sTask + string(".ci_period.units"), string,
255  t.ci_period.units, cfg, sect)
257  sTask + string(".ci_period.edge"), string, t.ci_period.edge,
258  cfg, sect)
260  sTask + string(".ci_period.measTime"), double,
261  t.ci_period.measTime, cfg, sect)
263  sTask + string(".ci_period.divisor"), int,
264  t.ci_period.divisor, cfg, sect)
265  }
266  else if (strCmpI(lstStrChanns[j], "ci_count_edges"))
267  {
268  t.has_ci_count_edges = true;
270  sTask + string(".ci_count_edges.counter"), string,
271  t.ci_count_edges.counter, cfg, sect)
273  sTask + string(".ci_count_edges.edge"), string,
274  t.ci_count_edges.edge, cfg, sect)
276  sTask + string(".ci_count_edges.initialCount"), int,
277  t.ci_count_edges.initialCount, cfg, sect)
279  sTask + string(".ci_count_edges.countDirection"), string,
280  t.ci_count_edges.countDirection, cfg, sect)
281  }
282  else if (strCmpI(lstStrChanns[j], "ci_pulse_width"))
283  {
284  t.has_ci_pulse_width = true;
286  sTask + string(".ci_pulse_width.counter"), string,
287  t.ci_pulse_width.counter, cfg, sect)
289  sTask + string(".ci_pulse_width.minVal"), double,
290  t.ci_pulse_width.minVal, cfg, sect)
292  sTask + string(".ci_pulse_width.maxVal"), double,
293  t.ci_pulse_width.maxVal, cfg, sect)
295  sTask + string(".ci_pulse_width.units"), string,
296  t.ci_pulse_width.units, cfg, sect)
298  sTask + string(".ci_pulse_width.startingEdge"), string,
299  t.ci_pulse_width.startingEdge, cfg, sect)
300  }
301  else if (strCmpI(lstStrChanns[j], "ci_lin_encoder"))
302  {
303  t.has_ci_lin_encoder = true;
305  sTask + string(".ci_lin_encoder.counter"), string,
306  t.ci_lin_encoder.counter, cfg, sect)
308  sTask + string(".ci_lin_encoder.decodingType"), string,
309  t.ci_lin_encoder.decodingType, cfg, sect)
311  sTask + string(".ci_lin_encoder.ZidxEnable"), bool,
312  t.ci_lin_encoder.ZidxEnable, cfg, sect)
314  sTask + string(".ci_lin_encoder.ZidxVal"), double,
315  t.ci_lin_encoder.ZidxVal, cfg, sect)
317  sTask + string(".ci_lin_encoder.ZidxPhase"), string,
318  t.ci_lin_encoder.ZidxPhase, cfg, sect)
320  sTask + string(".ci_lin_encoder.units"), string,
321  t.ci_lin_encoder.units, cfg, sect)
323  sTask + string(".ci_lin_encoder.distPerPulse"), double,
324  t.ci_lin_encoder.distPerPulse, cfg, sect)
326  sTask + string(".ci_lin_encoder.initialPos"), double,
327  t.ci_lin_encoder.initialPos, cfg, sect)
328  }
329  else if (strCmpI(lstStrChanns[j], "ci_ang_encoder"))
330  {
331  t.has_ci_ang_encoder = true;
333  sTask + string(".ci_ang_encoder.counter"), string,
334  t.ci_ang_encoder.counter, cfg, sect)
336  sTask + string(".ci_ang_encoder.decodingType"), string,
337  t.ci_ang_encoder.decodingType, cfg, sect)
339  sTask + string(".ci_ang_encoder.ZidxEnable"), bool,
340  t.ci_ang_encoder.ZidxEnable, cfg, sect)
342  sTask + string(".ci_ang_encoder.ZidxVal"), double,
343  t.ci_ang_encoder.ZidxVal, cfg, sect)
345  sTask + string(".ci_ang_encoder.ZidxPhase"), string,
346  t.ci_ang_encoder.ZidxPhase, cfg, sect)
348  sTask + string(".ci_ang_encoder.units"), string,
349  t.ci_ang_encoder.units, cfg, sect)
351  sTask + string(".ci_ang_encoder.pulsesPerRev"), int,
352  t.ci_ang_encoder.pulsesPerRev, cfg, sect)
354  sTask + string(".ci_ang_encoder.initialAngle"), double,
355  t.ci_ang_encoder.initialAngle, cfg, sect)
357  sTask + string(".ci_ang_encoder.decimate"), int,
358  t.ci_ang_encoder.decimate, cfg, sect)
359  }
360  else if (strCmpI(lstStrChanns[j], "co_pulses"))
361  {
362  t.has_co_pulses = true;
364  sTask + string(".co_pulses.counter"), string,
365  t.co_pulses.counter, cfg, sect)
367  sTask + string(".co_pulses.idleState"), string,
368  t.co_pulses.idleState, cfg, sect)
370  sTask + string(".co_pulses.initialDelay"), double,
371  t.co_pulses.initialDelay, cfg, sect)
373  sTask + string(".co_pulses.freq"), double, t.co_pulses.freq,
374  cfg, sect)
376  sTask + string(".co_pulses.dutyCycle"), double,
377  t.co_pulses.dutyCycle, cfg, sect)
378  }
379  else
380  {
382  "Unknown channel type '%s'! See the docs of "
383  "CNationalInstrumentsDAQ",
384  lstStrChanns[j].c_str())
385  }
386  } // end for each "k" channel in channel "i"
387  } // end for "i", each task
388 }
389 
390 /* -----------------------------------------------------
391  Destructor
392  ----------------------------------------------------- */
394 #if MRPT_HAS_SOME_NIDAQMX
395 // Declare a table to convert strings to their DAQmx #define values:
396 struct daqmx_str_val
397 {
398  const char* str;
399  int val;
400 };
401 
402 const daqmx_str_val daqmx_vals[] = {
403  {"DAQmx_Val_Cfg_Default", DAQmx_Val_Cfg_Default},
404  {"DAQmx_Val_RSE", DAQmx_Val_RSE},
405  {"DAQmx_Val_NRSE", DAQmx_Val_NRSE},
406  {"DAQmx_Val_Diff", DAQmx_Val_Diff},
407  {"DAQmx_Val_Seconds", DAQmx_Val_Seconds},
408  {"DAQmx_Val_Rising", DAQmx_Val_Rising},
409  {"DAQmx_Val_Falling", DAQmx_Val_Falling},
410  {"DAQmx_Val_CountUp", DAQmx_Val_CountUp},
411  {"DAQmx_Val_CountDown", DAQmx_Val_CountDown},
412  {"DAQmx_Val_ExtControlled", DAQmx_Val_ExtControlled},
413  {"DAQmx_Val_AHighBHigh", DAQmx_Val_AHighBHigh},
414  {"DAQmx_Val_AHighBLow", DAQmx_Val_AHighBLow},
415  {"DAQmx_Val_ALowBHigh", DAQmx_Val_ALowBHigh},
416  {"DAQmx_Val_ALowBLow", DAQmx_Val_ALowBLow},
417  {"DAQmx_Val_X1", DAQmx_Val_X1},
418  {"DAQmx_Val_X2", DAQmx_Val_X2},
419  {"DAQmx_Val_X4", DAQmx_Val_X4},
420  {"DAQmx_Val_Meters", DAQmx_Val_Meters},
421  {"DAQmx_Val_Inches", DAQmx_Val_Inches},
422  {"DAQmx_Val_Ticks", DAQmx_Val_Ticks},
423  {"DAQmx_Val_Degrees", DAQmx_Val_Degrees},
424  {"DAQmx_Val_Radians", DAQmx_Val_Radians},
425  {"DAQmx_Val_High", DAQmx_Val_High},
426  {"DAQmx_Val_Low", DAQmx_Val_Low}};
427 
428 int daqmx_defstr2num(const std::string& str)
429 {
430  const std::string s = mrpt::system::trim(str);
431 
432  for (unsigned int i = 0; i < sizeof(daqmx_vals) / sizeof(daqmx_vals[0]);
433  i++)
434  {
435  if (strCmpI(daqmx_vals[i].str, s.c_str())) return daqmx_vals[i].val;
436  }
437  THROW_EXCEPTION_FMT("Error: Unknown DAQmx constant: %s", s.c_str())
438 }
439 #endif
440 
441 /* -----------------------------------------------------
442  initialize
443 ----------------------------------------------------- */
445 {
446 #if MRPT_HAS_SOME_NIDAQMX
447  this->stop();
448 
449  for (size_t i = 0; i < task_definitions.size(); i++)
450  {
451  const TaskDescription& tf = task_definitions[i];
452 
453  // Try to create a new task:
454  m_running_tasks.push_back(TInfoPerTask());
455  TInfoPerTask& ipt = m_running_tasks.back();
456  ipt.task = tf; // Save a copy of the task info for the thread to have
457  // all the needed info
458 
459  try
460  {
461  TaskHandle& taskHandle =
462  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
463 
464  MRPT_DAQmx_ErrChk(MRPT_DAQmxCreateTask("", &taskHandle));
465 
466  if (tf.has_ai)
467  {
468  ASSERTMSG_(
469  tf.ai.physicalChannelCount > 0,
470  "ai.physicalChannelCount is zero! Please, define it "
471  "correctly.")
472 
475  taskHandle, tf.ai.physicalChannel.c_str(), nullptr,
476  daqmx_defstr2num(tf.ai.terminalConfig), tf.ai.minVal,
477  tf.ai.maxVal, DAQmx_Val_Volts, nullptr));
478  }
479  if (tf.has_ao)
480  {
481  ASSERTMSG_(
482  tf.ao.physicalChannelCount > 0,
483  "ai.physicalChannelCount is zero! Please, define it "
484  "correctly.")
485 
488  taskHandle, tf.ao.physicalChannel.c_str(), nullptr,
489  tf.ao.minVal, tf.ao.maxVal, DAQmx_Val_Volts, nullptr));
490  }
491  if (tf.has_di)
492  {
495  taskHandle, tf.di.line.c_str(), nullptr,
496  DAQmx_Val_ChanPerLine));
497  }
498  if (tf.has_do)
499  {
502  taskHandle, tf.douts.line.c_str(), nullptr,
503  DAQmx_Val_ChanPerLine));
504  }
505  if (tf.has_ci_period)
506  {
509  taskHandle, tf.ci_period.counter.c_str(), nullptr,
511  daqmx_defstr2num(tf.ci_period.units),
512  daqmx_defstr2num(tf.ci_period.edge),
513  DAQmx_Val_LowFreq1Ctr, tf.ci_period.measTime,
514  tf.ci_period.divisor, nullptr));
515  }
516  if (tf.has_ci_count_edges)
517  {
520  taskHandle, tf.ci_count_edges.counter.c_str(), nullptr,
521  daqmx_defstr2num(tf.ci_count_edges.edge),
523  daqmx_defstr2num(tf.ci_count_edges.countDirection)));
524  }
525  if (tf.has_ci_pulse_width)
526  {
529  taskHandle, tf.ci_pulse_width.counter.c_str(), nullptr,
531  daqmx_defstr2num(tf.ci_pulse_width.units),
532  daqmx_defstr2num(tf.ci_pulse_width.startingEdge),
533  nullptr));
534  }
535  if (tf.has_ci_lin_encoder)
536  {
539  taskHandle, tf.ci_lin_encoder.counter.c_str(), nullptr,
540  daqmx_defstr2num(tf.ci_lin_encoder.decodingType),
542  daqmx_defstr2num(tf.ci_lin_encoder.ZidxPhase),
543  daqmx_defstr2num(tf.ci_lin_encoder.units),
545  tf.ci_lin_encoder.initialPos, nullptr));
546  }
547  if (tf.has_ci_ang_encoder)
548  {
551  taskHandle, tf.ci_ang_encoder.counter.c_str(), nullptr,
552  daqmx_defstr2num(tf.ci_ang_encoder.decodingType),
554  daqmx_defstr2num(tf.ci_ang_encoder.ZidxPhase),
555  daqmx_defstr2num(tf.ci_ang_encoder.units),
557  tf.ci_ang_encoder.initialAngle, nullptr));
558  }
559  if (tf.has_co_pulses)
560  {
563  taskHandle, tf.co_pulses.counter.c_str(), nullptr,
564  DAQmx_Val_Hz, daqmx_defstr2num(tf.co_pulses.idleState),
566  tf.co_pulses.dutyCycle));
567  }
568 
569  // Seems to be needed to avoid an errors avoid like:
570  // " Onboard device memory overflow. Because of system and/or
571  // bus-bandwidth limitations, the driver could not read data from
572  // the device fast enough to keep up with the device throughput."
573  if (tf.has_ai || tf.has_di || tf.has_ci_period ||
576  {
577  // sample rate:
581  taskHandle, tf.sampleClkSource.c_str(),
582  tf.samplesPerSecond, DAQmx_Val_Rising,
583  DAQmx_Val_ContSamps, tf.samplesPerChannelToRead));
584 
587  taskHandle, tf.bufferSamplesPerChannel));
588  }
589 
590  if (tf.has_ao)
591  {
592  // Nothing to do as long as we only need "on demand" outputs.
593  // MRPT_DAQmx_ErrChk (MRPT_DAQmxCfgOutputBuffer(taskHandle,2
594  ///*tf.bufferSamplesPerChannel*/ ));
595  // // Output buffer MUST have some data before starting the
596  // task: write 0s:
597  // vector<double> d;
598  // d.assign(tf.ao.physicalChannelCount*2, 0.0);
599  // this->writeAnalogOutputTask(i,1 /* samples per channel */,
600  //&d[0], 0.10 /*timeout*/, false);
601  }
602 
603  // Create pipe:
605 
606  // Add a large timeout, just in case the writing thread dies
607  // unexpectedly so the reader doesn't hang on:
608  ipt.read_pipe->timeout_read_start_us = 100000; // 100ms
609  ipt.read_pipe->timeout_read_between_us = 100000; // 100ms
610 
612 
613  ipt.hThread = std::thread(
615  }
616  catch (std::exception const& e)
617  {
618  std::cerr << "[CNationalInstrumentsDAQ] Error:" << std::endl
619  << e.what() << std::endl;
620  if (ipt.taskHandle != nullptr)
621  {
622  TaskHandle& taskHandle =
623  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
624  MRPT_DAQmxStopTask(taskHandle);
625  MRPT_DAQmxClearTask(taskHandle);
626  }
627 
628  // Stop thread:
629  if (ipt.hThread.joinable())
630  {
631  ipt.must_close = true;
632  cerr << "[CNationalInstrumentsDAQ::initialize] Waiting for the "
633  "grabbing thread to end due to exception...\n";
634  ipt.hThread.join();
635  cerr << "[CNationalInstrumentsDAQ::initialize] Grabbing thread "
636  "ended.\n";
637  }
638 
639  // Remove from list:
640  m_running_tasks.erase(--m_running_tasks.end());
641 
642  std::cerr << "[CNationalInstrumentsDAQ] Error while creating "
643  "tasks. Closing other tasks before returning...\n";
644  this->stop();
645  std::cerr << "[CNationalInstrumentsDAQ] Closing tasks done.\n";
646 
647  throw; // Rethrow
648  }
649  } // end for each task_definitions[i]
650 
651 #else
652  THROW_EXCEPTION("MRPT was compiled without support for NI DAQmx!!")
653 #endif
654 }
655 
656 /** Stop the grabbing threads for DAQ tasks. It is automatically called at
657  * destruction. */
659 {
660  // Stop all threads:
662  it != m_running_tasks.end(); ++it)
663  {
664  it->must_close = true;
665  }
666  if (m_verbose)
667  cout << "[CNationalInstrumentsDAQ::stop] Waiting for grabbing threads "
668  "to end...\n";
670  it != m_running_tasks.end(); ++it)
671  {
672  // For some reason, join doesn't work...
673  if (it->hThread.joinable()) it->hThread.join();
674  // Polling:
675  // for (size_t tim=0;tim<250 && !it->is_closed;tim++) {
676  // std::this_thread::sleep_for(1ms); }
677  // it->hThread.clear();
678  }
679  if (m_verbose)
680  cout << "[CNationalInstrumentsDAQ::stop] All threads ended.\n";
681 
682 // Stop all NI tasks:
683 #if MRPT_HAS_SOME_NIDAQMX
685  it != m_running_tasks.end(); ++it)
686  {
687  TaskHandle& taskHandle =
688  *reinterpret_cast<TaskHandle*>(&it->taskHandle);
689 
690  MRPT_DAQmxStopTask(taskHandle);
691  MRPT_DAQmxClearTask(taskHandle);
692  taskHandle = nullptr;
693  }
694 #endif
695 }
696 
697 /** Returns true if initialize() was called successfully. */
699 {
700  return (!m_running_tasks.empty() && !m_running_tasks.begin()->is_closed);
701 }
702 
703 /*-------------------------------------------------------------
704  readFromDAQ
705 -------------------------------------------------------------*/
707  std::vector<mrpt::obs::CObservationRawDAQ::Ptr>& outObservations,
708  bool& hardwareError)
709 {
710  hardwareError = false;
711  outObservations.clear();
712 
713  if (!checkDAQIsWorking())
714  {
715  hardwareError = true;
716  return;
717  }
718 
719  // Read from the pipe:
720  m_state = ssWorking;
721 
723  it != m_running_tasks.end(); ++it)
724  {
725  CObservationRawDAQ tmp_obs;
726  try
727  {
728  if (it->new_obs_available != 0)
729  {
730  it->read_pipe->ReadObject(&tmp_obs);
731  --(it->new_obs_available);
732 
733  // Yes, valid block of samples was adquired:
734  outObservations.push_back(
736  }
737  }
738  catch (...)
739  {
740  // Timeout...
741  }
742  }
743 }
744 
745 /* -----------------------------------------------------
746  doProcess
747 ----------------------------------------------------- */
749 {
750  bool hwError;
752 
753  if (hwError)
754  {
755  m_state = ssError;
756  THROW_EXCEPTION("Couldn't start DAQ task!");
757  }
758 
759  if (!m_nextObservations.empty())
760  {
761  m_state = ssWorking;
762 
763  std::vector<mrpt::utils::CSerializable::Ptr> new_obs;
764  new_obs.resize(m_nextObservations.size());
765 
766  for (size_t i = 0; i < m_nextObservations.size(); i++)
767  new_obs[i] = m_nextObservations[i];
768 
769  appendObservations(new_obs);
770  }
771 }
772 
773 /* -----------------------------------------------------
774  grabbing_thread
775 ----------------------------------------------------- */
777 {
778 #if MRPT_HAS_SOME_NIDAQMX
779  try
780  {
781  TaskHandle& taskHandle =
782  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
783  if (m_verbose)
784  cout << "[CNationalInstrumentsDAQ::grabbing_thread] Starting "
785  "thread for task "
786  << ipt.taskHandle << "\n";
787 
788  MRPT_TODO("Add write timeout")
789  // ipt.write_pipe->timeout_read_between_us
790 
791  const float timeout =
793 
794  int err = 0;
795  vector<double> dBuf;
796  vector<uint8_t> u8Buf;
797 
798  const mrpt::obs::CObservationRawDAQ clean_obs;
800 
801  while (!ipt.must_close)
802  {
803  obs = clean_obs; // Start with an empty observation
804 
805  // Common stuff:
808  obs.sensorLabel = m_sensorLabel + string(".") + ipt.task.taskLabel;
809 
810  bool there_are_data = false; // At least one channel?
811 
812  // Read from each channel in this task:
813  // -----------------------------------------------
814  if (ipt.task.has_ai)
815  {
817  obs.AIN_interleaved = true;
818 
819  const uint32_t totalSamplesToRead =
822  dBuf.resize(totalSamplesToRead);
823  int32 pointsReadPerChan = -1;
824  if ((err = MRPT_DAQmxReadAnalogF64(
825  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
826  obs.AIN_interleaved ? DAQmx_Val_GroupByScanNumber
827  : DAQmx_Val_GroupByChannel,
828  &dBuf[0], dBuf.size(), &pointsReadPerChan, nullptr)) <
829  0 &&
830  err != DAQmxErrorSamplesNotYetAvailable)
831  {
832  MRPT_DAQmx_ErrChk(err)
833  }
834  else if (pointsReadPerChan > 0)
835  {
837  totalSamplesToRead,
838  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
839  obs.AIN_double = dBuf;
840  there_are_data = true;
841  if (m_verbose)
842  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
843  << pointsReadPerChan << " analog samples read.\n";
844  }
845  } // end AI
846  if (ipt.task.has_di)
847  {
848  const uint32_t totalSamplesToRead =
850  u8Buf.resize(totalSamplesToRead);
851 
852  int32 pointsReadPerChan = -1;
853  if ((err = MRPT_DAQmxReadDigitalU8(
854  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
855  DAQmx_Val_GroupByChannel, &u8Buf[0], u8Buf.size(),
856  &pointsReadPerChan, nullptr)) < 0 &&
857  err != DAQmxErrorSamplesNotYetAvailable)
858  {
859  MRPT_DAQmx_ErrChk(err)
860  }
861  else if (pointsReadPerChan > 0)
862  {
864  totalSamplesToRead,
865  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
866  obs.DIN = u8Buf;
867  there_are_data = true;
868  if (m_verbose)
869  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
870  << pointsReadPerChan << " digital samples read.\n";
871  }
872  } // end DI
874  {
875  const int32 totalSamplesToRead =
877  dBuf.resize(totalSamplesToRead);
878  int32 pointsReadPerChan = -1;
879  if ((err = MRPT_DAQmxReadCounterF64(
880  taskHandle, totalSamplesToRead, timeout, &dBuf[0],
881  dBuf.size(), &pointsReadPerChan, nullptr)) < 0 &&
882  err != DAQmxErrorSamplesNotYetAvailable)
883  {
884  MRPT_DAQmx_ErrChk(err)
885  }
886  else if (pointsReadPerChan > 0)
887  {
888  ASSERT_EQUAL_(totalSamplesToRead, pointsReadPerChan)
889 
890  // Decimate?
891  if (++ipt.task.ci_ang_encoder.decimate_cnt >=
893  {
895 
896  obs.CNTRIN_double = dBuf;
897  there_are_data = true;
898  if (m_verbose && !obs.CNTRIN_double.empty())
899  {
900  static int decim = 0;
901  if (!decim)
902  cout << "[CNationalInstrumentsDAQ::grabbing_"
903  "thread] "
904  << pointsReadPerChan
905  << " counter samples read ([0]="
906  << obs.CNTRIN_double[0] << ").\n";
907  if (++decim > 100) decim = 0;
908  }
909  }
910  }
911  } // end COUNTERS
912 
913  // Send the observation to the main thread:
914  if (there_are_data)
915  {
916  ++(ipt.new_obs_available);
917  ipt.write_pipe->WriteObject(&obs);
918  // std::this_thread::sleep_for(1ms); // This seems to be needed
919  // to allow all objs to be sent to the recv thread
920  }
921  else
922  {
923  std::this_thread::sleep_for(1ms);
924  }
925 
926  } // end of main thread loop
927  }
928  catch (std::exception& e)
929  {
930  std::cerr << "[CNationalInstrumentsDAQ::grabbing_thread] Exception:\n"
931  << e.what() << std::endl;
932  }
933 #endif // MRPT_HAS_SOME_NIDAQMX
934 
935  ipt.is_closed = true;
936 }
937 
939  size_t task_index, size_t nSamplesPerChannel, const double* volt_values,
940  double timeout, bool groupedByChannel)
941 {
942 #if MRPT_HAS_SOME_NIDAQMX
943  ASSERT_(task_index < m_running_tasks.size())
944 
946  std::advance(it, task_index);
947  TInfoPerTask& ipt = *it;
948  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
949 
950  int32 samplesWritten = 0;
951  int err = 0;
952  if (err = MRPT_DAQmxWriteAnalogF64(
953  taskHandle, nSamplesPerChannel, FALSE, timeout,
954  groupedByChannel ? DAQmx_Val_GroupByChannel
955  : DAQmx_Val_GroupByScanNumber,
956  const_cast<float64*>(volt_values), &samplesWritten, nullptr))
957  {
958  MRPT_DAQmx_ErrChk(err)
959  }
960 #else
961  MRPT_UNUSED_PARAM(task_index);
962  MRPT_UNUSED_PARAM(nSamplesPerChannel);
963  MRPT_UNUSED_PARAM(volt_values);
964  MRPT_UNUSED_PARAM(timeout);
965  MRPT_UNUSED_PARAM(groupedByChannel);
966 #endif
967 }
968 
970  size_t task_index, bool line_value, double timeout)
971 {
972 #if MRPT_HAS_SOME_NIDAQMX
973  ASSERT_(task_index < m_running_tasks.size())
974 
976  std::advance(it, task_index);
977  TInfoPerTask& ipt = *it;
978  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
979 
980  uInt8 dat = line_value ? 1 : 0;
981 
982  int32 samplesWritten = 0;
983  int32 nSamplesPerChannel = 1;
984  int err = 0;
985  if (err = MRPT_DAQmxWriteDigitalLines(
986  taskHandle, nSamplesPerChannel, FALSE, timeout,
987  DAQmx_Val_GroupByScanNumber, &dat, &samplesWritten, nullptr))
988  {
989  MRPT_DAQmx_ErrChk(err)
990  }
991 
992 #else
993  MRPT_UNUSED_PARAM(task_index);
994  MRPT_UNUSED_PARAM(line_value);
995  MRPT_UNUSED_PARAM(timeout);
996 #endif
997 }
998 
999 // Ctor:
1001  : has_ai(false),
1002  has_ao(false),
1003  has_di(false),
1004  has_do(false),
1005  has_ci_period(false),
1006  has_ci_count_edges(false),
1007  has_ci_pulse_width(false),
1008  has_ci_lin_encoder(false),
1009  has_ci_ang_encoder(false),
1010  has_co_pulses(false),
1011  samplesPerSecond(1000.0),
1012  bufferSamplesPerChannel(200000),
1013  samplesPerChannelToRead(1000)
1014 {
1015 }
#define IMPLEMENTS_GENERIC_SENSOR(class_name, NameSpace)
This must be inserted in all CGenericSensor classes implementation files:
#define MRPT_DAQmxCreateCOPulseChanFreq
#define MRPT_DAQmxCreateCICountEdgesChan
#define MRPT_DAQmxWriteDigitalLines
#define MRPT_DAQmxCfgInputBuffer
#define MRPT_DAQmxCfgSampClkTiming
#define MRPT_DAQmxStopTask
#define MRPT_DAQmxStartTask
#define MRPT_DAQmxCreateCILinEncoderChan
#define MRPT_DAQmxCreateCIPulseWidthChan
#define MRPT_DAQmxCreateTask
#define MRPT_DAQmxReadCounterF64
#define MRPT_DAQmxCreateAIVoltageChan
#define MRPT_DAQmxReadDigitalU8
#define MRPT_DAQmxCreateCIAngEncoderChan
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmxCreateCIPeriodChan
#define MRPT_DAQmxCreateAOVoltageChan
#define MRPT_DAQmxCreateDOChan
#define MY_LOAD_HERE_CONFIG_VAR( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmx_ErrChk(functionCall)
#define MRPT_DAQmxCreateDIChan
#define MRPT_DAQmxClearTask
#define MRPT_DAQmxReadAnalogF64
#define MRPT_DAQmxWriteAnalogF64
void appendObservations(const std::vector< mrpt::utils::CSerializable::Ptr > &obj)
This method must be called by derived classes to enqueue a new observation in the list to be returned...
std::string m_sensorLabel
See CGenericSensor.
An interface to read from data acquisition boards compatible with National Instruments "DAQmx Base" o...
void stop()
Stop the grabbing threads for DAQ tasks.
virtual void initialize()
Setup and launch the DAQ tasks, in parallel threads.
void loadConfig_sensorSpecific(const mrpt::utils::CConfigFileBase &configSource, const std::string &iniSection)
See the class documentation at the top for expected parameters.
std::vector< mrpt::obs::CObservationRawDAQ::Ptr > m_nextObservations
A buffer for doProcess.
void writeAnalogOutputTask(size_t task_index, size_t nSamplesPerChannel, const double *volt_values, double timeout, bool groupedByChannel)
Set voltage outputs to all the outputs in an AOUT task For the meaning of parameters,...
bool checkDAQIsWorking() const
Returns true if initialize() was called and at least one task is running.
void readFromDAQ(std::vector< mrpt::obs::CObservationRawDAQ::Ptr > &outObservations, bool &hardwareError)
Receives data from the DAQ thread(s).
void writeDigitalOutputTask(size_t task_index, bool line_value, double timeout)
Changes the boolean state of one digital output line.
std::vector< TaskDescription > task_definitions
Publicly accessible vector with the list of tasks to be launched upon call to CNationalInstrumentsDAQ...
void grabbing_thread(TInfoPerTask &ipt)
Method to be executed in each parallel thread.
void doProcess()
This method will be invoked at a minimum rate of "process_rate" (Hz)
std::string sensorLabel
An arbitrary label that can be used to identify the sensor.
Definition: CObservation.h:60
mrpt::system::TTimeStamp timestamp
The associated UTC time-stamp.
Definition: CObservation.h:58
Store raw data from a Data Acquisition (DAQ) device, such that input or output analog and digital cha...
std::vector< double > CNTRIN_double
Readings from ticks counters, such as quadrature encoders.
uint16_t AIN_channel_count
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
double sample_rate
Readings from ticks counters, such as quadrature encoders.
bool AIN_interleaved
Whether the channels are interleaved (A0 A1 A2 A0 A1 A2...) or not (A0 A0 A0 A1 A1 A1 A2 A2 A2....
std::vector< double > AIN_double
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
std::vector< uint8_t > DIN
Present output values for 16-bit analog output (DACs) channels (vector length=channel count) in volts...
std::shared_ptr< CObservationRawDAQ > Ptr
static void createPipe(ReadPtr &outReadPipe, WritePtr &outWritePipe)
Creates a new pipe and returns the read & write end-points as newly allocated objects.
Definition: CPipe.h:161
This class allows loading and storing values and vectors of different types from a configuration text...
std::string read_string(const std::string &section, const std::string &name, const std::string &defaultValue, bool failIfNotFound=false) const
uint64_t read_uint64_t(const std::string &section, const std::string &name, uint64_t defaultValue, bool failIfNotFound=false) const
Scalar * iterator
Definition: eigen_plugins.h:26
GLdouble GLdouble t
Definition: glext.h:3689
GLdouble s
Definition: glext.h:3676
GLsizei const GLchar ** string
Definition: glext.h:4101
void tokenize(const std::string &inString, const std::string &inDelimiters, std::deque< std::string > &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
std::string trim(const std::string &str)
Removes leading and trailing spaces.
bool strCmpI(const std::string &s1, const std::string &s2)
Return true if the two strings are equal (case insensitive)
mrpt::system::TTimeStamp now()
A shortcut for system::getCurrentTime.
Definition: datetime.h:76
#define FALSE
Definition: jmorecfg.h:216
int val
Definition: mrpt_jpeglib.h:955
#define ASSERT_EQUAL_(__A, __B)
Definition: mrpt_macros.h:318
#define MRPT_TODO(x)
Definition: mrpt_macros.h:82
#define ASSERT_ABOVE_(__A, __B)
Definition: mrpt_macros.h:327
#define ASSERT_(f)
Definition: mrpt_macros.h:309
#define THROW_EXCEPTION(msg)
Definition: mrpt_macros.h:111
#define MRPT_UNUSED_PARAM(a)
Can be used to avoid "not used parameters" warnings from the compiler.
Definition: mrpt_macros.h:365
#define ASSERTMSG_(f, __ERROR_MSG)
Definition: mrpt_macros.h:306
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
Definition: mrpt_macros.h:121
Contains classes for various device interfaces.
This namespace contains representation of robot actions and observations.
This namespace provides a OS-independent interface to many useful functions: filenames manipulation,...
Definition: math_frwds.h:31
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:19
unsigned __int32 uint32_t
Definition: rptypes.h:47
unsigned __int64 uint64_t
Definition: rptypes.h:50
std::unique_ptr< mrpt::synch::CPipeReadEndPoint > read_pipe
TaskDescription task
A copy of the original task description that generated this thread.
std::unique_ptr< mrpt::synch::CPipeWriteEndPoint > write_pipe
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e....
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e....
std::string line
The digital line (for example "Dev1/port0/line1")
std::string line
The digital line (for example "Dev1/port0/line1")
Each of the tasks to create in CNationalInstrumentsDAQ::initialize().
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_count_edges_t ci_count_edges
Counter: period of a digital signal.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_co_pulses_t co_pulses
Output counter: digital pulses output.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_pulse_width_t ci_pulse_width
Counter: measure the width of a digital pulse.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ai_t ai
Analog inputs.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_lin_encoder_t ci_lin_encoder
Counter: uses a linear encoder to measure linear position.
double samplesPerSecond
Sample clock config: samples per second.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ao_t ao
Analog outputs.
uint32_t samplesPerChannelToRead
(Default=1000) The number of samples to grab at once from each channel.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_ang_encoder_t ci_ang_encoder
Counter: uses an angular encoder to measure angular position.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_di_t di
Digital inputs (di)
uint32_t bufferSamplesPerChannel
(Default=0) From NI's docs: The number of samples the buffer can hold for each channel in the task.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_period_t ci_period
Counter: period of a digital signal.
std::string sampleClkSource
Sample clock source: may be empty (default value) for some channels.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_do_t douts
Digital outs (do)



Page generated by Doxygen 1.9.1 for MRPT 1.9.9 Git: 63ea9d1f1 Thu Nov 23 00:06:53 2017 +0100 at mar 26 may 2026 12:19:29 CEST