Here is a simple example that shows a Workflow Manager being started and a workflow of three tasks being submitted:
public class Example implements Closeable { private final TestingServer testingServer; private final CuratorFramework curator; public static void main(String[] args) throws Exception { Example example = new Example(); example.runExample(); example.close(); } public Example() throws Exception { // for testing purposes, start an in-memory test ZooKeeper instance testingServer = new TestingServer(); // allocate the Curator instance curator = CuratorFrameworkFactory.builder() .connectString(testingServer.getConnectString()) .retryPolicy(new ExponentialBackoffRetry(100, 3)) .build(); } public void runExample() { curator.start(); // for our example, we'll just have one task type TaskType taskType = new TaskType("my type", "1", true); // a task which will have two parents Task childTask = new Task(new TaskId("child task"), taskType); // parent #1 Task parentTask1 = new Task(new TaskId("parent 1"), taskType, Lists.newArrayList(childTask)); // parent #2 Task parentTask2 = new Task(new TaskId("parent 2"), taskType, Lists.newArrayList(childTask)); // a root container-only for the parent tasks Task rootTask = new Task(new TaskId(), Lists.newArrayList(parentTask1, parentTask2)); // an executor that just logs a message and returns ExampleTaskExecutor taskExecutor = new ExampleTaskExecutor(); // allocate the workflow manager with some executors for our type WorkflowManager workflowManager = WorkflowManagerBuilder.builder() .addingTaskExecutor(taskExecutor, 10, taskType) .withCurator(curator, "test", "1") .build(); WorkflowListenerManager workflowListenerManager = workflowManager.newWorkflowListenerManager(); try { // listen for run completion and count down a latch when it happens final CountDownLatch doneLatch = new CountDownLatch(1); WorkflowListener listener = new WorkflowListener() { @Override public void receiveEvent(WorkflowEvent event) { if ( event.getType() == WorkflowEvent.EventType.RUN_UPDATED ) { // note: the run could have had an error. 
RUN_UPDATED does not guarantee successful completion doneLatch.countDown(); } } }; workflowListenerManager.getListenable().addListener(listener); // start the manager and the listeners workflowManager.start(); workflowListenerManager.start(); // submit our task workflowManager.submitTask(rootTask); // you should see these messages in the console: // Executing task: Id{id='parent 1'} // Executing task: Id{id='parent 2'} // then // Executing task: Id{id='child task'} // wait for completion doneLatch.await(); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } finally { CloseableUtils.closeQuietly(workflowListenerManager); CloseableUtils.closeQuietly(workflowManager); } } @Override public void close() { CloseableUtils.closeQuietly(curator); CloseableUtils.closeQuietly(testingServer); } }
/**
 * Task executor used by the example: every execution it hands out prints the
 * task's id to standard output and reports success.
 */
public class ExampleTaskExecutor implements TaskExecutor {
    /**
     * Creates the execution for a single task.
     *
     * @param workflowManager the manager requesting the execution (not used here)
     * @param executableTask  the task to execute
     * @return an execution that prints the task id and returns a SUCCESS result
     */
    @Override
    public TaskExecution newTaskExecution(WorkflowManager workflowManager, ExecutableTask executableTask) {
        return new TaskExecution() {
            @Override
            public TaskExecutionResult execute() {
                return announceAndSucceed(executableTask);
            }
        };
    }

    /** Prints the "Executing task" line for the given task and builds the SUCCESS result. */
    private static TaskExecutionResult announceAndSucceed(ExecutableTask task) {
        String line = "Executing task: " + task.getTaskId();
        System.out.println(line);

        TaskExecutionStatus status = TaskExecutionStatus.SUCCESS;
        return new TaskExecutionResult(status, "My message");
    }
}