3 * https://github.com/caolan/async
5 * Copyright 2010-2014 Caolan McMahon
6 * Released under the MIT license
12 function identity(v) {
22 // global on the server, window in the browser
25 // Establish the root object, `window` (`self`) in the browser, `global`
26 // on the server, or `this` in some virtual machines. We use `self`
27 // instead of `window` for `WebWorker` support.
28 var root = typeof self === 'object' && self.self === self && self ||
29 typeof global === 'object' && global.global === global && global ||
33 previous_async = root.async;
36 async.noConflict = function () {
37 root.async = previous_async;
41 function only_once(fn) {
43 if (fn === null) throw new Error("Callback was already called.");
44 fn.apply(this, arguments);
51 if (fn === null) return;
52 fn.apply(this, arguments);
57 //// cross-browser compatibility functions ////
59 var _toString = Object.prototype.toString;
61 var _isArray = Array.isArray || function (obj) {
62 return _toString.call(obj) === '[object Array]';
65 // Ported from underscore.js isObject
66 var _isObject = function(obj) {
67 var type = typeof obj;
68 return type === 'function' || type === 'object' && !!obj;
71 function _isArrayLike(arr) {
72 return _isArray(arr) || (
73 // has a positive integer length property
74 typeof arr.length === "number" &&
80 function _arrayEach(arr, iterator) {
84 while (++index < length) {
85 iterator(arr[index], index, arr);
89 function _map(arr, iterator) {
92 result = Array(length);
94 while (++index < length) {
95 result[index] = iterator(arr[index], index, arr);
100 function _range(count) {
101 return _map(Array(count), function (v, i) { return i; });
104 function _reduce(arr, iterator, memo) {
105 _arrayEach(arr, function (x, i, a) {
106 memo = iterator(memo, x, i, a);
111 function _forEachOf(object, iterator) {
112 _arrayEach(_keys(object), function (key) {
113 iterator(object[key], key);
117 function _indexOf(arr, item) {
118 for (var i = 0; i < arr.length; i++) {
119 if (arr[i] === item) return i;
124 var _keys = Object.keys || function (obj) {
127 if (obj.hasOwnProperty(k)) {
134 function _keyIterator(coll) {
138 if (_isArrayLike(coll)) {
140 return function next() {
142 return i < len ? i : null;
147 return function next() {
149 return i < len ? keys[i] : null;
154 // Similar to ES6's rest param (http://ariya.ofilabs.com/2013/03/es6-and-rest-parameter.html)
155 // This accumulates the arguments passed into an array, after a given index.
156 // From underscore.js (https://github.com/jashkenas/underscore/pull/2140).
157 function _restParam(func, startIndex) {
158 startIndex = startIndex == null ? func.length - 1 : +startIndex;
160 var length = Math.max(arguments.length - startIndex, 0);
161 var rest = Array(length);
162 for (var index = 0; index < length; index++) {
163 rest[index] = arguments[index + startIndex];
165 switch (startIndex) {
166 case 0: return func.call(this, rest);
167 case 1: return func.call(this, arguments[0], rest);
169 // Currently unused but handle cases outside of the switch statement:
170 // var args = Array(startIndex + 1);
171 // for (index = 0; index < startIndex; index++) {
172 // args[index] = arguments[index];
174 // args[startIndex] = rest;
175 // return func.apply(this, args);
179 function _withoutIndex(iterator) {
180 return function (value, index, callback) {
181 return iterator(value, callback);
185 //// exported async module functions ////
187 //// nextTick implementation with browser-compatible fallback ////
189 // capture the global reference to guard against fakeTimer mocks
190 var _setImmediate = typeof setImmediate === 'function' && setImmediate;
192 var _delay = _setImmediate ? function(fn) {
193 // not a direct alias for IE10 compatibility
199 if (typeof process === 'object' && typeof process.nextTick === 'function') {
200 async.nextTick = process.nextTick;
202 async.nextTick = _delay;
204 async.setImmediate = _setImmediate ? _delay : async.nextTick;
208 async.each = function (arr, iterator, callback) {
209 return async.eachOf(arr, _withoutIndex(iterator), callback);
212 async.forEachSeries =
213 async.eachSeries = function (arr, iterator, callback) {
214 return async.eachOfSeries(arr, _withoutIndex(iterator), callback);
219 async.eachLimit = function (arr, limit, iterator, callback) {
220 return _eachOfLimit(limit)(arr, _withoutIndex(iterator), callback);
224 async.eachOf = function (object, iterator, callback) {
225 callback = _once(callback || noop);
226 object = object || [];
228 var iter = _keyIterator(object);
229 var key, completed = 0;
231 while ((key = iter()) != null) {
233 iterator(object[key], key, only_once(done));
236 if (completed === 0) callback(null);
243 // Check key is null in case iterator isn't exhausted
244 // and done resolved synchronously.
245 else if (key === null && completed <= 0) {
251 async.forEachOfSeries =
252 async.eachOfSeries = function (obj, iterator, callback) {
253 callback = _once(callback || noop);
255 var nextKey = _keyIterator(obj);
260 return callback(null);
262 iterator(obj[key], key, only_once(function (err) {
269 return callback(null);
272 async.setImmediate(iterate);
286 async.forEachOfLimit =
287 async.eachOfLimit = function (obj, limit, iterator, callback) {
288 _eachOfLimit(limit)(obj, iterator, callback);
291 function _eachOfLimit(limit) {
293 return function (obj, iterator, callback) {
294 callback = _once(callback || noop);
296 var nextKey = _keyIterator(obj);
298 return callback(null);
304 (function replenish () {
305 if (done && running <= 0) {
306 return callback(null);
309 while (running < limit && !errored) {
319 iterator(obj[key], key, only_once(function (err) {
335 function doParallel(fn) {
336 return function (obj, iterator, callback) {
337 return fn(async.eachOf, obj, iterator, callback);
340 function doParallelLimit(fn) {
341 return function (obj, limit, iterator, callback) {
342 return fn(_eachOfLimit(limit), obj, iterator, callback);
345 function doSeries(fn) {
346 return function (obj, iterator, callback) {
347 return fn(async.eachOfSeries, obj, iterator, callback);
351 function _asyncMap(eachfn, arr, iterator, callback) {
352 callback = _once(callback || noop);
354 var results = _isArrayLike(arr) ? [] : {};
355 eachfn(arr, function (value, index, callback) {
356 iterator(value, function (err, v) {
361 callback(err, results);
// The three map variants share the _asyncMap implementation and differ only in
// the each-function the wrapper hands to it: doParallel passes async.eachOf,
// doSeries passes async.eachOfSeries, and doParallelLimit passes
// _eachOfLimit(limit), which adds a `limit` argument after the collection.
365 async.map = doParallel(_asyncMap);
366 async.mapSeries = doSeries(_asyncMap);
367 async.mapLimit = doParallelLimit(_asyncMap);
369 // reduce only has a series version, as doing reduce in parallel won't
370 // work in many situations.
373 async.reduce = function (arr, memo, iterator, callback) {
374 async.eachOfSeries(arr, function (x, i, callback) {
375 iterator(memo, x, function (err, v) {
385 async.reduceRight = function (arr, memo, iterator, callback) {
386 var reversed = _map(arr, identity).reverse();
387 async.reduce(reversed, memo, iterator, callback);
390 async.transform = function (arr, memo, iterator, callback) {
391 if (arguments.length === 3) {
394 memo = _isArray(arr) ? [] : {};
397 async.eachOf(arr, function(v, k, cb) {
398 iterator(memo, v, k, cb);
404 function _filter(eachfn, arr, iterator, callback) {
406 eachfn(arr, function (x, index, callback) {
407 iterator(x, function (v) {
409 results.push({index: index, value: x});
414 callback(_map(results.sort(function (a, b) {
415 return a.index - b.index;
423 async.filter = doParallel(_filter);
426 async.filterLimit = doParallelLimit(_filter);
429 async.filterSeries = doSeries(_filter);
431 function _reject(eachfn, arr, iterator, callback) {
432 _filter(eachfn, arr, function(value, cb) {
433 iterator(value, function(v) {
// The reject variants all wrap _reject (which itself delegates to _filter);
// the wrapper chooses the iteration strategy: async.eachOf (doParallel),
// _eachOfLimit(limit) (doParallelLimit, extra `limit` argument), or
// async.eachOfSeries (doSeries).
438 async.reject = doParallel(_reject);
439 async.rejectLimit = doParallelLimit(_reject);
440 async.rejectSeries = doSeries(_reject);
442 function _createTester(eachfn, check, getResult) {
443 return function(arr, limit, iterator, cb) {
445 if (cb) cb(getResult(false, void 0));
447 function iteratee(x, _, callback) {
448 if (!cb) return callback();
449 iterator(x, function (v) {
450 if (cb && check(v)) {
451 cb(getResult(true, x));
452 cb = iterator = false;
457 if (arguments.length > 3) {
458 eachfn(arr, limit, iteratee, done);
462 eachfn(arr, iteratee, done);
468 async.some = _createTester(async.eachOf, toBool, identity);
470 async.someLimit = _createTester(async.eachOfLimit, toBool, identity);
473 async.every = _createTester(async.eachOf, notId, notId);
475 async.everyLimit = _createTester(async.eachOfLimit, notId, notId);
477 function _findGetResult(v, x) {
// The detect variants are all built by _createTester with `identity` as the
// check applied to the iterator's result and _findGetResult to shape the value
// passed to the final callback; only the underlying each-function differs.
// _createTester forwards a `limit` argument to the each-function only when
// the returned function is called with more than three arguments.
480 async.detect = _createTester(async.eachOf, identity, _findGetResult);
481 async.detectSeries = _createTester(async.eachOfSeries, identity, _findGetResult);
482 async.detectLimit = _createTester(async.eachOfLimit, identity, _findGetResult);
484 async.sortBy = function (arr, iterator, callback) {
485 async.map(arr, function (x, callback) {
486 iterator(x, function (err, criteria) {
491 callback(null, {value: x, criteria: criteria});
494 }, function (err, results) {
496 return callback(err);
499 callback(null, _map(results.sort(comparator), function (x) {
506 function comparator(left, right) {
507 var a = left.criteria, b = right.criteria;
508 return a < b ? -1 : a > b ? 1 : 0;
512 async.auto = function (tasks, concurrency, callback) {
513 if (typeof arguments[1] === 'function') {
514 // concurrency is optional, shift the args.
515 callback = concurrency;
518 callback = _once(callback || noop);
519 var keys = _keys(tasks);
520 var remainingTasks = keys.length;
521 if (!remainingTasks) {
522 return callback(null);
525 concurrency = remainingTasks;
529 var runningTasks = 0;
531 var hasError = false;
534 function addListener(fn) {
535 listeners.unshift(fn);
537 function removeListener(fn) {
538 var idx = _indexOf(listeners, fn);
539 if (idx >= 0) listeners.splice(idx, 1);
541 function taskComplete() {
543 _arrayEach(listeners.slice(0), function (fn) {
548 addListener(function () {
549 if (!remainingTasks) {
550 callback(null, results);
554 _arrayEach(keys, function (k) {
555 if (hasError) return;
556 var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];
557 var taskCallback = _restParam(function(err, args) {
559 if (args.length <= 1) {
563 var safeResults = {};
564 _forEachOf(results, function(val, rkey) {
565 safeResults[rkey] = val;
567 safeResults[k] = args;
570 callback(err, safeResults);
574 async.setImmediate(taskComplete);
577 var requires = task.slice(0, task.length - 1);
578 // prevent dead-locks
579 var len = requires.length;
582 if (!(dep = tasks[requires[len]])) {
583 throw new Error('Has nonexistent dependency in ' + requires.join(', '));
585 if (_isArray(dep) && _indexOf(dep, k) >= 0) {
586 throw new Error('Has cyclic dependencies');
590 return runningTasks < concurrency && _reduce(requires, function (a, x) {
591 return (a && results.hasOwnProperty(x));
592 }, true) && !results.hasOwnProperty(k);
596 task[task.length - 1](taskCallback, results);
599 addListener(listener);
601 function listener() {
604 removeListener(listener);
605 task[task.length - 1](taskCallback, results);
613 async.retry = function(times, task, callback) {
614 var DEFAULT_TIMES = 5;
615 var DEFAULT_INTERVAL = 0;
620 times: DEFAULT_TIMES,
621 interval: DEFAULT_INTERVAL
624 function parseTimes(acc, t){
625 if(typeof t === 'number'){
626 acc.times = parseInt(t, 10) || DEFAULT_TIMES;
627 } else if(typeof t === 'object'){
628 acc.times = parseInt(t.times, 10) || DEFAULT_TIMES;
629 acc.interval = parseInt(t.interval, 10) || DEFAULT_INTERVAL;
631 throw new Error('Unsupported argument type for \'times\': ' + typeof t);
635 var length = arguments.length;
636 if (length < 1 || length > 3) {
637 throw new Error('Invalid arguments - must be either (task), (task, callback), (times, task) or (times, task, callback)');
638 } else if (length <= 2 && typeof times === 'function') {
642 if (typeof times !== 'function') {
643 parseTimes(opts, times);
645 opts.callback = callback;
648 function wrappedTask(wrappedCallback, wrappedResults) {
649 function retryAttempt(task, finalAttempt) {
650 return function(seriesCallback) {
651 task(function(err, result){
652 seriesCallback(!err || finalAttempt, {err: err, result: result});
657 function retryInterval(interval){
658 return function(seriesCallback){
659 setTimeout(function(){
660 seriesCallback(null);
667 var finalAttempt = !(opts.times-=1);
668 attempts.push(retryAttempt(opts.task, finalAttempt));
669 if(!finalAttempt && opts.interval > 0){
670 attempts.push(retryInterval(opts.interval));
674 async.series(attempts, function(done, data){
675 data = data[data.length - 1];
676 (wrappedCallback || opts.callback)(data.err, data.result);
680 // If a callback is passed, run this as a control flow
681 return opts.callback ? wrappedTask() : wrappedTask;
684 async.waterfall = function (tasks, callback) {
685 callback = _once(callback || noop);
686 if (!_isArray(tasks)) {
687 var err = new Error('First argument to waterfall must be an array of functions');
688 return callback(err);
693 function wrapIterator(iterator) {
694 return _restParam(function (err, args) {
696 callback.apply(null, [err].concat(args));
699 var next = iterator.next();
701 args.push(wrapIterator(next));
706 ensureAsync(iterator).apply(null, args);
710 wrapIterator(async.iterator(tasks))();
713 function _parallel(eachfn, tasks, callback) {
714 callback = callback || noop;
715 var results = _isArrayLike(tasks) ? [] : {};
717 eachfn(tasks, function (task, key, callback) {
718 task(_restParam(function (err, args) {
719 if (args.length <= 1) {
726 callback(err, results);
730 async.parallel = function (tasks, callback) {
731 _parallel(async.eachOf, tasks, callback);
734 async.parallelLimit = function(tasks, limit, callback) {
735 _parallel(_eachOfLimit(limit), tasks, callback);
738 async.series = function(tasks, callback) {
739 _parallel(async.eachOfSeries, tasks, callback);
742 async.iterator = function (tasks) {
743 function makeCallback(index) {
746 tasks[index].apply(null, arguments);
750 fn.next = function () {
751 return (index < tasks.length - 1) ? makeCallback(index + 1): null;
755 return makeCallback(0);
758 async.apply = _restParam(function (fn, args) {
759 return _restParam(function (callArgs) {
761 null, args.concat(callArgs)
766 function _concat(eachfn, arr, fn, callback) {
768 eachfn(arr, function (x, index, cb) {
769 fn(x, function (err, y) {
770 result = result.concat(y || []);
774 callback(err, result);
// Both concat variants use _concat; doParallel drives it with async.eachOf and
// doSeries drives it with async.eachOfSeries.
777 async.concat = doParallel(_concat);
778 async.concatSeries = doSeries(_concat);
780 async.whilst = function (test, iterator, callback) {
781 callback = callback || noop;
783 var next = _restParam(function(err, args) {
786 } else if (test.apply(this, args)) {
789 callback.apply(null, [null].concat(args));
798 async.doWhilst = function (iterator, test, callback) {
800 return async.whilst(function() {
801 return ++calls <= 1 || test.apply(this, arguments);
802 }, iterator, callback);
805 async.until = function (test, iterator, callback) {
806 return async.whilst(function() {
807 return !test.apply(this, arguments);
808 }, iterator, callback);
811 async.doUntil = function (iterator, test, callback) {
812 return async.doWhilst(iterator, function() {
813 return !test.apply(this, arguments);
817 async.during = function (test, iterator, callback) {
818 callback = callback || noop;
820 var next = _restParam(function(err, args) {
825 test.apply(this, args);
829 var check = function(err, truth) {
842 async.doDuring = function (iterator, test, callback) {
844 async.during(function(next) {
848 test.apply(this, arguments);
850 }, iterator, callback);
853 function _queue(worker, concurrency, payload) {
854 if (concurrency == null) {
857 else if(concurrency === 0) {
858 throw new Error('Concurrency must not be zero');
860 function _insert(q, data, pos, callback) {
861 if (callback != null && typeof callback !== "function") {
862 throw new Error("task callback must be a function");
865 if (!_isArray(data)) {
868 if(data.length === 0 && q.idle()) {
869 // call drain immediately if there are no tasks
870 return async.setImmediate(function() {
874 _arrayEach(data, function(task) {
877 callback: callback || noop
881 q.tasks.unshift(item);
886 if (q.tasks.length === q.concurrency) {
890 async.setImmediate(q.process);
892 function _next(q, tasks) {
897 var args = arguments;
898 _arrayEach(tasks, function (task) {
899 _arrayEach(workersList, function (worker, index) {
900 if (worker === task && !removed) {
901 workersList.splice(index, 1);
906 task.callback.apply(task, args);
908 if (q.tasks.length + workers === 0) {
916 var workersList = [];
919 concurrency: concurrency,
926 push: function (data, callback) {
927 _insert(q, data, false, callback);
933 unshift: function (data, callback) {
934 _insert(q, data, true, callback);
936 process: function () {
937 while(!q.paused && workers < q.concurrency && q.tasks.length){
939 var tasks = q.payload ?
940 q.tasks.splice(0, q.payload) :
941 q.tasks.splice(0, q.tasks.length);
943 var data = _map(tasks, function (task) {
947 if (q.tasks.length === 0) {
951 workersList.push(tasks[0]);
952 var cb = only_once(_next(q, tasks));
956 length: function () {
957 return q.tasks.length;
959 running: function () {
962 workersList: function () {
966 return q.tasks.length + workers === 0;
971 resume: function () {
972 if (q.paused === false) { return; }
974 var resumeCount = Math.min(q.concurrency, q.tasks.length);
975 // Need to call q.process once per concurrent
976 // worker to preserve full concurrency after pause
977 for (var w = 1; w <= resumeCount; w++) {
978 async.setImmediate(q.process);
985 async.queue = function (worker, concurrency) {
986 var q = _queue(function (items, cb) {
987 worker(items[0], cb);
993 async.priorityQueue = function (worker, concurrency) {
995 function _compareTasks(a, b){
996 return a.priority - b.priority;
999 function _binarySearch(sequence, item, compare) {
1001 end = sequence.length - 1;
1003 var mid = beg + ((end - beg + 1) >>> 1);
1004 if (compare(item, sequence[mid]) >= 0) {
1013 function _insert(q, data, priority, callback) {
1014 if (callback != null && typeof callback !== "function") {
1015 throw new Error("task callback must be a function");
1018 if (!_isArray(data)) {
1021 if(data.length === 0) {
1022 // call drain immediately if there are no tasks
1023 return async.setImmediate(function() {
1027 _arrayEach(data, function(task) {
1031 callback: typeof callback === 'function' ? callback : noop
1034 q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
1036 if (q.tasks.length === q.concurrency) {
1039 async.setImmediate(q.process);
1043 // Start with a normal queue
1044 var q = async.queue(worker, concurrency);
1046 // Override push to accept second parameter representing priority
1047 q.push = function (data, priority, callback) {
1048 _insert(q, data, priority, callback);
1051 // Remove unshift function
1057 async.cargo = function (worker, payload) {
1058 return _queue(worker, 1, payload);
1061 function _console_fn(name) {
1062 return _restParam(function (fn, args) {
1063 fn.apply(null, args.concat([_restParam(function (err, args) {
1064 if (typeof console === 'object') {
1066 if (console.error) {
1070 else if (console[name]) {
1071 _arrayEach(args, function (x) {
1079 async.log = _console_fn('log');
1080 async.dir = _console_fn('dir');
1081 /*async.info = _console_fn('info');
1082 async.warn = _console_fn('warn');
1083 async.error = _console_fn('error');*/
1085 async.memoize = function (fn, hasher) {
1088 var has = Object.prototype.hasOwnProperty;
1089 hasher = hasher || identity;
1090 var memoized = _restParam(function memoized(args) {
1091 var callback = args.pop();
1092 var key = hasher.apply(null, args);
1093 if (has.call(memo, key)) {
1094 async.setImmediate(function () {
1095 callback.apply(null, memo[key]);
1098 else if (has.call(queues, key)) {
1099 queues[key].push(callback);
1102 queues[key] = [callback];
1103 fn.apply(null, args.concat([_restParam(function (args) {
1105 var q = queues[key];
1107 for (var i = 0, l = q.length; i < l; i++) {
1108 q[i].apply(null, args);
1113 memoized.memo = memo;
1114 memoized.unmemoized = fn;
1118 async.unmemoize = function (fn) {
1119 return function () {
1120 return (fn.unmemoized || fn).apply(null, arguments);
1124 function _times(mapper) {
1125 return function (count, iterator, callback) {
1126 mapper(_range(count), iterator, callback);
1130 async.times = _times(async.map);
1131 async.timesSeries = _times(async.mapSeries);
1132 async.timesLimit = function (count, limit, iterator, callback) {
1133 return async.mapLimit(_range(count), limit, iterator, callback);
1136 async.seq = function (/* functions... */) {
1137 var fns = arguments;
1138 return _restParam(function (args) {
1141 var callback = args[args.length - 1];
1142 if (typeof callback == 'function') {
1148 async.reduce(fns, args, function (newargs, fn, cb) {
1149 fn.apply(that, newargs.concat([_restParam(function (err, nextargs) {
1153 function (err, results) {
1154 callback.apply(that, [err].concat(results));
1159 async.compose = function (/* functions... */) {
1160 return async.seq.apply(null, Array.prototype.reverse.call(arguments));
1164 function _applyEach(eachfn) {
1165 return _restParam(function(fns, args) {
1166 var go = _restParam(function(args) {
1168 var callback = args.pop();
1169 return eachfn(fns, function (fn, _, cb) {
1170 fn.apply(that, args.concat([cb]));
1175 return go.apply(this, args);
// Both applyEach variants are produced by _applyEach; they differ only in the
// each-function supplied: async.eachOf versus async.eachOfSeries.
1183 async.applyEach = _applyEach(async.eachOf);
1184 async.applyEachSeries = _applyEach(async.eachOfSeries);
1187 async.forever = function (fn, callback) {
1188 var done = only_once(callback || noop);
1189 var task = ensureAsync(fn);
1190 function next(err) {
1199 function ensureAsync(fn) {
1200 return _restParam(function (args) {
1201 var callback = args.pop();
1202 args.push(function () {
1203 var innerArgs = arguments;
1205 async.setImmediate(function () {
1206 callback.apply(null, innerArgs);
1209 callback.apply(null, innerArgs);
1213 fn.apply(this, args);
1218 async.ensureAsync = ensureAsync;
1220 async.constant = _restParam(function(values) {
1221 var args = [null].concat(values);
1222 return function (callback) {
1223 return callback.apply(this, args);
1228 async.asyncify = function asyncify(func) {
1229 return _restParam(function (args) {
1230 var callback = args.pop();
1233 result = func.apply(this, args);
1237 // if result is Promise object
1238 if (_isObject(result) && typeof result.then === "function") {
1239 result.then(function(value) {
1240 callback(null, value);
1241 })["catch"](function(err) {
1242 callback(err.message ? err : new Error(err));
1245 callback(null, result);
1251 if (typeof module === 'object' && module.exports) {
1252 module.exports = async;
1255 else if (typeof define === 'function' && define.amd) {
1256 define([], function () {
1260 // included directly via <script> tag