Fork me on GitHub

Example

Here is a simple example that shows a Workflow Manager being started and a workflow of three tasks being submitted:

Example.java

/**
 * Minimal end-to-end example: starts a test ZooKeeper instance and a Curator
 * client, builds a {@link WorkflowManager}, submits a small task graph and
 * blocks until a {@code RUN_UPDATED} event is received.
 */
public class Example implements Closeable
{
    private final TestingServer testingServer;
    private final CuratorFramework curator;

    /**
     * Runs the example and always releases the test server and Curator client,
     * even if the run fails with an exception.
     *
     * @param args ignored
     * @throws Exception if the example cannot be created or run
     */
    public static void main(String[] args) throws Exception
    {
        Example example = new Example();
        try
        {
            example.runExample();
        }
        finally
        {
            // close() must run even when runExample() throws, otherwise the
            // test server and the Curator client are leaked
            example.close();
        }
    }

    /**
     * Creates the test ZooKeeper server and a Curator client connected to it.
     * The Curator client is not started here; {@link #runExample()} starts it.
     *
     * @throws Exception if the test server cannot be created
     */
    public Example() throws Exception
    {
        // for testing purposes, start an in-memory test ZooKeeper instance
        testingServer = new TestingServer();

        // allocate the Curator instance
        curator = CuratorFrameworkFactory.builder()
            .connectString(testingServer.getConnectString())
            .retryPolicy(new ExponentialBackoffRetry(100, 3))
            .build();
    }

    /**
     * Starts Curator, builds the task graph (two parent tasks sharing one child
     * task under a container-only root), submits it and waits for the first
     * {@code RUN_UPDATED} event. The workflow manager and its listener manager
     * are closed before returning. If the waiting thread is interrupted, the
     * interrupt flag is restored and the method returns.
     */
    public void runExample()
    {
        curator.start();

        // for our example, we'll just have one task type
        TaskType taskType = new TaskType("my type", "1", true);

        // a task which will have two parents
        Task childTask = new Task(new TaskId("child task"), taskType);

        // parent #1
        Task parentTask1 = new Task(new TaskId("parent 1"), taskType, Lists.newArrayList(childTask));

        // parent #2
        Task parentTask2 = new Task(new TaskId("parent 2"), taskType, Lists.newArrayList(childTask));

        // a container-only root task that holds the two parent tasks
        Task rootTask = new Task(new TaskId(), Lists.newArrayList(parentTask1, parentTask2));

        // an executor that just logs a message and returns
        ExampleTaskExecutor taskExecutor = new ExampleTaskExecutor();

        // allocate the workflow manager with some executors for our type
        WorkflowManager workflowManager = WorkflowManagerBuilder.builder()
            .addingTaskExecutor(taskExecutor, 10, taskType)
            .withCurator(curator, "test", "1")
            .build();

        WorkflowListenerManager workflowListenerManager = workflowManager.newWorkflowListenerManager();
        try
        {
            // listen for run completion and count down a latch when it happens
            final CountDownLatch doneLatch = new CountDownLatch(1);
            WorkflowListener listener = new WorkflowListener()
            {
                @Override
                public void receiveEvent(WorkflowEvent event)
                {
                    if ( event.getType() == WorkflowEvent.EventType.RUN_UPDATED )
                    {
                        // note: the run could have had an error. RUN_UPDATED does not guarantee successful completion
                        doneLatch.countDown();
                    }
                }
            };
            workflowListenerManager.getListenable().addListener(listener);

            // start the manager and the listeners
            workflowManager.start();
            workflowListenerManager.start();

            // submit our task
            workflowManager.submitTask(rootTask);

            // you should see these messages in the console:
            //      Executing task: Id{id='parent 1'}
            //      Executing task: Id{id='parent 2'}
            // then
            //      Executing task: Id{id='child task'}

            // wait for completion
            doneLatch.await();
        }
        catch ( InterruptedException e )
        {
            // restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
        }
        finally
        {
            CloseableUtils.closeQuietly(workflowListenerManager);
            CloseableUtils.closeQuietly(workflowManager);
        }
    }

    /**
     * Closes the Curator client and then the test ZooKeeper server.
     */
    @Override
    public void close()
    {
        CloseableUtils.closeQuietly(curator);
        CloseableUtils.closeQuietly(testingServer);
    }
}

ExampleTaskExecutor.java

/**
 * A {@link TaskExecutor} for the example: each execution prints the id of the
 * task it was created for and reports success.
 */
public class ExampleTaskExecutor implements TaskExecutor
{
    /**
     * Creates the execution for a single task.
     *
     * @param workflowManager the manager requesting the execution (unused here)
     * @param executableTask the task to execute; declared {@code final} because it is
     *                       captured by the anonymous {@link TaskExecution} below
     * @return an execution that prints the task id and returns a
     *         {@code SUCCESS} result with the message {@code "My message"}
     */
    @Override
    public TaskExecution newTaskExecution(WorkflowManager workflowManager, final ExecutableTask executableTask)
    {
        return new TaskExecution()
        {
            @Override
            public TaskExecutionResult execute()
            {
                System.out.println("Executing task: " + executableTask.getTaskId());
                return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "My message");
            }
        };
    }
}