视频地址:https://www.bilibili.com/video/BV11A411L7CK?p=7

scala版本:https://www.scala-lang.org/download/all.html

需要去老师的文件中找到这个并配置下地址设置环境变量。

截图:

下载插件:https://blog.csdn.net/weixin_41122339/article/details/81141913

---01-06---

开始写代码了:

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_WordCount {

    def main(args: Array[String]): Unit = {

        // Application
        // Spark框架
        // TODO 建立和Spark框架的连接
        // JDBC : Connection
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)

        // TODO 执行业务操作

        // 1. 读取文件,获取一行一行的数据
        //    hello world
        val lines: RDD[String] = sc.textFile("datas")

        // 2. 将一行数据进行拆分,形成一个一个的单词(分词)
        //    扁平化:将整体拆分成个体的操作
        //   "hello world" => hello, world, hello, world
        val words: RDD[String] = lines.flatMap(_.split(" "))

        // 3. 将数据根据单词进行分组,便于统计
        //    (hello, hello, hello), (world, world)
        val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word=>word)

        // 4. 对分组后的数据进行转换
        //    (hello, hello, hello), (world, world)
        //    (hello, 3), (world, 2)
        val wordToCount = wordGroup.map {
            case ( word, list ) => {
                (word, list.size)
            }
        }

        // 5. 将转换结果采集到控制台打印出来
        val array: Array[(String, Int)] = wordToCount.collect()
        array.foreach(println)

        // TODO 关闭连接
        sc.stop()

    }
}

这个比较不好,无法有聚合的感觉的。

   val wordToCount = wordGroup.map {
            case ( word, list ) => {
                (word, list.size)
            }
        }

---07---

spark可以对多个文件进行聚合的。

我们再改善一下:我们事先把数量加上,根据数量完成聚合,就会好很多的。

---08---

看着spark很简单,和scala的集合是差不多的,但是实际上不是这样的。

spark提供了一个reduceByKey,会对value做reduce。

注意这个写法可以是什么?当=>的两端的x只出现一次的时候,那么就会有简写的方法。

---09---

可以看到里面有大量的日志的,我们看下需要自己优化下。

---10---

如何运行呢?我们先解压缩到spark-local的目录,现在这里实现本地的提交。

第一步:进入到bin目录:

我们写下spark的函数:

是去哪里读取的呢?

---11---

我们看下监控的页面:http://192.168.244.133:4040/jobs/

监控的界面:

我们用idea开发的话如何使用这个环境呢,就是我们不能一直用命令行吧?

---

这个jar包告诉我们上面哪个类所在的位置。

执行这个指令。

---12---

独立的集群模式安装,先不看。

---13-16---

yarn模式:

第一步:解压上传文件,机器192.168.244.133

第三步:修改hadoop的配置文件

修改yarn-site.xml

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认
是 true -->
<property>
 <name>yarn.nodemanager.pmem-check-enabled</name>
 <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认
是 true -->
<property>
 <name>yarn.nodemanager.vmem-check-enabled</name>
 <value>false</value>
</property>

第四步:启动hadoop集群,我的集群是伪分布式的,启动的详情:https://blog.csdn.net/qq_28764557/article/details/109913185

启动语句:

start-dfs.sh
start-yarn.sh

第三步:修改spark-env.sh

export JAVA_HOME=/usr/local/apps/jdk8
YARN_CONF_DIR=/usr/local/apps/hadoop/etc/hadoop

第五步:直接提交应用,注意变化这个master编程yarn了。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

看下监控页面:

192.168.244.133:8088

第六步:修改历史服务器:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://192.168.244.133:9008/directory

这个需要在hdfs创建的。

第七步:修改spark-env.sh

注意这个实际上应该是这个配置文件的。

再次修改:

export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://192.168.244.133:9008/directory
-Dspark.history.retainedApplications=30"

第八步:修该 spark-defaults.conf

spark.yarn.historyServer.address=192.168.244.133:18080
spark.history.ui.port=18080

第九步:启动

需要改下:http://192.168.244.133:18080/history/application_1615825025568_0008/jobs/

---17---

---18---

运行架构:

ApplicationMaster是做什么的:https://blog.csdn.net/qq_44594249/article/details/93972793

计算相关的组件是Executor和Driver

资源相关的组件是Master和worker

如何解耦就是ApplicationMaster

Master和worker 其中Master相当于我们的yarn的Resourcemanager worker相当于NodeManager

计算和资源解耦用Applicationmater

有向无环图。

---19-20-21---

saprk的三大数据结构:

小的分布式的计算功能:

我们计算:在driver端把逻辑封装好传给executor,这里是把数据也传进去了。

---22-23-24---

RDD就是数据和逻辑。

如何提高计算性能呢?

RDD,最小的计算单元:

RDD斯和最小的计算单元,要是有新的逻辑只要关联下就完毕了。

---25---

关于缓冲区:

这个就类似于批处理

---26---

我们自己改善一下:

我们再改进下:

三个字节转换为一个字符。

---27---

java的装饰着设计模式:https://blog.csdn.net/jason0539/article/details/22713711

我们回头看下这个RDD:

RDD的包装:

RDD执行:

之前的操作都是功能的扩展。

数据是原封不动得往下流转得RDD是不保存数据得。

---28---

我们深入理解下RDD是什么:

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据
处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合。

RDD是如何分解的呢?

分区的目的就是形成独立的task并行执行。

分区,读取完的数据放在指定分区中,互相不受影响。

---29---

数据是分布式文件系统的文件,没有网络io是最块的。

executor和文件在一个节点上。

RDD的五大分区属性:

1.分区列表,分区的数据是不互相关联的。

2.分区的计算函数,数据是不一样的,但是计算的逻辑是一样的。

3.RDD之间的依赖关系,互相之间包装和依赖。

4.分区器,数据是如何放在不同的分区的。

5.计算分区的首选的位置。

---30---

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐