NULL handling, quote(), DISTINCT ON and transaction wrapper

master
Vitaliy Filippov 2019-07-05 13:51:43 +03:00
parent f37c5d33fa
commit 70dcc2c70b
1 changed files with 208 additions and 47 deletions

View File

@ -1,6 +1,6 @@
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8 // Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
// (c) Виталий Филиппов, 2019 // (c) Виталий Филиппов, 2019
// Версия 2019-06-05 // Версия 2019-07-04
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи, // В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ] // благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
@ -25,7 +25,14 @@ const MS_VALUE = 6;
function select_builder(tables, fields, where, options) function select_builder(tables, fields, where, options)
{ {
options = options||{};
let sql = 'SELECT ', bind = []; let sql = 'SELECT ', bind = [];
if (options['DISTINCT ON'] || options.distinct_on)
{
let group = options['DISTINCT ON'] || options.distinct_on;
group = group instanceof Array ? group : [ group ];
sql += 'DISTINCT ON ('+group.join(', ')+') ';
}
if (fields instanceof Array) if (fields instanceof Array)
{ {
sql += fields.join(', '); sql += fields.join(', ');
@ -54,7 +61,6 @@ function select_builder(tables, fields, where, options)
sql += ' AND '+t.moreWhere.sql; sql += ' AND '+t.moreWhere.sql;
bind.push.apply(bind, t.moreWhere.bind); bind.push.apply(bind, t.moreWhere.bind);
} }
options = options||{};
if (options['GROUP BY'] || options.group_by) if (options['GROUP BY'] || options.group_by)
{ {
let group = options['GROUP BY'] || options.group_by; let group = options['GROUP BY'] || options.group_by;
@ -168,7 +174,7 @@ function tables_builder(tables)
// - key does not contain '?', value is an empty array => just (key) // - key does not contain '?', value is an empty array => just (key)
// - key contains '?', value is a scalar or non-empty array => (key) with bind params (...value) // - key contains '?', value is a scalar or non-empty array => (key) with bind params (...value)
// - key is numeric, then value is treated as in array // - key is numeric, then value is treated as in array
function where_or_set(fields, where) function where_or_set(fields, for_condition)
{ {
if (typeof fields == 'string') if (typeof fields == 'string')
{ {
@ -205,30 +211,43 @@ function where_or_set(fields, where)
v = v instanceof Array ? v : [ v ]; v = v instanceof Array ? v : [ v ];
if (v.length == 1 && v[0] == null) if (v.length == 1 && v[0] == null)
{ {
w.push(where ? k+' is null' : k+' = null'); w.push(for_condition ? k+' is null' : k+' = null');
} }
else else
{ {
if ((v.length > 1 || v[0] instanceof Array) && !where) // a IN (...) or (a, b) IN ((...), ...)
if ((v.length > 1 || v[0] instanceof Array) && !for_condition)
{ {
throw new Error('IN syntax can only be used inside WHERE'); throw new Error('IN syntax can only be used inside WHERE');
} }
if (v[0] instanceof Array) if (v[0] instanceof Array)
{ {
// (a, b) in (...) // (a, b) in ((...), ...)
w.push(k + ' in (' + v.map(vi => '('+vi.map(() => '?').join(', ')+')') + ')'); w.push(k + ' in (' + v.map(vi => '('+vi.map(() => '?').join(', ')+')') + ')');
v.map(vi => bind.push.apply(bind, vi)); v.map(vi => bind.push.apply(bind, vi));
} }
else if (!for_condition)
{
w.push(k+' = ?');
bind.push(v[0]);
}
else else
{ {
w.push(v.length == 1 let n = v.length;
? k + ' = ?' v = v.filter(vi => vi != null);
: k + ' in (' + v.map(() => '?').join(', ') + ')'); if (v.length > 0)
bind.push.apply(bind, v); {
w.push(k+' in (' + v.map(() => '?').join(', ') + ')' + (n > v.length ? ' or '+k+' is null' : ''));
bind.push.apply(bind, v);
}
else if (n > 0)
{
w.push(k+' is null');
}
} }
} }
} }
if (!where) if (!for_condition)
{ {
// SET // SET
return { sql: w.join(', '), bind }; return { sql: w.join(', '), bind };
@ -299,6 +318,19 @@ function split_using(tables)
return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere }; return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere };
} }
function quote(v)
{
if (!pg_escape)
pg_escape = require('pg-escape');
if (v == null)
return 'null';
else if (typeof v == 'object')
v = JSON.stringify(v);
else if (typeof v == 'number')
return v;
return '\''+pg_escape.string(v)+'\'';
}
// Превратить bind пераметры, выраженные как ?, в вид $n (как в node-postgres) // Превратить bind пераметры, выраженные как ?, в вид $n (как в node-postgres)
function _positional(sql) function _positional(sql)
{ {
@ -322,12 +354,12 @@ function _inline(sql, bind)
if (!m1) if (!m1)
return m; return m;
let v = bind[i++]; let v = bind[i++];
if (typeof v == 'object') if (v == null)
return 'null';
else if (typeof v == 'object')
v = JSON.stringify(v); v = JSON.stringify(v);
else if (typeof v == 'number') else if (typeof v == 'number')
return v; return v;
else if (v == null)
return 'null';
return '\''+pg_escape.string(v)+'\''; return '\''+pg_escape.string(v)+'\'';
} }
); );
@ -337,8 +369,17 @@ function _inline(sql, bind)
// dbh: Connection // dbh: Connection
async function select(dbh, tables, fields, where, options, format) async function select(dbh, tables, fields, where, options, format)
{ {
let { sql, bind } = select_builder(tables, fields, where, options); let sql_text;
let data = await dbh.query(sql, bind); if (arguments.length == 2 || arguments.length == 3)
{
sql_text = tables instanceof Text ? tables : new Text(tables, []);
format = fields;
}
else
{
sql_text = select_builder(tables, fields, where, options);
}
let data = await dbh.query(sql_text.sql, sql_text.bind);
if ((format & MS_LIST) || (format & MS_COL)) if ((format & MS_LIST) || (format & MS_COL))
data = data.rows.map(r => Object.values(r)); data = data.rows.map(r => Object.values(r));
else else
@ -494,40 +535,92 @@ class Text
} }
} }
class ConnectionBase
{
async select(tables, fields, where, options, format)
{
return arguments.length <= 2
? await select(this, tables, fields)
: await select(this, tables, fields, where, options, format);
}
async insert(table, rows, options)
{
return await insert(this, table, rows, options);
}
async update(table, set, where, options)
{
return await update(this, table, set, where, options);
}
async delete(table, where, options)
{
return await _delete(this, table, where, options);
}
}
// Обёртка PostgreSQL-подключения // Обёртка PostgreSQL-подключения
// Автоматически переподключается при отвале // Автоматически переподключается при отвале
class Connection class Connection extends ConnectionBase
{ {
constructor(config, init_callback) constructor(config)
{ {
super();
this.config = config; this.config = config;
this.init_callback = init_callback; this.init_connect = [];
this.in_transaction = null;
this.transaction_queue = [];
this.onerror = (e) => this.onerror = (e) =>
{ {
console.warn(e); console.warn(e);
console.warn('Database connection dropped. Reconnecting'); console.warn('Database connection dropped. Reconnecting');
this.dbh = null; this.dbh = null;
this.connection_lost = true;
this.connect(); this.connect();
}; };
this.quote = quote;
}
getConnection()
{
return this.dbh;
}
async runWhenConnected(cb)
{
if (this.dbh)
{
await cb(this);
}
this.init_connect.push(cb);
}
dontRunWhenConnected(cb)
{
this.init_connect = this.init_connect.filter(c => c != cb);
} }
async connect() async connect()
{ {
if (this.dbh) if (this.dbh)
{
return this.dbh; return this.dbh;
}
let retry = this.config.retry || 30; let retry = this.config.retry || 30;
// eslint-disable-next-line no-constant-condition
while (true) while (true)
{ {
try try
{ {
this.dbh = new pg.Client(this.config); this.dbh = new pg.Client(this.config);
await this.dbh.connect(); await this.dbh.connect();
if (this.init_callback) for (const cb of this.init_connect)
{ {
this.init_callback(); await cb(this);
} }
this.dbh.on('error', this.onerror); this.dbh.on('error', this.onerror);
return; return this.dbh;
} }
catch (e) catch (e)
{ {
@ -536,49 +629,117 @@ class Connection
await new Promise((r, j) => setTimeout(r, retry*1000)); await new Promise((r, j) => setTimeout(r, retry*1000));
} }
} }
return this.dbh; }
async begin()
{
if (this.in_transaction)
{
// Если уже кто-то активен - ждём его
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
}
this.in_transaction = new Transaction(this);
await this._query('begin');
return this.in_transaction;
} }
async query(sql, bind) async query(sql, bind)
{ {
if (!this.dbh) if (sql.length == 5 && sql.toLowerCase() == 'begin')
await this.connect(); {
console.log('> '+(bind && bind.length ? _inline(sql, bind) : sql)); throw new Error('Do not use transactions in asynchronous code directly!');
return await this.dbh.query(_positional(sql), bind); }
if (this.in_transaction)
{
// Если уже кто-то активен - ждём его
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
}
const r = await this._query(sql, bind);
// Если есть ещё кто-то в очереди - пусть проходит
this._next_txn();
return r;
} }
async select(tables, fields, where, options, format) _next_txn()
{
this.in_transaction = null;
const next = this.transaction_queue.shift();
if (next)
next();
}
async _query(sql, bind)
{ {
if (!this.dbh) if (!this.dbh)
await this.connect(); await this.connect();
return await select(this, tables, fields, where, options, format); if (this.in_transaction && this.connection_lost)
{
this._next_txn();
throw new Error('Connection lost while in transaction');
}
this.connection_lost = false;
if (this.config.log_queries)
console.log('> '+(bind && bind.length ? _inline(sql, bind) : sql));
try
{
if (!this.in_transaction)
{
this.in_transaction = true;
}
const r = await this.dbh.query(bind && bind.length ? _positional(sql) : sql, bind);
if (this.in_transaction === true)
{
this.in_transaction = false;
}
return r;
}
catch (e)
{
// в postgresql надо откатывать всю транзакцию при любой ошибке
if (this.in_transaction === true)
{
this.in_transaction = false;
}
if (this.in_transaction)
{
await this.in_transaction.query('rollback');
}
else
{
if (this.config.log_queries)
console.log('> rollback');
await this.dbh.query('rollback');
}
throw e;
}
}
}
class Transaction extends ConnectionBase
{
constructor(dbh)
{
super();
this.dbh = dbh;
} }
async insert(table, rows, options) async query(sql, bind)
{ {
if (!this.dbh) // Здесь уже ждать никого не надо, т.к. если мы сюда попали - то уже дождались своей очереди априори
await this.connect(); const r = await this.dbh._query(sql, bind);
return await insert(this, table, rows, options); if (sql.length == 6 && sql.toLowerCase() == 'commit' ||
} sql.length == 8 && sql.toLowerCase() == 'rollback')
{
async update(table, set, where, options) this.dbh._next_txn();
{ }
if (!this.dbh) return r;
await this.connect();
return await update(this, table, set, where, options);
}
async delete(table, where, options)
{
if (!this.dbh)
await this.connect();
return await _delete(this, table, where, options);
} }
} }
module.exports = { module.exports = {
select_builder, select_builder,
where_builder, where_builder,
quote,
quote_into: _inline, quote_into: _inline,
quote_positional: _positional, quote_positional: _positional,
select, select,