A plugin to update last_error in Delayed Job

  sonic0002      2017-11-18 13:05:49

delayed_job is a process-based asynchronous task processing gem that runs in the background. It forks the specified number of worker processes to execute tasks asynchronously. The task status is usually stored in the database, so the gem integrates easily into a Rails application that needs asynchronous job execution.

Normally, when a job fails or an error occurs, delayed_job saves the error to the database in the last_error column. Ideally the gem handles all of this itself and we do not need to worry about anything. In some extreme cases, however, we want to change last_error before it is stored. For example, last_error may contain characters that the database cannot handle, which would kill the delayed_job process. How can we update last_error before it is saved to the database? In this post, we provide a solution based on a delayed_job plugin.

When delayed_job starts, it forks a worker process in the background that polls the delayed_jobs database table and works off any available job. When a job is reserved, the worker locks the corresponding record in the database and starts running the job. The record is deleted once the job completes successfully. If an error occurs, the job is requeued to run later and its error is written to the database. The error normally contains the reason the job failed, the stack trace if there is one, and any error output from low-level commands the job executed. Because the error text can come from many sources, it may contain characters whose encoding differs from that of the database, and such special characters can trigger a database error that in turn kills the delayed_job process.

When last_error is saved to the database, the error looks like this:

default_worker.10 | ArgumentError: string contains null byte

There are a few ways to keep delayed_job from dying. One is to change the encoding of the database, which is risky. Another is to process the error message before it is saved to the database. The challenge is that delayed_job runs on its own and needs no code from us beyond the usual initialization and configuration. Fortunately, delayed_job supports plugins, which are invoked as different actions are performed and affect the behavior of all jobs globally. These plugins can define callbacks and hooks that are called when various events happen.

The callbacks and hooks span the whole lifecycle of a job, which gives us the chance to inject our own logic at each step. According to the source code, the lifecycle has the following events:

  :enqueue    => [:job],
  :execute    => [:worker],
  :loop       => [:worker],
  :perform    => [:worker, :job],
  :error      => [:worker, :job],
  :failure    => [:worker, :job],
  :invoke_job => [:job]

The events that interest us here are error and failure. Digging into the source code, we find the following:

def run(job)
  job_say job, 'RUNNING'
  runtime = Benchmark.realtime do
    Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
    job.destroy
  end
  job_say job, format('COMPLETED after %.4f', runtime)
  return true # did work
rescue DeserializationError => error
  job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

  job.error = error
  failed(job)
rescue Exception => error # rubocop:disable RescueException
  self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
  return false # work failed
end

# Reschedule the job in the future (when a job fails).
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(job, time = nil)
  if (job.attempts += 1) < max_attempts(job)
    time ||= job.reschedule_at
    job.run_at = time
    job.unlock
    job.save!
  else
    job_say job, "REMOVED permanently because of #{job.attempts} consecutive failures", 'error'
    failed(job)
  end
end

def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    begin
      job.hook(:failure)
    rescue => error
      say "Error when running failure callback: #{error}", 'error'
      say error.backtrace.join("\n"), 'error'
    ensure
      job.destroy_failed_jobs? ? job.destroy : job.fail!
    end
  end
end

In run(), a failing job either calls failed() directly or runs the callbacks for the error event. The error callback reschedules the job and finally calls failed() once the maximum number of attempts is exceeded and the job still fails. Hence we should focus on the failure event. Its default behavior is to save the error into the database (job.fail!) unless the job is configured to be destroyed on failure.

So we need to do something while the failure callback runs. There are two options. One is a job-specific hook, which is invoked when job.hook(:failure) is called. The other is a global option: a delayed_job plugin. We will work on the global option. Looking at the run_callbacks() method, we luckily find that it supports three kinds of hooks as well: before, around and after. This means there are three places where we can act on an event:

  • Before the event behavior is performed
  • Around the event behavior
  • After the event behavior is performed
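
For comparison with the global option, the job-specific hook mentioned above could be sketched roughly as follows. This is a minimal illustration only: ReportJob, failure_notes and FakeJob are hypothetical names invented for the sketch, and FakeJob stands in for the real job record so that the code runs without a database.

```ruby
# Hypothetical custom job class (an assumption for illustration).
class ReportJob
  attr_reader :failure_notes

  def initialize
    @failure_notes = []
  end

  # The work the worker would perform; here it always fails.
  def perform
    raise 'report generation failed'
  end

  # Job-specific hook: invoked through job.hook(:failure) once the job
  # has failed permanently. It only affects jobs of this class.
  def failure(job)
    message = job.last_error.to_s.delete("\u0000") # strip null bytes
    @failure_notes << "job #{job.id} failed: #{message}"
  end
end

# Stand-in for the job record (an assumption, not the gem's model).
FakeJob = Struct.new(:id, :last_error)

report = ReportJob.new
report.failure(FakeJob.new(42, "boom\u0000"))
puts report.failure_notes.first
```

A hook like this has to be repeated in every job class, which is why the rest of the post uses a plugin instead.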

See below the Callback code:

class Callback
  def initialize
    @before = []
    @after = []

    # Identity proc. Avoids special cases when there is no existing around chain.
    @around = lambda { |*args, &block| block.call(*args) }
  end

  def execute(*args, &block)
    @before.each { |c| c.call(*args) }
    result = @around.call(*args, &block)
    @after.each { |c| c.call(*args) }
    result
  end

  def add(type, &callback)
    case type
    when :before
      @before << callback
    when :after
      @after << callback
    when :around
      chain = @around # use a local variable so that the current chain is closed over in the following lambda
      @around = lambda { |*a, &block| chain.call(*a) { |*b| callback.call(*b, &block) } }
    else
      raise InvalidCallback, "Invalid callback type: #{type}"
    end
  end
end

To update the error message, we use the around hook. The job that carries the error message is available when the around hook is invoked, so we can update the error there and then proceed with the original behavior. It is like intercepting a call, modifying its data, and then letting it continue.
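
To make the order of execution concrete, here is a small standalone sketch of the same chaining idea. MiniCallback and StubJob are hypothetical names; this is a simplified stand-in written for illustration, not the gem's own class, and it runs without delayed_job installed.

```ruby
# Simplified stand-in for the callback chain (hypothetical class).
class MiniCallback
  def initialize
    @before = []
    @after = []
    # Identity proc: with no around callbacks, just run the default block.
    @around = lambda { |*args, &block| block.call(*args) }
  end

  def add(type, &callback)
    case type
    when :before then @before << callback
    when :after  then @after << callback
    when :around
      chain = @around # close over the current chain
      @around = lambda { |*a, &block| chain.call(*a) { |*b| callback.call(*b, &block) } }
    else
      raise ArgumentError, "Invalid callback type: #{type}"
    end
  end

  def execute(*args, &block)
    @before.each { |c| c.call(*args) }
    result = @around.call(*args, &block)
    @after.each { |c| c.call(*args) }
    result
  end
end

StubJob = Struct.new(:last_error) # stand-in for the job record
log = []

failure = MiniCallback.new
failure.add(:before) { |_worker, _job| log << :before }
failure.add(:after)  { |_worker, _job| log << :after }
failure.add(:around) do |worker, job, &block|
  log << :around_start
  job.last_error = job.last_error.delete("\u0000") # intercept and fix
  block.call(worker, job)                          # continue default behavior
  log << :around_end
end

job = StubJob.new("bad\u0000 message")
# The block passed to execute plays the role of the default "save" step.
failure.execute(:worker, job) { |_worker, j| log << "saved #{j.last_error.inspect}" }

p log
```

In this sketch the default block sees the already cleaned message, because the around callback modifies the job before calling block.call.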

The plugin would look something like:

require 'delayed_job'

class ErrorDelayedJobPlugin < Delayed::Plugin
  def self.update_last_error(event, job)
    unless job.last_error.nil?
      job.last_error = job.last_error.gsub("\u0000", '')  # Replace null byte
      job.last_error = job.last_error.encode('UTF-8', invalid: :replace, undef: :replace, replace: '')
    end
  rescue => e
    # Ignore sanitizing errors so that the default failure handling still runs
  end

  callbacks do |lifecycle|
    lifecycle.around(:failure) do |worker, job, *args, &block|
      update_last_error(:around_failure, job)
      # Continue with the default behavior (saving the job)
      block.call(worker, job, *args)
    end
  end
end

In the code above, the do block takes four arguments: worker, job, *args and &block. The job is the running job that contains the error, and &block is a reference to the default block to be run, which saves the data to the database. When the failure callbacks run, the plugin first updates last_error by removing null bytes and any characters that cannot be encoded as UTF-8. It then continues with the normal behavior of saving the error into the database.
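
The cleaning step can be tried on a plain string without delayed_job. The sketch below is a variation rather than the plugin's exact code: sanitize_error_text is a hypothetical helper, and it uses String#scrub instead of encode, on the assumption that the text should end up as valid UTF-8. Calling encode('UTF-8', ...) on a string that is already tagged as UTF-8 can leave invalid bytes in place, which scrub avoids.

```ruby
# Hypothetical helper (not part of delayed_job): returns a copy of the
# text that is valid UTF-8 and contains no null bytes.
def sanitize_error_text(text)
  return text if text.nil?

  text.dup
      .force_encoding(Encoding::UTF_8) # interpret the raw bytes as UTF-8
      .scrub('')                       # drop byte sequences invalid in UTF-8
      .delete("\u0000")                # drop null bytes
end

# Sample input: a null byte, a valid two-byte UTF-8 character, and a stray
# invalid byte, as raw binary data.
raw = "disk error\x00 caf\xC3\xA9 \xFF".b
clean = sanitize_error_text(raw)

puts clean
puts clean.valid_encoding?
```

A helper along these lines could replace the two assignments inside update_last_error if stricter cleaning is needed.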

To enable the plugin, register it in config/initializers/delayed_job.rb by adding the following lines:

require 'delayed_job/error_delayed_job_plugin'   # Path of the plugin file
Delayed::Worker.plugins << ErrorDelayedJobPlugin

In this way, we can control what is stored in the delayed_jobs table. The same technique can be used to update other columns or to do other things with delayed_job plugins.

