rails
February 10

Обещаю (promise) быть быстрым!  

Пора. Пора сделать небольшой цикл про производительность Ruby. Начнём с удобного и распространённого паттерна Promise/Future.

У нас на проекте основной стек разработки это Ruby (с небольшими включениями Rails), а нагрузка вполне тянет на так любимый всеми хайлоад. И за последний год у нас накопился большой опыт в решении задач, связанных с оптимизацией производительности и потребления ресурсов. К слову, используемые ресурсы, несмотря на стек и нагрузку, мы поддерживаем на очень скромном уровне.

Итак, Promise. Эта конструкция хорошо нам знакома из-за своей распространённости в мире javascript, однако сам термин (точнее семейство Promise/Future/Delay) был предложен давно, еще в 1976-1977 годах. Его идея состоит в том, чтобы иметь объект, представляющий собой значение, которого еще нет, но оно появится в будущем. Нас же в данной статье интересует возможность непосредственного ожидания результата - то есть блокировка и конечно её производительность.

Зачем?

Как я писал выше, мы плотно используем Ruby, а в нём потоки (threads) и нити (fibers) для обеспечения параллельности обработки событий из Kafka, RabbitMQ и прочих транспортов. В коде это обычно представлено в виде пулов потоков/файберов, что влечет за собой асинхронность обработки. А асинхронность часто требует блокировок для получения результатов. Тут мы и используем объекты, реализующие интерфейс, соответствующий паттерну Promise. Пример:

# допустим обычный синхронный HTTP-обработчик
def process_blocking_http_request(request)
  # Очень важное сообщение - нелья потерять
  vip_message = process(request)
  # неважное сообщение (например статус) - можно и потерять
  status_message = get_status(vip_message)

  vip_promise = RabbitMQConnectionPool.send_message(vip_message)

  _status_promise = RabbitMQConnectionPool.send_message(status_message)
  _kafka_promise = KafkaConnectionPool.send_message(status_promise)

  # Ждём положительного Ack от транспорта (или кидаем исключение),
  # потому что нужна строгая гарантия
  vip_promise.value!

  # публикацию остальных сообщений не ждём
  render_ok(request)
end

Из всех возможностей паттерна Future/Promise нас в данном случае интересует только блокировка - насколько она выполняется эффективно и трудозатратна.

Что будем мерить?

Мерить будем производительность различных способов реализовать блокировку (она же wait, она же value):

  • Promises::ConcurrentRuby - решение из распространённой библиотеки concurrent_ruby (не важно что именно там под капотом);
  • Promises::ConditionVariable - блокировка с использованием данного примитива синхронизации;
  • Promises::Queue - блокировка с использованием Queue;
  • Promises::Socket - ожидание на паре сокетов IO.pipe;
  • Promises::Thread - использование промиса из библиотеки ruby threads;

Цель данного бенчмарка заключается в том, чтобы посмотреть производительность разных типов блокировок, причём реальную, которая включает в себя накладные расходы на создание и инициализацию всех объектов/ресурсов при создании объекта Promise - вдруг Queue работает быстро, а создаётся медленно???

Тест кейс

Сам тест представляет собой запуск 10 потоков обработчиков, которые в цикле создают объект Promise, вызывают блокировку и считают свои полные циклы. Также есть один поток, который резолвит (resolve/fulfill) созданные объекты как не в себя, установленное число раз (в данном тесте 900 тысяч). Измеряем мы общее время, за которое будет выполнено 900 тысяч холостых задач.

class Worker

  def initialize(manager, num)
    @manager = manager
    @result = 0

    @thread = Thread.new(@manager) do |m|
      while running?
        promise = m.do_job_with_promise()
        promise.wait

        break unless running?
      end
    end
  end

  def running?
    !@shutdown
  end

  def shutdown!
    @shutdown = true
  end
end

Ну всё, хватит, пояснения в конце - давай цифры!

Ruby 2.7

Ruby 3.3.0

Ruby 3.3.0 🚀 YJIT

Духота и подробности

Конечно, при тестировании используется отключение GC, также в режиме, где включен современно-молодёжный YJIT, используется полный цикл прогрева, да и цифра 900 тысяч выбрана для того, чтоб YJIT успел раздуплиться и пооптимизировать всё, что ему надо. Данные тесты проводились на быстрой 12-ти ядерной машине с максимальным приоритетом (nice) на всякий случай. Никаких упираний не было. Теперь немного подробнее про каждый тип блокировки.

Promises::Queue

При работе каждый раз создаётся объект Queue, блокировка осуществляется с помощью вызова queue.pop, разблокировка с помощью queue.push.

class Promises::Queue
  def initialize(*args)
    @queue = ::Queue.new
  end

  def wait
    @queue.pop
  end

  def fulfill(value)
    @queue.push(value)
  end
end

Promises::ConditionVariable

При работе каждый раз создаётся новый объект ConditionVariable и Mutex, блокировка осуществляется с помощью вызова cv.wait(mutex), разблокировка с помощью cv.signal.

class Promises::ConditionVariable
  def initialize(*args)
    @cv = ::ConditionVariable.new
    @mutex = Mutex.new
  end

  def wait
    @mutex.synchronize do
      @value = nil
      @cv.wait(@mutex) while @value.nil?
      @value
    end
  end

  def fulfill(value)
    @mutex.synchronize do
      @value = value
    end
    @cv.signal
  end
end

Promises::Socket

При работе каждый раз создаётся новый объект (пара) IO.pipe, блокировка осуществляется с помощью вызова read_io.gets, разблокировка с помощью write_io.puts(value). Учитывая накладные расходы на создание объектов, а также IO::close, без которого мгновенно наступает исчерпание дескрипторов (смотри бонус в конце).

class Promises::Socket
  def initialize(*args)
    @read_io, @write_io = IO.pipe
  end

  def wait
    @read_io.gets rescue nil
    @value
  end

  def fulfill(value)
    @value = value
    @write_io.puts(value) rescue nil
  end

  def close!
    @write_io.close rescue nil
    @read_io.close rescue nil
  end
end

Promises::Thread

При работе каждый раз создаётся новый объект Thread.promise, блокировка осуществляется с помощью вызова promise.value, разблокировка с помощью promise.deliver(value).

class Promises::Thread
  def initialize(*args)
    @promise = ::Thread.promise
  end

  def wait
    @promise.value
  end

  def fulfill(value)
    @promise.deliver(value)
  end
end

Concurrent::Promises

При работе каждый раз создаётся новый объект Concurrent::Promises::Future, блокировка осуществляется с помощью вызова promise.value, разблокировка с помощью promise.fulfill(value).

class ConcurrentRuby < Base
  def initialize(*args)
    super
    @promise = Concurrent::Promises.resolvable_future
  end

  def wait
    @promise.value
  end

  def fulfill(value)
    @promise.fulfill(value)
  end
end

Результаты и выводы

Несмотря на то, что Condition Variable как примитив синхронизации создавался именно для такого кейса, использовать его реально сложно - есть нюансы (например, он МОЖЕТ просыпаться сам без воздействия) и неочевидности. Сокеты надо закрывать, а чужие решения обычно имеют более высокий уровень обобщения/удобства, но проигрывают грубой силе :) Репозиторий с бенчмарками: RND-SOFT/benchmarks

В общем, если вам надо где-то что-то ждать, то используйте Queue - это самый быстрый, простой и очень оптимизированный в потрохах инструмент.

Бонус

Выше мы протестировали вариант, когда всё по-настоящему - Promise, и все его потроха создаются каждый раз заново (заново IO.pair и close к нему, заново Mutex и ConditionVariabel и пр.):

def do_job_with_promise(prepared)
  @promise&.close! #это только для Socket

  @promise ||= @klass.new

  @queue << @promise
  @promise
end

Но на закуску надо чего-то сладенького - теоретическую производительность, когда внутренние ресурсы НЕ создаются, а используются повторно. То есть в данном случае за весь тест будет создаваться всего 10 Queue, 10 пар сокетов и 10 Mutex+CV. Concurrent и Thread не умеют “сбрасываться”, поэтому их результаты не меняются:

def do_job_with_promise(prepared)
  @promise ||= @klass.new(prepared)
  @promise.reset
  
  @queue << @promise
  @promise
end

Результат сильно поменялся (оно и понятно):

Ссылочки для чтения на ночь: