nodequad

AQ32Protocol

declaration
AQ32Protocol

Option name Type Description
aq object A Nodequad object
root object Root of the observable config and state object

The AQ32Protocol object is responsible for vehicle protocol specifics. The returned instance inherits from EventEmitter2 and supports wildcard events.

/**
 * Protocol handler constructor.
 *
 * Invokes the EventEmitter2 constructor with wildcard events enabled, passes
 * root.config and root.state through O(), installs one queuing function on
 * root.methods for every key of methodMapping, subscribes to 'change' events
 * on the config wrapper and finally creates the command queue.
 *
 * NOTE(review): the state listener is built but never subscribed here.
 * NOTE(review): Array.observe is a withdrawn proposal that current JavaScript
 * engines no longer ship — confirm the target runtime still provides it.
 *
 * @param {object} aq   A Nodequad object
 * @param {object} root Root of the observable config and state object
 */
var AQ32Protocol = module.exports = function AQ32Protocol(aq, root) {
	var protocol = this;

	events.EventEmitter2.call(protocol, { wildcard: true });

	protocol.aq = aq;
	protocol.root = root;
	protocol.ee = {
		config: O(protocol.root.config),
		state: O(protocol.root.state)
	};
	protocol.streams = {};
	protocol.activeStreams = [];

	// Listeners are kept on the instance so the same function references stay reachable later
	protocol.ee.listeners = {
		config: function callListener(ev) { handleConfigChanges.call(protocol, ev) },
		state:  function callListener(ev) { handleStateChanges.call(protocol, ev) }
	};

	// Every mapped method only queues its command; the queue is looked up at call time
	var mappedNames = Object.keys(methodMapping);
	mappedNames.forEach(function installMappedMethod(mappedName) {
		protocol.root.methods[mappedName] = function callMappedMethod() {
			protocol.commandQueue.push({ type: 'method', command: methodMapping[mappedName] });
		};
	});

	protocol.ee.config.on('change', protocol.ee.listeners.config);

	protocol.commandQueue = [];
	Array.observe(protocol.commandQueue, protocol.handleCommands());
};
util.inherits(AQ32Protocol, events.EventEmitter2);

sync

method
AQ32Protocol.prototype.sync()

Option name Type Description
path String of config or state to sync, wildcards supported

Sync a config or state path from vehicle (preferring vehicle data as source of truth)

/**
 * Queue a 'getter' command for every command that findCommandsByPath()
 * resolves for the given path.
 *
 * @param {String} path Path of config or state to sync, wildcards supported
 */
AQ32Protocol.prototype.sync = function sync(path) {
	var protocol = this;
	var matches = findCommandsByPath(protocol.root, path);

	matches.forEach(function enqueueGetter(matched) {
		// each entry remembers the path that was requested, not the resolved one
		var entry = { type: 'getter', command: matched, path: path };
		protocol.commandQueue.push(entry);
	});
};

stream

method
AQ32Protocol.prototype.stream()

Option name Type Description
path String of config or state to stream, wildcards supported

Begin streaming of data from vehicle

/**
 * Begin streaming of data from the vehicle.
 *
 * For every command resolved from `path` that has a stream command and is not
 * streaming yet, a stream object is built from streamMapping, registered in
 * this.streams / this.activeStreams, and its 'data' payloads are copied onto
 * this.root. The first call that leaves at least one active stream also starts
 * the multiplexer, which hands the driver stream from one active stream to the
 * next after more than AQ32Protocol.STREAM_MUX_SAMPLES samples.
 *
 * @param {String} path Path of config or state to stream, wildcards supported
 */
AQ32Protocol.prototype.stream = function stream(path) {
	var self = this,
	    driverStream = self.aq.getDriver().getStream();

	findCommandsByPath(this.root, path).forEach(function setupStream(command) {
		var c = command.desc;
		if(self.streams[command.name] === undefined && c.commands.stream) {
			var created = new streamMapping[c.stream](self, c);
			self.streams[command.name] = created;
			self.activeStreams.push(created);
			created.on('data', function streamDataCb(obj) {
				// copy over props from stream to root object
				_.assign(self.root, obj);
			});
		}
	});

	// Only start multiplexing when there is something to multiplex; a path
	// that matched no streamable command must not throw.
	if(!this.streaming && this.activeStreams.length > 0) this.streaming = multiplexStreams();

	function multiplexStreams() {
		var samples = 0;
		function sampled() {
			if(samples > AQ32Protocol.STREAM_MUX_SAMPLES && self.activeStreams.length > 0) {
				var previous = self.activeStreams[0];
				// detach the stream that just finished its turn
				driverStream.unpipe(previous);
				previous.removeListener('data', sampled);
				// rotate: the stream that has waited longest (the tail) becomes active
				self.activeStreams.unshift(self.activeStreams.pop());
				var next = self.activeStreams[0];
				// send the command character to the driver
				self.aq.getDriver().send(next.command.commands.stream, 0, function nextStream() {
					// the stream may have been removed while the command was in flight
					if(self.activeStreams.indexOf(next) === -1) return;
					// the sample counter has to follow the active stream,
					// otherwise no further hand-over would ever happen
					next.on('data', sampled);
					// pipe the output to this stream
					driverStream.pipe(next);
					// reset samples
					samples = 0;
				});
			} else {
				samples++;
			}
		}

		self.activeStreams[0].on('data', sampled);
		driverStream.pipe(self.activeStreams[0]);
		self.aq.getDriver().send(self.activeStreams[0].command.commands.stream);

		return true;
	}
};

pauseStream

method
AQ32Protocol.prototype.pauseStream()

Option name Type Description
path String of config or state to stop streaming, wildcards supported

Pause streaming of data from vehicle

/**
 * Pause streaming of data from the vehicle.
 *
 * Every command resolved from `path` that currently has a stream registered
 * under its name is unpiped from the driver stream, stripped of all listeners
 * and removed from this.activeStreams and this.streams. When no active stream
 * is left, the streaming flag is reset to false.
 *
 * @param {String} path Path of config or state to stop streaming, wildcards supported
 */
AQ32Protocol.prototype.pauseStream = function pauseStream(path) {
	var self = this;

	findCommandsByPath(this.root, path).forEach(function pauseStreamByCommand(command) {
		// streams are registered under the command's name, not the command object
		var paused = self.streams[command.name];
		if(paused == null) return;

		// unpipe
		self.aq.getDriver().getStream().unpipe(paused);
		// remove all listeners
		paused.removeAllListeners();
		// remove from array; splice(-1, 1) would drop an unrelated stream
		var index = self.activeStreams.indexOf(paused);
		if(index !== -1) self.activeStreams.splice(index, 1);
		// delete from hash
		delete self.streams[command.name];
	});

	// check array and reset streaming condition
	if(this.activeStreams.length === 0) this.streaming = false;
};