Big Data Fundamentals: Spark Streaming——Integrating Spark Streaming with Flume
Integrating Spark Streaming with Flume
Spark Streaming can consume a Flume data source in two ways: Push and Pull. From Spark Streaming's point of view, Push means Flume pushes events to Spark, while Pull means Spark pulls events from Flume's output.
Either way, the development process is similar: Spark Streaming consumes the Flume data stream, with Flume acting as Spark Streaming's data source. The difference between Push and Pull lies mainly in the Flume sink; the Flume source and channel are unaffected.
Push mode
Push mode: the Flume pipeline is netcat -> memory -> avro
1. Write the simple Streaming processing code
Maven dependencies in pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>2.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.4</version>
    </dependency>
</dependencies>
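If you build with sbt instead of Maven, an equivalent dependency list would look roughly like this (a sketch; the exact Scala 2.11 patch version is an assumption):

```scala
// build.sbt -- sketch of the same dependencies as the Maven pom above
scalaVersion := "2.11.12"

val sparkVersion = "2.3.4"

libraryDependencies ++= Seq(
  "junit"            %  "junit"                      % "4.11" % Test,
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-sql"                  % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume"      % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)
```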
Simple data-processing code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeSparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("mytest")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Push mode: Spark starts an Avro receiver on this host/port; Flume's avro sink pushes events to it
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.56.101", 55555)
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim) // decode the event body
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
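The transformation chain above is the same pattern as ordinary Scala collection operations; as a quick sanity check of the word-count logic on a simulated micro-batch (the sample lines are made up):

```scala
object WordCountCheck {
  // Simulated decoded bodies of one micro-batch of Flume events
  val batch = Seq("hello spark ", "hello flume")

  // Same chain as on the DStream; reduceByKey(_ + _) corresponds to groupBy + sum here
  val counts: Map[String, Int] = batch
    .map(_.trim)
    .flatMap(_.split(" "))
    .map((_, 1))
    .groupBy(_._1)
    .mapValues(_.map(_._2).sum)
    .toMap

  def main(args: Array[String]): Unit =
    println(counts.toSeq.sorted.mkString(",")) // prints (flume,1),(hello,2),(spark,1)
}
```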
2. Package the IDEA project and upload it to Linux to run, since Flume runs on Linux.
The pom configuration for packaging a Scala project is covered in an earlier post of mine; the default pom build settings cannot package a Scala project, so the build section of the pom must be replaced.
Command to run the Spark project on Linux:
# replace the class with your own main class, and the path with your own jar's location
spark-submit \
--class beijing.FlumeSparkStreamingDemo \
--packages org.apache.spark:spark-streaming-flume_2.11:2.3.4 \
/opt/kafkasparkstreaming-1.0-SNAPSHOT.jar
3. Flume configuration file
a1.sources=s1
a1.sinks=k1
a1.channels=c1
a1.sources.s1.type=netcat
a1.sources.s1.bind=192.168.56.101
a1.sources.s1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.56.101
a1.sinks.k1.port=55555
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
4. Start and run
Start Spark first, then run the IDEA project, then start Flume, and finally connect to port 44444 and send some input.
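Concretely, the Flume and input steps above might look like the following commands (the config file name flume-push.conf is an assumption for illustration; the agent name a1 matches the config in step 3):

```shell
# after the Spark Streaming job is up, start the Flume agent
flume-ng agent --name a1 --conf ./conf --conf-file flume-push.conf -Dflume.root.logger=INFO,console

# then connect to the netcat source and type some words
nc 192.168.56.101 44444
```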
Pull mode
Pull mode: the Flume pipeline is netcat -> memory -> SparkSink
1. Write the simple Streaming processing code
Maven dependencies in pom.xml are the same as above.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeSparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("mytest")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Pull mode: Flume's SparkSink buffers events on this host/port and Spark polls them
    val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.56.101", 55555)
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim) // decode the event body
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. Package the IDEA project and upload it to Linux to run, since Flume runs on Linux.
The packaging notes are the same as in the Push section above.
The command to run the Spark project on Linux is also the same as above.
3. Flume configuration file
a1.sources=s1
a1.sinks=k1
a1.channels=c1
a1.sources.s1.type=netcat
a1.sources.s1.bind=192.168.56.101
a1.sources.s1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.56.101
a1.sinks.k1.port=55555
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
4. Note: the startup order differs from Push mode
Start Spark first, then start Flume, then run the IDEA project, and finally connect to port 44444 and send some input.
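A sketch of the Pull-mode commands (flume-pull.conf is an assumed file name; note that Flume must be running before the streaming job, since the job connects to the SparkSink's port):

```shell
# start the Flume agent with the SparkSink config from step 3
flume-ng agent --name a1 --conf ./conf --conf-file flume-pull.conf -Dflume.root.logger=INFO,console

# then run the Spark Streaming job, which polls the sink on port 55555

# finally connect to the netcat source and type some words
nc 192.168.56.101 44444
```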
Note: before reading Flume data in Pull mode, the following three jars must be placed in the lib directory under the Flume installation folder, and the existing scala-library.jar there must be deleted.
Jar resources:
https://pan.baidu.com/s/19EMtXgf5N2QQf4lqSoNfPw
Extraction code: yrtx