class AsyncIO::Base

Attributes

logger[RW]
queue[R]

threads[R]

Default: the number of threads to be spawned is 5.

NOTE:

When an exception is raised inside a thread and not rescued, that thread is killed. To keep the worker threads alive, all exceptions raised are rescued and logged.
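The effect described in the note can be sketched as follows. This is only an illustration under stated assumptions, not the library's code: `rescuer_sketch` and `run_jobs` are hypothetical names, and a plain array stands in for the logger. The sketch rescues around each job, so one failing job does not stop the consumer thread.

```ruby
require "thread"

# Hypothetical helper: run the block and record any StandardError in `log`
# instead of letting it terminate the calling thread.
def rescuer_sketch(log)
  yield
rescue StandardError => e
  log << "#{e.class}: #{e.message}"
  nil
end

# Hypothetical helper: runs each callable on a single consumer thread and
# returns the results of the jobs that succeeded.
def run_jobs(callables, log)
  jobs = Queue.new
  results = []
  callables.each { |c| jobs << c }
  jobs << nil # nil ends the loop in this sketch

  consumer = Thread.new do
    while (job = jobs.pop)
      # Rescue per job, so a failure is logged and the loop continues.
      rescuer_sketch(log) { results << job.call }
    end
  end
  consumer.join
  results
end

log = []
results = run_jobs([-> { raise ArgumentError, "boom" }, -> { 21 * 2 }], log)
p results # => [42]
p log     # => ["ArgumentError: boom"]
```

Without the rescue, the first job would terminate the consumer thread and the second job would never run.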

Public Class Methods

new(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
# File lib/async_io/base.rb, line 22
def initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
  @logger   = args[:logger]
  @queue    = args[:queue]
  @threads  = []
  n_threads.times { @threads << Thread.new { consumer } }
end
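One consequence of the signature above is worth noting: the default hash applies only when `args` is omitted entirely, so passing a hash with a single key leaves the other value nil. The sketch below shows this with hypothetical methods (`options_sketch`, `merged_options_sketch`) and placeholder symbols in place of a real logger and queue; the merging variant is one possible alternative, not something the library does.

```ruby
# Hypothetical method mirroring the constructor's default-argument shape.
def options_sketch(args = { logger: :default_logger, queue: :default_queue })
  [args[:logger], args[:queue]]
end

# Hypothetical variant that merges caller options over the defaults.
DEFAULTS_SKETCH = { logger: :default_logger, queue: :default_queue }.freeze

def merged_options_sketch(args = {})
  opts = DEFAULTS_SKETCH.merge(args)
  [opts[:logger], opts[:queue]]
end

p options_sketch                         # => [:default_logger, :default_queue]
p options_sketch(logger: :custom)        # => [:custom, nil]
p merged_options_sketch(logger: :custom) # => [:custom, :default_queue]
```

When calling `new` with a custom logger, passing the queue explicitly as well avoids the nil case.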

Public Instance Methods

async(&payload)

Performs any task that needs to be done asynchronously. NOTE: It does not return a result, as the worker is given an empty task (i.e. an empty block of code).

# File lib/async_io/base.rb, line 86
def async(&payload)
  worker(payload, proc {})
end
async_with(task)
# File lib/async_io/base.rb, line 90
def async_with(task)
  worker(proc {}, task)
end
clear_interval!()
# File lib/async_io/base.rb, line 112
def clear_interval!
  @interval.terminate
  @interval = nil
end
interval(seconds) { || ... }

TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.

# File lib/async_io/base.rb, line 99
def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
new_interval?() { || ... }
# File lib/async_io/base.rb, line 108
def new_interval?
  @interval ||= Thread.new { yield }
end
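How the interval methods above fit together can be sketched in isolation. `IntervalSketch` is a hypothetical stand-in, not part of the library: it omits the rescuer and adds a `running?` helper purely for illustration.

```ruby
require "thread"

# Hypothetical stand-in for the interval behaviour: one background thread
# that runs a block repeatedly, sleeping between runs.
class IntervalSketch
  def start(seconds)
    # ||= keeps the first thread; later calls do not start another one.
    @interval ||= Thread.new do
      loop do
        yield
        sleep(seconds)
      end
    end
  end

  def clear!
    return unless @interval

    @interval.terminate
    @interval.join # wait until the thread has actually stopped
    @interval = nil
  end

  def running?
    !@interval.nil? && @interval.alive?
  end
end

ticks = Queue.new
timer = IntervalSketch.new
timer.start(0.01) { ticks << :tick }
ticks.pop # blocks until the block has run at least once
timer.clear!
p timer.running? # => false
```

Because of `||=`, a second call to `start` returns the existing thread and its block is ignored, which is the limitation the TODO above refers to.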
worker(payload, task)

Creates a new Worker and pushes it onto the queue. When a ‘task’ (i.e. a Ruby object) finishes, the payload is called with the result of that task.

For example:

  def aget_user(uid, &payload)
    worker(payload, proc { User.find(uid) })
  end

Returns the worker created for this particular task. You can send it the message done to retrieve its completed task; see async_io/worker.rb.

For example:

  result = aget_user(1) { |u| Logger.info(u.name) }

  # task may take a while to be done...

  user = result.done
  user.name
  # => "John"

NOTE: With the snippet above, if the task has not finished yet, you will get false when you message the worker. Once the task is finished, you will be able to get its result.

# File lib/async_io/base.rb, line 76
def worker(payload, task)
  Worker.new(payload, task).tap { |w| queue.push(w) }
end
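The behaviour described for the returned worker can be illustrated with a small stand-in. `WorkerSketch` is hypothetical: the real class lives in async_io/worker.rb, is not shown here, and may differ. The sketch assumes only what the text above states: the task runs, its result is passed to the payload, and `done` yields false until the task has finished.

```ruby
require "thread"

# Hypothetical stand-in for AsyncIO::Worker; the real class may differ.
class WorkerSketch
  def initialize(payload, task)
    @payload = payload
    @task = task
    @finished = false
    @result = nil
  end

  # Run the task, then hand its result to the payload.
  def call
    @result = @task.call
    @payload.call(@result)
    @finished = true
  end

  # false until the task has finished, then the task's result.
  def done
    @finished ? @result : false
  end
end

queue = Queue.new
seen = []
worker = WorkerSketch.new(->(name) { seen << name }, -> { "John" })
queue.push(worker)

p worker.done  # => false (the task has not run yet)
queue.pop.call # what a consumer thread would do
p worker.done  # => "John"
p seen         # => ["John"]
```

In this sketch everything runs on one thread; with real consumer threads, the point at which `done` stops returning false depends on when a consumer picks the worker up.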

Private Instance Methods

consumer()

Ruby's Queue#pop defaults non_block to false, so it waits until data is pushed onto the queue and then processes it.

# File lib/async_io/base.rb, line 34
def consumer
  rescuer do
    while worker = queue.pop
      worker.call
    end
  end
end
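The difference between the blocking and non-blocking forms of Queue#pop can be seen in a short standalone sketch. `try_pop` is a hypothetical helper introduced only for illustration; the library itself uses the blocking form.

```ruby
require "thread"

# Hypothetical helper: non-blocking pop that returns nil on an empty queue.
# Queue#pop(true) raises ThreadError when the queue is empty.
def try_pop(queue)
  queue.pop(true)
rescue ThreadError
  nil
end

queue = Queue.new
p try_pop(queue) # => nil

# Blocking pop (the default) waits until another thread pushes an item.
producer = Thread.new do
  sleep 0.05
  queue << :job
end
p queue.pop # => :job
producer.join
```

The blocking form is what lets idle consumer threads wait without polling.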