class Orchestrator::Master

Attributes

thread[R]

Public Class Methods

new(thread) click to toggle source
# File lib/orchestrator/remote/master.rb, line 8
def initialize(thread)
    @thread = thread

    @accept_connection = method :accept_connection
    @new_connection =    method :new_connection
    @bind_error =        method :bind_error

    @shutdown = true
    @edge_nodes = ::ThreadSafe::Cache.new # id => connection
    @requests   = {} # req_id => defer
    @req_map    = {} # connection => ::Set.new (req_id)

    @signal_bind   = @thread.async method(:bind_actual)
    @signal_unbind = @thread.async method(:unbind_actual)

    @request_id = 0
end

Public Instance Methods

bind() click to toggle source
# File lib/orchestrator/remote/master.rb, line 81
def bind
    @signal_bind.call
end
online?(id) click to toggle source
# File lib/orchestrator/remote/master.rb, line 72
def online?(id)
    edge = @edge_nodes[id]
    edge && edge.connected ? edge : false
end
request(edge_id, details) click to toggle source

ping pong exec bind unbind notify status success failure

# File lib/orchestrator/remote/master.rb, line 41
def request(edge_id, details)
    defer = @thread.defer

    # Lookup node
    connection = online? id
    if connection
        @thread.schedule do
            if connection.connected
                @request_id += 1
                @requests[@request_id] = defer
                @req_map[connection] ||= ::Set.new
                @req_map[connection] << @request_id
                
                # Send the request
                connection.write(::JSON.fast_generate({
                    id: @request_id,

                })).catch do |reason|
                    on_failure(defer, edge_id, details)
                end
            else
                on_failure(defer, edge_id, details)
            end
        end
    else
        on_failure(defer, edge_id, details)
    end

    defer.promise
end
unbind() click to toggle source
# File lib/orchestrator/remote/master.rb, line 77
def unbind
    @signal_unbind.call
end

Protected Instance Methods

accept_connection(client) click to toggle source

Once the connection is accepted we disable Nagles Algorithm This improves performance as we are using vectored or scatter/gather IO Then the spider delegates to the gazelle loops

# File lib/orchestrator/remote/master.rb, line 129
def accept_connection(client)
    client.enable_nodelay
    # TODO:: auth client and then signal the interested parties
end
bind_actual(*args) click to toggle source
# File lib/orchestrator/remote/master.rb, line 106
def bind_actual(*args)
    return unless @shutdown
    @shutdown = false

    # Bind the socket
    @tcp = @thread.tcp
    @tcp.bind '0.0.0.0', 17838, @new_connection
    @tcp.listen 100 # smallish backlog is all we need

    # Delegate errors
    @tcp.catch @bind_error
    @tcp
end
bind_error(err) click to toggle source

Called when binding is closed due to an error

# File lib/orchestrator/remote/master.rb, line 135
def bind_error(err)
    return if @shutdown

    # TODO:: log the error

    # Attempt to recover!
    @thread.scheduler.in(1000) do
        bind
    end
end
new_connection(server) click to toggle source

There is a new connection pending. We accept it

# File lib/orchestrator/remote/master.rb, line 122
def new_connection(server)
    server.accept @accept_connection
end
on_failure(defer, edge_id, details) click to toggle source
# File lib/orchestrator/remote/master.rb, line 89
def on_failure(defer, edge_id, details)
    # Failed...
    # Are we loading this device locally or remotely?
    # Do we wait a small amount of time before trying again?
    # When should we fail the request?
end
process_request(defer, node, request) click to toggle source
# File lib/orchestrator/remote/master.rb, line 146
def process_request(defer, node, request)

end
unbind_actual(*args) click to toggle source

These are async methods.. They could be called more than once

# File lib/orchestrator/remote/master.rb, line 98
def unbind_actual(*args)
    return if @shutdown
    @shutdown = true

    @tcp.close unless @tcp.nil?
    @tcp = nil
end