.@mpron SQL is roaring back, but it’s important to understand the CAP theorem and when availability is preferable over ACID #DZBigData
— Dean Wampler (@deanwampler) September 22, 2014
What MapReduce can't do
We discuss here a large class of big data problems where MapReduce can't be used - not in a straightforward way, at least - and we propose a rather simple analytic, statistical solution.

MapReduce is a technique that splits big data sets into many smaller ones, processes each small data set separately (but simultaneously) on different servers or computers, then gathers and aggregates the results of all the sub-processes to produce the final answer. Such a distributed architecture allows you to process big data sets 1,000 times faster than traditional (non-distributed) designs, if you use 1,000 servers and split the main process into 1,000 sub-processes.

MapReduce works very well in contexts where variables or observations are processed one by one. For instance, you analyze 1 terabyte of text data, and you want to compute the frequencies of all keywords found in your data. You can divide the 1 terabyte into 1,000 data sets, each 1 gigabyte. Now you produce 1,000 keyword frequency tables (one for each subset) and aggregate them to produce a final table.

However, when you need to process variables or data sets jointly, that is 2 by 2 or 3 by 3, MapReduce offers no benefit over non-distributed architectures. One must come up with a more sophisticated solution.
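To make the split/count/aggregate pattern concrete, here is a minimal single-machine sketch in Python, using a process pool in place of a real cluster; the chunking and the sample data are hypothetical, not taken from the article.

from collections import Counter
from multiprocessing import Pool

def count_keywords(chunk_of_lines):
    # "Map" step: build a keyword frequency table for one chunk.
    counts = Counter()
    for line in chunk_of_lines:
        counts.update(line.split())
    return counts

def aggregate(partial_tables):
    # "Reduce" step: merge the per-chunk tables into the final table.
    total = Counter()
    for table in partial_tables:
        total.update(table)
    return total

if __name__ == "__main__":
    # Stand-in for the 1,000 one-gigabyte subsets described above.
    chunks = [["error timeout error", "login ok"],
              ["error disk full", "login failed"]]
    with Pool(processes=2) as pool:
        partial_tables = pool.map(count_keywords, chunks)
    print(aggregate(partial_tables).most_common(3))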
Here we are talking about self-joining 10,000 data sets with themselves, each data set being a time series of 5 or 10 observations. My point in this article is that the distributed architecture of MapReduce does not bring any smart processing of these computations. It is just brute force (better than a non-parallel approach, sure), but totally incapable of removing the massive redundancy involved in these computations (as well as the storage redundancy, since the same data set must be stored on a number of servers to allow all cross-correlations to be computed).
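To put a number on that redundancy, an all-pairs self-join of N data sets needs N(N-1)/2 comparisons; the sketch below (plain Python, hypothetical) shows the count for the 10,000 series mentioned above and the brute-force loop a naive distributed job would have to execute.

from math import comb

n_series = 10_000
print(comb(n_series, 2))   # 49,995,000 pairwise cross-correlations

# Hypothetical brute force: each short series is copied to, and re-read by,
# thousands of pairwise tasks - the redundancy MapReduce cannot remove.
# for a, b in combinations(series, 2):
#     corr[(a, b)] = correlation(series[a], series[b])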
This redundancy problem underscores the importance of a major upcoming change to Hadoop - the introduction of YARN, which will open up Hadoop's distributed data and compute framework for use with non-MapReduce paradigms and frameworks. One such example is RushAnalytics, which, incidentally, also adds fine-grained parallelism within each node of a cluster.
Spark 1.5 shows MapReduce the exit
Apache Spark, the in-memory data processing framework nominally associated with Hadoop, has hit version 1.5.
This time around, improvements include the addition of data-processing features and a speed boost, as well as changes designed to remove bottlenecks to Spark's performance stemming from its dependencies on the JVM.
With one major Hadoop vendor preparing to ditch MapReduce for Spark, the pressure's on to speed up both Spark's native performance and its development.
Making sparks fly faster
A key component of the Spark 1.5 feature set is Project Tungsten, an initiative to improve Spark's performance by circumventing the limits of the JVM.
Many of Spark's speed limits are by-products of the JVM's garbage collection and memory management systems. Project Tungsten, pieces of which landed in Spark 1.4, rewrites key parts of Spark to avoid the bottlenecks entirely and enable features like allowing direct use of CPU cache memory to further speed up processing. Databricks, Spark's main commercial sponsor, has plans to eventually leverage GPU parallelism to further pick up the pace, but they remain theoretical for now.
Ericsson: Apache Storm vs Spark Streaming
In Telecom OSS & BSS systems, there are cases where a continuous stream of massive amounts of data needs to be processed in real time with very low latency. For example, in a Network Operations Centre (NOC), detecting alarms and triggering fallback mechanisms place such strict demands on processing continuous streams of data in real time. In these cases, there are a few problems that may arise when Spark Streaming is put to use.
A serialized task might be very large due to a closure. This can occur especially when processing massive continuous streams of data with data structures like a hash map.
For example, in a NOC, an engineer could program a use case to monitor network logs as shown below:
hash_map = large_hash_map_of_network_logs()
rdd.map(lambda x: hash_map.get(x)).countByValue()
Versions of Spark Streaming later than 0.9.x detect such cases and warn the user. The ideal fix for this problem is to convert large objects to RDDs.
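A hedged sketch of that fix, assuming a SparkContext `sc` and the log RDD from the snippet above: instead of capturing the dictionary in the lambda's closure (which gets serialized into every task), turn it into a pair RDD and join.

# Convert the large lookup object into an RDD so it is partitioned across
# the cluster instead of being serialized into every task closure.
hash_map = large_hash_map_of_network_logs()
lookup_rdd = sc.parallelize(list(hash_map.items()))       # (key, value) pairs

counts = (rdd.map(lambda x: (x, None))
             .join(lookup_rdd)                             # -> (key, (None, value))
             .map(lambda kv: kv[1][1])
             .countByValue())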
Also, in cases where serialized tasks are very large, there can be further issues, because the time it takes to write data between stages grows. Spark writes shuffle output to an OS buffer cache. If the task is large, as mentioned above, it spends a lot of time writing output to the OS buffer cache. The solution is to allow several GB of buffer cache, especially when running large shuffles on large heaps.
In telecom charging and billing, there is often a need to process a selected subset of data, for instance billing a subscriber's monthly usage. In such cases, Spark Streaming can generate a large number of empty tasks due to the use of filters.
A billing system could use a date filter as given below:
rdd = (source.textFile("occ://mbb/usage/imsi-2015-usage-data")
             .map(lambda x: x.split("\t"))
             .filter(lambda parts: parts[0] == "2015-05-31")
             .filter(lambda parts: parts[1] == "19:00"))
rdd.map(lambda parts: (parts[2], parts[3])).reduceBy…
The ideal solution for this is to use repartitioning to shrink the number of RDD partitions after filtering.
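A hedged sketch of that step, continuing from the (hypothetical) billing RDD above; the target partition count of 32 is an arbitrary, cluster-dependent choice.

# Most partitions are empty after the date/time filters; shrink the partition
# count so downstream stages do not launch thousands of no-op tasks.
compacted = rdd.coalesce(32)        # or rdd.repartition(32) to force a shuffle
daily_usage = (compacted
               .map(lambda parts: (parts[2], float(parts[3])))
               .reduceByKey(lambda a, b: a + b))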
A common mistake made by programmers is to overlook setting the number of reducers for a task. In such cases, Spark Streaming inherits the number of reducers from the parent RDDs. If the parent RDDs have too many reducers, there is significant overhead in launching the task; too few reducers can limit parallelism in the cluster. Setting an appropriate number of reducers is very important, especially in transactional systems such as telecom systems.
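For illustration, a minimal sketch of setting the reducer count explicitly on the same (hypothetical) pair RDD; 64 is an arbitrary value that would be tuned per cluster.

# Pass numPartitions instead of inheriting the parallelism of the parent RDDs.
usage_by_subscriber = (rdd.map(lambda parts: (parts[2], float(parts[3])))
                          .reduceByKey(lambda a, b: a + b, numPartitions=64))
# The same parameter exists on groupByKey/aggregateByKey, and a global default
# can be set via spark.default.parallelism.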
Twitter Has Replaced Storm with Heron
Twitter has replaced Storm with Heron which provides up to 14 times more throughput and up to 10 times less latency on a word count topology, and helped them reduce the needed hardware to a third.
Twitter used Storm to analyze large amounts of data in real time for years, and open sourced it back in 2011. The project was later incubated at Apache, becoming a top-level project last fall. With a quarterly release cycle, Storm has reached version 0.9.5 and is approaching the stable and long-awaited version 1.0. But all this time, Twitter has been working on a replacement called Heron, because Storm is no longer up to the task for their real-time processing needs.
Topologies are deployed to an Aurora scheduler which in turn runs them as a job consisting of several containers (cgroups): a Topology Master, a Stream Manager, a Metrics Manager (for performance monitoring) and multiple Heron Instances (spouts and bolts). A topology’s metadata is kept in ZooKeeper. The processing flow is adjusted using a backpressure mechanism to control the amount of data flowing through the topology. Besides Aurora, Heron can use other service schedulers such as YARN or Mesos. Instances run user code written in Java, and there is one JVM per instance. Heron processes communicate with each other via protocol buffers and there can be multiple containers on a single machine. (More details on Heron’s internal architecture can be found in the paper, Twitter Heron: Stream Processing at Scale.)
Twitter has completely replaced Storm with Heron which is currently processing “several tens of terabytes of data, generating billions of output tuples” per day, “delivering 6-14X improvements in throughput, and 5-10X reductions in tuple latencies” on a standard word count test, and resulting in a 3X reduction in hardware.
When we asked if Twitter intends to open source Heron, Ramasamy said “in the short term no, long term maybe.”
NoSQL for Telco
NoSQL encompasses a wide variety of database technologies that are an "internet" solution for handling the rise in the volume of data stored, the frequency with which this data is accessed, and performance needs. We have run research experiments to find out whether those approaches hold for the specific requirements that apply to the Telco domain.

Experiment 1) Looking at replacing a subscription database such as the Home Location Register (HLR) database with HBase. It is technically feasible to use HBase as an HLR database replacement, but it is a poor fit.

Experiment 2) Stressing an HBase/Hadoop File System (HDFS) cluster by persisting a stream of data events as they would be generated from a mobile network. HBase and Hadoop will not perform well out of the box; both require a lot of tuning. For superior performance, parts of the configuration can be specific to the use case. Even though we did all of this in our setup, the data batches recurring at constant intervals had the undesired effect of triggering a compaction storm. This can be addressed by manually managing compaction and/or by improvements in the HBase software.
.@Wiley_Oompa Netflix has an incredible Cassandra story. Lots of talks on the web about it. Facebook uses HBase for messaging. #DZBigData
— Dean Wampler (@deanwampler) September 22, 2014
.@sarveshgupta89 @mapr Only use HBase if you need CRUD. Rare for log data. Straight HDFS/MapRFS rights faster(?) Use Flume in any case.
— Dean Wampler (@deanwampler) September 22, 2014
Great article on using @ApacheSpark for #ETL with @ApacheParquet http://t.co/5mwoT31ewN
— Orrin Edenfield (@OrrinEdenfield) September 15, 2014
How do I learn about big data?
I'll try to give a very crude overview of how the pieces fit together, because the details span multiple books. Please forgive me for some oversimplifications.
- MapReduce is the Google paper that started it all (Page on googleusercontent.com). It's a paradigm for writing distributed code inspired by some elements of functional programming. You don't have to do things this way, but it neatly fits a lot of problems we try to solve in a distributed way. The Google internal implementation is called MapReduce, and Hadoop is its open-source implementation. Amazon's hosted Hadoop offering is called Elastic MapReduce (EMR) and has plugins for multiple languages.
- HDFS is an implementation inspired by the Google File System (GFS) to store files across a bunch of machines when they're too big for one. Hadoop consumes data in HDFS (Hadoop Distributed File System).
- Apache Spark is an emerging platform that has more flexibility than MapReduce but more structure than a basic message passing interface. It relies on the concept of distributed data structures (what it calls RDDs) and operators. See this page for more: The Apache Software Foundation
- Because Spark is a lower-level thing that sits on top of a message passing interface, it has higher-level libraries to make it more accessible to data scientists. The machine learning library built on top of it is called MLlib, and there's a distributed graph library called GraphX.
- Pregel and its open-source twin Giraph are a way to do graph algorithms on billions of nodes and trillions of edges over a cluster of machines. Notably, the MapReduce model is not well suited to graph processing, so Hadoop/MapReduce is avoided in this model, but HDFS/GFS is still used as a data store.
- Zookeeper is a coordination and synchronization service that helps a distributed set of computers make decisions by consensus, handle failures, etc.
- Flume and Scribe are logging services, Flume is an Apache project and Scribe is an open-source Facebook project. Both aim to make it easy to collect tons of logged data, analyze it, tail it, move it around and store it to a distributed store.
- Google BigTable and its open-source twin HBase were meant to be read-write distributed databases, originally built for the Google crawler, that sit on top of GFS/HDFS and MapReduce/Hadoop. Google Research Publication: BigTable
- Hive and Pig are abstractions on top of Hadoop designed to help analysis of tabular data stored in a distributed file system (think of Excel sheets too big to store on one machine). They operate on top of a data warehouse, so the high-level idea is to dump data once and analyze it by reading and processing it, rather than updating individual cells, rows, and columns. Hive has a language similar to SQL, while Pig is inspired by Google's Sawzall - Google Research Publication: Sawzall. You generally don't update a single cell in a table when processing it with Hive or Pig.
- Hive and Pig turned out to be slow because they were built on Hadoop, which optimizes for the volume of data moved around, not latency. To get around this, engineers bypassed MapReduce and went straight to HDFS. They also threw in some memory and caching, and this resulted in Google's Dremel (Dremel: Interactive Analysis of Web-Scale Datasets), F1 (F1 - The Fault-Tolerant Distributed RDBMS Supporting Google's Ad Business), Facebook's Presto (Presto | Distributed SQL Query Engine for Big Data), Apache Spark SQL (Page on apache.org), Cloudera Impala (Cloudera Impala: Real-Time Queries in Apache Hadoop, For Real), Amazon's Redshift, etc. They all have slightly different semantics but are essentially meant to be programmer- or analyst-friendly abstractions for analyzing tabular data stored in distributed data warehouses.
- Mahout (Scalable machine learning and data mining) is a collection of machine learning libraries written in the MapReduce paradigm, specifically for Hadoop. Google has its own internal version, but they haven't published a paper on it as far as I know.
- Oozie is a workflow scheduler. The oversimplified description would be that it's something that puts together a pipeline of the tools described above. For example, you can write an Oozie script that will scrape your production HBase data into a Hive warehouse nightly, then a Mahout script will train with this data. At the same time, you might use Pig to pull the test set into another file, and when Mahout is done creating a model you can pass the testing data through the model and get results. You specify the dependency graph of these tasks through Oozie (I may be messing up terminology since I've never used Oozie but have used the Facebook equivalent).
- Lucene is a bunch of search-related and NLP tools, but its core feature is being a search index and retrieval system. It takes data from a store like HBase and indexes it for fast retrieval from a search query. Solr uses Lucene under the hood to provide a convenient REST API for indexing and searching data. ElasticSearch is similar to Solr.
Something's wrong if you need this much: “Hadoop … [has] become a kernel with 20+ complementary comps. #StrataHadoop pic.twitter.com/0vr6CViQut”
— Dean Wampler (@deanwampler) February 20, 2015
Hadoop illuminated
'Hadoop illuminated' is the open source book about Apache Hadoop™. It aims to make Hadoop knowledge accessible to a wider audience, not just to the highly technical.
new guide: "SQL to @scalding" #hadoop https://t.co/Jd2TnyLfgq
— Twitter Open Source (@TwitterOSS) December 4, 2014
Researchers from IBM benchmark Impala, Hive / Tez. Impala is faster. It's not that close. http://t.co/BSZ2OoO6Nw pic.twitter.com/cuDKLHLLtt
— Henry Robinson (@HenryR) August 4, 2014
Hadoop & Java @jug_chennai http://t.co/ctdPvoVVlK
— NightHacking (@_nighthacking) April 26, 2014
Thomas W. Dinsmore: Apache Spark Page
Apache Spark is an open source distributed computing framework for advanced analytics in Hadoop. Originally developed as a research project at UC Berkeley’s AMPLab, the project achieved incubator status in Apache in June 2013 and top-level status in February 2014. According to one analyst, Apache Spark is among the five key Big Data technologies, together with cloud, sensors, AI and quantum computing.
Organizations seeking to implement advanced analytics in Hadoop face two key challenges. The first is performance: MapReduce persists intermediate results to disk after each pass through the data; as a result, iterative algorithms implemented in MapReduce run significantly slower than they do on distributed in-memory platforms.
A second key challenge for organizations is the complexity introduced by multiple analytic point solutions in Hadoop: Mahout for machine learning; Giraph and GraphLab for graph analytics; Storm and S4 for streaming; Hive, Impala and Stinger for interactive queries. Many real-world applications require integration across projects, which is challenging; multiple projects also add to support costs.
Spark addresses these challenges. It supports distributed in-memory processing, so developers can write iterative algorithms without writing out a result set after each pass through the data. This enables true high performance advanced analytics; for techniques like logistic regression, project sponsors report runtimes in Spark 100X faster than what they are able to achieve with MapReduce.
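The gap comes from keeping the working set in memory across iterations. A minimal, hedged PySpark sketch of that pattern (the data path and the parse_point, initial_weights, compute_gradient and update helpers are hypothetical):

# Cache the training data once; every iteration then reads it from memory
# instead of re-materializing it from disk as a chain of MapReduce jobs would.
points = sc.textFile("hdfs:///data/points.txt").map(parse_point).cache()

w = initial_weights()
for _ in range(100):                               # e.g. gradient-descent steps
    gradient = (points.map(lambda p: compute_gradient(w, p))
                      .reduce(lambda a, b: a + b))
    w = update(w, gradient)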
Second, Spark offers an integrated framework for analytics, including:
- Machine learning (MLlib)
- Graph analytics (GraphX)
- Streaming analytics (Spark Streaming)
- SQL interface (Spark SQL)
Spark's core is an abstraction layer called Resilient Distributed Datasets, or RDDs. RDDs are read-only, partitioned collections of records created through deterministic operations on stable data or other RDDs. RDDs include information about data lineage together with instructions for data transformation and (optional) instructions for persistence. They are designed to be fault tolerant, so that if an operation fails, its result can be reconstructed from lineage.
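A small, hedged illustration of the lineage idea in PySpark (paths and fields are hypothetical): each RDD below records only the transformation that produced it, so a lost partition is recomputed from its parent rather than restored from a replica.

base = sc.textFile("hdfs:///logs/2015/")              # stable data
errors = base.filter(lambda line: "ERROR" in line)    # lineage: filter(base)
codes = errors.map(lambda line: line.split()[1])      # lineage: map(errors)

codes.persist()    # optional persistence instruction
codes.count()      # action; if a partition is lost, Spark replays the
                   # filter/map lineage on the affected input split only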
For data sources, Spark works with any file stored in HDFS, or any other storage system supported by Hadoop (including local file systems, Amazon S3, Hypertable and HBase). Hadoop supports text files, SequenceFiles and any other Hadoop InputFormat. Through Spark SQL, the Spark user can import relational data from Hive tables and Parquet files.
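A hedged sketch of that import path, using the Spark 1.x SQLContext/HiveContext API current at the time of writing; the file, table and column names are hypothetical.

from pyspark.sql import SQLContext, HiveContext

sqlContext = SQLContext(sc)
events = sqlContext.read.parquet("hdfs:///warehouse/events.parquet")
events.registerTempTable("events")
daily = sqlContext.sql("SELECT day, count(*) AS n FROM events GROUP BY day")

hiveContext = HiveContext(sc)          # for tables defined in the Hive metastore
subscribers = hiveContext.table("billing.subscribers")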
Currently, Spark supports programming interfaces for Scala, Java and Python; MLlib algorithms support sparse feature vectors in all three languages. For R users, Berkeley's AMPLab released a developer preview of SparkR in January 2014; the project team expects to integrate this interface into Spark in Release 1.4 in June 2015.
Apache Parquet paves the way for better Hadoop data storage
Apache Parquet, which provides columnar storage in Hadoop, is now a top-level Apache Software Foundation (ASF)-sponsored project, paving the way for its more advanced use in the Hadoop ecosystem.
Already adopted by Netflix and Twitter, Parquet began in 2013 as a co-production between engineers at Twitter and Cloudera to allow complex data to be encoded efficiently in bulk.
Databases traditionally store information in rows and are optimized for working with one record at a time. Columnar storage systems serialize and store data by column, meaning that searches across large data sets and reads of large sets of data can be highly optimized.
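A toy illustration of the difference in plain Python (no Parquet internals implied): the row layout is convenient for fetching one record, the column layout for scanning and compressing one field across many records.

# Row-oriented: one record stored at a time.
rows = [
    {"user": "a", "bytes": 120, "country": "SE"},
    {"user": "b", "bytes": 310, "country": "IN"},
]

# Column-oriented: each field stored (and compressed) as its own array.
columns = {
    "user":    ["a", "b"],
    "bytes":   [120, 310],
    "country": ["SE", "IN"],
}

total_bytes = sum(columns["bytes"])   # scans a single contiguous column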
Hadoop was built for managing large sets of data, so a columnar store is a natural complement. Most Hadoop projects can read and write data to and from Parquet; the Hive, Pig, and Drill projects already do this, as well as conventional MapReduce.
As another benefit, per-column data compression further accelerates performance in Parquet. A textual data column is compressed differently than a column holding only integer data, and being able to compress columns separately provides its own performance boost. Parquet's column compression is also "future-proofed to allow adding more encodings as they are invented and implemented."
Early adopters and project leads have used Parquet for some time and built functionality around it. Cloudera, the project's co-progenitor, uses Parquet as a native data storage format for its Impala analytics database project, and MapR has added data self-description functions to Parquet. Netflix -- never one to shy away from a forward-looking technology (such as Cassandra) -- has 7 petabytes of warehoused data in Parquet format, according to the ASF.
Parquet isn't the only way to store columnar data in Hadoop, but it's shaping up as the leader. Hive has its own columnar-data format, called ORC, although it's mainly intended as an extension to Hive rather than as a general data store for Hadoop.
Hortonworks, a Cloudera competitor (in more ways than one), claimed earlier in Parquet's lifecycle that ORC compresses data more efficiently than Parquet. And IBM ran its own performance comparisons in September 2014 and found that while ORC used the least amount of HDFS storage, Parquet had the best overall query and analysis time, which are the metrics that typically matter most for Hadoop users.
The Paper Trail: Apache Impala
Traditional data analytics platforms are extremely powerful, but rely on an integrated approach – you don't buy a couple of servers and throw a downloaded data warehouse on them; you buy a fully integrated solution that includes hardware and software. The hardware is tuned to the requirements of the software, and the data layout is carefully managed to get great performance on supported queries. This is a very serious platform archetype, and one that I believe will continue to be important to the market, but it's fundamentally not configured to process data at the scale that the storage substrate now enables, because you would have to move data into the processing platform to do so, which is a very costly proposition. So typically what you do is identify an important subset of your data that requires the full analytics power of your integrated solution, and move that and only that. But that blunts the advantage of keeping all of that data in the first place – if you can only process some of it, why are you keeping all of it?
This is where Apache Hadoop originally came in. Hadoop shines for heavy-lifting batch-processing jobs that have high tolerance for latency but need to touch most bytes in your dataset. Where it doesn't shine is interactive analysis, where a user might be iteratively refining the analytic questions they ask. It's useless to wait 30 minutes just to find that you probably should have grouped by a different column. Similarly, users of popular BI tools will want to construct reports that they can interact with. Impala is the tool for these use cases; by allowing relatively low-latency queries over the great unwashed masses of your data, you get the value of the data coupled with some of the processing power of the integrated engines. We've brought some of the execution engine magic of massively parallel distributed databases to the incredibly flexible storage engine model of HDFS and HBase.