Monday, December 21, 2009

Google Basic Building Block - Protocol Buffers

“Protocol Buffers” is one of Google’s important basic building blocks. It is a way of encoding structured data in an efficient yet extensible format, together with a compiler that generates convenient wrappers for manipulating the objects in a variety of languages. Protocol Buffers are used extensively at Google for almost all RPC protocols, and for storing structured information in a variety of persistent storage systems.

When to use Protocol Buffers:
- RPC Protocols/Messages
- Persistent Storage of structured information
- As Client/Server Framework

According to Jeff Dean’s keynote at LADIS 2009:

- high performance (200+ MB/s encode/decode)
- fairly compact (uses variable length encodings)
- format used to store data persistently (not just for RPCs)
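
As an illustration of the variable length encoding mentioned above, here is a minimal Python sketch of the base-128 varint scheme that Protocol Buffers use for integers. The function names are my own for illustration and are not part of the protobuf library, and the sketch only handles non-negative integers.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint (7 payload bits per byte)."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low7 = value & 0x7F          # take the lowest 7 bits
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)         # high bit clear: last byte
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode a single varint from the start of data."""
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7
    raise ValueError("truncated varint")
```

Small numbers take a single byte, and 300 takes two bytes (AC 02), which is why messages that mostly carry small integers stay compact.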

Low-level MapReduce interfaces are in terms of byte arrays
- Hardly ever use textual formats, though: slow, hard to parse
- Most input & output is in encoded Protocol Buffer format

Language Support:
- C++
- Java
- Python

Optimization for different use cases: (e.g.: option optimize_for = SPEED)

- SPEED (default): The protocol buffer compiler will generate code for serializing, parsing, and performing other common operations on your message types. This code is extremely highly optimized.

- CODE_SIZE: The protocol buffer compiler will generate minimal classes and will rely on shared, reflection-based code to implement serialization, parsing, and various other operations. The generated code will thus be much smaller than with SPEED, but operations will be slower. Classes will still implement exactly the same public API as they do in SPEED mode. This mode is most useful in apps that contain a very large number of .proto files and do not need all of them to be blindingly fast.

- LITE_RUNTIME: The protocol buffer compiler will generate classes that depend only on the "lite" runtime library (libprotobuf-lite instead of libprotobuf). The lite runtime is much smaller than the full library (around an order of magnitude smaller) but omits certain features like descriptors and reflection. This is particularly useful for apps running on constrained platforms like mobile phones. The compiler will still generate fast implementations of all methods as it does in SPEED mode. Generated classes will only implement the MessageLite interface in each language, which provides only a subset of the methods of the full Message interface.

For details of Protocol Buffers, please refer to:

We may select either Protocol Buffers or Thrift as our building block. After a brief read of the Protocol Buffers code, and comparing it with our experience of using Thrift, I prefer Thrift, which provides a better RPC implementation and better coding interfaces.

There are also some performance comparisons of Thrift and Protocol Buffers:

How to install protobuf (an example):

1. Download protobuf-2.2.0a.tar.gz
$ cd /usr/local/src
$ tar -zxvf /root/pkgs/protobuf-2.2.0a.tar.gz

Read README.TXT and INSTALL.TXT for details.

2. Build and install the C++ Protocol Buffer runtime and the Protocol Buffer compiler (protoc)
$ ./configure --prefix=/usr/local/protobuf
$ make
$ make check
$ make install

Set the Linux library path so that applications can find the shared libraries:
$ echo "/usr/local/protobuf/lib" > /etc/
$ ldconfig

3. /etc/profile.d/
I added this file. It sets some system-level environment variables for the convenience of applications.
# apache-ant
export ANT_HOME

# google protocol buffer
# apps use pkg-config to compile and link protobuf (eg. pkg-config --cflags --libs protobuf)

export PATH

4. Install protobuf Java
$ cd /usr/local/src/protobuf-2.2.0a/java
Read README.TXT (Installation - Without Maven)

$ protoc --java_out=src/main/java -I../src ../src/google/protobuf/descriptor.proto

Write a new build.xml:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<!-- Parts of this listing were lost and are reconstructed here as assumptions:
     the property names javac.target, final.name and final.lite.name; the javac
     attributes after "target"; the text before ": ${ant.file}" in the echo
     messages; the delete wrapper in cleanall; and all closing tags. -->
<project basedir="." default="jar-libprotobuf" name="libprotobuf">
<property environment="env"/>

<!-- javac options -->
<property name="javac.version" value="1.6"/>
<property name="javac.source" value="${javac.version}"/>
<property name="javac.target" value="${javac.version}"/>
<property name="javac.deprecation" value="off"/>
<property name="javac.debug" value="off"/>
<property name="javac.debuglevel" value="source,lines,vars"/>
<property name="javac.optimize" value="on"/>
<property name="javac.args" value=""/>
<property name="javac.args.warnings" value="-Xlint:unchecked"/>

<!-- jar options -->
<property name="jar.index" value="true"/>

<!-- protobuf names -->
<property name="version" value="2.2.0a"/>
<property name="Name" value="libprotobuf"/>
<property name="final.name" value="${Name}-java-${version}"/>

<!-- dir locations -->
<property name="src.dir" value="${basedir}/src"/>
<property name="src.main.dir" value="${src.dir}/main"/>
<property name="src.test.dir" value="${src.dir}/test"/>
<property name="build.dir" value="${basedir}/build"/>
<property name="build.classes.dir" value="${build.dir}/classes"/>

<!-- TARGET init -->
<target name="init">
<mkdir dir="${build.dir}"/>
</target>

<!-- TARGET clean -->
<target name="clean">
<delete dir="${build.dir}"/>
</target>

<!-- TARGET cleanall -->
<target name="cleanall" depends="clean">
<delete>
<fileset dir="." includes="*.jar"/>
</delete>
</target>

<!-- TARGET compile-libprotobuf -->
<target name="compile-libprotobuf" depends="init" >
<echo message="${Name}: ${ant.file}"/>
<mkdir dir="${build.classes.dir}"/>
<javac source="${javac.source}" target="${javac.target}"
srcdir="${src.main.dir}/java" destdir="${build.classes.dir}"
debug="${javac.debug}" debuglevel="${javac.debuglevel}"
deprecation="${javac.deprecation}" optimize="${javac.optimize}">
<compilerarg line="${javac.args} ${javac.args.warnings}" />
</javac>
</target>

<!-- TARGET jar-libprotobuf -->
<target name="jar-libprotobuf" depends="compile-libprotobuf">
<jar basedir="${build.classes.dir}" destfile="${build.dir}/${final.name}.jar" index="${jar.index}"/>
<copy todir="${basedir}">
<fileset file="${build.dir}/${final.name}.jar"/>
</copy>
</target>

<!-- for libprotobuf-lite -->

<property name="build.lite.dir" value="${build.dir}/lite"/>
<property name="build.lite.classes.dir" value="${build.lite.dir}/classes"/>
<property name="final.lite.name" value="${Name}-lite-java-${version}"/>

<!-- TARGET compile-libprotobuf-lite -->
<target name="compile-libprotobuf-lite" depends="init" >
<echo message="${Name}: ${ant.file}"/>
<mkdir dir="${build.lite.dir}"/>
<mkdir dir="${build.lite.classes.dir}"/>
<!-- The list of lite runtime sources was lost from the original listing.
     Without nested include filters this compiles every source file, so
     restrict it to the lite sources before relying on the resulting jar. -->
<javac source="${javac.source}" target="${javac.target}"
srcdir="${src.main.dir}/java" destdir="${build.lite.classes.dir}"
debug="${javac.debug}" debuglevel="${javac.debuglevel}"
deprecation="${javac.deprecation}" optimize="${javac.optimize}">
<compilerarg line="${javac.args} ${javac.args.warnings}" />
</javac>
</target>

<!-- TARGET jar-libprotobuf-lite -->
<target name="jar-libprotobuf-lite" depends="compile-libprotobuf-lite">
<jar basedir="${build.lite.classes.dir}" destfile="${build.lite.dir}/${final.lite.name}.jar" index="${jar.index}"/>
<copy todir="${basedir}">
<fileset file="${build.lite.dir}/${final.lite.name}.jar"/>
</copy>
</target>

</project>


$ ant
$ cp libprotobuf-java-2.2.0a.jar /usr/local/protobuf/lib/

$ ant jar-libprotobuf-lite
$ cp libprotobuf-lite-java-2.2.0a.jar /usr/local/protobuf/lib/

5. Build examples
$ cd /usr/local/src/protobuf-2.2.0a/examples
Read README.txt for details.

$ export CLASSPATH=.:$CLASSPATH:/usr/local/protobuf/lib/libprotobuf-java-2.2.0a.jar
$ make java

$ make cpp

$ make python

Then we can read the example code (AddPerson and ListPeople) and run the examples.

Thursday, October 29, 2009

Amazon adding more components/utilities as services in AWS

Amazon just launched the Relational Database Service (RDS) as a new service in its AWS cloud. Werner Vogels's blog post "Expanding the Cloud: The Amazon Relational Database Service (RDS)" gives more detail on the motivations behind creating S3, EC2, SimpleDB, EBS and now RDS. We can find the business

Amazon's tech. style:
In the Amazon services architecture, each service is responsible for its own data management, which means that each service team can pick exactly those solutions that are ideally suited for the particular application they are implementing. It allows them to tailor the data management system such that they get maximum reliability and guaranteed performance at the right cost as the system scales up.

So, to meet the goals of "scalability, reliability, performance, and cost-effectiveness":
- Since key-value storage solutions are widely used, they led to the creation of S3.
- Since simple structured data management systems, without complex transactions and relations and without a rigid schema, are widely used, they led to the creation of SimpleDB.
- But Amazon found that many applications running in EC2 instances want to use an RDBMS, so EBS was launched to provide scalable and reliable storage volumes that can be used for persisting the databases.
- To free users to focus on their applications and business, RDS is now ready. Users do not need to maintain their own DB or consider how to scale it.
- Another case is the AWS MapReduce service.

RDS is MySQL in the AWS cloud.

Now, AWS users have three methods to use DB:
(1) RDS
(3) SimpleDB (no relation model)

We can re-read the good paper "Amazon Cloud Architecture", to understand the strategy of AWS better.

[1] Amazon Relational Database Service (RDS):
[2] Werner Vogels's blog:
[3] Amazon Cloud Architecture:

Friday, October 2, 2009

The Integration of Analytic DBMS and Hadoop

Recently, two famous vendors of analytic DBMS, Vertica and Aster Data, announced their integration with Hadoop. The analytic DBMS and Hadoop each address distinct but complementary problems in managing large data.


Currently it is a light integration.
  • ETL, ELT, data cleansing, data mining, etc.
  • Moving data between Hadoop and Vertica.
  • InputFormat (InputSplit, VerticaRecord, push down relational map operations by parameterizing the database query).
  • OutputFormat (to existing or create a new table).
  • Easy for Hadoop developers to push down Map operations to Vertica databases in parallel by specifying parameterized queries which result in pre-aggregated data for each mapper.
  • Support Hadoop streaming interface.

Typical usages:

(1) Raw Data -> Hadoop (ETL) -> Vertica (for fast ad-hoc query, near realtime)
(2) Vertica -> Hadoop (ETL) -> Vertica (for fast ad-hoc query, near realtime)
(3) Vertica -> Hadoop (sophisticated query for analysis or mining)

We can expect to see tighter integration and higher performance.

[1] The Scoop on Hadoop and Vertica:
[2] Using Vertica as a Structured Data Repository for Apache Hadoop:
[3] Cloudera DBInputFormat interface:
[4] Managing Big Data with Hadoop and Vertica:


Aster Data already provides in-database MapReduce.

The new Aster-Hadoop Data Connector utilizes Aster’s patent-pending SQL-MapReduce capabilities for two-way, high-speed data transfer between Apache Hadoop and Aster Data’s massively parallel data warehouse.
  • ETL processing or data mining, and then pull that data into Aster for interactive queries or ad-hoc analytics on massive data scales.
  • The Connector utilizes key new SQL-MapReduce functions to provide ultra-fast, two-way data loading between HDFS (Hadoop Distributed File System) and Aster Data’s MPP Database.
  • Parallel loader.
  • LoadFromHadoop: Parallel data loading from HDFS to Aster nCluster.
  • LoadToHadoop: Parallel data loading from Aster nCluster to HDFS.

Key advantages of Aster’s Hadoop Connector include:
  • High-performance: Fast, parallel data transfer between Hadoop and Aster nCluster.
  • Ease-of-use: Analysts can now seamlessly invoke a SQL command for ultra-simple import of Hadoop-MapReduce jobs, for deeper data analysis. Aster intelligently and automatically parallelizes the load.
  • Data Consistency: Aster Data's data integrity and transactional consistency capabilities treat the data load as a 'transaction', ensuring that the data load or export is always consistent and can be carried out while other queries are running in parallel in Aster.
  • Extensibility: Customers can easily further extend the Connector using SQL-MapReduce, to provide further customization for their specific environment.

The typical usages are similar to Vertica.

[1] Aster Data Announces Seamless Connectivity With Hadoop:
[2] DBMS2 - MapReduce tidbits
[3] Aster Data Blog: Aster Data Seamlessly Connects to Hadoop,

Another case of integrating an analytic DBMS with Hadoop is the HadoopDB project.

Monday, September 28, 2009

Notes of Key-Value Stores

Key-Value Stores:

It’s a storage system that stores values, indexed by a key. [1]

Sometimes, a key-value store works as the underlying storage system for a data management system.

When to consider Key-Value Stores:

- You need something that can query simple data faster than a relational system can.

- There are scaling requirements that are difficult to meet with a relational system.

- You want to avoid tying an application to a requirement for a relational system that may well have its own maintenance needs over time.

- You just want something simpler than a relational database.

- ……

In all of these cases, a key-value store may be just the tool for you.

Typical data structures:

Hash

Hash based key-value stores have fast lookup and update speeds, and require that keys be unique.

Lookup in a hash based store is limited to key-only lookup.

For example:

For an object storage system that stores billions of photos, applications only query/get a photo by its unique key/id, as in Facebook’s Haystack. [2]

The general API: Put/Get/Delete.

The important issues of hash based key-value stores are to limit the size of in-memory index/metadata, and to implement the distribution.

Tree (e.g. B-tree or B+tree)

Tree based stores generally allow multiple identical keys, and because tree based structures (like the b-tree) are ordered, they allow you to query individual keys, as well as ranges of keys.

The drawback to tree based stores is that those structures are generally slower than simple hashes.

For example:

A log aggregation system that stores messages keyed by timestamp, but which uses a hash based store, will be troublesome to query in a useful way, despite the speed of lookup, because you can only look up specific timestamps. A system that uses a b-tree based store may be marginally slower for individual queries, but by permitting lookup based on a range of timestamps, it can easily query the records of interest. This more than offsets the difference in basic query speeds. BigTable is built on a B-tree based key-value store. [3]

The general API: Put/Get/Delete/Scan.

The important issue of tree based key-value stores is to implement distributed tree structures.
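
To make the difference between the two APIs concrete, here is a minimal in-memory Python sketch. The class and method names are hypothetical, and the ordered store uses a sorted list with the standard bisect module as a simple stand-in for a real B-tree. For brevity it also keeps keys unique, unlike the tree based stores described above.

```python
import bisect


class HashStore:
    """Hash based store: Put/Get/Delete by exact key only."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def delete(self, key):
        self._data.pop(key, None)


class OrderedStore:
    """Ordered store: Put/Get/Delete plus Scan over a key range."""

    def __init__(self):
        self._keys = []    # keys kept in sorted order
        self._values = {}  # key -> value

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def get(self, key):
        return self._values.get(key)

    def delete(self, key):
        if key in self._values:
            del self._values[key]
            del self._keys[bisect.bisect_left(self._keys, key)]

    def scan(self, start, end):
        """Return (key, value) pairs with start <= key < end, in key order."""
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        return [(k, self._values[k]) for k in self._keys[lo:hi]]
```

With log messages keyed by timestamp, the ordered store can answer scan(t1, t2) directly, while the hash store would need every individual timestamp to be known in advance.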


[1] Engine Yard, Key-Value Stores in Ruby (Key-Value Stores Part 1)

[2] Facebook, Needle in a haystack: efficient storage of billions of photos,

[3] Data Management Projects at Google-200803,

Sunday, September 27, 2009

Tips of Hadoop Runtime Environment


To set up a cluster for running Hadoop/HBase/Hive, etc., besides the configuration and tuning of these open-source programs themselves, the system hardware and software, and some useful utilities, should also be considered, both to improve system performance and to ease system maintenance.

1. Cluster Facilities and Hardware [1]

(1) Data center:

Usually, we run Hadoop/HBase/Hive in a single data center.

(2) Servers:

Clusters are often either capacity bound or CPU bound.

The 1U or 2U configuration is usually used.

The storage capability of each node is usually not too dense (<= 4TB is recommended).

Commodity server: 2x4 core CPU, 16 GB RAM, 4x1TB SATA, 2x1 GE NIC

Use ECC RAM and cheap hard drives: 7200 RPM SATA.

Start with standard 64-bit box for masters and workers.

(3) Network:

Gigabit Ethernet, 2 level tree, 5:1 oversubscription to core

May want redundancy at top of rack and core

Usually, for a small cluster, all nodes are under a single GE switch.

(4) RAID configuration: RAID0

If there are two or more disks in each machine, RAID0 can provide better disk throughput than other RAID levels (and JBOD?). The multiple data replicas of HDFS can tolerate failure and guarantee the data safety.

2. System Software

(1) Linux:

RedHat5+ or CentOS5+ (recommended). Now, we use CentOS5.3-x64.

(2) Local File System:

Ext3 is ok.

We usually configure a separate disk partition for Hadoop, and create and mount a separate local file system on it.

Mount with noatime and nodiratime for better performance. By default, Linux updates the atime of files and directories on access, which is unnecessary in most cases. [2]

-- Edit /etc/fstab:

e.g. /dev/VolGroup00/LogVol02 /data ext3 defaults,noatime,nodiratime 1 2

-- remount:

mount -o remount /data

(3) Swappiness configuration: [3]

With the introduction of version 2.6, the new variable "swappiness" was added to the Linux kernel memory management subsystem and a tunable was created for it. A high value of swappiness makes the kernel page out application text in favour of another application or even the file-system cache. The default value is 60 (see mm/vmscan.c).

If you end up swapping, you will start seeing some weird behavior and very slow GC runs, and HBase regionservers will likely be killed off as ZooKeeper times out and assumes the RegionServer is dead. We suggest setting vm.swappiness = 0 or another low number (e.g. 10), and observing the state of swap.

-- Edit /etc/sysctl.conf : vm.swappiness=0

-- To check the current value on a running system: sysctl vm.swappiness

(4) Linux default file handle limit: [4]

Currently HBase is a file handle glutton. To up the users' file handles, edit /etc/security/limits.conf on all nodes.

* - nofile 32768
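
After logging in again, it can be useful to confirm that a process really sees the raised limit. The following is a small sketch using the resource module from Python's standard library (Unix only). The function name and the default threshold are my own choices for illustration, the threshold simply mirroring the value in limits.conf above.

```python
import resource


def nofile_limit_ok(required=32768):
    """Check whether this process's soft open-file limit is at least `required`."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # RLIM_INFINITY means "no limit", which always satisfies the requirement.
    ok = soft == resource.RLIM_INFINITY or soft >= required
    return ok, soft, hard


if __name__ == "__main__":
    ok, soft, hard = nofile_limit_ok()
    print("soft=%s hard=%s ok=%s" % (soft, hard, ok))
```

The check only reflects the limits of the process that runs it, so it should be run as the same user, and from the same kind of session, that starts the HBase daemons.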

(5) Java: JRE/JDK1.6 latest and GC options [5]

For machines with 4-16 cores, our Hadoop/HBase and other Java applications should use the GC option “-XX:+UseConcMarkSweepGC”.

For machines with 2 cores, use the GC options “-XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode”.

(6) Apache ANT 1.7.1

(7) Useful Linux utilities:

top, sar, iostat, iftop, vmstat, nfsstat, strace, dmesg, and friends

iostat is especially useful for disk I/O analysis.

(8) Useful java utilities:

jps, jstack, jconsole

(9) Compression native library: Gzip and LZO [7]

(10) Ganglia:

To integrate metrics of Hadoop, HBase, Hive, applications, and Linux system.


[1] Hadoop and Cloudera, Managing Petabytes with Open Source, Jeff Hammerbacher, Aug. 21 or

[2] Set noatime of local file system:

[3] Linux Swappiness:

[4] HBase FAQ:

[5] Java SE 6 HotSpot[tm] Virtual Machine Garbage Collection Tuning,

[6] Apache Ant:


Saturday, September 12, 2009

HFile: A Block-Indexed File Format to Store Sorted Key-Value Pairs

1. Introduction

HFile mimics Google’s SSTable. It is now available in Hadoop HBase-0.20.0. Previous releases of HBase temporarily used an alternate file format, MapFile [4], which is a common file format in the Hadoop IO package. I think HFile should also become a common file format once it matures, and should be moved into the common IO package of Hadoop in the future.

The following description of SSTable is from Section 4 of Google’s Bigtable paper.

The Google SSTable file format is used internally to store Bigtable data. An SSTable provides a persistent, ordered immutable map from keys to values, where both keys and values are arbitrary byte strings. Operations are provided to look up the value associated with a specified key, and to iterate over all key/value pairs in a specified key range. Internally, each SSTable contains a sequence of blocks (typically each block is 64KB in size, but this is configurable). A block index (stored at the end of the SSTable) is used to locate blocks; the index is loaded into memory when the SSTable is opened. A lookup can be performed with a single disk seek: we first find the appropriate block by performing a binary search in the in-memory index, and then reading the appropriate block from disk. Optionally, an SSTable can be completely mapped into memory, which allows us to perform lookups and scans without touching disk.[1]

The HFile implements the same features as SSTable, but may provide more or less.

2. File Format

Data Block Size

Whenever we say Block Size, it means the uncompressed size.

The size of each data block is 64KB by default, and is configurable in HFile.Writer. A data block may exceed this size, but by no more than one key/value pair: the HFile.Writer starts a new data block for the next key/value pair once the block currently being written is equal to or bigger than this size. The 64KB default is the same as Google’s [1].

To achieve better performance, we should choose the block size to suit the data. If the average key/value size is very small (e.g. 100 bytes), we should select small blocks (e.g. 16KB) to avoid having too many key/value pairs in each block, which would increase the latency of an in-block seek, because a seek always scans for the key sequentially from the first key/value pair within a block.
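
The interaction between block size and seek cost can be sketched in Python as follows. This is a simplified in-memory model under my own assumptions, not HFile's actual code: the function names are hypothetical, blocks are plain lists, and the block index is just the list of first keys. It follows the rules described above: a new block starts once the current one has reached the block size, a lookup does a binary search over the index, and the search inside a block is sequential.

```python
import bisect


def build_blocks(sorted_pairs, block_size):
    """Group sorted (key, value) byte pairs into blocks and build a first-key index."""
    blocks, current, current_bytes = [], [], 0
    for key, value in sorted_pairs:
        # Start a new block once the current one is equal to or bigger than block_size.
        if current and current_bytes >= block_size:
            blocks.append(current)
            current, current_bytes = [], 0
        current.append((key, value))
        current_bytes += len(key) + len(value)
    if current:
        blocks.append(current)
    index = [block[0][0] for block in blocks]  # first key of each block
    return blocks, index


def lookup(blocks, index, key):
    """Binary search the index for the block, then scan that block sequentially."""
    pos = bisect.bisect_right(index, key) - 1
    if pos < 0:
        return None  # key sorts before the first block
    for k, v in blocks[pos]:
        if k == key:
            return v
        if k > key:
            break  # keys are sorted, so the key is absent
    return None
```

With small entries, a larger block size means more pairs per block and therefore a longer sequential scan in lookup, which is the latency effect described above.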

Maximum Key Length

The key of each key/value pair is currently up to 64KB in size. Usually, 10-100 bytes is a typical size for most of our applications. Even in the data model of HBase, the key (rowkey+column family:qualifier+timestamp) should not be too long.

Maximum File Size

The trailer, file-info and all data block indexes (and, optionally, the meta block indexes) are held in memory while an HFile is being written or read. So, a larger HFile (with more data blocks) requires more memory. For example, a 1GB uncompressed HFile would have about 15600 (1GB/64KB) data blocks, and correspondingly about 15600 index entries. Suppose the average key size is 64 bytes, then we need about 1.2MB of RAM (15600 x 80 bytes) to hold these indexes in memory.
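
The estimate above can be reproduced with a few lines of Python. The 80 bytes per index entry comes from the figure in the text; splitting it into the 64-byte key plus 16 bytes of per-entry overhead is my assumption, and the function name is hypothetical.

```python
def index_memory_bytes(file_size, block_size=64 * 1024,
                       avg_key_size=64, per_entry_overhead=16):
    """Estimate the number of data blocks and the RAM needed for their index."""
    num_blocks = -(-file_size // block_size)  # ceiling division
    return num_blocks, num_blocks * (avg_key_size + per_entry_overhead)


if __name__ == "__main__":
    blocks, ram = index_memory_bytes(1024 ** 3)  # 1GB file, 64KB blocks
    print(blocks, ram)
```

With binary units this gives 16384 blocks and 1,310,720 bytes (about 1.25MB), in line with the rounded figures above. Halving the block size doubles the number of index entries, so small blocks trade index memory for faster in-block seeks.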

Compression Algorithm

- Compression reduces the number of bytes written to/read from HDFS.

- Compression effectively improves the efficiency of network bandwidth and disk space

- Compression reduces the size of data needed to be read when issuing a read

To keep friction as low as possible, a real-time compression library is preferred. Currently, HFile supports the following three algorithms:

(1)NONE (Default, uncompressed, string name=”none”)

(2)GZ (Gzip, string name=”gz”)

Out of the box, HFile ships with only Gzip compression, which is fairly slow.

(3)LZO(Lempel-Ziv-Oberhumer, preferred, string name=”lzo”)

To achieve maximal performance and benefit, you must enable LZO, which is a lossless data compression algorithm that is focused on decompression speed.

Following figures show the format of an HFile.

In the above figures, an HFile is separated into multiple segments. From beginning to end, they are:

- Data Block segment

To store key/value pairs, may be compressed.

- Meta Block segment (Optional)

To store user defined large metadata, may be compressed.

- File Info segment

It is a small metadata of the HFile, without compression. User can add user defined small metadata (name/value) here.

- Data Block Index segment

Indexes the data block offset in the HFile. The key of each index is the key of first key/value pair in the block.

- Meta Block Index segment (Optional)

Indexes the meta block offset in the HFile. The key of each index is the user defined unique name of the meta block.

- Trailer

The fixed-size metadata. It holds the offset of each segment, etc. To read an HFile, we should always read the Trailer first.

The current implementation of HFile does not include Bloom Filter, which should be added in the future.

3. LZO Compression

LZO has been removed from Hadoop and HBase 0.20+ because of GPL restrictions. To enable it, we should first install the native library as follows. [6][7][8][9]

(1) Download LZO:, and build.

# ./configure --build=x86_64-redhat-linux-gnu --enable-shared --disable-asm

# make

# make install

Then the libraries have been installed in: /usr/local/lib

(2) Download the native connector library, and build.

Copy hadoop-0.20.0-core.jar to ./lib.

# ant compile-native

# ant jar

(3) Copy the native library (build/native/Linux-amd64-64) and hadoop-gpl-compression-0.1.0-dev.jar to your application’s lib directory. If your application is a MapReduce job, copy them to hadoop’s lib directory. Your application should follow the $HADOOP_HOME/bin/hadoop script to ensure that the native hadoop library is on the library path via the system property -Djava.library.path=. [9]

4. Performance Evaluation


4 slaves + 1 master

Machine: 4 CPU cores (2.0G), 2x500GB 7200RPM SATA disks, 8GB RAM.

Linux: RedHat 5.1 (2.6.18-53.el5), ext3, no RAID, noatime

1Gbps network, all nodes under the same switch.

Hadoop-0.20.0 (1GB heap), lzo-2.0.3

Some MapReduce-based benchmarks are designed to evaluate the performance of operations to HFiles, in parallel.

Total key/value entries: 30,000,000.

Key/Value size: 1000 bytes (10 for key, and 990 for value). We have totally 30GB of data.

Sequential key ranges: 60, i.e. each range has 500,000 entries.

Use default block size.

The entry value is a string in which each run of 8 consecutive bytes is filled with the same letter (A~Z). E.g. “BBBBBBBBXXXXXXXXGGGGGGGG……”.

We set to avoid client side bottleneck.
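
For reference, entry values of the shape described above could be generated along the following lines. This is only a sketch: the function name is hypothetical, the letters are chosen at random (the original generator may have chosen them differently), and since 990 is not a multiple of 8 the last run is simply cut short.

```python
import random
import string


def make_value(length=990, run=8, rng=random):
    """Build a string in which each run of `run` characters repeats one letter A~Z."""
    chunks = []
    remaining = length
    while remaining > 0:
        n = min(run, remaining)  # the final run may be shorter
        chunks.append(rng.choice(string.ascii_uppercase) * n)
        remaining -= n
    return "".join(chunks)


if __name__ == "__main__":
    print(make_value(length=24))
```

Values built this way are highly repetitive, so they compress well; real application data may compress quite differently.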

(1) Write

There is one MapTask per key range, and each MapTask writes a separate HFile with 500,000 key/value entries.

(2) Full Scan

Each MapTask scans a separate HFile from beginning to end.

(3) Random Seek a specified key

Each MapTask opens one separate HFile, and selects a random key within that file to seek it. Each MapTask runs 50,000 (1/10 of the entries) random seeks.

(4) Random Short Scan

Each MapTask opens one separate HFile, and selects a random key within that file as a beginning to scan 30 entries. Each MapTask runs 50,000 scans, i.e. scans 50,000*30=1,500,000 entries.

This table shows the average number of entries written, sought, or scanned per second, per node.

In this evaluation case, the compression ratio is about 7:1 for gz (Gzip), and about 4:1 for lzo. Even though the compression ratio is just moderate, the lzo column shows the best performance, especially for writes.

The performance of full scan is much better than SequenceFile, so HFile may provide better performance to MapReduce-based analytical applications.

Random seek in HFiles is slow, especially in uncompressed HFiles. But the above numbers already show 6X~10X better performance than a disk seek (10ms). The following Ganglia charts show the overhead in load, CPU, and network. The random short scan shows similar behavior.


[1] Google, Bigtable: A Distributed Storage System for Structured Data,

[2] HBase-0.20.0 Documentation,

[3] HFile code review and refinement.

[4] MapFile API:

[5] Parallel LZO: Splittable Compression for Hadoop.

[6] Using LZO in Hadoop and HBase:

[7] LZO:

[8] Hadoop LZO native connector library:

[9] Hadoop Native Libraries Guide: