Package org.cdlib.mrt.zk

Merritt Queue Design

The Merritt Ingest Queue will be refactored in 2024 to achieve the following goals:
  • Match ingest workload to available resources (compute, memory, working storage)
  • Dynamically provision resources to match demand
  • Dynamically manage the running thread count based on processing load
  • Hold jobs based on temporary holds (collection lock, storage node lock, queue hold)
  • Resume in-progress processing gracefully
  • Allow processing to be resumed on any ingest host (the previous implementation managed state in memory, which prevented this)
  • Provide accurate notification of ingest completion (including inventory recording)
  • Send an accurate summary email on completion of a batch, regardless of any interruption that occurred during processing

Batch Queue vs Job Queue

The work of the Merritt Ingest Service takes place at a Job level. Merritt Depositors initiate submissions at a Batch level. The primary function of the Batch Queue is to provide notification to a depositor once all jobs for a batch have completed.
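The relationship between the two queues can be pictured with a minimal in-memory sketch. In this sketch a batch is ready for depositor notification only once every one of its jobs has reached a terminal outcome. `BatchTracker` and `JobOutcome` are hypothetical names, not classes in this package, and the package's actual completion logic may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical job outcomes for illustration; not the package's JobState enum.
enum JobOutcome { IN_PROGRESS, COMPLETED, FAILED }

// Minimal in-memory model: a batch groups jobs and becomes ready for
// depositor notification only when no job is still in progress.
class BatchTracker {
    private final Map<String, JobOutcome> jobs = new LinkedHashMap<>();

    void addJob(String jobId) {
        jobs.put(jobId, JobOutcome.IN_PROGRESS);
    }

    void finishJob(String jobId, JobOutcome outcome) {
        if (!jobs.containsKey(jobId)) {
            throw new IllegalArgumentException("unknown job " + jobId);
        }
        jobs.put(jobId, outcome);
    }

    // True when the batch has at least one job and every job is terminal.
    boolean readyForReporting() {
        return !jobs.isEmpty()
            && jobs.values().stream().noneMatch(o -> o == JobOutcome.IN_PROGRESS);
    }
}
```

Treating a failed job as finished is a deliberate choice in this sketch. It lets the summary email go out even when some jobs fail, which matches the goal of reporting on every batch.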

Use of ZooKeeper

Merritt uses ZooKeeper for the following features:
  • Creation/validation of distributed locks (ephemeral nodes)
  • Creation of unique node names across the distributed node structure (sequential nodes)
  • Management of data across the distributed node structure, allowing any worker to acquire a job/batch (persistent nodes)
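The ephemeral and sequential node behaviours listed above can be modelled without a server. The sketch below is a hypothetical in-memory stand-in (`FakeZnodeTree` is not part of ZooKeeper or this package). Sequential names get a 10-digit zero-padded counter, as ZooKeeper does for `CreateMode.PERSISTENT_SEQUENTIAL`. Ephemeral nodes vanish when their owning session closes, as with `CreateMode.EPHEMERAL`. It simplifies real behaviour: for example, it does not require parent nodes to exist. The `/batches` path and `bid` prefix used in testing are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a znode tree (not the ZooKeeper client).
// It mimics two behaviours: sequential naming and ephemeral lifetime.
class FakeZnodeTree {
    private static final long PERSISTENT = 0L; // owner marker for persistent nodes

    // path -> owning session id (PERSISTENT for persistent nodes)
    private final Map<String, Long> nodes = new TreeMap<>();
    // per-parent counter used to build sequential names (simplified)
    private final Map<String, Integer> counters = new HashMap<>();

    // Creates parent + "/" + prefix + 10-digit zero-padded counter.
    String createSequential(String parent, String prefix) {
        int n = counters.merge(parent, 1, Integer::sum) - 1;
        String path = parent + "/" + prefix + String.format("%010d", n);
        nodes.put(path, PERSISTENT);
        return path;
    }

    // Creates an ephemeral node owned by sessionId; false means the path
    // already exists, i.e. another session holds the lock.
    boolean createEphemeral(String path, long sessionId) {
        if (sessionId == PERSISTENT) {
            throw new IllegalArgumentException("session id must be non-zero");
        }
        return nodes.putIfAbsent(path, sessionId) == null;
    }

    // Closing a session deletes every ephemeral node it owned.
    void closeSession(long sessionId) {
        if (sessionId == PERSISTENT) {
            throw new IllegalArgumentException("session id must be non-zero");
        }
        nodes.values().removeIf(owner -> owner == sessionId);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```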

The ZooKeeper documentation advises keeping the payload of shared data relatively small.

The Merritt ZooKeeper design saves read-only data as JSON objects.

More volatile (read/write) fields are saved as Int, Long, String, or very small JSON objects.
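To illustrate how volatile fields can be kept small, the sketch below encodes Int, Long and String values into the `byte[]` payload that a znode stores. The byte layout (big-endian via `ByteBuffer`, UTF-8 for strings) is an assumption, not the package's documented encoding. The size guard uses 0xfffff bytes, the documented default of ZooKeeper's `jute.maxbuffer` setting, only as a rough upper bound. `ZnodeCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for small read/write fields stored as znode data.
// Big-endian ints/longs and UTF-8 strings are assumptions, not the
// package's documented encoding.
final class ZnodeCodec {
    // Default ZooKeeper jute.maxbuffer (0xfffff bytes), used as a rough upper bound.
    static final int MAX_PAYLOAD = 0xfffff;

    private ZnodeCodec() {}

    static byte[] fromInt(int v) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
    }

    static int toInt(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    static byte[] fromLong(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long toLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    static byte[] fromString(String s) {
        byte[] data = s.getBytes(StandardCharsets.UTF_8);
        if (data.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("payload too large: " + data.length + " bytes");
        }
        return data;
    }

    static String asString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}
```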

Code Examples

Create Batch

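 // Connect to ZooKeeper: connect string, session timeout (ms), no watcher
 ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
 // Batch submission payload (contents elided)
 JSONObject batchSub = new JSONObject("...");
 // Creates the batch in ZooKeeper
 Batch batch = Batch.createBatch(zk, batchSub);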
 

Consumer Daemon Acquires Batch and Creates Jobs

 // An ephemeral lock is created when the batch is acquired
 // The ephemeral lock will be released when the ZooKeeper connection is closed
 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Batch batch = Batch.acquirePendingBatch(zk);
   JSONObject jobConfig = Job.createJobConfiguration(...);
   Job j = Job.createJob(zk, batch.id(), jobConfig);
 }
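One plausible way to implement the acquire step is sketched below. It assumes the step follows the common ZooKeeper pattern of listing candidate nodes and trying to create an ephemeral lock on each. With a real client, the `tryLock` callback would wrap `ZooKeeper.create(..., CreateMode.EPHEMERAL)` and treat `KeeperException.NodeExistsException` as "already held". This summary does not say whether `Batch.acquirePendingBatch` works exactly this way, and `Acquirer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the "acquire" pattern: try candidates in creation
// order and take the first one whose ephemeral lock can be created.
final class Acquirer {
    private Acquirer() {}

    // tryLock stands in for creating an ephemeral lock node; it returns false
    // when another worker already holds that lock.
    static Optional<String> acquireFirst(List<String> candidates, Predicate<String> tryLock) {
        List<String> ordered = new ArrayList<>(candidates);
        // Zero-padded sequential names with a shared prefix sort in creation order.
        Collections.sort(ordered);
        for (String id : ordered) {
            if (tryLock.test(id)) {
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }
}
```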
 

Consumer Daemon Acquires Pending Job and Moves Job to Estimating

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Job jj = Job.acquireJob(zk, JobState.Pending);
   jj.setStatus(zk, jj.status().stateChange(JobState.Estimating));
 }
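The move from Pending to Estimating is a state transition. A minimal sketch of a validated job lifecycle follows. Only Pending and Estimating are named in this summary, so the other states, the transition table, and the `SketchJobState` name are placeholders rather than the package's `JobState` definition.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical job lifecycle for illustration. Only PENDING and ESTIMATING
// are named in the package summary; the other states are placeholders.
enum SketchJobState {
    PENDING, ESTIMATING, PROCESSING, COMPLETED, FAILED;

    // Illustrative transition table, not the package's real rules.
    private static final Map<SketchJobState, Set<SketchJobState>> NEXT = Map.of(
        PENDING, EnumSet.of(ESTIMATING, FAILED),
        ESTIMATING, EnumSet.of(PROCESSING, FAILED),
        PROCESSING, EnumSet.of(COMPLETED, FAILED),
        COMPLETED, EnumSet.noneOf(SketchJobState.class),
        FAILED, EnumSet.noneOf(SketchJobState.class));

    boolean canMoveTo(SketchJobState target) {
        return NEXT.get(this).contains(target);
    }

    // Validated transition, similar in spirit to status().stateChange(...).
    SketchJobState stateChange(SketchJobState target) {
        if (!canMoveTo(target)) {
            throw new IllegalStateException(this + " -> " + target + " is not allowed");
        }
        return target;
    }

    // Advances along the happy path, similar in spirit to status().success().
    SketchJobState success() {
        switch (this) {
            case PENDING: return ESTIMATING;
            case ESTIMATING: return PROCESSING;
            case PROCESSING: return COMPLETED;
            default: throw new IllegalStateException(this + " has no successor");
        }
    }
}
```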
 

Consumer Daemon Acquires Estimating Job and Updates Priority

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Job jj = Job.acquireJob(zk, JobState.Estimating);
   jj.setPriority(3);
   jj.setStatus(zk, jj.status().success());
 }
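The sketch below shows one way a priority such as the `3` above could influence which job a consumer acquires next. It assumes that lower numbers are served first and that ties fall back to the zero-padded sequential id (creation order). This summary states neither rule, and `QueuedJob` and `JobOrdering` are hypothetical names.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical view of a queued job: its sequential id and its priority.
final class QueuedJob {
    final String id;
    final int priority;

    QueuedJob(String id, int priority) {
        this.id = id;
        this.priority = priority;
    }
}

final class JobOrdering {
    private JobOrdering() {}

    // Assumption: lower priority numbers are served first; ties fall back to
    // the zero-padded sequential id, i.e. creation order.
    static final Comparator<QueuedJob> ORDER =
        Comparator.comparingInt((QueuedJob j) -> j.priority).thenComparing(j -> j.id);

    static PriorityQueue<QueuedJob> newQueue() {
        return new PriorityQueue<>(ORDER);
    }
}
```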
 

Acquire Completed Batch, Perform Reporting

 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Batch batch = Batch.acquireBatchForReporting(zk);
   //Notify depositor of job status
   batch.setStatus(zk, batch.status().success());
   //Admin thread will perform batch.delete(zk);
 }
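The reporting step needs a per-status tally of the batch's jobs. Below is a minimal sketch, assuming final statuses are available as a map from job id to status name. `BatchSummary` and its output format are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that tallies final job statuses for a batch report.
final class BatchSummary {
    private BatchSummary() {}

    // Counts jobs per status; TreeMap keeps the status order stable.
    static Map<String, Integer> countByStatus(Map<String, String> jobStatuses) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : jobStatuses.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        return counts;
    }

    // One-line summary such as "Batch bid1: 3 job(s), Completed=2, Failed=1".
    static String summaryLine(String batchId, Map<String, String> jobStatuses) {
        StringBuilder sb = new StringBuilder("Batch " + batchId + ": " + jobStatuses.size() + " job(s)");
        countByStatus(jobStatuses).forEach((status, n) -> sb.append(", ").append(status).append('=').append(n));
        return sb.toString();
    }
}
```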
 

Create Assembly Request

 ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
 // Token data can be parsed from JSON...
 JSONObject tokenData = new JSONObject("...");
 // ...or built with the helper instead (use one or the other):
 // JSONObject tokenData = Access.createTokenData(...);
 Access access = Access.createAssembly(zk, Access.Queues.small, tokenData);
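The `Access.Queues.small` argument suggests that assembly requests are routed to queues by size. The sketch below shows one such rule, under stated assumptions. The `LARGE` queue, the byte threshold, and the `AssemblyRouter` name are hypothetical and not taken from this package.

```java
// Hypothetical routing rule: the package passes Access.Queues.small, which
// suggests size-based queues; the LARGE name and the threshold are assumptions.
enum AssemblyQueue { SMALL, LARGE }

final class AssemblyRouter {
    private final long smallLimitBytes;

    AssemblyRouter(long smallLimitBytes) {
        if (smallLimitBytes <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.smallLimitBytes = smallLimitBytes;
    }

    // Requests at or below the limit go to the small queue, all others to large.
    AssemblyQueue route(long estimatedBytes) {
        return estimatedBytes <= smallLimitBytes ? AssemblyQueue.SMALL : AssemblyQueue.LARGE;
    }
}
```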
 

Consumer Daemon Acquires Assembly Request

 // An ephemeral lock is created when the assembly request is acquired
 // The ephemeral lock will be released when the ZooKeeper connection is closed
 try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
   Access access = Access.acquirePendingAssembly(zk, Access.Queues.small);
   // Process the assembly request
   access.setStatus(zk, AccessState.Processing);
 }
 
See Also:
Design Document