Real-time Analytics: Implementing a Lambda Architecture on Hadoop


Implement a lambda architecture in fewer steps, using Spark, HBase, Solr, the HBase Lily indexer, and Hive


Welcome to this three-part tutorial on making your data available for consumption on a (near) real-time basis. The data domain has advanced to the point where decision making must be rapid, so we are gradually moving away from batch-based data loads (ETL) and toward real-time analytics. With data at the center of your strategy and decision making, getting data available sooner is pivotal for every organization. This three-part tech blog explains how to implement a lambda architecture (an architecture that supports batch and real-time analytics alike).

The overall architecture for such projects caters to three needs:

1. Quick data access for websites - random-access data patterns, e.g. looking up a particular profile ID, customer ID, or comment key
2. Fast searches on free text - fuzzy search and search suggestions, e.g. customer name, product name, etc.
3. Analytical query support for BI tools like Cognos, Tableau, etc.





For the sake of simplicity, this tech blog is divided into three parts:

  • Loading data into HBase using Apache Spark with hbase-rdd and DataFrames. [this blog]
  • Indexing data into Solr automatically from HBase using the HBase Lily indexer. [Coming soon]
  • Creating Hive tables on HBase using the Hive-HBase storage handler. [Coming soon]


This blog assumes you have an IDE set up for Spark development; if you don't, go through this link to install and set up Spark and the Eclipse IDE.

The code below handles four tasks:


  1. Get data into a Spark DataFrame from multiple source systems. In this blog we read data from a text file
  2. Create a simple HBase table with two column families
  3. Write data from the DataFrame to the HBase table using normal load and bulk load
  4. Read data from HBase and convert it into an RDD and a DataFrame


Reading data into a Spark DataFrame

Method 1: Read data into an RDD and convert it to a DataFrame

package com.hari.sparkhbase.hbase
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.sql.SQLContext

object readwritetoHbase {
  case class Schema(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {

    //    Setting spark context
    val conf = new SparkConf().setAppName("ExploreSparknHbase").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //    Read the file into Spark
    val data = sc.textFile("/tmp/test.dat")
    //    Skip the header -> split lines on the delimiter -> assign the Schema case class defined above
    val dataRDD = data.filter { line => !line.contains("Name,Age,Gender") }
      .map { line => line.split(",") }
      .map { x => Schema(x(0), x(1).toInt, x(2)) }
    val dataDF = dataRDD.toDF()
    dataDF.show()
    //Result so far   
    //+--------+---+------+
    //|    name|age|gender|
    //+--------+---+------+
    //|John Doe| 21|  Male|
    //|Jane Doe| 21|Female|
    //+--------+---+------+
  }

}
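The header-skip and parsing steps above are plain collection transformations, so they can be sanity-checked without a Spark cluster. Below is a minimal sketch (object and sample rows are made up for illustration) that applies the same logic to an in-memory list:

```scala
// Minimal sketch: the same header-skip / split / schema-assignment logic
// on a plain Scala collection - no SparkContext required
object ParseSketch {
  case class Schema(name: String, age: Int, gender: String)

  def parse(lines: Seq[String]): Seq[Schema] =
    lines
      .filter(line => !line.contains("Name,Age,Gender")) // skip the header row
      .map(_.split(","))                                 // split on the delimiter
      .map(x => Schema(x(0), x(1).toInt, x(2)))          // assign the schema

  def main(args: Array[String]): Unit = {
    val sample = Seq("Name,Age,Gender", "John Doe,21,Male", "Jane Doe,21,Female")
    parse(sample).foreach(println)
  }
}
```

Once this logic works on a local sample, the identical filter/map chain can be applied to the RDD as in the code above.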

Method 2: Read data directly into a DataFrame using the spark-csv package. If you are using Spark 2.0 or greater, CSV support is built into Spark; otherwise you need to add the spark-csv package to your dependencies.

Check my blog about the usage of spark-csv and spark-xml here.

Create a simple HBase table with two column families


Open the HBase shell and run:

create 'test','cf1','cf2'

Write data from the DataFrame to the HBase table using normal load


In order to write into HBase there are two popular packages available - SparkOnHBase and unicredit's hbase-rdd; this blog makes use of the latter.
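If you build with sbt, the dependency can be added along the lines below. The coordinates and version are my assumption - check the hbase-rdd project README for the artifact matching your Spark and HBase versions:

```scala
// build.sbt - assumed coordinates for unicredit's hbase-rdd;
// the version "0.7.1" is a placeholder, verify against the project's README
libraryDependencies += "eu.unicredit" %% "hbase-rdd" % "0.7.1"
```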

Code to write into a single column family using the normal-load method:

    //    hbase-rdd expects an implicit HBaseConfig in scope (it picks up hbase-site.xml from the classpath)
    implicit val hbaseConfig = HBaseConfig()
    dataDF.map { x =>
      val Array(col1, col2, col3) = x.toSeq.toArray
      val content = Map("name" -> col1.toString, "age" -> col2.toString, "gender" -> col3.toString)
      //      Use col1 [name] as the HBase row key - not ideal in practice, but you get the gist -
      //      and pass the key-value Map as the fields of column family cf1
      col1.toString -> content
    }.toHBase("test", "cf1")

Code to write into multiple column families using the bulk-load method:


    //    Bulk-load method writing to multiple column families
    dataDF.map { x =>
      val Array(col1, col2, col3) = x.toSeq.toArray
      //      Build two Maps - one for column family 1 (cf1) and one for column family 2 (cf2)
      val myCols1 = Map("name" -> col1.toString, "age" -> col2.toString)
      val myCols2 = Map("gender" -> col3.toString)
      //      The outer Map's keys are the column family names
      val content = Map("cf1" -> myCols1, "cf2" -> myCols2)
      col1.toString -> content
    }.toHBaseBulk("test")
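The only real difference between the two write paths is the shape of the pair RDD's values: a flat Map of qualifier -> value for a single column family, versus a nested Map of family -> (qualifier -> value) for multiple families. The sketch below (plain Scala, no HBase connection; the object name is mine) builds both shapes from one record so you can see them side by side:

```scala
object PayloadShapes {
  // Build the value payloads in the shapes used by the toHBase
  // and toHBaseBulk calls above, from one (name, age, gender) record

  // Single column family: rowKey -> Map(qualifier -> value)
  def singleFamily(name: String, age: Int, gender: String): (String, Map[String, String]) =
    name -> Map("name" -> name, "age" -> age.toString, "gender" -> gender)

  // Multiple column families: rowKey -> Map(family -> Map(qualifier -> value))
  def multiFamily(name: String, age: Int, gender: String): (String, Map[String, Map[String, String]]) =
    name -> Map(
      "cf1" -> Map("name" -> name, "age" -> age.toString),
      "cf2" -> Map("gender" -> gender))
}
```

All HBase cell values travel as strings here; typed values would need an explicit serialization choice.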

Read data from HBase and load it into a Spark RDD and a Spark DataFrame

    //    Read from HBase - the entire table at once - assuming all our data is loaded in a single column family
    //    Make a map of the columns we want per column family
    val columnlist = Map("cf1" -> Set("name", "age", "gender"))
    //    Read data from HBase and wrap each record in a Spark SQL Row - this creates an RDD of Rows
    val hbaseRDD = sc.hbase[String]("test", columnlist).map({
      case (k, v) =>
        val cf = v("cf1")
        Row(k, cf("name"), cf("age"), cf("gender"))
    })
    //    Convert the RDD to a DataFrame - you can cast the values to proper types at this stage if needed
    val hbaseDF = hbaseRDD.map({
      case Row(key: String, name: String, age: String, gender: String) => (key, name, age, gender)
    }).toDF("key", "name", "age", "gender")
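The read path returns pairs of row key and a nested family -> (qualifier -> value) Map, and that unpacking (including the type casting mentioned in the comment above) can be checked without a cluster. A minimal sketch, with the object and type alias being my own names:

```scala
object ReadSideSketch {
  // The shape yielded by a read like sc.hbase[String] above:
  // (rowKey, Map(columnFamily -> Map(qualifier -> value)))
  type HBaseRecord = (String, Map[String, Map[String, String]])

  // Turn one record into a typed tuple, casting age from String to Int -
  // the kind of type casting the comment above refers to
  def toTyped(rec: HBaseRecord): (String, String, Int, String) = {
    val (key, families) = rec
    val cf = families("cf1")
    (key, cf("name"), cf("age").toInt, cf("gender"))
  }
}
```

A malformed or missing cell would throw here; in production you would wrap the lookups and the `toInt` in validation.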


Check out my GitHub account for the full code - repo
