Обещаю (promise) быть быстрым!
Пора. Пора сделать небольшой цикл про производительность Ruby. Начнём с удобного и распространённого паттерна Promise/Future.
У нас на проекте основной стек разработки это Ruby (с небольшими включениями Rails), а нагрузка вполне тянет на так любимый всеми хайлоад. И за последний год у нас накопился большой опыт в решении задач, связанных с оптимизацией производительности и потребления ресурсов. К слову, используемые ресурсы, несмотря на стек и нагрузку, мы поддерживаем на очень скромном уровне.
Итак, Promise. Эта конструкция хорошо нам знакома из-за своей распространённости в мире javascript, однако сам термин (точнее семейство Promise/Future/Delay) был предложен давно, еще в 1976-1977 годах. Его идея состоит в том, чтобы иметь объект, представляющий собой значение, которого еще нет, но оно появится в будущем. Нас же в данной статье интересует возможность непосредственного ожидания результата - то есть блокировка и конечно её производительность.
Зачем?
Как я писал выше, мы плотно используем Ruby, а в нём потоки (threads) и нити (fibers) для обеспечения параллельности обработки событий из Kafka
, RabbitMQ
и прочих транспортов. В коде это обычно представлено в виде пулов потоков/файберов, что влечет за собой асинхронность обработки. А асинхронность часто требует блокировок для получения результатов. Тут мы и используем объекты, реализующие интерфейс, соответствующий паттерну Promise. Пример:
# допустим обычный синхронный HTTP-обработчик def process_blocking_http_request(request) # Очень важное сообщение - нелья потерять vip_message = process(request) # неважное сообщение (например статус) - можно и потерять status_message = get_status(vip_message) vip_promise = RabbitMQConnectionPool.send_message(vip_message) _status_promise = RabbitMQConnectionPool.send_message(status_message) _kafka_promise = KafkaConnectionPool.send_message(status_promise) # Ждём положительного Ack от транспорта (или кидаем исключение), # потому что нужна строгая гарантия vip_promise.value! # публикацию остальных сообщений не ждём render_ok(request) end
Из всех возможностей паттерна Future/Promise нас в данном случае интересует только блокировка - насколько она выполняется эффективно и трудозатратна.
Что будем мерить?
Мерить будем производительность различных способов реализовать блокировку (она же wait
, она же value
):
- Promises::ConcurrentRuby - решение из распространённой библиотеки concurrent_ruby (не важно что именно там под капотом);
- Promises::ConditionVariable - блокировка с использованием данного примитива синхронизации;
- Promises::Queue - блокировка с использованием Queue;
- Promises::Socket - ожидание на паре сокетов IO.pipe;
- Promises::Thread - использование промиса из библиотеки ruby threads;
Цель данного бенчмарка заключается в том, чтобы посмотреть производительность разных типов блокировок, причём реальную, которая включает в себя накладные расходы на создание и инициализацию всех объектов/ресурсов при создании объекта Promise - вдруг Queue работает быстро, а создаётся медленно???
Тест кейс
Сам тест представляет собой запуск 10 потоков обработчиков, которые в цикле создают объект Promise, вызывают блокировку и считают свои полные циклы. Также есть один поток, который резолвит (resolve/fulfill) созданные объекты как не в себя, установленное число раз (в данном тесте 900 тысяч). Измеряем мы общее время, за которое будет выполнено 900 тысяч холостых задач.
class Worker def initialize(manager, num) @manager = manager @result = 0 @thread = Thread.new(@manager) do |m| while running? promise = m.do_job_with_promise() promise.wait break unless running? end end end def running? !@shutdown end def shutdown! @shutdown = true end end
Ну всё, хватит, пояснения в конце - давай цифры!
Ruby 2.7
Ruby 3.3.0
Ruby 3.3.0 🚀 YJIT
Духота и подробности
Конечно, при тестировании используется отключение GC, также в режиме, где включен современно-молодёжный YJIT, используется полный цикл прогрева, да и цифра 900 тысяч выбрана для того, чтоб YJIT успел раздуплиться и пооптимизировать всё, что ему надо. Данные тесты проводились на быстрой 12-ти ядерной машине с максимальным приоритетом (nice) на всякий случай. Никаких упираний не было. Теперь немного подробнее про каждый тип блокировки.
Promises::Queue
При работе каждый раз создаётся объект Queue, блокировка осуществляется с помощью вызова queue.pop
, разблокировка с помощью queue.push
.
class Promises::Queue def initialize(*args) @queue = ::Queue.new end def wait @queue.pop end def fulfill(value) @queue.push(value) end end
Promises::ConditionVariable
При работе каждый раз создаётся новый объект ConditionVariable и Mutex, блокировка осуществляется с помощью вызова cv.wait(mutex)
, разблокировка с помощью cv.signal
.
class Promises::ConditionVariable def initialize(*args) @cv = ::ConditionVariable.new @mutex = Mutex.new end def wait @mutex.synchronize do @value = nil @cv.wait(@mutex) while @value.nil? @value end end def fulfill(value) @mutex.synchronize do @value = value end @cv.signal end end
Promises::Socket
При работе каждый раз создаётся новый объект (пара) IO.pipe, блокировка осуществляется с помощью вызова read_io.gets
, разблокировка с помощью write_io.puts(value)
. Учитывая накладные расходы на создание объектов, а также IO::close, без которого мгновенно наступает исчерпание дескрипторов (смотри бонус в конце).
class Promises::Socket def initialize(*args) @read_io, @write_io = IO.pipe end def wait @read_io.gets rescue nil @value end def fulfill(value) @value = value @write_io.puts(value) rescue nil end def close! @write_io.close rescue nil @read_io.close rescue nil end end
Promises::Thread
При работе каждый раз создаётся новый объект Thread.promise, блокировка осуществляется с помощью вызова promise.value
, разблокировка с помощью promise.deliver(value)
.
class Promises::Thread def initialize(*args) @promise = ::Thread.promise end def wait @promise.value end def fulfill(value) @promise.deliver(value) end end
Concurrent::Promises
При работе каждый раз создаётся новый объект Concurrent::Promises::Future, блокировка осуществляется с помощью вызова promise.value
, разблокировка с помощью promise.fulfill(value)
.
class ConcurrentRuby < Base def initialize(*args) super @promise = Concurrent::Promises.resolvable_future end def wait @promise.value end def fulfill(value) @promise.fulfill(value) end end
Результаты и выводы
Несмотря на то, что Condition Variable как примитив синхронизации создавался именно для такого кейса, использовать его реально сложно - есть нюансы (например, он МОЖЕТ просыпаться сам без воздействия) и неочевидности. Сокеты надо закрывать, а чужие решения обычно имеют более высокий уровень обобщения/удобства, но проигрывают грубой силе :) Репозиторий с бенчмарками: RND-SOFT/benchmarks
В общем, если вам надо где-то что-то ждать, то используйте Queue - это самый быстрый, простой и очень оптимизированный в потрохах инструмент.
Бонус
Выше мы протестировали вариант, когда всё по-настоящему - Promise, и все его потроха создаются каждый раз заново (заново IO.pair
и close
к нему, заново Mutex
и ConditionVariabel
и пр.):
def do_job_with_promise(prepared) @promise&.close! #это только для Socket @promise ||= @klass.new @queue << @promise @promise end
Но на закуску надо чего-то сладенького - теоретическую производительность, когда внутренние ресурсы НЕ создаются, а используются повторно. То есть в данном случае за весь тест будет создаваться всего 10 Queue, 10 пар сокетов и 10 Mutex+CV. Concurrent и Thread не умеют “сбрасываться”, поэтому их результаты не меняются:
def do_job_with_promise(prepared) @promise ||= @klass.new(prepared) @promise.reset @queue << @promise @promise end
Результат сильно поменялся (оно и понятно):
- https://github.com/RND-SOFT/benchmarks - код бенчмарка
- Condition Vaiable - не все знают, что это такое :)
- Process Synchronization in Linux Kernel - хорошая обзорная статья про ядро Linux
- 🤩 Ну и топчик - "этот ваш spinlock в userspace полный шлак!" 🤩 - наброс на вентилятор от самого Торвальдса по поводу статьи от геймдизайнера