Package org.cdlib.mrt.zk
Merritt Queue Design
The Merritt Ingest Queue will be refactored in 2024 to enable a number of goals.
- Match ingest workload to available resources (compute, memory, working storage)
- dynamically provision resources to match demand
- dynamically manage running thread count based on processing load
- Hold jobs based on temporary holds (collection lock, storage node lock, queue hold)
- Graceful resumption of processing in progress
- Allow processing to be resumed on any ingest host. The previous implementation managed state in memory, which prevented this capability.
- Accurate notification of ingest completion (including inventory recording)
- Send accurate summary email on completion of a batch regardless of any interruption that occurred while processing
Batch Queue vs Job Queue
The work of the Merritt Ingest Service takes place at a Job level. Merritt Depositors initiate submissions at a Batch level. The primary function of the Batch Queue is to provide notification to a depositor once all jobs for a batch have completed.
Use of ZooKeeper
Merritt uses ZooKeeper for the following features:
- Creation/validation of distributed (ephemeral node) locks
- Creation of unique node names across the distributed node structure (sequential nodes)
- Manage data across the distributed node structure to allow any worker to acquire a job/batch (persistent nodes)
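The three node kinds listed above can be illustrated with a toy in-memory model. This is a minimal sketch and not the ZooKeeper client API or the Merritt implementation: the class `NodeKindsSketch`, its method names, and the `/batches/bid` path prefix are all hypothetical. The only detail borrowed from ZooKeeper itself is the 10-digit zero-padded counter that the server appends to sequential node names.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of ZooKeeper node kinds (hypothetical; NOT the ZooKeeper API). */
final class NodeKindsSketch {
    // Marker for nodes that are not tied to a session; real session ids are assumed positive.
    private static final long PERSISTENT = -1L;
    // path -> owning session id, or PERSISTENT
    private final Map<String, Long> owners = new TreeMap<>();
    private long counter = 0;

    /** Persistent node: stays until explicitly deleted, so any worker can read it later. */
    boolean createPersistent(String path) {
        return owners.putIfAbsent(path, PERSISTENT) == null;
    }

    /** Sequential node: a zero-padded counter is appended, making the name unique. */
    String createSequential(String prefix) {
        String path = prefix + String.format("%010d", counter++);
        owners.put(path, PERSISTENT);
        return path;
    }

    /** Ephemeral node used as a lock: creation fails if the node already exists. */
    boolean tryLock(String path, long sessionId) {
        return owners.putIfAbsent(path, sessionId) == null;
    }

    /** Closing a session removes every ephemeral node that session owns. */
    void closeSession(long sessionId) {
        owners.values().removeIf(owner -> owner == sessionId);
    }

    boolean exists(String path) {
        return owners.containsKey(path);
    }

    public static void main(String[] args) {
        NodeKindsSketch zk = new NodeKindsSketch();
        String batch = zk.createSequential("/batches/bid");
        System.out.println(batch);                           // /batches/bid0000000000
        System.out.println(zk.tryLock(batch + "/lock", 1L)); // true: session 1 holds the lock
        System.out.println(zk.tryLock(batch + "/lock", 2L)); // false: already locked
        zk.closeSession(1L);                                 // lock vanishes with the session
        System.out.println(zk.tryLock(batch + "/lock", 2L)); // true
    }
}
```

The point of the model is the last three lines of `main`: a worker that disappears releases its lock without any cleanup code, while the batch node itself remains available to the next worker.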
The ZooKeeper documentation advises keeping the payload of shared data relatively small.
The Merritt ZooKeeper design saves read-only data as JSON objects.
More volatile (read/write) fields are saved as Int, Long, String and very small JSON objects.
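ZooKeeper stores each node's payload as a byte array, so small scalar fields need some encoding. The following is a minimal sketch of one possible encoding, assuming fixed-width big-endian bytes for Int and Long and UTF-8 for String. The class `PayloadCodec` is hypothetical, and the encoding actually used by this package may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for small, frequently updated node payloads. */
final class PayloadCodec {
    /** Long -> 8 big-endian bytes. */
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    /** Int -> 4 big-endian bytes. */
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int decodeInt(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    /** String -> UTF-8 bytes. */
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decodeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] priority = encodeInt(3);
        System.out.println(priority.length);                        // 4
        System.out.println(decodeInt(priority));                    // 3
        System.out.println(decodeString(encodeString("Pending")));  // Pending
    }
}
```

Keeping a volatile field such as a priority in its own few-byte payload means an update rewrites only those bytes rather than a whole JSON document.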
Code Examples
Create Batch
ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
JSONObject batchSub = new JSONObject("...");
Batch batch = Batch.createBatch(zk, batchSub);
Consumer Daemon Acquires Batch and Creates Jobs
// An ephemeral lock is created when the batch is acquired
// The ephemeral lock will be released when the ZooKeeper connection is closed
try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
  Batch batch = Batch.acquirePendingBatch(zk);
  JSONObject jobConfig = Job.createJobConfiguration(...);
  Job j = Job.createJob(zk, batch.id(), jobConfig);
}
Consumer Daemon Acquires Pending Job and Moves Job to Estimating
try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
  Job jj = Job.acquireJob(zk, JobState.Pending);
  jj.setStatus(zk, jj.status().stateChange(JobState.Estimating));
}
Consumer Daemon Acquires Estimating Job and Updates Priority
try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
  Job jj = Job.acquireJob(zk, JobState.Estimating);
  jj.setPriority(3);
  jj.setStatus(zk, jj.status().success());
}
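The `stateChange(...)` and `success()` calls in the two examples above suggest that each state knows which transitions are legal. A minimal sketch of that idea follows. Only `Pending` and `Estimating` come from this page; the `Completed` and `Failed` states, the transition table, and the class `JobStateSketch` are hypothetical, and `IllegalStateException` merely stands in for the package's `MerrittStateError`.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical state machine; the real JobState enum defines its own transitions. */
final class JobStateSketch {
    enum State {
        Pending, Estimating, Completed, Failed; // Completed and Failed are assumptions

        /** States that may legally follow this one. */
        Set<State> allowedNext() {
            switch (this) {
                case Pending:    return EnumSet.of(Estimating);
                case Estimating: return EnumSet.of(Completed, Failed);
                default:         return EnumSet.noneOf(State.class);
            }
        }

        /** Returns the requested state, or throws if the transition is illegal. */
        State stateChange(State next) {
            if (!allowedNext().contains(next)) {
                throw new IllegalStateException(this + " -> " + next + " is not allowed");
            }
            return next;
        }

        /** Returns the next state on the success path. */
        State success() {
            switch (this) {
                case Pending:    return Estimating;
                case Estimating: return Completed;
                default: throw new IllegalStateException(this + " has no success transition");
            }
        }
    }

    public static void main(String[] args) {
        State s = State.Pending.stateChange(State.Estimating);
        System.out.println(s);            // Estimating
        System.out.println(s.success());  // Completed
        try {
            State.Pending.stateChange(State.Completed);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Pending -> Completed is not allowed
        }
    }
}
```

Because every transition goes through the enum, a consumer daemon cannot skip a processing stage by accident; an illegal request fails immediately instead of leaving an inconsistent status in the queue.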
Acquire Completed Batch, Perform Reporting
try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
  Batch batch = Batch.acquireBatchForReporting(zk);
  //Notify depositor of job status
  batch.setStatus(zk, batch.status().success());
  //Admin thread will perform batch.delete(zk);
}
Create Assembly Request
ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null);
// Build the token data from a JSON string
JSONObject tokenData = new JSONObject("...");
// Alternatively: JSONObject tokenData = Access.createTokenData(...);
Access access = Access.createAssembly(zk, Access.Queues.small, tokenData);
Consumer Daemon Acquires Assembly Request
// An ephemeral lock is created when the assembly request is acquired
// The ephemeral lock will be released when the ZooKeeper connection is closed
try(ZooKeeper zk = new ZooKeeper("localhost:8084", 100, null)) {
  Access access = Batch.acquirePendingAssembly(zk, Access.Queues.small);
  //Do stuff
  access.setStatus(zk, AccessState.Processing);
}
See Also:
- Design Document
Interface Summary
- IngestState: Common interface for Ingest Queue State Enums
Class Summary
- Access: Class to manage a Merritt Access Assembly Request.
- Batch: Class to manage a Merritt Ingest Batch in the Batch Queue.
- Job: Class to manage a Merritt Ingest Job in the Job Queue.
- MerrittLocks: Static methods to set and release Merritt Locks.
- QueueItem: Base Class for Common functions for Merritt Ingest Batches and Merritt Ingest Jobs.
- QueueItemHelper: The static methods in this class also provide a simplified interface for common ZooKeeper API calls.
Enum Summary
- Access.Queues: Access Assembly Queue names
- AccessState: Access Assembly State Transitions
- BatchState: Batch State Transitions
- JobState: Job State Transitions
- MerrittJsonKey: Lookup key names for properties stored as JSON within ZooKeeper nodes.
- QueueItem.BatchJobStates: Standardized path names for Merritt ZooKeeper nodes
- QueueItem.ZkPaths: Standardized path names for Merritt ZooKeeper nodes
- QueueItem.ZkPrefixes: Standardized prefix names for Merritt ZooKeeper sequential nodes
- ZKKey: Defines relative pathnames to ZooKeeper nodes for a Batch or a Job.
Exception Summary
- MerrittStateError: Exception indicating that an illegal operation was invoked on an object with an incompatible state (e.g. attempting to delete a Job or Batch that is not in a deletable state)
- MerrittZKNodeInvalid: Exception thrown when attempting to modify the data payload for a ZooKeeper node.