Main MRPT website > C++ reference for MRPT 1.9.9
CNationalInstrumentsDAQ.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2017, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <iterator> // advance()
14 
15 // If we have both, DAQmx & DAQmxBase, prefer DAQmx:
16 #define MRPT_HAS_SOME_NIDAQMX (MRPT_HAS_NIDAQMXBASE || MRPT_HAS_NIDAQMX)
17 
18 #define MRPT_USE_NIDAQMXBASE (MRPT_HAS_NIDAQMXBASE && !MRPT_HAS_NIDAQMX)
19 #define MRPT_USE_NIDAQMX (MRPT_HAS_NIDAQMX)
20 
21 #if MRPT_USE_NIDAQMXBASE
22 #include "NIDAQmxBase.h" // Include file for NI-DAQmx Base API
23 #endif
24 #if MRPT_USE_NIDAQMX
25 #include "NIDAQmx.h" // Include file for NI-DAQmx API
26 #endif
27 
28 // Macros to use either DAQmx or DAQmx Base automatically, depending on the
29 // installed libraries:
30 #if MRPT_USE_NIDAQMXBASE
31 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxBaseGetExtendedErrorInfo
32 #define MRPT_DAQmxCreateTask DAQmxBaseCreateTask
33 #define MRPT_DAQmxCreateAIVoltageChan DAQmxBaseCreateAIVoltageChan
34 #define MRPT_DAQmxCreateAOVoltageChan DAQmxBaseCreateAOVoltageChan
35 #define MRPT_DAQmxCreateDIChan DAQmxBaseCreateDIChan
36 #define MRPT_DAQmxCreateDOChan DAQmxBaseCreateDOChan
37 #define MRPT_DAQmxCreateCIPeriodChan DAQmxBaseCreateCIPeriodChan
38 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxBaseCreateCICountEdgesChan
39 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxBaseCreateCIPulseWidthChan
40 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxBaseCreateCILinEncoderChan
41 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxBaseCreateCIAngEncoderChan
42 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxBaseCreateCOPulseChanFreq
43 #define MRPT_DAQmxCfgSampClkTiming DAQmxBaseCfgSampClkTiming
44 #define MRPT_DAQmxCfgInputBuffer DAQmxBaseCfgInputBuffer
45 #define MRPT_DAQmxCfgOutputBuffer DAQmxBaseCfgOutputBuffer
46 #define MRPT_DAQmxStartTask DAQmxBaseStartTask
47 #define MRPT_DAQmxStopTask DAQmxBaseStopTask
48 #define MRPT_DAQmxClearTask DAQmxBaseClearTask
49 #define MRPT_DAQmxReadAnalogF64 DAQmxBaseReadAnalogF64
50 #define MRPT_DAQmxReadCounterF64 DAQmxBaseReadCounterF64
51 #define MRPT_DAQmxReadDigitalU8 DAQmxBaseReadDigitalU8
52 #define MRPT_DAQmxWriteAnalogF64 DAQmxBaseWriteAnalogF64
53 #define MRPT_DAQmxWriteDigitalU32 DAQmxBaseWriteDigitalU32
54 #define MRPT_DAQmxWriteDigitalLines DAQmxBaseWriteDigitalLines
55 #else
56 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxGetExtendedErrorInfo
57 #define MRPT_DAQmxCreateTask DAQmxCreateTask
58 #define MRPT_DAQmxCreateAIVoltageChan DAQmxCreateAIVoltageChan
59 #define MRPT_DAQmxCreateAOVoltageChan DAQmxCreateAOVoltageChan
60 #define MRPT_DAQmxCreateDIChan DAQmxCreateDIChan
61 #define MRPT_DAQmxCreateDOChan DAQmxCreateDOChan
62 #define MRPT_DAQmxCreateCIPeriodChan DAQmxCreateCIPeriodChan
63 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxCreateCICountEdgesChan
64 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxCreateCIPulseWidthChan
65 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxCreateCILinEncoderChan
66 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxCreateCIAngEncoderChan
67 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxCreateCOPulseChanFreq
68 #define MRPT_DAQmxCfgSampClkTiming DAQmxCfgSampClkTiming
69 #define MRPT_DAQmxCfgInputBuffer DAQmxCfgInputBuffer
70 #define MRPT_DAQmxCfgOutputBuffer DAQmxCfgOutputBuffer
71 #define MRPT_DAQmxStartTask DAQmxStartTask
72 #define MRPT_DAQmxStopTask DAQmxStopTask
73 #define MRPT_DAQmxClearTask DAQmxClearTask
74 #define MRPT_DAQmxReadAnalogF64 DAQmxReadAnalogF64
75 #define MRPT_DAQmxReadCounterF64 DAQmxReadCounterF64
76 #define MRPT_DAQmxReadDigitalU8 DAQmxReadDigitalU8
77 #define MRPT_DAQmxWriteAnalogF64 DAQmxWriteAnalogF64
78 #define MRPT_DAQmxWriteDigitalU32 DAQmxWriteDigitalU32
79 #define MRPT_DAQmxWriteDigitalLines DAQmxWriteDigitalLines
80 #endif
81 
82 // An auxiliary macro to check and report errors in the DAQmx library as
83 // exceptions with a well-explained message.
84 #define MRPT_DAQmx_ErrChk(functionCall) \
85  if ((functionCall) < 0) \
86  { \
87  char errBuff[2048]; \
88  MRPT_DAQmxGetExtendedErrorInfo(errBuff, 2048); \
89  std::string sErr = mrpt::format( \
90  "DAQ error: '%s'\nCalling: '%s'", errBuff, #functionCall); \
91  THROW_EXCEPTION(sErr) \
92  }
93 
94 using namespace mrpt::hwdrivers;
95 using namespace mrpt::obs;
96 using namespace mrpt::system;
97 using namespace std;
98 
100 
101 // ------------- CNationalInstrumentsDAQ::TInfoPerTask -----------
102 // Default ctor:
104  : taskHandle(0),
105  must_close(false),
106  is_closed(false),
107  new_obs_available(0),
108  task()
109 {
110 }
111 
112 /* -----------------------------------------------------
113  Constructor
114  ----------------------------------------------------- */
116  : mrpt::utils::COutputLogger("CNationalInstrumentsDAQ")
117 {
118  m_sensorLabel = "NIDAQ";
119 }
120 
121 // Just like "MRPT_LOAD_HERE_CONFIG_VAR" but...
122 #define MY_LOAD_HERE_CONFIG_VAR( \
123  variableName, variableType, targetVariable, configFileObject, \
124  sectionNameStr) \
125  targetVariable = configFileObject.read_##variableType( \
126  sectionNameStr, variableName, targetVariable, false);
127 
128 #define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( \
129  variableName, variableType, targetVariable, configFileObject, \
130  sectionNameStr) \
131  { \
132  try \
133  { \
134  targetVariable = configFileObject.read_##variableType( \
135  sectionNameStr, variableName, targetVariable, true); \
136  } \
137  catch (std::exception&) \
138  { \
139  THROW_EXCEPTION( \
140  format( \
141  "Value for '%s' not found in config file", \
142  std::string(variableName).c_str())); \
143  } \
144  }
145 
146 /* -----------------------------------------------------
147  loadConfig_sensorSpecific
148  ----------------------------------------------------- */
150  const mrpt::utils::CConfigFileBase& cfg, const std::string& sect)
151 {
152  // std::vector<TaskDescription> task_definitions;
153  task_definitions.clear();
154 
155  const unsigned int nTasks = cfg.read_uint64_t(sect, "num_tasks", 0, true);
156  if (!nTasks)
157  {
158  std::cerr << "[CNationalInstrumentsDAQ] Warning: Number of tasks is "
159  "zero. No datalogging will be done.\n";
160  }
161 
162  task_definitions.resize(nTasks);
163  for (unsigned int i = 0; i < nTasks; i++)
164  {
166  const string sTask = mrpt::format("task%u", i);
167 
168  // Read general settings for this task:
169  // ---------------------------------------
170  const string sChanns =
171  cfg.read_string(sect, sTask + string(".channels"), "", true);
172  vector<string> lstStrChanns;
173  mrpt::system::tokenize(sChanns, " \t,", lstStrChanns);
174  if (lstStrChanns.empty())
175  THROW_EXCEPTION_FMT("List of channels for task %u is empty!", i)
176 
178  sTask + string(".samplesPerSecond"), double, t.samplesPerSecond,
179  cfg, sect)
181  sTask + string(".samplesPerChannelToRead"), double,
182  t.samplesPerChannelToRead, cfg, sect)
184  sTask + string(".sampleClkSource"), string, t.sampleClkSource, cfg,
185  sect)
187  sTask + string(".bufferSamplesPerChannel"), double,
188  t.bufferSamplesPerChannel, cfg, sect)
189  t.taskLabel =
190  cfg.read_string(sect, sTask + string(".taskLabel"), sTask, false);
191 
192  for (size_t j = 0; j < lstStrChanns.size(); j++)
193  {
194  if (strCmpI(lstStrChanns[j], "ai"))
195  {
196  t.has_ai = true;
198  sTask + string(".ai.physicalChannel"), string,
199  t.ai.physicalChannel, cfg, sect)
201  sTask + string(".ai.physicalChannelCount"), uint64_t,
202  t.ai.physicalChannelCount, cfg, sect)
204  sTask + string(".ai.terminalConfig"), string,
205  t.ai.terminalConfig, cfg, sect)
207  sTask + string(".ai.minVal"), double, t.ai.minVal, cfg,
208  sect)
210  sTask + string(".ai.maxVal"), double, t.ai.maxVal, cfg,
211  sect)
212  }
213  else if (strCmpI(lstStrChanns[j], "ao"))
214  {
215  t.has_ao = true;
217  sTask + string(".ao.physicalChannel"), string,
218  t.ao.physicalChannel, cfg, sect)
220  sTask + string(".ao.physicalChannelCount"), uint64_t,
221  t.ao.physicalChannelCount, cfg, sect)
223  sTask + string(".ao.minVal"), double, t.ao.minVal, cfg,
224  sect)
226  sTask + string(".ao.maxVal"), double, t.ao.maxVal, cfg,
227  sect)
228  }
229  else if (strCmpI(lstStrChanns[j], "di"))
230  {
231  t.has_di = true;
233  sTask + string(".di.line"), string, t.di.line, cfg, sect)
234  }
235  else if (strCmpI(lstStrChanns[j], "do"))
236  {
237  t.has_do = true;
239  sTask + string(".do.line"), string, t.douts.line, cfg, sect)
240  }
241  else if (strCmpI(lstStrChanns[j], "ci_period"))
242  {
243  t.has_ci_period = true;
245  sTask + string(".ci_period.counter"), string,
246  t.ci_period.counter, cfg, sect)
248  sTask + string(".ci_period.minVal"), double,
249  t.ci_period.minVal, cfg, sect)
251  sTask + string(".ci_period.maxVal"), double,
252  t.ci_period.maxVal, cfg, sect)
254  sTask + string(".ci_period.units"), string,
255  t.ci_period.units, cfg, sect)
257  sTask + string(".ci_period.edge"), string, t.ci_period.edge,
258  cfg, sect)
260  sTask + string(".ci_period.measTime"), double,
261  t.ci_period.measTime, cfg, sect)
263  sTask + string(".ci_period.divisor"), int,
264  t.ci_period.divisor, cfg, sect)
265  }
266  else if (strCmpI(lstStrChanns[j], "ci_count_edges"))
267  {
268  t.has_ci_count_edges = true;
270  sTask + string(".ci_count_edges.counter"), string,
271  t.ci_count_edges.counter, cfg, sect)
273  sTask + string(".ci_count_edges.edge"), string,
274  t.ci_count_edges.edge, cfg, sect)
276  sTask + string(".ci_count_edges.initialCount"), int,
277  t.ci_count_edges.initialCount, cfg, sect)
279  sTask + string(".ci_count_edges.countDirection"), string,
280  t.ci_count_edges.countDirection, cfg, sect)
281  }
282  else if (strCmpI(lstStrChanns[j], "ci_pulse_width"))
283  {
284  t.has_ci_pulse_width = true;
286  sTask + string(".ci_pulse_width.counter"), string,
287  t.ci_pulse_width.counter, cfg, sect)
289  sTask + string(".ci_pulse_width.minVal"), double,
290  t.ci_pulse_width.minVal, cfg, sect)
292  sTask + string(".ci_pulse_width.maxVal"), double,
293  t.ci_pulse_width.maxVal, cfg, sect)
295  sTask + string(".ci_pulse_width.units"), string,
296  t.ci_pulse_width.units, cfg, sect)
298  sTask + string(".ci_pulse_width.startingEdge"), string,
299  t.ci_pulse_width.startingEdge, cfg, sect)
300  }
301  else if (strCmpI(lstStrChanns[j], "ci_lin_encoder"))
302  {
303  t.has_ci_lin_encoder = true;
305  sTask + string(".ci_lin_encoder.counter"), string,
306  t.ci_lin_encoder.counter, cfg, sect)
308  sTask + string(".ci_lin_encoder.decodingType"), string,
309  t.ci_lin_encoder.decodingType, cfg, sect)
311  sTask + string(".ci_lin_encoder.ZidxEnable"), bool,
312  t.ci_lin_encoder.ZidxEnable, cfg, sect)
314  sTask + string(".ci_lin_encoder.ZidxVal"), double,
315  t.ci_lin_encoder.ZidxVal, cfg, sect)
317  sTask + string(".ci_lin_encoder.ZidxPhase"), string,
318  t.ci_lin_encoder.ZidxPhase, cfg, sect)
320  sTask + string(".ci_lin_encoder.units"), string,
321  t.ci_lin_encoder.units, cfg, sect)
323  sTask + string(".ci_lin_encoder.distPerPulse"), double,
324  t.ci_lin_encoder.distPerPulse, cfg, sect)
326  sTask + string(".ci_lin_encoder.initialPos"), double,
327  t.ci_lin_encoder.initialPos, cfg, sect)
328  }
329  else if (strCmpI(lstStrChanns[j], "ci_ang_encoder"))
330  {
331  t.has_ci_ang_encoder = true;
333  sTask + string(".ci_ang_encoder.counter"), string,
334  t.ci_ang_encoder.counter, cfg, sect)
336  sTask + string(".ci_ang_encoder.decodingType"), string,
337  t.ci_ang_encoder.decodingType, cfg, sect)
339  sTask + string(".ci_ang_encoder.ZidxEnable"), bool,
340  t.ci_ang_encoder.ZidxEnable, cfg, sect)
342  sTask + string(".ci_ang_encoder.ZidxVal"), double,
343  t.ci_ang_encoder.ZidxVal, cfg, sect)
345  sTask + string(".ci_ang_encoder.ZidxPhase"), string,
346  t.ci_ang_encoder.ZidxPhase, cfg, sect)
348  sTask + string(".ci_ang_encoder.units"), string,
349  t.ci_ang_encoder.units, cfg, sect)
351  sTask + string(".ci_ang_encoder.pulsesPerRev"), int,
352  t.ci_ang_encoder.pulsesPerRev, cfg, sect)
354  sTask + string(".ci_ang_encoder.initialAngle"), double,
355  t.ci_ang_encoder.initialAngle, cfg, sect)
357  sTask + string(".ci_ang_encoder.decimate"), int,
358  t.ci_ang_encoder.decimate, cfg, sect)
359  }
360  else if (strCmpI(lstStrChanns[j], "co_pulses"))
361  {
362  t.has_co_pulses = true;
364  sTask + string(".co_pulses.counter"), string,
365  t.co_pulses.counter, cfg, sect)
367  sTask + string(".co_pulses.idleState"), string,
368  t.co_pulses.idleState, cfg, sect)
370  sTask + string(".co_pulses.initialDelay"), double,
371  t.co_pulses.initialDelay, cfg, sect)
373  sTask + string(".co_pulses.freq"), double, t.co_pulses.freq,
374  cfg, sect)
376  sTask + string(".co_pulses.dutyCycle"), double,
377  t.co_pulses.dutyCycle, cfg, sect)
378  }
379  else
380  {
382  "Unknown channel type '%s'! See the docs of "
383  "CNationalInstrumentsDAQ",
384  lstStrChanns[j].c_str())
385  }
386  } // end for each "k" channel in channel "i"
387  } // end for "i", each task
388 }
389 
390 /* -----------------------------------------------------
391  Destructor
392  ----------------------------------------------------- */
394 #if MRPT_HAS_SOME_NIDAQMX
395 // Declare a table to convert strings to their DAQmx #define values:
396 struct daqmx_str_val
397 {
398  const char* str;
399  int val;
400 };
401 
402 const daqmx_str_val daqmx_vals[] = {
403  {"DAQmx_Val_Cfg_Default", DAQmx_Val_Cfg_Default},
404  {"DAQmx_Val_RSE", DAQmx_Val_RSE},
405  {"DAQmx_Val_NRSE", DAQmx_Val_NRSE},
406  {"DAQmx_Val_Diff", DAQmx_Val_Diff},
407  {"DAQmx_Val_Seconds", DAQmx_Val_Seconds},
408  {"DAQmx_Val_Rising", DAQmx_Val_Rising},
409  {"DAQmx_Val_Falling", DAQmx_Val_Falling},
410  {"DAQmx_Val_CountUp", DAQmx_Val_CountUp},
411  {"DAQmx_Val_CountDown", DAQmx_Val_CountDown},
412  {"DAQmx_Val_ExtControlled", DAQmx_Val_ExtControlled},
413  {"DAQmx_Val_AHighBHigh", DAQmx_Val_AHighBHigh},
414  {"DAQmx_Val_AHighBLow", DAQmx_Val_AHighBLow},
415  {"DAQmx_Val_ALowBHigh", DAQmx_Val_ALowBHigh},
416  {"DAQmx_Val_ALowBLow", DAQmx_Val_ALowBLow},
417  {"DAQmx_Val_X1", DAQmx_Val_X1},
418  {"DAQmx_Val_X2", DAQmx_Val_X2},
419  {"DAQmx_Val_X4", DAQmx_Val_X4},
420  {"DAQmx_Val_Meters", DAQmx_Val_Meters},
421  {"DAQmx_Val_Inches", DAQmx_Val_Inches},
422  {"DAQmx_Val_Ticks", DAQmx_Val_Ticks},
423  {"DAQmx_Val_Degrees", DAQmx_Val_Degrees},
424  {"DAQmx_Val_Radians", DAQmx_Val_Radians},
425  {"DAQmx_Val_High", DAQmx_Val_High},
426  {"DAQmx_Val_Low", DAQmx_Val_Low}};
427 
428 int daqmx_defstr2num(const std::string& str)
429 {
430  const std::string s = mrpt::system::trim(str);
431 
432  for (unsigned int i = 0; i < sizeof(daqmx_vals) / sizeof(daqmx_vals[0]);
433  i++)
434  {
435  if (strCmpI(daqmx_vals[i].str, s.c_str())) return daqmx_vals[i].val;
436  }
437  THROW_EXCEPTION_FMT("Error: Unknown DAQmx constant: %s", s.c_str())
438 }
439 #endif
440 
441 /* -----------------------------------------------------
442  initialize
443 ----------------------------------------------------- */
445 {
446 #if MRPT_HAS_SOME_NIDAQMX
447  this->stop();
448 
449  for (size_t i = 0; i < task_definitions.size(); i++)
450  {
451  const TaskDescription& tf = task_definitions[i];
452 
453  // Try to create a new task:
454  m_running_tasks.push_back(TInfoPerTask());
455  TInfoPerTask& ipt = m_running_tasks.back();
456  ipt.task = tf; // Save a copy of the task info for the thread to have
457  // all the needed info
458 
459  try
460  {
461  TaskHandle& taskHandle =
462  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
463 
464  MRPT_DAQmx_ErrChk(MRPT_DAQmxCreateTask("", &taskHandle));
465 
466  if (tf.has_ai)
467  {
468  ASSERTMSG_(
469  tf.ai.physicalChannelCount > 0,
470  "ai.physicalChannelCount is zero! Please, define it "
471  "correctly.")
472 
475  taskHandle, tf.ai.physicalChannel.c_str(), nullptr,
476  daqmx_defstr2num(tf.ai.terminalConfig), tf.ai.minVal,
477  tf.ai.maxVal, DAQmx_Val_Volts, nullptr));
478  }
479  if (tf.has_ao)
480  {
481  ASSERTMSG_(
482  tf.ao.physicalChannelCount > 0,
483  "ai.physicalChannelCount is zero! Please, define it "
484  "correctly.")
485 
488  taskHandle, tf.ao.physicalChannel.c_str(), nullptr,
489  tf.ao.minVal, tf.ao.maxVal, DAQmx_Val_Volts, nullptr));
490  }
491  if (tf.has_di)
492  {
495  taskHandle, tf.di.line.c_str(), nullptr,
496  DAQmx_Val_ChanPerLine));
497  }
498  if (tf.has_do)
499  {
502  taskHandle, tf.douts.line.c_str(), nullptr,
503  DAQmx_Val_ChanPerLine));
504  }
505  if (tf.has_ci_period)
506  {
509  taskHandle, tf.ci_period.counter.c_str(), nullptr,
511  daqmx_defstr2num(tf.ci_period.units),
512  daqmx_defstr2num(tf.ci_period.edge),
513  DAQmx_Val_LowFreq1Ctr, tf.ci_period.measTime,
514  tf.ci_period.divisor, nullptr));
515  }
516  if (tf.has_ci_count_edges)
517  {
520  taskHandle, tf.ci_count_edges.counter.c_str(), nullptr,
521  daqmx_defstr2num(tf.ci_count_edges.edge),
523  daqmx_defstr2num(tf.ci_count_edges.countDirection)));
524  }
525  if (tf.has_ci_pulse_width)
526  {
529  taskHandle, tf.ci_pulse_width.counter.c_str(), nullptr,
531  daqmx_defstr2num(tf.ci_pulse_width.units),
532  daqmx_defstr2num(tf.ci_pulse_width.startingEdge),
533  nullptr));
534  }
535  if (tf.has_ci_lin_encoder)
536  {
539  taskHandle, tf.ci_lin_encoder.counter.c_str(), nullptr,
540  daqmx_defstr2num(tf.ci_lin_encoder.decodingType),
542  daqmx_defstr2num(tf.ci_lin_encoder.ZidxPhase),
543  daqmx_defstr2num(tf.ci_lin_encoder.units),
545  tf.ci_lin_encoder.initialPos, nullptr));
546  }
547  if (tf.has_ci_ang_encoder)
548  {
551  taskHandle, tf.ci_ang_encoder.counter.c_str(), nullptr,
552  daqmx_defstr2num(tf.ci_ang_encoder.decodingType),
554  daqmx_defstr2num(tf.ci_ang_encoder.ZidxPhase),
555  daqmx_defstr2num(tf.ci_ang_encoder.units),
557  tf.ci_ang_encoder.initialAngle, nullptr));
558  }
559  if (tf.has_co_pulses)
560  {
563  taskHandle, tf.co_pulses.counter.c_str(), nullptr,
564  DAQmx_Val_Hz, daqmx_defstr2num(tf.co_pulses.idleState),
566  tf.co_pulses.dutyCycle));
567  }
568 
569  // Seems to be needed to avoid an errors avoid like:
570  // " Onboard device memory overflow. Because of system and/or
571  // bus-bandwidth limitations, the driver could not read data from
572  // the device fast enough to keep up with the device throughput."
573  if (tf.has_ai || tf.has_di || tf.has_ci_period ||
576  {
577  // sample rate:
581  taskHandle, tf.sampleClkSource.c_str(),
582  tf.samplesPerSecond, DAQmx_Val_Rising,
583  DAQmx_Val_ContSamps, tf.samplesPerChannelToRead));
584 
587  taskHandle, tf.bufferSamplesPerChannel));
588  }
589 
590  if (tf.has_ao)
591  {
592  // Nothing to do as long as we only need "on demand" outputs.
593  // MRPT_DAQmx_ErrChk (MRPT_DAQmxCfgOutputBuffer(taskHandle,2
594  ///*tf.bufferSamplesPerChannel*/ ));
595  // // Output buffer MUST have some data before starting the
596  // task: write 0s:
597  // vector<double> d;
598  // d.assign(tf.ao.physicalChannelCount*2, 0.0);
599  // this->writeAnalogOutputTask(i,1 /* samples per channel */,
600  //&d[0], 0.10 /*timeout*/, false);
601  }
602 
603  // Create pipe:
605 
606  // Add a large timeout, just in case the writing thread dies
607  // unexpectedly so the reader doesn't hang on:
608  ipt.read_pipe->timeout_read_start_us = 100000; // 100ms
609  ipt.read_pipe->timeout_read_between_us = 100000; // 100ms
610 
612 
613  ipt.hThread = std::thread(
615  }
616  catch (std::exception const& e)
617  {
618  std::cerr << "[CNationalInstrumentsDAQ] Error:" << std::endl
619  << e.what() << std::endl;
620  if (ipt.taskHandle != nullptr)
621  {
622  TaskHandle& taskHandle =
623  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
624  MRPT_DAQmxStopTask(taskHandle);
625  MRPT_DAQmxClearTask(taskHandle);
626  }
627 
628  // Stop thread:
629  if (ipt.hThread.joinable())
630  {
631  ipt.must_close = true;
632  cerr << "[CNationalInstrumentsDAQ::initialize] Waiting for the "
633  "grabbing thread to end due to exception...\n";
634  ipt.hThread.join();
635  cerr << "[CNationalInstrumentsDAQ::initialize] Grabbing thread "
636  "ended.\n";
637  }
638 
639  // Remove from list:
640  m_running_tasks.erase(--m_running_tasks.end());
641 
642  std::cerr << "[CNationalInstrumentsDAQ] Error while creating "
643  "tasks. Closing other tasks before returning...\n";
644  this->stop();
645  std::cerr << "[CNationalInstrumentsDAQ] Closing tasks done.\n";
646 
647  throw; // Rethrow
648  }
649  } // end for each task_definitions[i]
650 
651 #else
652  THROW_EXCEPTION("MRPT was compiled without support for NI DAQmx!!")
653 #endif
654 }
655 
656 /** Stop the grabbing threads for DAQ tasks. It is automatically called at
657  * destruction. */
659 {
660  // Stop all threads:
662  it != m_running_tasks.end(); ++it)
663  {
664  it->must_close = true;
665  }
666  if (m_verbose)
667  cout << "[CNationalInstrumentsDAQ::stop] Waiting for grabbing threads "
668  "to end...\n";
670  it != m_running_tasks.end(); ++it)
671  {
672  // For some reason, join doesn't work...
673  if (it->hThread.joinable()) it->hThread.join();
674  // Polling:
675  // for (size_t tim=0;tim<250 && !it->is_closed;tim++) {
676  // std::this_thread::sleep_for(1ms); }
677  // it->hThread.clear();
678  }
679  if (m_verbose)
680  cout << "[CNationalInstrumentsDAQ::stop] All threads ended.\n";
681 
682 // Stop all NI tasks:
683 #if MRPT_HAS_SOME_NIDAQMX
685  it != m_running_tasks.end(); ++it)
686  {
687  TaskHandle& taskHandle =
688  *reinterpret_cast<TaskHandle*>(&it->taskHandle);
689 
690  MRPT_DAQmxStopTask(taskHandle);
691  MRPT_DAQmxClearTask(taskHandle);
692  taskHandle = nullptr;
693  }
694 #endif
695 }
696 
697 /** Returns true if initialize() was called successfully. */
699 {
700  return (!m_running_tasks.empty() && !m_running_tasks.begin()->is_closed);
701 }
702 
703 /*-------------------------------------------------------------
704  readFromDAQ
705 -------------------------------------------------------------*/
707  std::vector<mrpt::obs::CObservationRawDAQ::Ptr>& outObservations,
708  bool& hardwareError)
709 {
710  hardwareError = false;
711  outObservations.clear();
712 
713  if (!checkDAQIsWorking())
714  {
715  hardwareError = true;
716  return;
717  }
718 
719  // Read from the pipe:
720  m_state = ssWorking;
721 
723  it != m_running_tasks.end(); ++it)
724  {
725  CObservationRawDAQ tmp_obs;
726  try
727  {
728  if (it->new_obs_available != 0)
729  {
730  it->read_pipe->ReadObject(&tmp_obs);
731  --(it->new_obs_available);
732 
733  // Yes, valid block of samples was adquired:
734  outObservations.push_back(
736  }
737  }
738  catch (...)
739  {
740  // Timeout...
741  }
742  }
743 }
744 
745 /* -----------------------------------------------------
746  doProcess
747 ----------------------------------------------------- */
749 {
750  bool hwError;
752 
753  if (hwError)
754  {
755  m_state = ssError;
756  THROW_EXCEPTION("Couldn't start DAQ task!");
757  }
758 
759  if (!m_nextObservations.empty())
760  {
761  m_state = ssWorking;
762 
763  std::vector<mrpt::utils::CSerializable::Ptr> new_obs;
764  new_obs.resize(m_nextObservations.size());
765 
766  for (size_t i = 0; i < m_nextObservations.size(); i++)
767  new_obs[i] = m_nextObservations[i];
768 
769  appendObservations(new_obs);
770  }
771 }
772 
773 /* -----------------------------------------------------
774  grabbing_thread
775 ----------------------------------------------------- */
777 {
778 #if MRPT_HAS_SOME_NIDAQMX
779  try
780  {
781  TaskHandle& taskHandle =
782  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
783  if (m_verbose)
784  cout << "[CNationalInstrumentsDAQ::grabbing_thread] Starting "
785  "thread for task "
786  << ipt.taskHandle << "\n";
787 
788  MRPT_TODO("Add write timeout")
789  // ipt.write_pipe->timeout_read_between_us
790 
791  const float timeout =
793 
794  int err = 0;
795  vector<double> dBuf;
796  vector<uint8_t> u8Buf;
797 
798  const mrpt::obs::CObservationRawDAQ clean_obs;
800 
801  while (!ipt.must_close)
802  {
803  obs = clean_obs; // Start with an empty observation
804 
805  // Common stuff:
808  obs.sensorLabel = m_sensorLabel + string(".") + ipt.task.taskLabel;
809 
810  bool there_are_data = false; // At least one channel?
811 
812  // Read from each channel in this task:
813  // -----------------------------------------------
814  if (ipt.task.has_ai)
815  {
817  obs.AIN_interleaved = true;
818 
819  const uint32_t totalSamplesToRead =
822  dBuf.resize(totalSamplesToRead);
823  int32 pointsReadPerChan = -1;
824  if ((err = MRPT_DAQmxReadAnalogF64(
825  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
826  obs.AIN_interleaved ? DAQmx_Val_GroupByScanNumber
827  : DAQmx_Val_GroupByChannel,
828  &dBuf[0], dBuf.size(), &pointsReadPerChan, nullptr)) <
829  0 &&
830  err != DAQmxErrorSamplesNotYetAvailable)
831  {
832  MRPT_DAQmx_ErrChk(err)
833  }
834  else if (pointsReadPerChan > 0)
835  {
837  totalSamplesToRead,
838  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
839  obs.AIN_double = dBuf;
840  there_are_data = true;
841  if (m_verbose)
842  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
843  << pointsReadPerChan << " analog samples read.\n";
844  }
845  } // end AI
846  if (ipt.task.has_di)
847  {
848  const uint32_t totalSamplesToRead =
850  u8Buf.resize(totalSamplesToRead);
851 
852  int32 pointsReadPerChan = -1;
853  if ((err = MRPT_DAQmxReadDigitalU8(
854  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
855  DAQmx_Val_GroupByChannel, &u8Buf[0], u8Buf.size(),
856  &pointsReadPerChan, nullptr)) < 0 &&
857  err != DAQmxErrorSamplesNotYetAvailable)
858  {
859  MRPT_DAQmx_ErrChk(err)
860  }
861  else if (pointsReadPerChan > 0)
862  {
864  totalSamplesToRead,
865  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
866  obs.DIN = u8Buf;
867  there_are_data = true;
868  if (m_verbose)
869  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
870  << pointsReadPerChan << " digital samples read.\n";
871  }
872  } // end DI
874  {
875  const int32 totalSamplesToRead =
877  dBuf.resize(totalSamplesToRead);
878  int32 pointsReadPerChan = -1;
879  if ((err = MRPT_DAQmxReadCounterF64(
880  taskHandle, totalSamplesToRead, timeout, &dBuf[0],
881  dBuf.size(), &pointsReadPerChan, nullptr)) < 0 &&
882  err != DAQmxErrorSamplesNotYetAvailable)
883  {
884  MRPT_DAQmx_ErrChk(err)
885  }
886  else if (pointsReadPerChan > 0)
887  {
888  ASSERT_EQUAL_(totalSamplesToRead, pointsReadPerChan)
889 
890  // Decimate?
891  if (++ipt.task.ci_ang_encoder.decimate_cnt >=
893  {
895 
896  obs.CNTRIN_double = dBuf;
897  there_are_data = true;
898  if (m_verbose && !obs.CNTRIN_double.empty())
899  {
900  static int decim = 0;
901  if (!decim)
902  cout << "[CNationalInstrumentsDAQ::grabbing_"
903  "thread] "
904  << pointsReadPerChan
905  << " counter samples read ([0]="
906  << obs.CNTRIN_double[0] << ").\n";
907  if (++decim > 100) decim = 0;
908  }
909  }
910  }
911  } // end COUNTERS
912 
913  // Send the observation to the main thread:
914  if (there_are_data)
915  {
916  ++(ipt.new_obs_available);
917  ipt.write_pipe->WriteObject(&obs);
918  // std::this_thread::sleep_for(1ms); // This seems to be needed
919  // to allow all objs to be sent to the recv thread
920  }
921  else
922  {
923  std::this_thread::sleep_for(1ms);
924  }
925 
926  } // end of main thread loop
927  }
928  catch (std::exception& e)
929  {
930  std::cerr << "[CNationalInstrumentsDAQ::grabbing_thread] Exception:\n"
931  << e.what() << std::endl;
932  }
933 #endif // MRPT_HAS_SOME_NIDAQMX
934 
935  ipt.is_closed = true;
936 }
937 
939  size_t task_index, size_t nSamplesPerChannel, const double* volt_values,
940  double timeout, bool groupedByChannel)
941 {
942 #if MRPT_HAS_SOME_NIDAQMX
943  ASSERT_(task_index < m_running_tasks.size())
944 
946  std::advance(it, task_index);
947  TInfoPerTask& ipt = *it;
948  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
949 
950  int32 samplesWritten = 0;
951  int err = 0;
952  if (err = MRPT_DAQmxWriteAnalogF64(
953  taskHandle, nSamplesPerChannel, FALSE, timeout,
954  groupedByChannel ? DAQmx_Val_GroupByChannel
955  : DAQmx_Val_GroupByScanNumber,
956  const_cast<float64*>(volt_values), &samplesWritten, nullptr))
957  {
958  MRPT_DAQmx_ErrChk(err)
959  }
960 #else
961  MRPT_UNUSED_PARAM(task_index);
962  MRPT_UNUSED_PARAM(nSamplesPerChannel);
963  MRPT_UNUSED_PARAM(volt_values);
964  MRPT_UNUSED_PARAM(timeout);
965  MRPT_UNUSED_PARAM(groupedByChannel);
966 #endif
967 }
968 
970  size_t task_index, bool line_value, double timeout)
971 {
972 #if MRPT_HAS_SOME_NIDAQMX
973  ASSERT_(task_index < m_running_tasks.size())
974 
976  std::advance(it, task_index);
977  TInfoPerTask& ipt = *it;
978  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
979 
980  uInt8 dat = line_value ? 1 : 0;
981 
982  int32 samplesWritten = 0;
983  int32 nSamplesPerChannel = 1;
984  int err = 0;
985  if (err = MRPT_DAQmxWriteDigitalLines(
986  taskHandle, nSamplesPerChannel, FALSE, timeout,
987  DAQmx_Val_GroupByScanNumber, &dat, &samplesWritten, nullptr))
988  {
989  MRPT_DAQmx_ErrChk(err)
990  }
991 
992 #else
993  MRPT_UNUSED_PARAM(task_index);
994  MRPT_UNUSED_PARAM(line_value);
995  MRPT_UNUSED_PARAM(timeout);
996 #endif
997 }
998 
999 // Ctor:
1001  : has_ai(false),
1002  has_ao(false),
1003  has_di(false),
1004  has_do(false),
1005  has_ci_period(false),
1006  has_ci_count_edges(false),
1007  has_ci_pulse_width(false),
1008  has_ci_lin_encoder(false),
1009  has_ci_ang_encoder(false),
1010  has_co_pulses(false),
1011  samplesPerSecond(1000.0),
1012  bufferSamplesPerChannel(200000),
1013  samplesPerChannelToRead(1000)
1014 {
1015 }
#define ASSERT_EQUAL_(__A, __B)
uint16_t AIN_channel_count
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
#define MRPT_DAQmxCreateCILinEncoderChan
double sample_rate
Readings from ticks counters, such as quadrature encoders.
std::shared_ptr< CObservationRawDAQ > Ptr
Each of the tasks to create in CNationalInstrumentsDAQ::initialize().
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_ang_encoder_t ci_ang_encoder
Counter: uses an angular encoder to measure angular position.
GLdouble GLdouble t
Definition: glext.h:3689
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
#define ASSERT_ABOVE_(__A, __B)
virtual void initialize()
Setup and launch the DAQ tasks, in parallel threads.
#define MRPT_DAQmxCreateDOChan
std::string line
The digital line (for example "Dev1/port0/line1")
This namespace provides a OS-independent interface to many useful functions: filenames manipulation...
Definition: math_frwds.h:30
#define MRPT_DAQmxCreateCIPulseWidthChan
std::string m_sensorLabel
See CGenericSensor.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_co_pulses_t co_pulses
Output counter: digital pulses output.
#define MRPT_DAQmxStartTask
#define THROW_EXCEPTION(msg)
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
Scalar * iterator
Definition: eigen_plugins.h:26
#define MRPT_DAQmxCreateAOVoltageChan
#define MRPT_DAQmxCreateDIChan
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_do_t douts
Digital outs (do)
mrpt::system::TTimeStamp now()
A shortcut for system::getCurrentTime.
Definition: datetime.h:76
Contains classes for various device interfaces.
std::string read_string(const std::string &section, const std::string &name, const std::string &defaultValue, bool failIfNotFound=false) const
#define MRPT_DAQmxCreateCIPeriodChan
STL namespace.
bool checkDAQIsWorking() const
Returns true if initialize() was called and at least one task is running.
TaskDescription task
A copy of the original task description that generated this thread.
GLdouble s
Definition: glext.h:3676
#define MRPT_DAQmxWriteDigitalLines
#define MRPT_DAQmxReadAnalogF64
#define MRPT_TODO(x)
This class allows loading and storing values and vectors of different types from a configuration text...
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_count_edges_t ci_count_edges
Counter: period of a digital signal.
void loadConfig_sensorSpecific(const mrpt::utils::CConfigFileBase &configSource, const std::string &iniSection)
See the class documentation at the top for expected parameters.
#define MRPT_DAQmxWriteAnalogF64
#define MRPT_UNUSED_PARAM(a)
Can be used to avoid "not used parameters" warnings from the compiler.
std::string sampleClkSource
Sample clock source: may be empty (default value) for some channels.
#define MRPT_DAQmxCreateCIAngEncoderChan
void writeDigitalOutputTask(size_t task_index, bool line_value, double timeout)
Changes the boolean state of one digital output line.
#define FALSE
Definition: jmorecfg.h:216
#define MRPT_DAQmxCfgInputBuffer
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_di_t di
Digital inputs (di)
#define MRPT_DAQmxReadCounterF64
This namespace contains representation of robot actions and observations.
int val
Definition: mrpt_jpeglib.h:955
void stop()
Stop the grabbing threads for DAQ tasks.
void grabbing_thread(TInfoPerTask &ipt)
Method to be executed in each parallel thread.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_pulse_width_t ci_pulse_width
Counter: measure the width of a digital pulse.
An interface to read from data acquisition boards compatible with National Instruments "DAQmx Base" o...
#define MY_LOAD_HERE_CONFIG_VAR( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
bool AIN_interleaved
Whether the channels are interleaved (A0 A1 A2 A0 A1 A2...) or not (A0 A0 A0 A1 A1 A1 A2 A2 A2...
GLsizei const GLchar ** string
Definition: glext.h:4101
#define MRPT_DAQmxStopTask
std::vector< mrpt::obs::CObservationRawDAQ::Ptr > m_nextObservations
A buffer for doProcess.
Store raw data from a Data Acquisition (DAQ) device, such that input or output analog and digital cha...
#define IMPLEMENTS_GENERIC_SENSOR(class_name, NameSpace)
This must be inserted in all CGenericSensor classes implementation files:
void writeAnalogOutputTask(size_t task_index, size_t nSamplesPerChannel, const double *volt_values, double timeout, bool groupedByChannel)
Set voltage outputs to all the outputs in an AOUT task For the meaning of parameters, refere to NI DAQmx docs for DAQmxBaseWriteAnalogF64()
std::vector< double > CNTRIN_double
Readings from ticks counters, such as quadrature encoders.
uint32_t bufferSamplesPerChannel
(Default=0) From NI&#39;s docs: The number of samples the buffer can hold for each channel in the task...
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_lin_encoder_t ci_lin_encoder
Counter: uses a linear encoder to measure linear position.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ai_t ai
Analog inputs.
void tokenize(const std::string &inString, const std::string &inDelimiters, std::deque< std::string > &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
void readFromDAQ(std::vector< mrpt::obs::CObservationRawDAQ::Ptr > &outObservations, bool &hardwareError)
Receives data from the DAQ thread(s).
std::string line
The digital line (for example "Dev1/port0/line1")
std::string sensorLabel
An arbitrary label that can be used to identify the sensor.
Definition: CObservation.h:60
unsigned __int64 uint64_t
Definition: rptypes.h:50
#define MRPT_DAQmxCreateAIVoltageChan
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::vector< double > AIN_double
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
mrpt::system::TTimeStamp timestamp
The associated UTC time-stamp.
Definition: CObservation.h:58
uint32_t samplesPerChannelToRead
(Default=1000) The number of samples to grab at once from each channel.
std::vector< TaskDescription > task_definitions
Publicly accessible vector with the list of tasks to be launched upon call to CNationalInstrumentsDAQ...
void doProcess()
This method will be invoked at a minimum rate of "process_rate" (Hz)
std::unique_ptr< mrpt::synch::CPipeWriteEndPoint > write_pipe
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ao_t ao
Analog outputs.
#define ASSERT_(f)
#define MRPT_DAQmxCreateCOPulseChanFreq
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_period_t ci_period
Counter: period of a digital signal.
uint64_t read_uint64_t(const std::string &section, const std::string &name, uint64_t defaultValue, bool failIfNotFound=false) const
#define MRPT_DAQmx_ErrChk(functionCall)
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e.g.
std::string trim(const std::string &str)
Removes leading and trailing spaces.
#define MRPT_DAQmxClearTask
std::unique_ptr< mrpt::synch::CPipeReadEndPoint > read_pipe
double samplesPerSecond
Sample clock config: samples per second.
std::vector< uint8_t > DIN
Present output values for 16-bit analog output (DACs) channels (vector length=channel count) in volts...
bool strCmpI(const std::string &s1, const std::string &s2)
Return true if the two strings are equal (case insensitive)
unsigned __int32 uint32_t
Definition: rptypes.h:47
#define ASSERTMSG_(f, __ERROR_MSG)
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmxReadDigitalU8
#define MRPT_DAQmxCfgSampClkTiming
#define MRPT_DAQmxCreateCICountEdgesChan
void appendObservations(const std::vector< mrpt::utils::CSerializable::Ptr > &obj)
This method must be called by derived classes to enqueue a new observation in the list to be returned...
#define MRPT_DAQmxCreateTask
static void createPipe(ReadPtr &outReadPipe, WritePtr &outWritePipe)
Creates a new pipe and returns the read & write end-points as newly allocated objects.
Definition: CPipe.h:161



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: ae4571287 Thu Nov 23 00:06:53 2017 +0100 at dom oct 27 23:51:55 CET 2019