class MerrittZK::Access
Merritt Object Assembly Queue Item
Constants
- DIR
- LARGE
- PREFIX
- SMALL
Public Class Methods
acquire_pending_assembly(zk, queue_name)
click to toggle source
# File lib/merritt_zk_access.rb, line 49 def self.acquire_pending_assembly(zk, queue_name) zk.children(Access.dir(queue_name)).sort.each do |cp| a = Access.new(queue_name, cp) a.load(zk) next unless a.status == AccessState::Pending begin return a if a.lock(zk) rescue ZK::Exceptions::NodeExists # no action end end nil end
create_assembly(zk, queue_name, token)
click to toggle source
# File lib/merritt_zk_access.rb, line 41 def self.create_assembly(zk, queue_name, token) id = QueueItem.create_id(zk, prefix_path(queue_name)) access = Access.new(queue_name, id, data: token) access.set_data(zk, ZkKeys::TOKEN, token) access.set_status(zk, AccessState.init) access end
dir(queue_name)
click to toggle source
# File lib/merritt_zk_access.rb, line 29 def self.dir(queue_name) "#{DIR}/#{queue_name}" end
list_jobs_as_json(zk)
click to toggle source
List jobs as a json object that will be consumed by the admin tool. This is a transitional representation that can be compatible with legacy job listings.
# File lib/merritt_zk_access.rb, line 76 def self.list_jobs_as_json(zk) jobs = [] [SMALL, LARGE].each do |queue| zk.children("#{DIR}/#{queue}").sort.each do |cp| job = Access.new(queue, cp) job.load(zk) jobjson = job.data jobjson[:id] = cp jobjson[:queueNode] = Access.dir(queue) jobjson[:path] = job.path jobjson[:queueStatus] = jobjson[:status] jobjson[:status] = job.status.status jobjson[:date] = job.json_property(zk, ZkKeys::STATUS).fetch(:last_modified, '') jobs.append(jobjson) rescue StandardError => e puts "List Access #{cp} exception: #{e}" end end jobs end
new(queue_name, id, data: nil)
click to toggle source
Calls superclass method
# File lib/merritt_zk_access.rb, line 16 def initialize(queue_name, id, data: nil) super(id, data: data) @queue_name = queue_name end
prefix_path(queue_name)
click to toggle source
# File lib/merritt_zk_access.rb, line 33 def self.prefix_path(queue_name) "#{dir(queue_name)}/#{PREFIX}" end
Public Instance Methods
delete(zk)
click to toggle source
# File lib/merritt_zk_access.rb, line 64 def delete(zk) raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable? return if path.nil? || path.empty? # puts "DELETE #{path}" zk.rm_rf(path) end
load_properties(zk)
click to toggle source
# File lib/merritt_zk_access.rb, line 21 def load_properties(zk) @data = json_property(zk, ZkKeys::TOKEN) end
path()
click to toggle source
# File lib/merritt_zk_access.rb, line 37 def path "#{Access.dir(@queue_name)}/#{@id}" end
states()
click to toggle source
# File lib/merritt_zk_access.rb, line 25 def states AccessState.states end