--- /dev/null
+var stream = require("stream");
+
+var ForkStream = module.exports = function ForkStream(options) {
+ options = options || {};
+
+ options.objectMode = true;
+
+ stream.Writable.call(this, options);
+
+ if (options.classifier) {
+ this._classifier = options.classifier;
+ }
+
+ this.a = new stream.Readable(options);
+ this.b = new stream.Readable(options);
+
+ var self = this;
+
+ var resume = function resume() {
+ if (self.resume) {
+ var r = self.resume;
+ self.resume = null;
+ r.call(null);
+ }
+ };
+
+ this.a._read = resume;
+ this.b._read = resume;
+
+ this.on("finish", function() {
+ self.a.push(null);
+ self.b.push(null);
+ });
+};
+ForkStream.prototype = Object.create(stream.Writable.prototype, {constructor: {value: ForkStream}});
+
+ForkStream.prototype._classifier = function(e, done) {
+ return done(null, !!e);
+};
+
+ForkStream.prototype._write = function _write(input, encoding, done) {
+ var self = this;
+
+ this._classifier.call(null, input, function(err, res) {
+ if (err) {
+ return done(err);
+ }
+
+ var out = res ? self.a : self.b;
+
+ if (out.push(input)) {
+ return done();
+ } else {
+ self.resume = done;
+ }
+ });
+};