Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 64 additions & 22 deletions lib/puma/plugin/solid_queue.rb
Original file line number Diff line number Diff line change
@@ -1,41 +1,83 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_solid_queue
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end
end

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
end
private
def start_forked(launcher)
in_background do
monitor_solid_queue
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.on_stopped { stop_solid_queue_fork }
launcher.events.on_restart { stop_solid_queue_fork }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.after_stopped { stop_solid_queue_fork }
launcher.events.before_restart { stop_solid_queue_fork }
end
end

def start_async(launcher)
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
end

launcher.events.on_stopped { @solid_queue_supervisor&.stop }

launcher.events.on_restart do
solid_queue_supervisor&.stop
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
end
else
launcher.events.after_booted do
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
end

launcher.events.after_stopped { @solid_queue_supervisor&.stop }

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
launcher.events.before_restart do
solid_queue_supervisor&.stop
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
end
end
end
end

private
def stop_solid_queue
def stop_solid_queue_fork
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
Process.kill(:INT, solid_queue_pid) if solid_queue_pid
Expand All @@ -48,7 +90,7 @@ def monitor_puma
end

def monitor_solid_queue
monitor(:solid_queue_dead?, "Detected Solid Queue has gone away, stopping Puma...")
monitor(:solid_queue_fork_dead?, "Detected Solid Queue has gone away, stopping Puma...")
end

def monitor(process_dead, message)
Expand All @@ -62,7 +104,7 @@ def monitor(process_dead, message)
end
end

def solid_queue_dead?
def solid_queue_fork_dead?
if solid_queue_started?
Process.waitpid(solid_queue_pid, Process::WNOHANG)
end
Expand Down
37 changes: 37 additions & 0 deletions lib/solid_queue/async_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module SolidQueue
class AsyncSupervisor < Supervisor
private

def check_and_replace_terminated_processes
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
end

def replace_thread(thread_id, instance)
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
payload[:thread] = instance

error = Processes::ThreadTerminatedError.new(terminated_instance.name)
release_claimed_jobs_by(terminated_instance, with_error: error)

start_process(configured_processes.delete(thread_id))
end
end

def perform_graceful_termination
process_instances.values.each(&:stop)

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
end

def perform_immediate_termination
exit!
end

def all_processes_terminated?
process_instances.values.none?(&:alive?)
end
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class Cli < Thor
desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).",
banner: "SOLID_QUEUE_CONFIG"

class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)"

class_option :recurring_schedule_file, type: :string,
desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
Expand Down
17 changes: 16 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ def error_messages
end
end

def mode
@options[:mode].to_s.inquiry
end

def standalone?
mode.fork? || @options[:standalone]
end

private
attr_reader :options

Expand Down Expand Up @@ -84,6 +92,8 @@ def ensure_correctly_sized_thread_pool

def default_options
{
mode: :fork,
standalone: true,
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
only_work: false,
Expand All @@ -110,7 +120,12 @@ def skip_recurring_tasks?

def workers
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
1
end

processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end
Expand Down
68 changes: 68 additions & 0 deletions lib/solid_queue/fork_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module SolidQueue
class ForkSupervisor < Supervisor
private

def perform_graceful_termination
term_forks

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
reap_terminated_forks
end
end

def perform_immediate_termination
quit_forks
end

def term_forks
signal_processes(process_instances.keys, :TERM)
end

def quit_forks
signal_processes(process_instances.keys, :QUIT)
end

def check_and_replace_terminated_processes
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

replace_fork(pid, status)
end
end

def reap_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

if (terminated_fork = process_instances.delete(pid)) && !status.exited? || status.exitstatus > 0
error = Processes::ProcessExitError.new(status)
release_claimed_jobs_by(terminated_fork, with_error: error)
end

configured_processes.delete(pid)
end
rescue SystemCallError
# All children already reaped
end

def replace_fork(pid, status)
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if terminated_fork = process_instances.delete(pid)
payload[:fork] = terminated_fork
error = Processes::ProcessExitError.new(status)
release_claimed_jobs_by(terminated_fork, with_error: error)

start_process(configured_processes.delete(pid))
end
end
end

def all_processes_terminated?
process_instances.empty?
end
end
end
32 changes: 25 additions & 7 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ module Runnable
attr_writer :mode

def start
boot

if running_async?
@thread = create_thread { run }
else
run_in_mode do
boot
run
end
end

def stop
super

wake_up
@thread&.join

# When not supervised, block until the thread terminates for backward
# compatibility with code that expects stop to be synchronous.
# When supervised, the supervisor controls the shutdown timeout.
unless supervised?
@thread&.join
end
end

def alive?
!running_async? || @thread&.alive?
end

private
Expand All @@ -30,6 +36,18 @@ def mode
(@mode || DEFAULT_MODE).to_s.inquiry
end

def run_in_mode(&block)
case
when running_as_fork?
fork(&block)
when running_async?
@thread = create_thread(&block)
@thread.object_id
else
block.call
end
end

def boot
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
Expand Down
11 changes: 11 additions & 0 deletions lib/solid_queue/processes/thread_terminated_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module SolidQueue
module Processes
class ThreadTerminatedError < RuntimeError
def initialize(name)
super("Thread #{name} terminated unexpectedly")
end
end
end
end
Loading
Loading