Spark快速入门

本教程提供了如何使用 Spark 的简要介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API,然后展示如何使用 Java ,Scala 和 Python 来编写应用程序。更多信息请参考 Spark 编程指南。

为了继续阅读本指南,首先从 Spark 官网 下载 Spark 的发行包。因为我们将不使用 HDFS,所以你可以下载一个任何Hadoop 版本的软件包。

使用 Spark Shell 进行交互式分析

基础

Spark shell 提供了一种来学习该 API 比较简单的方式,以及一个来分析数据交互的强大的工具。在 Scala(运行于 Java虚拟机之上,并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行以下的命令来启动它 :

Scala

./bin/spark-shell

Spark 的主要抽象是一个称为弹性分布式数据集(RDD)的分布式的 item 集合。RDD 可以从 Hadoop 的InputFormats(例如 HDFS文件)或者通过其它 RDD 的转换来创建。让我们从源目录中的 README 文件中的文本创建一个新的 RDD : ****Scala

scala> val textFile = sc.textFile("README.md")

textFile:org.apache.spark.rdd.RDD[String] =README.md MapPartitionsRDD[1] at textFile at :25

RDD 有可以返回值的 actions(动作),还有可以返回指定的新 RDD 的 transformations(转换)。让我们启动一个新的actions(动作) : Scala

scala> textFile.count()

// 在这个 RDD 中 items 的数量

res0:Long = 126

scala> textFile.first()

// 在这个 RDD 中的第一个 item

res1:String =# Apache Spark

现在让我们使用一个 transformation(转换)。我们将使用 filter transaction(转换)来返回一个新的 RDD(文件中item 的一个子集)。

Scala

scala> val linesWithSpark =textFile.filter(line => line.contains("Spark"

))

linesWithSpark:org.apache.spark.rdd.RDD[String]

=MapPartitionsRDD[2] at filter at :27

我们可以链式操作 transformation(转换) 和 action(动作)。

Scala

scala> textFile.filter(line

=> line.contains("Spark))).count()

// How many lines contain "Spark"?

res3: Long = 15

更多 RDD 上的操作

RDD actions(操作)和 transformations(转换)可以用于更复杂的计算。例如,统计出现次数最多的单词 :

Scala

scala> textFile.map(line

=> line.split(" ").size).reduce((a, b) =>

if(a > b) a else b)

res4:Long =15

第一个 map 操作创建一个新的 RDD,将一行数据 map 为一个整型值。reduce RDD 找到最大的行计数。参数 map

与 reduce是 Python 的匿名函数(lambda表达式),但我们也可以通过我们想要的任何顶级的 Python 功能。例如,我们将定义一个 max 函数来使代码更易于理解 :

Scala

scala> import java.lang.Math

import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

res5: Int = 15

一种常见的数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce :

Scala

scala> val wordCounts = textFile.flatMap(line => line.split(" "

)).map(word => (word, 1)).reduceByKey((a, b) => a + b)

wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at :28

在这里,我们结合了 flatMap,map 和reduceByKey transformations(转换)来计算文件中每一个单词的数量作为一个(string,int)的 RDD pairs(对)。对每个单词计数。为了在我们的 shell 中统计单词出现的次数,我们可以使用 collect action(

动作):

Scala

wordCounts.collect()

[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1),

(u'"local"', 1), (u'variable', 1), ...]

缓存

Spark 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存中。例如当查询一个小的 “hot” 数据集或运行一个像PageRANK 这样的迭代算法时,在数据被重复访问时是非常高效的。举一个简单的例子,让我们标记我们的 **linesWithSpark

**** **数据集到缓存中 :

Scala

scala> linesWithSpark.cache()

res7:linesWithSpark.type=MapPartitionsRDD[2] at filter at :27

scala> linesWithSpark.count()

res8:Long =19

scala> linesWithSpark.count()

res9:Long =19

使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是,即使在他们跨越几十或者几百个节点时,这些相同的函数也可以用于非常大的数据集。您也可以像 编程指南 中描述的一样通过连接 bin/spark-shell 到集群中,使用交互式的方式来做这件事情。

独立的应用

假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT),Java(Maven)和 Python 中练习一个简单应用程序。

我们将在 Scala 中创建一个非常简单的 Spark 应用程序 - 很简单的,事实上,它名为 SimpleApp.scala :

Scala

/* SimpleApp.scala */

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

object

SimpleApp {

def main(args:Array[String]) {

val logFile = "YOUR_SPARK_HOME/README.md"

// Should be some file on your system

val conf = new SparkConf().setAppName(

"Simple Application")

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2).cache()

val numAs = logData.filter(line => line.contains("a")).count()

val numBs = logData.filter(line => line.contains("b")).count()

println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}

}

注意这个应用程序我们应该定义一个 main() 方法而不是去扩展 scala.App。使用 scala.App 的子类可能不会正常运行。

该程序仅仅统计了 Spark README 文件中每一行包含 ‘a’ 的数量和包含 ‘b’ 的数量。注意,您需将 YOUR_SPARK_HOME 替换为您 Spark 安装的位置。不像先前使用 spark shell 操作的示例,它们初始化了它们自己的 SparkContext,我们初始化了一个 SparkContext 作为应用程序的一部分。

我们传递给了 SparkContext 构造器一个包含我们应用程序信息的 [SparkConf(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf) 对象。

我们的应用依赖了 Spark API,所以我们将包含一个名为 simple.sbt 的 sbt 配置文件,它说明了 Spark 是一个依赖。该文件也添加了一个 Spark 依赖的仓库 :

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark"%% "spark-core" % "2.0.2"

为了让 sbt 正常的运行,我们需要根据经典的目录结构来布局 **SimpleApp.scala

** 和 **simple.sbt

** 文件。在成功后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit 脚本来运行我们的程序。

# Your directory layout should look like this

$ find.

./simple.sbt

./src

./src/main

./src/main/scala

./src/main/scala/SimpleApp.scala

# Package a jar containing your application

$ sbt package

...

[info] Packaging {..}/{..}

/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application

$ YOUR_SPARK_HOME

/bin/spark-submit \

--class "SimpleApp" \

--master local [4] \

target/scala-2.11/simple-project_2.11-1.0.jar

...

Lines with a: 46, Lines with b: 23

快读跳转

恭喜您成功的运行了您的第一个 Spark 应用程序!

更多 API 的深入概述,从Spark 编程指南 开始,或查阅 “编程指南” 菜单下的其它组件。

为了在集群上运行应用程序,前往 集群模式概述。

最后,在 examples 目录(Scala,Java,Python,R)中 Spark 包括了一些例子。您可以按照如下方式来运行它们 :

# 针对 Scala 和 Java, 使用 run-example :

./bin/run-example SparkPi

# 针对 Python 示例, 直接使用 spark-submit :

./bin/spark-submit examples /src/main/python/pi.py

# 针对 R 示例,直接使用 spark-submit :

./bin/spark-submit examples /src/main/r/dataframe.R

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐