891 lines
25 KiB
JavaScript
891 lines
25 KiB
JavaScript
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
|
||
// (c) Виталий Филиппов, 2019-2021
|
||
// Версия 2021-09-16
|
||
|
||
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
|
||
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
|
||
// Здесь так нельзя, поэтому этот синтаксис мы заменяем на { 't1.a=t2.a': [], 't2.b': [ 1, 2, 3 ] }
|
||
// Или на [ 't1.a=t2.a', [ 't2.b', [ 1, 2, 3 ] ] ] - можно писать и так, и так
|
||
|
||
const pg = require('pg');
|
||
const pg_escape = require('pg-escape');
|
||
|
||
// Сраный node-postgres конвертирует даты в Date и портит таймзону
|
||
|
||
const DATATYPE_DATE = 1082;
|
||
const DATATYPE_TIME = 1083;
|
||
const DATATYPE_TIMESTAMP = 1114;
|
||
const DATATYPE_TIMESTAMPTZ = 1184;
|
||
const DATATYPE_TIMETZ = 1266;
|
||
|
||
pg.types.setTypeParser(DATATYPE_DATE, pgToString);
|
||
pg.types.setTypeParser(DATATYPE_TIME, pgToString);
|
||
pg.types.setTypeParser(DATATYPE_TIMESTAMP, pgToString);
|
||
pg.types.setTypeParser(DATATYPE_TIMESTAMPTZ, pgToString);
|
||
pg.types.setTypeParser(DATATYPE_TIMETZ, pgToString);
|
||
|
||
function pgToString(val)
|
||
{
|
||
return val === null ? null : val;
|
||
}
|
||
|
||
const MS_HASH = 0;
|
||
const MS_LIST = 1;
|
||
const MS_ROW = 2;
|
||
const MS_COL = 4;
|
||
const MS_VALUE = 6;
|
||
|
||
function select_builder(tables, fields, where, options)
|
||
{
|
||
options = options||{};
|
||
let sql = 'SELECT ', bind = [];
|
||
if (options['DISTINCT ON'] || options.distinct_on)
|
||
{
|
||
let group = options['DISTINCT ON'] || options.distinct_on;
|
||
group = group instanceof Array ? group : [ group ];
|
||
sql += 'DISTINCT ON ('+group.join(', ')+') ';
|
||
}
|
||
if (fields instanceof Array)
|
||
{
|
||
sql += fields.join(', ');
|
||
}
|
||
else if (typeof fields == 'string')
|
||
{
|
||
sql += fields;
|
||
}
|
||
else if (typeof fields == 'object')
|
||
{
|
||
sql += Object.keys(fields).map(k => fields[k]+' AS '+k).join(', ');
|
||
}
|
||
else
|
||
{
|
||
throw new Error('fields = '+fields+' is invalid');
|
||
}
|
||
sql += ' FROM ';
|
||
const t = tables_builder(tables);
|
||
sql += t.sql;
|
||
t.bind.forEach(v => bind.push(v));
|
||
where = where_builder(where);
|
||
sql += ' WHERE '+(where.sql || '1=1');
|
||
where.bind.forEach(v => bind.push(v));
|
||
if (t.moreWhere)
|
||
{
|
||
sql += ' AND '+t.moreWhere.sql;
|
||
t.moreWhere.bind.forEach(v => bind.push(v));
|
||
}
|
||
if (options['GROUP BY'] || options.group_by)
|
||
{
|
||
let group = options['GROUP BY'] || options.group_by;
|
||
group = group instanceof Array ? group : [ group ];
|
||
sql += ' GROUP BY '+group.join(', ');
|
||
}
|
||
if (options['ORDER BY'] || options.order_by)
|
||
{
|
||
let order = options['ORDER BY'] || options.order_by;
|
||
order = order instanceof Array ? order : [ order ];
|
||
sql += ' ORDER BY '+order.join(', ');
|
||
}
|
||
if (options.LIMIT || options.limit)
|
||
{
|
||
sql += ' LIMIT '+((options.LIMIT || options.limit) | 0);
|
||
}
|
||
if (options.OFFSET || options.offset)
|
||
{
|
||
sql += ' OFFSET '+((options.OFFSET || options.offset) | 0);
|
||
}
|
||
if (options['FOR UPDATE'] || options.for_update)
|
||
{
|
||
sql += ' FOR UPDATE';
|
||
}
|
||
else if (options['LOCK IN SHARE MODE'] || options.lock_share)
|
||
{
|
||
sql += ' LOCK IN SHARE MODE';
|
||
}
|
||
return new Text(sql, bind);
|
||
}
|
||
|
||
function tables_builder(tables)
|
||
{
|
||
let sql = '', bind = [];
|
||
let moreWhere = null;
|
||
let first = true;
|
||
if (typeof tables == 'string')
|
||
{
|
||
sql = tables;
|
||
return { sql, bind, moreWhere };
|
||
}
|
||
else if (tables instanceof Text)
|
||
{
|
||
return { sql: tables.sql, bind: tables.bind, moreWhere };
|
||
}
|
||
for (const k in tables)
|
||
{
|
||
let jointype = 'INNER', table = tables[k], conds = null;
|
||
if (table instanceof Array)
|
||
{
|
||
[ jointype, table, conds ] = table;
|
||
}
|
||
if (!first)
|
||
{
|
||
sql += ' ' + jointype.toUpperCase() + ' JOIN ';
|
||
}
|
||
let more_on;
|
||
if (table instanceof Pg_Values)
|
||
{
|
||
sql += '(VALUES ';
|
||
let i = 0;
|
||
for (const row of table.rows)
|
||
{
|
||
sql += (i > 0 ? ', (' : '(') + table.keys.map(() => '?').join(', ')+')';
|
||
table.keys.forEach(k => bind.push(row[k]));
|
||
i++;
|
||
}
|
||
sql += ') AS '+k+'('+table.keys.join(', ')+')';
|
||
}
|
||
else if (table instanceof Text)
|
||
{
|
||
sql += '(' + table.sql + ') ' + k;
|
||
table.bind.forEach(v => bind.push(v));
|
||
}
|
||
else if (typeof table == 'object')
|
||
{
|
||
// Nested join, `k` alias is ignored
|
||
let subjoin = tables_builder(table);
|
||
if (subjoin.moreWhere)
|
||
{
|
||
more_on = subjoin.moreWhere;
|
||
}
|
||
if (Object.keys(table).length > 1)
|
||
{
|
||
sql += "("+subjoin.sql+")";
|
||
}
|
||
else
|
||
{
|
||
sql += subjoin.sql;
|
||
}
|
||
subjoin.bind.forEach(v => bind.push(v));
|
||
}
|
||
else
|
||
{
|
||
sql += table + ' ' + k;
|
||
}
|
||
conds = where_builder(conds);
|
||
if (more_on)
|
||
{
|
||
if (!conds.sql)
|
||
conds = more_on;
|
||
else
|
||
{
|
||
conds.sql += ' AND ' + more_on.sql;
|
||
more_on.bind.forEach(v => conds.bind.push(v));
|
||
}
|
||
}
|
||
if (!first)
|
||
{
|
||
sql += ' ON ' + (conds.sql || '1=1');
|
||
conds.bind.forEach(v => bind.push(v));
|
||
}
|
||
else
|
||
{
|
||
// Бывает удобно указывать WHERE как условие "JOIN" первой таблицы
|
||
moreWhere = conds.sql ? conds : null;
|
||
first = false;
|
||
}
|
||
}
|
||
return { sql, bind, moreWhere };
|
||
}
|
||
|
||
function where_in(key, values, result, bind, for_where)
|
||
{
|
||
if (values[0] instanceof Array)
|
||
{
|
||
// only for WHERE: [ '(a, b)', [ [ 1, 2 ], [ 3, 4 ], ... ] ]
|
||
if (!for_where)
|
||
{
|
||
throw new Error('IN syntax can only be used inside WHERE');
|
||
}
|
||
result.push(key + ' in (' + values.map(vi => '('+vi.map(() => '?').join(', ')+')') + ')');
|
||
values.forEach(vi => vi.forEach(v => bind.push(v)));
|
||
}
|
||
else if (!for_where)
|
||
{
|
||
if (values.length > 1)
|
||
{
|
||
throw new Error('IN syntax can only be used inside WHERE');
|
||
}
|
||
result.push(key+' = ?');
|
||
bind.push(values[0]);
|
||
}
|
||
else
|
||
{
|
||
// [ field, [ values ] ]
|
||
let non_null = values.filter(vi => vi != null);
|
||
let has_null = values.length > non_null.length;
|
||
if (non_null.length > 0)
|
||
{
|
||
result.push(
|
||
key+' in (' + non_null.map(() => '?').join(', ') + ')' +
|
||
(has_null ? ' or '+key+' is null' : '')
|
||
);
|
||
non_null.forEach(v => bind.push(v));
|
||
}
|
||
else if (has_null)
|
||
{
|
||
result.push(key+' is null');
|
||
}
|
||
}
|
||
}
|
||
|
||
// fields: one of:
|
||
// - string: 'a=b AND c=d'
|
||
// - array: [ 'a=b', [ 'a=? or b=?', 1, 2 ], [ 'a', [ 1, 2 ] ] ]
|
||
// - object: { a: 1, b: [ 1, 2 ], 'a = b': [], '(a, b)': [ [ 1, 2 ], [ 2, 3 ] ], 'c=? or d=?': [ 2, 3 ] }
|
||
// - key does not contain '?', value is a scalar or non-empty array => (key IN ...)
|
||
// - key does not contain '?', value is an empty array => just (key)
|
||
// - key contains '?', value is a scalar or non-empty array => (key) with bind params (...value)
|
||
// - key is numeric, then value is treated as in array
|
||
function where_or_set(fields, for_where)
|
||
{
|
||
if (typeof fields == 'string')
|
||
{
|
||
return { sql: fields, bind: [] };
|
||
}
|
||
const where = [], bind = [];
|
||
for (let k in fields)
|
||
{
|
||
let v = fields[k];
|
||
v = v instanceof Array ? v : [ v ];
|
||
if (/^\d+$/.exec(k))
|
||
{
|
||
if (v.length == 0 || typeof v[0] !== 'string')
|
||
{
|
||
// invalid value
|
||
continue;
|
||
}
|
||
else if (v.length == 1)
|
||
{
|
||
// [ text ] or just text
|
||
where.push(v[0]);
|
||
}
|
||
else if (v[0].indexOf('?') >= 0)
|
||
{
|
||
// [ text, bind1, bind2, ... ]
|
||
// FIXME: check bind variable count
|
||
where.push(v[0]);
|
||
for (let i = 1; i < v.length; i++)
|
||
bind.push(v[i]);
|
||
}
|
||
else
|
||
{
|
||
// [ field, [ ...values ] ]
|
||
if (v.length > 2)
|
||
{
|
||
throw new Error('Invalid condition: '+JSON.stringify(v));
|
||
}
|
||
v[1] = v[1] instanceof Array ? v[1] : [ v[1] ];
|
||
where_in(v[0], v[1], where, bind, for_where);
|
||
}
|
||
}
|
||
else
|
||
{
|
||
if (k.indexOf('?') >= 0 || v.length == 0)
|
||
{
|
||
// { expr: [ bind ] }
|
||
// FIXME: check bind variable count
|
||
where.push(k);
|
||
for (let i = 0; i < v.length; i++)
|
||
bind.push(v[i]);
|
||
}
|
||
else
|
||
{
|
||
where_in(k, v, where, bind, for_where);
|
||
}
|
||
}
|
||
}
|
||
if (!for_where)
|
||
{
|
||
// SET
|
||
return { sql: where.join(', '), bind };
|
||
}
|
||
// WHERE
|
||
return { sql: where.length ? '('+where.join(') and (')+')' : '', bind };
|
||
}
|
||
|
||
function where_builder(where)
|
||
{
|
||
return where_or_set(where, true);
|
||
}
|
||
|
||
/**
|
||
* Разбивает набор таблиц на основную обновляемую + набор дополнительных
|
||
*
|
||
* Идея в том, чтобы обрабатывать хотя бы 2 простые ситуации:
|
||
* UPDATE table1 INNER JOIN table2 ...
|
||
* UPDATE table1 LEFT JOIN table2 ...
|
||
*/
|
||
function split_using(tables)
|
||
{
|
||
if (typeof tables == 'string')
|
||
{
|
||
return { what: { sql: tables, bind: [] }, using: null, moreWhere: null };
|
||
}
|
||
let first = null;
|
||
let is_next_inner = true;
|
||
let i = 0;
|
||
for (let k in tables)
|
||
{
|
||
let t = tables[k];
|
||
if (i == 0)
|
||
{
|
||
if (t instanceof Array && typeof(t[1]) != 'string')
|
||
{
|
||
throw new Error('Can only update/delete from real tables, not sub-select, sub-join or VALUES');
|
||
}
|
||
first = k;
|
||
}
|
||
else if (i == 1)
|
||
{
|
||
is_next_inner = !(t instanceof Array) || t[0].toLowerCase() == 'inner';
|
||
}
|
||
else
|
||
{
|
||
break;
|
||
}
|
||
i++;
|
||
}
|
||
let what, moreWhere;
|
||
if (is_next_inner)
|
||
{
|
||
what = tables_builder({ [first]: tables[first] });
|
||
delete tables[first];
|
||
moreWhere = what.moreWhere;
|
||
what.moreWhere = null;
|
||
}
|
||
else
|
||
{
|
||
what = tables_builder({ ["_"+first]: tables[first] });
|
||
const cond = '_'+first+'.ctid='+(/^\d+$/.exec(first) ? tables[first] : first)+'.ctid';
|
||
moreWhere = what.moreWhere
|
||
? { sql: what.moreWhere.sql+' AND '+cond, bind: what.moreWhere.bind }
|
||
: { sql: cond, bind: [] };
|
||
what.moreWhere = null;
|
||
}
|
||
return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere };
|
||
}
|
||
|
||
function quote(v)
|
||
{
|
||
if (v == null)
|
||
return 'null';
|
||
else if (v === true || v === false)
|
||
return ''+v;
|
||
else if (typeof v == 'object')
|
||
v = JSON.stringify(v);
|
||
else if (typeof v == 'number')
|
||
return v;
|
||
return pg_escape.literal(v);
|
||
}
|
||
|
||
// Встроить все bind пераметры, выраженные как ?, в строку.
|
||
// Почему? А потому, что в node-postgres, похоже, есть лимит числа bind переменных. После ~30000 он сваливается.
|
||
// FIXME: В postgresql есть оператор ?. Его надо научиться экранировать. Либо сразу строить запрос в строчку
|
||
// и не заниматься ерундой с подстановками.
|
||
function _inline(sql, bind)
|
||
{
|
||
let i = 0;
|
||
sql = sql.replace(
|
||
/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g,
|
||
(m, m1) =>
|
||
{
|
||
if (!m1)
|
||
return m;
|
||
return quote(bind[i++]);
|
||
}
|
||
);
|
||
return sql;
|
||
}
|
||
|
||
// dbh: Connection or pg.Client
|
||
async function select(dbh, tables, fields, where, options, format)
|
||
{
|
||
let sql_text, calc_found_rows, found_rows;
|
||
if (arguments.length == 2 || arguments.length == 3)
|
||
{
|
||
sql_text = tables instanceof Text ? tables : new Text(tables, []);
|
||
format = fields;
|
||
}
|
||
else
|
||
{
|
||
if (options && options.calc_found_rows)
|
||
{
|
||
calc_found_rows = true;
|
||
fields = fields instanceof Array ? [ ...fields ] : [ fields ];
|
||
fields.push('COUNT(*) OVER () \"*\"');
|
||
}
|
||
sql_text = select_builder(tables, fields, where, options);
|
||
}
|
||
let data = await dbh.query(sql_text.sql, sql_text.bind);
|
||
if (calc_found_rows)
|
||
{
|
||
if (!data.rows.length)
|
||
found_rows = 0;
|
||
else
|
||
{
|
||
found_rows = data.rows[0]['*'];
|
||
data.rows.forEach(r => delete r['*']);
|
||
}
|
||
}
|
||
if (format & MS_LIST)
|
||
data = data.rows.map(r => Object.values(r));
|
||
else if (format & MS_COL)
|
||
data = data.rows.map(r => Object.values(r)[0]);
|
||
else
|
||
data = data.rows;
|
||
if (format & MS_ROW)
|
||
data = data[0];
|
||
return calc_found_rows ? [ found_rows, data ] : data;
|
||
}
|
||
|
||
async function insert(dbh, table, rows, options)
|
||
{
|
||
if (!(rows instanceof Array))
|
||
{
|
||
rows = [ rows ];
|
||
}
|
||
if (!rows.length)
|
||
{
|
||
return null;
|
||
}
|
||
const keys = Object.keys(rows[0]);
|
||
let sql = 'insert into '+table+' ('+keys.join(', ')+') values ';
|
||
let i = 0;
|
||
for (const row of rows)
|
||
{
|
||
let j = 0;
|
||
sql += (i > 0 ? ', (' : '(');
|
||
for (let k of keys)
|
||
{
|
||
if (j > 0)
|
||
sql += ', ';
|
||
if (row[k] == null)
|
||
sql += 'default';
|
||
else if (row[k] instanceof Text)
|
||
sql += row[k].toString();
|
||
else
|
||
sql += quote(row[k]);
|
||
j++;
|
||
}
|
||
sql += ')';
|
||
i++;
|
||
}
|
||
if (options)
|
||
{
|
||
if (options.upsert)
|
||
{
|
||
sql += ' on conflict '+
|
||
(options.upsert instanceof Array
|
||
? '('+options.upsert.join(', ')+')'
|
||
: (typeof options.upsert == 'string' ? options.upsert : '(id)'))+
|
||
' do update set '+
|
||
(options.upsert_fields || keys).map(k => `${k} = excluded.${k}`).join(', ');
|
||
}
|
||
else if (options.ignore)
|
||
{
|
||
sql += ' on conflict '+
|
||
(options.ignore instanceof Array
|
||
? '('+options.ignore.join(', ')+')'
|
||
: (typeof options.ignore == 'string' ? options.ignore : '(id)'))+
|
||
' do nothing';
|
||
}
|
||
if (options.returning)
|
||
{
|
||
sql += ' returning '+options.returning;
|
||
return (await dbh.query(sql)).rows;
|
||
}
|
||
}
|
||
return await dbh.query(sql);
|
||
}
|
||
|
||
async function _delete(dbh, table, where, options)
|
||
{
|
||
where = where_builder(where);
|
||
const split = split_using(table);
|
||
if (split.using)
|
||
{
|
||
split.using = tables_builder(split.using);
|
||
}
|
||
let sql = 'delete from '+split.what.sql+
|
||
(split.using ? ' using '+split.using.sql : '')+
|
||
' where '+(where.sql || '1=1')+(split.moreWhere ? ' and '+split.moreWhere.sql : '');
|
||
let bind = [ ...split.what.bind, ...where.bind, ...(split.moreWhere ? split.moreWhere.bind : []) ];
|
||
if (options && options.returning)
|
||
{
|
||
sql += ' returning '+options.returning;
|
||
return (await dbh.query(sql, bind)).rows;
|
||
}
|
||
return await dbh.query(sql, bind);
|
||
}
|
||
|
||
async function update(dbh, table, set, where, options)
|
||
{
|
||
set = where_or_set(set, false);
|
||
where = where_builder(where);
|
||
const split = split_using(table);
|
||
if (split.using)
|
||
{
|
||
split.using = tables_builder(split.using);
|
||
}
|
||
let sql = 'update '+split.what.sql+' set '+set.sql+
|
||
(split.using ? ' from '+split.using.sql : '')+
|
||
' where '+(where.sql || '1=1')+(split.moreWhere ? ' and '+split.moreWhere.sql : '');
|
||
let bind = [
|
||
...split.what.bind,
|
||
...set.bind,
|
||
...(split.using ? split.using.bind : []),
|
||
...where.bind,
|
||
...(split.moreWhere ? split.moreWhere.bind : [])
|
||
];
|
||
if (options && options.returning)
|
||
{
|
||
sql += ' returning '+options.returning;
|
||
return (await dbh.query(sql, bind)).rows;
|
||
}
|
||
return await dbh.query(sql, bind);
|
||
}
|
||
|
||
function values(rows)
|
||
{
|
||
return new Pg_Values(Object.keys(rows[0]), rows);
|
||
}
|
||
|
||
class Pg_Values
|
||
{
|
||
constructor(keys, rows)
|
||
{
|
||
this.keys = keys;
|
||
this.rows = rows;
|
||
}
|
||
}
|
||
|
||
class Text
|
||
{
|
||
constructor(sql, bind)
|
||
{
|
||
this.sql = sql;
|
||
this.bind = bind || [];
|
||
}
|
||
|
||
toString()
|
||
{
|
||
return _inline(this.sql, this.bind);
|
||
}
|
||
|
||
concat(text)
|
||
{
|
||
return new Text(this.sql+text.sql, [ ...this.bind, ...text.bind ]);
|
||
}
|
||
}
|
||
|
||
class ConnectionBase
|
||
{
|
||
async select(tables, fields, where, options, format)
|
||
{
|
||
return arguments.length <= 2
|
||
? await select(this, tables, fields)
|
||
: await select(this, tables, fields, where, options, format);
|
||
}
|
||
|
||
async insert(table, rows, options)
|
||
{
|
||
return await insert(this, table, rows, options);
|
||
}
|
||
|
||
async update(table, set, where, options)
|
||
{
|
||
return await update(this, table, set, where, options);
|
||
}
|
||
|
||
async delete(table, where, options)
|
||
{
|
||
return await _delete(this, table, where, options);
|
||
}
|
||
}
|
||
|
||
ConnectionBase.prototype.HASH = MS_HASH;
|
||
ConnectionBase.prototype.LIST = MS_LIST;
|
||
ConnectionBase.prototype.ROW = MS_ROW;
|
||
ConnectionBase.prototype.COL = MS_COL;
|
||
ConnectionBase.prototype.VALUE = MS_VALUE;
|
||
ConnectionBase.prototype.select_builder = select_builder;
|
||
|
||
// Обёртка PostgreSQL-подключения
|
||
// Автоматически переподключается при отвале
|
||
class Connection extends ConnectionBase
|
||
{
|
||
constructor(config)
|
||
{
|
||
super();
|
||
this.config = config;
|
||
this.init_connect = [];
|
||
this.in_transaction = null;
|
||
this.transaction_queue = [];
|
||
this.quote = quote;
|
||
}
|
||
|
||
handleError(dbh, e)
|
||
{
|
||
// Проверяем, что в нас не кидается ошибкой старое подключение к БД
|
||
if (dbh == this.dbh)
|
||
{
|
||
console.warn(e);
|
||
console.warn('Database connection dropped. Reconnecting');
|
||
this.dbh = null;
|
||
this.connection_lost = true;
|
||
this.connect();
|
||
}
|
||
}
|
||
|
||
getConnection()
|
||
{
|
||
return this.dbh;
|
||
}
|
||
|
||
async runWhenConnected(cb)
|
||
{
|
||
if (this.dbh)
|
||
{
|
||
await cb(this);
|
||
}
|
||
this.init_connect.push(cb);
|
||
}
|
||
|
||
dontRunWhenConnected(cb)
|
||
{
|
||
this.init_connect = this.init_connect.filter(c => c != cb);
|
||
}
|
||
|
||
async connect()
|
||
{
|
||
if (this.dbh)
|
||
{
|
||
return this.dbh;
|
||
}
|
||
let retry = this.config.retry || 30;
|
||
// eslint-disable-next-line no-constant-condition
|
||
while (true)
|
||
{
|
||
try
|
||
{
|
||
let dbh = this.dbh = new pg.Client(this.config);
|
||
await this.dbh.connect();
|
||
for (const cb of this.init_connect)
|
||
{
|
||
await cb(this);
|
||
}
|
||
dbh.on('error', e => this.handleError(dbh, e));
|
||
return this.dbh;
|
||
}
|
||
catch (e)
|
||
{
|
||
this.dbh = null;
|
||
console.warn(e);
|
||
console.warn('Trying to connect again in '+retry+' seconds');
|
||
await new Promise((r, j) => setTimeout(r, retry*1000));
|
||
if (this.dbh)
|
||
{
|
||
return this.dbh;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
async begin()
|
||
{
|
||
if (this.in_transaction)
|
||
{
|
||
// Если уже кто-то активен - ждём его
|
||
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
|
||
}
|
||
this.in_transaction = new Transaction(this);
|
||
await this._query('begin');
|
||
return this.in_transaction;
|
||
}
|
||
|
||
async query(sql, bind)
|
||
{
|
||
if (sql.length == 5 && sql.toLowerCase() == 'begin')
|
||
{
|
||
throw new Error('Do not use transactions in asynchronous code directly!');
|
||
}
|
||
if (this.in_transaction)
|
||
{
|
||
// Если уже кто-то активен - ждём его
|
||
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
|
||
}
|
||
const r = await this._query(sql, bind);
|
||
// Если есть ещё кто-то в очереди - пусть проходит
|
||
this._next_txn();
|
||
return r;
|
||
}
|
||
|
||
async end()
|
||
{
|
||
if (!this.dbh)
|
||
{
|
||
return;
|
||
}
|
||
if (this.in_transaction)
|
||
{
|
||
// Если уже кто-то активен - ждём его
|
||
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
|
||
}
|
||
await this.dbh.end();
|
||
this.dbh = null;
|
||
}
|
||
|
||
_next_txn()
|
||
{
|
||
this.in_transaction = null;
|
||
const next = this.transaction_queue.shift();
|
||
if (next)
|
||
next();
|
||
}
|
||
|
||
async _query(sql, bind)
|
||
{
|
||
const do_lock = !this.in_transaction;
|
||
if (!this.dbh)
|
||
await this.connect();
|
||
if (this.in_transaction && this.connection_lost)
|
||
{
|
||
this._next_txn();
|
||
throw new Error('Connection lost while in transaction');
|
||
}
|
||
this.connection_lost = false;
|
||
sql = (bind && bind.length ? _inline(sql, bind) : sql);
|
||
let start_time;
|
||
try
|
||
{
|
||
if (this.config.log_queries)
|
||
{
|
||
start_time = Date.now();
|
||
}
|
||
if (do_lock)
|
||
this._in_transaction = true;
|
||
const r = await this.dbh.query(sql);
|
||
if (this.config.log_queries)
|
||
{
|
||
const tm = (Date.now()-start_time)/1000;
|
||
if (!this.config.slow_query_time || tm > this.config.slow_query_time)
|
||
{
|
||
console.log('> pid='+process.pid+' '+tm.toFixed(3)+' '+sql);
|
||
}
|
||
}
|
||
return r;
|
||
}
|
||
catch (e)
|
||
{
|
||
// в postgresql надо откатывать всю транзакцию при любой ошибке
|
||
// не падать, если в процессе выполнения запроса отвалилось подключение
|
||
if (this.dbh)
|
||
{
|
||
if (this.in_transaction)
|
||
{
|
||
if (this.in_transaction === true)
|
||
this._next_txn();
|
||
else
|
||
await this.in_transaction.query('rollback');
|
||
}
|
||
else
|
||
{
|
||
if (this.config.log_queries)
|
||
console.log('> rollback');
|
||
await this.dbh.query('rollback');
|
||
}
|
||
}
|
||
e.message = 'Error running query: '+sql+'\n'+e.message;
|
||
throw e;
|
||
}
|
||
}
|
||
}
|
||
|
||
// Интересная обёртка для транзакций - позволяет использовать транзакции в асинхронном коде в одном подключении БД
|
||
class Transaction extends ConnectionBase
|
||
{
|
||
constructor(dbh)
|
||
{
|
||
super();
|
||
this.dbh = dbh;
|
||
this.nested = 0;
|
||
this.rolled_back = false;
|
||
}
|
||
|
||
async begin()
|
||
{
|
||
// Вложенная транзакция
|
||
// SAVEPOINT пока не поддерживаем - просто делаем вид, что началась вложенная транзакция
|
||
this.nested++;
|
||
return this;
|
||
}
|
||
|
||
async commit()
|
||
{
|
||
if (this.nested > 0)
|
||
{
|
||
this.nested--;
|
||
}
|
||
else
|
||
{
|
||
await this.query('commit');
|
||
}
|
||
}
|
||
|
||
async rollback()
|
||
{
|
||
if (!this.rolled_back)
|
||
{
|
||
await this.query('rollback');
|
||
this.rolled_back = true;
|
||
}
|
||
else if (this.nested > 0)
|
||
{
|
||
this.nested--;
|
||
}
|
||
}
|
||
|
||
async query(sql, bind)
|
||
{
|
||
if (this.rolled_back)
|
||
{
|
||
return null;
|
||
}
|
||
// Здесь уже ждать никого не надо, т.к. если мы сюда попали - то уже дождались своей очереди априори
|
||
const r = await this.dbh._query(sql, bind);
|
||
if (sql.length == 6 && sql.toLowerCase() == 'commit' ||
|
||
sql.length == 8 && sql.toLowerCase() == 'rollback')
|
||
{
|
||
this.dbh._next_txn();
|
||
}
|
||
return r;
|
||
}
|
||
}
|
||
|
||
module.exports = {
|
||
select_builder,
|
||
where_builder,
|
||
quote,
|
||
quote_into: _inline,
|
||
select,
|
||
insert,
|
||
delete: _delete,
|
||
update,
|
||
values,
|
||
Text,
|
||
Connection,
|
||
MS_HASH,
|
||
MS_LIST,
|
||
MS_ROW,
|
||
MS_COL,
|
||
MS_VALUE,
|
||
};
|