项目场景:

大数据省赛题实时数据采集

从端口号25001的socket数据生成器中采集至kafka的topic中(allstart.sh脚本开启下)


步骤实现:

   基于 Linux 命令

终端中在kafka的topic中创建一个新的topic 名为order

$ cd /opt/module/kafka/bin

$ kafka-topic.sh  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic order

进入Flume的配置文件,创建新的properties后缀的配置文件

$ cd /opt/module/flume/conf       进入flume配置文件

$ vim task.properties    创建一个名为task的配置文件

接着按i编写flume的配置:

  实现 从 端口25001的 数据生成器 采集数据

 a1.sources=r1

 a1.channels=c1

 a1.sinks=k1

 

# 配置数据源

 a1.sources.r1.type=netcat

 a1.sources.r1.port=25001

 a1.sources.r1.bind=localhost

 a1.sources.r1.channels=c1 

 a1.channels.c1.type=memory

 a1.channels.c1.capacity=1000

 a1.channels.c1.transactionCapacity=100 

 

 # 配置kafka

 a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

 a1.sinks.k1.kafka.topic=order

#注意,下面的地址换成你自己的

 a1.sinks.k1.kafka.bootstrap.servers=master:9092

 a1.sinks.k1.kafka.producer.acks=1

 a1.sinks.k1.kafka.producer.linger.ms=1

 a1.sinks.k1.kafka.flumeBatchSize=20

 a1.sinks.k1.channel=c1

在Flume的conf目录下接着输入启动flume的命令

 $ ./flume-ng agent  -c conf  -f task.properites -n a1 

授权数据生成器,启动数据生成器程序,将生成的订单数据发往Socket 25001端口

$ chmod 777 datamake.sh

$ ./datamake.sh

于kafka中查看是否传输到数据

$ cd /opt/module/kafka/bin

$ kafka-console-consumer.sh --bootstrap.server master:9092 --topic order


Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐