likeopera-backend/ImapManager.js

262 lines
7.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

const Imap = require('imap');
class ImapManager
{
constructor()
{
this.accounts = {};
this.connections = {};
this.busy = {};
this.selected = {};
this.queue = {};
this.onIdle = {};
this.onStopIdle = {};
}
setServer(accountId, settings)
{
this.accounts[accountId] = settings;
}
async getConnection(accountId, boxName, connKey, onIdle, onStopIdle)
{
connKey = accountId+(connKey||'');
if (this.connections[connKey])
{
let stoppingIdle = this.queue[connKey].length == 0;
if (this.busy[connKey])
{
// wait for the queue to finish
await new Promise((r, e) =>
{
this.queue[connKey].push(r);
});
}
if (stoppingIdle && this.onStopIdle[connKey])
{
// run "stop idle" callback
this.onStopIdle[connKey](accountId, this.connections[connKey]);
}
if (boxName && this.selected[connKey] != boxName)
{
// select different box
await new Promise((r, e) => this.connections[connKey].openBox(boxName, false, r));
this.selected[connKey] = boxName;
}
this.busy[connKey] = true;
return this.connections[connKey];
}
let srv = new Imap(this.accounts[accountId]);
// FIXME handle connection errors
await new Promise((r, e) =>
{
srv.once('ready', r);
srv.connect();
});
await new Promise((r, e) => srv._enqueue('ENABLE QRESYNC', r));
// Monkey-patch node-imap to support VANISHED responses
let oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
{
let m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
{
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
}
oldUT.apply(this);
};
srv.on('close', () =>
{
delete this.connections[connKey];
if (this.srv == srv)
{
this.srv = null;
}
});
if (boxName)
{
await new Promise((r, e) => srv.openBox(boxName, false, r));
this.selected[connKey] = boxName;
}
this.connections[connKey] = srv;
this.busy[connKey] = true;
this.queue[connKey] = [];
this.onIdle[connKey] = onIdle;
this.onStopIdle[connKey] = onStopIdle;
return srv;
}
releaseConnection(accountId, connKey, allowClose)
{
connKey = accountId + (connKey||'');
this.busy[connKey] = false;
if (this.queue[connKey].length)
{
(this.queue[connKey].shift())();
}
else if (allowClose)
{
this.connections[connKey].end();
delete this.connections[connKey];
delete this.busy[connKey];
delete this.queue[connKey];
delete this.selected[connKey];
}
else
{
if (this.onIdle[connKey])
this.onIdle[connKey](accountId, this.connections[connKey]);
}
}
async runFetch(srv, what, params, processor, args)
{
let f = srv.fetch(what, params);
let fetchState = {
...(args||{}),
paused: false,
synced: 0,
parsing: 0,
pending: [],
results: [],
srv: srv,
end: false,
};
let wait;
await new Promise((resolve, reject) =>
{
let end = () =>
{
if (!fetchState.pending.length)
{
resolve();
}
else
{
let m = fetchState.pending;
fetchState.pending = [];
processor(m, fetchState)
.then(results =>
{
if (results)
{
fetchState.results = fetchState.results.concat(results);
}
resolve();
})
.catch(reject);
}
};
f.on('message', (msg, seqnum) =>
{
this.onMessage(fetchState, msg, seqnum, processor)
.then(() =>
{
if (fetchState.end && !fetchState.parsing)
{
end();
}
})
.catch(reject);
});
f.once('end', () =>
{
fetchState.end = true;
if (!fetchState.parsing)
{
end();
}
});
});
return fetchState.results;
}
async onMessage(fetchState, msg, seqnum, processor)
{
let msgrow, attrs;
fetchState.parsing++;
try
{
[ msgrow, attrs ] = await this.parseMessage(msg, seqnum);
}
catch (e)
{
fetchState.parsing--;
throw e;
}
fetchState.parsing--;
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
{
delete fetchState.srv._curReq.fetchCache[seqnum];
}
fetchState.pending.push([ msgrow, attrs ]);
if (!fetchState.paused && fetchState.pending.length >= 100 && !fetchState.nopause)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true;
fetchState.paused = true;
}
if (fetchState.pending.length >= 100)
{
let m = fetchState.pending;
fetchState.pending = [];
let result = await processor(m, fetchState);
if (result)
{
fetchState.results = fetchState.results.concat(result);
}
if (fetchState.paused)
{
fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable);
}
}
}
async parseMessage(msg, seqnum)
{
let msgrow = {};
let attrs;
msg.on('body', function(stream, info)
{
let buffer;
stream.on('data', function(chunk)
{
if (!buffer)
buffer = chunk;
else
buffer = Buffer.concat([ buffer, chunk ]);
});
stream.once('end', function()
{
msgrow.headers = buffer;
});
});
msg.once('attributes', function(a)
{
attrs = a;
});
await new Promise((r, e) => msg.once('end', r));
msgrow.uid = attrs.uid;
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')).sort();
return [ msgrow, attrs ];
}
}
module.exports = ImapManager;