大数据入门--Flume详解
Flume本身是由Cloudera公司开发的后来贡献给了Apache的一套针对日志进行收集(collecting)、汇聚(aggregating)和传输(moving)的分布式机制。图-1 Flume图标Flume本身提供了较为简易的流式结构,使得开发者能够较为简易和方便的搭建Flume的流动模型。
简介
概述
Flume本身是由Cloudera公司开发的后来贡献给了Apache的一套针对日志进行收集(collecting)、汇聚(aggregating)和传输(moving)的分布式机制。
图-1 Flume图标
Flume本身提供了较为简易的流式结构,使得开发者能够较为简易和方便的搭建Flume的流动模型。
图-2 Flume流动模型图
目前,Flume提供了两大版本:Flume0.x和Flume1.x版本:
1)Flume0.X:又称之为Flume-og版本,依赖于Zookeeper,结构配置相对复杂,现在在市面上已经不怎么常见了。
2)Flume1.X:又称之为Flume-ng版本,不依赖于Zookeeper,结构配置相对简单,是现在比较常用的版本。
截止到目前为止(2023年7月20日),Flume的最新版本是Flume1.11.0版本。
基本概念
在Flume中,有两个基本的概念:Event和Agent。
Event
在Flume中,会将收集到的每一条日志都封装成一个Event对象,所以一个Event就是一条日志。
Event本质上是一个json串,固定的包含两部分:headers和body。即Flume会将收集到的每一条日志封装成一个json,这个json就是一个Event。
{"headers":{},"body":""}
Agent
Agent是Flume流动模型的基本组成部分,固定的包含三个部分:
1)Source:从数据源采集数据(collecting);
2)Channel:将数据进行临时的存储(aggregating);
3)Sink:将数据进行传输(moving)。
流动模型/拓扑结构
Flume中,通过Agent之间相互组合,可以构成复杂的流动模型,包括:单级流动,多级流动,扇入流动,扇出流动以及复杂流动。
单级流动(Agent Flow),顾名思义,即只有1个Agent组成,数据经过Source采集,通过Sink写入目的地,经过一次流动即可。
图-3 单级流动
多级流动(Multi-agent Flow),又叫串联流动,是由2个及以上的Agent串联组成,数据需要流经多个Agent才能写到目的地。
图-4 多级流动
扇入流动(Consolidation),又叫聚集流动,是将多个Agent的结果汇聚到一个Agent中,最终写入目的地。
图-5 扇入流动
扇出流动(Multiplexing the flow),又叫复用流动,是将数据分别传输给多个Agent,写入多个目的地。
图-6 扇出流动
复杂流动(Flexible Flow),按照项目需求,将上述流动模型进行组合,构成的就是复杂流动。
编译和安装
编译
1)进入源码存放目录,上传或者下载Flume的源码包:
# 进入目录
cd /opt/presoftware/
# 官网下载地址
wget https://archive.apache.org/dist/flume/1.11.0/apache-flume-1.11.0-src.tar.gz
2)解压:
tar -xvf apache-flume-1.11.0-src.tar.gz -C /opt/source/
3)进入Flume源码目录:
cd /opt/source/apache-flume-1.11.0-src/
4)编译:
mvn -X package -Pdist,nativeN,docs -DskipTests -Dtar -Dmaven.skip.test=true -Dmaven.javadoc.skip=true -Denforcer.skip=true
5)编译好的安装包在flume-ng-dist/target目录下。
编译错误处理
如果编译过程中提示缺少了eigenbase,那么上传jar包之后,执行:
mvn install:install-file \
-Dfile=DynamoDBLocal-1.11.86.jar \
-DgroupId=com.amazonaws \
-DartifactId=DynamoDBLocal \
-Dversion=1.11.86 \
-Dpackaging=jar
缺少net.hydromatic:linq4j:jar:0.4:
mvn install:install-file \
-Dfile=linq4j-0.4.jar \
-DgroupId=net.hydromatic \
-DartifactId=linq4j \
-Dversion=0.4 \
-Dpackaging=jar
缺少net.hydromatic:quidem:jar:0.1:
mvn install:install-file \
-Dfile=quidem-0.1.1.jar \
-DgroupId=net.hydromatic \
-DartifactId=quidem \
-Dversion=0.1.1 \
-Dpackaging=jar
缺少org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde:
mvn install:install-file \
-Dfile=pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar \
-DgroupId=org.pentaho \
-DartifactId=pentaho-aggdesigner-algorithm \
-Dversion=5.1.5-jhyde \
-Dpackaging=jar
安装
Flume本身是使用Java开发的,所以要求服务器上首先安装了JDK1.8。
1)进入软件预安装目录,上传或者下载安装包:
# 进入预安装目录
cd /opt/software/
# 官网下载地址
wget --no-check-certificate https://dlcdn.apache.org/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz
2)解压安装包:
tar -xvf apache-flume-1.11.0-bin.tar.gz -C /opt/software/
3)进入Flume安装目录:
cd /opt/module/
4)重命名目录:
mv apache-flume-1.11.0-bin/ flume-1.11.0
5)配置环境变量:
# 编辑文件
vim /etc/profile.d/myenv.sh
# 在文件中添加
export FLUME_HOME=/opt/module/flume-1.11.0
export PATH=$PATH:$FLUME_HOME/bin
# 保存退出,生效
source /etc/profile.d/flumehome.sh
# 测试
flume-ng version
6)解决Flume的连接池包和Hadoop的连接池包不一致问题:
# 进入Flume的lib目录
cd flume-1.11.0/lib/
# 删除原来的连接池包
rm -rf guava-11.0.2.jar
# 复制Hadoop的连接池包
cp /opt/module/hadoop-3.2.4/share/hadoop/common/lib/guava-27.0-jre.jar ./
入门案例
1)进入Flume中,创建数据目录,用于存放格式文件:
# 进入Flume的安装目录
cd /opt/module/flume-1.11.0/
# 创建数据目录
mkdir data
# 进入目录
cd data
2)创建格式文件:
vim basic.properties
添加内容:
# 指定Agent的名字
# 指定Source的名字
# 如果要配置多个Source,那么Source之间用空格间隔
a1.sources = s1
# 指定Channel的名字
# 如果要配置多个Channel,那么Channel之间用空格间隔
a1.channels = c1
# 指定Sink的名字
# 如果要配置多个Sink,那么Sink之间用空格间隔
a1.sinks = k1
# 配置Source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 配置Channel
a1.channels.c1.type = memory
# 配置Sink
a1.sinks.k1.type = logger
# 绑定Source和Channel
a1.sources.s1.channels = c1
# 绑定Sink和Channel
a1.sinks.k1.channel = c1
3)启动Flume:
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file basic.properties -Dflume.root.logger=INFO,console
上述命令可以简化为:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console
4)复制窗口,在新窗口中输入:
nc hadoop101 8090
# 回车之后发送数据
5)停掉Flume之后,会发现在当前的data目录下生成了一个新文件flume.log,而我们输入的数据被封装成了Event形式写入到了这个新文件中。
控制台打印
需要注意的是,从Flume1.10.0版本开始,使用log4j2.x替换log4j1.x,使用log4j2.xml替换了log4j.properties,因此使用Logger Sink不再输出到控制台上,而是将结果输出到了flume.log文件中。如果想要将数据打印到控制台上,操作如下:
1)进入Flume的配置文件目录:
cd /opt/module/flume-1.11.0/conf/
2)编辑文件:
vim log4j2.xml
修改文件内容如下:
<Loggers>
<Logger name="org.apache.flume.lifecycle" level="info"/>
<Logger name="org.jboss" level="WARN"/>
<Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
<Logger name="org.apache.hadoop" level="INFO"/>
<Logger name="org.apache.hadoop.hive" level="ERROR"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="LogFile" />
</Root>
</Loggers>
3)回到数据目录下:
cd ../data
4)重新启动Flume,发送数据,会发现数据已经打印到了控制台上。
参数解释
Flume启动过程中,需要用到多个参数,这多个参数的解释如下:
表-1 参数解释
完整写法 |
简写 |
解释 |
flume-ng |
Flume启动命令 |
|
agent |
表示启动Flume的Agent组件 |
|
--name |
-n |
指定要启动的Agent的名字,和格式文件中Agent的名字对应 |
--conf |
-c |
指定Flume的配置文件所在的位置,注意,这儿的配置文件是Flume自己的配置文件,而不是我们自己编写的格式文件/数据文件!默认情况下,Flume的配置文件在Flume安装目录的conf目录下 |
--conf-file |
-f |
指定要执行的格式文件的位置 |
-D |
指定其他运行的参数 |
|
flume.root.logger |
指定日志的打印级别和打印位置,可以指定级别包含INFO,WARN和ERROR,可以指定的打印位置包含console和logfile |
Source
AVRO Source
AVRO Source会监听指定的端口,接收其他节点传过来的被AVRO序列化之后的数据。AVRO Source结合AVRO Sink可以实现更多的流动模型/拓扑结构,例如多级流动、扇入流动、扇出流动等。
AVRO Source中配置的选项有:
表-2 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,此处必须是avro |
bind |
required |
要监听的服务器的主机名或者IP |
port |
required |
要监听的端口 |
channels |
required |
Source需要绑定的Channel |
案例:
1)编辑文件:
vim avrosource.properties
添加如下配置:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置AVRO Source
# 类型必须是avro
a1.sources.s1.type = avro
# 绑定要监听的主机
a1.sources.s1.bind = 0.0.0.0
# 绑定要监听的端口
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.properties -Dflume.root.logger=INFO,console
3)在新窗口中,创建一个文件,添加内容。
4)启动Flume的avro-client组件,将文件进行AVRO序列化之后发送给Avro Source:
flume-ng avro-client -H hadoop101 -p 8090 -F a.txt
avro-client组件中的参数解释:
表-3 参数解释
完整写法 |
简写 |
解释 |
avro-client |
指定启动Flume的avro-client组件 |
|
--host |
-H |
指定数据要发送的主机名或者IP |
--port |
-p |
指定数据要发送的端口 |
--filename |
-F |
指定要发送的文件 |
Exec Source
Exec Source会运行指定的命令,然后收集这个命令的执行结果。可以利用这个Source来完成部分场景的监控,例如对方是否有返回,文件是否有新增等。
Exec Source中配置的选项有:
表-4 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,此处必须是exec |
command |
required |
要执行的命令 |
channels |
required |
Source需要绑定的Channel |
shell |
optional |
指定命令的类型 |
案例:监听指定的文件,如果文件中新添了数据,那么自动收集新添的数据。
1)编辑文件:
vim execsource.properties
添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置Exec Source
# 类型必须是exec
a1.sources.s1.type = exec
# 指定命令
a1.sources.s1.command = tail -F /opt/module/flume-1.11.0/data/a.txt
# 指定Shell类型,用于在 Bash shell 中执行一个命令字符串
a1.sources.s1.shell = /bin/bash -c
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.properties -Dflume.root.logger=INFO,console
3)新窗口中,向文件中追加数据:
echo 'append' >> /opt/module/flume-1.11.0/data/a.txt
echo 'exec' >> /opt/module/flume-1.11.0/data/a.txt
回到Flume界面,发现新添加的数据被Flume自动收集了。
Spooling Directory Source
Spooling Directory Source用于监听指定的目录,如果目录中有新的文件,那么会自动收集新文件中的数据。注意,被收集过的文件会自动的添加一个后缀。
Spooling Directory Source配置的选项包含:
表-5 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,此处必须是spooldir |
spoolDir |
required |
要监听的目录 |
channels |
required |
Source需要绑定的Channel |
fileSuffix |
optional |
被收集过的文件添加的后缀,默认是.COMPLETED |
案例:监听指定的目录,自动收集新文件的内容。
1)编辑文件:
vim spoolsource.properties
添加如下配置:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置Spooling Directory Source
# 类型必须是spooldir
a1.sources.s1.type = spooldir
# 绑定要监听的目录
a1.sources.s1.spoolDir = /opt/module/flumedata
# 指定收集后的文件的后缀
a1.sources.s1.fileSuffix = .testing
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f spoolsource.properties -Dflume.root.logger=INFO,console
3)编辑新文件,添加数据,然后移动到指定的目录中:
# 编辑文件
vim a.txt
# 在文件中添加数据
# 添加之后,保存退出,然后将文件挪到指定目录下
mv a.txt /opt/module/flumedata
会发现,新添加的文件中的内容被自动收集了。
Taildir Source
Taildir Source可以用于监听多个文件(一组文件)的变化,如果这一组文件中的某个文件中新添了数据,那么可以自动收集这些新添的数据。不同于之前案例的地方在于,Taildir Source可以监听多个文件中数据的新添,Exec Source是利用命令监听一个文件中数据的新添,Spooling Directory Source是监听目录下文件的新添而不是某一个文件中数据的变化!
Taildir Source支持断点续传,本身依靠索引文件来记录每一个文件上一次的读取位置。
需要注意的是,Taildir Source本身不支持Windows系统中使用。
Taildir Source配置选项包含:
表-6 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,此处必须是TAILDIR |
filegroups |
required |
给要监听的文件组起名,如果要指定多个文件组,那么彼此之间用空格隔开 |
filegroups.<filegroupName> |
required |
指定文件组中要监听的文件,支持正则表达式 |
channels |
required |
Source需要绑定的Channel |
positionFile |
optional |
该Source会用json形式记录每一个文件上一次的读取位置(索引文件)。如果不指定,默认是~/.flume/taildir_position.json |
案例:监听所有的log文件,如果log文件中有新的数据,则自动收集。
1)编辑文件:
vim tailsource.properties
在文件中添加如下配置:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置Taildir Source
# 类型必须是TAILDIR
a1.sources.s1.type = TAILDIR
# 指定文件组的名字
a1.sources.s1.filegroups = f1
# 指定文件组要监听的目录
a1.sources.s1.filegroups.f1 = /opt/module/flumedata/.*log.*
# 指定索引文件的存放位置
a1.sources.s1.positionFile = /opt/module/flumedata/taildir_position.json
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f tailsource.properties -Dflume.root.logger=INFO,console
3) 编辑新文件,添加数据,然后移动到指定的目录中:
# 编辑文件
vim a.log
# 在文件中添加数据
# 添加之后,保存退出,然后将文件挪到指定目录下
mv a.log /opt/module/flumedata
会发现,新添加的文件中的内容被自动收集了。继续测试:
# 进入flumedata目录:
cd /opt/module/flumedata
# 创建新文件
touch b.log
# 追加数据
echo 'from a' >> a.log
echo 'hi b' >> b.log
会发现,无论是新文件还是追加到文件中的数据,都会被自动收集。
NetCat Source
NetCat Source分为NetCat TCP Source和NetCat UDP Source,都是用于监听指定的主机和端口,从该端口接收TCP请求或者UDP请求的数据。
可以配置的选项:
表-7 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,如果监听TCP请求,则类型为netcat;如果监听UDP请求,则类型为netcatudp |
bind |
required |
要监听的主机IP或者主机名 |
port |
required |
要监听的端口 |
channels |
required |
Source需要绑定的Channel |
max-line-length |
optional |
NetCat TCP Source支持,NetCat UDP Source不支持该选项,表示每一次能接收的最大数据量,单位是字节,默认值是512 |
如果需要发送TCP请求,使用nc命令;如果需要发送UDP请求,使用nc -u命令。
Sequence Generator Source
Sequence Generator Source本质上是一个序列产生器,会从0开始依次递增。实际过程中,经常使用这个Souce来测试流动模型是否搭建成功。
可以配置的选项包含:
表-8 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,该选项的值必须是seq |
channels |
required |
Source需要绑定的Channel |
totalEvents |
optional |
递增的最大值。默认是Long.MAV_VALUE |
案例配置文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置Sequence Generator Source
# 类型必须是seq
a1.sources.s1.type = seq
# 指定递增的最大范围
a1.sources.s1.totalEvents = 100
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
HTTP Source
HTTP Source用于监听和接收HTTP请求,但是需要注意的是,该Source只能监听GET请求和POST请求。其中,官网指出,GET请求的监听最好只用于实验环境,所以实际生产环境中只用这个Source来监听POST请求。
可以配置的选项:
表-9 配置选项
选项 |
备注 |
解释 |
type |
required |
Source的类型,该选项的值必须是http |
port |
required |
要监听的端口 |
channels |
required |
Source需要绑定的Channel |
案例:
1)编辑文件:
vim httpsource.properties
2)配置文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置Http Source
# 类型必须是http
a1.sources.s1.type = http
# 指定要监听的端口
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
3)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.properties -Dflume.root.logger=INFO,console
4)发送POST请求:
curl -X POST -d '[{"headers":{"kind":"test","class":"test"},"body":"testing"}]' http://hadoop101:8090
Custom Source
概述
在Flume中,支持自定义Source。如果Flume没有提供对应的数据源,那么可以通过自定义Source来实现数据的采集过程。
Flume针对Source提供了顶级接口Source,但是实际过程中,一般不是直接实现Source接口,而是实现其子接口EventDrivenSource或者PollableSource:
1)EventDrivenSource:事件驱动Source,本身是一个被动型Source,需要自己定义线程来获取数据并封装数据。
2)PollableSource:拉取Source,本身是一个主动型Source,提供了线程来获取数据,用户只需要考虑怎么封装数据即可。
除了实现上述接口之外,一般自定义的Source还需要实现Configurable接口,覆盖这个接口中的configure方法,通过这个configure方法来获取格式文件中的属性值。
自定义EventDrivenSource
1)新建Maven项目,在POM文件中添加如下依赖:
<dependencies>
<!--Flume核心包-->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
<!--Flume开发包-->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.11.0</version>
</dependency>
<!--Flume配置包-->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
2)定义类,继承AbstractSource,实现EventDrivenSource和Configurable接口,覆盖其中的configure、start和stop方法:
package com.flume.auth.source;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 模拟Sequence Generator Source
public class AuthDrivenSource extends AbstractSource implements EventDrivenSource, Configurable {
// 递增的最大范围
private long max;
// 递增的起始值
private long min;
// 递增步长
private long step;
// 线程池
private ExecutorService es;
// 获取配置属性
@Override
public void configure(Context context) {
// 获取max的属性值,如果用户没有指定,那么最大递增到Long.MAX_VALUE
this.max = context.getLong("max", Long.MAX_VALUE);
// 获取min的属性值,如果用户没有指定,那么从0开始递增
this.min = context.getLong("min", 0L);
// 判断起始值是否超过了最大值
if (min > max) throw new IllegalArgumentException("起始值" + min + "超过了递增范围" + max);
// 获取step的属性值,如果没有指定,那么默认递增步长为1
this.step = context.getLong("step", 1L);
// 判断步长是否合理
if (step < 1) throw new IllegalArgumentException("递增步长" + step + "不合理!");
}
// 启动Source,完成初始化过程
@Override
public synchronized void start() {
// 获取ChannelProcessor
ChannelProcessor cp = this.getChannelProcessor();
// 构建线程池
es = Executors.newCachedThreadPool();
// 提交任务
es.submit(new Add(min, max, step, cp));
}
// 同步结束Source,用完就关掉,保证平滑关闭Executor,回收资源
@Override
public synchronized void stop() {
if (es != null) es.shutdown();
}
}
3)定义递增线程类:
package com.flume.auth.source;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Add implements Runnable {
private final long min;
private final long max;
private final long step;
private final ChannelProcessor cp;
//这个构造函数,我们可以在创建 Add 类的对象时,将 min、max、step 和 cp 的值传入,并自动初始化对象的成员变量
public Add(long min, long max, long step, ChannelProcessor cp) {
this.min = min;
this.max = max;
this.step = step;
this.cp = cp;
}
@Override
public void run() {
// 循环,从min~max
for (long i = min; i < max; i += step) {
// Flume会将收集的数据封装成Event
// 封装headers,注意键和值的类型必须是String
Map<String, String> headers = new ConcurrentHashMap<>();
headers.put("time", String.valueOf(System.currentTimeMillis()));
// 封装body
byte[] body = String.valueOf(i).getBytes();
// 构建Event对象
Event e = EventBuilder.withBody(body, headers);
// 通过ChannelProcessor,推送给Channel
cp.processEvent(e);
}
}
}
4)将定义好的Source打成jar包。
5)将jar包上传到Flume安装目录的lib目录下:
# 进入Flume安装目录的lib目录下:
cd /opt/module/flume-1.11.0/lib/
# 选择jar包,上传
rz
6)回到数据目录,编辑配置文件:
# 回到数据目录
cd ../data/
# 编辑文件
vim authdrivensource.properties
在文件中添加:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置自定义Source
# 类型必须是类的全路径名
a1.sources.s1.type = chapter1.AuthDrivenSource
# 指定递增起始值
a1.sources.s1.min = 0
# 指定递增的最大范围
a1.sources.s1.max = 100
# 指定递增的步长
a1.sources.s1.step = 5
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
7)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authdrivensource.properties -Dflume.root.logger=INFO,console
自定义PollableSource
1) 定义类,继承AbstractSource,实现PollableSource和Configurable接口,覆盖其中的configure,process,getBackOffSleepIncrement和getMaxBackOffSleepInterval方法。
package com.flume.auth.source;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
import java.util.Map;
public class AuthPollableSource extends AbstractSource implements PollableSource, Configurable {
// 起始值
private long min;
// 最大范围
private long max;
// 递增步长
private long step;
// 获取配置
@Override
public void configure(Context context) {
// 获取起始值,如果没有指定,那么就从0开始递增
min = context.getLong("min", 0L);
// 获取最大值,如果没有指定,那么就递增到Long.MAX_VALUE
max = context.getLong("max", Long.MAX_VALUE);
// 判断起始值是否大于最大值
if (min > max) throw new IllegalArgumentException("起始值" + min + "超过了递增范围" + max);
// 获取步长,如果没有指定,那么步长为1
step = context.getLong("step", 1L);
// 判断步长
if (step < 1) throw new IllegalArgumentException("递增步长" + step + "不合理!");
}
// 封装处理数据:因为是多线程调用process方法,所以会是死循环,而EventDrivenSource中有stop方法,所以会自动停止
@Override
public Status process() {
// 获取通道处理器
ChannelProcessor cp = this.getChannelProcessor();
// 遍历,递增
for (long i = min; i < max; i += step) {
// 封装headers
Map<String, String> headers = new HashMap<>();
headers.put("time", String.valueOf(System.currentTimeMillis()));
// 封装body
byte[] body = String.valueOf(i).getBytes();
// 封装Event
Event e = EventBuilder.withBody(body, headers);
// 交给ChannelProcessor推送给通道
cp.processEvent(e);
}
return Status.READY;
}
// PollableSource是通过PollableRunner线程池来启动线程获取数据
// 而如果没有数据获取,那么PollableSource会暂时休眠
// 该方法是用于控制休眠的时间增量
//每次重新尝试时,睡眠时间都会增加 1000 毫秒
@Override
public long getBackOffSleepIncrement() {
return 1000;
}
//重新尝试操作的睡眠时间不会超过 10000 毫秒
@Override
public long getMaxBackOffSleepInterval() {
return 10000;
}
}
2)将定义好的Source打成jar包。
3)将jar包传到Flume安装目录的lib目录下:
# 进入Flume的lib目录下
cd /opt/software/flume-1.11.0/lib/
# 选择对应的jar包上传
rz
4)回到数据目录下,编辑配置文件:
# 回到数据目录下
cd ../data/
# 编辑文件
vim authpollablesource.properties
在文件中添加:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置自定义Source
# 类型必须是类的全路径名
a1.sources.s1.type = com.flume.auth.source.AuthPollableSource
# 指定递增起始值
a1.sources.s1.min = 0
# 指定递增的最大范围
a1.sources.s1.max = 100
# 指定递增的步长
a1.sources.s1.step = 5
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
5)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authpollablesource.properties -Dflume.root.logger=INFO,console
Channel
Memory Channel
Memory Channel是将收集来的数据临时存储到内存队列中,如果不指定,那么该队列默认大小是100,即最多允许在队列中存储100条数据。如果队列被占满,那么后来的数据就会被阻塞(即Source收集到的数据就无法放入队列中),直到队列中有位置被空出。实际过程中,这个值一般会调大,一般会调节为10W~30W,如果数据量较大,那么也可以考虑调节为50W。
Memory Channel可以批量接收Source发来的数据,也可以批量的将数据发送给Sink,而默认情况下,每一批数据是100条。实际过程中,一般会将这个值调节为1000~3000,如果Channel的容量为50W,那么这个值可以考虑调节为5000。
需要注意的是,Memory Channel是将数据临时存储在内存中,所以是不可靠的,但是数据的读写速度相对较快,因此适用于要求速度但是不要求可靠性的场景。
Memory Channel可配置的选项如下:
表-10 配置选项
选项 |
备注 |
解释 |
type |
required |
Channel的类型,此处必须是memory |
capacity |
optional |
队列的容量,默认为100 |
transactionCapacity |
optional |
指定命令的类型,最好指定这个属性 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 配置Memory Channel
# 类型必须是memory
a1.channels.c1.type = memory
# 指定队列的容量
a1.channels.c1.capacity = 10000
# 指定批容量
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
File Channel
File Channel是将数据临时存储到本地磁盘上,因此数据不会丢失,但是读写速度相对较慢,适用于要求可靠性但是不要求速度的场景。
在File Channel中,如果不指定,那么默认情况下File Channel会将数据临时存储到~/.flume/file-channel/data目录下。例如是tom用户登录,那么就存储到/home/tom/.flume/file-channel/data目录下;bob用户登录那么就存储到/home/bob/.flume/file-channel/data目录下;如果是root用户登录,那么就存储到/root/.flume/file-channel/data目录下。
为了防止File Channel占用磁盘过多,所以在默认情况下,最多允许File Channel在磁盘上存储100W条数据,或者是2146435071B(大约2GB)的数据。
File Channel可配置的选项如下:
表-11 配置选项
选项 |
备注 |
解释 |
type |
required |
Channel的类型,此处必须是file |
checkpointDir |
optional |
检查点的存储位置,默认是~/.flume/file-channel/checkpoint |
dataDirs |
optional |
临时存储数据的目录,如果指定了多个目录,目录之间用逗号间隔。默认是~/.flume/file-channel/data |
transactionCapacity |
optional |
批容量的大小,默认是10000 |
maxFileSize |
optional |
临时文件的大小,默认是2146435071,单位是字节 |
capacity |
optional |
临时文件的容量,即队列的容量,默认是1000000 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 配置File Channel
# 类型必须是file
a1.channels.c1.type = file
# 指定临时文件的存储位置
a1.channels.c1.dataDirs = /opt/module/flumedata/data
# 指定检查点文件的存储位置
a1.channels.c1.checkpointDir = /opt/module/flumedata/checkpoint
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
其他Channel
JDBC Channel
JDBC Channel是将收集到的数据临时存储到数据库中,因此理论上而言,JDBC Channel的效率略高于File channel但是低于Memory Channel。
到目前为止,JDBC Channel只支持Derby数据库,而基于Derby数据库的特性(微型数据库,最多能存储几百条数据;单连接,只允许一个用户连接操作),所以实际过程中很少使用这个数据库,因此实际生产过程中,几乎弃用JDBC Channel。
Spillable Memory Channel
Spillable Memory Channel可以看作是对Memory Channel的一个扩展。在接收到数据的时候,Spillable Memory Channel会先试图将数据临时存储到内存队列中,不同于Memory Channel的地方在于,如果内存队列被占满,那么Spillable Memory Channel不会阻塞,而是会将数据临时存储到本地磁盘上。所以,Spillable Memory Channel可以看作是Memory Channel和File Channel的结合体。
但是到目前为止,Spillable Memory Channel一直处于试验阶段,不推荐在生产环境中使用。
Sink
Logger Sink
Logger Sink是将收集到的数据以日志形式打印。需要注意的是,从Flume1.10.0版本开始,使用log4j2.x替换log4j1.x,使用log4j2.xml替换了log4j.properties,因此使用Logger Sink不再输出到控制台上,而是将结果输出到了flume.log文件中。
在打印的时候,为了防止过多的数据占满屏幕,所以要求数据body的部分不能超过16个字节,超过16个字节的部分不打印。另外,Logger Sink对中文打印支持的不好。
Logger Sink可以配置的选项有:
表-12 配置选项
选项 |
备注 |
解释 |
type |
required |
Sink的类型,此处必须是logger |
channel |
required |
Sink要绑定的Channel |
maxBytesToLog |
optional |
body部分打印的字节个数,默认是16,单位是字节 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置Logger Sink
# 类型必须是logger
a1.sinks.k1.type = logger
# 指定可打印的字节个数
a1.sinks.k1.maxBytesToLog = 20
# 绑定Source和Channel
a1.sources.s1.channels = c1
# 绑定Sink和Channel
a1.sinks.k1.channel = c1
HDFS Sink
HDFS Sink,顾名思义,是将收集到的数据写到HDFS中。由于HDFS是以文件形式来存储数据,所以在往HDFS上写的时候,需要指定文件类型。HDFS Sink支持三种文件类型:DataStream(文本类型),SequenceFile(序列类型)以及CompressedStream(压缩类型)。如果不指定,那么HDFS Sink默认使用的是序列类型。
同样,HDFS Sink在写数据的时候,数据对应的存储文件会定时的滚动。如果不指定,那么默认是每隔30s滚动一次,生成一个小文件,所以此时会生成大量的小文件,因此在实际过程中一般需要调节这个属性。
HDFS Sink可以配置的选项有:
表-13 配置选项
选项 |
备注 |
解释 |
type |
required |
Sink的类型,此处必须是hdfs |
hdfs.path |
required |
文件在HDFS上的存储位置 |
channel |
required |
Sink要绑定的Channel |
hdfs.filePrefix |
optional |
文件的前缀,默认是FlumeData |
hdfs.fileSuffix |
optional |
文件的后缀,默认文件没有后缀 |
hdfs.rollInterval |
optional |
文件的滚动间隔时间,默认是30,单位是秒。如果指定为0,则表示不滚动 |
hdfs.rollSize |
optional |
文件的滚动大小,默认是1024,单位是字节。如果指定为0,则表示不滚动 |
hdfs.rollCount |
optional |
文件的滚动条数,默认是10。如果指定为0,则表示不滚动 |
hdfs.fileType |
optional |
文件在HDFS上的存储类型,支持DataStream,SequenceFile以及CompressedStream三种类型。默认是SequenceFile类型 |
hdfs.codeC |
optional |
如果指定了文件类型为CompressedStream,那么需要指定压缩编码。支持gzip,bzip2,lzo,lzop和snappy共五种编码格式 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 类型必须是spooldir
a1.sources.s1.type = spooldir
# 绑定要监听的目录
a1.sources.s1.spoolDir = /opt/module/flumedata
# 指定收集后的文件的后缀
a1.sources.s1.fileSuffix = .testing
a1.channels.c1.type = memory
# 配置HDFS Sink
# 类型必须是hdfs
a1.sinks.k1.type = hdfs
# 指定文件在HDFS上的存储位置
a1.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flumedata/
# 指定文件前缀
a1.sinks.k1.hdfs.filePrefix = test
# 指定文件后缀
a1.sinks.k1.hdfs.fileSuffix = .txt
# 指定文件滚动间隔时间
a1.sinks.k1.hdfs.rollInterval = 3600
# 指定文件滚动大小
a1.sinks.k1.hdfs.rollSize = 134217728
# 指定文件滚动条数
a1.sinks.k1.hdfs.rollCount = 1000000
# 指定文件类型
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动Flume:
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file hdfs.properties -Dflume.root.logger=INFO,console
File Roll Sink
File Roll Sink是将收集到的数据写到磁盘的指定位置上。同HDFS Sink一样,File Roll Sink在往磁盘上写的时候,也有一个滚动时间,默认同样是30s间隔,因此在磁盘上同样会形成大量的小文件。
File Roll Sink可以配置的选项有:
表-14 配置选项
选项 |
备注 |
解释 |
type |
required |
Sink的类型,此处必须是file_roll |
sink.directory |
required |
文件在磁盘上的存储位置 |
channel |
required |
Sink要绑定的Channel |
sink.rollInterval |
optional |
文件的滚动间隔时间。默认是30,单位是秒 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置File Roll Sink
# 类型必须是file_roll
a1.sinks.k1.type = file_roll
# 指定文件在磁盘上的存储位置
a1.sinks.k1.sink.directory = /opt/flumedata
# 指定文件的滚动间隔时间
a1.sinks.k1.sink.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
AVRO Sink
概述
AVRO Sink会将数据利用AVRO序列化之后再写出到指定节点的指定端口上,结合之前学习的AVRO Source可以是现实数据的多级、扇入、扇出等流动效果。
AVRO Sink可以配置的选项有:
表-15 配置选项
选项 |
备注 |
解释 |
type |
required |
Sink的类型,此处必须是avro |
hostname |
required |
要发送的主机 |
port |
required |
要发送的端口 |
多级流动
第一个节点的格式文件内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 类型是avro
a1.sinks.k1.type = avro
# 要发送的节点的主机名或者ip
a1.sinks.k1.hostname = hadoop102
# 要发送的端口
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
第二个节点的格式文件内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置为AVRO Source
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 类型是avro
a1.sinks.k1.type = avro
# 要发送的节点的主机名或者ip
a1.sinks.k1.hostname = hadoop103
# 要发送的端口
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
第三个节点的格式文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 配置为AVRO Source
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 类型是logger
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
注意,启动的时候,是先启动接收节点,在启动发送节点,所以启动顺序是:hadoop103,hadoop102,hadoop101。
启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f multilevel.properties -Dflume.root.logger=INFO,console
通过在hadoop101新启动窗口输入数据:
nc hadoop101 8090
扇入流动
第一个和第二个节点的格式文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
第三个节点的格式文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f fanin.properties -Dflume.root.logger=INFO,console
通过在hadoop101、hadoop102新启动窗口输入数据:
nc hadoop101 8090
扇出流动
第一个节点的格式文件内容如下:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
第二个和第三个节点的格式文件内容如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f fanout.properties -Dflume.root.logger=INFO,console
通过在hadoop101新启动窗口输入数据:
nc hadoop101 8090
Custom Sink
如果写入的目的地在Flume中找不到对应的Sink,那么Flume同样支持自定义Sink。在Flume中,Sink的顶级接口就是Sink,而考虑到还需要获取配置,所以一般还需要实现Configurable接口。
1)定义类,继承AbstractSink,实现Sink接口和Configurable接口,覆盖其中的configure,start,process和stop方法。
package com.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.util.Map;
public class AuthSink extends AbstractSink implements Sink, Configurable {
// 文件的存储路径
private String path;
// 输出流
private PrintStream out;
@Override
public void configure(Context context) {
// 获取指定路径,如果没有指定,那么默认放在/flume_data目录下
path = context.getString("path", "/flume_data");
// 判断路径是否合法
if (path.equals("")) throw new IllegalArgumentException("路径不合法!");
else if (!path.startsWith("/")) path = "/" + path;
}
// 获取输出流,用于写入数据
@Override
public synchronized void start() {
// 创建输入流
try {
out = new PrintStream(path + "/" + System.currentTimeMillis());
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public Status process() {
// 获取Channel对象
Channel c = this.getChannel();
// 获取事务
Transaction t = c.getTransaction();
// 开启事务
t.begin();
// 获取数据
Event e;
try {
while ((e = c.take()) != null) {
// 写出headers部分
out.println("headers:");
Map<String, String> headers = e.getHeaders();
for (Map.Entry<String, String> entry : headers.entrySet()) {
out.println("\t" + entry.getKey() + "=" + entry.getValue());
}
// 写出body部分
out.println("body:");
out.println("\t" + new String(e.getBody()));
}
// 提交事务
t.commit();
// 返回状态
return Status.READY;
} catch (Exception exception) {
// 事务回滚
t.rollback();
return Status.BACKOFF;
} finally {
// 无论如何,都要关闭事务
t.close();
}
}
// 关流
@Override
public synchronized void stop() {
if (out != null) out.close();
}
}
2)将定义好的Sink打成jar包。
3)将jar包上传到Flume安装目录的lib目录下:
# 进入Flume安装目录的lib目录下
cd /opt/module/flume-1.11.0/lib/
# 选择jar包,上传
rz
4)回到数据目录下,编写格式文件:
# 回到数据目录
cd ../data
# 编辑文件
vim authsink.properties
在文件中添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置自定义Sink
# 类型必须是类的全路径名
a1.sinks.k1.type = chapter1.AuthSink
# 指定存储路径
a1.sinks.k1.path = /opt/module/flumedata
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
5)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsink.properties -Dflume.root.logger=INFO,console
6)发送HTTP请求:
curl -X POST -d '[{"headers":{"class":"big data","kind":"test"},"body":"hello custom source"}]' http://hadoop101:8090
Channel Selector
概述
Channel Selector本身是Source的子组件,决定了将数据的分发方式以及数据分发给哪一个Channel。
Channel Selector提供了三种模式:
1)replicating:复制模式。节点收入数据之后,会将数据复制之后发送给每一个节点,所以此时每一个节点收到的数据都是完全相同的。
2)multiplexing:路由/多路复用模式。节点收入数据之后,会根据数据中headers中的指定字段的值来决定将数据分发给哪一个Channel,所以此时各个节点的数据是不同的。
3)load_balancing:负载均衡模式。这是Flume1.10.0版本开始提供的一种新的分发模式。提供了两种均衡方式:round_robin(轮询)模式和random(随机)模式。默认是round_robin模式。需要注意的是,这种模式并不好用!
在Channel Selector中,如果不指定,那么默认使用的是replicating模式。
multiplexing模式
multiplexing模式本身是基于扇出结构来实现,根据数据headers中指定字段的指定值来决定将数据分配给哪一个Channel,从而实现数据的分类/分组。
multiplexing模式需要配置的选项:
表-16 配置选项
选项 |
备注 |
解释 |
selector.type |
required |
Selector的类型,此处必须是multiplexing |
selector.header |
required |
要监听的字段 |
selector.mapping.* |
required |
字段对应的值 |
selector.default |
optional |
如果没有对应的值,那么要分发到哪一个channel上 |
案例:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = http
a1.sources.s1.port = 8090
# Selector的类型必须是multiplexing
a1.sources.s1.selector.type = multiplexing
# 要监听的字段
a1.sources.s1.selector.header = kind
# 字段的值
a1.sources.s1.selector.mapping.video = c1
a1.sources.s1.selector.mapping.music = c2
# 如果值不对应,那么数据分发给哪一个通道
a1.sources.s1.selector.default = c2
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
load_balancing模式
load_balancing模式是Flume1.10.0开始提供的一种新模式,能够实现数据在Channel之间的负载均衡。目前官网提供了两种均衡模式:round_robin(轮询)和random(随机)模式。需要注意的是,这种模式并不好用!(官网提供了这种模式,但是没有在lib包中集成这种方式!!!)
load_balancing配置的选项包含:
表-17 配置选项
选项 |
备注 |
解释 |
selector.type |
required |
Selector的类型,此处必须是load_balancing |
selector.policy |
optional |
均衡策略,默认是round_robin,支持round_robin和random两种模式 |
案例:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# Selector的类型必须是load_balancing
a1.sources.s1.selector.type = load_balancing
# 指定均衡策略
a1.sources.s1.selector.policy = round_robin
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Sink Processor
概述
Sink Processor是将一个或者多个Sink绑定到一个组中,针对一个组中的Sink来进行整体的操作。目前官网支持三种模式:
1)default:默认模式。该模式下,一个Sink就是一个Sink组,有几个Sink就对应了几个Sink组。
2)failover:崩溃恢复模式。该模式下,要求将多个Sink绑定到一个组中,给这个组中的每一个Sink指定优先级,数据优先发送给高优先级的Sink;如果高优先级的Sink宕机,那么才会发送给低优先级的Sink。
# 定义了一个名为g1的sink group
a1.sinkgroups = g1
# sinkgroup g1 中有两个sinks,分别为 k1 和 k2
a1.sinkgroups.g1.sinks = k1 k2
# 使用了failover类型的processor
a1.sinkgroups.g1.processor.type = failover
# 配置定义了两个sink的优先级。
# k1和k2是唯一的标识符,可以根据实际情况进行命名。在failover模式下,优先级较高的sink会被优先选择。
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# 定义了在切换sink时的最大延迟。如果一个sink在切换时超过了这个延迟,将被视为不可用,并尝试切换到下一个可用的sink。
a1.sinkgroups.g1.processor.maxpenalty = 10000
3)load_balancing:负载均衡模式。该模式同样要求将多个Sink绑定到一个组中,数据在组中进行均衡。Flume提供了两种均衡方式:round_robin和random。同样需要注意的是,Flume原生提供的load_balancing模式同样不好用!
failover模式
failover(崩溃恢复)模式,在使用的时候要求将多个Sink绑定到一个组中,然后给每一个Sink指定优先级,数据优先发送给高优先级的Sink;如果高优先级的Sink宕机,那么才会将数据发送给低优先级的Sink。如果同一个组中的Sink没有指定优先级,那么就按照绑定顺序发送,即数据会优先发送给最先绑定的Sink。
failover模式可以配置的选项有:
表-18 配置选项
选项 |
备注 |
解释 |
sinks |
required |
组中要绑定的Sink,Sink之间用空格间隔 |
processor.type |
required |
Processor的类型,此处必须是failover |
processor.priority.<sinkName> |
required |
Sink的优先级 |
processor.maxpenalty |
optional |
崩溃恢复的时间,默认是30000,单位是毫秒 |
案例:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
# 给Sinkgroup起名
a1.sinkgroups = g1
# 绑定Sink
a1.sinkgroups.g1.sinks = k1 k2
# 指定Sinkgroup的类型
a1.sinkgroups.g1.processor.type = failover
# 给每一个Sink指定优先级
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 2
# 定义了在切换sink时的最大延迟。如果一个sink在切换时超过了这个延迟,将被视为不可用,并尝试切换到下一个可用的sink
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
load_balancing模式
load_balancing(负载均衡)模式,该模式同样要求将多个Sink绑定到一个组中,数据在组中进行均衡。Flume提供了两种均衡方式:round_robin和random。同样需要注意的是,Flume原生提供的load_balancing模式同样不好用!
load_balancing模式可以配置的选项有:
表-19 配置选项
选项 |
备注 |
解释 |
sinks |
required |
组中要绑定的Sink,Sink之间用空格间隔 |
processor.type |
required |
Processor的类型,此处必须是load_balance |
processor.selector |
optional |
负载均衡模式,支持round_robin和random,默认是round_robin |
案例:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
# 给Sinkgroup起名
a1.sinkgroups = g1
# 绑定Sink
a1.sinkgroups.g1.sinks = k1 k2
# 指定Sinkgroup的类型
a1.sinkgroups.g1.processor.type = load_balance
# 指定均衡类型
a1.sinkgroups.g1.processor.selector = round_robin
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Interceptor
概述
Interceptor(拦截器)本身是Source的子组件之一,可以对数据进行拦截、过滤、替换等操作。不同于Selector的地方在于,一个Source上可以配置多个拦截器,构成拦截器链。需要注意的是,后一个拦截器不能和前一个拦截器相悖!
Timestamp Interceptor
时间戳拦截器,本质上不是拦截数据,而是在数据的headers字段中添加一个时间戳,可以用于标记数据被收集的时间。
Timestamp Interceptor可以配置的选项有:
表-20 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是timestamp |
headerName |
optional |
在headers中添加的字段值,默认是timestamp |
preserveExisting |
optional |
如果时间戳已经存在,是否替换。默认是false |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 给Interceptor起名
# 如果需要配置多个拦截器,那么拦截器之间用空格隔开
a1.sources.s1.interceptors = i1
# 配置Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定headers字段的名称
a1.sources.s1.interceptors.i1.headerName = time
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
结合之前学习的HDFS Sink,可以实现数据的按小时/天/月/年存放:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flumedata/reportDate=%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.fileType = DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
符号解析如下:
表-21 符号解析
符号 |
解释 |
%t |
毫秒 |
%a |
星期简写,例如Mon,Tue等 |
%A |
星期全称,例如Monday,Tuesday等 |
%b |
月份简称,例如Jan,Feb等 |
%B |
月份全称,例如January,February等 |
%c |
当前日期和时间,例如Mon Jul 24 17:02:05 2023 |
%d |
月份,例如01,02,03等 |
%e |
无填充的月份,例如1,2,3等 |
%D |
日期,等价于%m/%d/%y |
%H |
小时,00~23 |
%I |
小时,0~12 |
%j |
一年中的第几天,1~366 |
%k |
无填充的小时,0~23 |
%m |
月份,01~12 |
%n |
无填充的月份,1~12 |
%M |
分钟,00~59 |
%p |
am或者pm |
%s |
从1970-01-01 00:00:00到现在的秒数 |
%S |
秒,00~59 |
%y |
两位数的年,00~99 |
%Y |
四位数的年,例如2023 |
Host Interceptor
主机拦截器,本质上不是拦截数据,而是在数据的headers中添加一个host字段,可以用于标记数据来源(被收集)的主机。
Host Interceptor可以配置的选项有:
表-22 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是host |
preserveExisting |
optional |
如果host字段已经存在,是否替换。默认是false |
useIP |
optional |
是否使用IP。如果为true,则显示为IP;如果为false,则显示为主机名。默认为true |
hostHeader |
optional |
显示的字段,默认是host |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置Host Interceptor
a1.sources.s1.interceptors.i1.type = host
# 是否使用IP
a1.sources.s1.interceptors.i1.useIP = false
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Static Interceptor
静态拦截器,本质上不是拦截器,而是在数据的headers中添加一个指定的字段。
Static Interceptor可以配置的选项有:
表-23 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是static |
preserveExisting |
optional |
如果指定字段已经存在,是否替换。默认是false |
key |
optional |
指定的键。默认是key |
value |
optional |
指定的值,默认是value |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置Static Interceptor
a1.sources.s1.interceptors.i1.type = static
# 指定键
a1.sources.s1.interceptors.i1.key = kind
# 指定值
a1.sources.s1.interceptors.i1.value = test
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
UUID Interceptor
UUID拦截器,本质上也不是一个拦截器,而是在数据的headers中添加一个id字段,可以用于标记数据的唯一性。
UUID Interceptor可以配置的选项包含:
表-24 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName |
optional |
headers中添加的字段名,默认是id |
preserveExisting |
optional |
如果headers中已经存在id字段,是否替换。默认是true |
prefix |
optional |
在生成的id之前添加前缀 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置UUID Interceptor
a1.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Search And Replace Interceptor
搜索和替换拦截器,在使用的时候需要指定正则表达式,会根据正则表达式指定的规则,对Event中body部分的数据进行替换。注意,只替换body部分的数据,而不会影响headers中的数据。正则表达式的使用规则和Java中正则表达式的规则是一致的。
Search And Replace Interceptor中可以配置的选项包含:
表-25 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是search_replace |
searchPattern |
optional |
搜索的正则表达式形式 |
replaceString |
optional |
替换的形式 |
charset |
optional |
body部分的字符集编码,默认是UTF-8 |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置Search And Replace Interceptor
a1.sources.s1.interceptors.i1.type = search_replace
# 指定正则表达式
a1.sources.s1.interceptors.i1.searchPattern = [a-z]
# 指定替换后的形式
a1.sources.s1.interceptors.i1.replaceString = *
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Regex Filtering Interceptor
正则过滤拦截器,在使用的时候需要指定一个正则表达式,然后根据属性excludeEvents的值来确定过滤方式。如果excludeEvents的值为true,则表示过滤掉符合正则表达式形式的数据,其他数据留下来;如果excludeEvents的值为false,则表示过滤掉不符合正则表达式形式的数据,符合形式的数据留下来。默认情况下,excludeEvents的值为false。
Regex Filtering Interceptor中可以配置的选项包含:
表-26 配置选项
选项 |
备注 |
解释 |
type |
required |
拦截器类型,此处必须是regex_filter |
regex |
optional |
正则表达式 |
excludeEvents |
optional |
替换规则,默认为false |
案例:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置Regex Filtering Interceptor
a1.sources.s1.interceptors.i1.type = regex_filter
# 指定正则表达式
a1.sources.s1.interceptors.i1.regex = .*[0-9].*
# 指定替换规则
a1.sources.s1.interceptors.i1.excludeEvents = true
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Custom Interceptor
在实际开发过程中,如果Flume提供的拦截器无法满足业务需求,那么Flume同样支持自定义拦截器。但是不同于其他组件的地方在于,Flume中的拦截器在启动的时候,是通过内部类Builder来启动,所以拦截器中需要覆盖其内部类。
步骤:
1)定义类实现Interceptor接口,覆盖其中的intercept方法,其他方法可以忽略掉;同时需要定义内部类实现Interceptor.Builder接口,覆盖其中的build方法;如果需要获取配置,那么可以配置configure方法。
package com.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AuthInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 获取headers部分
Map<String, String> headers = event.getHeaders();
// 判断headers是否包含time或者timestamp字段
if (headers.containsKey("time") || headers.containsKey("timestamp")) return event;
// 如果没有,则添加当前的时间戳
headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
// 定义新的集合
List<Event> es = new ArrayList<>();
// 遍历
for (Event event : events) {
es.add(intercept(event));
}
return es;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
//用于构建并返回一个实现 Interceptor 接口的对象
@Override
public Interceptor build() {
return new AuthInterceptor();
}
//用于配置拦截器的属性和行为
@Override
public void configure(Context context) {
}
}
}
2)将定义好的Interceptor打成jar包。
3)将jar包上传到Flume安装目录的lib目录下:
# 进入Flume的lib目录
cd /opt/software/flume-1.11.0/lib/
# 选择jar包,上传
rz
4)回到数据目录,编辑文件:
# 回到数据目录
cd ../data
# 编辑文件
vim authin.properties
在文件中添加:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
# 配置Custom Interceptor
a1.sources.s1.interceptors.i1.type = com.flume.interceptor.AuthInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
5)启动Flume:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authin.properties -Dflume.root.logger=INFO,console
其他
事务
在Flume中,也存在事务的问题:
图-7 Flume事务
流程如下:
1)Source进行doPut将数据写到临时的缓冲区PutList中;
2)PutList会推送数据给Channel,如果Channel中有足够的位置,则数据推送成功(doCommit),如果Channel中没有位置,则推送失败,进行回滚(doRollback);
3)Channel进行doTake操作将数据写到临时缓冲区TakeList中;
4)将TakeList中的数据通过Sink批量写往目的地;
5)如果写成功了,则执行doCommit操作;如果写失败了,则执行doRollback操作。
执行流程
Flume执行流程如下图所示:
图-8 Flume执行流程
流程如下:
1)Source会先采集数据,然后将数据发送给ChannelProcessor(用于处理数据事件并将其传递到通道(Channel)中)进行处理;
2)ChannelProcessor收到数据处理之后,会将数据交给Interceptor来处理,注意,在Flume允许存在多个Interceptor来构成拦截器链;
3)Interceptor处理完成之后,会交给Channel Selector处理,Selector存在三种模式:replicating、multiplexing和load_balancing。Selector收到数据之后会根据对应的模式将数据交给对应的Channel来处理;
4)Channel处理之后会交给SinkProcessor。SinkProcessor本质上是一个Sinkgroup,包含了三种方式:default,failover和load_balance。SinkProcessor收到数据之后会根据对应的方式将数据交给Sink来处理;
5)Sink收到数据之后,会将数据写到指定的目的地。
更多推荐
所有评论(0)