48 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			48 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict';
 | |
| 
 | |
| var Cell   = require('./cell'),
 | |
|     Pledge = require('./pledge');
 | |
| 
 | |
/**
 * Builds a pipeline over a list of sessions.
 *
 * Every entry of `sessions` is wrapped in its own `Cell` (order preserved via
 * `sessions.map`), and both message directions start out enabled.
 *
 * @constructor
 * @param {Array} sessions - Sessions to wrap; one `Cell` is created per entry.
 */
var Pipeline = function(sessions) {
  var wrapInCell = function(session) {
    return new Cell(session);
  };

  this._cells = sessions.map(wrapInCell);

  // Per-direction flags; a direction set to true no longer accepts messages.
  this._stopped = {
    incoming: false,
    outgoing: false
  };
};
 | |
| 
 | |
/**
 * Sends a message through the cells in the incoming direction.
 *
 * Cells are visited from the last index down to the first. If the incoming
 * direction has been stopped, the call returns immediately and `callback` is
 * never invoked.
 *
 * @param {*} message - Message handed to the first cell visited.
 * @param {Function} callback - Called as `callback.call(context, error, message)`
 *   after the message has gone through every cell.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
  var isStopped = this._stopped.incoming;

  if (!isStopped) {
    // Walk backwards: start at the last cell, finish once the index hits -1.
    var lastIndex = this._cells.length - 1;
    this._loop('incoming', lastIndex, -1, -1, message, callback, context);
  }
};
 | |
| 
 | |
/**
 * Sends a message through the cells in the outgoing direction.
 *
 * Cells are visited from index 0 up to the last one. If the outgoing direction
 * has been stopped, the call returns immediately and `callback` is never
 * invoked.
 *
 * @param {*} message - Message handed to the first cell visited.
 * @param {Function} callback - Called as `callback.call(context, error, message)`
 *   after the message has gone through every cell.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
  var isStopped = this._stopped.outgoing;

  if (!isStopped) {
    // Walk forwards: start at index 0, finish once the index equals the cell count.
    var cellCount = this._cells.length;
    this._loop('outgoing', 0, cellCount, 1, message, callback, context);
  }
};
 | |
| 
 | |
/**
 * Stops both directions and closes every cell.
 *
 * The stopped flags are set before any cell is closed, so subsequent calls to
 * `processIncomingMessage` / `processOutgoingMessage` are ignored. `close()` is
 * called on each cell in index order and the results are collected.
 *
 * @param {Function} [callback] - If given, called with no arguments once
 *   `Pledge.all` over the cells' `close()` results resolves.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.close = function(callback, context) {
  this._stopped = { incoming: true, outgoing: true };

  var closing = this._cells.map(function(cell) {
    return cell.close();
  });

  // Cells are closed regardless; only the notification is optional.
  if (!callback) return;

  Pledge.all(closing).then(function() {
    callback.call(context);
  });
};
 | |
| 
 | |
/**
 * Passes a message through the cells one after another in the given direction.
 *
 * Before processing starts, `pending(direction)` is called on every cell (from
 * the last index down to 0). The message then visits cell `start`,
 * `start + step`, ... and each cell's continuation feeds its error/message pair
 * to the next cell. When the index reaches `end`, `callback` receives the final
 * error and message.
 *
 * @param {string} direction - Name of the cell method to invoke for each cell;
 *   also the key set on `_stopped` when a cell reports an error.
 * @param {number} start - Index of the first cell to visit.
 * @param {number} end - Index value at which iteration finishes (exclusive).
 * @param {number} step - Amount added to the index after each cell.
 * @param {*} message - Initial message given to the first cell.
 * @param {Function} callback - Called as `callback.call(context, error, message)`
 *   once the index reaches `end`.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
  var pipeline = this;
  var cells    = this._cells;

  // Notify every cell up front, highest index first.
  for (var i = cells.length - 1; i >= 0; i--) {
    cells[i].pending(direction);
  }

  var advance = function(index, error, msg) {
    if (index !== end) {
      var cell = cells[index];

      cell[direction](error, msg, function(err, m) {
        // An error stops this direction for future messages, but the error
        // itself still travels on through the remaining cells to the callback.
        if (err) pipeline._stopped[direction] = true;
        advance(index + step, err, m);
      });
      return;
    }

    return callback.call(context, error, msg);
  };

  advance(start, null, message);
};
 | |
| 
 | |
| module.exports = Pipeline;
 |