Fork me on GitHub

Reference

Models

TaskType Models a task type
Type String Any value to represent the task type
Version String The version of this task type
IsIdempotent Boolean Whether or not this task is idempotent 
 
Idempotent tasks can be restarted/re-executed when a workflow instance crashes or there is some other kind of error. non-idempotent tasks will only be attempted once.
Mode Task Mode Task queueing mode (see below)
Task Models a task
TaskId String Unique ID for the task
Type TaskType Task's type. If null, this is a container-only task
MetaData Map Optional metadata
Children Task[] Child tasks that execute after this task completes. NOTE: children can have multiple parents. i.e. specifying the same task as a child of different parents causes the task to wait for all parents to complete.
ExecutableTask Models a task that has been scheduled for execution as part of a run
RunId String Every separate run of a Task submitted is assigned a unique run ID by the Workflow Manager. It's used to reference the execution.
TaskId String The TaskID from the Task model
Type TaskType The task type from the Task model
MetaData Map The metadata from the Task model
TaskExecutionResult Models an execution result
Status Status Code The execution status (see below)
Message String Any message to store as the result. Note: tasks that execute after this can retrieve this message.
Result Data Map Any data to store as the result. Note: tasks that execute after this can retrieve this data.
Sub Task RunId String Optional sub-task (see below)

Task Modes

Tasks can be queued to execute in the order submitted, with a given priority, or a given delay. The possible values are:

STANDARD Executed in order submitted
DELAY Delayed tasks don't excecute immediately. They execute at a future time specified by the task metadata "__submit_value" (use Task.makeSpecialMeta()). The value is the Long.toString(epoch) value of the future ticks/epoch when the task should run. E.g. to have the task run 1 minute from when submitted use System.currentTimeMillis() + 60000.
PRIORITY Priority tasks are executed with the approximate priority given when submitted. i.e. a task submitted with priority 1 should execute before a task submitted with priority 5 even if the priority 1 task was submitted after the priority 5 task (of course only if the priority 5 task hasn't executed yet). Just as for DELAY, set the proiority using the task metadata "__submit_value" (or Task.makeSpecialMeta()).

Status Codes

When a task executes, it returns a result that specifies the task status. The status controls the remainder of the tasks in the workflow. The possible values are:

SUCCESS The task executed successfully
FAILED_CONTINUE The task failed, but the remaining tasks should still execute
FAILED_STOP The task failed and the remaining tasks in the run should be canceled

APIs

WorkflowManager

The main API is WorkflowManager. There are also Administration APIs documented here.

  • public void start();

    Starts the workflow manager which does its work asynchronously. In most use cases, you will have one WorkflowManager instance per server/JVM. However, you can create as many WorkflowManager instances as you need. Use as many servers as you need to distribute tasks: three servers will suffice for most uses.

  • public RunId submitTask(Task task);

    Submits the given task (with the task's associated children, etc.). The task will start nearly immediately. NOTE: there's no guarantee which server will execute the various tasks.

  • public boolean cancelRun(RunId runId);

    Attempt to cancel the given run. NOTE: the cancellation is scheduled and does not occur immediately. Currently executing tasks will continue. Only tasks that have not yet executed can be canceled.

  • public TaskExecutionResult getTaskExecutionResult(RunId runId, TaskId taskId);

    Child tasks can retrieve the results of parent tasks and alter their work based on the results.

  • public RunId submitSubTask(RunId parentRunId, Task task);

    Same as submitTask(Task) except that, when completed, the parent run will be notified. This method is meant to be used inside of TaskExecutor for a task that needs to initiate a sub-run and have the parent run wait for the sub-run to complete.

  • public WorkflowAdmin getAdmin();

    Returns the Administration APIs which are documented here.

  • public WorkflowListenerManager newWorkflowListenerManager();

    Allocates a new Workflow Listener Manager (see below).

WorkflowManagerBuilder

The WorkflowManagerBuilder is used to build new WorkflowManager instances.

  • public WorkflowManagerBuilder withCurator(CuratorFramework curator, String namespace, String version);

    Required - Set the Curator instance to use. In addition to the Curator instance, specify a namespace for the workflow and a version. The namespace and version combine to create a unique workflow. All instances using the same namespace and version are logically part of the same workflow.

  • public WorkflowManagerBuilder addingTaskExecutor(TaskExecutor taskExecutor, int qty, TaskType taskType);

    Adds a pool of task executors for a given task type to this instance of the workflow. The specified number of executors are allocated. Call this method multiple times to allocate executors for the various types of tasks that will be used in this workflow. You can choose to have all workflow instances execute all task types or target certain task types to certain instances.

    qty is the maximum concurrency for the given type of task for this instance. The logical concurrency for a given task type is the total qty of all instances in the workflow. e.g. if there are 3 instances in the workflow and instance A has 2 executors for task type "a", instance B has 3 executors for task type "a" and instance C has no executors for task type "a", the maximum concurrency for task type "a" is 5.

    IMPORTANT: every workflow cluster must have at least one instance that has task executor(s) for each task type that will be submitted to the workflow. i.e workflows will stall if there is no executor for a given task type.

  • public WorkflowManagerBuilder withInstanceName(String instanceName);

    Used in reporting. This will be the value recorded as tasks are executed. Via reporting, you can determine which instance has executed a given task. Default is: InetAddress.getLocalHost().getHostName()

  • public WorkflowManagerBuilder withQueueFactory(QueueFactory queueFactory);

    Pluggable queue factory. Default uses ZooKeeper for queuing.

  • public WorkflowManagerBuilder withAutoCleaner(AutoCleaner autoCleaner, Duration runPeriod);

    Sets an auto-cleaner that will run every given period. This is used to clean old runs. IMPORTANT: the auto cleaner will only run on the instance that is the current scheduler.

  • public WorkflowManagerBuilder withSerializer(Serializer serializer);

    By default, a JSON serializer is used to store data in ZooKeeper. Use this to specify an alternate serializer.

TaskExecutor

TaskExecutor is an interface and you are required to provide an implementation. The Workflow Manager uses your TaskExecutor instances to do the actual work of the tasks. TaskExecutor is a functional interface and has one method:

  • public TaskExecution newTaskExecution(WorkflowManager workflowManager, ExecutableTask executableTask);

    Create a task execution for the given task. TaskExecution are similar to JDK Runnables.

TaskExecution

Represents a task execution. A new task execution is allocated for each run of a task. The Workflow manager will call execute() from an internally managed thread pool when the task should perform its operation.

  • public TaskExecutionResult execute();

    Execute the task and return the result when complete (see status codes above).

Events

Via the WorkflowManager you can create a WorkflowListenerManager, start it and listen for various events. Events are represented by WorkflowEvent instances which have these methods:

  • public EventType getType();

    Returns the event type (see table below)

  • public RunId getRunId();

    Returns the RunId of the run that generated the event

  • public TaskId getTaskId();

    If the event is associated with a specific task, this is the TaskId

    Event Types
    RUN_STARTED A run has started. getRunId() is the run id.
    RUN_UPDATED A run has been updated - usually meaning it has completed. getRunId() is the run id.
    TASK_STARTED A task has started. getRunId() is the run id. getTaskId() is the task id.
    TASK_COMPLETED A task has completed. getRunId() is the run id. getTaskId() is the task id.