class MerrittZK::QueueItem
Base class for Merritt Queue Items that follow similar conventions
Attributes
data[R]
id[R]
status[R]
Public Class Methods
create_id(zk, prefix)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 109
# Creates a persistent sequential node from +prefix+ and returns the last
# '/'-separated segment of the path handed back by the client.
#
# @param zk client object responding to +create+
# @param prefix [String] path prefix passed to +zk.create+
# @return [String] final segment of the created path
# NOTE(review): +ignore: :no_node+ presumably makes +create+ return nil when the
# parent node is missing, in which case +split+ below would raise -- confirm
# against the ZK client in use.
def self.create_id(zk, prefix)
  created = zk.create(prefix, data: nil, mode: :persistent_sequential, ignore: :no_node)
  created.split('/').last
end
new(id, data: nil)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 28
# Builds a queue item from an identifier and optional data.
# The status is initialised to nil.
#
# @param id identifier stored in @id
# @param data optional payload stored in @data (defaults to nil)
def initialize(id, data: nil)
  @status = nil
  @data = data
  @id = id
end
serialize(v)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 100
# Converts a value into the form written to a node.
#
# * nil           -> nil
# * Integer       -> its decimal string
# * Hash          -> JSON text, or nil when the hash is empty
# * anything else -> returned unchanged
#
# @param v value to serialize
# @return [String, nil, Object] see the mapping above
def self.serialize(v)
  case v
  when nil then nil
  when Integer then v.to_s
  when Hash then v.empty? ? nil : v.to_json
  else v
  end
end
Public Instance Methods
data_prop(_prop, defval)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 143
# Looks up a property in the item's data, falling back to a default.
#
# @param prop key to look up in @data
# @param defval value returned when @data is nil or does not contain +prop+
# @return the stored value for +prop+, or +defval+
def data_prop(prop, defval)
  return defval if @data.nil?

  # Look up the requested key and fall back to the caller's default; the
  # default must not be used as the lookup key.
  @data.fetch(prop, defval)
end
int_property(zk, key)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 78
# Reads the value stored under +key+ via +string_property+ and converts it
# with +to_i+.
#
# @param zk client object passed through to +string_property+
# @param key node name below +path+
# @return [Integer] result of +to_i+ on the stored value
def int_property(zk, key)
  raw = string_property(zk, key)
  raw.to_i
end
json_property(zk, key)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 69
# Reads the string stored under +key+ via +string_property+ and parses it as
# JSON with symbolized keys.
#
# @param zk client object passed through to +string_property+
# @param key node name below +path+
# @return the parsed JSON value
# @raise [MerrittZKNodeInvalid] when parsing raises any StandardError
def json_property(zk, key)
  s = string_property(zk, key)
  begin
    JSON.parse(s, symbolize_names: true)
  rescue StandardError
    # Interpolate the node path explicitly: this method has no local named p,
    # so a bare p would call Kernel#p (which returns nil) and print "()".
    raise MerrittZKNodeInvalid, "Node Object for (#{path}/#{key}) does not contain valid json: #{s}"
  end
end
load(zk)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 44
# Populates this item from the node at +path+.
#
# Checks that the node exists, reads the JSON stored under ZkKeys::STATUS,
# hands it to +load_status+, then calls +load_properties+.
#
# @param zk client object responding to +exists?+
# @return [self]
# @raise [MerrittZKNodeInvalid] when +zk.exists?(path)+ is falsy
def load(zk)
  unless zk.exists?(path)
    raise MerrittZKNodeInvalid, "Missing Node #{path}"
  end

  status_json = json_property(zk, ZkKeys::STATUS)
  load_status(zk, status_json)
  load_properties(zk)
  self
end
load_properties(zk)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 57
# Hook invoked by +load+ after the status has been read.
# The base implementation does nothing and returns nil.
#
# @param zk client object (unused here)
def load_properties(zk)
  # Job will override
end
load_status(_zk, js)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 52
# Sets @status from a parsed status hash.
#
# Takes the :status entry of +js+ (defaulting to 'na'), converts it to a
# symbol and looks it up in +states+; an unknown key leaves @status as nil.
#
# @param _zk unused
# @param js hash-like object responding to +fetch+
# @return the value assigned to @status
def load_status(_zk, js)
  state_key = js.fetch(:status, 'na').to_sym
  @status = states.fetch(state_key, nil)
end
lock(zk)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 135
# Creates an ephemeral node named ZkKeys::LOCK directly below +path+.
#
# @param zk client object responding to +create+
# @return whatever +zk.create+ returns
def lock(zk)
  lock_node = "#{path}/#{ZkKeys::LOCK}"
  zk.create(lock_node, data: nil, mode: :ephemeral)
end
path()
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 92
# Node path of this item. The base class returns the placeholder 'na'.
#
# @return [String]
def path
  'na'
end
set_data(zk, key, data)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 82
# Writes +data+ (after QueueItem.serialize) to the child node +key+ of +path+.
# An existing node is updated with +zk.set+; otherwise it is created.
#
# @param zk client object responding to +exists?+, +set+ and +create+
# @param key child node name
# @param data value to serialize and store
# @return whatever +zk.set+ or +zk.create+ returns
def set_data(zk, key, data)
  node = "#{path}/#{key}"
  payload = QueueItem.serialize(data)
  return zk.set(node, payload) if zk.exists?(node)

  zk.create(node, data: payload)
end
set_status(zk, status, message = '')
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 121
# Records a new status for this item at +status_path+.
#
# Does nothing when +status+ equals the current @status. Otherwise the hash
# from +status_object+ gets a :message entry, is serialized, and is written
# with +zk.create+ when @status is nil or +zk.set+ when it is not. @status is
# then updated.
#
# @param zk client object responding to +create+ and +set+
# @param status new status (must respond to whatever +status_object+ needs)
# @param message text stored under :message (defaults to '')
# @return nil when unchanged, otherwise the new status
def set_status(zk, status, message = '')
  return if status == @status

  record = status_object(status).tap { |obj| obj[:message] = message }
  payload = QueueItem.serialize(record)
  @status.nil? ? zk.create(status_path, payload) : zk.set(status_path, payload)
  @status = status
end
states()
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 40
# Lookup table consulted by +load_status+ to turn a status key into a status
# value. The base class returns an empty hash.
#
# @return [Hash]
def states
  {}
end
status_name()
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 36
# Name of the current status, obtained by calling +name+ on +status+.
#
# @return the result of +status.name+
def status_name
  status.name
end
status_object(status)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 114
# Builds the hash stored for a status: the status name plus the current time
# rendered with +Time#to_s+.
#
# @param status object responding to +name+
# @return [Hash] with keys :status and :last_modified
def status_object(status)
  label = status.name
  { status: label, last_modified: Time.now.to_s }
end
status_path()
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 96
# Path of the node holding this item's status: +path+ followed by "/status".
#
# @return [String]
def status_path
  "#{path}/status"
end
string_property(zk, key)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 61
# Fetches the child node +key+ of +path+ and returns the first element of the
# value returned by +zk.get+.
#
# @param zk client object responding to +get+
# @param key child node name
# @return first element of the +zk.get+ result
# @raise [MerrittZKNodeInvalid] when +zk.get+ returns nil
def string_property(zk, key)
  node = "#{path}/#{key}"
  result = zk.get(node)
  raise MerrittZKNodeInvalid, "Node Object for (#{node}) missing" if result.nil?

  result[0]
end
unlock(zk)
click to toggle source
# File lib/merritt_zk_queue_item.rb, line 139
# Deletes the node named ZkKeys::LOCK directly below +path+.
#
# @param zk client object responding to +delete+
# @return whatever +zk.delete+ returns
def unlock(zk)
  lock_node = "#{path}/#{ZkKeys::LOCK}"
  zk.delete(lock_node)
end