Add auto-reconnecting connection class wrapper
parent
50744ac6f8
commit
f37c5d33fa
|
@ -1,6 +1,6 @@
|
||||||
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
|
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
|
||||||
// (c) Виталий Филиппов, 2019
|
// (c) Виталий Филиппов, 2019
|
||||||
// Версия 2019-05-14
|
// Версия 2019-06-05
|
||||||
|
|
||||||
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
|
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
|
||||||
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
|
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
|
||||||
|
@ -299,14 +299,15 @@ function split_using(tables)
|
||||||
return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere };
|
return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Превратить bind пераметры, выраженные как ?, в вид $n (как в node-postgres)
|
||||||
function _positional(sql)
|
function _positional(sql)
|
||||||
{
|
{
|
||||||
let i = 0;
|
let i = 0;
|
||||||
sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '$'+(++i) : m));
|
sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '$'+(++i) : m));
|
||||||
console.log('> '+sql);
|
|
||||||
return sql;
|
return sql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Встроить все bind пераметры, выраженные как ?, в строку
|
||||||
function _inline(sql, bind)
|
function _inline(sql, bind)
|
||||||
{
|
{
|
||||||
if (!pg_escape)
|
if (!pg_escape)
|
||||||
|
@ -314,15 +315,30 @@ function _inline(sql, bind)
|
||||||
pg_escape = require('pg-escape');
|
pg_escape = require('pg-escape');
|
||||||
}
|
}
|
||||||
let i = 0;
|
let i = 0;
|
||||||
sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '\''+pg_escape.string(bind[i++])+'\'' : m));
|
sql = sql.replace(
|
||||||
|
/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g,
|
||||||
|
(m, m1) =>
|
||||||
|
{
|
||||||
|
if (!m1)
|
||||||
|
return m;
|
||||||
|
let v = bind[i++];
|
||||||
|
if (typeof v == 'object')
|
||||||
|
v = JSON.stringify(v);
|
||||||
|
else if (typeof v == 'number')
|
||||||
|
return v;
|
||||||
|
else if (v == null)
|
||||||
|
return 'null';
|
||||||
|
return '\''+pg_escape.string(v)+'\'';
|
||||||
|
}
|
||||||
|
);
|
||||||
return sql;
|
return sql;
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbh = node-postgres.Client
|
// dbh: Connection
|
||||||
async function select(dbh, tables, fields, where, options, format)
|
async function select(dbh, tables, fields, where, options, format)
|
||||||
{
|
{
|
||||||
let { sql, bind } = select_builder(tables, fields, where, options);
|
let { sql, bind } = select_builder(tables, fields, where, options);
|
||||||
let data = await dbh.query(_positional(sql), bind);
|
let data = await dbh.query(sql, bind);
|
||||||
if ((format & MS_LIST) || (format & MS_COL))
|
if ((format & MS_LIST) || (format & MS_COL))
|
||||||
data = data.rows.map(r => Object.values(r));
|
data = data.rows.map(r => Object.values(r));
|
||||||
else
|
else
|
||||||
|
@ -344,21 +360,58 @@ async function insert(dbh, table, rows, options)
|
||||||
{
|
{
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
if (!pg_escape)
|
||||||
|
{
|
||||||
|
pg_escape = require('pg-escape');
|
||||||
|
}
|
||||||
const keys = Object.keys(rows[0]);
|
const keys = Object.keys(rows[0]);
|
||||||
let sql = 'insert into '+table+' ('+keys.join(', ')+') values ';
|
let sql = 'insert into '+table+' ('+keys.join(', ')+') values ';
|
||||||
const bind = [];
|
|
||||||
let i = 0;
|
let i = 0;
|
||||||
for (const row of rows)
|
for (const row of rows)
|
||||||
{
|
{
|
||||||
sql += (i > 0 ? ', (' : '(') + keys.map(() => '$'+(++i)).join(', ')+')';
|
let j = 0;
|
||||||
bind.push.apply(bind, keys.map(k => row[k]));
|
sql += (i > 0 ? ', (' : '(');
|
||||||
|
for (let k of keys)
|
||||||
|
{
|
||||||
|
if (j > 0)
|
||||||
|
sql += ', ';
|
||||||
|
if (row[k] == null)
|
||||||
|
sql += 'default';
|
||||||
|
else if (typeof row[k] == 'object')
|
||||||
|
sql += '\''+pg_escape(JSON.stringify(row[k]))+'\'';
|
||||||
|
else
|
||||||
|
sql += '\''+pg_escape(''+row[k])+'\'';
|
||||||
|
j++;
|
||||||
}
|
}
|
||||||
if (options && options.returning)
|
sql += ')';
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
if (options)
|
||||||
|
{
|
||||||
|
if (options.upsert)
|
||||||
|
{
|
||||||
|
sql += ' on conflict '+
|
||||||
|
(options.upsert instanceof Array
|
||||||
|
? '('+options.upsert.join(', ')+')'
|
||||||
|
: (typeof options.upsert == 'string' ? options.upsert : '(id)'))+
|
||||||
|
' do update set '+
|
||||||
|
keys.map(k => `${k} = excluded.${k}`).join(', ');
|
||||||
|
}
|
||||||
|
else if (options.ignore)
|
||||||
|
{
|
||||||
|
sql += ' on conflict '+
|
||||||
|
(options.ignore instanceof Array
|
||||||
|
? '('+options.ignore.join(', ')+')'
|
||||||
|
: (typeof options.ignore == 'string' ? options.ignore : '(id)'))+
|
||||||
|
' do nothing';
|
||||||
|
}
|
||||||
|
if (options.returning)
|
||||||
{
|
{
|
||||||
sql += ' returning '+options.returning;
|
sql += ' returning '+options.returning;
|
||||||
return (await dbh.query(sql, bind)).rows;
|
return (await dbh.query(sql)).rows;
|
||||||
}
|
}
|
||||||
return await dbh.query(sql, bind);
|
}
|
||||||
|
return await dbh.query(sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function _delete(dbh, table, where, options)
|
async function _delete(dbh, table, where, options)
|
||||||
|
@ -376,9 +429,9 @@ async function _delete(dbh, table, where, options)
|
||||||
if (options && options.returning)
|
if (options && options.returning)
|
||||||
{
|
{
|
||||||
sql += ' returning '+options.returning;
|
sql += ' returning '+options.returning;
|
||||||
return (await dbh.query(_positional(sql), bind)).rows;
|
return (await dbh.query(sql, bind)).rows;
|
||||||
}
|
}
|
||||||
return await dbh.query(_positional(sql), bind);
|
return await dbh.query(sql, bind);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function update(dbh, table, set, where, options)
|
async function update(dbh, table, set, where, options)
|
||||||
|
@ -403,9 +456,9 @@ async function update(dbh, table, set, where, options)
|
||||||
if (options && options.returning)
|
if (options && options.returning)
|
||||||
{
|
{
|
||||||
sql += ' returning '+options.returning;
|
sql += ' returning '+options.returning;
|
||||||
return (await dbh.query(_positional(sql), bind)).rows;
|
return (await dbh.query(sql, bind)).rows;
|
||||||
}
|
}
|
||||||
return await dbh.query(_positional(sql), bind);
|
return await dbh.query(sql, bind);
|
||||||
}
|
}
|
||||||
|
|
||||||
function values(rows)
|
function values(rows)
|
||||||
|
@ -441,6 +494,88 @@ class Text
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Обёртка PostgreSQL-подключения
|
||||||
|
// Автоматически переподключается при отвале
|
||||||
|
class Connection
|
||||||
|
{
|
||||||
|
constructor(config, init_callback)
|
||||||
|
{
|
||||||
|
this.config = config;
|
||||||
|
this.init_callback = init_callback;
|
||||||
|
this.onerror = (e) =>
|
||||||
|
{
|
||||||
|
console.warn(e);
|
||||||
|
console.warn('Database connection dropped. Reconnecting');
|
||||||
|
this.dbh = null;
|
||||||
|
this.connect();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async connect()
|
||||||
|
{
|
||||||
|
if (this.dbh)
|
||||||
|
return this.dbh;
|
||||||
|
let retry = this.config.retry || 30;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
this.dbh = new pg.Client(this.config);
|
||||||
|
await this.dbh.connect();
|
||||||
|
if (this.init_callback)
|
||||||
|
{
|
||||||
|
this.init_callback();
|
||||||
|
}
|
||||||
|
this.dbh.on('error', this.onerror);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (e)
|
||||||
|
{
|
||||||
|
console.warn(e);
|
||||||
|
console.warn('Trying to connect again in '+retry+' seconds');
|
||||||
|
await new Promise((r, j) => setTimeout(r, retry*1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return this.dbh;
|
||||||
|
}
|
||||||
|
|
||||||
|
async query(sql, bind)
|
||||||
|
{
|
||||||
|
if (!this.dbh)
|
||||||
|
await this.connect();
|
||||||
|
console.log('> '+(bind && bind.length ? _inline(sql, bind) : sql));
|
||||||
|
return await this.dbh.query(_positional(sql), bind);
|
||||||
|
}
|
||||||
|
|
||||||
|
async select(tables, fields, where, options, format)
|
||||||
|
{
|
||||||
|
if (!this.dbh)
|
||||||
|
await this.connect();
|
||||||
|
return await select(this, tables, fields, where, options, format);
|
||||||
|
}
|
||||||
|
|
||||||
|
async insert(table, rows, options)
|
||||||
|
{
|
||||||
|
if (!this.dbh)
|
||||||
|
await this.connect();
|
||||||
|
return await insert(this, table, rows, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
async update(table, set, where, options)
|
||||||
|
{
|
||||||
|
if (!this.dbh)
|
||||||
|
await this.connect();
|
||||||
|
return await update(this, table, set, where, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
async delete(table, where, options)
|
||||||
|
{
|
||||||
|
if (!this.dbh)
|
||||||
|
await this.connect();
|
||||||
|
return await _delete(this, table, where, options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
select_builder,
|
select_builder,
|
||||||
where_builder,
|
where_builder,
|
||||||
|
@ -452,6 +587,7 @@ module.exports = {
|
||||||
update,
|
update,
|
||||||
values,
|
values,
|
||||||
Text,
|
Text,
|
||||||
|
Connection,
|
||||||
MS_HASH,
|
MS_HASH,
|
||||||
MS_LIST,
|
MS_LIST,
|
||||||
MS_ROW,
|
MS_ROW,
|
||||||
|
|
Loading…
Reference in New Issue