How to build a data warehouse on Hadoop
Building a data warehouse on the Hadoop platform
Building a data warehouse on a Big Data platform is a standard use case many organizations are exploring. The motivation is usually one or more of the flexibilities a Big Data platform offers.
- Big Data for enterprise-wide analytics - Many organizations are moving to the concept of a data hub or data lake where data is gathered and stored by source system, rather than a project-based or subject-area-based data warehouse. The advantage of this approach is that marts can be remodeled and designed at any time based on the requirement.
- Cost - Hadoop has become a cheap alternative storage medium.
- Faster analytics - A Big Data platform (Hadoop being the common moniker for big data) can handle traditional batch workloads as well as near-real-time decision support (an action is taken within 2 seconds to 2 minutes of data delivery) or near-real-time event processing (an action is taken within 100 milliseconds to 2 seconds of data delivery).
- Disparate data sources which a traditional warehouse could not support
- High data volume - Volumes of data that cannot be handled by an RDBMS
- Flexibility for data exploration - The ability to discover and explore data that was previously underused or unused
This page shows an approach to converting a traditional warehouse into a warehouse on a Big Data platform. The Hadoop ecosystem is very wide, and the range of technologies available within it makes it difficult to choose which tool to use for a given use case. The other side of the coin is that this gives the flexibility to try out tools based on the use case and other considerations such as performance, operations and support.
This demo makes use of open source technologies available on the Hadoop stack, such as HDFS and Hive.
HDFS - is the core of Hadoop; it is the distributed file system that enables everything Hadoop can do. There are other distributed storage systems such as Amazon S3, but when we talk about Hadoop, HDFS is the default file system. Most Hadoop distribution vendors, such as Cloudera, IBM BigInsights and Hortonworks, use HDFS as their file system.
Hive - Hive is an Apache project which introduced the concepts of databases, tables and SQL to the big data world. Hive is still evolving. Hive uses HDFS for data storage. Beyond opening up the flexibility of SQL over a file system, Hive is valued for its metadata store (metastore) for HDFS. The metastore is simply a database that holds information about the files stored in HDFS: their structure, partitions, buckets, and the serialization and deserialization (SerDe) needed to read and write them. This enables Hive to query files based on the metadata stored in the metastore. To summarize, Hive provides data abstraction and data discovery over files.
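As a minimal illustration, the metadata Hive keeps for a table can be inspected directly; the table name movie_info refers to the example table introduced later in this post and is assumed to already exist in the metastore.

-- Inspect the metadata the metastore holds for a table: columns, HDFS
-- location, input/output formats, SerDe and partition keys.
DESCRIBE FORMATTED movie_info;

-- List the partitions registered in the metastore for the same table.
SHOW PARTITIONS movie_info;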
"How is the performance of Hive"
The performance of Hive depends on many factors. These include the best practices normally followed in the relational database world to get optimized performance, as well as Hadoop-specific considerations such as file storage formats and compression techniques.
To simplify, let us check what happens when a query is run on Hive.
- The query is submitted to Hive through a UI or client
- The Hive driver handles the session and works on an execute-and-fetch model
- Hive's compiler reads the metastore to get table details: structure, and the serialization and deserialization needed to read and write the files. The compiler then creates an execution plan. Following in the footsteps of established DBMSs, Hive uses cost-based optimization (a rule-based optimizer and a correlation optimizer are also available). It generates a plan with multiple stages based on the cost of each operation.
- Hive's execution engine executes the plan created by the compiler. It plans and manages the dependencies between the different stages. Each stage is essentially a map and/or reduce operation of Hadoop's MapReduce. The output of individual stages is written to temporary files and used by subsequent stages. The output of the final stage is written to the table's file location. The plan Hive generates can be inspected with EXPLAIN, as shown below.
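As a quick illustration (the table and column names are hypothetical), the staged plan described above can be seen by prefixing a query with EXPLAIN:

-- Show the stages Hive's compiler generates for a query, and the
-- dependencies between them, without actually running the query.
EXPLAIN
SELECT movie_id, COUNT(*) AS review_count
FROM movie_reviews
GROUP BY movie_id;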
Looking at the above execution pattern, Hive tables can be designed in many ways to optimize performance. Each approach has pros and cons, and the decision has to be made based on the priority of the requirements.
Data storage options
- File format - Hadoop supports multiple file format options. Files can be stored as flat files, Hadoop sequence files, or more complex but option-rich formats like ORC, RCFile and Parquet. Hive supports all of the formats mentioned, and the columnar formats generally ensure faster query execution. However, consideration should also be given to other tools that access this data: if Hive is the only access path, then any of the formats works, but if other tools such as ETL tools read the files directly, there is no guarantee that they support every format. (A sample table definition combining format and compression choices is sketched after this list.)
- Compression - Compression is another way to optimize queries. Multiple compression codecs such as Gzip and Snappy are available for Hive. The downside of Gzip is that the compressed files are not splittable. The codecs have different trade-offs: some compress and decompress faster and need less CPU, while others produce splittable output at the cost of more CPU. Splittability of files on Hadoop is a significant factor when it comes to performance.
- How data is stored or organized - Organizing data in Hadoop is a discipline in itself. The arrangement of data has a significant impact on querying. If a query targets only a section of the data, we need to make sure it hits only that section and not the entire data set. Data can be organized at the transaction level, for example by type of data or by the timestamp of the event.
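A minimal sketch of how these storage choices come together; the table name, columns and compression property are illustrative assumptions rather than a prescription:

-- Store the table in ORC format with Snappy compression; partitioning by
-- event date lets queries that target one date skip everything else.
CREATE TABLE clickstream_events (
  event_id   BIGINT,
  event_type STRING,
  payload    STRING
)
PARTITIONED BY (event_date STRING)
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');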
Data hub folder structure
The generic folder pattern for storing data in a data hub is based on the source system.
/Source System/Type of data/Transaction Date
The data hub can also be organized on a subject-area basis rather than a source-system basis.
/credit system / risk data / Transaction date
This decision has to be taken at the enterprise level, because synergy between programs and projects is needed to achieve this at enterprise scale.
Let us take a peek at how files and folders can be created based on the source system.
For example, say we have files with movie information (movie_id, movie_name, year_of_release, rating) from Rotten Tomatoes and IMDb. The files can be arranged this way:
/rotten_tomatoes/movie_info/2016
/imDB/movie_info/2016
Similarly, if we have movie reviews from these vendors with information like movie_id, review_id, individual_rating and comment, they can be stored as
/rotten_tomatoes/movie_reviews/2016
/imDB/movie_reviews/2016
Now let us check a few possible scenarios for how this data is consumed.
- Get movies with a rating of more than 3
- Get the movie ids with the maximum number of reviews
- Get the count of movies released after 2014
These scenarios access only one file at a time; there is no need to join between files.
Now let us look at another scenario.
- Get the movie names and other information for the movies with the maximum number of reviews
This is an example where data has to be joined between files.
Understanding the joins and the queries gives a better idea of how to store the data. To make it generic, the above folder structure can be improved to
/rotten_tomatoes/movie_info/year_of_release=2016
/rotten_tomatoes/movie_reviews/year_of_release=2016
This folder pattern helps to access all movies and reviews related to the year 2016; data for 2016 can be read without touching other years' data. The technique used in the above example is called partitioning the data by year. This kind of data arrangement helps not only Hive but also other tools like Impala and Spark that use Hive's metastore for metadata. The folder arrangement can be compared to partitions in a relational database, because Hive knows where to find the data rather than doing a full scan. A sketch of how such a folder layout is exposed to Hive as a partitioned table follows below.
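A minimal sketch of exposing the folder layout above to Hive, assuming the files are comma-delimited text and the year appears only in the folder name (year_of_release=2016) rather than inside the files:

-- External table over the existing data hub folders; dropping the table
-- removes only the metadata, not the files.
CREATE EXTERNAL TABLE movie_info (
  movie_id   BIGINT,
  movie_name STRING,
  rating     DOUBLE
)
PARTITIONED BY (year_of_release INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/rotten_tomatoes/movie_info';

-- Register the year_of_release=... folders that already exist on HDFS
-- as partitions in the metastore.
MSCK REPAIR TABLE movie_info;

-- Partition pruning: only the 2016 folder is read.
SELECT movie_name FROM movie_info WHERE year_of_release = 2016;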
A data hub on Hadoop avoids the need to build the extra archive system that a traditional data warehouse requires. In a traditional DWH, source files, once consumed by ETL, are backed up and taken offline after a retention period. Bringing that data back online takes significant effort, and in most cases it goes unused over time. The folder mechanism discussed above avoids the need for a separate archive mechanism. Since space on Hadoop is cheap compared to relational databases, this option works out well.
Can we do relational data modeling on Hadoop?
Relational data modeling is a very common requirement when Hadoop is chosen as the platform for an enterprise data warehouse. The answer to the question is that it is technically possible on the Hadoop platform, with the help of the Hive metastore. If we step back and look at the folder pattern designed above, it is quite obvious that the folders can be designed and arranged to follow the relational data modeling done on an RDBMS.
Data access on HDFS is "schema on read". Irrespective of how and where the data is stored, it can be read the way we want to read it. This is unlike a traditional RDBMS, where the schema has to be modeled up front and the data loaded into tables, which requires extensive analysis of the data and effort spent on an optimized data model. With Hadoop this effort is significantly reduced, as a model change does not require the data to be reloaded.
To explain this, let us look at a sales file with the fields customer name, customer id, customer email, customer phone number, item bought, item quantity and item price.
In a relational database this data would be normalized and split into customer information, product information and sales information, linked back together by customer id, sales id and item id. This kind of model reduces redundancy, which matters in an RDBMS because storage is costly. With the right indexes built, the joined queries yield good results.
- Customer_Info
- Customer ID
- Customer Name
- Phone
- Items_Info
- Items ID
- Item Description
- Item Name
- Item price
- Sales_Fact
- Sales ID
- Customer ID
- Item ID
- Item Quantity
- Sales Date
The advantage of Hadoop is that we can keep the same logical model and implement it physically in multiple ways without moving the data.
+----------+------------+--------------+-----------+------+-------------+---------+----------+-------+----------+-------+
|CustomerID|CustomerName|Email         |Phone      |ItemID|ItemDesc     |ItemName |ItemsPrice|SalesId|SalesDate |ItemQty|
+----------+------------+--------------+-----------+------+-------------+---------+----------+-------+----------+-------+
|       100|John        |john@gmail.com|+1-99999999|  1000|Product One  |Product_1|      9.99|S1000  |2016-01-01|      2|
|       100|John        |john@gmail.com|+1-99999999|  1001|Product Two  |Product_2|     19.99|S1000  |2016-01-01|      2|
|       100|John        |john@gmail.com|+1-99999999|  1002|Product Three|Product_3|     29.99|S1000  |2016-01-01|      2|
|       100|John        |john@gmail.com|+1-99999999|  1003|Product Four |Product_4|     39.99|S1000  |2016-01-01|      2|
|       200|Jane        |jane@gmail.com|+1-99998888|  1000|Product One  |Product_1|      9.99|S1000  |2016-01-02|      2|
|       200|Jane        |jane@gmail.com|+1-99998888|  1001|Product Two  |Product_2|     19.99|S1000  |2016-01-02|      2|
|       200|Jane        |jane@gmail.com|+1-99998888|  1002|Product Three|Product_3|     29.99|S1000  |2016-01-02|      2|
|       200|Jane        |jane@gmail.com|+1-99998888|  1003|Product Four |Product_4|     39.99|S1000  |2016-01-02|      2|
+----------+------------+--------------+-----------+------+-------------+---------+----------+-------+----------+-------+
The above view shows a denormalized format of the data, which is a convenient format to have in a presentation or semantic layer where we need to run fact-based analytical functions, e.g. find the total quantity of items sold for Product One, or the total revenue for Product Two.
In a normalized data model this would require joining at least three tables. In Hadoop, joining data is an expensive operation because of the shuffle and the additional disk IO. On the Hadoop platform, more consideration is given to performance than to storage, since storage is cheaper. The contrast between the two query shapes is sketched below.
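A minimal sketch of the contrast, using the table names from the logical model above and the flat table defined in Method 2 (Sales_Fact_Flat); the revenue expression is an assumption for illustration:

-- Denormalized: one scan of the flat table, no joins.
SELECT SUM(ItemQty * ItemsPrice) AS product_two_revenue
FROM Sales_Fact_Flat
WHERE ItemName = 'Product_2';

-- Normalized: the same answer needs at least a two-table join
-- (three tables if customer attributes are also required).
SELECT SUM(f.ItemQty * i.ItemsPrice) AS product_two_revenue
FROM Sales_Fact f
JOIN Items_Info i ON f.ItemID = i.ItemID
WHERE i.ItemName = 'Product_2';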
Method 1 - Normalized Model
Physical implementation of the normalized logical model
Let us implement the normalized form with a matching physical implementation. In this method three tables are created:
Customer_Info
Items_Info
Sales_Fact

CREATE TABLE Customer_Info (
  customerId   BIGINT,
  CustomerName STRING,
  Email        STRING,
  Phone        STRING
)
COMMENT 'A bucketed copy of customer_info'
CLUSTERED BY (customerId) INTO 256 BUCKETS;

CREATE TABLE Items_Info (
  ItemID     BIGINT,
  ItemDesc   STRING,
  ItemName   STRING,
  ItemsPrice DOUBLE
)
COMMENT 'A bucketed copy of items_info'
CLUSTERED BY (ItemID) INTO 256 BUCKETS;

CREATE TABLE Sales_Fact (
  SalesId    BIGINT,
  customerId BIGINT,
  ItemID     BIGINT,
  ItemQty    BIGINT
)
COMMENT 'A bucketed copy of sales fact'
PARTITIONED BY (SalesDate STRING)
CLUSTERED BY (customerId, ItemID) INTO 256 BUCKETS;
Data is bucketed on customerId and ItemID in these tables. Bucketing is a technique where data is distributed based on a hash of the distribution key. Picking the distribution key is crucial: we need to make sure data is distributed evenly across the data nodes. You can also see that the sales data is partitioned by sales date, which helps queries that access only certain days or a range of days, e.g. WHERE SalesDate = '2016-01-31'.
Bucketing data on the same distribution key helps while joining. In the above model, for most of the queries where a join is needed, the join predicate is going to be ItemID or customerId. Since the data is distributed on these fields, rows joined on these keys do not need to be shuffled or moved over the network, which significantly reduces the join time. In the Hadoop MapReduce paradigm, joining on the map side saves time compared to joining on the reduce side. Once the relevant optimization setting is enabled, Hive detects the buckets and applies this optimization, as sketched below.
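A minimal sketch of a join over these bucketed tables; hive.optimize.bucketmapjoin is a standard Hive setting, though its default varies by version:

-- Let Hive use a bucket map join when both sides are bucketed on the
-- join key into compatible bucket counts.
SET hive.optimize.bucketmapjoin = true;

-- Joining on the bucketed keys lets matching buckets be joined on the
-- map side, avoiding a full shuffle of the fact table.
SELECT c.CustomerName, i.ItemName, f.ItemQty
FROM Sales_Fact f
JOIN Customer_Info c ON f.customerId = c.customerId
JOIN Items_Info i ON f.ItemID = i.ItemID
WHERE f.SalesDate = '2016-01-31';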
The 256 in the above example is the bucket count, which has to be determined based on the data volume and the nodes available. A reasonable bucket size is a few HDFS blocks, small enough to fit into memory. Having too many buckets, each with a small size, is not recommended.
Advantages
- Useful when the data is deterministic and the data model fits the requirement
- Useful when the queries are static and predictable
- Easy migration from relational databases
Disadvantages
- Not well suited for dynamic queries or other ad hoc analytical purposes
- A model change requires ETL changes and a data reload
- Performance becomes an issue as the data grows and gets skewed; constant redistribution of the data is needed if the bucketing keys are not good enough
When data is normalized and stored as a star schema, we also have to consider maintaining ACID semantics for transactions. Technically, we should be able to insert, update or delete data. As you know, in-place modification of data in HDFS is not easy, but other techniques can be used to work around that:
- Append data with a last_updated_ts instead of updating records
- Keep history data and the latest data in two sets of files
- Use Hive's own update and delete support (see the companion post, How to use Hive for CRUD - Run updates and deletes on Hive)
- Create a view on top of the history data to pull only the latest record for each key rather than materializing it, using the append technique mentioned above (a sketch follows below)
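A minimal sketch of the last option, assuming an append-only history table Customer_Info_History with a last_updated_ts column (both names are illustrative):

-- Expose only the most recent version of each customer; older versions
-- stay in the underlying history files but are filtered out by the view.
CREATE VIEW Customer_Info_Latest AS
SELECT customerId, CustomerName, Email, Phone
FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (PARTITION BY customerId
                            ORDER BY last_updated_ts DESC) AS rn
  FROM Customer_Info_History h
) t
WHERE rn = 1;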
Method 2 - Denormalized data
The second method is about flattening the data so that it carries all the attributes a query might need. This can be compared to flat HBase tables with column families. In this method we have just one table for all our needs. The advantage of denormalized data is that a query can read one bigger chunk of data rather than reading and joining the same data from multiple smaller chunks.
CREATE TABLE Sales_Fact_Flat (
  customerId   BIGINT,
  CustomerName STRING,
  Email        STRING,
  Phone        STRING,
  ItemID       BIGINT,
  ItemDesc     STRING,
  ItemName     STRING,
  ItemsPrice   DOUBLE,
  SalesId      BIGINT,
  ItemQty      BIGINT
)
COMMENT 'De normalized data'
PARTITIONED BY (SalesDate STRING);
Data is partitioned so that searches and selects can access only part of the data. The logical column for such a partition is a timestamp or date. The main consideration when choosing a partition is file size. The file and block metadata for every partition lives in the NameNode (with the partition definitions in the metastore), so too many partitions of small files put a heavy load on the NameNode, and reading many small files from each partition is a poor use of reader operations and memory. If the files are a few HDFS blocks in size, partitioning works well. If the files are too small, they can be merged together to make a reasonable partition; for example, if daily files are only a few kilobytes, many of them can be put together in a monthly or weekly partition based on the data volume. Loading such a table is typically done with dynamic partition inserts, as sketched below.
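A minimal sketch of loading the flat table with dynamic partitioning; the staging table sales_staging is an assumption, and the two settings are standard Hive parameters:

-- Let Hive create partitions from the SalesDate values in the SELECT,
-- instead of naming every partition by hand.
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

-- The partition column (SalesDate) must come last in the SELECT list.
INSERT OVERWRITE TABLE Sales_Fact_Flat PARTITION (SalesDate)
SELECT customerId, CustomerName, Email, Phone,
       ItemID, ItemDesc, ItemName, ItemsPrice,
       SalesId, ItemQty, SalesDate
FROM sales_staging;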
Advantages
- Faster query execution
- Suitable for out-of-the-box analytics
Disadvantages
- Data redundancy - You can see that customer and item data is repeated across the sales information. Storage is not a primary concern on Hadoop; if you decide to avoid the redundancy, the trade-off is between space and performance.
- Not a good fit where data has to be updated constantly
When to pick denormalization over Method 1 (normalized data / star schema)?
- When the joined tables are small and have a one-to-one match when joined
- When the column width (number of columns) is small. Wide tables consume more space, and the block read size becomes large
- When the data is not updated much. If you have to go back and update random fields all the time, the denormalization technique will not work well. For example, if an age field is added as part of the transaction, we know it has to be modified every year.
Achieving the same logical model on denormalized data
Most organizations use Hadoop to complement an existing, well established EDW. The models are built agnostic of the platform, but as we have seen, Hadoop requires a different kind of physical implementation compared to an RDBMS. Even so, the same logical model can be implemented in multiple ways.
Splitting into dimensions / sub-tables
The denormalized sales data above can be split back into the dimensions and fact we discussed already:
Customer_Info
Items_Info
Sales_Fact
To achieve this, either:
- Build a non-materialized view on top of the Sales_Fact_Flat table. Customer info can be recreated using a view:

create view Customer_Info as
select distinct customerId, CustomerName, Email, Phone
from Sales_Fact_Flat;

Other views can be recreated similarly. The downside of views is that every time a view is used in a query, a MapReduce job runs behind the scenes to materialize the result.
- Instead of views, materialize the data into tables using CTAS (create table as select):
create table Customer_Info as
select distinct customerId, CustomerName, Email, Phone
from Sales_Fact_Flat;
The advantage of these methods is that the views/tables can be used as independent entities for other analysis, as in the example below.
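For example, a usage sketch against the recreated dimension:

-- The recreated dimension behaves like any other table or view and can
-- be joined back to the flat fact for further analysis.
SELECT c.CustomerName, SUM(f.ItemQty) AS total_items
FROM Customer_Info c
JOIN Sales_Fact_Flat f ON c.customerId = f.customerId
GROUP BY c.CustomerName;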
To summarize, this blog discusses different viable options for implementing a data warehouse on the Hadoop platform. We also covered various file and folder layout options based on access patterns, optimization and storage techniques.
Hope this article helps; please use the comment section for further discussion.

First of all thank you for this very good article. While researching DWH on Hadoop for my Thesis, I found it and it helped me in understanding how one could do "sort-of" relational modeling on Hadoop.
Now, seeing that this article was published in August 2016: when you are working on DWH on Hadoop, is this basically what you still do? There is lots of cool new stuff like Apache Impala or Apache Kudu which I think have their place in the "DWH on Hadoop" world, but the general strategy when you use HDFS as your basis remains the same as you described, I think.
Again, thank you for your article.
FelixR
Hi Felix,
Thanks for the comments! I work on similar projects now also. We have Impala and Kudu in the mix. Kudu is an interesting project, but the catch is that the files are not on HDFS; Kudu works on the local file system, where "immutability" is not an issue. For record volumes under 10 million we use Kudu for ACID to an extent, but beyond that we prefer replacing data in Hive or updating HBase for CDC.
Interesting, so you found HBase to have better performance on large datasets than Kudu when doing CDC?
I thought Kudu+Impala are very nice from a performance aspect.
Kudu trade-offs:
- Random updates will be slower. The HBase model allows random updates without incurring a disk seek; Kudu requires a key lookup before an update and a Bloom lookup before an insert.
- Single-row reads may be slower. The columnar design is optimized for scans; in the future, "column groups" may be introduced for applications where single-row access is more important.
LSM vs Kudu:
- LSM - Log Structured Merge (Cassandra, HBase, etc.): inserts and updates all go to an in-memory map (MemStore) and are later flushed to on-disk files (HFile/SSTable); reads perform an on-the-fly merge of all on-disk HFiles.
- Kudu shares some traits (memstores, compactions) but is more complex, trading slower writes for faster reads (especially scans).
Thanks for this article Hari. This stands out as a good blog, as there is very little information on DWH modeling in big data.
I have a question on the denormalization: can we also go for horizontal denormalization? E.g. in banking, if we have different account types like commercial account, loan account, credit card account, deposit account etc., and all these accounts have only a few common attributes (account number, account balance, open date, account status) while the remaining 70% of the columns are mutually exclusive to the respective account type - is denormalization advisable here? If you do so you will have a lot of sparse data.