How to use MarkLogic in Apache Spark applications
Spark is quickly becoming an attractive platform for developers because it addresses multiple data-processing use cases through different components: Spark SQL, Spark Streaming, machine learning, a graph engine and so on. While Spark provides the framework for cluster computing, it does not include its own distributed data persistence technology (i.e. a database or a file system), relying on technologies like Hadoop/HDFS for that purpose. In fact, Spark can work with any Hadoop-compatible data format.
You can use the MarkLogic Connector for Hadoop as an input source to Spark and take advantage of the Spark framework to develop your ‘Big Data’ applications on top of MarkLogic. Since the connector already provides the interface for using MarkLogic as a MapReduce input source, I decided to use it as the input source for the Spark application that we walk through below.
Spark Example Application: MarkLogicWordCount
MarkLogicWordCount is an example application designed to work with simple XML documents that contain name:value pairs — but it does much more than a simple word count. Here is an example XML document:
<?xml version="1.0" encoding="UTF-8"?>
<complaint xmlns="http://data.gov.consumercomplaints">
  <Complaint_ID>1172370</Complaint_ID>
  <Product>Credit reporting</Product>
  <Issue>Improper use of my credit report</Issue>
  <Sub-issue>Report improperly shared by CRC</Sub-issue>
  <State>CA</State>
  <ZIP_code>94303</ZIP_code>
  <Submitted_via>Web</Submitted_via>
  <Date_received>12/28/2014</Date_received>
  <Date_sent_to_company>12/28/2014</Date_sent_to_company>
  <Company>TransUnion</Company>
  <Company_response>Closed with explanation</Company_response>
  <Timely_response_>Yes</Timely_response_>
  <Consumer_disputed_>Yes</Consumer_disputed_>
</complaint>
The complaint XML documents are stored in a MarkLogic database. The MarkLogicWordCount application loads all the documents from the database into a Spark RDD (Resilient Distributed Dataset) and performs the following operations:
1. Extracts XML elements as name:value pairs, where the element content is the value.
2. Counts distinct values for each element name.
3. Counts occurrences of each distinct name:value pair across the document set.
4. Saves the results from steps 2 and 3 to the specified HDFS target location.
The application produces the output as shown below:
(Product,11)
(Product:Bank account or service,44671)
(Product:Consumer loan,12683)
(Product:Credit card,48400)
(Product:Credit reporting,54768)
(Product:Debt collection,62662)
(Product:Money transfers,2119)
(Product:Mortgage,143231)
(Product:Other financial service,191)
(Product:Payday loan,2423)
(Product:Prepaid card,626)
(Product:Student loan,11489)
(State,63)
(State:,5360)
(State:AA,10)
(State:AE,141)
(State:AK,465)
...
...
As you can see in the output above, the first line indicates that 11 distinct product names were found in the data set (the result of step 2), and the following 11 lines indicate the number of times (or the number of documents in which) each of the 11 product names was found within the data set (the result of step 3). The output contains this statistical profile for every element name and its associated name:value pairs found in the data set.
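To make the shape of this output concrete, here is a minimal plain-Java sketch that builds the same kind of profile from a handful of in-memory name/value pairs. It involves neither Spark nor MarkLogic. The class name ProfileSketch, the helper methods pair and profile, and the sample data are all illustrative assumptions and are not part of MarkLogicWordCount.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper, not part of MarkLogicWordCount: mimics the profile in plain Java.
final class ProfileSketch {

    // Convenience factory for one element name/value pair.
    static Map.Entry<String, String> pair(String name, String value) {
        return new SimpleImmutableEntry<>(name, value);
    }

    // Returns a key-sorted map that holds both kinds of lines seen in the output:
    //   "name"       -> number of distinct values for that element name
    //   "name:value" -> number of occurrences of that name:value pair
    static Map<String, Integer> profile(List<Map.Entry<String, String>> pairs) {
        Map<String, Set<String>> distinctValues = new TreeMap<>();
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, String> p : pairs) {
            distinctValues.computeIfAbsent(p.getKey(), k -> new HashSet<>()).add(p.getValue());
            result.merge(p.getKey() + ":" + p.getValue(), 1, Integer::sum);
        }
        distinctValues.forEach((name, values) -> result.put(name, values.size()));
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample pairs, standing in for elements extracted from documents.
        List<Map.Entry<String, String>> pairs = Arrays.asList(
                pair("Product", "Mortgage"),
                pair("Product", "Credit card"),
                pair("Product", "Mortgage"),
                pair("State", "CA"));
        profile(pairs).forEach((key, count) ->
                System.out.println("(" + key + "," + count + ")"));
    }
}
```

Because the sorted map orders an element name directly before its own name:value keys, the printed lines follow the same layout as the application output above.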
Hadoop Connector Setup
The MarkLogic Connector for Hadoop is supported on the Hortonworks Data Platform (HDP) and the Cloudera Distribution of Hadoop (CDH). Recent releases of HDP and CDH come bundled with Apache Spark. Refer to Getting Started with the MarkLogic Connector for Hadoop to set up the Hadoop connector, and to the setup instructions specific to MarkLogicWordCount.
Although Spark is developed using Scala, it also supports application development in Java and Python. MarkLogicWordCount is a Java application.
Loading MarkLogic documents in Spark RDD
The first logical step within the application is to load documents from the MarkLogic database into a Spark RDD. The new RDD is created using the newAPIHadoopRDD method on the SparkContext object. The MarkLogic-specific configuration properties are passed using a Hadoop Configuration object. These properties are loaded from the configuration XML file that is passed as an input argument to the MarkLogicWordCount application. They include the username, password, MarkLogic host, port, database name and so on. We use the DocumentInputFormat class, which reads documents from the MarkLogic instance as key-value pairs, with a DocumentURI as the key and a MarkLogicNode as the value. The following code demonstrates how to create an RDD based on documents within a MarkLogic database.
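// Imports that this snippet relies on.
import java.io.FileInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.marklogic.mapreduce.DocumentInputFormat;
import com.marklogic.mapreduce.DocumentURI;
import com.marklogic.mapreduce.MarkLogicNode;

// First, create the Spark context.
SparkConf conf = new SparkConf()
    .setAppName("com.marklogic.spark.examples")
    .setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);

// Create the configuration object and load the MarkLogic-specific properties from the configuration file.
// args[0] is assumed here to be the path of the configuration XML file.
Configuration hdConf = new Configuration();
String configFilePath = args[0].trim();
FileInputStream ipStream = new FileInputStream(configFilePath);
hdConf.addResource(ipStream);

// Create an RDD based on the documents in the MarkLogic database.
// Documents are loaded as (DocumentURI, MarkLogicNode) pairs.
JavaPairRDD<DocumentURI, MarkLogicNode> mlRDD = context.newAPIHadoopRDD(
    hdConf,                    // Configuration
    DocumentInputFormat.class, // InputFormat
    DocumentURI.class,         // Key class
    MarkLogicNode.class        // Value class
);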
Apply Transformation to RDD
Now that we have loaded the documents into a Spark RDD, let’s apply the transformations needed to produce the intended output. A Spark RDD supports two types of operations: transformations and actions. A transformation creates a new dataset from an existing one, while an action returns a value or saves the dataset back to the persistence layer. Spark’s API relies heavily on passing functions that are defined within the Spark application and executed on the Spark cluster. For example, map is a transformation that you can apply to a Spark RDD: it takes a user-defined function, passes each dataset element through that function and returns a new RDD that represents the results.
Since Spark is developed in Scala, which has strong support for functional programming, this paradigm comes naturally to Scala developers. Since we are developing the MarkLogicWordCount application in Java, we implement the functions using the function interfaces that are available in the Spark Java API, specifically in the package org.apache.spark.api.java.function. We also use the special Java RDDs in org.apache.spark.api.java, which provide the same methods as the Scala RDDs but take Java functions.
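The idea of handing a named function object to a map operation can be tried out without a cluster. The sketch below is an analogy only: it uses java.util.function.Function and java.util.stream from the JDK rather than Spark's own types (Spark's Function interface declares a call method instead of apply). The names FunctionPassingSketch, TEXT_LENGTH and mapAll are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy (JDK streams, not Spark): a named function passed to map.
final class FunctionPassingSketch {

    // A reusable, named function object, similar in spirit to the constants
    // that MarkLogicWordCount passes to its RDD transformations.
    static final Function<String, Integer> TEXT_LENGTH = new Function<String, Integer>() {
        @Override
        public Integer apply(String text) {
            return text.length();
        }
    };

    // Passes every element through fn and returns the results as a new list;
    // the input list is left untouched.
    static <T, R> List<R> mapAll(List<T> input, Function<T, R> fn) {
        return input.stream().map(fn).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> channels = Arrays.asList("Web", "Phone");
        System.out.println(mapAll(channels, TEXT_LENGTH)); // prints [3, 5]
    }
}
```

Keeping the function in a named constant, rather than writing it inline, makes the transformation pipeline read as a sequence of named steps.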
Within the MarkLogicWordCount example we apply the following transformation steps to the RDD in which we have loaded the documents from the MarkLogic database.
1. Transform the dataset of XML documents into a dataset that contains the XML name→value pairs found in each input XML document.
2. Group the values of the same element name to transform name→value pairs to name→valueList.
3. Count distinct values for each element name to transform name→valueList to name→distinctValueCount.
4. Take the name→value pair dataset created in step 1 again and transform it to a dataset that maps each name:value pair to its occurrence count, i.e. name:value→count.
5. Aggregate the occurrence count of each distinct name:value pair, i.e. name:value→AggregateOccurrenceCount. This is the value distribution for each element.
6. Filter out the name:value pairs that occur rarely within the dataset, i.e. whose value distribution is statistically insignificant. Note that depending on the use case (for example anomaly detection) you may instead want to filter out the most commonly occurring name:value pairs and keep the rarely occurring ones in the dataset.
7. Combine the dataset produced in step 3 (name→distinctValueCount) and the dataset produced in step 6 (name:value→AggregateOccurrenceCount) into a single dataset.
8. Sort the combined dataset so that the distinct value count for each element name and its associated value distribution statistics appear in alphabetical order.
9. Finally, save the dataset to the target HDFS location that is specified as an input argument to MarkLogicWordCount.
The following code demonstrates how these steps are accomplished within the MarkLogicWordCount example.
// Extract XML elements as name-value pairs, where the element content is the value.
JavaPairRDD<String, String> elementNameValuePairs =
    mlRDD.flatMapToPair(ELEMENT_NAME_VALUE_PAIR_EXTRACTOR);

// Group element values for the same element name.
JavaPairRDD<String, Iterable<String>> elementNameValueGroup =
    elementNameValuePairs.groupByKey();

// Count distinct values for each element name.
JavaPairRDD<String, Integer> elementNameDistinctValueCountMap =
    elementNameValueGroup.mapValues(DISTINCT_VALUE_COUNTER);

// Map the element name-value pairs to the occurrence count of each name:value pair.
JavaPairRDD<String, Integer> elementNameValueOccurrenceCountMap =
    elementNameValuePairs.mapToPair(ELEMENT_VALUE_OCCURRENCE_COUNT_MAPPER);

// Aggregate the occurrence count of each distinct name:value pair.
JavaPairRDD<String, Integer> elementNameValueOccurrenceAggregateCountMap =
    elementNameValueOccurrenceCountMap.reduceByKey(ELEMENT_VALUE_COUNT_REDUCER);

// Filter out the name:value occurrences that are statistically insignificant.
JavaPairRDD<String, Integer> relevantNameValueOccurrenceCounters =
    elementNameValueOccurrenceAggregateCountMap.filter(ELEMENT_VALUE_COUNT_FILTER);

// Combine the distinct value count for each element name and the occurrence count
// for each distinct name:value pair in a single RDD.
JavaPairRDD<String, Integer> valueDistribution =
    elementNameDistinctValueCountMap.union(relevantNameValueOccurrenceCounters);

// Sort the RDD by key so that element name and name:value keys appear in order.
JavaPairRDD<String, Integer> sortedValueDistribution =
    valueDistribution.sortByKey(true, 1);

// Save the output to the HDFS location that is passed as an input argument.
// args[1] is assumed here to be that target location.
sortedValueDistribution.saveAsTextFile(args[1]);
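The filter, combine and sort steps can be mimicked on ordinary Java maps, which may help when reasoning about what the pipeline should produce. This is a sketch under stated assumptions, not the MarkLogicWordCount implementation: the class CombineSketch, the method combine and the minOccurrences threshold are hypothetical, and the actual cut-off used by ELEMENT_VALUE_COUNT_FILTER is defined in the application's source code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java stand-in for the filter, union and sortByKey steps.
final class CombineSketch {

    // distinctCounts:   "name"       -> distinct value count
    // occurrenceCounts: "name:value" -> aggregate occurrence count
    // minOccurrences:   assumed threshold below which a pair is dropped
    static Map<String, Integer> combine(Map<String, Integer> distinctCounts,
                                        Map<String, Integer> occurrenceCounts,
                                        int minOccurrences) {
        // A TreeMap keeps its keys in sorted order, which covers the sort step.
        Map<String, Integer> combined = new TreeMap<>(distinctCounts);
        for (Map.Entry<String, Integer> e : occurrenceCounts.entrySet()) {
            // Filter step: keep only pairs that occur often enough.
            if (e.getValue() >= minOccurrences) {
                // Combine step: the two key spaces ("name" and "name:value")
                // are assumed not to collide, so nothing is overwritten.
                combined.put(e.getKey(), e.getValue());
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration.
        Map<String, Integer> distinct = new HashMap<>();
        distinct.put("Product", 3);
        Map<String, Integer> occurrences = new HashMap<>();
        occurrences.put("Product:A", 40);
        occurrences.put("Product:B", 2);
        occurrences.put("Product:C", 7);
        combine(distinct, occurrences, 5).forEach((key, count) ->
                System.out.println("(" + key + "," + count + ")"));
    }
}
```

One difference worth keeping in mind: a Spark union simply concatenates two RDDs and keeps duplicate keys, whereas a map holds one value per key.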
All RDD transformations in Spark are lazy: they are applied only when Spark encounters an action. In this case the only action performed on the RDD is saveAsTextFile. By default, a transformed RDD may be recomputed each time you perform an action on it. However, Spark allows you to keep the RDD in memory using the cache (or persist) method, which gives much faster access whenever you need to perform multiple actions on the same RDD.
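JDK streams behave in a loosely similar way, so they offer a quick way to observe lazy evaluation and recomputation on a laptop. The sketch below is an analogy and does not use Spark: intermediate stream operations play the role of transformations, a terminal operation plays the role of an action, and a materialized list stands in for a cached RDD. The names LazySketch, CALLS and pipeline are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy (JDK streams, not Spark) for lazy transformations.
final class LazySketch {

    // Counts how many times the mapping function has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Defines the "transformation"; nothing is executed at this point.
    static Stream<String> pipeline(List<String> input) {
        return input.stream().map(s -> {
            CALLS.incrementAndGet();
            return s.toUpperCase();
        });
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("ca", "ak");

        Stream<String> lazy = pipeline(states);
        System.out.println("calls after defining the pipeline: " + CALLS.get());

        // The terminal operation triggers the work, like an action in Spark.
        List<String> materialized = lazy.collect(Collectors.toList());
        System.out.println("calls after the first collect: " + CALLS.get());

        // Running the pipeline again repeats the work ...
        pipeline(states).collect(Collectors.toList());
        System.out.println("calls after a second run: " + CALLS.get());

        // ... whereas reusing the materialized result does not.
        System.out.println(materialized + ", calls: " + CALLS.get());
    }
}
```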
Note that in the code above, for many of the transformation steps, custom functions are passed as input parameters. To take a look at the implementation of these functions refer to the complete source code of MarkLogicWordCount. Feel free to download the source code, build it and try out the MarkLogicWordCount application yourself.
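As a rough illustration of what an element extractor has to do, the following sketch pulls name→value pairs out of a complaint-style XML string using only the JDK's DOM parser. It is an assumption about the general idea, not the code behind ELEMENT_NAME_VALUE_PAIR_EXTRACTOR, which operates on MarkLogicNode values inside Spark. The class ElementPairSketch and the method extractPairs are hypothetical, and only direct children of the root element are considered.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical plain-Java extractor; not the MarkLogicWordCount implementation.
final class ElementPairSketch {

    // Returns one (element name, text content) pair per child element of the root.
    static List<Map.Entry<String, String>> extractPairs(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed so getLocalName() returns the bare name
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            List<Map.Entry<String, String>> pairs = new ArrayList<>();
            NodeList children = doc.getDocumentElement().getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                // Skip whitespace text nodes and comments between elements.
                if (child.getNodeType() == Node.ELEMENT_NODE) {
                    pairs.add(new SimpleImmutableEntry<>(
                            child.getLocalName(), child.getTextContent().trim()));
                }
            }
            return pairs;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML document", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<complaint xmlns=\"http://data.gov.consumercomplaints\">"
                + "<Product>Credit reporting</Product><State>CA</State></complaint>";
        for (Map.Entry<String, String> pair : extractPairs(xml)) {
            System.out.println(pair.getKey() + ":" + pair.getValue());
        }
    }
}
```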