Operators in Top N words application

This section describes the operators used for building the top N words application. The operators, the implementing classes and a brief description of their functionalities are described in this table.


Implementing class




Reads lines from input files.



Splits a line into words.



Computes word frequencies for a single window.



Maintains per-file and global word frequencies.



Writes top N words and their frequencies to output files.



Writes received tuples to console.



Caches the last data set for the current file, and returns it in response to queries.



Caches the last global data set, and returns it in response to queries.



Receives queries for per-file data.



Receives queries for global data.



Returns results for per-file queries.



Returns results for global queries.

We now describe the process of wiring these operators together in the populateDAG() method of the main application class ApplicationWithQuerySupport. First, the operators are created and added to the DAG via the addOperator method:

LineReader lineReader = dag.addOperator("lineReader",new LineReader());

The first argument is a string that names this instance of the operator; it is the same as the value in the first column of the above table and also the node name in the Logical DAG.

Next, we connect each output port of an operator with all the input ports that should receive these tuples using the addStream function, for example:

dag.addStream("lines", lineReader.output, wordReader.input);
dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input);

Notice that the stream from fileWordCount.outputPerFile (which consists of the top N words for the current file as the file is being read) goes to snapshotServerFile.input (where it will be saved to respond to queries) and to console.input (which is used for debugging). Additional sinks can be provided in the same call as additional terminal arguments. You can examine the rest of these calls and ensure that they match the names and connections of the Logical DAG.

This section provides detailed information about each operator.


This class extends AbstractFileInputOperator<String> to open a file, read its lines, and emit them as tuples. It has two output ports, one for the normal output of tuples, and the other for the output of an EOF tuple indicating the end of the current input file. Ports should always be transient fields because they should not be serialized and saved to the disk during checkpointing.

The base class keeps track of files already processed, files that should be ignored, and files that failed part-way. Derived classes need to override four methods: openFile, closeFile, readEntity, and emit. Of these, only the third is non-trivial: if a valid line is available, it is read and returned. Otherwise, the end of the file must have been reached. To indicate this, the file name is emitted on the control port where it will be read by the FileWordCount operator.


This operator receives lines from LineReader on the input port and emits words on the output port. It has a configurable property called nonWordStr along with associated public getter and setter methods. Such properties can be customized in the appropriate properties file of the application. The values of the properties are automatically injected into the operator at run-time. In this scenario, this string is provided the value of the property dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr. For efficiency, this string is compiled into a pattern for repeated use. The process method of the input port splits each input line into words using this pattern as the separator, and emits non-empty words on the output port.


This operator receives words and emits a list of word-frequency pairs for each window. It maintains a word-frequency map for the current window, updates this map for each word received, emits the whole map (if non-empty) when endWindow is called, and clears the map in preparation for the next window. This design pattern is appropriate because for normal text files, the number of words received is far more than the size of the accumulated map. However, for situations where data is emitted for each tuple, you should not wait till the endWindow call, but rather emit output tuples as each input tuple is processed.


This operator has two input ports, one for the per-window frequency maps it gets from the previous operator, and a control port to receive the file name when LineReader reaches the end of a file. When a file name is received on the control port, it is saved and the final results for the file appear as output at the next endWindow. The reason for waiting is subtle: there is no guarantee of the relative order in which tuples arrive at two input ports; additional input tuples from the same window can arrive at the input port even after the EOF was received on the control port. Note however that we do have a guarantee that tuples on the input port will arrive in exactly the same order in which they were emitted on the output port between the bracketing beginWindow and endWindow calls by the upstream operator.

This operator also has three output ports: the outputPerFile port for the top N pairs for the current file as it is being read; the outputGlobal port for the global top N pairs, and the fileOutput port for the final top N pairs for the current file computed after receiving the EOF control tuple. The output from the first is sent to the per-file snapshot server, the output from the second is sent to the global snapshot server, and the output from the last is sent to the operator that writes results to the output file.

FileWordCount also maintains two maps for per-file and global frequency counts because they track frequencies of all words seen so far. These maps can get very large as more and more files are processed.

FileWordCount has a configurable property topN for the number of top pairs we are interested in. This was configured in our properties file with a value of 10 and the property name: dt.application.TopNWordsWithQueries.operator.fileWordCount.topN

In the endWindow call, both maps are passed to the getTopNList function where they are flattened, sorted in descending order of frequency, stripped of all but the top N pairs, and returned for output. There are a couple of additional fields used to cast the output into the somewhat peculiar form required by the snapshot server.


This operator extends AbstractFileOutputOperator<Map<String,Object>>, and simply writes the final top N pairs to the output file. As with LineReader, most of the complexity of WordCountWriter is hidden in the base class. You must provide implementations for 3 methods: endWindow, getFileName, and getBytesForTuple. The first method calls the base class method requestFinalize. The output file is written periodically to temporary files with a synthetic file name that includes a timestamp. These files are removed and the actual desired file name is restored by this call. The getFileName method retrieves the file name from the tuple, and the getBytesForTuple method converts the list of pairs to a string in the desired format.


This is an output operator that is a part of the Malhar library. It simply writes incoming tuples to the console and is useful when debugging.


This operator is also part of the Malhar library and is used to store snapshots of data. These snapshots are used to respond to queries. For this application, we use two snapshots — one for a per-file top N snapshot and one for a global snapshot.


This is an input operator that is a part of the Malhar library. It is used to send queries to an operator via the Data Torrent Gateway, which can act as a message broker for limited amounts of data using a topic-based publish-subscribe model. The URL to connect is typically something like:


where gateway-host and port should be replaced by appropriate values.

A publisher sends a JSON message to the URL where the value of the data key is the desired message content. The JSON might look like this:

{"type":"publish", "topic":"foobar", "data": ...}

Correspondingly, subscribers send messages like this to retrieve published message data:

{"type":"subscribe", "topic":"foobar"}

Topic names need not be pre-registered anywhere but the same topic name (for example, foobar in the example) must be used by both publisher and subscriber. Additionally, if there are no subscribers when a message is published, it is simply discarded.

For this tutorial, two query operators are used: one for per-file queries and one for global queries. The topic names were configured in the properties file earlier with values TopNWordsQueryFile and TopNWordsQueryGlobal under the respective names:



Analogous to the previous operator, this is an output operator used to publish query results to a gateway topic. You must use two of these to match the query operators, and configure their topics in the properties file with values TopNWordsQueryFileResult and TopNWordsQueryGlobalResult corresponding to the respective names:


Further Exploration

In this tutorial, the property values in the properties.xml file were set to limit the amount of memory allocated to each operator. You can try varying these values and checking the impact of such an operation on the stability and performance of the application. You can also explore the largest text file that the application can handle.

Another aspect to explore is fixing the current limitation of one-file-at-a-time processing; if multiple files are dropped into the input directory simultaneously, the file reader can switch from one file to the next in the same window. When the FileWordCount operator gets an EOF on the control port, it waits for an endWindow call to emit word counts so those counts will be incorrect if tuples from two different files arrive in the same window. Try fixing this issue.

DataTorrent terminology


Operators are basic computation units that have properties and attributes, and are interconnected via streams to form an application. Properties customize the functional definition of the operator, while attributes customize the operational behavior. You can think of operators as classes for implementing the operator interface. They read from incoming streams of tuples and write to other streams.


A stream is a connector (edge) abstraction which is a fundamental building block of DataTorrent RTS. A stream consists of tuples that flow from one input port to one or more output ports.


Ports are transient objects declared in the operator class and act connection points for operators. Tuples flow in and out through ports. Input ports read from streams while output port write to streams.

Directed Acyclic Graph (DAG)

A DAG is a logical representation of real-time stream processing application. The computational units within a DAG are called operators and the data-flow edges are called data streams.

Logical Plan or DAG

Logical Plan is the Data Object Model (DOM) that is created as operators and streams are added to the DAG. It is identical to a DAG.

Physical Plan or DAG

A physical plan is the physical representation of the logical plan of the application, which depicts how applications run on physical containers and nodes of a DataTorrent cluster.

Data Tuples Processed

This is the number of data objects processed by real-time stream processing applications.

Data Tuples Emitted

This is the number of data objects emitted after real-time stream processing applications complete processing operations.

Streaming Application Manager (STRAM)

Streaming Application Manager (STRAM) is a YARN-native, lightweight controller process. It is the process that is activated first upon application launch to orchestrate the streaming application.

Streaming Window

A streaming window is a duration during which a set of tuples are emitted. The collection of these tuples constitutes a window data set, also called as an atomic micro-batch.

Sliding Application Window

Sliding window is computation that requires "n" streaming windows. After each streaming window, the nth window is dropped, and the new window is added to the computation.

Demo Applications

The real-time stream processing applications which are packaged with the DataTorrent RTS binaries, are called demo applications. A Demo application can be launched standalone, or on a Hadoop cluster.

Command-line Interface

Command line interface (CLI) is the access point for applications. This is a wrapper around the web services layer, which makes the web services user friendly.

Web services

DataTorrent RTS platform provides a robust webservices layer called DT Gateway. Currently, Hadoop provides detailed web services for map-reduce jobs. The DataTorrent RTS platform leverages the same framework to provide a web service interface for real-time streaming applications.