SockPQd - Клиентский интерфейс PQD-Facade

PQD::Facade — клиентский pure-perl интерфейс к sockPQd.

sockPQd — гибкий, высокопроизводительный, мультиплексирующий диспетчер очередей с приоритетами с поддержкой отказоустойчивости. За документацией по самому sockPQd вы можете обратиться к perldoc для PQD::Daemon или к статье SockPQd - диспетчер очередей с приоритетами.

PQD::Facade предоставляет клиентский интерфейс к серверам sockPQd, и поддерживает практически полный спектр возможностей, включая резервирование и опциональное логгирование через Log::Log4perl.

Do you want to try some new features? By joining the beta, you will get access to experimental features, at the risk of encountering bugs and issues.

Ок Нет, спасибо

Пример

use PQD::Facade;
use Log::Log4perl; # OPTIONAL
 
my $pqd = PQD::Facade->new(
    servers => [ [ "server1.local", 11361 ], [ "server2.local", 11361 ] ],
    timeout => 5,
    logger  => get_logger("PQD.Facade.Log.Path"), # OPTIONAL
);
 
my $job = $pqd->retry(sub { $pqd->get(
    blocking => 1,
    expire   => 300,
    then     => 'later',
) } );
 
$pqd->retry(sub { $pqd->put(
    queue    => "test",
    data     => [ { key1 => "value1", key2 => "value2" }, "abc" ],
    priority => 1,
    new_item => 1,
) } );
 
$pqd->retry(sub { $job->done });

Методы

Ниже описаны методы PQD::Facade.

$pqd = PQD::Facade->new(%params)

Конструктор объекта соединения с кластером sockPQd (возможно, состоящим из всего лишь одного сервера). Сразу пробует соединиться со всеми серверами кластера и выбрать из них сервер с минимальным ID; если соединение ни с одним из серверов не удаётся и не был передан параметр nonfatal, возвращает undef. Параметры принимает в хеше, вот их список:

servers
Ссылка на массив двухэлементных массивов, первый элемент которых — адрес (IP или DNS-адрес), а второй — порт сервера. В этом массиве обязательно должны быть перечислены все сервера вашего кластера, так как резервирование осуществляет именно клиентская библиотека. Параметр является обязательным.
timeout
Значение времени ожидания для лежащих в основе TCP/IP соединений.
nonfatal
Если передан этот параметр с истинным значением, то неудача соединения со всеми серверами кластера во время создания объекта не ведёт к возвращению undef — объект всё-таки создаётся, но в отключённом состоянии.
logger
Логгер Log4perl-а, полученный произвольными средствами. Если этот параметр задан, в переданный логгер будут направляться различные сообщения от объекта.
post_connect
Ссылка на функцию, которую следует вызывать после каждого переподключения к какому-либо серверу. Первым параметром ей в таких случаях передаётся объект соединения $self.

$pqd->retry($coderef)

Метод для осуществления бесконечного цикла попыток выполнения переданной функции $coderef с ожиданием возвращения ей не-undef значения, и переподключением после каждой попытки. Все функции объекта соединения и объектов заданий возвращают undef в случае, если происходит ошибка передачи данных между сервером и клиентом, поэтому метод retry совместим со всеми ними. Основное предполагаемое использование — «обрамление» вызовов этих методов в $pqd->retry с помощью использования анонимных функций, например: $pqd->retry(sub { $job->done });

$job = $pqd->get(%params)

Метод получения задания из очереди. Возвращает undef в случае ошибки, 0 в случае пустой очереди и неблокирующего режима, и объект задания PQD::Facade::Job в случае успешного получения заданий. Параметры принимает в хеше, вот их список:

queue
Имя очереди или несколько имён очередей, перечисленных через символ '|', из одной из которых нужно получить задание. Имя каждой очереди может содержать только символы [a-zA-Z0-9_]. Параметр необязательный, без его указания выборка идёт из произвольной очереди.
blocking
Истинное значение данного параметра делает операцию блокирующей — в случае отсутствия подходящих заданий вызывающий поток блокируется до момента появления хотя бы одного такого задания.
empty
Истинное значение параметра empty вместе с blocking изменяет поведения блокирования — вызывающий поток разблокируется также в случае полного опустошения выбранной очереди.
expire
Целое положительное число, специальный параметр, говорящий о том, что если через expire секунд sockPQd не поступит информация о том, что задание либо успешно выполнено ($job->done), либо успешно провалено ($job->later), его следует либо заново поставить в очередь, либо просто удалить.
then
Какой конкретно вариант поведения expire будет использован, зависит от параметра then и режима, в котором работает сервер sockPQd. Если параметр then указан и равен 'done', задания будут удаляться. Если параметр then указан и равен 'later', задания будут ставиться в очередь заново. Если параметр then не указан, поведение зависит от параметров сервера — по умолчанию это перепостановка заданий в очередь, но при запуске сервера можно указать и обратное поведение.

$pqd->put(%params)

Метод добавления задания в очередь. Параметры принимает в хеше, вот их список:

queue
Имя очереди, к которой относится задание. Может состоять из символов [a-zA-Z0-9_].
priority
Приоритет задания — любое целое число. Задания, имеющие больший приоритет, попадают в начало своей очереди.
data
Данные задания — любое значение. Скаляры передаются, как есть, прочие структуры данных сериализуются и десериализуются с помощью функций Storable (nfreeze и thaw).
item
ID «элемента» задания. Как можно прочитать в документации, sockPQd может следить за выполнение групп заданий, и передача значения item и есть способ отнесения задания к группе с ID = item. ID может состоять им символов [a-zA-Z0-9_].
new_item
Истинное значение данного параметра говорит о том, что sockPQd должен сгенерировать уникальный ID группы заданий на своё усмотрение, отнести задание к этой группе и сообщить сгенерированный ID нам — в случае успеха возвращаемое методом put значение будет равняться этому новому ID.
wait
Специальный параметр, говорящий о том, что сразу после добавления задания в очередь queue нужно заблокировать наш поток в ожидании появления задания с таким же ID (переданным через item или сгенерированным через new_item) в другой очереди с именем, равным значению параметра wait. Важно то, что совокупность двух операций — добавления и ожидания задания — в данном случае выполняется атомарно. При указании параметра wait обязательным становится указание одного из параметром item или new_item. Соответственно, при успехе в этом случае put возвращает объект задания.
Параметр wait несовместим с параметрами pipe и at.
expire
Использование expire возможно только при заданном параметре wait, и аналогично параметру wait в методе get (см.выше).
then
Использование then возможно только при заданных параметрах wait и expire, и аналогично параметру then в методе get (см.выше).
pipe
Истинное значение данного параметра говорит о том, что задание следует на самом деле трактовать как событие. А в чём разница, спросите вы. Разница в том, что произошедшее событие при отсутствии ожидающих его клиентов удаляется, а не помещается в очередь — так и здесь: если такое задание не попадёт ни одному из клиентов сразу же, оно удаляется.
Параметр pipe несовместим с параметрами at и wait.
at
Специальный параметр, UNIX время отложенного запуска задания (должно быть больше текущего системного UNIX времени). Клиентская библиотека осуществляет отложенный запуск заданий прозрачно для разработчика с использованием двух соединений с одним и тем же сервером и приёма, описанного в документации PQD::Daemon — одно соединение добавляет задание, а второе ожидает его получения с параметром expire, равным разности at и текущего системного времени, сразу получает и игнорирует это задание.
Параметр at несовместим с параметрами pipe и wait.

$pqd->totals($queue)

Получение статистики по всем очередям в случае $queue=undef или только по очереди $queue, если этот параметр задан. В списочном контексте возвращает массив из 4 целых чисел — количества очередей, количества приоритетов, количества заданий в очередях и количества «выполняющихся» (полученных с помощью get(), но ещё не завершённых с помощью done() или later()) заданий. В скалярном контексте возвращает число, равное сумме 3-его и 4-ого элементов вышеописанного массива.

$pqd->runlist($queue, $data)

Получает список всех выполняющих на данный момент заданий из всех очередей или только очереди $queue, без самих данных заданий в случае ложного значения $data, и с данными в случае истинного значения $data. В случае ошибки возвращает undef, в случае успеха — ссылку на массив хешей. Элементами хешей являются:

rid
ID выполняющегося задания.
queue
Очередь, из которой задание было извлечено для выполнения.
item
ID группы заданий, к которой оно относится, или undef, если задание не относится ни к одной группе заданий.
priority
Исходный приоритет задания.
expire
UNIX время, означающее момент, в который задание будет «просрочено» — удалено или поставлено в очередь заново.
data_len и data
Длина данных и сами данные в сыром виде, соответственно. Только в случае, если значение параметра метода $data было истинно.

$pqd->backup($file)

Переводит сервер в оффлайн-режим и сохраняет его текущее состояние в файл $file.

$pqd->restore($file)

Восстанавливает состояние сервера из файла $file и переводит его в онлайн-режим.

$pqd->shutdown

Останавливает сервер.

PQD::Facade::Job

Ниже перечислены методы и поля объекта задания — PQD::Facade::Job. Объект являет собой хеш, в котором можно найти следующие элементы:

rid
ID выполняющегося задания.
queue
Очередь, из которой было получено задание.
item
ID группы заданий, к которой оно относится, или undef, если задание не относится ни к одной группе заданий.
priority
Исходный приоритет задания.
expire
UNIX время, означающее момент, в который задание будет «просрочено» — удалено или поставлено в очередь заново.
data
Десериализованные данные задания.
sockd
Объект соединения, к которому относится задание.
finq
Только после успешного выполнения $job->done — если finq содержит истинное значение, это значит, что задание было последним в своей очереди, и больше в ней нет ни выполняющихся, ни ожидающих получения заданий.
fini
Только после успешного выполнения $job->done — если fini содержит истинное значение, это значит, что задание было последним в своей группе заданий, и больше в ней нет ни выполняющихся, ни ожидающих получения заданий.

Методы

Далее перечислены методы объекта задания. Их немного:

$job->done

Сообщить об успешном выполнении задания серверу sockPQd. При успехе функция возвращает истинное значение, а задание удаляется из списка выполняемых на сервере. В принципе, истинные возвращаемые значения также дают вторую возможность отслеживания параметров finq и fini — если $job->done вернуло значение 1, значит, задание просто выполнено, если 2, то fini истинно, если 3, то finq истинно, а если 4, истинны и finq, и fini. При возникновении ошибки соединения возвращается undef.

$job->later

Сообщить о проваленном задании серверу sockPQd — это означает, что при успехе функция вернёт истинное значение, а задание будет удалено из списка выполненных на сервере, но одновременно будет добавлено обратно в конец очереди, из которой было получено изначально.

$job->requeue

Добавить задание в очередь вызовом $job->{sockd}->put(). Ключевой момент в том, что перед вызовом данного метода можно модифицировать элементы хеша $job — например, очередь, приоритет, данные задания… Что даёт возможность добавления «похожих» заданий.