Fork me on GitHub

Getting Started

Background

The basic unit is a Task. Tasks have an id, type, optional metadata, and zero or more children. Tasks are submitted to be executed through a WorkflowManager instance. WorkflowManager instances are created using a WorkflowManagerBuilder. The work of a task is executed by a TaskExecutor instance which you must provide an implementation for.

Prerequisites

Nirmata Workflow uses Apache ZooKeeper and Apache Curator. You will need to run an Apache ZooKeeper cluster in order to use the Workflow Manager. You will also need to create a Curator client instance for the Workflow Manager to use. Refer to the documentation of each of these projects for details.

NOTE: Nirmata Workflow is a Java 8 Library

Using in Your Project

Nirmata Workflow is available from Maven Central:

GroupId com.nirmata.workflow
ArtifactId nirmata-workflow
  • From Maven
    <dependency>
        <groupId>com.nirmata.workflow</groupId>
        <artifactId>nirmata-workflow</artifactId>
        <version>${nirmata-workflow-version}</version>
    </dependency>
  • From Gradle
    dependencies {
        compile('com.nirmata.workflow:nirmata-workflow:$nirmataWorkflowVersion')
    }

Tasks

You can create task objects programmatically or via a file/stream. To load via file/stream, you can use the utility TaskLoader.load(). Here is an example tasks JSON file. It defines 4 tasks. The first task, "root" is a container only task - i.e. there's nothing to execute; it just has children. Container only tasks don't have a "task type". Then, there is "task1" and "task2" both of which have a child task, "task3". This means that "task3" will not execute until both "task1" and "task2" complete.

Task Types

Every task has a TaskType. The TaskType is used by your TaskExecutor implementation to do the work of the task. TaskType's have three fields: "type": a string that contains any value you define; "version": a version string used to version the tasks; "isIdempotent": whether or not the task is idempotent. TaskType also allows setting a priority or delay before a task executes.

TaskExecutor

The TaskExecutor is a factory that you provide which creates TaskExecution instances. Every time a task needs to be executed, the newTaskExecution() method of TaskExecutor is called to get a new TaskExecution instance. The TaskExecution's execute() method is then called to do the work of the task and return a result. A task is executed randomly on one of the servers that is configured to handle tasks with that task type. Nirmata Workflow runs TaskExecutions in internally managed thread pools.

WorkflowManagerBuilder

WorkflowManagerBuilder is used to build WorkflowManager instances. For most uses, you will have one WorkflowManager per server/JVM in your cluster. However, you can create as many WorkflowManager instances as you need. The builder has the following methods:

withCurator Required to set the Curator instance. You also specify a namespace and a version. The namespace and version combine to create a unique workflow. All instances using the same namespace and version are logically part of the same workflow.
addingTaskExecutor Adds a pool of task executors for a given task type to this instance of the workflow. The specified number of executors are allocated. Call this method multiple times to allocate executors for the various types of tasks that will be used in this workflow. You can choose to have all workflow instances execute all task types or target certain task types to certain instances. 
 
qty is the maximum concurrency for the given type of task for this instance. The logical concurrency for a given task type is the total qty of all instances in the workflow. e.g. if there are 3 instances in the workflow and instance A has 2 executors for task type "a", instance B has 3 executors for task type "a" and instance C has no executors for task type "a", the maximum concurrency for task type "a" is 5. 
 
IMPORTANT: every workflow cluster must have at least one instance that has task executor(s) for each task type that will be submitted to the workflow. i.e workflows will stall if there is no executor for a given task type.
withInstanceName optional - used in reporting. This will be the value recorded as tasks are executed. Via reporting, you can determine which instance has executed a given task. the Default is: InetAddress.getLocalHost().getHostName()
withAutoCleaner optional - Sets an auto-cleaner that will run every given period. This is used to clean old runs. IMPORTANT: the auto cleaner will only run on the instance that is the current scheduler.
withSerializer optional - By default, a JSON serializer is used to store data in ZooKeeper. Use this to specify an alternate serializer.
withQueueFactory optional - Pluggable queue factory. Default uses ZooKeeper for queuing.

WorkflowManager

WorkflowManager is the main API. Through it, you submit tasks, etc. Once created with WorkflowManagerBuilder, call the start() method and the workflow system will operate asynchronously. You can submit() tasks at any time and from any instance.