Integrating Hadoop and Elasticsearch – Part 1 – Loading into and Querying Elasticsearch from Apache Hive


As more and more organisations deploy Hadoop and Elasticsearch in tandem to satisfy batch analytics, real-time analytics and monitoring requirements, the need for tighter integration between Hadoop and Elasticsearch has never been more important. In this series of blog posts we look at how these two distributed systems can be tightly integrated, and how each of them can exploit the features of the other to meet ever more demanding analytics and monitoring needs.


Hadoop is a broad ecosystem of tools that support bulk ingestion of data, efficient data formats and distributed processing of large data sets across clusters of commodity hardware. It allows for many ways (batch, interactive, SQL) to interact with the data that is stored in HDFS.


Elasticsearch, together with Logstash for log tailing and Kibana for visualisation, is gaining a lot of momentum because it is fairly easy to set up and get started with, and its near real-time search and aggregation capabilities cover many use cases in web analytics and monitoring (log & metric analysis).

Integrating Apache Hive with Elasticsearch

We first look at the required setup and configuration to integrate Apache Hive with Elasticsearch, and then go through scenarios of querying data between Hadoop and Elasticsearch.

Setup and Configuration

The following steps guide you through the required setup and configuration, both for pilot and production purposes.


If you are getting started with testing the ES-Hadoop connector, the following setup is suitable:

  • Download the Elasticsearch-Hadoop connector and copy it to /tmp on HDFS (the download URL was missing in the original and is shown as a placeholder)
wget -P /tmp <elasticsearch-hadoop-2.2.0.zip download URL>
unzip /tmp/elasticsearch-hadoop-2.2.0.zip -d /tmp
hdfs dfs -copyFromLocal /tmp/elasticsearch-hadoop-2.2.0/dist/elasticsearch-hadoop-2.2.0.jar /tmp
  • Connect to HiveServer2 using Beeline
beeline -u "jdbc:hive2://hiveserver2:10000/default;principal=hive/hiveserver2@MYREALM"
  • Add the JAR manually, using the HDFS path it was copied to above
add JAR hdfs:///tmp/elasticsearch-hadoop-2.2.0.jar;
list jars;

Please note that in many cases HiveServer2 runs on the Hadoop NameNode; you can also find this information in /etc/hive/conf/hive-site.xml.


Once you have carried out satisfactory testing and would like to use the connector in production, deploy it as below:

  • Copy the JAR file into $HIVE_HOME/lib (e.g. /usr/lib/hive/lib) on all nodes in the Hadoop cluster and restart HiveServer2


  • Register the JAR through the hive.aux.jars.path parameter in hive-site.xml and restart HiveServer2
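As a sketch, the hive-site.xml entry could look like the below; the path is an assumption based on the copy location suggested above and should be adjusted to where the JAR actually lives on the HiveServer2 host:

```xml
<property>
  <name>hive.aux.jars.path</name>
  <value>/usr/lib/hive/lib/elasticsearch-hadoop-2.2.0.jar</value>
</property>
```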

Now that the required setup is complete, we look at the following scenarios:

  1. Querying data from an Elasticsearch index
  2. Inserting data into an Elasticsearch index
  3. Offloading / archiving an Elasticsearch index to an Apache Hive table

Querying data from an Elasticsearch index

Being able to query Elasticsearch from Hive essentially gives you a SQL interface to Elasticsearch data, without having to learn the Lucene query language.
You achieve this by creating an external table in Hive on top of the Elasticsearch index, using the elasticsearch-hadoop storage handler we configured above:

drop table if exists fmem;
CREATE EXTERNAL TABLE fmem (
    dt      TIMESTAMP,
    server  STRING,
    mem     BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1', 'es.resource' = 'fmem/log', 'es.query' = '?q=*');

The TBLPROPERTIES clause is used to specify the configuration parameters. The essential ones are discussed below:

es.nodes - list of Elasticsearch nodes; specifying one is enough, as the other nodes in the cluster are discovered automatically
es.port - only needs to be set if you are using a non-default port
es.resource - the Elasticsearch index (and type) to read from
es.query - query applied to es.resource; this allows you to externalize just a part of the index in Hive
es.net.http.auth.user / es.net.http.auth.pass - Elasticsearch username and password, if the cluster requires authentication

And finally, you can also rename fields using the es.mapping.names property (e.g. 'es.mapping.names' = 'dt:date, mem:memory', where each pair maps a Hive column to an Elasticsearch field name).
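As a sketch, the fmem table above could map its Hive columns onto differently named Elasticsearch fields as follows (the Elasticsearch field names date and memory are illustrative):

```sql
CREATE EXTERNAL TABLE fmem (
    dt      TIMESTAMP,
    server  STRING,
    mem     BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1',
              'es.resource' = 'fmem/log',
              'es.query' = '?q=*',
              'es.mapping.names' = 'dt:date, mem:memory');
```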

Once the Elasticsearch index is externalized in the form of a Hive table, you can apply the full force of the Hive query language to perform batch analytics.

-- simple query to check if we are able to see data from ES index
select * from fmem limit 10;

-- we are collecting flume agent memory utilisation from our server farm over several weeks; this query determines the average, maximum and minimum utilisation per server
select server, avg(mem), max(mem), min(mem) from fmem group by server;

Hive starts a MapReduce job to execute the query.

Inserting data into an Elasticsearch index

There are use cases where you perform batch analytics using Hive and then index the resulting metrics into Elasticsearch for visualisation purposes. You can insert into an Elasticsearch index from Hive as below:

  • Create an external Hive table on the Elasticsearch index to which you would like to insert data; the index in Elasticsearch will be auto-created on first insert
CREATE EXTERNAL TABLE mcollect_rate (
    `day`     STRING,
    metricid  BIGINT,
    `count`   BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1', 'es.resource' = 'mcrate/log');
  • Insert data into Elasticsearch from another table called event_history; here I am indexing the temporal frequency of metric collection from IoT sensors in the LHC. Hive starts a MapReduce job to execute the query and then writes the results into the Elasticsearch index
INSERT INTO TABLE mcollect_rate
SELECT to_date(ts), element_id, count(1) FROM event_history GROUP BY to_date(ts), element_id;

  • Check that the records are available with a quick count
select count(1) from mcollect_rate;
  • You can also check the index state using the Elasticsearch cat API (the URL was missing in the original; the host is shown as a placeholder)
curl 'http://<es-node>:9200/_cat/indices/mcrate?v'

Using the available configuration properties you can also populate the metadata fields of the Elasticsearch index, such as the document id via es.mapping.id.
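As a sketch, assuming each row's metricid should become the Elasticsearch document _id (adapt this to your own key field):

```sql
CREATE EXTERNAL TABLE mcollect_rate (
    `day`     STRING,
    metricid  BIGINT,
    `count`   BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1',
              'es.resource' = 'mcrate/log',
              'es.mapping.id' = 'metricid');
```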
Another option is available if you already have data in JSON format: it can be loaded directly into the Elasticsearch index. The table then needs a single STRING column holding the JSON documents.

CREATE EXTERNAL TABLE mcrate_json (json STRING)  -- table and column names are illustrative
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1', 'es.resource' = 'mcrate/log', 'es.input.json' = 'yes');

And finally, writing to a dynamic Elasticsearch index can also be quite useful; the target is determined by the value of a Hive table column at runtime:

CREATE EXTERNAL TABLE fmem_per_server (  -- table name is illustrative; the original was lost
    dt      TIMESTAMP,
    server  STRING,
    mem     BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1', 'es.resource' = 'fmem/{server}', 'es.query' = '?q=*');

In the above example you will insert each server’s data into its own index.

Offloading / archiving an Elasticsearch index to an Apache Hive table

For longer-term retention and batch analytics you can offload 'older' data to Hive and drop it from the Elasticsearch index. This can be achieved as below:

  • Create an external Hive table on the index you would like to offload (the index name contains hyphens, so the Hive table name must be quoted with backticks)
CREATE EXTERNAL TABLE `fmem-2016-03-10` (
    dt      TIMESTAMP,
    server  STRING,
    mem     BIGINT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'ela1', 'es.resource' = 'fmem-2016-03-10', 'es.query' = '?q=*');
  • Insert-select from the external table into a native Hive table (here fmem_archive, an illustrative name for a regular Hive table created beforehand to hold the offloaded data)
INSERT INTO TABLE fmem_archive SELECT * FROM `fmem-2016-03-10`;
  • Drop the Elasticsearch index
curl -XDELETE 'http://myelastic1:9200/fmem-2016-03-10/'


This blog post went into the details of the required setup and configuration for integrating Apache Hive with Elasticsearch, and explained how to query Elasticsearch from Hive, index Hive data into Elasticsearch, and offload indices from Elasticsearch to Hive for long-term retention. There are several use cases where such integration can be beneficial. In part 2 of this series we look at how Apache Spark can interact with Elasticsearch.

