Beginner's Guide to Apache Apex

Introduction

Apache Apex is a fault-tolerant, high-performance platform and framework for building distributed applications; it is built on Hadoop. This guide is targeted at Java developers who are getting started with building Apex applications.

Quickstart

Those eager to get started right away can download, build, and run a few sample applications; just follow these steps:

  • Make sure you have Java JDK, maven and git installed.
  • Clone the examples git repo: https://github.com/DataTorrent/examples
  • Switch to tutorials/fileOutput and build it either in your favorite IDE or on the command line with:
cd examples/tutorials/fileOutput
mvn clean package -DskipTests
  • Run the test in your IDE or on the command line with:
mvn test

Some of the applications are discussed in the Sample Applications section below. The rest of this document provides a more leisurely introduction to Apex.

Preliminaries

Before beginning development, you'll need to make sure the following prerequisites are present; details of setting up your development environment are here:

  1. Recent versions of the Java JDK, maven, git.
  2. A Hadoop cluster where the application can be deployed.
  3. A working internet connection.

Running the maven archetype

Apex applications use the maven build tool. The maven archetype is useful for avoiding the tedious process of creating a suitable pom.xml maven build file by hand; it also generates a simple default application with two operators that you can build and run without having to write any code or make any changes. To run it, place the following lines in a text file (called, say newapp.sh) and run it with the shell (e.g. bash newapp.sh):

v="3.3.0-incubating"
mvn -B archetype:generate \
  -DarchetypeGroupId=org.apache.apex \
  -DarchetypeArtifactId=apex-app-archetype \
  -DarchetypeVersion="$v" \
  -DgroupId=com.example \
  -Dpackage=com.example.myapexapp \
  -DartifactId=myapexapp \
  -Dversion=1.0-SNAPSHOT

As new versions are released, you might need to update the version number (the the variable v above). You can also run the archetype from your Java IDE as described here.

It should create a new project directory named myapexapp with these 3 Java source files:

src/test/java/com/example/myapexapp/ApplicationTest.java
src/main/java/com/example/myapexapp/Application.java
src/main/java/com/example/myapexapp/RandomNumberGenerator.java

The project should also contain these properties files:

src/site/conf/my-app-conf1.xml
src/main/resources/META-INF/properties.xml

You should now be able to step into the new directory and build the project:

cd myapexapp; mvn clean package -DskipTests

This will create a directory named target and an application package file within it named myapexapp-1.0-SNAPSHOT.apa.

These files are discussed further in the sections below.

The default application

We now discuss the default application generated by the archetype in some detail. Additional, more realistic applications are presented in the section titled Sample Applications below.

The default application creates an application in Application.java with 2 operators:

@ApplicationAnnotation(name="MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    RandomNumberGenerator randomGenerator = dag.addOperator("randomGenerator", RandomNumberGenerator.class);
    randomGenerator.setNumTuples(500);
    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("randomData", randomGenerator.out, cons.input).setLocality(Locality.CONTAINER_LOCAL);
  }
}

The application is named MyFirstApplication via the annotation @ApplicationAnnotation; this name will be displayed in the UI console and must be unique among the applications running on a cluster. It can also be changed at launch time.

The populateDAG method is the only one that you'll need to implement. Its contents fall into three categories: operator creation, operator configuration, and stream creation.

Two operators are created: randomGenerator and console. The first is defined in RandomNumberGenerator.java; it generates random floating point values and emits them on its output port. The second is an instance of ConsoleOutputOperator class defined in Malhar – the library of pre-built operators. The first argument to addOperator() is the name of this operator instance (there can be multiple instances of the same class, so we need a unique name to distinguish them); the second can be either a class object that needs to be instantiated as shown for randomGenerator or an actual instance of that class as shown for console.

The operators can be configured by calling setter methods on them; the call to setNumTuples is an example. However, operators are typically configured via XML properties files as discussed in later sections below.

Finally, the addStream call creates a stream named randomData connecting the output port of first operator to the input port of the second.

Running the application and the unit test

The file ApplicationTest.java contains a unit test that can be run from an IDE by highlighting the testApplication method and selecting the appropriate option from the dropdown; it can also be run the maven command line:

mvn -Dtest=ApplicationTest#testApplication test

It runs for about 10 seconds printing each random number with a prefix of hello world:. The first argument explicitly selects the test to run (testApplication) from the named class (ApplicationTest); you can omit it and just run mvn test to run all of the unit tests.

It is important to note that this particular test is actually a test of the entire application rather than a single class or a method within a class. It uses a class called LocalMode to essentially simulate a cluster. It is an extremely useful technique for testing your application without the need for a cluster. It can be used in more elaborate ways to test complex applications as discussed in the section entitled Local Mode Testing below.

To run the application, you need access to a cluster with Hadoop installed; there are multiple options here:

  • Download and setup the sandbox as described here. This is the simplest option for experimenting with Apex since it has all the necessary pieces installed.
  • Download and install the DataTorrent Community or Enterprise Edition downloadable from here.
  • Use an existing DataTorrent RTS licensed installation.
  • Clone the Apex source code on a cluster with Hadoop already installed, build it and use the apexcli command line tool (previously named dtcli) from there to run your application as described in this video.

With the first 3 methods, you have a browser-based GUI console and you can simply navigate to DevelopUpload Package and upload the .apa file built earlier; then run it using the launch button. If using the command line tool, run that tool then use the command launch myapexapp-1.0-SNAPSHOT.apa to launch the application. You can also specify a particular XML configuration file that is packaged with the application to use during launch, for example: launch -apconf my-app-conf1.xml myapexapp-1.0-SNAPSHOT.apa For an exhaustive list of commands available with this tool, please see here.

More details about configuration files are provided in the next section.

Configuring your application

Application configuration is typically done via XML properties files though it can also be done with code as shown above. The properties.xml file mentioned earlier has some examples:

<property>
  <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
  <value>1000</value>
</property>
<property>
  <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
  <value>hello world: %s</value>
</property>

The number of tuples output per window (windows are discussed in greater detail below) is set both in the code and in this file; in such cases, values in this properties file override those set in the code.

Configuration values can also be placed in XML files under site/conf and the file my-app-conf1.xml mentioned above is an example. These files are not processed automatically; they need to be explicitly selected at launch time. To do this in the GUI, select the Use saved configuration checkbox in the launch dialog and choose the desired file from the dropdown. When a property is specified in multiple files, precedence rules determine the final value; those rules are discussed here.

Attributes and Properties

Properties are simply public accessor methods in the operator classes and govern the functionality of the operator. Attributes on the other hand are pre-defined and affect how the operator or application behaves with respect to its environment. We have already seen a couple of examples of properties, namely, the number of tuples emitted per window by the random number generator (numTuples) and the prefix string appended to each value before it is output by the console operator (stringFormat).

Operator properties that are more complex objects than the primitive types can also be initialized from XML files. For example, if we have properties declared as int[] counts; and String[] paths; in an operator named foo, we can initialize them with:

<property>
  <name>dt.operator.foo.prop.counts</name>
  <value>10, 20, 30</value>
</property>
<property>
  <name>dt.operator.foo.prop.paths</name>
  <value>"/tmp/path1", "/tmp/path3", "/tmp/path3"</value>
</property>

An example of an attribute is the amount of memory allocated to the Buffer Server (see section below entitled Buffer Server); it is named BUFFER_MEMORY_MB can be set like this:

<property>
  <name>dt.application.{appName}.operator.{opName}.port.{portName}.attr.BUFFER_MEMORY_MB</name>
  <value>128</value>
</property>

Here {appName}, {opName}, {portName} are appropriate application, operator and port names respectively; they can also be replaced with asterisks (wildcards). The default value is 512MB.

Some additional attributes include:

Attribute name Description
PARTITIONER custom partitioner class associated with an operator
PARTITION_PARALLEL if true, triggers parallel partitioning of a downstream operator when an upstream operator is partitioned.
STREAM_CODEC controls serialization/deserialization as well as destination partitions
RECOVERY_ATTEMPTS maximum restart attempts of a failed operator

Currently available attribute names are in Context class of the api module in Apex source code.

For additional examples of initialization of properties, including list and map objects, please look here.

Local Mode Testing

As noted above, the LocalMode class is used for testing the application locally in your development environment. A common, though suboptimal, use looks like this:

try {
  LocalMode lma = LocalMode.newInstance();
  Configuration conf = new Configuration(false);
  conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
  lma.prepareDAG(new Application(), conf);
  LocalMode.Controller lc = lma.getController();
  lc.run(10000); // runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
  Assert.fail("constraint violations: " + e.getConstraintViolations());
}

Here, a Configuration object containing all the appropriate settings of properties and attributes for the application is created by parsing the default properties.xml file, a new Application object is created and configured and finally a controller used for timed execution of the application. This approach, though occasionally useful to uncover shallow bugs, has one glaring deficiency – it does not check the results in any way as most unit tests do. We strongly recommend avoiding this usage pattern.

A far better (and recommended) approach is to write data to some storage system that can then be queried for verification. An example of this approach is here and looks like this:

try {
  LocalMode lma = LocalMode.newInstance();
  lma.prepareDAG(new Application(), getConfig());
  LocalMode.Controller lc = lma.getController();
  lc.runAsync();

  // wait for output files to show up
  while ( ! check() ) {
    System.out.println("Sleeping ....");
    Thread.sleep(1000);
  }
} catch (ConstraintViolationException e) {
  Assert.fail("constraint violations: " + e.getConstraintViolations());
}

Here, we invoke runAsync on the controller to fork off a separate thread to run the application while the main thread enters a loop where it looks for the presence of the expected output files. A pair of functions can be defined to setup prerequisites such as starting external servers, creating directories, etc. and perform the corresponding teardown upon test termination; these functions are annotated with @Before and @After.

Checking logs

Logs for the Application Master and the various containers can be retrieved and viewed on the UI console by navigating to the Physical tab, clicking on the specific container in question, clicking on the blue logs button and then selecting the appropriate file from the dropdown. If you don't have access to the UI, you'll need to log in to the appropriate node on the cluster and check the logs there.

A good starting point is the YARN log file which is usually present at /var/log/hadoop-yarn or /var/log/hadoop or similar locations and named yarn-{user}-resourcemanager-{host}.log or hadoop-cmf-yarn-RESOURCEMANAGER-{host}.log.out (where {user} and {host} have appropriate values) or something similar. This file will indicate what containers were allocated for each application, whether the allocation succeeded or failed and where each container is running. Typically, application and container ids will have the respective forms application_1448033276100_0001 and container_1462948052533_0001_01_022468.

If the application failed during launch, the YARN logs will usually have enough information to diagnose the root cause -- for example, a container requiring more memory than is available. If the application launch was successful, you'll see the containers transition through various states: NEW, ALLOCATED, ACQUIRED, RUNNING.

If it failed after launch, the logs of the particular container that failed will shed more light on the issue. Those logs are located on the appropriate node under a directory with the same name as the container id which itself is under a directory named with the application id, for example:

dtadmin@dtbox:/sfw/hadoop/shared/logs$ cd nodemanager/application_1465857463845_0001/
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls
container_1465857463845_0001_01_000001  container_1465857463845_0001_01_000003
container_1465857463845_0001_01_000002  container_1465857463845_0001_01_000004
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls *1
AppMaster.stderr  AppMaster.stdout  dt.log
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls *2
dt.log  stderr  stdout

The Application Master container id always has the suffix _000001.

Operators, Ports and Streams

An operator is, simply put, a class that implements the Operator interface. Though the class could be written to directly implement that interface, a more common and easier method is to extend BaseOperator since it provides default empty implementations of all the required methods. We've already seen an example above, namely RandomNumberGenerator.

A port is a class that can either emit (output port) or ingest (input port) data. Input and output ports implement the InputPort and OutputPort interfaces respectively. More commonly, output ports are simply defined as instances of DefaultOutputPort, for example:

public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();

and input ports are defined as anonymous inner classes that extend DefaultInputPort:

public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>()
{
  @Override
  public void process(Double v) {
    out.emit(v);
  }
}

A stream is the set of links connecting a single port of an upstream operator to one or more input ports of downstream operators. We've already seen an example of a stream above, namely randomData. If the upstream operator is not partitioned, tuples are delivered to the input ports of a stream in the same order in which they were written to the output port; this guarantee may change in the future. For a more detailed explanation of these concepts, please look here.

Annotations

Annotations are an important tool for expressing desired guarantees which are then verified in a validation phase before running the application. Some examples:

@Min(1)
int index;

@NotNull
String name

@NotNull
@Size(min=1, max=128)
private String[] path;

@NotNull
@Size(min = 1)
private final Set<String> files;

@Valid
FooBar object;

The @Min and @Max annotations check lower and upper bounds; @Size checks the size of a collection or array against minimum and maximum limits; @Valid requests recursive validation checks on the object.

There are also a few Apex-specific annotations; we've seen one example above, namely @ApplicationAnnotation used to set the name of the application. A couple of others are useful to declare that a port within an operator need not be connected:

@InputPortFieldAnnotation(optional = true)
public final transient InputPort<Object> inportWithCodec = new DefaultInputPort<>();

@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Map<String, Object>> outBindings
   = new DefaultOutputPort<>();

For additional information about annotations, please see: here and here

Partitioners, Unifiers and StreamCodecs

Partitioning is a mechanism for load balancing; it involves replicating one or more operators so that the load can be shared by all the replicas. Partitioning is accomplished by the definePartitions method of the Partitioner interface. A couple of implementations are available: StatelessPartitioner (static partitioning) and StatelessThroughputBasedPartitioner (dynamic partitioning, see below). These can be used by setting the PARTITIONER attribute on the operator by including a stanza like the following in your configuration file (where {appName} and {opName} are the appropriate application and operator names):

<property>
  <name>dt.application.{appName}.operator.{opName}.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
</property>

The number after the colon specifies the number of desired partitions. This can be done for any operator that is not a connector (i.e. input or output operator) and is not annotated with @OperatorAnnotation(partitionable = false). No code changes are necessary. Incoming tuples entering input ports of the operator are automatically distributed among the partitions based on their hash code by default. You can get greater control of how tuples are distributed to partitions by using a StreamCodec; further discussion of stream codecs is deferred to the Advanced Guide [coming soon]. A small sample program illustrating use of stream codecs is here.

Connectors need special care since they interact with external systems. Many connectors (e.g. Kafka input, file input and output operators) implement the Partitioner interface and support partitioning using custom implementations of definePartitions within the operator. Documentation and/or source code of the individual connectors should be consulted for details.

Sometimes there is need to replicate an entire linear segment of the DAG; this is known as Parallel Partitioning and is achieved by setting the PARTITION_PARALLEL attribute on the input port of each downstream operator that is part of the linear segment. Both of these mechanisms are described in greater detail in the Advanced Features section of the Top N Words tutorial

As mentioned above, the StatelessPartitioner is used for static partitioning since it occurs once before the application starts. Dynamic partitioning while the application is running is also possible using the StatelessThroughputBasedPartitioner or a custom partitioner. Implementing such a partitioner needs special care, especially if the operator to be partitioned has some accumulated state since this state typically needs to be redistributed among the newly created partitions. An example of a custom partitioner that does dynamic partitioning is here.

Unifiers are the flip side of the partitioning coin: When data that was intended to be processed by a single instance is now processed by multiple partitions, each instance computes partial results since it processes only part of the stream; these partial results need to be combined to form the final result; this is the function of a unifier.

For example, suppose an operator is processing numbers and computes the sum of all the values seen in a window. If it is partitioned into N replicas, each replica is computing a partial sum and we would need a unifier that computes the overall sum from these N partial sums. A sample application that shows how to define and use a unifier is available here.

A unifier for an operator is provided by a suitable override of the getUnifier() method of the output port, for example:

  public final transient DefaultOutputPort<HighLow<Integer>> out
    = new DefaultOutputPort<HighLow<Integer>>() {
    @Override
    public Unifier<HighLow<Integer>> getUnifier() {
      return new UnifierRange<Integer>()
    }
  };

If no unifier is supplied for a partitioned operator, the platform will supply a default pass-through unifier.

When the number of partitions is large and the unifier involves non-trivial computations there is a risk that it can become a bottleneck; in such cases, the UNIFIER_LIMIT attribute can be set on the appropriate output port. The platform will then automatically generate the required number of parallel unifiers, cascading into multiple levels if necessary, to ensure that the number of input streams at each unifier does not exceed this limit.

Buffer Server

The Buffer Server is a separate service within a container which implements a publish-subscribe model. It is present whenever the container hosts an operator with an output port connected to another operator outside the container.

The output port is the publisher and the connected input ports of downstream operators are the subscribers. It buffers tuples so that they can be replayed when a downstream operator fails and is restarted. As described earlier, the memory allocated to a buffer server is user configurable via an attribute named BUFFER_MEMORY_MB and defaults to 512MB.

The total memory required by a container that hosts many such operators may climb rapidly; reducing the value of this attribute is advisable in such cases in a memory constrained environment.

Allocating Operator Memory

A container is a JVM process; the maximum memory each such container can consume is user configurable via the MEMORY_MB attribute whose default value is 1GB. If it is too large, container allocation may fail before the application even begins; if it is too small, the application may fail at run time as it tries to allocate memory and runs up against this limit. An example of setting this limit:

<property>
  <name>dt.application.MyFirstApplication.operator.randomGenerator.attr.MEMORY_MB</name>
  <value>300</value>
</property>

As before, wildcards can be used to set the same value for all operators by replacing the operator name randomGenerator with an asterisk. The application name can also be omitted, so a shorter version is:

<property>
  <name>dt.operator.*.attr.MEMORY_MB</name>
  <value>300</value>
</property>

The Application Master (aka StrAM) is a supervisory Java process associated with each application; memory allocation for it is specified slightly differently:

<property>
  <name>dt.attr.MASTER_MEMORY_MB</name>
  <value>700</value>
</property>

For small applications, a value as low as 700 may be adequate; for larger applications, a value 2000 or more may be needed. If this value is too small, the application typically fails at startup with no user-visible diagnostics; YARN logs need to be examined in such cases.

Sample Applications

This section briefly discusses some sample applications using commonly used connectors. The applications themselves are available at Examples. We briefly describe a few of them below.

Each example has a brief README.md file (in markdown format) describing what the application does. In most cases, the unit tests function as full application tests that can be run locally in your development environment without the need for a cluster as described above.

Application — file copy

The fileIO-simple application copies all data verbatim from files added to an input directory to rolling files in an output directory. The input and output directories, the output file base name and maximum size are configured in META_INF/properties.xml:

<property>
  <name>dt.application.SimpleFileIO.operator.input.prop.directory</name>
  <value>/tmp/SimpleFileIO/input-dir</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.filePath</name>
  <value>/tmp/SimpleFileIO/output-dir</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.fileName</name>
  <value>myfile</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.maxLength</name>
  <value>1000000</value>
</property>

The application DAG is created in Application.java:

@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // create operators
  FileLineInputOperator in = dag.addOperator("input", new FileLineInputOperator());
  FileOutputOperator out = dag.addOperator("output", new FileOutputOperator());

  // create streams
  dag.addStream("data", in.output, out.input);
}

The FileLineInputOperator is part of Malhar and is a concrete class that extends AbstractFileInputOperator. The FileOutputOperator is defined locally and extends the AbstractFileOutputOperator and overrides 3 methods: + getFileName which simply returns the current file name + getBytesForTuple which appends a newline to the argument string, converts it to an array of bytes and returns it. + setup which creates the actual file name by appending the operator id to the configured base name (this last step is necessary when partitioning is involved to ensure that multiple partitions do not write to the same file).

Output files are created with temporary names like myfile_p2.0.1465929407447.tmp and renamed to myfile_p2.0 when they reach the maximum configured size.

Application — database to file

The jdbcIngest application reads rows from a table in MySQL, creates Java objects (_POJO_s) and writes them to a file in the user specified directory in HDFS.

Application configuration values are specified in 2 files: META_INF/properties.xml and src/site/conf/example.xml. The former uses the in-memory database HSQLDB and is used by the unit test in JdbcInputAppTest; this test can be run, as described earlier, either in your IDE or using maven on the command line. The latter uses MySql and is intended for use on a cluster. To run on a cluster you'll need a couple of preparatory steps:

  • Make sure MySql is installed on the cluster.
  • Change example.xml to reflect proper values for databaseUrl, userName, password and filePath.
  • Create the required table and rows by running the SQL queries in the file src/test/resources/example.sql.
  • Create the HDFS output directory if necessary.
  • Build the project to create the .apa package
  • Launch the application, selecting example.xml as the configuration file during launch.
  • Verify that the expected output file is present.

Further details on these steps are in the project README.md file.

The application uses two operators: The first is FileLineOutputOperator which extends AbstractFileOutputOperator and provides implementations for two methods: getFileName and getBytesForTuple. The former creates a file name using the operator id ‐ this is important if this operator has multiple partitions and prevents the partitions from writing to the same file (which could cause garbled data). The latter simply converts the incoming object to an array of bytes and returns it.

The second is JdbcPOJOInputOperator which comes from Malhar; it reads records from a table and outputs them on the output port; they type of object that is emitted is specified by the value of the TUPLE_CLASS attribute in the configuration file namely PojoEvent in this case. This operator also needs a couple of additional properties: (a) a list of FieldInfo objects that describe the mapping from table columns to fields of the Pojo; and (b) a store object that deals with the details of establishing a connection to the database server.

The application itself is then created in the usual way in JdbcHDFSApp.populateDAG:

JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
jdbcInputOperator.setFieldInfos(addFieldInfos());

jdbcInputOperator.setStore(new JdbcStore());

FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());

dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);

AppHub

AppHub is a source of application templates. There are many more applications for various use cases to help jump start development effort.

Additional Resources

  1. Development environment setup

  2. Troubleshooting guide