MRPT  1.9.9
CNationalInstrumentsDAQ.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2018, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <iterator> // advance()
14 #include <iostream>
16 
17 // If we have both, DAQmx & DAQmxBase, prefer DAQmx:
18 #define MRPT_HAS_SOME_NIDAQMX (MRPT_HAS_NIDAQMXBASE || MRPT_HAS_NIDAQMX)
19 
20 #define MRPT_USE_NIDAQMXBASE (MRPT_HAS_NIDAQMXBASE && !MRPT_HAS_NIDAQMX)
21 #define MRPT_USE_NIDAQMX (MRPT_HAS_NIDAQMX)
22 
23 #if MRPT_USE_NIDAQMXBASE
24 #include "NIDAQmxBase.h" // Include file for NI-DAQmx Base API
25 #endif
26 #if MRPT_USE_NIDAQMX
27 #include "NIDAQmx.h" // Include file for NI-DAQmx API
28 #endif
29 
30 // Macros to use either DAQmx or DAQmx Base automatically, depending on the
31 // installed libraries:
32 #if MRPT_USE_NIDAQMXBASE
33 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxBaseGetExtendedErrorInfo
34 #define MRPT_DAQmxCreateTask DAQmxBaseCreateTask
35 #define MRPT_DAQmxCreateAIVoltageChan DAQmxBaseCreateAIVoltageChan
36 #define MRPT_DAQmxCreateAOVoltageChan DAQmxBaseCreateAOVoltageChan
37 #define MRPT_DAQmxCreateDIChan DAQmxBaseCreateDIChan
38 #define MRPT_DAQmxCreateDOChan DAQmxBaseCreateDOChan
39 #define MRPT_DAQmxCreateCIPeriodChan DAQmxBaseCreateCIPeriodChan
40 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxBaseCreateCICountEdgesChan
41 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxBaseCreateCIPulseWidthChan
42 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxBaseCreateCILinEncoderChan
43 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxBaseCreateCIAngEncoderChan
44 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxBaseCreateCOPulseChanFreq
45 #define MRPT_DAQmxCfgSampClkTiming DAQmxBaseCfgSampClkTiming
46 #define MRPT_DAQmxCfgInputBuffer DAQmxBaseCfgInputBuffer
47 #define MRPT_DAQmxCfgOutputBuffer DAQmxBaseCfgOutputBuffer
48 #define MRPT_DAQmxStartTask DAQmxBaseStartTask
49 #define MRPT_DAQmxStopTask DAQmxBaseStopTask
50 #define MRPT_DAQmxClearTask DAQmxBaseClearTask
51 #define MRPT_DAQmxReadAnalogF64 DAQmxBaseReadAnalogF64
52 #define MRPT_DAQmxReadCounterF64 DAQmxBaseReadCounterF64
53 #define MRPT_DAQmxReadDigitalU8 DAQmxBaseReadDigitalU8
54 #define MRPT_DAQmxWriteAnalogF64 DAQmxBaseWriteAnalogF64
55 #define MRPT_DAQmxWriteDigitalU32 DAQmxBaseWriteDigitalU32
56 #define MRPT_DAQmxWriteDigitalLines DAQmxBaseWriteDigitalLines
57 #else
58 #define MRPT_DAQmxGetExtendedErrorInfo DAQmxGetExtendedErrorInfo
59 #define MRPT_DAQmxCreateTask DAQmxCreateTask
60 #define MRPT_DAQmxCreateAIVoltageChan DAQmxCreateAIVoltageChan
61 #define MRPT_DAQmxCreateAOVoltageChan DAQmxCreateAOVoltageChan
62 #define MRPT_DAQmxCreateDIChan DAQmxCreateDIChan
63 #define MRPT_DAQmxCreateDOChan DAQmxCreateDOChan
64 #define MRPT_DAQmxCreateCIPeriodChan DAQmxCreateCIPeriodChan
65 #define MRPT_DAQmxCreateCICountEdgesChan DAQmxCreateCICountEdgesChan
66 #define MRPT_DAQmxCreateCIPulseWidthChan DAQmxCreateCIPulseWidthChan
67 #define MRPT_DAQmxCreateCILinEncoderChan DAQmxCreateCILinEncoderChan
68 #define MRPT_DAQmxCreateCIAngEncoderChan DAQmxCreateCIAngEncoderChan
69 #define MRPT_DAQmxCreateCOPulseChanFreq DAQmxCreateCOPulseChanFreq
70 #define MRPT_DAQmxCfgSampClkTiming DAQmxCfgSampClkTiming
71 #define MRPT_DAQmxCfgInputBuffer DAQmxCfgInputBuffer
72 #define MRPT_DAQmxCfgOutputBuffer DAQmxCfgOutputBuffer
73 #define MRPT_DAQmxStartTask DAQmxStartTask
74 #define MRPT_DAQmxStopTask DAQmxStopTask
75 #define MRPT_DAQmxClearTask DAQmxClearTask
76 #define MRPT_DAQmxReadAnalogF64 DAQmxReadAnalogF64
77 #define MRPT_DAQmxReadCounterF64 DAQmxReadCounterF64
78 #define MRPT_DAQmxReadDigitalU8 DAQmxReadDigitalU8
79 #define MRPT_DAQmxWriteAnalogF64 DAQmxWriteAnalogF64
80 #define MRPT_DAQmxWriteDigitalU32 DAQmxWriteDigitalU32
81 #define MRPT_DAQmxWriteDigitalLines DAQmxWriteDigitalLines
82 #endif
83 
84 // An auxiliary macro to check and report errors in the DAQmx library as
85 // exceptions with a well-explained message.
86 #define MRPT_DAQmx_ErrChk(functionCall) \
87  if ((functionCall) < 0) \
88  { \
89  char errBuff[2048]; \
90  MRPT_DAQmxGetExtendedErrorInfo(errBuff, 2048); \
91  std::string sErr = mrpt::format( \
92  "DAQ error: '%s'\nCalling: '%s'", errBuff, #functionCall); \
93  THROW_EXCEPTION(sErr); \
94  }
95 
96 using namespace mrpt::hwdrivers;
97 using namespace mrpt::obs;
98 using namespace mrpt::system;
99 using namespace std;
100 
102 
103 // ------------- CNationalInstrumentsDAQ::TInfoPerTask -----------
104 // Default ctor:
106  : taskHandle(0),
107  must_close(false),
108  is_closed(false),
109  new_obs_available(0),
110  task()
111 {
112 }
113 
114 /* -----------------------------------------------------
115  Constructor
116  ----------------------------------------------------- */
118  : mrpt::system::COutputLogger("CNationalInstrumentsDAQ")
119 {
120  m_sensorLabel = "NIDAQ";
121 }
122 
123 // Just like "MRPT_LOAD_HERE_CONFIG_VAR" but...
124 #define MY_LOAD_HERE_CONFIG_VAR( \
125  variableName, variableType, targetVariable, configFileObject, \
126  sectionNameStr) \
127  targetVariable = configFileObject.read_##variableType( \
128  sectionNameStr, variableName, targetVariable, false);
129 
130 #define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( \
131  variableName, variableType, targetVariable, configFileObject, \
132  sectionNameStr) \
133  { \
134  try \
135  { \
136  targetVariable = configFileObject.read_##variableType( \
137  sectionNameStr, variableName, targetVariable, true); \
138  } \
139  catch (std::exception&) \
140  { \
141  THROW_EXCEPTION( \
142  format( \
143  "Value for '%s' not found in config file", \
144  std::string(variableName).c_str())); \
145  } \
146  }
147 
148 /* -----------------------------------------------------
149  loadConfig_sensorSpecific
150  ----------------------------------------------------- */
152  const mrpt::config::CConfigFileBase& cfg, const std::string& sect)
153 {
154  // std::vector<TaskDescription> task_definitions;
155  task_definitions.clear();
156 
157  const unsigned int nTasks = cfg.read_uint64_t(sect, "num_tasks", 0, true);
158  if (!nTasks)
159  {
160  std::cerr << "[CNationalInstrumentsDAQ] Warning: Number of tasks is "
161  "zero. No datalogging will be done.\n";
162  }
163 
164  task_definitions.resize(nTasks);
165  for (unsigned int i = 0; i < nTasks; i++)
166  {
168  const string sTask = mrpt::format("task%u", i);
169 
170  // Read general settings for this task:
171  // ---------------------------------------
172  const string sChanns =
173  cfg.read_string(sect, sTask + string(".channels"), "", true);
174  vector<string> lstStrChanns;
175  mrpt::system::tokenize(sChanns, " \t,", lstStrChanns);
176  if (lstStrChanns.empty())
177  THROW_EXCEPTION_FMT("List of channels for task %u is empty!", i)
178 
180  sTask + string(".samplesPerSecond"), double, t.samplesPerSecond,
181  cfg, sect)
183  sTask + string(".samplesPerChannelToRead"), double,
184  t.samplesPerChannelToRead, cfg, sect)
186  sTask + string(".sampleClkSource"), string, t.sampleClkSource, cfg,
187  sect)
189  sTask + string(".bufferSamplesPerChannel"), double,
190  t.bufferSamplesPerChannel, cfg, sect)
191  t.taskLabel =
192  cfg.read_string(sect, sTask + string(".taskLabel"), sTask, false);
193 
194  for (size_t j = 0; j < lstStrChanns.size(); j++)
195  {
196  if (strCmpI(lstStrChanns[j], "ai"))
197  {
198  t.has_ai = true;
200  sTask + string(".ai.physicalChannel"), string,
201  t.ai.physicalChannel, cfg, sect)
203  sTask + string(".ai.physicalChannelCount"), uint64_t,
204  t.ai.physicalChannelCount, cfg, sect)
206  sTask + string(".ai.terminalConfig"), string,
207  t.ai.terminalConfig, cfg, sect)
209  sTask + string(".ai.minVal"), double, t.ai.minVal, cfg,
210  sect)
212  sTask + string(".ai.maxVal"), double, t.ai.maxVal, cfg,
213  sect)
214  }
215  else if (strCmpI(lstStrChanns[j], "ao"))
216  {
217  t.has_ao = true;
219  sTask + string(".ao.physicalChannel"), string,
220  t.ao.physicalChannel, cfg, sect)
222  sTask + string(".ao.physicalChannelCount"), uint64_t,
223  t.ao.physicalChannelCount, cfg, sect)
225  sTask + string(".ao.minVal"), double, t.ao.minVal, cfg,
226  sect)
228  sTask + string(".ao.maxVal"), double, t.ao.maxVal, cfg,
229  sect)
230  }
231  else if (strCmpI(lstStrChanns[j], "di"))
232  {
233  t.has_di = true;
235  sTask + string(".di.line"), string, t.di.line, cfg, sect)
236  }
237  else if (strCmpI(lstStrChanns[j], "do"))
238  {
239  t.has_do = true;
241  sTask + string(".do.line"), string, t.douts.line, cfg, sect)
242  }
243  else if (strCmpI(lstStrChanns[j], "ci_period"))
244  {
245  t.has_ci_period = true;
247  sTask + string(".ci_period.counter"), string,
248  t.ci_period.counter, cfg, sect)
250  sTask + string(".ci_period.minVal"), double,
251  t.ci_period.minVal, cfg, sect)
253  sTask + string(".ci_period.maxVal"), double,
254  t.ci_period.maxVal, cfg, sect)
256  sTask + string(".ci_period.units"), string,
257  t.ci_period.units, cfg, sect)
259  sTask + string(".ci_period.edge"), string, t.ci_period.edge,
260  cfg, sect)
262  sTask + string(".ci_period.measTime"), double,
263  t.ci_period.measTime, cfg, sect)
265  sTask + string(".ci_period.divisor"), int,
266  t.ci_period.divisor, cfg, sect)
267  }
268  else if (strCmpI(lstStrChanns[j], "ci_count_edges"))
269  {
270  t.has_ci_count_edges = true;
272  sTask + string(".ci_count_edges.counter"), string,
273  t.ci_count_edges.counter, cfg, sect)
275  sTask + string(".ci_count_edges.edge"), string,
276  t.ci_count_edges.edge, cfg, sect)
278  sTask + string(".ci_count_edges.initialCount"), int,
279  t.ci_count_edges.initialCount, cfg, sect)
281  sTask + string(".ci_count_edges.countDirection"), string,
282  t.ci_count_edges.countDirection, cfg, sect)
283  }
284  else if (strCmpI(lstStrChanns[j], "ci_pulse_width"))
285  {
286  t.has_ci_pulse_width = true;
288  sTask + string(".ci_pulse_width.counter"), string,
289  t.ci_pulse_width.counter, cfg, sect)
291  sTask + string(".ci_pulse_width.minVal"), double,
292  t.ci_pulse_width.minVal, cfg, sect)
294  sTask + string(".ci_pulse_width.maxVal"), double,
295  t.ci_pulse_width.maxVal, cfg, sect)
297  sTask + string(".ci_pulse_width.units"), string,
298  t.ci_pulse_width.units, cfg, sect)
300  sTask + string(".ci_pulse_width.startingEdge"), string,
301  t.ci_pulse_width.startingEdge, cfg, sect)
302  }
303  else if (strCmpI(lstStrChanns[j], "ci_lin_encoder"))
304  {
305  t.has_ci_lin_encoder = true;
307  sTask + string(".ci_lin_encoder.counter"), string,
308  t.ci_lin_encoder.counter, cfg, sect)
310  sTask + string(".ci_lin_encoder.decodingType"), string,
311  t.ci_lin_encoder.decodingType, cfg, sect)
313  sTask + string(".ci_lin_encoder.ZidxEnable"), bool,
314  t.ci_lin_encoder.ZidxEnable, cfg, sect)
316  sTask + string(".ci_lin_encoder.ZidxVal"), double,
317  t.ci_lin_encoder.ZidxVal, cfg, sect)
319  sTask + string(".ci_lin_encoder.ZidxPhase"), string,
320  t.ci_lin_encoder.ZidxPhase, cfg, sect)
322  sTask + string(".ci_lin_encoder.units"), string,
323  t.ci_lin_encoder.units, cfg, sect)
325  sTask + string(".ci_lin_encoder.distPerPulse"), double,
326  t.ci_lin_encoder.distPerPulse, cfg, sect)
328  sTask + string(".ci_lin_encoder.initialPos"), double,
329  t.ci_lin_encoder.initialPos, cfg, sect)
330  }
331  else if (strCmpI(lstStrChanns[j], "ci_ang_encoder"))
332  {
333  t.has_ci_ang_encoder = true;
335  sTask + string(".ci_ang_encoder.counter"), string,
336  t.ci_ang_encoder.counter, cfg, sect)
338  sTask + string(".ci_ang_encoder.decodingType"), string,
339  t.ci_ang_encoder.decodingType, cfg, sect)
341  sTask + string(".ci_ang_encoder.ZidxEnable"), bool,
342  t.ci_ang_encoder.ZidxEnable, cfg, sect)
344  sTask + string(".ci_ang_encoder.ZidxVal"), double,
345  t.ci_ang_encoder.ZidxVal, cfg, sect)
347  sTask + string(".ci_ang_encoder.ZidxPhase"), string,
348  t.ci_ang_encoder.ZidxPhase, cfg, sect)
350  sTask + string(".ci_ang_encoder.units"), string,
351  t.ci_ang_encoder.units, cfg, sect)
353  sTask + string(".ci_ang_encoder.pulsesPerRev"), int,
354  t.ci_ang_encoder.pulsesPerRev, cfg, sect)
356  sTask + string(".ci_ang_encoder.initialAngle"), double,
357  t.ci_ang_encoder.initialAngle, cfg, sect)
359  sTask + string(".ci_ang_encoder.decimate"), int,
360  t.ci_ang_encoder.decimate, cfg, sect)
361  }
362  else if (strCmpI(lstStrChanns[j], "co_pulses"))
363  {
364  t.has_co_pulses = true;
366  sTask + string(".co_pulses.counter"), string,
367  t.co_pulses.counter, cfg, sect)
369  sTask + string(".co_pulses.idleState"), string,
370  t.co_pulses.idleState, cfg, sect)
372  sTask + string(".co_pulses.initialDelay"), double,
373  t.co_pulses.initialDelay, cfg, sect)
375  sTask + string(".co_pulses.freq"), double, t.co_pulses.freq,
376  cfg, sect)
378  sTask + string(".co_pulses.dutyCycle"), double,
379  t.co_pulses.dutyCycle, cfg, sect)
380  }
381  else
382  {
384  "Unknown channel type '%s'! See the docs of "
385  "CNationalInstrumentsDAQ",
386  lstStrChanns[j].c_str())
387  }
388  } // end for each "k" channel in channel "i"
389  } // end for "i", each task
390 }
391 
392 /* -----------------------------------------------------
393  Destructor
394  ----------------------------------------------------- */
396 #if MRPT_HAS_SOME_NIDAQMX
397 // Declare a table to convert strings to their DAQmx #define values:
398 struct daqmx_str_val
399 {
400  const char* str;
401  int val;
402 };
403 
404 const daqmx_str_val daqmx_vals[] = {
405  {"DAQmx_Val_Cfg_Default", DAQmx_Val_Cfg_Default},
406  {"DAQmx_Val_RSE", DAQmx_Val_RSE},
407  {"DAQmx_Val_NRSE", DAQmx_Val_NRSE},
408  {"DAQmx_Val_Diff", DAQmx_Val_Diff},
409  {"DAQmx_Val_Seconds", DAQmx_Val_Seconds},
410  {"DAQmx_Val_Rising", DAQmx_Val_Rising},
411  {"DAQmx_Val_Falling", DAQmx_Val_Falling},
412  {"DAQmx_Val_CountUp", DAQmx_Val_CountUp},
413  {"DAQmx_Val_CountDown", DAQmx_Val_CountDown},
414  {"DAQmx_Val_ExtControlled", DAQmx_Val_ExtControlled},
415  {"DAQmx_Val_AHighBHigh", DAQmx_Val_AHighBHigh},
416  {"DAQmx_Val_AHighBLow", DAQmx_Val_AHighBLow},
417  {"DAQmx_Val_ALowBHigh", DAQmx_Val_ALowBHigh},
418  {"DAQmx_Val_ALowBLow", DAQmx_Val_ALowBLow},
419  {"DAQmx_Val_X1", DAQmx_Val_X1},
420  {"DAQmx_Val_X2", DAQmx_Val_X2},
421  {"DAQmx_Val_X4", DAQmx_Val_X4},
422  {"DAQmx_Val_Meters", DAQmx_Val_Meters},
423  {"DAQmx_Val_Inches", DAQmx_Val_Inches},
424  {"DAQmx_Val_Ticks", DAQmx_Val_Ticks},
425  {"DAQmx_Val_Degrees", DAQmx_Val_Degrees},
426  {"DAQmx_Val_Radians", DAQmx_Val_Radians},
427  {"DAQmx_Val_High", DAQmx_Val_High},
428  {"DAQmx_Val_Low", DAQmx_Val_Low}};
429 
430 int daqmx_defstr2num(const std::string& str)
431 {
432  const std::string s = mrpt::system::trim(str);
433 
434  for (unsigned int i = 0; i < sizeof(daqmx_vals) / sizeof(daqmx_vals[0]);
435  i++)
436  {
437  if (strCmpI(daqmx_vals[i].str, s.c_str())) return daqmx_vals[i].val;
438  }
439  THROW_EXCEPTION_FMT("Error: Unknown DAQmx constant: %s", s.c_str())
440 }
441 #endif
442 
443 /* -----------------------------------------------------
444  initialize
445 ----------------------------------------------------- */
447 {
448 #if MRPT_HAS_SOME_NIDAQMX
449  this->stop();
450 
451  for (size_t i = 0; i < task_definitions.size(); i++)
452  {
453  const TaskDescription& tf = task_definitions[i];
454 
455  // Try to create a new task:
456  m_running_tasks.push_back(TInfoPerTask());
457  TInfoPerTask& ipt = m_running_tasks.back();
458  ipt.task = tf; // Save a copy of the task info for the thread to have
459  // all the needed info
460 
461  try
462  {
463  TaskHandle& taskHandle =
464  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
465 
466  MRPT_DAQmx_ErrChk(MRPT_DAQmxCreateTask("", &taskHandle));
467 
468  if (tf.has_ai)
469  {
470  ASSERTMSG_(
471  tf.ai.physicalChannelCount > 0,
472  "ai.physicalChannelCount is zero! Please, define it "
473  "correctly.")
474 
477  taskHandle, tf.ai.physicalChannel.c_str(), nullptr,
478  daqmx_defstr2num(tf.ai.terminalConfig), tf.ai.minVal,
479  tf.ai.maxVal, DAQmx_Val_Volts, nullptr));
480  }
481  if (tf.has_ao)
482  {
483  ASSERTMSG_(
484  tf.ao.physicalChannelCount > 0,
485  "ai.physicalChannelCount is zero! Please, define it "
486  "correctly.")
487 
490  taskHandle, tf.ao.physicalChannel.c_str(), nullptr,
491  tf.ao.minVal, tf.ao.maxVal, DAQmx_Val_Volts, nullptr));
492  }
493  if (tf.has_di)
494  {
497  taskHandle, tf.di.line.c_str(), nullptr,
498  DAQmx_Val_ChanPerLine));
499  }
500  if (tf.has_do)
501  {
504  taskHandle, tf.douts.line.c_str(), nullptr,
505  DAQmx_Val_ChanPerLine));
506  }
507  if (tf.has_ci_period)
508  {
511  taskHandle, tf.ci_period.counter.c_str(), nullptr,
513  daqmx_defstr2num(tf.ci_period.units),
514  daqmx_defstr2num(tf.ci_period.edge),
515  DAQmx_Val_LowFreq1Ctr, tf.ci_period.measTime,
516  tf.ci_period.divisor, nullptr));
517  }
518  if (tf.has_ci_count_edges)
519  {
522  taskHandle, tf.ci_count_edges.counter.c_str(), nullptr,
523  daqmx_defstr2num(tf.ci_count_edges.edge),
525  daqmx_defstr2num(tf.ci_count_edges.countDirection)));
526  }
527  if (tf.has_ci_pulse_width)
528  {
531  taskHandle, tf.ci_pulse_width.counter.c_str(), nullptr,
533  daqmx_defstr2num(tf.ci_pulse_width.units),
534  daqmx_defstr2num(tf.ci_pulse_width.startingEdge),
535  nullptr));
536  }
537  if (tf.has_ci_lin_encoder)
538  {
541  taskHandle, tf.ci_lin_encoder.counter.c_str(), nullptr,
542  daqmx_defstr2num(tf.ci_lin_encoder.decodingType),
544  daqmx_defstr2num(tf.ci_lin_encoder.ZidxPhase),
545  daqmx_defstr2num(tf.ci_lin_encoder.units),
547  tf.ci_lin_encoder.initialPos, nullptr));
548  }
549  if (tf.has_ci_ang_encoder)
550  {
553  taskHandle, tf.ci_ang_encoder.counter.c_str(), nullptr,
554  daqmx_defstr2num(tf.ci_ang_encoder.decodingType),
556  daqmx_defstr2num(tf.ci_ang_encoder.ZidxPhase),
557  daqmx_defstr2num(tf.ci_ang_encoder.units),
559  tf.ci_ang_encoder.initialAngle, nullptr));
560  }
561  if (tf.has_co_pulses)
562  {
565  taskHandle, tf.co_pulses.counter.c_str(), nullptr,
566  DAQmx_Val_Hz, daqmx_defstr2num(tf.co_pulses.idleState),
568  tf.co_pulses.dutyCycle));
569  }
570 
571  // Seems to be needed to avoid an errors avoid like:
572  // " Onboard device memory overflow. Because of system and/or
573  // bus-bandwidth limitations, the driver could not read data from
574  // the device fast enough to keep up with the device throughput."
575  if (tf.has_ai || tf.has_di || tf.has_ci_period ||
578  {
579  // sample rate:
583  taskHandle, tf.sampleClkSource.c_str(),
584  tf.samplesPerSecond, DAQmx_Val_Rising,
585  DAQmx_Val_ContSamps, tf.samplesPerChannelToRead));
586 
589  taskHandle, tf.bufferSamplesPerChannel));
590  }
591 
592  if (tf.has_ao)
593  {
594  // Nothing to do as long as we only need "on demand" outputs.
595  // MRPT_DAQmx_ErrChk (MRPT_DAQmxCfgOutputBuffer(taskHandle,2
596  ///*tf.bufferSamplesPerChannel*/ ));
597  // // Output buffer MUST have some data before starting the
598  // task: write 0s:
599  // vector<double> d;
600  // d.assign(tf.ao.physicalChannelCount*2, 0.0);
601  // this->writeAnalogOutputTask(i,1 /* samples per channel */,
602  //&d[0], 0.10 /*timeout*/, false);
603  }
604 
605  // Create pipe:
606  mrpt::synch::CPipe::createPipe(ipt.read_pipe, ipt.write_pipe);
607 
608  // Add a large timeout, just in case the writing thread dies
609  // unexpectedly so the reader doesn't hang on:
610  ipt.read_pipe->timeout_read_start_us = 100000; // 100ms
611  ipt.read_pipe->timeout_read_between_us = 100000; // 100ms
612 
614 
615  ipt.hThread = std::thread(
617  }
618  catch (std::exception const& e)
619  {
620  std::cerr << "[CNationalInstrumentsDAQ] Error:" << std::endl
621  << e.what() << std::endl;
622  if (ipt.taskHandle != nullptr)
623  {
624  TaskHandle& taskHandle =
625  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
626  MRPT_DAQmxStopTask(taskHandle);
627  MRPT_DAQmxClearTask(taskHandle);
628  }
629 
630  // Stop thread:
631  if (ipt.hThread.joinable())
632  {
633  ipt.must_close = true;
634  cerr << "[CNationalInstrumentsDAQ::initialize] Waiting for the "
635  "grabbing thread to end due to exception...\n";
636  ipt.hThread.join();
637  cerr << "[CNationalInstrumentsDAQ::initialize] Grabbing thread "
638  "ended.\n";
639  }
640 
641  // Remove from list:
642  m_running_tasks.erase(--m_running_tasks.end());
643 
644  std::cerr << "[CNationalInstrumentsDAQ] Error while creating "
645  "tasks. Closing other tasks before returning...\n";
646  this->stop();
647  std::cerr << "[CNationalInstrumentsDAQ] Closing tasks done.\n";
648 
649  throw; // Rethrow
650  }
651  } // end for each task_definitions[i]
652 
653 #else
654  THROW_EXCEPTION("MRPT was compiled without support for NI DAQmx!!");
655 #endif
656 }
657 
658 /** Stop the grabbing threads for DAQ tasks. It is automatically called at
659  * destruction. */
661 {
662  // Stop all threads:
664  it != m_running_tasks.end(); ++it)
665  {
666  it->must_close = true;
667  }
668  if (m_verbose)
669  cout << "[CNationalInstrumentsDAQ::stop] Waiting for grabbing threads "
670  "to end...\n";
672  it != m_running_tasks.end(); ++it)
673  {
674  // For some reason, join doesn't work...
675  if (it->hThread.joinable()) it->hThread.join();
676  // Polling:
677  // for (size_t tim=0;tim<250 && !it->is_closed;tim++) {
678  // std::this_thread::sleep_for(1ms); }
679  // it->hThread.clear();
680  }
681  if (m_verbose)
682  cout << "[CNationalInstrumentsDAQ::stop] All threads ended.\n";
683 
684 // Stop all NI tasks:
685 #if MRPT_HAS_SOME_NIDAQMX
687  it != m_running_tasks.end(); ++it)
688  {
689  TaskHandle& taskHandle =
690  *reinterpret_cast<TaskHandle*>(&it->taskHandle);
691 
692  MRPT_DAQmxStopTask(taskHandle);
693  MRPT_DAQmxClearTask(taskHandle);
694  taskHandle = nullptr;
695  }
696 #endif
697 }
698 
699 /** Returns true if initialize() was called successfully. */
701 {
702  return (!m_running_tasks.empty() && !m_running_tasks.begin()->is_closed);
703 }
704 
705 /*-------------------------------------------------------------
706  readFromDAQ
707 -------------------------------------------------------------*/
709  std::vector<mrpt::obs::CObservationRawDAQ::Ptr>& outObservations,
710  bool& hardwareError)
711 {
712  hardwareError = false;
713  outObservations.clear();
714 
715  if (!checkDAQIsWorking())
716  {
717  hardwareError = true;
718  return;
719  }
720 
721  // Read from the pipe:
722  m_state = ssWorking;
723 
725  it != m_running_tasks.end(); ++it)
726  {
727  CObservationRawDAQ tmp_obs;
728  try
729  {
730  if (it->new_obs_available != 0)
731  {
732  auto arch = mrpt::serialization::archiveFrom(*it->read_pipe);
733  arch.ReadObject(&tmp_obs);
734  --(it->new_obs_available);
735 
736  // Yes, valid block of samples was adquired:
737  outObservations.push_back(
739  }
740  }
741  catch (...)
742  {
743  // Timeout...
744  }
745  }
746 }
747 
748 /* -----------------------------------------------------
749  doProcess
750 ----------------------------------------------------- */
752 {
753  bool hwError;
755 
756  if (hwError)
757  {
758  m_state = ssError;
759  THROW_EXCEPTION("Couldn't start DAQ task!");
760  }
761 
762  if (!m_nextObservations.empty())
763  {
764  m_state = ssWorking;
765 
766  std::vector<mrpt::serialization::CSerializable::Ptr> new_obs;
767  new_obs.resize(m_nextObservations.size());
768 
769  for (size_t i = 0; i < m_nextObservations.size(); i++)
770  new_obs[i] = m_nextObservations[i];
771 
772  appendObservations(new_obs);
773  }
774 }
775 
776 /* -----------------------------------------------------
777  grabbing_thread
778 ----------------------------------------------------- */
780 {
781 #if MRPT_HAS_SOME_NIDAQMX
782  try
783  {
784  TaskHandle& taskHandle =
785  *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
786  if (m_verbose)
787  cout << "[CNationalInstrumentsDAQ::grabbing_thread] Starting "
788  "thread for task "
789  << ipt.taskHandle << "\n";
790 
791  MRPT_TODO("Add write timeout")
792  // ipt.write_pipe->timeout_read_between_us
793 
794  const float timeout =
796 
797  int err = 0;
798  vector<double> dBuf;
799  vector<uint8_t> u8Buf;
800 
801  const mrpt::obs::CObservationRawDAQ clean_obs;
803 
804  while (!ipt.must_close)
805  {
806  obs = clean_obs; // Start with an empty observation
807 
808  // Common stuff:
811  obs.sensorLabel = m_sensorLabel + string(".") + ipt.task.taskLabel;
812 
813  bool there_are_data = false; // At least one channel?
814 
815  // Read from each channel in this task:
816  // -----------------------------------------------
817  if (ipt.task.has_ai)
818  {
820  obs.AIN_interleaved = true;
821 
822  const uint32_t totalSamplesToRead =
825  dBuf.resize(totalSamplesToRead);
826  int32 pointsReadPerChan = -1;
827  if ((err = MRPT_DAQmxReadAnalogF64(
828  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
829  obs.AIN_interleaved ? DAQmx_Val_GroupByScanNumber
830  : DAQmx_Val_GroupByChannel,
831  &dBuf[0], dBuf.size(), &pointsReadPerChan, nullptr)) <
832  0 &&
833  err != DAQmxErrorSamplesNotYetAvailable)
834  {
835  MRPT_DAQmx_ErrChk(err)
836  }
837  else if (pointsReadPerChan > 0)
838  {
840  totalSamplesToRead,
841  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
842  obs.AIN_double = dBuf;
843  there_are_data = true;
844  if (m_verbose)
845  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
846  << pointsReadPerChan << " analog samples read.\n";
847  }
848  } // end AI
849  if (ipt.task.has_di)
850  {
851  const uint32_t totalSamplesToRead =
853  u8Buf.resize(totalSamplesToRead);
854 
855  int32 pointsReadPerChan = -1;
856  if ((err = MRPT_DAQmxReadDigitalU8(
857  taskHandle, ipt.task.samplesPerChannelToRead, timeout,
858  DAQmx_Val_GroupByChannel, &u8Buf[0], u8Buf.size(),
859  &pointsReadPerChan, nullptr)) < 0 &&
860  err != DAQmxErrorSamplesNotYetAvailable)
861  {
862  MRPT_DAQmx_ErrChk(err)
863  }
864  else if (pointsReadPerChan > 0)
865  {
867  totalSamplesToRead,
868  pointsReadPerChan * ipt.task.ai.physicalChannelCount)
869  obs.DIN = u8Buf;
870  there_are_data = true;
871  if (m_verbose)
872  cout << "[CNationalInstrumentsDAQ::grabbing_thread] "
873  << pointsReadPerChan << " digital samples read.\n";
874  }
875  } // end DI
877  {
878  const int32 totalSamplesToRead =
880  dBuf.resize(totalSamplesToRead);
881  int32 pointsReadPerChan = -1;
882  if ((err = MRPT_DAQmxReadCounterF64(
883  taskHandle, totalSamplesToRead, timeout, &dBuf[0],
884  dBuf.size(), &pointsReadPerChan, nullptr)) < 0 &&
885  err != DAQmxErrorSamplesNotYetAvailable)
886  {
887  MRPT_DAQmx_ErrChk(err)
888  }
889  else if (pointsReadPerChan > 0)
890  {
891  ASSERT_EQUAL_(totalSamplesToRead, pointsReadPerChan);
892  // Decimate?
893  if (++ipt.task.ci_ang_encoder.decimate_cnt >=
895  {
897 
898  obs.CNTRIN_double = dBuf;
899  there_are_data = true;
900  if (m_verbose && !obs.CNTRIN_double.empty())
901  {
902  static int decim = 0;
903  if (!decim)
904  cout << "[CNationalInstrumentsDAQ::grabbing_"
905  "thread] "
906  << pointsReadPerChan
907  << " counter samples read ([0]="
908  << obs.CNTRIN_double[0] << ").\n";
909  if (++decim > 100) decim = 0;
910  }
911  }
912  }
913  } // end COUNTERS
914 
915  // Send the observation to the main thread:
916  if (there_are_data)
917  {
918  ++(ipt.new_obs_available);
919  ipt.write_pipe->WriteObject(&obs);
920  // std::this_thread::sleep_for(1ms); // This seems to be needed
921  // to allow all objs to be sent to the recv thread
922  }
923  else
924  {
925  std::this_thread::sleep_for(1ms);
926  }
927 
928  } // end of main thread loop
929  }
930  catch (std::exception& e)
931  {
932  std::cerr << "[CNationalInstrumentsDAQ::grabbing_thread] Exception:\n"
933  << e.what() << std::endl;
934  }
935 #endif // MRPT_HAS_SOME_NIDAQMX
936 
937  ipt.is_closed = true;
938 }
939 
941  size_t task_index, size_t nSamplesPerChannel, const double* volt_values,
942  double timeout, bool groupedByChannel)
943 {
944 #if MRPT_HAS_SOME_NIDAQMX
945  ASSERT_(task_index < m_running_tasks.size());
947  std::advance(it, task_index);
948  TInfoPerTask& ipt = *it;
949  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
950 
951  int32 samplesWritten = 0;
952  int err = 0;
953  if (err = MRPT_DAQmxWriteAnalogF64(
954  taskHandle, nSamplesPerChannel, FALSE, timeout,
955  groupedByChannel ? DAQmx_Val_GroupByChannel
956  : DAQmx_Val_GroupByScanNumber,
957  const_cast<float64*>(volt_values), &samplesWritten, nullptr))
958  {
959  MRPT_DAQmx_ErrChk(err)
960  }
961 #else
962  MRPT_UNUSED_PARAM(task_index);
963  MRPT_UNUSED_PARAM(nSamplesPerChannel);
964  MRPT_UNUSED_PARAM(volt_values);
965  MRPT_UNUSED_PARAM(timeout);
966  MRPT_UNUSED_PARAM(groupedByChannel);
967 #endif
968 }
969 
971  size_t task_index, bool line_value, double timeout)
972 {
973 #if MRPT_HAS_SOME_NIDAQMX
974  ASSERT_(task_index < m_running_tasks.size());
976  std::advance(it, task_index);
977  TInfoPerTask& ipt = *it;
978  TaskHandle& taskHandle = *reinterpret_cast<TaskHandle*>(&ipt.taskHandle);
979 
980  uInt8 dat = line_value ? 1 : 0;
981 
982  int32 samplesWritten = 0;
983  int32 nSamplesPerChannel = 1;
984  int err = 0;
985  if (err = MRPT_DAQmxWriteDigitalLines(
986  taskHandle, nSamplesPerChannel, FALSE, timeout,
987  DAQmx_Val_GroupByScanNumber, &dat, &samplesWritten, nullptr))
988  {
989  MRPT_DAQmx_ErrChk(err)
990  }
991 
992 #else
993  MRPT_UNUSED_PARAM(task_index);
994  MRPT_UNUSED_PARAM(line_value);
995  MRPT_UNUSED_PARAM(timeout);
996 #endif
997 }
998 
999 // Ctor:
1001  : has_ai(false),
1002  has_ao(false),
1003  has_di(false),
1004  has_do(false),
1005  has_ci_period(false),
1006  has_ci_count_edges(false),
1007  has_ci_pulse_width(false),
1008  has_ci_lin_encoder(false),
1009  has_ci_ang_encoder(false),
1010  has_co_pulses(false),
1011  samplesPerSecond(1000.0),
1012  bufferSamplesPerChannel(200000),
1013  samplesPerChannelToRead(1000)
1014 {
1015 }
#define IMPLEMENTS_GENERIC_SENSOR(class_name, NameSpace)
This must be inserted in all CGenericSensor classes implementation files:
#define MRPT_DAQmxCreateCOPulseChanFreq
#define MRPT_DAQmxCreateCICountEdgesChan
#define MRPT_DAQmxWriteDigitalLines
#define MRPT_DAQmxCfgInputBuffer
#define MRPT_DAQmxCfgSampClkTiming
#define MRPT_DAQmxStopTask
#define MRPT_DAQmxStartTask
#define MRPT_DAQmxCreateCILinEncoderChan
#define MRPT_DAQmxCreateCIPulseWidthChan
#define MRPT_DAQmxCreateTask
#define MRPT_DAQmxReadCounterF64
#define MRPT_DAQmxCreateAIVoltageChan
#define MRPT_DAQmxReadDigitalU8
#define MRPT_DAQmxCreateCIAngEncoderChan
#define MY_LOAD_HERE_CONFIG_VAR_NO_DEFAULT( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmxCreateCIPeriodChan
#define MRPT_DAQmxCreateAOVoltageChan
#define MRPT_DAQmxCreateDOChan
#define MY_LOAD_HERE_CONFIG_VAR( variableName, variableType, targetVariable, configFileObject, sectionNameStr)
#define MRPT_DAQmx_ErrChk(functionCall)
#define MRPT_DAQmxCreateDIChan
#define MRPT_DAQmxClearTask
#define MRPT_DAQmxReadAnalogF64
#define MRPT_DAQmxWriteAnalogF64
This class allows loading and storing values and vectors of different types from a configuration text...
std::string read_string(const std::string &section, const std::string &name, const std::string &defaultValue, bool failIfNotFound=false) const
uint64_t read_uint64_t(const std::string &section, const std::string &name, uint64_t defaultValue, bool failIfNotFound=false) const
void appendObservations(const std::vector< mrpt::serialization::CSerializable::Ptr > &obj)
This method must be called by derived classes to enqueue a new observation in the list to be returned...
std::string m_sensorLabel
See CGenericSensor.
An interface to read from data acquisition boards compatible with National Instruments "DAQmx Base" o...
void stop()
Stop the grabbing threads for DAQ tasks.
virtual void initialize()
Setup and launch the DAQ tasks, in parallel threads.
std::vector< mrpt::obs::CObservationRawDAQ::Ptr > m_nextObservations
A buffer for doProcess.
void loadConfig_sensorSpecific(const mrpt::config::CConfigFileBase &configSource, const std::string &iniSection)
See the class documentation at the top for expected parameters.
void writeAnalogOutputTask(size_t task_index, size_t nSamplesPerChannel, const double *volt_values, double timeout, bool groupedByChannel)
Set voltage outputs to all the outputs in an AOUT task For the meaning of parameters,...
bool checkDAQIsWorking() const
Returns true if initialize() was called and at least one task is running.
void readFromDAQ(std::vector< mrpt::obs::CObservationRawDAQ::Ptr > &outObservations, bool &hardwareError)
Receives data from the DAQ thread(s).
void writeDigitalOutputTask(size_t task_index, bool line_value, double timeout)
Changes the boolean state of one digital output line.
std::vector< TaskDescription > task_definitions
Publicly accessible vector with the list of tasks to be launched upon call to CNationalInstrumentsDAQ...
void grabbing_thread(TInfoPerTask &ipt)
Method to be executed in each parallel thread.
void doProcess()
This method will be invoked at a minimum rate of "process_rate" (Hz)
std::string sensorLabel
An arbitrary label that can be used to identify the sensor.
Definition: CObservation.h:62
mrpt::system::TTimeStamp timestamp
The associated UTC time-stamp.
Definition: CObservation.h:60
Store raw data from a Data Acquisition (DAQ) device, such that input or output analog and digital cha...
std::vector< double > CNTRIN_double
Readings from ticks counters, such as quadrature encoders.
uint16_t AIN_channel_count
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
double sample_rate
Readings from ticks counters, such as quadrature encoders.
bool AIN_interleaved
Whether the channels are interleaved (A0 A1 A2 A0 A1 A2...) or not (A0 A0 A0 A1 A1 A1 A2 A2 A2....
std::vector< double > AIN_double
Readings from analog input (ADCs) channels (vector length=channel count) in Volts.
std::vector< uint8_t > DIN
Present output values for 16-bit analog output (DACs) channels (vector length=channel count) in volts...
std::shared_ptr< CObservationRawDAQ > Ptr
Versatile class for consistent logging and management of output messages.
#define MRPT_TODO(x)
Definition: common.h:129
#define MRPT_UNUSED_PARAM(a)
Determines whether this is an X86 or AMD64 platform.
Definition: common.h:186
Scalar * iterator
Definition: eigen_plugins.h:26
#define ASSERT_EQUAL_(__A, __B)
Assert comparing two values, reporting their actual values upon failure.
Definition: exceptions.h:153
#define ASSERT_ABOVE_(__A, __B)
Definition: exceptions.h:171
#define ASSERT_(f)
Defines an assertion mechanism.
Definition: exceptions.h:113
#define THROW_EXCEPTION(msg)
Definition: exceptions.h:41
#define ASSERTMSG_(f, __ERROR_MSG)
Defines an assertion mechanism.
Definition: exceptions.h:101
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
Definition: exceptions.h:43
GLdouble GLdouble t
Definition: glext.h:3689
GLdouble s
Definition: glext.h:3676
GLsizei const GLchar ** string
Definition: glext.h:4101
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:16
std::string trim(const std::string &str)
Removes leading and trailing spaces.
bool strCmpI(const std::string &s1, const std::string &s2)
Return true if the two strings are equal (case insensitive)
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
mrpt::system::TTimeStamp now()
A shortcut for system::getCurrentTime.
Definition: datetime.h:87
int val
Definition: mrpt_jpeglib.h:955
Contains classes for various device interfaces.
This namespace contains representation of robot actions and observations.
CArchiveStreamBase< STREAM > archiveFrom(STREAM &s)
Helper function to create a templatized wrapper CArchive object for a: MRPT's CStream,...
Definition: CArchive.h:555
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
unsigned __int32 uint32_t
Definition: rptypes.h:47
unsigned __int64 uint64_t
Definition: rptypes.h:50
std::unique_ptr< mrpt::io::CPipeWriteEndPoint > write_pipe
TaskDescription task
A copy of the original task description that generated this thread.
std::unique_ptr< mrpt::io::CPipeReadEndPoint > read_pipe
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e....
unsigned int physicalChannelCount
IMPORTANT This must be the total number of channels listed in "physicalChannel" (e....
std::string line
The digital line (for example "Dev1/port0/line1")
std::string line
The digital line (for example "Dev1/port0/line1")
Each of the tasks to create in CNationalInstrumentsDAQ::initialize().
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_count_edges_t ci_count_edges
Counter: period of a digital signal.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_co_pulses_t co_pulses
Output counter: digital pulses output.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_pulse_width_t ci_pulse_width
Counter: measure the width of a digital pulse.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ai_t ai
Analog inputs.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_lin_encoder_t ci_lin_encoder
Counter: uses a linear encoder to measure linear position.
double samplesPerSecond
Sample clock config: samples per second.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ao_t ao
Analog outputs.
uint32_t samplesPerChannelToRead
(Default=1000) The number of samples to grab at once from each channel.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_ang_encoder_t ci_ang_encoder
Counter: uses an angular encoder to measure angular position.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_di_t di
Digital inputs (di)
uint32_t bufferSamplesPerChannel
(Default=0) From NI's docs: The number of samples the buffer can hold for each channel in the task.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_ci_period_t ci_period
Counter: period of a digital signal.
std::string sampleClkSource
Sample clock source: may be empty (default value) for some channels.
struct mrpt::hwdrivers::CNationalInstrumentsDAQ::TaskDescription::desc_do_t douts
Digital outs (do)
#define FALSE
Definition: xmlParser.h:231



Page generated by Doxygen 1.9.1 for MRPT 1.9.9 Git: 814d80880 Fri Aug 24 01:51:28 2018 +0200 at mar 26 may 2026 12:30:59 CEST