class MerrittZK::Job
Merritt Ingest Job
Queue item
Constants
- DIR
- PREFIX
Attributes
bid[R]
priority[R]
space_needed[R]
Public Class Methods
acquire_job(zk, state)
click to toggle source
# File lib/merritt_zk_job.rb, line 154 def self.acquire_job(zk, state) p = "#{DIR}/states/#{state.name.downcase}" return nil unless zk.exists?(p) zk.children(p).sort.each do |cp| j = Job.new(cp[3..]).load(zk) begin return j if j.lock(zk) rescue ZK::Exceptions::NodeExists # no action end end nil end
create_job(zk, bid, data, priority: 5, identifiers: {}, metadata: {})
click to toggle source
# File lib/merritt_zk_job.rb, line 130 def self.create_job(zk, bid, data, priority: 5, identifiers: {}, metadata: {}) id = QueueItem.create_id(zk, prefix_path) job = Job.new(id, bid: bid, data: data, identifiers: identifiers, metadata: metadata) job.set_data(zk, ZkKeys::BID, bid) job.set_data(zk, ZkKeys::PRIORITY, job.priority) job.set_data(zk, ZkKeys::SPACE_NEEEDED, job.space_needed) job.set_data(zk, ZkKeys::CONFIGURATION, data) job.set_data(zk, ZkKeys::IDENTIFIERS, identifiers) unless identifiers.empty? job.set_data(zk, ZkKeys::METADATA, metadata) unless metadata.empty? job.set_status_with_priority(zk, JobState.init, priority) job.set_job_state_path(zk) job.set_batch_state_path(zk) job end
list_jobs_as_json(zk)
click to toggle source
List jobs as a json object that will be consumed by the admin tool. This is a transitional representation that can be compatible with legacy job listings.
# File lib/merritt_zk_job.rb, line 189 def self.list_jobs_as_json(zk) jobs = [] zk.children(DIR).sort.each do |cp| next if cp == ZkKeys::STATES begin job = Job.new(cp) job.load_optimized(zk) jobjson = job.data jobjson[:id] = job.id jobjson[:bid] = job.bid jobjson[:status] = job.status_name jobs.append(jobjson) rescue StandardError => e puts "List Job #{cp} exception: #{e}" end end jobs end
new(id, bid: nil, data: nil, identifiers: {}, metadata: {})
click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 14 def initialize(id, bid: nil, data: nil, identifiers: {}, metadata: {}) super(id, data: data) @bid = bid @priority = 5 @space_needed = 0 @job_state_path = nil @batch_state_path = nil @retry_count = 0 @identifiers = identifiers @metadata = metadata @inventory = {} end
prefix_path()
click to toggle source
# File lib/merritt_zk_job.rb, line 122 def self.prefix_path "#{DIR}/#{PREFIX}" end
Public Instance Methods
batch_state_subpath()
click to toggle source
# File lib/merritt_zk_job.rb, line 82 def batch_state_subpath return 'batch-failed' if @status.status == :Failed return 'batch-completed' if @status.status == :Completed return 'batch-deleted' if @status.status == :Deleted 'batch-processing' end
creator()
click to toggle source
# File lib/merritt_zk_job.rb, line 213 def creator data_prop('creator', '') end
delete(zk)
click to toggle source
# File lib/merritt_zk_job.rb, line 169 def delete(zk) raise MerrittZK::MerrittStateError, "Delete invalid #{path}" unless @status.deletable? unless @job_state_path.nil? # puts "DELETE #{@job_state_path}" zk.rm_rf(@job_state_path) end unless @batch_state_path.nil? # puts "DELETE #{@batch_state_path}" zk.rm_rf(@batch_state_path) end return if path.nil? || path.empty? # puts "DELETE #{path}" zk.rm_rf(path) end
filename()
click to toggle source
# File lib/merritt_zk_job.rb, line 225 def filename data_prop('filename', '') end
load_optimized(zk)
click to toggle source
for the admin tool
# File lib/merritt_zk_job.rb, line 33 def load_optimized(zk) raise MerrittZKNodeInvalid, "Missing Node #{path}" unless zk.exists?(path) load_status(zk, json_property(zk, ZkKeys::STATUS)) @data = json_property(zk, ZkKeys::CONFIGURATION) @bid = string_property(zk, ZkKeys::BID) self end
load_properties(zk)
click to toggle source
# File lib/merritt_zk_job.rb, line 42 def load_properties(zk) @data = json_property(zk, ZkKeys::CONFIGURATION) @bid = string_property(zk, ZkKeys::BID) @priority = int_property(zk, ZkKeys::PRIORITY) @space_needed = int_property(zk, ZkKeys::SPACE_NEEEDED) @identifiers = json_property(zk, ZkKeys::IDENTIFIERS) if zk.exists?("#{path}/#{ZkKeys::IDENTIFIERS}") @metadata = json_property(zk, ZkKeys::METADATA) if zk.exists?("#{path}/#{ZkKeys::METADATA}") @inventory = json_property(zk, ZkKeys::INVENTORY) if zk.exists?("#{path}/#{ZkKeys::INVENTORY}") set_job_state_path(zk) set_batch_state_path(zk) end
load_status(zk, js)
click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 27 def load_status(zk, js) super(zk, js) @retry_count = js.fetch(:retry_count, 0) end
path()
click to toggle source
# File lib/merritt_zk_job.rb, line 126 def path "#{DIR}/#{@id}" end
profile()
click to toggle source
# File lib/merritt_zk_job.rb, line 217 def profile data_prop('profile', '') end
response_form()
click to toggle source
# File lib/merritt_zk_job.rb, line 221 def response_form data_prop('responseForm', '') end
set_batch_state_path(zk)
click to toggle source
# File lib/merritt_zk_job.rb, line 90 def set_batch_state_path(zk) bs = format('%s/%s/states/%s/%s', Batch.dir, @bid, batch_state_subpath, id) return if bs == @batch_state_path zk.delete(@batch_state_path) if @batch_state_path @batch_state_path = bs return if zk.exists?(@batch_state_path) p = File.dirname(@batch_state_path) pp = File.dirname(p) zk.create(pp, data: nil) unless zk.exists?(pp) zk.create(p, data: nil) unless zk.exists?(p) zk.create(@batch_state_path, data: nil) end
set_job_state_path(zk)
click to toggle source
# File lib/merritt_zk_job.rb, line 105 def set_job_state_path(zk) js = format('%s/states/%s/%02d-%s', DIR, status.name.downcase, priority, id) return if js == @job_state_path zk.delete(@job_state_path) if @job_state_path @job_state_path = js return if zk.exists?(@job_state_path) p = File.dirname(@job_state_path) zk.create(p, data: nil) unless zk.exists?(p) zk.create(@job_state_path, data: nil) end
set_priority(zk, priority)
click to toggle source
# File lib/merritt_zk_job.rb, line 56 def set_priority(zk, priority) return if priority == @priority @priority = priority set_data(zk, ZkKeys::PRIORITY, priority) end
set_space_needed(zk, space_needed)
click to toggle source
# File lib/merritt_zk_job.rb, line 63 def set_space_needed(zk, space_needed) return if space_needed == @space_needed @space_needed = space_needed set_data(zk, ZkKeys::SPACE_NEEEDED, space_needed) end
set_status(zk, status, message = '', job_retry: false)
click to toggle source
Calls superclass method
# File lib/merritt_zk_job.rb, line 70 def set_status(zk, status, message = '', job_retry: false) @retry_count += 1 if job_retry super(zk, status, message) set_job_state_path(zk) set_batch_state_path(zk) end
set_status_with_priority(zk, status, priority)
click to toggle source
# File lib/merritt_zk_job.rb, line 77 def set_status_with_priority(zk, status, priority) set_priority(zk, priority) set_status(zk, status) end
states()
click to toggle source
# File lib/merritt_zk_job.rb, line 118 def states JobState.states end
status_object(status)
click to toggle source
# File lib/merritt_zk_job.rb, line 145 def status_object(status) { status: status.name, last_successful_status: nil, last_modified: Time.now.to_s, retry_count: @retry_count } end
submitter()
click to toggle source
# File lib/merritt_zk_job.rb, line 209 def submitter data_prop('submitter', '') end
title()
click to toggle source
# File lib/merritt_zk_job.rb, line 237 def title data_prop('title', '') end
type()
click to toggle source
# File lib/merritt_zk_job.rb, line 233 def type data_prop('type', '') end
udpate()
click to toggle source
# File lib/merritt_zk_job.rb, line 229 def udpate data_prop('update', false) end