class MerrittZK::Batch
Merritt Batch
Queue Items
Constants
- BATCH_UUIDS
- DIR
- PREFIX
Attributes
has_failure[R]
Public Class Methods
acquire_batch_for_reporting_batch(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 91 def self.acquire_batch_for_reporting_batch(zk) zk.children(DIR).sort.each do |cp| next unless zk.exists?("#{DIR}/#{cp}/states/batch-processing") next unless zk.children("#{DIR}/#{cp}/states/batch-processing").empty? b = Batch.new(cp) b.load(zk) begin next if b.status == BatchState::Completed || b.status == BatchState::Failed if b.lock(zk) b.set_status(zk, BatchState::Reporting) return b end rescue ZK::Exceptions::NodeExists # no action end end nil end
acquire_pending_batch(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 72 def self.acquire_pending_batch(zk) zk.children(DIR).sort.each do |cp| next if zk.exists?("#{DIR}/#{cp}/#{ZkKeys::STATES}") b = Batch.new(cp) b.load(zk) begin if b.lock(zk) b.set_data(zk, ZkKeys::STATES, nil) b.set_status(zk, BatchState::Processing) return b end rescue ZK::Exceptions::NodeExists # no action end end nil end
batch_uuid_path(uuid)
click to toggle source
# File lib/merritt_zk_batch.rb, line 52 def self.batch_uuid_path(uuid) "#{BATCH_UUIDS}/#{uuid}" end
create_batch(zk, submission)
click to toggle source
# File lib/merritt_zk_batch.rb, line 62 def self.create_batch(zk, submission) id = QueueItem.create_id(zk, prefix_path) batch = Batch.new(id, data: submission) uuid = submission.fetch(:batchID, '') zk.create(batch_uuid_path(uuid), id) unless uuid.empty? batch.set_data(zk, ZkKeys::SUBMISSION, submission) batch.set_status(zk, BatchState.init) batch end
delete_completed_batches(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 112 def self.delete_completed_batches(zk) ids = [] zk.children(DIR).sort.each do |cp| next unless zk.exists?("#{DIR}/#{cp}/states/batch-processing") next unless zk.children("#{DIR}/#{cp}/states/batch-processing").empty? b = Batch.new(cp) b.load(zk) begin next unless b.status == BatchState::Completed || b.status == BatchState::Deleted b.delete(zk) ids << b.id rescue ZK::Exceptions::NodeExists # no action end end ids end
dir()
click to toggle source
# File lib/merritt_zk_batch.rb, line 40 def self.dir DIR.to_s end
find_batch_by_uuid(zk, uuid)
click to toggle source
# File lib/merritt_zk_batch.rb, line 159 def self.find_batch_by_uuid(zk, uuid) return if uuid.empty? p = batch_uuid_path(uuid) return unless zk.exists?(p) arr = zk.get(p) return if arr.nil? bid = arr[0] return if bid.empty? Batch.new(bid) end
new(id, data: nil)
click to toggle source
Calls superclass method
# File lib/merritt_zk_batch.rb, line 15 def initialize(id, data: nil) super(id, data: data) @has_failure = false end
prefix_path()
click to toggle source
# File lib/merritt_zk_batch.rb, line 44 def self.prefix_path "#{DIR}/#{PREFIX}" end
Public Instance Methods
batch_uuid()
click to toggle source
# File lib/merritt_zk_batch.rb, line 56 def batch_uuid return '' if @data.nil? @data.fetch(:batchID, '') end
delete(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 174 def delete(zk) raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable? %w[batch-processing batch-failed batch-completed].each do |state| p = "#{path}/states/#{state}" next unless zk.exists?(p) zk.children(p).each do |cp| MerrittZK::Job.new(cp).load(zk).delete(zk) end end load(zk) if @data.nil? zk.delete(Batch.batch_uuid_path(batch_uuid)) unless batch_uuid.empty? return if path.nil? || path.empty? # puts "DELETE #{path}" zk.rm_rf(path) end
get_completed_jobs(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 132 def get_completed_jobs(zk) get_jobs(zk, 'batch-completed') end
get_deleted_jobs(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 136 def get_deleted_jobs(zk) get_jobs(zk, 'batch-deleted') end
get_failed_jobs(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 140 def get_failed_jobs(zk) get_jobs(zk, 'batch-failed') end
get_jobs(zk, state)
click to toggle source
# File lib/merritt_zk_batch.rb, line 148 def get_jobs(zk, state) jobs = [] p = "#{path}/states/#{state}" if zk.exists?(p) zk.children(p).each do |cp| jobs << Job.new(cp, bid: id) end end jobs end
get_processing_jobs(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 144 def get_processing_jobs(zk) get_jobs(zk, 'batch-processing') end
load_has_failure(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 22 def load_has_failure(zk) @has_failure = false p = "#{path}/states/batch-failed" return unless zk.exists?(p) return if zk.children(p).empty? @has_failure = true end
load_properties(zk)
click to toggle source
# File lib/merritt_zk_batch.rb, line 31 def load_properties(zk) @data = json_property(zk, ZkKeys::SUBMISSION) load_has_failure(zk) end
path()
click to toggle source
# File lib/merritt_zk_batch.rb, line 48 def path "#{DIR}/#{@id}" end
states()
click to toggle source
# File lib/merritt_zk_batch.rb, line 36 def states BatchState.states end