3 * https://github.com/caolan/async
5 * Copyright 2010-2014 Caolan McMahon
6 * Released under the MIT license
8 /*jshint onevar: false, indent:4 */
9 /*global setImmediate: false, setTimeout: false, console: false */
14 // global on the server, window in the browser
15 var root, previous_async;
19 previous_async = root.async;
22 async.noConflict = function () {
23 root.async = previous_async;
27 function only_once(fn) {
30 if (called) throw new Error("Callback was already called.");
32 fn.apply(root, arguments);
36 //// cross-browser compatiblity functions ////
38 var _toString = Object.prototype.toString;
40 var _isArray = Array.isArray || function (obj) {
41 return _toString.call(obj) === '[object Array]';
44 var _each = function (arr, iterator) {
45 for (var i = 0; i < arr.length; i += 1) {
46 iterator(arr[i], i, arr);
50 var _map = function (arr, iterator) {
52 return arr.map(iterator);
55 _each(arr, function (x, i, a) {
56 results.push(iterator(x, i, a));
61 var _reduce = function (arr, iterator, memo) {
63 return arr.reduce(iterator, memo);
65 _each(arr, function (x, i, a) {
66 memo = iterator(memo, x, i, a);
71 var _keys = function (obj) {
73 return Object.keys(obj);
77 if (obj.hasOwnProperty(k)) {
84 //// exported async module functions ////
86 //// nextTick implementation with browser-compatible fallback ////
87 if (typeof process === 'undefined' || !(process.nextTick)) {
88 if (typeof setImmediate === 'function') {
89 async.nextTick = function (fn) {
90 // not a direct alias for IE10 compatibility
93 async.setImmediate = async.nextTick;
96 async.nextTick = function (fn) {
99 async.setImmediate = async.nextTick;
103 async.nextTick = process.nextTick;
104 if (typeof setImmediate !== 'undefined') {
105 async.setImmediate = function (fn) {
106 // not a direct alias for IE10 compatibility
111 async.setImmediate = async.nextTick;
115 async.each = function (arr, iterator, callback) {
116 callback = callback || function () {};
121 _each(arr, function (x) {
122 iterator(x, only_once(done) );
127 callback = function () {};
131 if (completed >= arr.length) {
137 async.forEach = async.each;
139 async.eachSeries = function (arr, iterator, callback) {
140 callback = callback || function () {};
145 var iterate = function () {
146 iterator(arr[completed], function (err) {
149 callback = function () {};
153 if (completed >= arr.length) {
164 async.forEachSeries = async.eachSeries;
166 async.eachLimit = function (arr, limit, iterator, callback) {
167 var fn = _eachLimit(limit);
168 fn.apply(null, [arr, iterator, callback]);
170 async.forEachLimit = async.eachLimit;
172 var _eachLimit = function (limit) {
174 return function (arr, iterator, callback) {
175 callback = callback || function () {};
176 if (!arr.length || limit <= 0) {
183 (function replenish () {
184 if (completed >= arr.length) {
188 while (running < limit && started < arr.length) {
191 iterator(arr[started - 1], function (err) {
194 callback = function () {};
199 if (completed >= arr.length) {
213 var doParallel = function (fn) {
215 var args = Array.prototype.slice.call(arguments);
216 return fn.apply(null, [async.each].concat(args));
219 var doParallelLimit = function(limit, fn) {
221 var args = Array.prototype.slice.call(arguments);
222 return fn.apply(null, [_eachLimit(limit)].concat(args));
225 var doSeries = function (fn) {
227 var args = Array.prototype.slice.call(arguments);
228 return fn.apply(null, [async.eachSeries].concat(args));
233 var _asyncMap = function (eachfn, arr, iterator, callback) {
234 arr = _map(arr, function (x, i) {
235 return {index: i, value: x};
238 eachfn(arr, function (x, callback) {
239 iterator(x.value, function (err) {
245 eachfn(arr, function (x, callback) {
246 iterator(x.value, function (err, v) {
247 results[x.index] = v;
251 callback(err, results);
255 async.map = doParallel(_asyncMap);
256 async.mapSeries = doSeries(_asyncMap);
257 async.mapLimit = function (arr, limit, iterator, callback) {
258 return _mapLimit(limit)(arr, iterator, callback);
261 var _mapLimit = function(limit) {
262 return doParallelLimit(limit, _asyncMap);
265 // reduce only has a series version, as doing reduce in parallel won't
266 // work in many situations.
267 async.reduce = function (arr, memo, iterator, callback) {
268 async.eachSeries(arr, function (x, callback) {
269 iterator(memo, x, function (err, v) {
278 async.inject = async.reduce;
280 async.foldl = async.reduce;
282 async.reduceRight = function (arr, memo, iterator, callback) {
283 var reversed = _map(arr, function (x) {
286 async.reduce(reversed, memo, iterator, callback);
289 async.foldr = async.reduceRight;
291 var _filter = function (eachfn, arr, iterator, callback) {
293 arr = _map(arr, function (x, i) {
294 return {index: i, value: x};
296 eachfn(arr, function (x, callback) {
297 iterator(x.value, function (v) {
304 callback(_map(results.sort(function (a, b) {
305 return a.index - b.index;
311 async.filter = doParallel(_filter);
312 async.filterSeries = doSeries(_filter);
314 async.select = async.filter;
315 async.selectSeries = async.filterSeries;
317 var _reject = function (eachfn, arr, iterator, callback) {
319 arr = _map(arr, function (x, i) {
320 return {index: i, value: x};
322 eachfn(arr, function (x, callback) {
323 iterator(x.value, function (v) {
330 callback(_map(results.sort(function (a, b) {
331 return a.index - b.index;
337 async.reject = doParallel(_reject);
338 async.rejectSeries = doSeries(_reject);
340 var _detect = function (eachfn, arr, iterator, main_callback) {
341 eachfn(arr, function (x, callback) {
342 iterator(x, function (result) {
345 main_callback = function () {};
355 async.detect = doParallel(_detect);
356 async.detectSeries = doSeries(_detect);
358 async.some = function (arr, iterator, main_callback) {
359 async.each(arr, function (x, callback) {
360 iterator(x, function (v) {
363 main_callback = function () {};
368 main_callback(false);
372 async.any = async.some;
374 async.every = function (arr, iterator, main_callback) {
375 async.each(arr, function (x, callback) {
376 iterator(x, function (v) {
378 main_callback(false);
379 main_callback = function () {};
388 async.all = async.every;
390 async.sortBy = function (arr, iterator, callback) {
391 async.map(arr, function (x, callback) {
392 iterator(x, function (err, criteria) {
397 callback(null, {value: x, criteria: criteria});
400 }, function (err, results) {
402 return callback(err);
405 var fn = function (left, right) {
406 var a = left.criteria, b = right.criteria;
407 return a < b ? -1 : a > b ? 1 : 0;
409 callback(null, _map(results.sort(fn), function (x) {
416 async.auto = function (tasks, callback) {
417 callback = callback || function () {};
418 var keys = _keys(tasks);
419 var remainingTasks = keys.length
420 if (!remainingTasks) {
427 var addListener = function (fn) {
428 listeners.unshift(fn);
430 var removeListener = function (fn) {
431 for (var i = 0; i < listeners.length; i += 1) {
432 if (listeners[i] === fn) {
433 listeners.splice(i, 1);
438 var taskComplete = function () {
440 _each(listeners.slice(0), function (fn) {
445 addListener(function () {
446 if (!remainingTasks) {
447 var theCallback = callback;
448 // prevent final callback from calling itself if it errors
449 callback = function () {};
451 theCallback(null, results);
455 _each(keys, function (k) {
456 var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];
457 var taskCallback = function (err) {
458 var args = Array.prototype.slice.call(arguments, 1);
459 if (args.length <= 1) {
463 var safeResults = {};
464 _each(_keys(results), function(rkey) {
465 safeResults[rkey] = results[rkey];
467 safeResults[k] = args;
468 callback(err, safeResults);
469 // stop subsequent errors hitting callback multiple times
470 callback = function () {};
474 async.setImmediate(taskComplete);
477 var requires = task.slice(0, Math.abs(task.length - 1)) || [];
478 var ready = function () {
479 return _reduce(requires, function (a, x) {
480 return (a && results.hasOwnProperty(x));
481 }, true) && !results.hasOwnProperty(k);
484 task[task.length - 1](taskCallback, results);
487 var listener = function () {
489 removeListener(listener);
490 task[task.length - 1](taskCallback, results);
493 addListener(listener);
498 async.retry = function(times, task, callback) {
499 var DEFAULT_TIMES = 5;
501 // Use defaults if times not passed
502 if (typeof times === 'function') {
505 times = DEFAULT_TIMES;
507 // Make sure times is a number
508 times = parseInt(times, 10) || DEFAULT_TIMES;
509 var wrappedTask = function(wrappedCallback, wrappedResults) {
510 var retryAttempt = function(task, finalAttempt) {
511 return function(seriesCallback) {
512 task(function(err, result){
513 seriesCallback(!err || finalAttempt, {err: err, result: result});
518 attempts.push(retryAttempt(task, !(times-=1)));
520 async.series(attempts, function(done, data){
521 data = data[data.length - 1];
522 (wrappedCallback || callback)(data.err, data.result);
525 // If a callback is passed, run this as a controll flow
526 return callback ? wrappedTask() : wrappedTask
529 async.waterfall = function (tasks, callback) {
530 callback = callback || function () {};
531 if (!_isArray(tasks)) {
532 var err = new Error('First argument to waterfall must be an array of functions');
533 return callback(err);
538 var wrapIterator = function (iterator) {
539 return function (err) {
541 callback.apply(null, arguments);
542 callback = function () {};
545 var args = Array.prototype.slice.call(arguments, 1);
546 var next = iterator.next();
548 args.push(wrapIterator(next));
553 async.setImmediate(function () {
554 iterator.apply(null, args);
559 wrapIterator(async.iterator(tasks))();
562 var _parallel = function(eachfn, tasks, callback) {
563 callback = callback || function () {};
564 if (_isArray(tasks)) {
565 eachfn.map(tasks, function (fn, callback) {
568 var args = Array.prototype.slice.call(arguments, 1);
569 if (args.length <= 1) {
572 callback.call(null, err, args);
579 eachfn.each(_keys(tasks), function (k, callback) {
580 tasks[k](function (err) {
581 var args = Array.prototype.slice.call(arguments, 1);
582 if (args.length <= 1) {
589 callback(err, results);
594 async.parallel = function (tasks, callback) {
595 _parallel({ map: async.map, each: async.each }, tasks, callback);
598 async.parallelLimit = function(tasks, limit, callback) {
599 _parallel({ map: _mapLimit(limit), each: _eachLimit(limit) }, tasks, callback);
602 async.series = function (tasks, callback) {
603 callback = callback || function () {};
604 if (_isArray(tasks)) {
605 async.mapSeries(tasks, function (fn, callback) {
608 var args = Array.prototype.slice.call(arguments, 1);
609 if (args.length <= 1) {
612 callback.call(null, err, args);
619 async.eachSeries(_keys(tasks), function (k, callback) {
620 tasks[k](function (err) {
621 var args = Array.prototype.slice.call(arguments, 1);
622 if (args.length <= 1) {
629 callback(err, results);
634 async.iterator = function (tasks) {
635 var makeCallback = function (index) {
636 var fn = function () {
638 tasks[index].apply(null, arguments);
642 fn.next = function () {
643 return (index < tasks.length - 1) ? makeCallback(index + 1): null;
647 return makeCallback(0);
650 async.apply = function (fn) {
651 var args = Array.prototype.slice.call(arguments, 1);
654 null, args.concat(Array.prototype.slice.call(arguments))
659 var _concat = function (eachfn, arr, fn, callback) {
661 eachfn(arr, function (x, cb) {
662 fn(x, function (err, y) {
663 r = r.concat(y || []);
670 async.concat = doParallel(_concat);
671 async.concatSeries = doSeries(_concat);
673 async.whilst = function (test, iterator, callback) {
675 iterator(function (err) {
677 return callback(err);
679 async.whilst(test, iterator, callback);
687 async.doWhilst = function (iterator, test, callback) {
688 iterator(function (err) {
690 return callback(err);
692 var args = Array.prototype.slice.call(arguments, 1);
693 if (test.apply(null, args)) {
694 async.doWhilst(iterator, test, callback);
702 async.until = function (test, iterator, callback) {
704 iterator(function (err) {
706 return callback(err);
708 async.until(test, iterator, callback);
716 async.doUntil = function (iterator, test, callback) {
717 iterator(function (err) {
719 return callback(err);
721 var args = Array.prototype.slice.call(arguments, 1);
722 if (!test.apply(null, args)) {
723 async.doUntil(iterator, test, callback);
731 async.queue = function (worker, concurrency) {
732 if (concurrency === undefined) {
735 function _insert(q, data, pos, callback) {
739 if (!_isArray(data)) {
742 if(data.length == 0) {
743 // call drain immediately if there are no tasks
744 return async.setImmediate(function() {
750 _each(data, function(task) {
753 callback: typeof callback === 'function' ? callback : null
757 q.tasks.unshift(item);
762 if (q.saturated && q.tasks.length === q.concurrency) {
765 async.setImmediate(q.process);
772 concurrency: concurrency,
778 push: function (data, callback) {
779 _insert(q, data, false, callback);
785 unshift: function (data, callback) {
786 _insert(q, data, true, callback);
788 process: function () {
789 if (!q.paused && workers < q.concurrency && q.tasks.length) {
790 var task = q.tasks.shift();
791 if (q.empty && q.tasks.length === 0) {
795 var next = function () {
798 task.callback.apply(task, arguments);
800 if (q.drain && q.tasks.length + workers === 0) {
805 var cb = only_once(next);
806 worker(task.data, cb);
809 length: function () {
810 return q.tasks.length;
812 running: function () {
816 return q.tasks.length + workers === 0;
819 if (q.paused === true) { return; }
822 resume: function () {
823 if (q.paused === false) { return; }
825 // Need to call q.process once per concurrent
826 // worker to preserve full concurrency after pause
827 for (var w = 1; w <= q.concurrency; w++) {
828 async.setImmediate(q.process);
835 async.priorityQueue = function (worker, concurrency) {
837 function _compareTasks(a, b){
838 return a.priority - b.priority;
841 function _binarySearch(sequence, item, compare) {
843 end = sequence.length - 1;
845 var mid = beg + ((end - beg + 1) >>> 1);
846 if (compare(item, sequence[mid]) >= 0) {
855 function _insert(q, data, priority, callback) {
859 if (!_isArray(data)) {
862 if(data.length == 0) {
863 // call drain immediately if there are no tasks
864 return async.setImmediate(function() {
870 _each(data, function(task) {
874 callback: typeof callback === 'function' ? callback : null
877 q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
879 if (q.saturated && q.tasks.length === q.concurrency) {
882 async.setImmediate(q.process);
886 // Start with a normal queue
887 var q = async.queue(worker, concurrency);
889 // Override push to accept second parameter representing priority
890 q.push = function (data, priority, callback) {
891 _insert(q, data, priority, callback);
894 // Remove unshift function
900 async.cargo = function (worker, payload) {
911 push: function (data, callback) {
912 if (!_isArray(data)) {
915 _each(data, function(task) {
918 callback: typeof callback === 'function' ? callback : null
920 cargo.drained = false;
921 if (cargo.saturated && tasks.length === payload) {
925 async.setImmediate(cargo.process);
927 process: function process() {
929 if (tasks.length === 0) {
930 if(cargo.drain && !cargo.drained) cargo.drain();
931 cargo.drained = true;
935 var ts = typeof payload === 'number'
936 ? tasks.splice(0, payload)
937 : tasks.splice(0, tasks.length);
939 var ds = _map(ts, function (task) {
943 if(cargo.empty) cargo.empty();
945 worker(ds, function () {
948 var args = arguments;
949 _each(ts, function (data) {
951 data.callback.apply(null, args);
958 length: function () {
961 running: function () {
968 var _console_fn = function (name) {
969 return function (fn) {
970 var args = Array.prototype.slice.call(arguments, 1);
971 fn.apply(null, args.concat([function (err) {
972 var args = Array.prototype.slice.call(arguments, 1);
973 if (typeof console !== 'undefined') {
979 else if (console[name]) {
980 _each(args, function (x) {
988 async.log = _console_fn('log');
989 async.dir = _console_fn('dir');
990 /*async.info = _console_fn('info');
991 async.warn = _console_fn('warn');
992 async.error = _console_fn('error');*/
994 async.memoize = function (fn, hasher) {
997 hasher = hasher || function (x) {
1000 var memoized = function () {
1001 var args = Array.prototype.slice.call(arguments);
1002 var callback = args.pop();
1003 var key = hasher.apply(null, args);
1005 async.nextTick(function () {
1006 callback.apply(null, memo[key]);
1009 else if (key in queues) {
1010 queues[key].push(callback);
1013 queues[key] = [callback];
1014 fn.apply(null, args.concat([function () {
1015 memo[key] = arguments;
1016 var q = queues[key];
1018 for (var i = 0, l = q.length; i < l; i++) {
1019 q[i].apply(null, arguments);
1024 memoized.memo = memo;
1025 memoized.unmemoized = fn;
1029 async.unmemoize = function (fn) {
1030 return function () {
1031 return (fn.unmemoized || fn).apply(null, arguments);
1035 async.times = function (count, iterator, callback) {
1037 for (var i = 0; i < count; i++) {
1040 return async.map(counter, iterator, callback);
1043 async.timesSeries = function (count, iterator, callback) {
1045 for (var i = 0; i < count; i++) {
1048 return async.mapSeries(counter, iterator, callback);
1051 async.seq = function (/* functions... */) {
1052 var fns = arguments;
1053 return function () {
1055 var args = Array.prototype.slice.call(arguments);
1056 var callback = args.pop();
1057 async.reduce(fns, args, function (newargs, fn, cb) {
1058 fn.apply(that, newargs.concat([function () {
1059 var err = arguments[0];
1060 var nextargs = Array.prototype.slice.call(arguments, 1);
1064 function (err, results) {
1065 callback.apply(that, [err].concat(results));
1070 async.compose = function (/* functions... */) {
1071 return async.seq.apply(null, Array.prototype.reverse.call(arguments));
1074 var _applyEach = function (eachfn, fns /*args...*/) {
1075 var go = function () {
1077 var args = Array.prototype.slice.call(arguments);
1078 var callback = args.pop();
1079 return eachfn(fns, function (fn, cb) {
1080 fn.apply(that, args.concat([cb]));
1084 if (arguments.length > 2) {
1085 var args = Array.prototype.slice.call(arguments, 2);
1086 return go.apply(this, args);
1092 async.applyEach = doParallel(_applyEach);
1093 async.applyEachSeries = doSeries(_applyEach);
1095 async.forever = function (fn, callback) {
1096 function next(err) {
1099 return callback(err);
1109 if (typeof module !== 'undefined' && module.exports) {
1110 module.exports = async;
1113 else if (typeof define !== 'undefined' && define.amd) {
1114 define([], function () {
1118 // included directly via <script> tag