661 lines
25 KiB
JavaScript
Executable File
661 lines
25 KiB
JavaScript
Executable File
var assert = require('assert');
|
|
var Stream = require('readable-stream');
|
|
var StreamTest = require('streamtest');
|
|
var StreamQueue = require('../src');
|
|
|
|
|
|
|
|
// Tests
|
|
describe('StreamQueue', function() {
|
|
|
|
// Iterating through versions
|
|
StreamTest.versions.forEach(function(version) {
|
|
|
|
describe('for ' + version + ' streams', function() {
|
|
|
|
describe('in binary mode', function() {
|
|
|
|
describe('and with async streams', function() {
|
|
|
|
it('should work with functionnal API', function(done) {
|
|
StreamQueue(
|
|
StreamTest[version].fromChunks(['wa','dup']),
|
|
StreamTest[version].fromChunks(['pl','op']),
|
|
StreamTest[version].fromChunks(['ki','koo','lol'])
|
|
).pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
});
|
|
|
|
it('should work with functionnal API and options', function(done) {
|
|
StreamQueue({},
|
|
StreamTest[version].fromChunks(['wa','dup']),
|
|
StreamTest[version].fromChunks(['pl','op']),
|
|
StreamTest[version].fromChunks(['ki','koo','lol'])
|
|
).pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
});
|
|
|
|
it('should work with POO API', function(done) {
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should pause streams in flowing mode', function(done) {
|
|
var queue = new StreamQueue({
|
|
pauseFlowingStream: true,
|
|
resumeFlowingStream: true
|
|
});
|
|
var flowingStream = StreamTest[version].fromChunks(['pl','op']);
|
|
flowingStream.on('data', function() {});
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(flowingStream);
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and options', function(done) {
|
|
var queue = new StreamQueue({
|
|
pauseFlowingStream: true,
|
|
resumeFlowingStream: true
|
|
});
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and a late done call', function(done) {
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
it('should work with POO API and no stream plus sync done', function(done) {
|
|
var queue = new StreamQueue();
|
|
assert.equal(queue.length, 0);
|
|
queue.queue();
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and no stream plus async done', function(done) {
|
|
var queue = new StreamQueue();
|
|
assert.equal(queue.length, 0);
|
|
queue.queue();
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue stream plus async done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
queue.queue(child);
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
child.done();
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue stream plus async done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
queue.queue(child);
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
child.done();
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue ended stream plus async done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
queue.queue(child);
|
|
child.done();
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
it('should fire end asynchronously with streams', function(done) {
|
|
var queue = new StreamQueue();
|
|
var ended = false;
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup'])
|
|
.on('end', function() {
|
|
assert.equal(ended, false);
|
|
}));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op'])
|
|
.on('end', function() {
|
|
assert.equal(ended, false);
|
|
}));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol'])
|
|
.on('end', function() {
|
|
assert.equal(ended, false);
|
|
}));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.on('end', function() {
|
|
ended = true;
|
|
});
|
|
queue.done();
|
|
assert.equal(ended, false);
|
|
});
|
|
|
|
it('should fire end asynchronously when empty', function(done) {
|
|
var queue = new StreamQueue();
|
|
var ended = false;
|
|
assert.equal(queue.length, 0);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
queue.on('end', function() {
|
|
ended = true;
|
|
});
|
|
queue.done();
|
|
assert.equal(ended, false);
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue ended stream plus sync done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
queue.queue(child);
|
|
child.done();
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue ended stream plus async done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
child.done();
|
|
queue.queue(child);
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
it('should work with POO API and a streamqueue ended stream plus sync done', function(done) {
|
|
var queue = new StreamQueue();
|
|
var child = new StreamQueue();
|
|
child.done();
|
|
queue.queue(child);
|
|
assert.equal(queue.length, 1);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, '');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
if('v2' === version)
|
|
it('should reemit errors', function(done) {
|
|
var _err;
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromErroredChunks(new Error('Aouch!'), []));
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 4);
|
|
queue.on('error', function(err) {
|
|
_err = err;
|
|
});
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert(_err);
|
|
assert.equal(_err.message, 'Aouch!');
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
if('v2' === version)
|
|
it('should reemit errors elsewhere', function(done) {
|
|
var _err;
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks(['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks(['pl','op']));
|
|
queue.queue(StreamTest[version].fromErroredChunks(new Error('Aouch!'), []));
|
|
queue.queue(StreamTest[version].fromChunks(['ki','koo','lol']));
|
|
assert.equal(queue.length, 4);
|
|
queue.on('error', function(err) {
|
|
_err = err;
|
|
});
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert(_err);
|
|
assert.equal(_err.message, 'Aouch!');
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
});
|
|
|
|
describe('and with sync streams', function() {
|
|
|
|
it('should work with functionnal API', function(done) {
|
|
var stream1 = StreamTest[version].syncReadableChunks();
|
|
var stream2 = StreamTest[version].syncReadableChunks();
|
|
var stream3 = StreamTest[version].syncReadableChunks();
|
|
StreamQueue(
|
|
stream1, stream2, stream3
|
|
).pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
StreamTest[version].syncWrite(stream1, ['wa','dup']);
|
|
StreamTest[version].syncWrite(stream2, ['pl','op']);
|
|
StreamTest[version].syncWrite(stream3, ['ki','koo', 'lol']);
|
|
});
|
|
|
|
it('should work with POO API', function(done) {
|
|
var queue = new StreamQueue();
|
|
var stream1 = StreamTest[version].syncReadableChunks();
|
|
var stream2 = StreamTest[version].syncReadableChunks();
|
|
var stream3 = StreamTest[version].syncReadableChunks();
|
|
queue.queue(stream1);
|
|
queue.queue(stream2);
|
|
queue.queue(stream3);
|
|
StreamTest[version].syncWrite(stream1, ['wa','dup']);
|
|
StreamTest[version].syncWrite(stream2, ['pl','op']);
|
|
StreamTest[version].syncWrite(stream3, ['ki','koo', 'lol']);
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should emit an error when calling done twice', function(done) {
|
|
var queue = new StreamQueue();
|
|
var stream1 = StreamTest[version].syncReadableChunks();
|
|
var stream2 = StreamTest[version].syncReadableChunks();
|
|
var stream3 = StreamTest[version].syncReadableChunks();
|
|
queue.queue(stream1);
|
|
queue.queue(stream2);
|
|
queue.queue(stream3);
|
|
StreamTest[version].syncWrite(stream1, ['wa','dup']);
|
|
StreamTest[version].syncWrite(stream2, ['pl','op']);
|
|
StreamTest[version].syncWrite(stream3, ['ki','koo', 'lol']);
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
assert.throws(function() {
|
|
queue.done();
|
|
});
|
|
});
|
|
|
|
it('should emit an error when queueing after done was called', function(done) {
|
|
var queue = new StreamQueue();
|
|
var stream1 = StreamTest[version].syncReadableChunks();
|
|
var stream2 = StreamTest[version].syncReadableChunks();
|
|
var stream3 = StreamTest[version].syncReadableChunks();
|
|
queue.queue(stream1);
|
|
queue.queue(stream2);
|
|
queue.queue(stream3);
|
|
StreamTest[version].syncWrite(stream1, ['wa','dup']);
|
|
StreamTest[version].syncWrite(stream2, ['pl','op']);
|
|
StreamTest[version].syncWrite(stream3, ['ki','koo', 'lol']);
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
assert.throws(function() {
|
|
queue.queue(StreamTest[version].syncReadableChunks());
|
|
});
|
|
});
|
|
|
|
if('v2' === version)
|
|
it('should reemit errors', function(done) {
|
|
var _err;
|
|
var queue = new StreamQueue();
|
|
var stream1 = StreamTest[version].syncReadableChunks();
|
|
var stream2 = StreamTest[version].syncReadableChunks();
|
|
var stream3 = StreamTest[version].syncReadableChunks();
|
|
var stream4 = StreamTest[version].syncReadableChunks();
|
|
queue.queue(stream1);
|
|
queue.queue(stream2);
|
|
queue.queue(stream3);
|
|
queue.queue(stream4);
|
|
queue.on('error', function(err) {
|
|
_err = err;
|
|
});
|
|
StreamTest[version].syncError(stream1, new Error('Aouch!'));
|
|
StreamTest[version].syncWrite(stream2, ['wa','dup']);
|
|
StreamTest[version].syncWrite(stream3, ['pl','op']);
|
|
StreamTest[version].syncWrite(stream4, ['ki','koo', 'lol']);
|
|
assert.equal(queue.length, 4);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert(_err);
|
|
assert.equal(_err.message, 'Aouch!');
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
});
|
|
|
|
describe('and with functions returning streams', function() {
|
|
|
|
it('should work with functionnal API', function(done) {
|
|
StreamQueue(
|
|
StreamTest[version].fromChunks.bind(null, ['wa','dup']),
|
|
StreamTest[version].fromChunks.bind(null, ['pl','op']),
|
|
StreamTest[version].fromChunks.bind(null, ['ki','koo','lol'])
|
|
).pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
});
|
|
|
|
it('should work with functionnal API and options', function(done) {
|
|
StreamQueue(
|
|
StreamTest[version].fromChunks.bind(null, ['wa','dup']),
|
|
StreamTest[version].fromChunks.bind(null, ['pl','op']),
|
|
StreamTest[version].fromChunks.bind(null, ['ki','koo','lol'])
|
|
).pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
});
|
|
|
|
it('should work with POO API', function(done) {
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should pause streams in flowing mode', function(done) {
|
|
var queue = new StreamQueue({
|
|
pauseFlowingStream: true,
|
|
resumeFlowingStream: true
|
|
});
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(function() {
|
|
var stream = StreamTest[version].fromChunks(['pl','op']);
|
|
stream.on('data', function() {});
|
|
return stream;
|
|
});
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and options', function(done) {
|
|
var queue = new StreamQueue({
|
|
pauseFlowingStream: true,
|
|
resumeFlowingStream: true
|
|
});
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
it('should work with POO API and a late done call', function(done) {
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 3);
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
setTimeout(function() {
|
|
queue.done();
|
|
}, 100);
|
|
});
|
|
|
|
if('v2' === version)
|
|
it('should reemit errors', function(done) {
|
|
var _err;
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromErroredChunks.bind(null, new Error('Aouch!'), []));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['pl','op']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 4);
|
|
queue.on('error', function(err) {
|
|
_err = err;
|
|
});
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert(_err);
|
|
assert.equal(_err.message, 'Aouch!');
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
if('v2' === version)
|
|
it('should reemit errors elsewhere', function(done) {
|
|
var _err;
|
|
var queue = new StreamQueue();
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['wa','dup']));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['pl','op']));
|
|
queue.queue(StreamTest[version].fromErroredChunks.bind(null, new Error('Aouch!'), []));
|
|
queue.queue(StreamTest[version].fromChunks.bind(null, ['ki','koo','lol']));
|
|
assert.equal(queue.length, 4);
|
|
queue.on('error', function(err) {
|
|
_err = err;
|
|
});
|
|
queue.pipe(StreamTest[version].toText(function(err, text) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert(_err);
|
|
assert.equal(_err.message, 'Aouch!');
|
|
assert.equal(text, 'wadupplopkikoolol');
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
describe('in object mode', function() {
|
|
|
|
it('should work', function(done) {
|
|
var queue = new StreamQueue({objectMode: true});
|
|
queue.queue(StreamTest[version].fromObjects([{s:'wa'},{s:'dup'}]));
|
|
queue.queue(StreamTest[version].fromObjects([{s:'pl'},{s:'op'}]));
|
|
queue.queue(StreamTest[version].fromObjects([{s:'ki'},{s:'koo'},{s:'lol'}]));
|
|
queue.pipe(StreamTest[version].toObjects(function(err, objs) {
|
|
if(err) {
|
|
done(err);
|
|
}
|
|
assert.deepEqual(objs, [{s:'wa'},{s:'dup'},{s:'pl'},{s:'op'},{s:'ki'},{s:'koo'},{s:'lol'}]);
|
|
done();
|
|
}));
|
|
queue.done();
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|