MRPT  2.0.0
CNationalInstrumentsDAQ.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2020, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
14 #include <iostream>
15 #include <iterator> // advance()
16 
17 // If we have both, DAQmx & DAQmxBase, prefer DAQmx:
18 #define MRPT_HAS_SOME_NIDAQMX (MRPT_HAS_NIDAQMXBASE || MRPT_HAS_NIDAQMX)
19 
20 #define MRPT_USE_NIDAQMXBASE (MRPT_HAS_NIDAQMXBASE && !MRPT_HAS_NIDAQMX)
21 #define MRPT_USE_NIDAQMX (MRPT_HAS_NIDAQMX)
22 
23 #if MRPT_USE_NIDAQMXBASE
24 #include "NIDAQmxBase.h" // Include file for NI-DAQmx Base API
25 #endif
26 #if MRPT_USE_NIDAQMX
27 #include "NIDAQmx.h" // Include file for NI-DAQmx API
28 #endif
29 
30 // Macros to use either DAQmx or DAQmx Base automatically, depending on the
31 // installed libraries:
32 #if MRPT_USE_NIDAQMXBASE
33 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxBaseGetExtendedErrorInfo
34 #define MRPT_DAQmxCreateTask DAQmxBaseCreateTask
35 #define MRPT_DAQmxCreateAIVoltageChan DAQmxBaseCreateAIVoltageChan
36 #define MRPT_DAQmxCreateAOVoltageChan DAQmxBaseCreateAOVoltageChan
37 #define MRPT_DAQmxCreateDIChan DAQmxBaseCreateDIChan
38 #define MRPT_DAQmxCreateDOChan DAQmxBaseCreateDOChan
39 #define MRPT_DAQmxCreateCIPeriodChan DAQmxBaseCreateCIPeriodChan
40 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxBaseCreateCICountEdgesChan
41 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxBaseCreateCIPulseWidthChan
42 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxBaseCreateCILinEncoderChan
43 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxBaseCreateCIAngEncoderChan
44 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxBaseCreateCOPulseChanFreq
45 #define MRPT_DAQmxCfgSampClkTiming DAQmxBaseCfgSampClkTiming
46 #define MRPT_DAQmxCfgInputBuffer DAQmxBaseCfgInputBuffer
47 #define MRPT_DAQmxCfgOutputBuffer DAQmxBaseCfgOutputBuffer
48 #define MRPT_DAQmxStartTask DAQmxBaseStartTask
49 #define MRPT_DAQmxStopTask DAQmxBaseStopTask
50 #define MRPT_DAQmxClearTask DAQmxBaseClearTask
51 #define MRPT_DAQmxReadAnalogF64 DAQmxBaseReadAnalogF64
52 #define MRPT_DAQmxReadCounterF64 DAQmxBaseReadCounterF64
53 #define MRPT_DAQmxReadDigitalU8 DAQmxBaseReadDigitalU8
54 #define MRPT_DAQmxWriteAnalogF64 DAQmxBaseWriteAnalogF64
55 #define MRPT_DAQmxWriteDigitalU32 DAQmxBaseWriteDigitalU32
56 #define MRPT_DAQmxWriteDigitalLines DAQmxBaseWriteDigitalLines
57 #else
58 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxGetExtendedErrorInfo
59 #define MRPT_DAQmxCreateTask DAQmxCreateTask
60 #define MRPT_DAQmxCreateAIVoltageChan DAQmxCreateAIVoltageChan
61 #define MRPT_DAQmxCreateAOVoltageChan DAQmxCreateAOVoltageChan
62 #define MRPT_DAQmxCreateDIChan DAQmxCreateDIChan
63 #define MRPT_DAQmxCreateDOChan DAQmxCreateDOChan
64 #define MRPT_DAQmxCreateCIPeriodChan DAQmxCreateCIPeriodChan
65 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxCreateCICountEdgesChan
66 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxCreateCIPulseWidthChan
67 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxCreateCILinEncoderChan
68 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxCreateCIAngEncoderChan
69 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxCreateCOPulseChanFreq
70 #define MRPT_DAQmxCfgSampClkTiming DAQmxCfgSampClkTiming
71 #define MRPT_DAQmxCfgInputBuffer DAQmxCfgInputBuffer
72 #define MRPT_DAQmxCfgOutputBuffer DAQmxCfgOutputBuffer
73 #define MRPT_DAQmxStartTask DAQmxStartTask
74 #define MRPT_DAQmxStopTask DAQmxStopTask
75 #define MRPT_DAQmxClearTask DAQmxClearTask
76 #define MRPT_DAQmxReadAnalogF64 DAQmxReadAnalogF64
77 #define MRPT_DAQmxReadCounterF64 DAQmxReadCounterF64
78 #define MRPT_DAQmxReadDigitalU8 DAQmxReadDigitalU8
79 #define MRPT_DAQmxWriteAnalogF64 DAQmxWriteAnalogF64
80 #define MRPT_DAQmxWriteDigitalU32 DAQmxWriteDigitalU32
81 #define MRPT_DAQmxWriteDigitalLines DAQmxWriteDigitalLines
82 #endif
83 
84 // An auxiliary macro to check and report errors in the DAQmx library as
85 // exceptions with a well-explained message.
86 #define MRPT_DAQmx_ErrChk(functionCall) \
87  if ((functionCall) < 0) \
88  { \
89  char errBuff[2048]; \
90  MRPT_DAQmxGetExtendedErrorInfo(errBuff, 2048); \
91  std::string sErr = mrpt::format( \
92  "DAQ error: '%s'\nCalling: '%s'", errBuff, #functionCall); \
93  THROW_EXCEPTION(sErr); \
94  }
95 
96 using namespace mrpt::hwdrivers;
97 using namespace mrpt::obs;
98 using namespace mrpt::system;
99 using namespace std;
100 
102 
103 // ------------- CNationalInstrumentsDAQ::TInfoPerTask -----------
104 // Default ctor:
106  : new_obs_available(0), task()
107 {
108 }
109 
110 /* -----------------------------------------------------
111  Constructor
112  ----------------------------------------------------- */
114  : mrpt::system::COutputLogger("CNationalInstrumentsDAQ")
115 {
116  m_sensorLabel = "NIDAQ";
117 }
118 
119 // Just like "MRPT_LOAD_HERE_CONFIG_VAR" but...
120 #define MY_LOAD_HERE_CONFIG_VAR( \
121  variableName, variableType, targetVariable, configFileObject, \
122  sectionNameStr) \
123  targetVariable = configFileObject.read_##variableType( \
124  sectionNameStr, variableName, targetVariable, false);
125 
126 #define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( \
127  variableName, variableType, targetVariable, configFileObject, \
128  sectionNameStr) \
129  { \
130  try \
131  { \
132  targetVariable = configFileObject.read_##variableType( \
133  sectionNameStr, variableName, targetVariable, true); \
134  } \
135  catch (std::exception&) \
136  { \
137  THROW_EXCEPTION(format( \
138  "Value for '%s' not found in config file", \
139  std::string(variableName).c_str())); \
140  } \
141  }
142 
143 /* -----------------------------------------------------
144  loadConfig_sensorSpecific
145  ----------------------------------------------------- */
147  const mrpt::config::CConfigFileBase& cfg, const std::string& sect)
148 {
149  // std::vector<TaskDescription> task_definitions;
150  task_definitions.clear();
151 
152  const unsigned int nTasks = cfg.read_uint64_t(sect, "num_tasks", 0, true);
153  if (!nTasks)
154  {
155  std::cerr << "[CNationalInstrumentsDAQ] Warning: Number of tasks is "
156  "zero. No datalogging will be done.\n";
157  }
158 
159  task_definitions.resize(nTasks);
160  for (unsigned int i = 0; i < nTasks; i++)
161  {
163  const string sTask = mrpt::format("task%u", i);
164 
165  // Read general settings for this task:
166  // ---------------------------------------
167  const string sChanns =
168  cfg.read_string(sect, sTask + string(".channels"), "", true);
169  vector<string> lstStrChanns;
170  mrpt::system::tokenize(sChanns, " \t,", lstStrChanns);
171  if (lstStrChanns.empty())
172  THROW_EXCEPTION_FMT("List of channels for task %u is empty!", i);
173 
175  sTask + string(".samplesPerSecond"), double, t.samplesPerSecond,
176  cfg, sect)
178  sTask + string(".samplesPerChannelToRead"), double,
181  sTask + string(".sampleClkSource"), string, t.sampleClkSource, cfg,
182  sect)
184  sTask + string(".bufferSamplesPerChannel"), double,
186  t.taskLabel =
187  cfg.read_string(sect, sTask + string(".taskLabel"), sTask, false);
188 
189  for (auto& lstStrChann : lstStrChanns)
190  {
191  if (strCmpI(lstStrChann, "ai"))
192  {
193  t.has_ai = true;
195  sTask + string(".ai.physicalChannel"), string,
196  t.ai.physicalChannel, cfg, sect)
198  sTask + string(".ai.physicalChannelCount"), uint64_t,
199  t.ai.physicalChannelCount, cfg, sect)
201  sTask + string(".ai.terminalConfig"), string,
202  t.ai.terminalConfig, cfg, sect)
204  sTask + string(".ai.minVal"), double, t.ai.minVal, cfg,
205  sect)
207  sTask + string(".ai.maxVal"), double, t.ai.maxVal, cfg,
208  sect)
209  }
210  else if (strCmpI(lstStrChann, "ao"))
211  {
212  t.has_ao = true;
214  sTask + string(".ao.physicalChannel"), string,
215  t.ao.physicalChannel, cfg, sect)
217  sTask + string(".ao.physicalChannelCount"), uint64_t,
218  t.ao.physicalChannelCount, cfg, sect)
220  sTask + string(".ao.minVal"), double, t.ao.minVal, cfg,
221  sect)
223  sTask + string(".ao.maxVal"), double, t.ao.maxVal, cfg,
224  sect)
225  }
226  else if (strCmpI(lstStrChann, "di"))
227  {
228  t.has_di = true;
230  sTask + string(".di.line"), string, t.di.line, cfg, sect)
231  }
232  else if (strCmpI(lstStrChann, "do"))
233  {
234  t.has_do = true;
236  sTask + string(".do.line"), string, t.douts.line, cfg, sect)
237  }
238  else if (strCmpI(lstStrChann, "ci_period"))
239  {
240  t.has_ci_period = true;
242  sTask + string(".ci_period.counter"), string,
243  t.ci_period.counter, cfg, sect)
245  sTask + string(".ci_period.minVal"), double,
246  t.ci_period.minVal, cfg, sect)
248  sTask + string(".ci_period.maxVal"), double,
249  t.ci_period.maxVal, cfg, sect)
251  sTask + string(".ci_period.units"), string,
252  t.ci_period.units, cfg, sect)
254  sTask + string(".ci_period.edge"), string, t.ci_period.edge,
255  cfg, sect)
257  sTask + string(".ci_period.measTime"), double,
258  t.ci_period.measTime, cfg, sect)
260  sTask + string(".ci_period.divisor"), int,
261  t.ci_period.divisor, cfg, sect)
262  }
263  else if (strCmpI(lstStrChann, "ci_count_edges"))
264  {
265  t.has_ci_count_edges = true;
267  sTask + string(".ci_count_edges.counter"), string,
268  t.ci_count_edges.counter, cfg, sect)
270  sTask + string(".ci_count_edges.edge"), string,
271  t.ci_count_edges.edge, cfg, sect)
273  sTask + string(".ci_count_edges.initialCount"), int,
276  sTask + string(".ci_count_edges.countDirection"), string,
278  }
279  else if (strCmpI(lstStrChann, "ci_pulse_width"))
280  {
281  t.has_ci_pulse_width = true;
283  sTask + string(".ci_pulse_width.counter"), string,
284  t.ci_pulse_width.counter, cfg, sect)
286  sTask + string(".ci_pulse_width.minVal"), double,
287  t.ci_pulse_width.minVal, cfg, sect)
289  sTask + string(".ci_pulse_width.maxVal"), double,
290  t.ci_pulse_width.maxVal, cfg, sect)
292  sTask + string(".ci_pulse_width.units"), string,
293  t.ci_pulse_width.units, cfg, sect)
295  sTask + string(".ci_pulse_width.startingEdge"), string,
297  }
298  else if (strCmpI(lstStrChann, "ci_lin_encoder"))
299  {
300  t.has_ci_lin_encoder = true;
302  sTask + string(".ci_lin_encoder.counter"), string,
303  t.ci_lin_encoder.counter, cfg, sect)
305  sTask + string(".ci_lin_encoder.decodingType"), string,
308  sTask + string(".ci_lin_encoder.ZidxEnable"), bool,
311  sTask + string(".ci_lin_encoder.ZidxVal"), double,
312  t.ci_lin_encoder.ZidxVal, cfg, sect)
314  sTask + string(".ci_lin_encoder.ZidxPhase"), string,
315  t.ci_lin_encoder.ZidxPhase, cfg, sect)
317  sTask + string(".ci_lin_encoder.units"), string,
318  t.ci_lin_encoder.units, cfg, sect)
320  sTask + string(".ci_lin_encoder.distPerPulse"), double,
323  sTask + string(".ci_lin_encoder.initialPos"), double,
325  }
326  else if (strCmpI(lstStrChann, "ci_ang_encoder"))
327  {
328  t.has_ci_ang_encoder = true;
330  sTask + string(".ci_ang_encoder.counter"), string,
331  t.ci_ang_encoder.counter, cfg, sect)
333  sTask + string(".ci_ang_encoder.decodingType"), string,
336  sTask + string(".ci_ang_encoder.ZidxEnable"), bool,
339  sTask + string(".ci_ang_encoder.ZidxVal"), double,
340  t.ci_ang_encoder.ZidxVal, cfg, sect)
342  sTask + string(".ci_ang_encoder.ZidxPhase"), string,
343  t.ci_ang_encoder.ZidxPhase, cfg, sect)
345  sTask + string(".ci_ang_encoder.units"), string,
346  t.ci_ang_encoder.units, cfg, sect)
348  sTask + string(".ci_ang_encoder.pulsesPerRev"), int,
351  sTask + string(".ci_ang_encoder.initialAngle"), double,
354  sTask + string(".ci_ang_encoder.decimate"), int,
355  t.ci_ang_encoder.decimate, cfg, sect)
356  }
357  else if (strCmpI(lstStrChann, "co_pulses"))
358  {
359  t.has_co_pulses = true;
361  sTask + string(".co_pulses.counter"), string,
362  t.co_pulses.counter, cfg, sect)
364  sTask + string(".co_pulses.idleState"), string,
365  t.co_pulses.idleState, cfg, sect)
367  sTask + string(".co_pulses.initialDelay"), double,
368  t.co_pulses.initialDelay, cfg, sect)
370  sTask + string(".co_pulses.freq"), double, t.co_pulses.freq,
371  cfg, sect)
373  sTask + string(".co_pulses.dutyCycle"), double,
374  t.co_pulses.dutyCycle, cfg, sect)
375  }
376  else
377  {
379  "Unknown channel type '%s'! See the docs of "
380  "CNationalInstrumentsDAQ",
381  lstStrChann.c_str());
382  }
383  } // end for each "k" channel in channel "i"
384  } // end for "i", each task
385 }
386 
387 /* -----------------------------------------------------
388  Destructor
389  ----------------------------------------------------- */
391 #if MRPT_HAS_SOME_NIDAQMX
392 // Declare a table to convert strings to their DAQmx #define values:
393 struct daqmx_str_val
394 {
395  const char* str;
396  int val;
397 };
398 
399 const daqmx_str_val daqmx_vals[] = {
400  {"DAQmx_Val_Cfg_Default", DAQmx_Val_Cfg_Default},
401  {"DAQmx_Val_RSE", DAQmx_Val_RSE},
402  {"DAQmx_Val_NRSE", DAQmx_Val_NRSE},
403  {"DAQmx_Val_Diff", DAQmx_Val_Diff},
404  {"DAQmx_Val_Seconds", DAQmx_Val_Seconds},
405  {"DAQmx_Val_Rising", DAQmx_Val_Rising},
406  {"DAQmx_Val_Falling", DAQmx_Val_Falling},
407  {"DAQmx_Val_CountUp", DAQmx_Val_CountUp},
408  {"DAQmx_Val_CountDown", DAQmx_Val_CountDown},
409  {"DAQmx_Val_ExtControlled", DAQmx_Val_ExtControlled},
410  {"DAQmx_Val_AHighBHigh", DAQmx_Val_AHighBHigh},
411  {"DAQmx_Val_AHighBLow", DAQmx_Val_AHighBLow},
412  {"DAQmx_Val_ALowBHigh", DAQmx_Val_ALowBHigh},
413  {"DAQmx_Val_ALowBLow", DAQmx_Val_ALowBLow},
414  {"DAQmx_Val_X1", DAQmx_Val_X1},
415  {"DAQmx_Val_X2", DAQmx_Val_X2},
416  {"DAQmx_Val_X4", DAQmx_Val_X4},
417  {"DAQmx_Val_Meters", DAQmx_Val_Meters},
418  {"DAQmx_Val_Inches", DAQmx_Val_Inches},
419  {"DAQmx_Val_Ticks", DAQmx_Val_Ticks},
420  {"DAQmx_Val_Degrees", DAQmx_Val_Degrees},
421  {"DAQmx_Val_Radians", DAQmx_Val_Radians},
422  {"DAQmx_Val_High", DAQmx_Val_High},
423  {"DAQmx_Val_Low", DAQmx_Val_Low}};
424 
425 int daqmx_defstr2num(const std::string& str)
426 {
427  const std::string s = mrpt::system::trim(str);
428 
429  for (unsigned int i = 0; i < sizeof(daqmx_vals) / sizeof(daqmx_vals[0]);
430  i++)
431  {
432  if (strCmpI(daqmx_vals[i].str, s.c_str())) return daqmx_vals[i].val;
433  }
434  THROW_EXCEPTION_FMT("Error: Unknown DAQmx constant: %s", s.c_str());
435 }
436 #endif
437 
438 /* -----------------------------------------------------
439  initialize
440 ----------------------------------------------------- */
442 {
443 #if MRPT_HAS_SOME_NIDAQMX
444  this->stop();
445 
446  for (size_t i = 0; i < task_definitions.size(); i++)
447  {
448  const TaskDescription& tf = task_definitions[i];
449 
450  // Try to create a new task:
451  m_running_tasks.push_back(TInfoPerTask());
452  TInfoPerTask& ipt = m_running_tasks.back();
453  ipt.task = tf; // Save a copy of the task info for the thread to have
454  // all the needed info
455 
456  try
457  {
458  TaskHandle& taskHandle =
459  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
460 
461  MRPT_DAQmx_ErrChk(MRPT_DAQmxCreateTask("", &taskHandle));
462 
463  if (tf.has_ai)
464  {
465  ASSERTMSG_(
466  tf.ai.physicalChannelCount > 0,
467  "ai.physicalChannelCount is zero! Please, define it "
468  "correctly.")
469 
471  taskHandle, tf.ai.physicalChannel.c_str(), nullptr,
472  daqmx_defstr2num(tf.ai.terminalConfig), tf.ai.minVal,
473  tf.ai.maxVal, DAQmx_Val_Volts, nullptr));
474  }
475  if (tf.has_ao)
476  {
477  ASSERTMSG_(
478  tf.ao.physicalChannelCount > 0,
479  "ai.physicalChannelCount is zero! Please, define it "
480  "correctly.")
481 
483  taskHandle, tf.ao.physicalChannel.c_str(), nullptr,
484  tf.ao.minVal, tf.ao.maxVal, DAQmx_Val_Volts, nullptr));
485  }
486  if (tf.has_di)
487  {
489  taskHandle, tf.di.line.c_str(), nullptr,
490  DAQmx_Val_ChanPerLine));
491  }
492  if (tf.has_do)
493  {
495  taskHandle, tf.douts.line.c_str(), nullptr,
496  DAQmx_Val_ChanPerLine));
497  }
498  if (tf.has_ci_period)
499  {
501  taskHandle, tf.ci_period.counter.c_str(), nullptr,
503  daqmx_defstr2num(tf.ci_period.units),
504  daqmx_defstr2num(tf.ci_period.edge), DAQmx_Val_LowFreq1Ctr,
505  tf.ci_period.measTime, tf.ci_period.divisor, nullptr));
506  }
507  if (tf.has_ci_count_edges)
508  {
510  taskHandle, tf.ci_count_edges.counter.c_str(), nullptr,
511  daqmx_defstr2num(tf.ci_count_edges.edge),
513  daqmx_defstr2num(tf.ci_count_edges.countDirection)));
514  }
515  if (tf.has_ci_pulse_width)
516  {
518  taskHandle, tf.ci_pulse_width.counter.c_str(), nullptr,
520  daqmx_defstr2num(tf.ci_pulse_width.units),
521  daqmx_defstr2num(tf.ci_pulse_width.startingEdge), nullptr));
522  }
523  if (tf.has_ci_lin_encoder)
524  {
526  taskHandle, tf.ci_lin_encoder.counter.c_str(), nullptr,
527  daqmx_defstr2num(tf.ci_lin_encoder.decodingType),
529  daqmx_defstr2num(tf.ci_lin_encoder.ZidxPhase),
530  daqmx_defstr2num(tf.ci_lin_encoder.units),
532  tf.ci_lin_encoder.initialPos, nullptr));
533  }
534  if (tf.has_ci_ang_encoder)
535  {
537  taskHandle, tf.ci_ang_encoder.counter.c_str(), nullptr,
538  daqmx_defstr2num(tf.ci_ang_encoder.decodingType),
540  daqmx_defstr2num(tf.ci_ang_encoder.ZidxPhase),
541  daqmx_defstr2num(tf.ci_ang_encoder.units),
543  tf.ci_ang_encoder.initialAngle, nullptr));
544  }
545  if (tf.has_co_pulses)
546  {
548  taskHandle, tf.co_pulses.counter.c_str(), nullptr,
549  DAQmx_Val_Hz, daqmx_defstr2num(tf.co_pulses.idleState),
551  tf.co_pulses.dutyCycle));
552  }
553 
554  // Seems to be needed to avoid an errors avoid like:
555  // " Onboard device memory overflow. Because of system and/or
556  // bus-bandwidth limitations, the driver could not read data from
557  // the device fast enough to keep up with the device throughput."
558  if (tf.has_ai || tf.has_di || tf.has_ci_period ||
561  {
562  // sample rate:
565  taskHandle, tf.sampleClkSource.c_str(), tf.samplesPerSecond,
566  DAQmx_Val_Rising, DAQmx_Val_ContSamps,
568 
570  taskHandle, tf.bufferSamplesPerChannel));
571  }
572 
573  if (tf.has_ao)
574  {
575  // Nothing to do as long as we only need "on demand" outputs.
576  // MRPT_DAQmx_ErrChk (MRPT_DAQmxCfgOutputBuffer(taskHandle,2
577  ///*tf.bufferSamplesPerChannel*/ ));
578  // // Output buffer MUST have some data before starting the
579  // task: write 0s:
580  // vector<double> d;
581  // d.assign(tf.ao.physicalChannelCount*2, 0.0);
582  // this->writeAnalogOutputTask(i,1 /* samples per channel */,
583  //&d[0], 0.10 /*timeout*/, false);
584  }
585 
586  // Create pipe:
587  mrpt::synch::CPipe::createPipe(ipt.read_pipe, ipt.write_pipe);
588 
589  // Add a large timeout, just in case the writing thread dies
590  // unexpectedly so the reader doesn't hang on:
591  ipt.read_pipe->timeout_read_start_us = 100000; // 100ms
592  ipt.read_pipe->timeout_read_between_us = 100000; // 100ms
593 
595 
596  ipt.hThread = std::thread(
598  }
599  catch (std::exception const& e)
600  {
601  std::cerr << "[CNationalInstrumentsDAQ] Error:" << std::endl
602  << e.what() << std::endl;
603  if (ipt.taskHandle != nullptr)
604  {
605  TaskHandle& taskHandle =
606  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
607  MRPT_DAQmxStopTask(taskHandle);
608  MRPT_DAQmxClearTask(taskHandle);
609  }
610 
611  // Stop thread:
612  if (ipt.hThread.joinable())
613  {
614  ipt.must_close = true;
615  cerr << "[CNationalInstrumentsDAQ::initialize] Waiting for the "
616  "grabbing thread to end due to exception...\n";
617  ipt.hThread.join();
618  cerr << "[CNationalInstrumentsDAQ::initialize] Grabbing thread "
619  "ended.\n";
620  }
621 
622  // Remove from list:
623  m_running_tasks.erase(--m_running_tasks.end());
624 
625  std::cerr << "[CNationalInstrumentsDAQ] Error while creating "
626  "tasks. Closing other tasks before returning...\n";
627  this->stop();
628  std::cerr << "[CNationalInstrumentsDAQ] Closing tasks done.\n";
629 
630  throw; // Rethrow
631  }
632  } // end for each task_definitions[i]
633 
634 #else
635  THROW_EXCEPTION("MRPT was compiled without support for NI DAQmx!!");
636 #endif
637 }
638 
639 /** Stop the grabbing threads for DAQ tasks. It is automatically called at
640  * destruction. */
642 {
643  // Stop all threads:
644  for (auto& m_running_task : m_running_tasks)
645  {
646  m_running_task.must_close = true;
647  }
648  if (m_verbose)
649  cout << "[CNationalInstrumentsDAQ::stop] Waiting for grabbing threads "
650  "to end...\n";
651  for (auto& m_running_task : m_running_tasks)
652  {
653  // For some reason, join doesn't work...
654  if (m_running_task.hThread.joinable()) m_running_task.hThread.join();
655  // Polling:
656  // for (size_t tim=0;tim<250 && !it->is_closed;tim++) {
657  // std::this_thread::sleep_for(1ms); }
658  // it->hThread.clear();
659  }
660  if (m_verbose)
661  cout << "[CNationalInstrumentsDAQ::stop] All threads ended.\n";
662 
663 // Stop all NI tasks:
664 #if MRPT_HAS_SOME_NIDAQMX
665  for (list<TInfoPerTask>::iterator it = m_running_tasks.begin();
666  it != m_running_tasks.end(); ++it)
667  {
668  TaskHandle& taskHandle =
669  *reinterpret_cast<TaskHandle*>(&it->taskHandle);
670 
671  MRPT_DAQmxStopTask(taskHandle);
672  MRPT_DAQmxClearTask(taskHandle);
673  taskHandle = nullptr;
674  }
675 #endif
676 }
677 
678 /** Returns true if initialize() was called successfully. */
680 {
681  return (!m_running_tasks.empty() && !m_running_tasks.begin()->is_closed);
682 }
683 
684 /*-------------------------------------------------------------
685  readFromDAQ
686 -------------------------------------------------------------*/
688  std::vector<mrpt::obs::CObservationRawDAQ::Ptr>& outObservations,
689  bool& hardwareError)
690 {
691  hardwareError = false;
692  outObservations.clear();
693 
694  if (!checkDAQIsWorking())
695  {
696  hardwareError = true;
697  return;
698  }
699 
700  // Read from the pipe:
701  m_state = ssWorking;
702 
703  for (auto& m_running_task : m_running_tasks)
704  {
705  CObservationRawDAQ tmp_obs;
706  try
707  {
708  if (m_running_task.new_obs_available != 0)
709  {
710  auto arch =
711  mrpt::serialization::archiveFrom(*m_running_task.read_pipe);
712  arch.ReadObject(&tmp_obs);
713  --(m_running_task.new_obs_available);
714 
715  // Yes, valid block of samples was adquired:
716  outObservations.push_back(CObservationRawDAQ::Create(tmp_obs));
717  }
718  }
719  catch (...)
720  {
721  // Timeout...
722  }
723  }
724 }
725 
726 /* -----------------------------------------------------
727  doProcess
728 ----------------------------------------------------- */
730 {
731  bool hwError;
733 
734  if (hwError)
735  {
736  m_state = ssError;
737  THROW_EXCEPTION("Couldn't start DAQ task!");
738  }
739 
740  if (!m_nextObservations.empty())
741  {
742  m_state = ssWorking;
743 
744  std::vector<mrpt::serialization::CSerializable::Ptr> new_obs;
745  new_obs.resize(m_nextObservations.size());
746 
747  for (size_t i = 0; i < m_nextObservations.size(); i++)
748  new_obs[i] = m_nextObservations[i];
749 
750  appendObservations(new_obs);
751  }
752 }
753 
754 /* -----------------------------------------------------
755  grabbing_thread
756 ----------------------------------------------------- */
758 {
759 #if MRPT_HAS_SOME_NIDAQMX
760  try
761  {
762  TaskHandle& taskHandle =
763  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
764  if (m_verbose)
765  cout << "[CNationalInstrumentsDAQ::grabbing_thread] Starting "
766  "thread for task "
767  << ipt.taskHandle << "\n";
768 
769  MRPT_TODO("Add write timeout")
770  // ipt.write_pipe->timeout_read_between_us
771 
772  const float timeout =
774 
775  int err = 0;
776  vector<double> dBuf;
777  vector<uint8_t> u8Buf;
778 
779  const mrpt::obs::CObservationRawDAQ clean_obs;
781 
782  while (!ipt.must_close)
783  {
784  obs = clean_obs; // Start with an empty observation
785 
786  // Common stuff:
789  obs.sensorLabel = m_sensorLabel + string(".") + ipt.task.taskLabel;
790 
791  bool there_are_data = false; // At least one channel?
792 
793  // Read from each channel in this task:
794  // -----------------------------------------------
795  if (ipt.task.has_ai)
796  {
798  obs.AIN_interleaved = true;
799 
800  const uint32_t totalSamplesToRead =
803  dBuf.resize(totalSamplesToRead);
804  int32 pointsReadPerChan = -1;
805  if ((err = MRPT_DAQmxReadAnalogF64(
806  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
807  obs.AIN_interleaved ? DAQmx_Val_GroupByScanNumber
808  : DAQmx_Val_GroupByChannel,
809  &dBuf[0], dBuf.size(), &pointsReadPerChan, nullptr)) <
810  0 &&
811  err != DAQmxErrorSamplesNotYetAvailable)
812  {
813  MRPT_DAQmx_ErrChk(err)
814  }
815  else if (pointsReadPerChan > 0)
816  {
818  totalSamplesToRead,
819  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
820  obs.AIN_double = dBuf;
821  there_are_data = true;
822  if (m_verbose)
823  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
824  << pointsReadPerChan << " analog samples read.\n";
825  }
826  } // end AI
827  if (ipt.task.has_di)
828  {
829  const uint32_t totalSamplesToRead =
831  u8Buf.resize(totalSamplesToRead);
832 
833  int32 pointsReadPerChan = -1;
834  if ((err = MRPT_DAQmxReadDigitalU8(
835  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
836  DAQmx_Val_GroupByChannel, &u8Buf[0], u8Buf.size(),
837  &pointsReadPerChan, nullptr)) < 0 &&
838  err != DAQmxErrorSamplesNotYetAvailable)
839  {
840  MRPT_DAQmx_ErrChk(err)
841  }
842  else if (pointsReadPerChan > 0)
843  {
845  totalSamplesToRead,
846  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
847  obs.DIN = u8Buf;
848  there_are_data = true;
849  if (m_verbose)
850  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
851  << pointsReadPerChan << " digital samples read.\n";
852  }
853  } // end DI
855  {
856  const int32 totalSamplesToRead =
858  dBuf.resize(totalSamplesToRead);
859  int32 pointsReadPerChan = -1;
860  if ((err = MRPT_DAQmxReadCounterF64(
861  taskHandle, totalSamplesToRead, timeout, &dBuf[0],
862  dBuf.size(), &pointsReadPerChan, nullptr)) < 0 &&
863  err != DAQmxErrorSamplesNotYetAvailable)
864  {
865  MRPT_DAQmx_ErrChk(err)
866  }
867  else if (pointsReadPerChan > 0)
868  {
869  ASSERT_EQUAL_(totalSamplesToRead, pointsReadPerChan);
870  // Decimate?
871  if (++ipt.task.ci_ang_encoder.decimate_cnt >=
873  {
875 
876  obs.CNTRIN_double = dBuf;
877  there_are_data = true;
878  if (m_verbose && !obs.CNTRIN_double.empty())
879  {
880  static int decim = 0;
881  if (!decim)
882  cout << "[CNationalInstrumentsDAQ::grabbing_"
883  "thread] "
884  << pointsReadPerChan
885  << " counter samples read ([0]="
886  << obs.CNTRIN_double[0] << ").\n";
887  if (++decim > 100) decim = 0;
888  }
889  }
890  }
891  } // end COUNTERS
892 
893  // Send the observation to the main thread:
894  if (there_are_data)
895  {
896  ++(ipt.new_obs_available);
897  ipt.write_pipe->WriteObject(&obs);
898  // std::this_thread::sleep_for(1ms); // This seems to be needed
899  // to allow all objs to be sent to the recv thread
900  }
901  else
902  {
903  std::this_thread::sleep_for(1ms);
904  }
905 
906  } // end of main thread loop
907  }
908  catch (const std::exception& e)
909  {
910  std::cerr << "[CNationalInstrumentsDAQ::grabbing_thread] Exception:\n"
911  << e.what() << std::endl;
912  }
913 #endif // MRPT_HAS_SOME_NIDAQMX
914 
915  ipt.is_closed = true;
916 }
917 
919  [[maybe_unused]] size_t task_index,
920  [[maybe_unused]] size_t nSamplesPerChannel,
921  [[maybe_unused]] const double* volt_values, [[maybe_unused]] double timeout,
922  [[maybe_unused]] bool groupedByChannel)
923 {
924 #if MRPT_HAS_SOME_NIDAQMX
925  ASSERT_(task_index < m_running_tasks.size());
926  std::list<TInfoPerTask>::iterator it = m_running_tasks.begin();
927  std::advance(it, task_index);
928  TInfoPerTask& ipt = *it;
929  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
930 
931  int32 samplesWritten = 0;
932  int err = 0;
933  if (err = MRPT_DAQmxWriteAnalogF64(
934  taskHandle, nSamplesPerChannel, FALSE, timeout,
935  groupedByChannel ? DAQmx_Val_GroupByChannel
936  : DAQmx_Val_GroupByScanNumber,
937  const_cast<float64*>(volt_values), &samplesWritten, nullptr))
938  {
939  MRPT_DAQmx_ErrChk(err)
940  }
941 #endif
942 }
943 
945  [[maybe_unused]] size_t task_index, [[maybe_unused]] bool line_value,
946  [[maybe_unused]] double timeout)
947 {
948 #if MRPT_HAS_SOME_NIDAQMX
949  ASSERT_(task_index < m_running_tasks.size());
950  std::list<TInfoPerTask>::iterator it = m_running_tasks.begin();
951  std::advance(it, task_index);
952  TInfoPerTask& ipt = *it;
953  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
954 
955  uInt8 dat = line_value ? 1 : 0;
956 
957  int32 samplesWritten = 0;
958  int32 nSamplesPerChannel = 1;
959  int err = 0;
960  if (err = MRPT_DAQmxWriteDigitalLines(
961  taskHandle, nSamplesPerChannel, FALSE, timeout,
962  DAQmx_Val_GroupByScanNumber, &dat, &samplesWritten, nullptr))
963  {
964  MRPT_DAQmx_ErrChk(err)
965  }
966 #endif
967 }
968 
969 // Ctor:
uint16_t AIN_channel_count
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
#define MRPT_DAQmxCreateCILinEncoderChan
double sample_rate
Readings from ticks counters, such as quadrature encoders.
Each of the tasks to create in CNationalInstrumentsDAQ::initialize().
std::string read_string(const std::string &section, const std::string &name, const std::string &defaultValue, bool failIfNotFound=false) const
void loadConfig_sensorSpecific(const mrpt::config::CConfigFileBase &configSource, const std::string &iniSection) override
See the class documentation at the top for expected parameters.
#define MRPT_DAQmxCreateDOChan
std::string line
The digital line (for example "Dev1/port0/line1")
#define THROW_EXCEPTION(msg)
Definition: exceptions.h:67
std::string std::string format(std::string_view fmt, ARGS &&... args)
Definition: format.h:26
#define MRPT_DAQmxCreateCIPulseWidthChan
std::string m_sensorLabel
See CGenericSensor.
#define MRPT_DAQmxStartTask
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_co_pulses_t co_pulses
#define MRPT_DAQmxCreateAOVoltageChan
#define MRPT_DAQmxCreateDIChan
mrpt::system::TTimeStamp now()
A shortcut for system::getCurrentTime.
Definition: datetime.h:86
Contains classes for various device interfaces.
void doProcess() override
This method will be invoked at a minimum rate of "process_rate" (Hz)
#define MRPT_DAQmxCreateCIPeriodChan
STL namespace.
bool checkDAQIsWorking() const
Returns true if initialize() was called and at least one task is running.
void initialize() override
Setup and launch the DAQ tasks, in parallel threads.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_ang_encoder_t ci_ang_encoder
TaskDescription task
A copy of the original task description that generated this thread.
#define MRPT_DAQmxWriteDigitalLines
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
#define MRPT_DAQmxReadAnalogF64
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_pulse_width_t ci_pulse_width
void appendObservations(const std::vector< mrpt::serialization::CSerializable::Ptr > &obj)
This method must be called by derived classes to enqueue a new observation in the list to be returned...
CArchiveStreamBase< STREAM > archiveFrom(STREAM &s)
Helper function to create a templatized wrapper CArchive object for a: MRPT&#39;s CStream, std::istream, std::ostream, std::stringstream.
Definition: CArchive.h:592
#define ASSERT_(f)
Defines an assertion mechanism.
Definition: exceptions.h:120
This class allows loading and storing values and vectors of different types from a configuration text...
#define MRPT_DAQmxWriteAnalogF64
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_do_t douts
#define ASSERT_EQUAL_(__A, __B)
Assert comparing two values, reporting their actual values upon failure.
Definition: exceptions.h:137
std::string sampleClkSource
Sample clock source: may be empty (default value) for some channels.
#define MRPT_DAQmxCreateCIAngEncoderChan
void writeDigitalOutputTask(size_t task_index, bool line_value, double timeout)
Changes the boolean state of one digital output line.
#define MRPT_DAQmxCfgInputBuffer
Versatile class for consistent logging and management of output messages.
#define MRPT_DAQmxReadCounterF64
This namespace contains representation of robot actions and observations.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_di_t di
int val
Definition: mrpt_jpeglib.h:957
uint64_t read_uint64_t(const std::string &section, const std::string &name, uint64_t defaultValue, bool failIfNotFound=false) const
void stop()
Stop the grabbing threads for DAQ tasks.
void grabbing_thread(TInfoPerTask &ipt)
Method to be executed in each parallel thread.
constexpr auto sect
An interface to read from data acquisition boards compatible with National Instruments "DAQmx Base" o...
#define MY_LOAD_HERE_CONFIG_VAR( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define ASSERTMSG_(f, __ERROR_MSG)
Defines an assertion mechanism.
Definition: exceptions.h:108
bool AIN_interleaved
Whether the channels are interleaved (A0 A1 A2 A0 A1 A2...) or not (A0 A0 A0 A1 A1 A1 A2 A2 A2...
#define MRPT_DAQmxStopTask
std::vector< mrpt::obs::CObservationRawDAQ::Ptr > m_nextObservations
A buffer for doProcess.
Store raw data from a Data Acquisition (DAQ) device, such that input or output analog and digital cha...
#define IMPLEMENTS_GENERIC_SENSOR(class_name, NameSpace)
This must be inserted in all CGenericSensor classes implementation files:
void writeAnalogOutputTask(size_t task_index, size_t nSamplesPerChannel, const double *volt_values, double timeout, bool groupedByChannel)
Set voltage outputs to all the outputs in an AOUT task For the meaning of parameters, refere to NI DAQmx docs for DAQmxBaseWriteAnalogF64()
std::vector< double > CNTRIN_double
Readings from ticks counters, such as quadrature encoders.
uint32_t bufferSamplesPerChannel
(Default=0) From NI&#39;s docs: The number of samples the buffer can hold for each channel in the task...
void readFromDAQ(std::vector< mrpt::obs::CObservationRawDAQ::Ptr > &outObservations, bool &hardwareError)
Receives data from the DAQ thread(s).
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_count_edges_t ci_count_edges
std::string line
The digital line (for example "Dev1/port0/line1")
std::string sensorLabel
An arbitrary label that can be used to identify the sensor.
Definition: CObservation.h:62
#define MRPT_DAQmxCreateAIVoltageChan
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::vector< double > AIN_double
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_lin_encoder_t ci_lin_encoder
mrpt::system::TTimeStamp timestamp
The associated UTC time-stamp.
Definition: CObservation.h:60
uint32_t samplesPerChannelToRead
(Default=1000) The number of samples to grab at once from each channel.
std::vector< TaskDescription > task_definitions
Publicly accessible vector with the list of tasks to be launched upon call to CNationalInstrumentsDAQ...
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ai_t ai
#define ASSERT_ABOVE_(__A, __B)
Definition: exceptions.h:155
std::unique_ptr< mrpt::io::CPipeWriteEndPoint > write_pipe
#define MRPT_DAQmxCreateCOPulseChanFreq
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_period_t ci_period
Counter: period of a digital signal.
#define MRPT_DAQmx_ErrChk(functionCall)
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
std::string trim(const std::string &str)
Removes leading and trailing spaces.
MRPT_TODO("toPointCloud / calibration")
#define MRPT_DAQmxClearTask
double samplesPerSecond
Sample clock config: samples per second.
std::vector< uint8_t > DIN
Present output values for 16-bit analog output (DACs) channels (vector length=channel count) in volts...
std::unique_ptr< mrpt::io::CPipeReadEndPoint > read_pipe
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
Definition: exceptions.h:69
bool strCmpI(const std::string &s1, const std::string &s2)
Return true if the two strings are equal (case insensitive)
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmxReadDigitalU8
#define MRPT_DAQmxCfgSampClkTiming
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ao_t ao
#define MRPT_DAQmxCreateCICountEdgesChan
#define MRPT_DAQmxCreateTask



Page generated by Doxygen 1.8.14 for MRPT 2.0.0 Git: b38439d21 Tue Mar 31 19:58:06 2020 +0200 at miƩ abr 1 00:50:30 CEST 2020