Option name | Type | Description |
---|---|---|
nq | object | A Nodequad object |
root | object | Root of observeable config and state object |
AQ32Protocol object responsible for vehicle protocol specifics. The returned instance is a subclass of event emitter and allows wildcard events.
var AQ32Protocol = module.exports = function AQ32Protocol(aq, root) {
events.EventEmitter2.call(this, { wildcard: true });
this.aq = aq, this.root = root,
this.ee = { config: O(this.root.config), state: O(this.root.state) },
this.streams = {},
this.activeStreams = [];
var self = this;
this.ee.listeners = {
config: function callListener(ev) { handleConfigChanges.call(self, ev) },
state: function callListener(ev) { handleStateChanges.call(self, ev) }
};
Object.keys(methodMapping).forEach(function forEachMethodToMap(methodName) {
self.root.methods[methodName] = function callMappedMethod() {
self.commandQueue.push({
type: 'method',
command: methodMapping[methodName]
});
};
});
this.ee.config.on('change', this.ee.listeners.config);
//this.ee.state.on('change', this.handleStateChanges());
this.commandQueue = [];
Array.observe(this.commandQueue, this.handleCommands());
};
util.inherits(AQ32Protocol, events.EventEmitter2);
Option name | Type | Description |
---|---|---|
path | String | of config or state to sync, wildcards supported |
Sync a config or state path from vehicle (preferring vehicle data as source of truth)
AQ32Protocol.prototype.sync = function sync(path) {
var self = this;
findCommandsByPath(self.root, path).forEach(function forEachCommand(command) {
self.commandQueue.push({
type: 'getter',
command: command,
path: path
});
});
};
Option name | Type | Description |
---|---|---|
path | String | of config or state to stream, wildcards supported |
Begin streaming of data from vehicle
AQ32Protocol.prototype.stream = function stream(path) {
var self = this,
driverStream = self.aq.getDriver().getStream();
findCommandsByPath(this.root, path).forEach(function setupStream(command) {
console.log('setup stream', command);
var c = command.desc;
if(self.streams[command.name] === undefined && c.commands.stream) {
self.streams[command.name] = new streamMapping[c.stream](self, c);
console.dir(self.streams);
self.activeStreams.push(self.streams[command.name]);
self.streams[command.name].on('data', function streamDataCb(obj) {
// copy over props from stream to root object
_.assign(self.root, obj);
});
}
});
if(!this.streaming) this.streaming = multiplexStreams();
function multiplexStreams() {
var samples = 0;
function sampled() {
if(samples > AQ32Protocol.STREAM_MUX_SAMPLES && self.activeStreams.length > 0) {
// unpipe first stream
driverStream.unpipe(self.activeStreams[0]);
// set the oldest pending stream as active on the stack
self.activeStreams.shift(self.activeStreams.pop());
// send the command character to the driver
self.aq.getDriver().send(self.activeStreams[0].command.commands.stream, 0, function nextStream() {
// pipe the output to this stream
driverStream.pipe(self.activeStreams[0]);
// reset samples
samples = 0;
});
} else {
samples++;
}
}
self.activeStreams[0].on('data', sampled);
driverStream.pipe(self.activeStreams[0]);
self.aq.getDriver().send(self.activeStreams[0].command.commands.stream);
return true;
}
};
Option name | Type | Description |
---|---|---|
path | String | of config or state to stream, wildcards supported |
Pause streaming of data from vehicle
AQ32Protocol.prototype.pauseStream = function pauseStream(path) {
var self = this;
findCommandsByPath(this.root, path).forEach(function pauseStreamByCommand(command) {
if(self.streams[command] != undefined) {
// unpipe
self.aq.getDriver().getStream().unpipe(self.streams[command]);
// remove all listeners
self.streams[command].removeAllListeners();
// remove from array
self.activeStreams.splice(self.activeStreams.indexOf(self.streams[command]), 1);
// delete from hash
delete self.streams[command];
}
});
// check array and reset streaming condition
if(this.activeStreams.length === 0) this.streaming = false;
};