Beginner's Guide to Apache Apex
Introduction
Apache Apex is a fault-tolerant, high-performance platform and framework for building distributed applications; it is built on Hadoop. This guide is targeted at Java developers who are getting started with building Apex applications.
Quickstart
Those eager to get started right away can download, build, and run a few sample applications; just follow these steps:

- Make sure you have the Java JDK, maven, and git installed.
- Clone the examples git repo: https://github.com/DataTorrent/examples
- Switch to `tutorials/fileOutput` and build it either in your favorite IDE or on the command line with:

```shell
cd examples/tutorials/fileOutput
mvn clean package -DskipTests
```

- Run the test in your IDE or on the command line with:

```shell
mvn test
```
Some of the applications are discussed in the Sample Applications section below. The rest of this document provides a more leisurely introduction to Apex.
Preliminaries
Before beginning development, you'll need to make sure the following prerequisites are present; details of setting up your development environment are here:
- Recent versions of the Java JDK, maven, git.
- A Hadoop cluster where the application can be deployed.
- A working internet connection.
Running the maven archetype
Apex applications use the maven build tool. The maven archetype is useful for avoiding the tedious process of creating a suitable `pom.xml` maven build file by hand; it also generates a simple default application with two operators that you can build and run without having to write any code or make any changes. To run it, place the following lines in a text file (called, say, `newapp.sh`) and run it with the shell (e.g. `bash newapp.sh`):
```shell
v="3.3.0-incubating"
mvn -B archetype:generate \
  -DarchetypeGroupId=org.apache.apex \
  -DarchetypeArtifactId=apex-app-archetype \
  -DarchetypeVersion="$v" \
  -DgroupId=com.example \
  -Dpackage=com.example.myapexapp \
  -DartifactId=myapexapp \
  -Dversion=1.0-SNAPSHOT
```
As new versions are released, you might need to update the version number (the variable `v` above). You can also run the archetype from your Java IDE as described here.

It should create a new project directory named `myapexapp` with these 3 Java source files:

```
src/test/java/com/example/myapexapp/ApplicationTest.java
src/main/java/com/example/myapexapp/Application.java
src/main/java/com/example/myapexapp/RandomNumberGenerator.java
```
The project should also contain these properties files:
```
src/site/conf/my-app-conf1.xml
src/main/resources/META-INF/properties.xml
```
You should now be able to step into the new directory and build the project:

```shell
cd myapexapp; mvn clean package -DskipTests
```

This will create a directory named `target` and an application package file within it named `myapexapp-1.0-SNAPSHOT.apa`. These files are discussed further in the sections below.
The default application
We now discuss the default application generated by the archetype in some detail. Additional, more realistic applications are presented in the section titled Sample Applications below.

The archetype generates an application in `Application.java` with 2 operators:

```java
@ApplicationAnnotation(name="MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    RandomNumberGenerator randomGenerator = dag.addOperator("randomGenerator", RandomNumberGenerator.class);
    randomGenerator.setNumTuples(500);
    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("randomData", randomGenerator.out, cons.input).setLocality(Locality.CONTAINER_LOCAL);
  }
}
```
The application is named MyFirstApplication via the `@ApplicationAnnotation` annotation; this name will be displayed in the UI console and must be unique among the applications running on a cluster. It can also be changed at launch time.

The `populateDAG` method is the only one that you'll need to implement. Its contents fall into three categories: operator creation, operator configuration, and stream creation.

Two operators are created: `randomGenerator` and `console`. The first is defined in `RandomNumberGenerator.java`; it generates random floating point values and emits them on its output port. The second is an instance of the `ConsoleOutputOperator` class defined in Malhar, the library of pre-built operators. The first argument to `addOperator()` is the name of this operator instance (there can be multiple instances of the same class, so we need a unique name to distinguish them); the second can be either a class object that needs to be instantiated, as shown for `randomGenerator`, or an actual instance of that class, as shown for `console`.
The operators can be configured by calling setter methods on them; the call to `setNumTuples` is an example. However, operators are typically configured via XML properties files as discussed in later sections below.

Finally, the `addStream` call creates a stream named `randomData` connecting the output port of the first operator to the input port of the second.
Running the application and the unit test
The file `ApplicationTest.java` contains a unit test that can be run from an IDE by highlighting the `testApplication` method and selecting the appropriate option from the dropdown; it can also be run from the maven command line:

```shell
mvn -Dtest=ApplicationTest#testApplication test
```

It runs for about 10 seconds, printing each random number with a prefix of `hello world:`. The first argument explicitly selects the test to run (`testApplication`) from the named class (`ApplicationTest`); you can omit it and just run `mvn test` to run all of the unit tests.
It is important to note that this particular test is actually a test of the entire application rather than a single class or a method within a class. It uses a class called `LocalMode` to essentially simulate a cluster. This is an extremely useful technique for testing your application without the need for a cluster. It can be used in more elaborate ways to test complex applications as discussed in the section entitled Local Mode Testing below.
To run the application, you need access to a cluster with Hadoop installed; there are multiple options here:
- Download and setup the sandbox as described here. This is the simplest option for experimenting with Apex since it has all the necessary pieces installed.
- Download and install the DataTorrent Community or Enterprise Edition downloadable from here.
- Use an existing DataTorrent RTS licensed installation.
- Clone the Apex source code on a cluster with Hadoop already installed, build it, and use the `apexcli` command line tool (previously named `dtcli`) from there to run your application as described in this video.
With the first 3 methods, you have a browser-based GUI console: simply navigate to Develop ⇒ Upload Package and upload the `.apa` file built earlier; then run it using the launch button. If using the command line tool, run that tool, then use the command `launch myapexapp-1.0-SNAPSHOT.apa` to launch the application. You can also specify a particular XML configuration file that is packaged with the application to use during launch, for example:

```shell
launch -apconf my-app-conf1.xml myapexapp-1.0-SNAPSHOT.apa
```

For an exhaustive list of commands available with this tool, please see here.
More details about configuration files are provided in the next section.
Configuring your application
Application configuration is typically done via XML properties files, though it can also be done with code as shown above. The `properties.xml` file mentioned earlier has some examples:

```xml
<property>
  <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
  <value>1000</value>
</property>
<property>
  <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
  <value>hello world: %s</value>
</property>
```
The number of tuples output per window (windows are discussed in greater detail below) is set both in the code and in this file; in such cases, values in this properties file override those set in the code.
Configuration values can also be placed in XML files under `site/conf`, and the file `my-app-conf1.xml` mentioned above is an example. These files are not processed automatically; they need to be explicitly selected at launch time. To do this in the GUI, select the Use saved configuration checkbox in the launch dialog and choose the desired file from the dropdown. When a property is specified in multiple files, precedence rules determine the final value; those rules are discussed here.
Attributes and Properties
Properties are simply public accessor methods in the operator classes and govern the functionality of the operator. Attributes, on the other hand, are pre-defined and affect how the operator or application behaves with respect to its environment. We have already seen a couple of examples of properties, namely, the number of tuples emitted per window by the random number generator (`numTuples`) and the prefix string prepended to each value before it is output by the console operator (`stringFormat`).
Operator properties that are more complex objects than the primitive types can also be initialized from XML files. For example, if we have properties declared as `int[] counts;` and `String[] paths;` in an operator named `foo`, we can initialize them with:

```xml
<property>
  <name>dt.operator.foo.prop.counts</name>
  <value>10, 20, 30</value>
</property>
<property>
  <name>dt.operator.foo.prop.paths</name>
  <value>"/tmp/path1", "/tmp/path3", "/tmp/path3"</value>
</property>
```
An example of an attribute is the amount of memory allocated to the Buffer Server (see the section below entitled Buffer Server); it is named `BUFFER_MEMORY_MB` and can be set like this:

```xml
<property>
  <name>dt.application.{appName}.operator.{opName}.port.{portName}.attr.BUFFER_MEMORY_MB</name>
  <value>128</value>
</property>
```
Here `{appName}`, `{opName}`, and `{portName}` are appropriate application, operator, and port names respectively; they can also be replaced with asterisks (wildcards). The default value is 512MB.
Some additional attributes include:

| Attribute name | Description |
| --- | --- |
| `PARTITIONER` | custom partitioner class associated with an operator |
| `PARTITION_PARALLEL` | if true, triggers parallel partitioning of a downstream operator when an upstream operator is partitioned |
| `STREAM_CODEC` | controls serialization/deserialization as well as destination partitions |
| `RECOVERY_ATTEMPTS` | maximum restart attempts of a failed operator |
Currently available attribute names are in the `Context` class of the `api` module in the Apex source code.
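For example, following the same naming pattern as the `BUFFER_MEMORY_MB` stanza above, `RECOVERY_ATTEMPTS` could be set for every operator of every application by using wildcards in the property name (the value 3 here is an arbitrary illustration, not a recommended setting):

```xml
<property>
  <name>dt.operator.*.attr.RECOVERY_ATTEMPTS</name>
  <value>3</value>
</property>
```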
For additional examples of initialization of properties, including list and map objects, please look here.
Local Mode Testing
As noted above, the `LocalMode` class is used for testing the application locally in your development environment. A common, though suboptimal, use looks like this:

```java
try {
  LocalMode lma = LocalMode.newInstance();
  Configuration conf = new Configuration(false);
  conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
  lma.prepareDAG(new Application(), conf);
  LocalMode.Controller lc = lma.getController();
  lc.run(10000); // runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
  Assert.fail("constraint violations: " + e.getConstraintViolations());
}
```
Here, a Configuration object containing all the appropriate settings of properties and attributes for the application is created by parsing the default `properties.xml` file; a new Application object is created and configured; and finally, a controller is used for timed execution of the application. This approach, though occasionally useful to uncover shallow bugs, has one glaring deficiency: it does not check the results in any way, as most unit tests do. We strongly recommend avoiding this usage pattern.
A far better (and recommended) approach is to write data to some storage system that can then be queried for verification. An example of this approach is here and looks like this:
```java
try {
  LocalMode lma = LocalMode.newInstance();
  lma.prepareDAG(new Application(), getConfig());
  LocalMode.Controller lc = lma.getController();
  lc.runAsync();
  // wait for output files to show up
  while ( ! check() ) {
    System.out.println("Sleeping ....");
    Thread.sleep(1000);
  }
} catch (ConstraintViolationException e) {
  Assert.fail("constraint violations: " + e.getConstraintViolations());
}
```
Here, we invoke `runAsync` on the controller to fork off a separate thread to run the application while the main thread enters a loop where it looks for the presence of the expected output files. A pair of functions can be defined to set up prerequisites such as starting external servers, creating directories, etc. and perform the corresponding teardown upon test termination; these functions are annotated with `@Before` and `@After`.
Checking logs
Logs for the Application Master and the various containers can be retrieved and viewed on the UI console by navigating to the Physical tab, clicking on the specific container in question, clicking on the blue logs button, and then selecting the appropriate file from the dropdown. If you don't have access to the UI, you'll need to log in to the appropriate node on the cluster and check the logs there.
A good starting point is the YARN log file which is usually present at `/var/log/hadoop-yarn` or `/var/log/hadoop` or similar locations and named `yarn-{user}-resourcemanager-{host}.log` or `hadoop-cmf-yarn-RESOURCEMANAGER-{host}.log.out` (where `{user}` and `{host}` have appropriate values) or something similar. This file will indicate what containers were allocated for each application, whether the allocation succeeded or failed, and where each container is running. Typically, application and container ids will have the respective forms `application_1448033276100_0001` and `container_1462948052533_0001_01_022468`.
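Incidentally, the application id can be recovered mechanically from a container id, since the two share the cluster timestamp and application sequence number. A small, self-contained sketch (this helper is illustrative, not part of any Apex or YARN API):

```java
// ContainerIds.java -- derive the application id from a YARN container id.
// Container ids look like container_<clusterTs>_<appSeq>_<attempt>_<container>;
// the corresponding application id is application_<clusterTs>_<appSeq>.
public class ContainerIds {
    public static String toApplicationId(String containerId) {
        String[] parts = containerId.split("_");
        // parts: ["container", clusterTs, appSeq, attempt, container]
        return "application_" + parts[1] + "_" + parts[2];
    }

    public static void main(String[] args) {
        System.out.println(toApplicationId("container_1462948052533_0001_01_022468"));
    }
}
```

This is handy when you have a failed container id from the YARN log and need to locate the matching application log directory.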
If the application failed during launch, the YARN logs will usually have enough information to diagnose the root cause, for example, a container requiring more memory than is available. If the application launch was successful, you'll see the containers transition through various states: NEW, ALLOCATED, ACQUIRED, RUNNING.
If it failed after launch, the logs of the particular container that failed will shed more light on the issue. Those logs are located on the appropriate node under a directory with the same name as the container id which itself is under a directory named with the application id, for example:
```
dtadmin@dtbox:/sfw/hadoop/shared/logs$ cd nodemanager/application_1465857463845_0001/
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls
container_1465857463845_0001_01_000001  container_1465857463845_0001_01_000003
container_1465857463845_0001_01_000002  container_1465857463845_0001_01_000004
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls *1
AppMaster.stderr  AppMaster.stdout  dt.log
dtadmin@dtbox:/sfw/hadoop/shared/logs/nodemanager/application_1465857463845_0001$ ls *2
dt.log  stderr  stdout
```

The Application Master container id always has the suffix `_000001`.
Operators, Ports and Streams
An operator is, simply put, a class that implements the `Operator` interface. Though the class could be written to directly implement that interface, a more common and easier method is to extend `BaseOperator`, since it provides default empty implementations of all the required methods. We've already seen an example above, namely `RandomNumberGenerator`.
A port is a class that can either emit (output port) or ingest (input port) data. Input and output ports implement the `InputPort` and `OutputPort` interfaces respectively. More commonly, output ports are simply defined as instances of `DefaultOutputPort`, for example:

```java
public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
```

and input ports are defined as anonymous inner classes that extend `DefaultInputPort`:

```java
public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>()
{
  @Override
  public void process(Double v) {
    out.emit(v);
  }
};
```
A stream is the set of links connecting a single output port of an upstream operator to one or more input ports of downstream operators. We've already seen an example of a stream above, namely `randomData`. If the upstream operator is not partitioned, tuples are delivered to the input ports of a stream in the same order in which they were written to the output port; this guarantee may change in the future. For a more detailed explanation of these concepts, please look here.
Annotations
Annotations are an important tool for expressing desired guarantees which are then verified in a validation phase before running the application. Some examples:
```java
@Min(1)
int index;

@NotNull
String name;

@NotNull
@Size(min = 1, max = 128)
private String[] path;

@NotNull
@Size(min = 1)
private final Set<String> files;

@Valid
FooBar object;
```

The `@Min` and `@Max` annotations check lower and upper bounds; `@Size` checks the size of a collection or array against minimum and maximum limits; `@Valid` requests recursive validation checks on the object.
There are also a few Apex-specific annotations; we've seen one example above, namely `@ApplicationAnnotation`, used to set the name of the application. A couple of others are useful to declare that a port within an operator need not be connected:

```java
@InputPortFieldAnnotation(optional = true)
public final transient InputPort<Object> inportWithCodec = new DefaultInputPort<>();

@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Map<String, Object>> outBindings
    = new DefaultOutputPort<>();
```
For additional information about annotations, please see here and here.
Partitioners, Unifiers and StreamCodecs
Partitioning is a mechanism for load balancing; it involves replicating one or more operators so that the load can be shared by all the replicas. Partitioning is accomplished by the `definePartitions` method of the `Partitioner` interface. A couple of implementations are available: `StatelessPartitioner` (static partitioning) and `StatelessThroughputBasedPartitioner` (dynamic partitioning, see below).

These can be used by setting the `PARTITIONER` attribute on the operator by including a stanza like the following in your configuration file (where `{appName}` and `{opName}` are the appropriate application and operator names):

```xml
<property>
  <name>dt.application.{appName}.operator.{opName}.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
</property>
```
The number after the colon specifies the number of desired partitions. This can be done for any operator that is not a connector (i.e. an input or output operator) and is not annotated with `@OperatorAnnotation(partitionable = false)`. No code changes are necessary. By default, incoming tuples entering input ports of the operator are automatically distributed among the partitions based on their hash code. You can get greater control of how tuples are distributed to partitions by using a `StreamCodec`; further discussion of stream codecs is deferred to the Advanced Guide [coming soon]. A small sample program illustrating use of stream codecs is here.

Connectors need special care since they interact with external systems. Many connectors (e.g. the Kafka input, file input, and file output operators) implement the `Partitioner` interface and support partitioning using custom implementations of `definePartitions` within the operator. Documentation and/or source code of the individual connectors should be consulted for details.
Sometimes there is a need to replicate an entire linear segment of the DAG; this is known as Parallel Partitioning and is achieved by setting the `PARTITION_PARALLEL` attribute on the input port of each downstream operator that is part of the linear segment. Both of these mechanisms are described in greater detail in the Advanced Features section of the Top N Words tutorial.
As mentioned above, the `StatelessPartitioner` is used for static partitioning since it occurs once before the application starts. Dynamic partitioning while the application is running is also possible using the `StatelessThroughputBasedPartitioner` or a custom partitioner. Implementing such a partitioner needs special care, especially if the operator to be partitioned has some accumulated state, since this state typically needs to be redistributed among the newly created partitions. An example of a custom partitioner that does dynamic partitioning is here.
Unifiers are the flip side of the partitioning coin: when data that was intended to be processed by a single instance is instead processed by multiple partitions, each instance computes partial results since it processes only part of the stream. These partial results need to be combined to form the final result; this is the function of a unifier.

For example, suppose an operator is processing numbers and computes the sum of all the values seen in a window. If it is partitioned into N replicas, each replica computes a partial sum and we would need a unifier that computes the overall sum from these N partial sums. A sample application that shows how to define and use a unifier is available here.
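The partial-result arithmetic can be sketched independently of the Apex interfaces (the class and method names below are illustrative, not Malhar classes): each partition computes a partial sum over its share of the window, and the unifier adds the partials to recover the total.

```java
import java.util.Arrays;

// Illustrative only: mimics what a sum unifier does, without the Apex interfaces.
public class SumUnifierSketch {
    // Each partition computes a partial sum over the tuples it received...
    static long partialSum(long[] tuples) {
        return Arrays.stream(tuples).sum();
    }

    // ...and the unifier combines the N partial sums into the final result.
    static long unify(long[] partials) {
        return Arrays.stream(partials).sum();
    }

    public static void main(String[] args) {
        long[] partition1 = {1, 2, 3};
        long[] partition2 = {4, 5};
        long[] partials = {partialSum(partition1), partialSum(partition2)};
        System.out.println(unify(partials)); // same as summing the unpartitioned stream: 15
    }
}
```

In a real operator the unifier implements the `Unifier` interface and receives the partials through its `process` method; the arithmetic, however, is the same.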
A unifier for an operator is provided by a suitable override of the `getUnifier()` method of the output port, for example:

```java
public final transient DefaultOutputPort<HighLow<Integer>> out
    = new DefaultOutputPort<HighLow<Integer>>() {
  @Override
  public Unifier<HighLow<Integer>> getUnifier() {
    return new UnifierRange<Integer>();
  }
};
```
If no unifier is supplied for a partitioned operator, the platform will supply a default pass-through unifier.
When the number of partitions is large and the unifier involves non-trivial computations, there is a risk that it can become a bottleneck; in such cases, the `UNIFIER_LIMIT` attribute can be set on the appropriate output port. The platform will then automatically generate the required number of parallel unifiers, cascading into multiple levels if necessary, to ensure that the number of input streams at each unifier does not exceed this limit.
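For intuition about the cascade (the exact tree the platform builds is an implementation detail), the number of levels grows logarithmically with the number of partitions. This self-contained sketch computes how many levels are needed so that no unifier sees more than the limit:

```java
public class UnifierLevels {
    // Number of cascade levels needed so that no unifier has more than
    // 'limit' input streams when combining 'partitions' partial results.
    static int levels(int partitions, int limit) {
        int n = 1;              // levels used so far
        long capacity = limit;  // partitions a cascade of n levels can absorb
        while (capacity < partitions) {
            capacity *= limit;
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(levels(8, 8));   // 1 level: a single unifier suffices
        System.out.println(levels(64, 8));  // 2 levels: 8 unifiers feeding 1
        System.out.println(levels(100, 8)); // 3 levels
    }
}
```

So with a limit of 8 and 64 partitions, the platform would need two levels: a first rank of unifiers each combining 8 streams, feeding one final unifier.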
Buffer Server
The Buffer Server is a separate service within a container which implements a publish-subscribe model. It is present whenever the container hosts an operator with an output port connected to another operator outside the container. The output port is the publisher and the connected input ports of downstream operators are the subscribers. It buffers tuples so that they can be replayed when a downstream operator fails and is restarted. As described earlier, the memory allocated to a buffer server is user configurable via an attribute named `BUFFER_MEMORY_MB` and defaults to 512MB.
The total memory required by a container that hosts many such operators may climb rapidly; reducing the value of this attribute is advisable in such cases in a memory constrained environment.
Allocating Operator Memory
A container is a JVM process; the maximum memory each such container can consume is user configurable via the `MEMORY_MB` attribute, whose default value is 1GB. If it is too large, container allocation may fail before the application even begins; if it is too small, the application may fail at run time as it tries to allocate memory and runs up against this limit. An example of setting this limit:

```xml
<property>
  <name>dt.application.MyFirstApplication.operator.randomGenerator.attr.MEMORY_MB</name>
  <value>300</value>
</property>
```
As before, wildcards can be used to set the same value for all operators by replacing the operator name `randomGenerator` with an asterisk. The application name can also be omitted, so a shorter version is:

```xml
<property>
  <name>dt.operator.*.attr.MEMORY_MB</name>
  <value>300</value>
</property>
```
The Application Master (aka StrAM) is a supervisory Java process associated with each application; memory allocation for it is specified slightly differently:
```xml
<property>
  <name>dt.attr.MASTER_MEMORY_MB</name>
  <value>700</value>
</property>
```
For small applications, a value as low as 700 may be adequate; for larger applications, a value of 2000 or more may be needed. If this value is too small, the application typically fails at startup with no user-visible diagnostics; the YARN logs need to be examined in such cases.
Sample Applications
This section briefly describes a few sample applications built with commonly used connectors; the full set of applications is available at Examples.
Each example has a brief `README.md` file (in markdown format) describing what the application does. In most cases, the unit tests function as full application tests that can be run locally in your development environment, without the need for a cluster, as described above.
Application — file copy
The `fileIO-simple` application copies all data verbatim from files added to an input directory to rolling files in an output directory. The input and output directories, the output file base name, and the maximum file size are configured in `META-INF/properties.xml`:
```xml
<property>
  <name>dt.application.SimpleFileIO.operator.input.prop.directory</name>
  <value>/tmp/SimpleFileIO/input-dir</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.filePath</name>
  <value>/tmp/SimpleFileIO/output-dir</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.fileName</name>
  <value>myfile</value>
</property>
<property>
  <name>dt.application.SimpleFileIO.operator.output.prop.maxLength</name>
  <value>1000000</value>
</property>
```
The application DAG is created in `Application.java`:

```java
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // create operators
  FileLineInputOperator in = dag.addOperator("input", new FileLineInputOperator());
  FileOutputOperator out = dag.addOperator("output", new FileOutputOperator());

  // create streams
  dag.addStream("data", in.output, out.input);
}
```
The `FileLineInputOperator` is part of Malhar and is a concrete class that extends `AbstractFileInputOperator`. The `FileOutputOperator` is defined locally; it extends `AbstractFileOutputOperator` and overrides 3 methods:

+ `getFileName`, which simply returns the current file name.
+ `getBytesForTuple`, which appends a newline to the argument string, converts it to an array of bytes, and returns it.
+ `setup`, which creates the actual file name by appending the operator id to the configured base name (this last step is necessary when partitioning is involved, to ensure that multiple partitions do not write to the same file).
Output files are created with temporary names like `myfile_p2.0.1465929407447.tmp` and renamed to `myfile_p2.0` when they reach the maximum configured size.
Application — database to file
The `jdbcIngest` application reads rows from a table in MySQL, creates Java objects (POJOs), and writes them to a file in the user specified directory in HDFS. Application configuration values are specified in 2 files: `META-INF/properties.xml` and `src/site/conf/example.xml`. The former uses the in-memory database HSQLDB and is used by the unit test in `JdbcInputAppTest`; this test can be run, as described earlier, either in your IDE or using maven on the command line.
The latter uses MySQL and is intended for use on a cluster. To run on a cluster you'll need a few preparatory steps:

- Make sure MySQL is installed on the cluster.
- Change `example.xml` to reflect proper values for `databaseUrl`, `userName`, `password` and `filePath`.
- Create the required table and rows by running the SQL queries in the file `src/test/resources/example.sql`.
- Create the HDFS output directory if necessary.
- Build the project to create the `.apa` package.
- Launch the application, selecting `example.xml` as the configuration file during launch.
- Verify that the expected output file is present.
Further details on these steps are in the project `README.md` file.
The application uses two operators. The first is `FileLineOutputOperator`, which extends `AbstractFileOutputOperator` and provides implementations for two methods: `getFileName` and `getBytesForTuple`. The former creates a file name using the operator id; this is important when this operator has multiple partitions, since it prevents the partitions from writing to the same file (which could cause garbled data). The latter simply converts the incoming object to an array of bytes and returns it.
The second is `JdbcPOJOInputOperator`, which comes from Malhar; it reads records from a table and outputs them on the output port; the type of object that is emitted is specified by the value of the `TUPLE_CLASS` attribute in the configuration file, namely `PojoEvent` in this case. This operator also needs a couple of additional properties: (a) a list of `FieldInfo` objects that describe the mapping from table columns to fields of the POJO; and (b) a `store` object that deals with the details of establishing a connection to the database server.
The application itself is then created in the usual way in `JdbcHDFSApp.populateDAG`:

```java
JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
jdbcInputOperator.setFieldInfos(addFieldInfos());
jdbcInputOperator.setStore(new JdbcStore());
FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());
dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);
```
AppHub
AppHub is a source of application templates. It contains many more applications for various use cases that can help jump start your development effort.