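Apache Spark-3
Zhang Shi (张仕), College of Mathematics and Informatics, Fujian Normal University

Contents
01 Introduction to Spark
02 Installing and Using Spark
03 Spark API Examples
04 How Spark Runs

Spark API Examples
• The Spark API
• DataFrame API examples
• RDD API examples
• MLlib examples

1 The Spark API
• Spark is built on the concept of distributed datasets, which can contain arbitrary Java or Python objects. You construct a dataset from external data and then apply parallel operations to it. The basic building block of the Spark API is the RDD API. On top of the RDD API, higher-level APIs are provided, such as the DataFrame API and the machine learning API. These higher-level APIs offer methods for specific kinds of data operations. This part walks through several of the simplest Spark applications to show what Spark can do.

2 RDD API Example
Word Count: build a dataset of (String, Int) pairs and save it. In each snippet, sc is an existing Spark context.

Java:
    import java.util.Arrays;
    import scala.Tuple2;

    JavaRDD<String> textFile = sc.textFile("hdfs://...");
    JavaPairRDD<String, Integer> counts = textFile
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator())  // split each line into words
        .mapToPair(word -> new Tuple2<>(word, 1))               // pair each word with a count of 1
        .reduceByKey((a, b) -> a + b);                          // sum the counts per word
    counts.saveAsTextFile("hdfs://...");

Scala:
    val textFile = sc.textFile("hdfs://...")
    val counts = textFile.flatMap(line => line.split(" "))
                         .map(word => (word, 1))
                         .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://...")

Python:
    text_file = sc.textFile("hdfs://...")
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile("hdfs://...")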
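
To see what the three Word Count transformations compute, here is a minimal plain-Python sketch that needs no Spark installation. It only mimics the semantics on a local list and does nothing in parallel. The helper names flat_map, reduce_by_key and word_count, and the two sample lines, are hypothetical and not part of the Spark API.

```python
from collections import defaultdict
from functools import reduce


def flat_map(func, items):
    """Apply func to every item and flatten the resulting lists into one list."""
    return [out for item in items for out in func(item)]


def reduce_by_key(func, pairs):
    """Group (key, value) pairs by key and fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


def word_count(lines):
    words = flat_map(lambda line: line.split(" "), lines)  # flatMap step
    pairs = [(word, 1) for word in words]                   # map step
    return reduce_by_key(lambda a, b: a + b, pairs)         # reduceByKey step


# Hypothetical input standing in for the lines of the HDFS text file.
sample_lines = ["to be or", "not to be"]
counts = word_count(sample_lines)
print(counts)
```

In Spark the same three steps run across the partitions of the dataset, and reduceByKey combines the per-word counts from all of them.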

3 DataFrame API Examples
• A DataFrame is a distributed dataset whose data is organized into named columns. The DataFrame API makes relational operations convenient to express, and programs written against it are optimized automatically by Spark's built-in optimizer.

• Example: text search (Python)
    from pyspark.sql import Row
    from pyspark.sql.functions import col

    textFile = sc.textFile("hdfs://...")

    # Creates a DataFrame having a single column named "line"
    df = textFile.map(lambda r: Row(r)).toDF(["line"])
    errors = df.filter(col("line").like("%ERROR%"))
    # Counts all the errors
    errors.count()
    # Counts errors mentioning MySQL
    errors.filter(col("line").like("%MySQL%")).count()
    # Fetches the MySQL errors as an array of strings
    errors.filter(col("line").like("%MySQL%")).collect()

• Example: text search (Scala)
    import org.apache.spark.sql.functions.col
    import sqlContext.implicits._  // needed for toDF

    val textFile = sc.textFile("hdfs://...")

    // Creates a DataFrame having a single column named "line"
    val df = textFile.toDF("line")
    val errors = df.filter(col("line").like("%ERROR%"))
    // Counts all the errors
    errors.count()
    // Counts errors mentioning MySQL
    errors.filter(col("line").like("%MySQL%")).count()
    // Fetches the MySQL errors as an array of strings
    errors.filter(col("line").like("%MySQL%")).collect()

• Example: text search (Java)
    // Creates a DataFrame having a single column named "line"
    JavaRDD<String> textFile = sc.textFile("hdfs://...");
    JavaRDD<Row> rowRDD = textFile.map(RowFactory::create);
    List<StructField> fields = Arrays.asList(
        DataTypes.createStructField("line", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

    DataFrame errors = df.filter(col("line").like("%ERROR%"));
    // Counts all the errors
    errors.count();
    // Counts errors mentioning MySQL
    errors.filter(col("line").like("%MySQL%")).count();
    // Fetches the MySQL errors as an array of strings
    errors.filter(col("line").like("%MySQL%")).collect();
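
The text-search examples rely on the SQL LIKE patterns "%ERROR%" and "%MySQL%". As a rough illustration of what those filters select, the following plain-Python sketch applies the same two patterns to a local list. The like helper and the sample log lines are hypothetical. It covers only the % and _ wildcards and ignores escape characters, so it is a simplification of real LIKE behavior.

```python
import re


def like(value, pattern):
    """Return True if value matches a SQL LIKE pattern (% = any run, _ = one char)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.fullmatch("".join(parts), value, flags=re.DOTALL) is not None


# Hypothetical log lines standing in for the contents of the text file.
log_lines = [
    "INFO starting job",
    "ERROR MySQL connection refused",
    "ERROR disk full",
    "WARN MySQL slow query",
]

# Same two filters as the DataFrame example, applied to a local list.
errors = [line for line in log_lines if like(line, "%ERROR%")]
mysql_errors = [line for line in errors if like(line, "%MySQL%")]

print(len(errors))        # corresponds to errors.count()
print(len(mysql_errors))  # corresponds to the MySQL count
print(mysql_errors)       # corresponds to collect()
```

Because the second filter is applied to errors and not to the full list, the "WARN MySQL slow query" line is excluded, just as in the DataFrame version.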

4 MLlib Examples
• Machine learning examples: MLlib, Spark's Machine Learning (ML) library, provides many distributed ML algorithms. They cover feature extraction, classification, regression, clustering, recommendation, and more. MLlib also provides tools such as ML Pipelines for building workflows, CrossValidator for tuning parameters, and model persistence for saving and loading models.

• Example: prediction with logistic regression
In this example, we take a dataset of labels and feature vectors and use a logistic regression algorithm to predict the labels from the feature vectors. In each snippet, data is assumed to be defined elsewhere.

Python:
    from pyspark.ml.classification import LogisticRegression

    # Every record of this DataFrame contains the label and
    # features represented by a vector.
    df = sqlContext.createDataFrame(data, ["label", "features"])

    # Set parameters for the algorithm.
    # Here, we limit the number of iterations to 10.
    lr = LogisticRegression(maxIter=10)

    # Fit the model to the data.
    model = lr.fit(df)

    # Given a dataset, predict each point's label, and show the results.
    model.transform(df).show()

Scala:
    import org.apache.spark.ml.classification.LogisticRegression

    // Every record of this DataFrame contains the label and
    // features represented by a vector.
    val df = sqlContext.createDataFrame(data).toDF("label", "features")

    // Set parameters for the algorithm.
    // Here, we limit the number of iterations to 10.
    val lr = new LogisticRegression().setMaxIter(10)

    // Fit the model to the data.
    val model = lr.fit(df)

    // Inspect the model: get the feature weights.
    val weights = model.weights

    // Given a dataset, predict each point's label, and show the results.
    model.transform(df).show()

Java:
    import org.apache.spark.ml.classification.LogisticRegression;
    import org.apache.spark.ml.classification.LogisticRegressionModel;

    // Every record of this DataFrame contains the label and
    // features represented by a vector.
    StructType schema = new StructType(new StructField[]{
        new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
        new StructField("features", new VectorUDT(), false, Metadata.empty()),
    });
    DataFrame df = jsql.createDataFrame(data, schema);

    // Set parameters for the algorithm.
    // Here, we limit the number of iterations to 10.
    LogisticRegression lr = new LogisticRegression().setMaxIter(10);

    // Fit the model to the data.
    LogisticRegressionModel model = lr.fit(df);

    // Inspect the model: get the feature weights.
    Vector weights = model.weights();

    // Given a dataset, predict each point's label, and show the results.
    model.transform(df).show();
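
To make concrete what "fit the model, then predict each point's label" involves, here is a plain-Python sketch with no Spark dependency. It uses simple batch gradient descent, which is an assumption chosen for brevity and not necessarily the optimizer MLlib uses. The toy data, the learning_rate value, and the helper names fit_logistic and predict are all hypothetical.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(rows, max_iter=10, learning_rate=0.5):
    """Fit weights and an intercept by batch gradient descent on the log loss."""
    n_features = len(rows[0][1])
    weights = [0.0] * n_features
    intercept = 0.0
    for _ in range(max_iter):
        grad_w = [0.0] * n_features
        grad_b = 0.0
        for label, features in rows:
            z = intercept + sum(w * x for w, x in zip(weights, features))
            error = sigmoid(z) - label  # predicted probability minus true label
            grad_b += error
            for j, x in enumerate(features):
                grad_w[j] += error * x
        intercept -= learning_rate * grad_b / len(rows)
        weights = [w - learning_rate * g / len(rows) for w, g in zip(weights, grad_w)]
    return weights, intercept


def predict(weights, intercept, features):
    """Return 1.0 if the predicted probability is at least 0.5, else 0.0."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 if sigmoid(z) >= 0.5 else 0.0


# Hypothetical toy dataset of (label, features) records.
data = [
    (0.0, [-2.0, -1.0]),
    (0.0, [-1.0, -2.0]),
    (1.0, [2.0, 1.0]),
    (1.0, [1.0, 2.0]),
]

weights, intercept = fit_logistic(data, max_iter=10)  # mirrors maxIter=10
predictions = [predict(weights, intercept, features) for _, features in data]
print(weights, intercept, predictions)
```

The max_iter argument plays the same role as maxIter=10 in the MLlib snippets: it caps the number of optimization passes over the data.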

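Installing and Using Spark
• Installing Spark
• Scala development environment and steps

1 Installing Spark
Recommended: install Spark on a Linux system. The steps to install Apache Spark are as follows.

Step 1: Verify that Java is installed correctly
    $ java -version
If the Java version information is printed, Java is installed correctly. Otherwise, install Java and set the relevant paths.

Step 2: Verify that Scala is installed correctly
    $ scala -version
If the Scala version information is printed, Scala is installed correctly and you can skip Step 3 and Step 4. Otherwise, install Scala.

Step 3: Download Scala
Download the latest version of Scala. This walkthrough uses scala-2.11.6.

Step 4: Install Scala
Extract the tar file:
    $ tar xvf scala-2.11.6.tgz
Move the extracted files to /usr/local/scala (or another folder of your choice):
    $ su
    Password:
    # cd /home/Hadoop/Downloads/
    # mv scala-2.11.6 /usr/local/scala
    # exit
Set the Scala path (no spaces around the equals sign):
    $ export PATH=$PATH:/usr/local/scala/bin
Verify the installation:
    $ scala -version
If the Scala version information is displayed, the installation succeeded.

Step 5: Download Apache Spark
Download Spark from the Apache Spark website. This walkthrough uses spark-1.3.1-bin-hadoop2.6.

Step 6: Install Spark
Set up Spark by extracting the archive, moving it to its destination folder, and setting the environment variable:
    $ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
    $ su
    Password:
    # cd /home/Hadoop/Downloads/
    # mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
    # exit
Set the environment variable: open the ~/.bashrc file and append the following line to the end of it:
    export PATH=$PATH:/usr/local/spark/bin
Source the ~/.bashrc file so that the setting takes effect:
    $ sou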
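
The installation steps check by hand that the java and scala commands respond. As a small convenience, the following Python sketch checks whether the relevant commands can be found on PATH. The function names check_tools and missing_tools are hypothetical. Checking for spark-shell assumes that the Spark bin directory has been added to PATH as described in Step 6.

```python
import shutil


def check_tools(tools, which=shutil.which):
    """Map each command name to its resolved path, or None if it is not on PATH."""
    return {tool: which(tool) for tool in tools}


def missing_tools(report):
    """Return the sorted names of the commands that were not found."""
    return sorted(name for name, path in report.items() if path is None)


# Commands this walkthrough expects to be available after installation.
report = check_tools(["java", "scala", "spark-shell"])
for name, path in report.items():
    print(f"{name}: {path or 'not found on PATH'}")

if missing_tools(report):
    print("Missing:", ", ".join(missing_tools(report)))
```

This only confirms that the commands are reachable. It does not verify their versions, so running java -version and scala -version as in Step 1 and Step 2 is still worthwhile.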


