Real time Analytics-Implementing a lambda architecture on Hadoop
Implement lambda architecture with fewer steps - using Spark, Hbase, Solr, Hbase-lily indexer and Hive
Welcome to three part tutorial of getting your data available for consumption on real time (near) basis. Data domain has so advanced where decision making has to rapid hence we are gradually moving away from batch based data load (ETL) and tending towards real time analytics. With data being center of your strategies and decision making, getting data available sooner is pivotal for all organization. This three part tech blog explains about implementing lambda architecture (architecture supporting batch and real time analytics alike).
Overall architecture for such projects is to cater three needs
1. Quick data access for web sites - Random access of data pattern e.g. a particular profile id or customer id or a comment key
2. Fast searches on random texts, fuzzy search, search suggestion e.g customer name, product name etc.
3. Analytical query support for BI tools like Cognos,Tableau etc.
For sake of simplicity this tech blog is divided into three parts
- Loading data to Hbase using Apache Spark using HbaseRDD and dataframes. [this blog]
- Indexing data to Solr automatically from Hbase using Hbase lily indexer [Coming soon]
- Creating Hive tables on Hbase using Hive-Hbase storage handler [Coming soon]
Blog assumes you have an IDE for Spark development, if you don't have go through link to install and set up Spark and Eclipse IDE
Below code will handle four tasks
- Get data to spark dataframe - from multiple source systems. In this blog we take data from a text file
- Create a simple Hbase table with two column families
- Write data from dataframe to Hbase table using normal load and bulk load
- Read data from Hbase and converting to a RDD and a dataframe
Reading data to spark data frame
Method 1: Read data to a RDD and convert to data framepackage com.hari.sparkhbase.hbase
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.sql.SQLContext
object readwritetoHbase {
case class Schema(name: String, age: Int, gender: String)
def main(args: Array[String]): Unit = {
// Setting spark context
val conf = new SparkConf().setAppName("ExploreSparknHbase").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Read file to spark
val data = sc.textFile("/tmp/test.dat")
// Define a schema with case class
// Skip header -> Split lines on delimiter -> Assign a schema to data
val dataRDD = data.filter { line => !line.contains("Name,Age,Gender") }
.map { line => line.split(",") }
.map { x => Schema(x(0), x(1).toInt, x(2)) }
val dataDF = dataRDD.toDF()
dataDF.show()
//Result so far
//+--------+---+------+
//| name|age|gender|
//+--------+---+------+
//|John Doe| 21| Male|
//|Jane Doe| 21|Female|
//+--------+---+------+
}
}
Method 2: Read data directly to dataframe using spark-csv package- if you are using spark2.0 or greater spark-csv is part of spark package else you need to add package to dependencies
Check my blog about usage of spark-csv and spark xml from here.
Create a simple Hbase table with two column families
Open hbase shell and run command
create 'test','cf1','cf2'
Write data from dataframe to Hbase table using normal load
Inorder to write into Hbase there are two popular packages available - sparkOnHbase or unicredit's HbaseRDD, this blog make use of latter.
Code to write into a single column family using normal load method
dataDF.map { x =>
val Array(col1, col2, col3) = x.toSeq.toArray
val content = Map("name" -> col1.toString(), "age" -> col2.toString(), "gender" -> col3.toString())
// Make col1[Name] as Hbase row Key, in this case name is row key[not ideal but you get the gist ]
// And all other key value pair Map is passed as the fields into column family 1
col1.toString() -> content
}.toHBase(s"test", "cf1")
Code to write into a multiple column family using bulk load method
// Bulk method to multiple column families
dataDF.map { x =>
val Array(col1, col2, col3) = x.toSeq.toArray
// Making two set of Maps one for column family 1(cf1) and other for column family 2(cf2)
val myCols1 = Map("name" -> col1.toString(), "age" -> col2.toString())
val myCols2 = Map("gender" -> col3.toString())
val content = Map("cf1" -> myCols1, "cf2" -> myCols2)
col1.toString() -> content
}.toHBaseBulk(s"test")
Read data from Hbase and load into a spark RDD and a spark dataframe
// Read from Hbase -entire data at once - assuming all our data is loaded in single column family//Making a map of our columns
val columnlist = Map("cf1" -> Set("name", "age", "gender"))
//Read data from hbase and assign into sparkSQL Row- This will create a RDD of Rows
val hbaseRDD = sc.hbase[String](s"test", columnlist).map({
case (k, v) =>
val cf = v("cf1")
Row(k, cf("name"), cf("age"), cf("gender"))
})
//Convert RDD to DF using a case class of our columns- you can type case the data if you need at this stage
val hbaseDF = hbaseRDD.map({
case Row(val1: String, val2: String, val3: String) => (val1, val2, val3)
}).toDF("key", "name", "age", "gender")
Check out my Github account for full code - repo

Comments
Post a Comment
50% of time, all the time I am wrong, Please correct me!!