利用Kafka作为中间件,通过spring boot项目开放一个接口传数据,通过生产者发送,消费者来接收数据后按照小时划分存为文件,后load到hive仓库

springboot开发项目 引入Kafka依赖

    <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.0</version>
            </dependency>
            <!--kafka的⽇志组件依赖包 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
            </dependency>

Java代码

创建生产者,发送接收到的数据

   @RequestMapping("/send")
    @ResponseBody
    public void sendata(String data) {
            Properties prop = new Properties();
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyKafkaConfig.BROKER_LIST);
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 2. 创建生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
            // 3. 发送数据
            producer.send(new ProducerRecord<>(
                    MyKafkaConfig.TOPIC,
                    data
            ));
        
            // 4. 销毁
            producer.close();
    }

shell向接口发送数据脚本

#! /bin/bash
IFS=$'\n\n'

#只取前5000行测试使用
for i in `tail -5000 data.txt`;
do
    echo $i  
  curl -X POST 'http://127.0.0.1:8077/send' -d "data=$i"

done
~          

启动消费者并把接收到的数据重定向到对应时间文件中

一开始直接把时间切分剩小时,结果报错 ambiguous redirect,又切分一下就好了。。。

原因百度应该是有什么特殊符号问题可能是命令行换行符,,但是我截取出来只有数字。。可能是shell哪里的问题吧。

后来查到也可以这样截取需要的内容:

  1. url="c.biancheng.net"
  2. echo ${url: 2: 9}
  3. 第一个是从第几位开始截取(从0 计数),第二位是截取长度
#! /bin/bash

sh /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server cluster0.hadoop:6667 \
--topic ZRF01 \
|while read line
do
times=`echo ${line} | awk -F'\001' '{print $9}' |awk '{print $2}'`
hour=`echo ${times} | awk -F: '{print $1}'`
echo ${line} >> ${hour}.txt
done
~                

把文件load到hive数据仓库,根据小时作为分区

#! /bin/bash
# 查找到目录下所有。txt文件,切分出小时然后循环put到hdfs后load到hive数据库中
filenames=`ls data/*.txt |awk -F. '{print $1}'|awk -F/ '{print $2}'`
for filename in $filenames
do
echo $filename
hdfs dfs -put ./data/${filename}.txt /user/zhaoruifeng/weibodata/
hive -e "
          use zhaoruifeng;
          load data inpath '/user/zhaoruifeng/weibodata/${filename}.txt' overwrite into table weibo_ahour partition(hour_seq='$filename');
    "
done     

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐