Monday, November 16, 2015

A few memory issues in Hadoop!

In a Hadoop setup (or any Big Data setup, for that matter), memory issues are not unexpected!

Here is an update on a couple of issues we have seen of late –


1.       NameNode process gets stuck:

In this case, you will typically see the following symptoms –

a.       The DataNode gives the following timeout error -

WARN ipc.Client: Exception encountered while connecting to the server : 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode-ip>:<datanode-port> remote=/<namenode-ip>:<namenode-port>]

ls: Failed on local exception: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode-ip>:<datanode-port> remote=/<namenode-ip>:<namenode-port>]; Host Details : local host is: "<datanode-ip>"; destination host is: "<namenode-ip>":<namenode-port>;

=>  What this essentially means is that the DataNode process timed out while trying to connect to the NameNode. So obviously the next step is to check why the NameNode didn't respond.

b.      On checking the NameNode logs, we observed the following warning –

WARN org.apache.hadoop.util.JvmPauseMonitor (org.apache.hadoop.util.JvmPauseMonitor$Monitor@18df3f69): Detected pause in JVM or host machine (eg GC): pause of approximately 74135ms No GCs detected

This indicates that the NameNode paused for longer than the expected 60000ms. It also explains why the DataNode did not get a response from the NameNode within the designated 60000ms.

The warning also indicates that the pause was not due to GC. Typically, GC can cause such 'stop the world' pauses, and if that's the case, it calls for memory profiling and GC tuning.

In this case, however, it turned out that CPU activity on the master node was very high due to another cron job. Once we sorted out the cron job, the issue was resolved.


2.       DataNode process OOM:

Depending on the size of your data and the amount of data activity, you may observe an OOM issue in the DataNode process once in a while.

A quick fix would be to allocate more memory to the DataNode process via its JVM heap settings.
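The exact change depends on your setup; on a vanilla Apache installation, a sketch of the change in hadoop-env.sh would look like this (the 4 GB figure is purely illustrative):

```shell
# hadoop-env.sh: raise the DataNode heap; size it based on your block count
# and GC logs, not this illustrative value
export HADOOP_DATANODE_OPTS="-Xmx4096m ${HADOOP_DATANODE_OPTS}"
```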


Also, it is advisable to configure the DataNode to generate a heap dump on an OOM error. That will help with further analysis of the heap if you hit the same error again.

(This is applicable to other processes as well – NN/RM/NM etc.)
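As a sketch, the standard JVM flags below enable a heap dump on OOM (the dump path is illustrative); the same flags can be added to the *_OPTS variable of the other daemons:

```shell
# hadoop-env.sh: capture a heap dump when the DataNode hits an OOM error
export HADOOP_DATANODE_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/hadoop/dn.hprof ${HADOOP_DATANODE_OPTS}"
```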





Wednesday, June 17, 2015

EMR cluster and selection of EC2 instance type - Cost Optimization!

AWS Elastic MapReduce (EMR) is Amazon's service providing Hadoop in the cloud.
EMR uses EC2 nodes as the Hadoop nodes. While launching an EMR cluster, we can choose an appropriate instance type based on the resource requirements and the profile of the Hadoop processes.

In our case, we achieved a direct cost saving of 30% by moving our EMR cluster from m2.4xlarge to r3.2xlarge instances.
On top of this, our Hadoop jobs got a performance boost from the newer-generation CPUs in the r3 family and from its SSD storage.
SSD helps both HDFS and MapReduce jobs, because a MapReduce job reads/writes its intermediate data to local disk.

Apart from this, we earlier used m2.4xlarge for all transient clusters, but as part of this optimization I observed that some jobs took hardly 15 minutes.
Since EMR billing is per hour, even if a cluster is used for only 20 minutes, we end up paying for the full hour.
For such jobs, I decided to move to the smaller r3.xlarge instances, which resulted in cost savings of 65%!

We can push these cost savings up to 90% by moving our transient clusters to spot instances.
However, this requires more effort to ensure the fault tolerance of our workflows and to avoid delays in job execution when spot instances are lost to price spikes.

In summary, appropriate instance type selection can significantly reduce your EMR bills!

The following page lists the available EC2 instance types -
After comparing various instance types, I decided to move our transient clusters from m2.4xlarge to r3.2xlarge.

For example, let's compare m2.4xlarge vs r3.2xlarge:

                 m2.4xlarge         r3.2xlarge             Notes
CPU              8 core             8 core (new gen CPU)
SSD              No (2 x 840 GB)    Yes (1 x 160 GB)       SSD is better for performance of hadoop jobs
EC2 Price        $0.980 per Hour    $0.700 per Hour
EMR Price        $0.246 per Hour    $0.180 per Hour
Total Price      $1.226 per Hour    $0.880 per Hour        ~30% cost saving
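The saving quoted above is straightforward to verify from the hourly totals (EC2 price plus EMR price):

```python
# Hourly totals for the two instance types, from the comparison above
m2_4xlarge = 0.980 + 0.246   # $1.226 per hour
r3_2xlarge = 0.700 + 0.180   # $0.880 per hour

saving = (m2_4xlarge - r3_2xlarge) / m2_4xlarge
print(f"{saving:.0%}")  # 28%, i.e. roughly the ~30% quoted above
```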

Default Memory Allocation:
Another interesting aspect of EMR is that EMR's ResourceManager allocates different amounts of memory to the various YARN containers (mapper, reducer, application master, etc.) based on the instance type.
That means looking only at the available resources of an instance type is not sufficient to decide which instance type to use for an EMR cluster.

The following page shows the default memory allocations for various instance types.
These defaults do not suit all Hadoop jobs, so we wrote a script that reduces the memory allocation on r3.2xlarge instances during cluster bootstrap.
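As a sketch, the kind of overrides such a bootstrap script writes would look like the following (the property names are the standard Hadoop ones; the values are illustrative, not our actual settings):

```xml
<!-- mapred-site.xml overrides applied at bootstrap; values are illustrative -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>1440</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>2880</value>
</property>
```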

Overall, we achieved direct cost saving of 30% to 90% by selecting appropriate EC2 instance types for our EMR cluster.

- Sarang Anajwala

Tuesday, May 26, 2015

Cost optimization through performance improvement of S3DistCp

We reduced the cost of running our production cluster by about 60%, shrinking it from 15 nodes to 6 nodes, through performance tuning of the AWS utility S3DistCp. This post provides some details.

What is S3DistCp

Apache DistCp is an open-source tool for copying large amounts of data. DistCp uses MapReduce to copy in a distributed manner, sharing the copy, error handling, recovery, and reporting tasks across several servers. For more information, see the Apache DistCp open source project.
S3DistCp is an extension of DistCp that is optimized to work with AWS, particularly Amazon S3.


The AWS S3DistCp command, by default, can consume a very large share of the cluster's resources to transfer even a small file!
As mentioned above, S3DistCp uses MapReduce to copy files in a distributed manner. By default, a MapReduce job creates approximately as many reducers as there are available container slots in the cluster.
Hive optimizes this behavior by estimating the required number of reducers from the input data size. However, S3DistCp uses the default MapReduce logic and triggers as many reducers as there are available slots in the cluster.
Because of this, S3DistCp ends up creating a huge number of reducers on large clusters.

For example, in one instance it created 1 mapper and 498 reducers to transfer 694 KB of data!
This behavior eats up a lot of cluster resources and eventually slows down the overall throughput of the cluster.

Specifying the required number of reducers by passing the argument "-D mapreduce.job.reduces=${no_of_reduces}" caps the number of reducers triggered for a given S3DistCp operation.
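For instance, a hypothetical invocation capping the copy at 5 reducers might look like this (the jar path and bucket names are illustrative, not our actual ones):

```shell
hadoop jar /home/hadoop/lib/emr-s3distcp-1.0.jar \
  -D mapreduce.job.reduces=5 \
  --src hdfs:///output/run1/ \
  --dest s3://my-bucket/output/run1/
```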

After restricting the reducer count to 5, the load on the cluster dropped and throughput increased significantly.

Immediately after deployment of fix

The number of spikes in cluster CPU utilization and the number of containers running in the cluster both dropped significantly immediately after deploying the fix.

After 10 days

In the 10 days after we deployed the S3DistCp fix, cluster performance improved significantly. (The green block in the graph indicates the improvement from the fix.)
We reduced the total production cluster size from 15 nodes to 6 nodes.
As a result, our average cluster utilization improved. (The yellow line in the graph shows utilization improving as we slowly reduced the node count from 15 to 6.)
We are monitoring cluster utilization and will gradually bring the cluster down to about 3 to 4 nodes in the coming days.

- Sarang Anajwala

Wednesday, April 29, 2015

Impact of NULL values on where-clause/group-by-clause in Hive queries

The following checks verify that NULL values do not impact GROUP BY, but they DO impact the WHERE clause.

Query: select count(*) from table1 where (field1 is NULL) AND dth >= '2014-12-01-00' AND dth <= '2014-12-01-23';
Result: 24

Query: select count(*) from table1 where (field1 != 'Value1') AND dth >= '2014-12-01-00' AND dth <= '2014-12-01-23';
Result: 1853517 (The expected count is 1853541; the 24 NULL rows are missed here because 'NULL != value' evaluates to NULL, not true, so those rows are silently dropped.)

Query: select count(*) from table1 where (field1 = 'Value1') AND dth >= '2014-12-01-00' AND dth <= '2014-12-01-23';
Result: 142570

Query: select field1, count(*) from table1 where dth >= '2014-12-01-00' AND dth <= '2014-12-01-23' GROUP BY field1;
(The GROUP BY reports NULL as its own group, confirming that GROUP BY is not impacted.)
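If the NULL rows need to be counted together with the non-matching rows, the WHERE clause must handle NULL explicitly, since a comparison against NULL is neither true nor false:

```sql
select count(*) from table1
where (field1 != 'Value1' OR field1 is NULL)
  AND dth >= '2014-12-01-00' AND dth <= '2014-12-01-23';
-- Expected: 1853541 (1853517 non-matching rows + the 24 NULL rows)
```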


Wednesday, March 25, 2015

Hive - S3NativeFileSystem - Insert Overwrite Bug

We store all our data in S3. We create external tables pointing to the data in S3 and run hive queries on these tables.

In one of the use cases, we needed to update a table incrementally with incoming data.
The query was something like this –

INSERT OVERWRITE TABLE main_table1
SELECT distinct my_field
FROM (
    SELECT my_field
        FROM new_table1
    UNION ALL
    SELECT my_field
      FROM main_table1
) s;

Basically, what we are doing here is unioning the new data with the existing data and eliminating duplicates from the union.
The expected behavior is that 'main_table1' should get updated on each run with the new data from 'new_table1'.
What we observed instead is that this query was overwriting 'main_table1' with only the data of 'new_table1'. That means on each run we lose all the old data and only the new data remains in the table.

The behavior is due to a bug in EMR's S3 library: the S3NativeFileSystem class deletes the S3 file pointed to by 'main_table1' while preparing the query plan itself!

Even a simple 'Explain' statement for this insert-overwrite query deletes the data file in S3! This can result in SERIOUS DATA LOSS!

Workaround: Use a staging area (a tmp table) to store your results, and then copy the result from the tmp table to the main table.

CREATE TABLE tmp_table….

INSERT OVERWRITE TABLE tmp_table
SELECT distinct my_field
FROM (
    SELECT my_field
        FROM new_table1
    UNION ALL
    SELECT my_field
      FROM main_table1
) s;

INSERT OVERWRITE TABLE main_table1
SELECT my_field from tmp_table;

There is one problem with this solution, though. If you stop the last INSERT OVERWRITE statement (INSERT OVERWRITE main_table1 SELECT my_field from tmp_table;) before it completes successfully, you lose all your data! (Remember? The S3 file is deleted while preparing the query plan itself!)

Stable Solution:
Use ‘INSERT INTO’ instead of ‘INSERT OVERWRITE’.

INSERT INTO TABLE main_table1
SELECT t1.my_field FROM new_table1 t1
WHERE t1.my_field NOT IN (SELECT t2.my_field FROM main_table1 t2);

This will ensure that you incrementally update the ‘main_table1’ without using INSERT OVERWRITE!
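One caveat worth noting: NOT IN follows SQL NULL semantics, so if 'my_field' can be NULL in 'main_table1', the NOT IN predicate matches nothing at all. A sketch of a guarded version:

```sql
INSERT INTO TABLE main_table1
SELECT t1.my_field FROM new_table1 t1
WHERE t1.my_field NOT IN
  (SELECT t2.my_field FROM main_table1 t2 WHERE t2.my_field IS NOT NULL);
```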

- Sarang

Thursday, March 12, 2015

Tuning Yarn container for Oozie

Oozie is a popular workflow management tool for BigData applications.
To give a high-level idea, the following is the container allocation for a typical Oozie workflow application with a Hive action.

If you are running a heavy job through Oozie, there are chances that the YARN container which runs the Oozie job (the 'Oozie workflow' container in the above image) may run out of memory.
The memory allocation for the YARN container for Oozie can be increased with the properties '' and ''. The default value is typically 1536.
The properties can be updated in the Oozie application workflow definition (workflow.xml) to allocate additional memory to the container.
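As a sketch, assuming the standard Oozie launcher override property (the property name here is my assumption) with an illustrative value, the workflow.xml entry would look something like:

```xml
<!-- inside the action's <configuration> in workflow.xml; property name is
     the standard Oozie launcher override (assumed), value illustrative -->
<property>
  <name>oozie.launcher.mapreduce.map.memory.mb</name>
  <value>3072</value>
</property>
```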


- Sarang

Thursday, February 26, 2015

Hive - dynamic partitions performance issue with S3/EMR

We use Hive in Amazon EMR to parse our logs. Any query that inserts data into a table with a high number of partitions (8000+ in our case) takes a huge amount of time, because the insert operation loads the partitions every time.
This partition loading makes one hit per partition to S3 and one hit to the metastore!
In our case, for one of the jobs, I deciphered from the logs that about 95 minutes out of 102 were spent loading partitions... which means almost 93% of the time was wasted in loading partitions!

Following are the logs which suggest loading of partitions -
5538245 [main] INFO  hive.ql.metadata.Hive  - New loading path = s3://<path>/dth=2011-09-04-01 with partSpec {dth=2011-09-04-01}
5538697 [main] INFO  hive.ql.metadata.Hive  - New loading path = s3://<path>/dth=2015-01-01-15 with partSpec {dth=2015-01-01-15}
5539151 [main] INFO  hive.ql.metadata.Hive  - New loading path = s3://<path>/dth=2014-11-16-04 with partSpec {dth=2014-11-16-04}
5539661 [main] INFO  hive.ql.metadata.Hive  - New loading path = s3://<path>/dth=2014-08-16-19 with partSpec {dth=2014-08-16-19}
5540109 [main] INFO  hive.ql.metadata.Hive  - New loading path = s3://<path>/dth=2014-12-15-06 with partSpec {dth=2014-12-15-06}
6152836 [main] INFO  org.apache.hadoop.hive.ql.exec.Task  - Loading partition {dth=2014-12-04-18}
6152888 [main] INFO  org.apache.hadoop.hive.ql.exec.Task  - Loading partition {dth=2015-01-13-19}
6152941 [main] INFO  org.apache.hadoop.hive.ql.exec.Task  - Loading partition {dth=2014-12-27-11}
6152994 [main] INFO  org.apache.hadoop.hive.ql.exec.Task  - Loading partition {dth=2014-08-25-14}
6153046 [main] INFO  org.apache.hadoop.hive.ql.exec.Task  - Loading partition {dth=2014-08-28-18}

Solution: for all insert (/overwrite) queries, the insert should be performed into a new (empty) table pointing to a temporary staging location. Once the insert statement completes, the data files should be copied to the final destination with a plain copy (hdfs cp / distcp).
This ensures the insert query completes quickly, since the destination table is empty and no partitions have to be loaded.
The file copy operation also completes quickly, as it happens outside Hive and involves no partition loading.

Since we copy the data files for new partitions directly on the file system and not through Hive, we then have to recover these new partitions so that they are added to the Hive metastore; otherwise the new partitions will not be visible to the next job in the workflow that uses this table as input.
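A sketch of the recovery step (the table name is illustrative); EMR's Hive ships a RECOVER PARTITIONS extension, while open-source Hive has MSCK REPAIR:

```sql
-- make partitions copied directly to S3/HDFS visible in the metastore
ALTER TABLE main_table RECOVER PARTITIONS;   -- EMR Hive
-- MSCK REPAIR TABLE main_table;             -- open-source Hive equivalent
```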

In our case, we observed the performance of one particular job improve by almost 90%: its run-time reduced from 4+ hours to 25-30 minutes.

- Sarang

Sunday, November 16, 2014

Hive Performance Improvement - statistics and file format checks

Following are a couple of important configuration changes that can improve the performance of Hive queries (especially insert queries) a great deal.


By default 'hive.stats.autogather' is 'true'. This setting governs collection of statistics for newly created tables/partitions.
Following are the stats collected:
  • Number of rows
  • Number of files
  • Size in Bytes
For newly created tables and/or partitions (that are populated through the INSERT OVERWRITE command), statistics are automatically computed by default.
In case of tables with high number of partitions, this can have big impact on performance.
It is advisable to disable this setting with set hive.stats.autogather=false;
The stats can be collected when required using following query:
ANALYZE TABLE tablename [PARTITION(partcol1[=val1], partcol2[=val2], ...)]
  COMPUTE STATISTICS
  [FOR COLUMNS]          -- (Note: Hive 0.10.0 and later.)


The property 'hive.fileformat.check' governs whether to check the file format when loading data files.
In our case, where the format of the data files is governed by our own processes, checking the file format on every load does not add any value.
Hence, it is recommended to disable the file format check to gain some performance.
set hive.fileformat.check=false;


A performance improvement of more than 50% was observed with these configuration changes. The time taken by our metering Hive queries reduced from over 100 minutes to under 50 minutes.


Thursday, October 2, 2014

Hive query to get the 95th-percentile ranked item

I was working on converting a complex MySQL query that computed the 95th-percentile value from a group_concat result.

Following is the Hive query to do the same on a simple sample table.


Sample Table:

hive> describe coll;

col_name     data_type
proc_id      int
status       string

Sample Data:

hive> select * from coll;

proc_id   status
53        stopped
56        stopped
1         started
2         started
52        stopped
4         started
59        stopped
29        stopped
13        stopped
55        stopped
54        stopped
63        stopped
8         stopped
9         stopped
51        stopped
61        stopped
69        stopped
6         stopped
23        stopped
57        stopped
3         started
11        stopped
7         stopped
66        stopped
12        stopped
67        stopped


How does percent_rank() work?

Query: select status, proc_id, percent_rank() over (PARTITION BY status ORDER BY proc_id DESC) as proc_rank_desc from coll;


status     proc_id   proc_rank_desc
started    4         0.0
started    3         0.3333333333333333
started    2         0.6666666666666666
started    1         1.0
stopped    69        0.0
stopped    67        0.047619047619047616
stopped    66        0.09523809523809523
stopped    63        0.14285714285714285
stopped    61        0.19047619047619047
stopped    59        0.23809523809523808
stopped    57        0.2857142857142857
stopped    56        0.3333333333333333
stopped    55        0.38095238095238093
stopped    54        0.42857142857142855
stopped    53        0.47619047619047616
stopped    52        0.5238095238095238
stopped    51        0.5714285714285714
stopped    29        0.6190476190476191
stopped    23        0.6666666666666666
stopped    13        0.7142857142857143
stopped    12        0.7619047619047619
stopped    11        0.8095238095238095
stopped    9         0.8571428571428571
stopped    8         0.9047619047619048
stopped    7         0.9523809523809523
stopped    6         1.0
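The arithmetic behind these numbers: percent_rank() returns (rank - 1) / (rows in partition - 1). The 'started' partition above is easy to reproduce outside Hive:

```python
# percent_rank() = (rank - 1) / (partition_rows - 1); there are no ties in
# this data, so rank is just the row position. 'started' partition, proc_id DESC:
ids = [4, 3, 2, 1]
ranks = [i / (len(ids) - 1) for i in range(len(ids))]
print(ranks)  # [0.0, 0.3333333333333333, 0.6666666666666666, 1.0]
```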


Final Query:

select q.status, min(q.proc_id) as proc_id
from (
    select * from (
        select status, proc_id, percent_rank() over (PARTITION BY status ORDER BY proc_id DESC) as proc_rank_desc
        from coll
    ) ranked_table
    where ranked_table.proc_rank_desc >= 0.95
) q
group by q.status;

Final Result:

status    proc_id
started   1
stopped   6



Friday, September 26, 2014

Collections in CQL3 - How they are stored

If you don’t already know about collections in Cassandra CQL, the following page provides excellent details –

However, if you have been using Cassandra since pre-CQL days, you would have worked with the low-level Thrift APIs, and hence you may be tempted to wonder what the data looks like in Cassandra’s internal storage structure [which is very well articulated (exposed?) by the Thrift APIs]!
I have a big hangover from my extensive work with the Thrift API, so I always get tempted to look at how my CQL data appears in the internal storage structure.

Following is a CQL column family containing the different types of collections (set/list/map), followed by the corresponding look of the data in the internal storage structure:


cqlsh:dummy> describe table users;

CREATE TABLE users (
  user_id text,
  emails set<text>,
  first_name text,
  last_name text,
  numbermap map<int, int>,
  numbers list<int>,
  todo map<timestamp, text>,
  top_places list<text>,
  PRIMARY KEY (user_id)
);

cqlsh:dummy> select * from users;

 user_id | emails                                 | first_name | last_name | numbers   | todo                                   | top_places             | numbermap
---------+----------------------------------------+------------+-----------+-----------+----------------------------------------+------------------------+----------------
 frodo   | {'baggins@gmail.com', 'f@baggins.com'} | Frodo      | Baggins   | [1, 2, 3] | {'2012-09-24 00:00:00-0700': 'value1'} | ['rivendell', 'rohan'] | {1: 11, 2: 12}
Internal Storage Structure: (CLI result)
RowKey: frodo
=> (name=, value=, timestamp=1411701396643000)
=> (name=emails:62616767696e7340676d61696c2e636f6d, value=, timestamp=1411701396643000)
=> (name=emails:664062616767696e732e636f6d, value=, timestamp=1411701396643000)
=> (name=first_name, value=46726f646f, timestamp=1411701396643000)
=> (name=last_name, value=42616767696e73, timestamp=1411701396643000)
=> (name=numbermap:00000001, value=0000000b, timestamp=1411703133238000)
=> (name=numbermap:00000002, value=0000000c, timestamp=1411703133238000)
=> (name=numbers:534eaca0452c11e4932561c636c97db3, value=00000001, timestamp=1411701740650000)
=> (name=numbers:534eaca1452c11e4932561c636c97db3, value=00000002, timestamp=1411701740650000)
=> (name=numbers:534eaca2452c11e4932561c636c97db3, value=00000003, timestamp=1411701740650000)
=> (name=todo:00000139f7134980, value=76616c756531, timestamp=1411702558812000)
=> (name=top_places:a3300bb0452c11e4932561c636c97db3, value=726976656e64656c6c, timestamp=1411701874667000)
=> (name=top_places:a3300bb1452c11e4932561c636c97db3, value=726f68616e, timestamp=1411701874667000)
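The hex strings in the CLI listing are just the byte encodings of the CQL values, which is easy to verify:

```python
# Column names/values in the CLI listing are plain byte encodings, not hashes:
assert bytes.fromhex("46726f646f").decode("ascii") == "Frodo"              # first_name
assert int.from_bytes(bytes.fromhex("0000000b"), "big") == 11              # numbermap value
assert bytes.fromhex("726976656e64656c6c").decode("ascii") == "rivendell"  # top_places element
```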

Some important points to note:
- The 'set' field (emails) does not have any column values in the CLI result. As a set is expected to store unique items, the element values are stored as part of the column names themselves!
- On the contrary, as the 'list' fields (numbers/top_places) are allowed to hold duplicate values, the actual value of each list element is stored in the column value, with a time-based UUID in the column name, so duplicate elements do not overwrite each other!
- For the 'map' fields (numbermap/todo), the serialized key goes into the column name and the serialized value into the column value.