Мультипотоковый TCP сервер на Common Lisp

Материал из wiki.lissyara.su
Перейти к: навигация, поиск

Я написал это хау-ту по причине, того, что Я убил много времени на реализацию TCP сервера в CL; отсутствие опыта работы в Lisp привело к этому, кроме того, Я не нашёл каких-либо руководств или статей в этой области. Кроме того, есть много не документированных аргументов в функциях, из-за не знания которых, функции работали не надлежащим способом. Всё это привело к трате времени в попытках разобраться, что же происходит (:ready-only в usocket:wait-for-input). Скудная документация - это плохая ситуация для Common Lisp. :-(

Задача - написать простой класс TCP server используемый в работающих нитях для обработки запросов. Есть один слушающий тред к которому подключаются клиенты. Прослушиватель должен читать данные с сокета, затем при наличии полной команды, доступной клиенту, отправлять её для обработки в рабочую нить. (Идея была в создании нескольких прослушивателей, но в данное время это несколько проблемно.)

Предпосылки

Данное хау-ту не предназначено для Lisp экспертов, но и в то же время не предназначено и для абсолютных новичков.

Мы будем использовать две библиотеки, предоставляющие единое API для нитей и сокетов для многих реализаций CL: usocket и bordeaux-threads. Также, Я использую SBCL-подобные вещи (sb-queue и semaphores).

Я не буду описывать как устанавливать и загружать библиотеки (вы можете использовать asdf или cl-build).

Слушающая нить

Устанавливаем сокет примерно так:

 (setf master-socket
       (usocket:socket-listen host port
                              :reuse-address t
                              :element-type 'unsigned-byte))

После этого вызова, мы можем использовать wait-for-input на этом сокете для ожидания входящего соединения:

(usocket:wait-for-input master-socket)

Этот вызов вернёт ответ в том случае, когда будут доступны данные в данном сокете. Относительно серверного сокета, это означает "входящие соединения". Идём дальше, для создания клиентского сокета мы будем использовать socket-accept:

(setf new-client (usocket:socket-accept master-socket))

Следуя этому коду, сокет new-client будет использоваться для отправки данных или приёму данных с клиента. Сокеты - это объекты, которые включают в себя поток для использования socket-stream. Для отправки "Hello" клиенту, введите слеюущий код:

(format (usocket:socket-stream new-client) "Hello~%")

( хотя это не сработает для примера приведённого выше, потому что мы открыли двоичный, не символьный, поток через :element-type 'unsigned-byte выше; socket-accept может принимать в качестве аргумента :element-type, но если клиентский сокет отсутствует, то будет использован тип сокета сервера).

[ помните: мы можем вызвать socket-accept без предварительного вызова wait-for-input; socket-accept просто блокирует нить пока доступно входящее соединение. Если мы захотим получить единственную нить для прослушивания множественных сокетов, то нам точно понадобится wait-for-input ]

Ожидание данных на нескольких сокетах

Это всё хорошо, но как принимать данные от новых клиентов без блокировки приёма новых соединений сервером? Одним из выходов является выделение по одной нити для каждого клиента, но такое решение плохо масштабируется в случае нескольких сотен одинаковых соединений. По этой причине, wait-for-input может получать список сокетов и возвращать ответ в случае активации какого-либо сокета:

(setf sockets (list master-socket))
;; see below for the :ready-only description
(loop
  (loop :for s :in (wait-for-input sockets :ready-only t) :do
    (if (eq s master-socket)
        ;; THEN: new connection
        (let ((new (usocket:socket-accept s)))
          ;; add the new socket to the list of sockets that we're listening to
          (setf sockets (nconc sockets `(,new)))
          (handle-client-connect new))
        ;; ELSE: data from a connected client
        (handle-client-input s))))

Этот путь позволяет использовать единственную нить для обработки данных с нескольких клиентов и принимать новых клиентов. Аргумент :ready-only t для функции wait-for-input обязателен, без этого аргумента в противном случае wait-for-input вернёт оригинальный список (в данном случае сокеты) взамен единичного значения. Далее вызывающий должен пройти по всему списку и получить значение слота usocket::state для каждого сокета по его расположению в списке, вне зависимости есть данные в нём или нет. К сожалению, этот момент не освещён в документации usocket API и естественно поведение программы будет отличаться от ожидаемого при попытке считать данные из неготового сокета. Мне пришлось поковыряться в исходниках и там Я нашёл упоминание про аргумент ready-only, должен заметить, что при его использовании всё работает так, как Я хотел. Тред в списке рассылки посвящённый аргументу read-only.

Чтение данных с клиентов

Ещё одна вещь, которую мы должны учесть, это то, что данные не приходят сразу. Предположим, что у нас есть строчно-текстовый протокол и мы должны исполнять команду каждый раз, когда клиент посылает новую строку, у нас появится возможность реализовать это примерно так (read-line (socket-stream s)), но эта операция приведёт к блокировке до тех пор, пока пользователь не отправит новую строку. Таким образом, кроме медленных клиентов наш сервер сможет заблокировать любой пользователь просто подключившись к серверу и отправив символ отличный от символа новой строки.

Так функция handle-client-input читает все доступные байты из потока и сохраняет их в буфер; при следующем вызове для какого-либо сокета, эта функция будет добавлять данные к буферу до тех пор, пока не считает всю команду. В этот момент буфер очистится и команда будет выполнена (через посылку команды к работающей нити, так что мы не заблокируем прослушиватель).

Это вспомогательная функция для чтения данных из сокета. Принимает сокет и буфер (в виде регулируемого массива не нулевых байтов):

(defun collect-input (socket buffer &optional (end-char 0))
  (loop
     :with stream = (socket-stream socket)
     :with byte
     :while (listen stream)
     :doing
     (setq byte (read-byte stream))
     (when (= byte end-char)
       (return t))
     (vector-push-extend byte buffer)))

Она (функция) также принимает опциональный аргумент конец-строки, обозначающий байт, которым завершается команда. Байт конец-строки отбрасывается. В моём случае, команды завершаются байтом null (значение по умолчанию для конца-строки). Эта функция возвращает T при завершении чтения всей команды. Если она возвращает T, то это обозначает что в потоке есть ещё данные.

Для использования этой функции Вы должны ассоциировать массив буфера с каждым сокетом. Я объявил класс "client", который содержит сокет и буфер и имеет на сервере хэш таблицу "connections" представляющую карту соответствия сокетов и клиентских объектов. В классе клиента Я написал этот код:

(defun reset-buffer (client)
  (setf (fill-pointer (client-buffer client)) 0))
 
(defgeneric client-read (client socket)
  (:method ((client client) socket)
    (with-slots (buffer) client
      (when (collect-input socket buffer)
        (prog1
            (utf-8-bytes-to-string buffer)
          (reset-buffer client))))))

Когда считывается полная команда, метод client-read создаёт строку в кодировке UTF-8 (utf-8-bytes-to-string из trivial-utf-8). Так мы можем получить функцию handle-client-input, про которую Я упоминал ранее:

(defgeneric handle-client-input (server socket)
  (:method ((server server) socket)
    (with-slots (connections) server
      (let ((client (gethash socket connections)))
        (awhen (client-read client socket)
          (send-to-workers server (curry #'client-on-command client it)))))))

Немного описаний:

1. Есть класс сервера, который содержит хэш соединений (карта сокета -> клиентский объект).
2. awhen - это макрос из anaphora. Ведёт себя примерно также как и стандартный when, получает условие и блок кода, который выполняется в случае, когда условие не равно false; отличие в том, что в рамках кода, значение привязано к значению состояния. Это простое определение, и даже можно не загружать anaphora:
 (defmacro awhen (cond &body body)
   `(let ((it ,cond))
      (when it
        ,@body)))
3. curry - это вспомогательная функция для "частичных приложений" - получает функцию, число аргументов и возвращает новую функцию, которую можно вызвать с заданными аргументами:
(defun curry (fun &rest args1)
  (lambda (&rest args2)
    (apply fun (append args1 args2))))
4. send-to-workers получает функцию и ставит её в очередь запуска в некоторой рабочей нити.
5. Как Я и ожидал, возвращение collect-input'ом значения T, обозначает, что есть доступные данные в потоке; однако в этом случае немедленно будет возвращён wait-for-input, так что нет надобности проверять handle-client-input.
6. client-on-command - общая функция вызываемая в случае наличия команды. Она будет вызвана из рабочей нити, поэтому надо быть аккуратным в случае одновременного доступа к общин значениям.

Пул рабочих нитей

Рабочие нити должны выполнять команды от клиентов (поставленные в очередь функцией send-to-workers). Это совсем не тривиальная задача, которую можно правильно реализовать. Моей первой попыткой было использование переменных состояния, как это описано в документации к bordeaux-threads:

Переменные состояния - это специальный механизм для управления нитями. Этот механизм отправляет нити в спячку до достижения какого-либо состояния, после того как это состояние будет достигнуто, нити пробуждаются от сна другой нитью.
Переменные состояния должны использоваться совместно с блокировкой доступа к состоянию интересующего объекта. Процедура выглядит так:
Предположим, что у нас есть две нити A и B, и какой-нибудь канал событий C. A - это используемые события в C, и B - это возникающие события. CV - это значение-состояния
  1. A - это блокировка доступа к C
  2. Удаляем нити и все события доступные в C
  3. Когда C станет пустым, A вызывает CONDITION-WAIT, который автоматический освобождает блокировку и отправляет A в спячку до CV
  4. Ожидаем уведомления; CONDITION-WAIT будет заблокирован перед возвратом
  5. Переходим к шагу 2, до тех пор, пока нить работает
Когда B генерирует событие E, то выполняется следующее
  1. блокируем C
  2. добавляем E в канал
  3. вызываем CONDITION-NOTIFY на CV для пробуждения спящей нити
  4. убираем блокировку

Схема приведённая выше работает в случае, когда есть одна производящая нить (B) и одна потребительская нить (A), но что будет если мы будем использовать несколько потребителей? Поскольку A будет блокирована в определённый момент, это означает, что остальные нити также будут блокированы в этот момент. Все команды будут выполняться последовательно в единственной нити. Возможно, что переменные состояния сделаны только для управления двумя нитями.

В любом случае, Я решил эту проблему через использование семафоров и thread-safe очередью. Эта вещь реализована в SBCL, но Я не уверен в том, что она (семафоры и thread-safe очереди) есть в других CL реализациях.

Функция работающей нити

Рабочая нить исполняет следующий код:

(defgeneric worker-thread (server)
  (:method ((server server))
    (handler-case
        (with-slots (cmdqueue-sem cmdqueue) server
          (loop
             (loop :for event = (sb-queue:dequeue cmdqueue)
                :while event
                :do (funcall event))
             (sb-thread:wait-on-semaphore cmdqueue-sem)))
      (shutting-down ()
        ;; anything to do here?
        )
      (error (condition)
        (bt:with-lock-held ((server-workers-mutex server))
          (delete (bt:current-thread) (server-workers server)))
        (format t "~A"  condition)
        ;; XXX: should start another worker here, or we'll run out
        ))))

Этот код просто получает событие (точнее, функции) из общей очереди и исполняет их. Поскольку sb-queue находится в thread-safe, нам не нужно производить их явную блокировку. Если есть множественные события и рабочие нити находятся в awaken, то они будут принимать события одновременно при обработке нитью события.

Когда очередь будет пустой, она вызовет wait-on-semaphore, который в свою очередь отправит нить в спячку пока не появятся другие доступные события.

Добавление заданий для работающих нитей

Теперь мы можем написать функцию send-to-workers, про которую было упомянуто выше (функция, которая вызывается из слушающей нити):

(defgeneric send-to-workers (server event)
  (:method ((server server) event)
    (with-slots (cmdqueue-sem cmdqueue) server
      (sb-queue:enqueue event cmdqueue)
      (sb-thread:signal-semaphore cmdqueue-sem))))

Если работающие нити в данный момент не доступны, то в конечном итоге, очередь будет содержать несколько событий. Однако, проблемы "последнего пробуждения" не возникнет, поскольку какая-нибудь нить завершит исполнение (событие вызова функции (funcall)), цикл будет продолжен и события будут выходить из очереди. С другой стороны, семафоры не являются "бинарными" (включен или выключен), но они могут содержать целочисленное значение. signal-semaphore увеличивает значение на единицу, в то время как wait-for-semaphore уменьшает это значение на единицу (за исключением нуля, в этом случае нить уходит в спячку).

Выключение сервера

Функция рабочей нити обрабатывает состояние shutting-down (в данный момент это действие просто игнорируется). Для выключения работающих нитей, просто скажите:

(loop :repeat *number-of-workers* :doing
    (send-to-workers server (curry #'signal 'shutting-down)))
;; need to wait for them too
(loop :for th :in *worker-threads* :doing
    (bt:join-thread th))

Когда такое событие дойдёт до работающей нити, эта нить будет остановлена, таким образом, если мы отправим достаточное количество таких сообщений, то все работающие нити будут остановлены. Все события пришедшие после события выключения будут закрыты.

Для выключения слушающей нити, Я применил почти идеальное решение (Я не совсем уверен как правильно останавливать wait-for-input, кроме как послать ему :timeout):

(bt:interrupt-thread *listener-thread*
  #'(lambda () (signal 'shutting-down)))
(bt:join-thread *listener-thread*)

Этот код эффективно останавливает нить без оглядки на текущие события (возможно работает и в wait-for-input, но это уже другая вещь) и сигнализирует о shutting-down состоянии. Не безопасно прерывать нить в процессе выполняемой работы :-), но эта операция будет происходить при выключении сервера.

Дополнительно

  • Поскольку мы работаем с несколькими рабочими нитями и командами, который выполняются одновременно, то есть возможность отсылки ответа от клиента в другом порядке. Вы должны создать свой протокол с учётом этой особенности. Когда Вы отправляете запрос, Вы не должны быть уверены, что следующий ответ от сервера будет предназначен для Вас - во избежание такой проблемы включите ID запроса, который вернёт Вам сервер.
  • Каждый объект клиента должен блокировать мьютекс перед записью данных в сокет. Причина - данные должны быть отправлены с рабочих нитей, которые выполняются одновременно; без мьютекса другая сторона будет получать мусор.
  • Выше не было примера кода для обработки внезапного отсоединения клиента, но такой код легко добавить в код в разделе "Ожидание данных на нескольких сокетах". Мы должны вставить проверку на EOF для сокета клиента перед вызовом handle-client-input; если нет доступных данных, этот клиент будет разрушен. Мы можем сделать это с помощью listen. Это мой завершённый код для слушающей нити (название provider-thread выбрано по причине, поддержки команд в работающих нитях):
(defgeneric provider-thread (server master-socket)
  (:method ((server server) master-socket)
    (let ((sockets (list master-socket))
          (connlock (server-connections-mutex server)))
      (handler-case
          (loop
             (loop :for s :in (wait-for-input sockets :ready-only t) :doing
                (handler-case
                    (if (eq s master-socket)
                        ;; THEN: we have new connection
                        (progn
                          (bt:with-lock-held (connlock)
                            (unless (null (slot-value s 'usocket::state))
                              (let ((new (socket-accept s)))
                                (setf sockets (push new sockets))
                                (handle-client-connect server new)))))
                        ;; ELSE: client socket
                        (if (listen (socket-stream s))
                            ;; THEN: input available
                            (handle-client-input server s)
                            ;; ELSE: EOF, lost connection
                            (progn
                              (bt:with-lock-held (connlock)
                                (handle-client-disconnect server s))
                              (setf sockets (delete s sockets))
                              (socket-close s))))
                  (end-of-file ()
                    ;; not sure we ever get here
                    ))))
        (shutting-down ()
          ;; anything to do here?
          )))))

Этот код сыроват и его нужно немного пригладить. В любом случае, Я надеюсь, что эти примечания будут кому-нибудь полезны.

Оригинал здесь. Переводчик: Charlz_Klug.