HDHT

Some applications need to compute values based not only on current event flow but also on historical data. HDHT provides a simple interface to store and access historical data in an operator. HDHT is an embedded state store with key value interface on top of the Hadoop file system. It is fully integrated into the Apache Apex operator model and provides persistent storage with exactly-once guarantee.

The programming model of a key-value store or hash table can be applied to a wide range of common use cases. Within most streaming applications, ingested events or computed data already carry the key that can be used for storage and retrieval. Many operations performed during computation require key based access. HDHT provides an embedded key value store for the application. The advantage of HDHT over other key value stores in streaming applications are

  • Embedded, Hadoop native solution. Does not require install/manage of other services.
  • Integrates with Apex check-pointing mechanism to provide exactly once guarantee.

This document provides overview of HDHT and instructions for using HDHT in an operator for storing and retrieving state of the operator.

Concepts

Write Ahead Log

Each tuple written to HDHT is written to Write Ahead Log (WAL) first. The WAL is used to recover in-memory state and provide exactly once processing after failure of an operator. HDHT stores WAL on HDFS to prevent data loss during node failure.

Uncommitted Cache

Uncommitted cache is in-memory key value store. Initially updates are written to this memory store, to avoid disk I/Os on every update. When data in Uncommitted cached reaches a configured limit, it is written on the disk. It avoids frequent data flushes and small file creation by writing data in bulk from the cache to disk thereby also improving throughput.

Data Files

HDHT flushes memory to persisted storage periodically. The data is kept indexed for efficient retrieval of given key. HDHT supports multiple data file backends. Default backend used is DTFile, which is a modified version of Hadoop TFile with block cache for speedy lookups. Available backends are - TFile: Files are written in hadoop Tfile format - DTFile: Files are written in TFile format; during lookup HDHT maintains a block cache to reduce disk I/Os. - HFile : Files are written in HBase format.

Metadata

Metadata file keeps information about data files. Each data file record contains start key and name of the file. Metadata file also contains WAL recovery information, which is used during recovery after failure.

Partition (HDHT Bucket)

By default, when the operator is partitioned, the partitioning is reflected by HDHT in the filesystem by using a separate directory for each operator partition. Each directory is accessed only by the associated operator partition. Each partition has its own WAL and metadata file. Each HDHT partition is identified by bucketKey, which is also the name of the subdirectory used for storing data for the partition.

Interface

HDHT supports two basic operations get and put, they are wrapped by interfaces HDHT.Writer, HDHT.Reader and an abstract implementation is provided by the HDHTReader and HDHTWriter classes.

Operations supported by HDHT are.

byte[] get(long bucketKey, Slice key) throws IOException;
void put(long bucketKey, Slice key, byte[] value) throws IOException;
byte[] getUncommitted(long bucketKey, Slice key);

All methods takes bucketKey as the first argument. The bucketKey is used as a partition key within HDHT.

  • put store data in HDHT. The data written is written to the WAL first and then stored in uncommitted cache. After enough dirty data is accumulated in cache or enough time has elapsed from last flush, this cache is flushed to the data files.
  • getUncommitted does a lookup in uncommitted cache. Uncommitted cache is in-memory key value store.
  • get does a lookup in persisted storage file and return the data. Note get does not return data from uncommitted cache.

Architecture

Please refer to HDHT Blog for the architecture of HDHT.

Codec

HDHT provides an abstract implementation AbstractSinglePortHDHTWriter, which uses a user defined codec for serialization and de-serialization.

public interface HDHTCodec<EVENT> extends StreamCodec<EVENT>
{
  byte[] getKeyBytes(EVENT event);
  byte[] getValueBytes(EVENT event);
  EVENT fromKeyValue(Slice key, byte[] value);
}

It has following methods - getKeyBytes Return key as a byte array from the event. - getValueBytes Return value as a byte array from event. - fromKeyValue HDHT will use this function to deserialize key and value byte arrays to reconstruct the user event object. - getPartition This method is inherited from StreamCodec, its return value is used to determine HDHT bucket where this event will be written. The same stream codec is used for partition of the input port which make sure that data for same event goes to a single partition of the operator.

Configuration

fileStore

This setting determines the format in which files are written. Default is DTFileImpl.

basePath

Location in HDFS where data files are stored. This is required configuration parameter.

Property File Syntax

   <property>
     <name>dt.operator.{name}.store.basePath</name>
     <value>/home/hdhtdatadir</value>
  </property>

Java API.

/* select DTFile backend with basePath set to HDHTdata */
TFileImpl.DTFileImpl hdsFile = new TFileImpl.DTFileImpl();
hdsFile.setBasePath("HDHTdata");
store.setFileStore(hdsFile);

maxFileSize

Size of each file. Default value is 134217728134217728 (i.e 128MB).

Property File Syntax

 <property>
   <name>dt.operator.{name}.maxFileSize</name>
   <value>{value in bytes}</value>
 </property>

Java API.

store.setMaxFileSize(64 * 1024 * 1024);

flushSize

HDHT will flush data to files after number of unwritten tuples crosses this limit. Default value is 1000000.

Property File Syntax

<property>
  <name>dt.operator.{name}.flushSize</name>
  <value>{number}</value>
</property>

Java API.

store.setFlushSize(1000000);

flushIntervalCount

This setting will force data flush even if number of tuples are below flushSize. Default value is 120 windows.

Property File Syntax

 <property>
   <name>dt.operator.{name}.flushIntervalCount</name>
   <value>{number of windows}</value>
 </property>

Java API.

store.setFlushIntervalCount(120);

maxWalFileSize

Write Ahead Log segment size. Older segments are deleted once data is written to the data files. Default value is 67108864 (64MB)

Property File Syntax

<property>
  <name>dt.operator.{name}.maxWalFileSize</name>
  <value>{value in bytes}</value>
</property>

Java API.

store.setMaxWalFileSize(128 * 1024 * 1024);

Example

This is a sample reference implementation, which computes how many times a word was seen in an application. The partial count is stored in the HDHT. The application does a lookup for the previous count and writes back the incremented count in HDHT.

Store Operator

HDHT provides following abstract implementations HDHTReader - This class implements functionality required for get, It access HDHT in read-only mode. HDHTWriter - This class extends functionality of HDHTReader by adding support for put, this class also maintains uncommitted cache, which can be accessed through getUncommitted method. * AbstractSinglePortHDHTWriter - This class extends from HDHTWriter and provides common functionality required for the operator. This class support code for operator partitioning. Also it provides an input port with a default implementation of storing value received on the port to the HDHT using the coded provided.

For this example we will use AbstractSinglePortHDHTWriter for the store, we need to implement codec which is used by AbstractSinglePortHDHTWriter for serialization and deserialization. Following is a simple serializer which serializes key and ignores the value part, as the input to the operator is only keys.

Implement a Codec

  public static class StringCodec extends KryoSerializableStreamCodec<String> implements HDHTCodec<String> {
    @Override
    public byte[] getKeyBytes(String s)
    {
      return s.getBytes();
    }

    @Override
    public byte[] getValueBytes(String s)
    {
      return s.getBytes();
    }

    @Override
    public String fromKeyValue(Slice key, byte[] value)
    {
      return new String(key.buffer, key.offset, key.length);
    }
  }

The store operator is implemented as shown below, we will need to provide an implementation of getCodec, and override processEvent to change default behavior of storing data in HDHT directly.

public class HDHTWordCounter extends AbstractSinglePortHDHTWriter<String>
{
  public transient DefaultOutputPort<Pair<String, Long>> out = new DefaultOutputPort<>();
  private transient HashMap<String, AtomicLong> cache;

  @Override
  protected HDHTCodec<String> getCodec()
  {
    return new StringCodec();
  }

  @Override
  protected void processEvent(String word) throws IOException
  {
    AtomicLong count = cache.get(word);
    if (count == null) {
      count = new AtomicLong(0L);
      cache.put(word, count);
    }
    count.incrementAndGet();
  }

  private void updateCount() throws IOException
  {
    for(Map.Entry<String, AtomicLong> entry : cache.entrySet()) {
      String word = entry.getKey();
      long prevCount = 0;
      byte[] key = codec.getKeyBytes(word);
      Slice keySlice = new Slice(key);
      long bucketKey = getBucketKey(word);
      /** First look for cached data */
      byte[] value = getUncommitted(bucketKey, keySlice);
      if (value == null) {
        /** look into persisted data files */
        value = get(bucketKey, keySlice);
        if (value == null) {
          value = ByteBuffer.allocate(8).putLong(0).array();
        }
      }

      prevCount = ByteBuffer.wrap(value).getLong();

      /** update count by taking new event into account */
      prevCount += entry.getValue().get();

      /** save computed result back to HDHT */
      put(bucketKey, keySlice, ByteBuffer.wrap(value).putLong(prevCount).array());

      /* emit updated counts on the output port */
      out.emit(new Pair<>(word, prevCount));
    }
  }

  @Override
  public void beginWindow(long windowId)
  {
    super.beginWindow(windowId);
    cache = new HashMap<>();
  }

  @Override
  public void endWindow()
  {
    try {
      updateCount();
      super.endWindow();
    } catch (IOException e) {

      throw new RuntimeException("Unable to flush to HDHT");
    }
  }
}

Sample Application.

@ApplicationAnnotation(name="HDHTWordCount")
public class HDHTWordCountApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    AbstractFileInputOperator.FileLineInputOperator gen = dag.addOperator("Reader", new AbstractFileInputOperator.FileLineInputOperator());
    gen.setDirectory("/home/data");

    WordSplitter splitter = dag.addOperator("Splitter", new WordSplitter());

    HDHTWordCounter store = dag.addOperator("Store", new HDHTWordCounter());
    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

    TFileImpl.DTFileImpl hdsFile = new TFileImpl.DTFileImpl();
    hdsFile.setBasePath("WALBenchMarkDir");
    store.setFileStore(hdsFile);

    dag.addStream("lines", gen.output, splitter.input);
    dag.addStream("s1", splitter.output, store.input);
    dag.addStream("s2", store.out, console.input);
  }
}

Performance tuning

Effect of frequent WAL flushes.

HDHT stores Write Ahead Log (WAL) on HDFS, WAL is flushed at end of every application window. Operator will be blocked till WAL is persisted on the disks. This flush will add additional delay to the operator. To avoid frequent delay we can reduce the frequency of flush by increasing APPLICATION_WINDOW_SIZE.

Application Level Cache

Maintain a cache to avoid frequent serialization and de-serialization of events while accessing HDHT. For example in the provided example the operator keeps computed counts till the endWindow and flushes the data to HDHT at end of the application window. If duplicate keys are seen within an application window we will save on serialization and de-serialization time.

Key design

HDHT gives best performance if keys are monotonically increasing, In this case HDHT does not have to overwrite existing files, which avoids expensive disk I/O thus yielding optimal performance. Overwriting existing file is costly operation, as it involves reading file data to memory and applying new changes from committed cached which falls within the key range of file, and writing back changes to disk again. If you are storing time series data in HDHT, it is best to use timestamp as the leading field in the key.

For keys which are not monotonically increasing, key design should be such that hot keys falls in small number of files. For example, suppose each file size is S bytes and flush is triggered every T seconds, and HDFS write bandwidth per container is B bytes per second, in this case we can sustain the write throughput, if keys processed within 30 seconds span at most (T / (S/B)) files.

If S, T and B are 128MB, 30 seconds, and 40MB respectively, this expression evaluates to 10, so if your keys span more than 10 files with 30 seconds, the write cannot be sustained.

File Backend

Prefer DTFile backend implementation over TFile backend implementation if you are going to issue frequent get operations. DTFile backend caches file blocks in memory which reduces disk I/O during cache hit.

Limitations

  • Dynamic Partitioning is not supported yet.
  • Write to same bucket from multiple operator is not supported. The default implementation use derives bucketKey based on number of operator partitions and hashcode of the event. If user chooses to use different bucketKey he needs to make sure that a single bucketKey is handled by only one operator partition.