SockPQd - диспетчер очередей с приоритетами

Материал из YourcmcWiki
Перейти к: навигация, поиск

sockPQd — демон, управляющий множеством равноправных очередей с приоритетами и принимающий команды через TCP-сокеты. PQD::Daemon — головной модуль демона.

Чтобы воспользоваться sockPQd, необходимо скачать модули PQD::xx, а также исполняемый файл pqd.pl.

Описание

sockPQd очень гибок — вы поймёте это и сами после прочтения документации. По сути sockPQd является высокопроизводительным и надёжным диспетчером заданий. С помощью sockPQd вы превращаете свой сервер в клиента — доставку запросов (заданий) обеспечивает sockPQd. В основе распределения заданий лежит модель, представляющая из себя множество очередей с приоритетами.

sockPQd может служить «балансировщиком нагрузки» между очередями. sockPQd управляет множеством очередей с приоритетами. Каждая очередь имеет название, которое передаётся в запросах добавления и извлечения элементов из очереди вместе с приоритетом и данными задания. В случае выбора задания из произвольного очереди sockPQd выступает именно в роли «балансировщика нагрузки» между очередями, так как все очереди рассматриваются как равноправные, и задание выбирается из произвольной. Если же вы хотите реализовать целочисленные «приоритеты» очередей, чтобы средние количества выбранных из очередей заданий были им пропорциональны, просто используйте N очередей вместо одной, где N — приоритет очереди, и помещайте свои задания случайно в одну из них.

sockPQd не является ни многопоточным, ни многопроцессным демоном. Обслуживание всех клиентов осуществляется одним потоком, мультиплексирующим запросы с помощью libevent и её Perl-биндингов Event::Lib. Это ликвидирует необходимость в межпроцессном взаимодействии и синхронизации и позволяет sockPQd быть простым, а следовательно, надёжным. Кроме того, это же позволяет легко блокировать клиентов, просто откладывая на время ответ и предоставляя клиентской библиотеке осуществление блокировки вызывающего процесса или потока с помощью использования блокирующего ожидания входящих данных на сокете. Хотя, мультиплексирование имеет и недостаток: добавление и извлечение крупных заданий будут задерживать всех остальных клиентов. Следовательно, применяя sockPQd, нужно стараться использовать задания небольшого размера (до ~1450 байт, если помнить о том, что стандартное значение MTU (Maximum Transmission Unit — максимальный размер одного пакета) для протокола TCP/IP равно 1500 байтам). Но в любом случае, использование мелких заданий — хорошо, а больших — плохо, поэтому недостаток несущественный.

sockPQd поддерживает возможность резервирования сервера очереди. Если запустить sockPQd на нескольких серверах, и каждому указать полный список всех запущенных sockPQd, то они получат последовательные <ID сервера>: 1, 2, 3 и т. д. В этом случае все изменения с любого из серверов будут реплицироваться на остальные по внутренним соединениям. Идея <ID сервера> в том, чтобы клиенты работали всегда только с одним (логически «главным») сервером, а остальные (логически «подчинённые») были резервными. Главным сервером в каждый момент времени должен являться только сервер с минимальным <ID сервера>. В случае его остановки клиенты должны соединяться со следующим, и т. д. Если далее сервер, бывший главным, снова включится в работу, он получит не номер 1, а (max+1), где max — максимум из всех номеров запущенных серверов.

sockPQd может следить за опустошением именованных очередей. Если после выполнения задания очередь опустела, и нет заданий, относящихся к данной очереди и выполняющихся в данный момент, при ответе на запрос DONE sockPQd оповещает клиента о том, что очередь опустела.

sockPQd может следить за выполнением групп заданий, далее называемых «элементами», или подпоследовательностями очереди. При постановке задания в очередь клиент может передать «имя элемента», к которому в этом случае будет отнесено задание. Идея здесь заключается в том, что бывают ситуации, когда задания логически являются частями некоторых более крупных задач, и необходимо отслеживать момент завершения выполнения каждой такой задачи, являющейся объединением нескольких заданий в одной или даже в разных очередях.

Другая возможность слежения за выполнением групп заданий заключается в том, что вместе с запросом добавления задания (в данном случае «входного» задания) возможно сразу же запросить появления задания, относящегося к этой же группе заданий («выходного» задания), в другой очереди. Идея такова: допустим, клиент хочет, чтобы его задание было выполнено, и хочет дождаться, блокируясь на сокете, момента выполнения этого задания. Таким образом он ставит во «входную» очередь своё задание, система обработки обрабатывает его, и добавляет задание с конечной информацией в «выходную» очередь; клиент сразу получит это «задание» с результатами своего запроса.

Очевидно, завершение выполнения такого «элемента» может произойти лишь точно в момент после выполнения последнего из заданий, входящих в её состав, поэтому при ответе на запрос DONE sockPQd проверяет наличие заданий, входящих в состав той же «задачи», и если таких более нет, оповещает об этом клиента.

sockPQd может ограничивать время выполнения заданий. При извлечении задания из очереди клиент может указать максимальное время выполнения задания, по истечении которого его следует либо уничтожить, либо поставить в очередь заново; такое поведение бывает необходимо для повышения надёжности обработки задач.

sockPQd может играть роль распределённого планировщика заданий. Очевидно, здесь весь смысл заключается в «отложенном добавлении» заданий в очередь. То есть, в добавлении заданий в очередь не сразу, а лишь по истечении некоторого срока. Это можно сделать, добавляя задание в очередь, сразу же извлекая его оттуда с указанием максимального времени выполнения, «забывая» про него и ставя его в очередь заново через заданное время. При этом факт опустошения очереди будет установлен как раз только после удаления всех таких «отложенных» заданий.

Следует также отметить, что очередь с приоритетами, поддерживающая две основных команды — «получить задание из очереди» и «добавить задание в очередь», является структурой, весьма трудно поддающейся распараллеливанию, так как, как минимум, такая структура данных не знает команд типа «только чтение» — каждый запрос либо добавляет задание в очередь, либо удаляет задание из очереди. Однако обычно в задачах распараллеливание очереди заданий и не требуется, по причине очень небольших размеров каждого задания и относительно большой вычислительной сложности каждого.

Протокол sockPQd

Команда и ответ (статус) всегда передаются по сокету \x0d\x0a-завершёнными строками. Когда нужно передать блок данных, в строке запроса или ответа передаётся длина блока данных, а сам блок данных передаётся после строки.

Список команд протокола sockPQd:

GET — получить задание из очереди

Запрос: GET [имя_очереди[|имя_очереди|...]] [EXPIRE <секунды> [THEN (DONE|LATER)]]
Запрос: GETB [имя_очереди[|имя_очереди|...]] [EXPIRE <секунды> [THEN (DONE|LATER)]]
Запрос: GETBE [имя_очереди[|имя_очереди|...]] [EXPIRE <секунды> [THEN (DONE|LATER)]]
 Ответ: 200 OK <имя_очереди> <id_задания> <приоритет> <длина_данных> [IS <имя_элемента>]
        <данные>
 Ответ: 404 Queue Empty
 Ответ: 500 Offline Mode

Действие:

С равной вероятностью из одной из непустых именованных очередей с приоритетами, либо точно из одной из очередей, заданной в запросе по имени, извлекается первое задание с наивысшим приоритетом, этому заданию присваивается новый <id_задания> (целое положительное число) и данные задания передаются клиенту в виде <данных> в ответе.

Если задание относится к элементу (подпоследовательности очереди) с именем <имя_элемента>, в ответе будет присутствовать строка IS <имя_элемента>.

Если в запросе указано GETB и задание для выдачи отсутствует, то ответ на команду откладывается до появления нужного задания. Если в запросе не указано GETB и задание для выдачи отсутствует, sockPQd отдаёт ответ «404 Queue Empty».

Если в запросе указано GETBE, задание для выдачи отсутствует, но при этом существуют незавершённые задания из одной из заданных очередей, то ответ на команду откладывается либо до появления задания для выдачи, либо до завершения всех выполняющихся заданий из очереди.

Если в запросе указано EXPIRE=<секунды>, то при выдаче задания будет сохранено максимальное время его выполнения, и если реальное время выполнения превысит максимальное, то задание будет автоматически поставлено обратно в конец очереди или удалено. По умолчанию задание будет заново поставлено в конец очереди. При запуске sockPQd ключ -d указывает, что по умолчанию нужно удалять такие задания. Кроме того, если в запросе указать THEN DONE (или THEN LATER), то вне зависимости от действия по умолчанию задание будет удалено (или поставлено в конец очереди).

Если в данный момент демон работает в оффлайн-режиме, то команда не выполняется и не блокируется до появления задания, а sockPQd сразу отдаёт ответ «500 Offline Mode».

PUT — добавить задание в очередь

Запрос: PUT <q> <p> <длина_данных> [(IS <имя_элемента>|NEW) [WAIT <oq> [EXPIRE <сек> [THEN (DONE|LATER)]]]]
        <данные>
 Ответ: 200 OK
 Ответ: 200 OK IS <имя_элемента>
 Ответ: 206 Wait for output
        200 OK <имя_очереди> <id_задания> <приоритет> <длина_данных> [IS <имя_элемента>]
        <данные>
 Ответ: 206 Wait for output IS <имя_элемента>
        200 OK <имя_очереди> <id_задания> <приоритет> <длина_данных> [IS <имя_элемента>]
        <данные>
 Ответ: 500 Offline Mode

Действие:

В именованную очередь с именем добавляется новое задание с приоритетом <p>. <Данные> задания сохраняются для последующей передачи исполнителям. Если в запросе передаётся <имя_элемента>, задание относится к элементу (подпоследовательности очереди) с именем <имя_элемента>. Имя очереди и имя элемента могут состоять из латинских букв, цифр и символа '_'. <p> — целое, возможно, отрицательное, число.

Если указать в запросе WAIT <oq>, sockPQd отдаст клиенту ответ 206 Wait for output, и будет ожидать появления задания, относящегося к элементу с именем <имя_элемента> в очереди с именем <oq>; когда такое задание станет доступно, оно сразу же будет отдано клиенту так же, как в ответе на GET-запрос. Если при этом в запросе было указано EXPIRE, то по прошествии <сек> секунд после отдачи задания оно будет поставлено в очередь заново или удалено, точно так же как и в команде GET, в зависимости от указания ключа -d при запуске sockPQd и указания в запросе THEN.

Если в данный момент демон работает в оффлайн-режиме, команда не выполняется, а только отдаётся ответ «500 Offline Mode».

DONE — передать факт успешного выполнения задания

Запрос: DONE <id_задания>
 Ответ: 200 OK
 Ответ: 200 OK FINQ
 Ответ: 200 OK FINI
 Ответ: 200 OK FINQ FINI
 Ответ: 404 Job Not Found
 Ответ: 500 Offline Mode

Действие:

Выданное ответом на запрос GET задание удаляется из очереди. Если выданного задания с таким <id_задания> в данный момент не существует, отдаётся ответ 404 Job Not Found.

Если с выполнением данного задания заканчиваются задания, относящиеся к той же очереди, что и оно само, в ответе передаётся FINQ.

Если выполняемое задание относилось к некоторому элементу (подпоследовательности очереди), и после выполнения данного задания более не остаётся заданий, относящихся к этому же элементу, в ответе передаётся FINI.

Если в данный момент демон работает в оффлайн-режиме, команда не выполняется, а только отдаётся ответ «500 Offline Mode».

LATER — передать факт неуспешного выполнения задания

Запрос: LATER <id_задания>
 Ответ: 200 OK
 Ответ: 404 Job Not Found
 Ответ: 500 Offline Mode

Действие:

Выданное ответом на запрос GET задание заново ставится в конец очереди. Если выданного задания с таким <id_задания> в данный момент не существует, отдаётся ответ 404 Job Not Found. Если в данный момент демон работает в оффлайн-режиме, команда не выполняется, а только отдаётся ответ «500 Offline Mode».

SYNC GET

Запрос: SYNC GET <имя_очереди> <id_задания> <приоритет>
Запрос: SYNC GET LAST <имя_очереди> <id_задания> <приоритет>
 Ответ: 200 OK

Действие:

Первому, или последнему в случае второго вида запроса, заданию с приоритетом <приоритет> из очереди с именем <имя_очереди> присваивается номер <id_задания>.

Репликации данного изменения на другие сервера не происходит. Оффлайн-режим игнорируется. Команда используется серверами кластера sockPQd при репликации изменений.

SYNC PUT, SYNC DONE, SYNC LATER

Команды полностью аналогичны PUT, DONE и LATER во всём, кроме двух пунктов:

  • Команды SYNC PUT, SYNC DONE и SYNC LATER не реплицируются на другие сервера, так как предполагается, что они сами используются только серверами кластера и только при репликации изменений.
  • Команды SYNC PUT, SYNC DONE и SYNC LATER не обращают внимание на оффлайн-режим, то есть никогда не отдают ответ «500 Offline Mode».

TOTAL — получить статистику по очередям

Запрос: TOTAL [имя_очереди]
 Ответ: 200 OK <очереди> <приоритеты> <задания> <выполняющиеся_задания>

Действие:

В ответе sockPQd передаёт общее количество именованных очередей, классов приоритетов по всем очередям, и заданий по всем очередям. Если в запросе было указано имя очереди, возвращается статистика только по очереди с этим именем.

SNAPSHOT — получить снимок состояния sockPQd

Запрос: SNAPSHOT
Запрос: SNAPSHOT <число> <IP> <порт>
 Ответ: 200 OK <длина_данных>
        <данные>

Действие:

sockPQd создаёт мгновенный снимок состояния и передаёт его в ответе в качестве <данных>.

Также, если в запросе указано <число>, <IP> и <порт>, текущее соединение сохраняется как соединение с сервером из кластера с ID <число> и адресом <IP>:<порт>. После этого сохранённое соединение может быть использовано для репликации.

Вторая форма запроса необходима для того, чтобы сервер из кластера мог сохранить снимок данных и сразу включиться в репликацию изменений, происходящих после передачи снимка.

RUNLIST — получить список «выполняющихся заданий»

Запрос: RUNLIST
Запрос: RUNLIST DATA
 Ответ: 200 OK <n_количество_выполняющихся_заданий>
        id_задания_1> <имя_очереди> <приоритет> [длина_данных] [EXPIRE <время_просрочки>]
        ...
        <id_задания_n> <имя_очереди> <приоритет> [длина_данных] [EXPIRE <время_просрочки>]
        [<данные_1>
        ...
        <данные_n>]

Действие:

В ответе sockPQd передаёт список «выполняющихся» в данный момент заданий с их ID задания, именами очередей, приоритетами, временами экспирации (просрочки) и, если в запросе было указано DATA, данными самих заданий. При передаче данных в начале передаётся по 1 строке на задание с информацией о нём, а далее их данные подряд, друг за другом в том же порядке.

CLONE — восстановить состояние из снимка

Запрос: CLONE <длина_данных>
        <данные>
 Ответ: 200 OK
 Ответ: 500 Invalid Snapshot

Действие:

Если <данные> содержат корректный снимок состояние sockPQd, sockPQd восстанавливает состояние из снимка и отвечает 200 OK. В противном случае sockPQd отвечает 500 Invalid Snapshot.

OFFLINE — переключить демон в режим оффлайн

Запрос: OFFLINE
 Ответ: 200 OK Switched to Offline Mode

Действие:

sockPQd переключается в оффлайн-режим, в котором на все команды работы с очередью, кроме репликационных, отдаётся ответ «500 Offline Mode».

ONLINE — переключить демон в режим онлайн

Запрос: ONLINE
 Ответ: 200 OK Switched to Online Mode

Действие:

sockPQd переключается в обычный режим.

INTERCONNECT — определить и/или передать ID сервера

Запрос: INTERCONNECT
Запрос: INTERCONNECT <число> <IP> <порт>
 Ответ: 200 OK <число>

Действие:

В ответе sockPQd передаёт свой ID сервера. ID сервера — целое положительное число. Первому стартующему серверу кластера присваивается номер 1, каждому последующему (максимальный ID сервера в кластере) + 1.

Также, если в запросе указано <число>, <IP> и <порт>, текущее соединение сохраняется как соединение с сервером из кластера с ID <число> и адресом <IP>:<порт>. После этого сохранённое соединение может быть использовано для репликации.

QUIT — закрыть текущее соединение

Запрос: QUIT
 Ответ: 221 Goodbye

Действие:

sockPQd закрывает текущее соединение.

SHUTDOWN — завершить работу демона

Запрос: SHUTDOWN
 Ответ: 221 Shutting Down

Действие:

sockPQd завершает работу, все клиенты отключаются.

Тесты производительности

Ниже приведены результаты двух простых тестов производительности старой версии sockPQd, призванных оценить зависимость времени выполнения основных операций от общего количества заданий, присутствующих на данный момент во всех очередях, находящихся под управлением sockPQd.

Первый тест представляет собой десятикратное измерение времени добавления 40000 мелких (размером 20-40 байт) заданий со случайными приоритетами, каждое в случайную из 10 очередей. Таким образом, на графике отражено время добавления новых 40000 заданий при наличии в очередях суммарно 0, 40000, 80000, и т. д., заданий.

Второй тест представляет собой десятикратное измерение времени извлечения и «выполнения» (то есть, сообщения о выполнении задания в sockPQd) 40000 заданий из случайных очередей. Таким образом, на графике отражено время извлечения 40000 заданий при наличии в очередях суммарно 40000, 80000, и т. д., заданий.

Добавление заданий в очередь
Извлечение заданий из очереди

Таким образом, на рисунке мы видим, что:

  • Производительность добавления заданий в очередь практически не зависит от количества заданий в очереди, подскакивает лишь в начале, а далее колеблется возле приблизительно одних значений (начиная со 160000 заданий в очереди, добавление новых 40000 занимало приблизительно по 7.6 секунд).
  • Производительность извлечения заданий из очереди линейно зависит от количества заданий в очереди; при изменении количества заданий в очереди от 40000 до 200000 разница в производительности составила приблизительно 7%. Это обусловлено использованием массивов, а не связных списков — при каждом удалении элемента из начала массива происходят копирования данных, замедляющие процесс. Решить данную проблему можно, используя вместо массивов связные списки заданий (см. ниже).
  • Производительность добавления заданий в очередь при наличии в очереди 100000-200000 заданий составляет примерно 5260 заданий в секунду.
  • Производительность извлечения заданий из очереди при наличии в очереди 100000-200000 заданий составляет примерно 2580 заданий в секунду.
  • Произвольные скачки, вероятно, обусловлены случайными факторами (тестирование производилось на рабочем компьютере, на котором одновременно были запущены и прочие программы).

После проведения этих тестов sockPQd был изменён для использования связных списков вместо массивов, как более оптимальной и масштабируемой структуры данных; тесты были повторены с той разницей, что теперь проводилось сначала 20 повторов добавления 20000 заданий, а потом 20 повторов извлечения 20000 заданий, вместо 10 повторов по 40000 заданий. Результаты тестов можно видеть на рисунках:

Связные списки — извлечение заданий
Связные списки — добавление заданий

Видно, что после этого изменения производительность извлечения задач из очереди перестала линейно зависеть от общего количества заданий и составляет приблизительно 3225 заданий в секунду; производительность добавления задач в очередь также возросла (приблизительно на 16\%) и составляет теперь в среднем 6115 заданий в секунду.

История

sockPQd был разработан при реализации дипломной работы.

За основу модели распределения нагрузки по физическим узлам была принята стандартная структура данных «очередь с приоритетами», как наиболее очевидно подходящая под задачи последовательной обработки большого количества запросов. Как известно, обычная очередь — это структура, добавление элементов в которую происходит только в конец, а извлечение элементов из которой происходит только из начала. Очередь с приоритетами можно рассматривать как список обычных очередей, в котором каждой очереди присвоено некоторое целое число; добавление элементов производится точно в указанную очередь, а извлечение элементов производится всегда из начала непустой очереди с максимальным приоритетом.

Основными требованиями к компоненту были высокая и максимально независимая от количества запросов в системе производительность, простота, отказоустойчивость, упор на сетевое взаимодействие узлов, а также гибкость и наличие различных способов использования. Результатом разработки стал отказоустойчивый UNIX-демон sockPQd — диспетчер очередей с приоритетами, обслуживающий клиентов по протоколу TCP/IP, хранящий всю базу заданий в каждый момент времени полностью в оперативной памяти и поддерживающий работу в кластерной среде — репликацию изменений на другие узлы (не обязательно физические; вполне возможно настроить репликацию между двумя демонами sockPQd, расположенными на одном физическом узле; цель таких манипуляций, конечно, осталась бы неясна).

Новая версия

Новая версия, переписанная с использованием Event::Lib, имеет гораздо более простой и структурированный код, а также показывает немного лучшую производительность.

Основным улучшением в ней являлась оптимизация кода. В первой версии производительность уменьшала специфики языка, например, в области обработки ошибок работы с сокетами.

Кроме того, старая версия sockPQd использовала для мультиплексирования запросов модуль IO::Select, основанный на системном вызове select(2), описанном в стандартах POSIX.1-2001 и 4.4BSD, а этот системный вызов, к сожалению, имеет некоторые недостатки — например, ограничение в 1024 одновременных соединения (реальное число может варьироваться от реализации к реализации) [1]. Здесь улучшением, собственно, и послужил перевод sockPQd на использование высокопроизводительной библиотеки обработки событий libevent.

Вообще, из [1] и источников, на которые ссылается эта статья, можно почерпнуть много полезной информации по поводу реализации высокопроизводительных серверов, обслуживающих большие количества запросов.

Дальнейшее развитие

Что ещё можно было бы реализовать в механизме самого диспетчера?

Реализация временной выгрузки крупных отрезков очереди на жёсткий диск с целью добавления возможности поддержки очередей, состоящих из действительно больших количеств заданий, или просто из крупных заданий (хотя последнее в рамках разработанной модели и не приветствуется). Реализация параллельных очередей (заметьте, что слова «с приоритетами» здесь отсутствуют — ниже объяснено, почему). То есть, реализация поддержки возможности распределения нагрузки на диспетчер по нескольким узлам. Идея распараллеливания очереди в том, что по достижении достаточно большого количества заданий в очереди становится возможно время от времени распределять списки заданий по нескольким узлам для их последующей выдачи клиентам.

Два недостатка такого подхода: во-первых, он может частично нарушать порядок выдачи заданий клиентам; во-вторых, он требует гораздо более совершенных методик синхронизации между отдельными узлами, что приводит к значительному усложнению структуры sockPQd. Кроме того, выше указано, что здесь рассматриваются уже не очереди с приоритетами, а обычные очереди. Вопрос: почему? Ответ: потому, что если мы и можем пренебречь изменением порядка выдачи заданий с одним приоритетом, то пренебречь выдачей задания с большим приоритетом позже, возможно, многих заданий с меньшим приоритетом мы не можем. Всё это, конечно, не значит, что в данном случае потребуется полный отказ от поддержки приоритетов, однако каждое добавление задания с большим приоритетом будет провоцировать новую синхронизацию между узлами, что обязательно приведёт к снижению производительности и главное — масштабируемости.

Однако, как говорят нам и тесты, и здравый смысл, вместо реализации этого пути для дальнейшего масштабирования диспетчера гораздо проще модифицировать клиентские библиотеки и добавить в них возможность использования нескольких кластеров диспетчеров вместо одного. Чуть большая необходимость в распараллеливании очередей может появиться лишь при создании действительно огромных вычислительных кластеров… Хотя даже в этой ситуации присутствуют более простые пути решения проблемы.

Ссылки

  1. 1,0 1,1 Dan Kegel. The C10K problem. http://www.kegel.com/c10k.html