gen-thread/index.js

236 lines
6.4 KiB
JavaScript

// Yet Another Hack to fight node.js callback hell: generator-based coroutines
// Distinctive features:
// - simple to use:
// - does not require promisification of existing callback-based code
// - does not require async/await
// - safely checks control flow and reports exceptions
// - also supports promises (like await)
module.exports.run = runThread;
module.exports.runParallel = runParallel;
var current;
module.exports.unsafe = function()
{
return current;
};
module.exports.p = function(p)
{
p._stack = new Error().stack;
return p;
};
module.exports.callback = module.exports.cb = function()
{
return threadCallback.call(current);
};
module.exports.errorfirst = module.exports.ef = function()
{
return errorFirst.call(current);
};
module.exports.throttle = function(count)
{
return throttleThread.call(current, count);
};
function runThread(generator, onsuccess, onerror)
{
var thread = function()
{
thread._current = null;
// pass parameters as yield result
callGen(thread, 'next', Array.prototype.slice.call(arguments, 0));
};
thread._gen = generator.next ? generator : generator();
thread._finishThrottleQueue = finishThrottleQueue.bind(thread);
thread._onsuccess = onsuccess;
thread._onerror = onerror;
thread._running = false;
callGen(thread, 'next', []);
return thread;
}
function getStack(fn)
{
return fn._stack.replace(/Error[\s\S]*at.*(exports\.(cb|ef|errorfirst|p\s)|Function\.(errorFirst|threadCallback)).*/, '');
}
function callGen(thread, method, arg)
{
if (thread._running)
{
// callback called while generator is already running
thread._result = [ method, arg ];
return;
}
var v;
var prev = current;
thread._running = true;
current = thread;
try
{
while (1)
{
v = thread._gen[method](arg);
if (!v.done && thread._result)
{
method = thread._result[0];
arg = thread._result[1];
thread._result = null;
}
else
break;
}
}
catch (e)
{
v = { error: e };
}
thread._running = false;
current = prev;
if (v.done || v.error)
{
// generator finished
thread._done = true;
process.nextTick(thread._finishThrottleQueue);
}
if (v.error)
{
if (thread._onerror)
thread._onerror(v.error);
else
throw v.error;
}
else if (v.done && thread._onsuccess)
thread._onsuccess(v.value);
else if (typeof v.value == 'object' && v.value.then && !thread._current)
{
// check if v.value is a Promise
// (but not if an explicit .then(gen.cb()) callback is already set by caller)
v.value.then(function(value)
{
// use process.nextTick so Promise does not intercept our exceptions
process.nextTick(function() { callGen(thread, 'next', value); });
}, function(e)
{
if (v.value._stack)
{
// report stack trace captured with `yield gen.p(promise)`
var m = /^([\s\S]*?)((\n\s*at.*)*)$/.exec(e.stack);
if (m)
e.stack = m[1]+getStack(v.value)+'\n-- async error thrown at:'+m[2];
}
process.nextTick(function() { callGen(thread, 'throw', e); });
});
}
}
function threadCallback()
{
var thread = this;
var fn = function()
{
if (thread._current != fn)
{
throw new Error('Broken control flow! Callback'+
getStack(thread._current)+
'\nmust be called to resume thread, but this one is called instead:'+
getStack(fn)+'\n--'
);
}
thread._current = null;
return callGen(thread, 'next', Array.prototype.slice.call(arguments, 0));
};
fn._stack = new Error().stack;
thread._current = fn;
return fn;
}
function errorFirst()
{
var thread = this;
var fn = function()
{
if (thread._current != fn)
{
throw new Error('Broken control flow! Callback'+
getStack(thread._current)+
'\nmust be called to resume thread, but this one is called instead:'+
getStack(fn)+'\n--'
);
}
thread._current = null;
if (arguments[0])
{
var e = arguments[0];
var m = /^([\s\S]*?)((\n\s*at.*)*)$/.exec(e.stack);
if (m)
e.stack = m[1]+getStack(fn)+'\n-- async error thrown at:'+m[2];
return callGen(thread, 'throw', e);
}
return callGen(thread, 'next', Array.prototype.slice.call(arguments, 1));
};
fn._stack = new Error().stack;
thread._current = fn;
return fn;
}
function throttleThread(count)
{
if (!this.throttleData)
this.throttleData = this._gen.__proto__._genThreadThrottle = this._gen.__proto__._genThreadThrottle || { queue: [], pending: [] };
this._finishThrottleQueue();
if (this.throttleData.queue.length < count)
{
this.throttleData.queue.push(this);
process.nextTick(threadCallback.call(this));
}
else
this.throttleData.pending.push([ this, threadCallback.call(this), count ]);
}
function finishThrottleQueue()
{
if (!this.throttleData)
return;
for (var i = 0; i < this.throttleData.queue.length; i++)
{
if (this.throttleData.queue[i]._done)
{
this.throttleData.queue.splice(i, 1);
i--;
}
}
while (this.throttleData.pending.length > 0 && this.throttleData.queue.length < this.throttleData.pending[0][2])
{
var t = this.throttleData.pending.shift();
this.throttleData.queue.push(t[0]);
process.nextTick(t[1]);
}
}
function runParallel(threads, done)
{
var results = [];
var errors = [];
var resultCount = 0;
var allDone = function(i, result, error)
{
if (!results[i] && !errors[i])
{
if (error)
errors[i] = error;
else
results[i] = result;
resultCount++;
if (resultCount == threads.length)
done(results, errors);
}
};
threads.map((t, i) => runThread(t, function(result) { allDone(i, result); }, function(error) { allDone(i, null, error); }));
}