从Canal读取数据

  • 添加依赖
<dependencies>
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <!--canal 客户端, 从 canal 服务器读取数据-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <!-- kafka 客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>gmall-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
  • 从 Canal读取数据
    在这里插入图片描述
    参数分析
    1.val address = new InetSocketAddress(“hadoop12”,11111)
    port:11111在哪里查看?
    /export/servers/canal/conf/canal.properties下
    在这里插入图片描述
    2.val connector = CanalConnectors.newSingleConnector(address, “example”, “”, “”)
    “example”, “”, ""是什么?
    example是当时安装时候配置了里面的文件
    在这里插入图片描述

“”, ""是用户名和密码,因为安装的时候没有配,默认为空!

import java.net.InetSocketAddress
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, EventType, RowChange}

import scala.collection.JavaConversions._

object CanalClient {
  def main(args: Array[String]): Unit = {
    //1.连接到canal
    val address = new InetSocketAddress("hadoop12",11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()  //连接
    //1.1 订阅数据  gmall1602.*表示gmall1602数据下所有的表
    connector.subscribe("gmall1602.*")
    //2.读数据,解析数据
    while (true) { //2.1使用循环的方式持续的从canal服务中读取数据
      val msg = connector.get(100) //2.2表示一次从canal拉去最多100条sql数据引起的变化
      //2.3一个entry封装一条sql的变化结果   . 做非空的判断
      val entriesOption = if (msg != null) Some(msg.getEntries) else None
      if (entriesOption.isDefined && entriesOption.get.nonEmpty) {
        val entries = entriesOption.get
        for (entry <- entries){
          // entryType应该是RowData类型
          if (entry != null && entry.hasEntryType && entry.getEntryType == EntryType.ROWDATA){
            //2.4 从每个entry中获取一个StoreValue
            val storeValue = entry.getStoreValue
            //2.5 把storeValue解析出来rowChange
            val rowChange = RowChange.parseFrom(storeValue)
            //2.6 一个storeValue中有多个RowData,每个RowData表示一行数据的变化
            val rowDatas = rowChange.getRowDatasList
            //2.7 解析rowDatas中的每行的每列的数据
            handleData(entry.getHeader.getTableName,rowDatas,rowChange.getEventType)

          }

        }
      }else{
        println("没有拉取到数据,2s之后重新拉取")
        Thread.sleep(2000)
      }
    }

    //3.把数据转成json字符串写入到kafka中  {列名:列值,列名:列值}
  }
  //处理RowData数据
  def handleData(tableName: String,
                 rowDatas: util.List[CanalEntry.RowData],
                 eventType: CanalEntry.EventType) = {
    if ("order_info" == tableName && eventType == EventType.INSERT && rowDatas != null && rowDatas.nonEmpty){
      for (rowData <- rowDatas){
        val result = new JSONObject()
        //1. 一行所有的变化后的列
        val columnsList = rowData.getAfterColumnsList
        //2.一行数据将来在kafka中,应该放一样,多列中封装到一个json字符串中
        for (column <- columnsList){
          val key = column.getName //列名
          val value = column.getValue  //列值
          result.put(key,value)
        }
        println(result.toJSONString)

      }
    }
  }
}

准备数据库数据

  • 步骤 1: 创建数据库(两种方法)
create database gmall1602 DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

在这里插入图片描述

  • 步骤 2: 执行脚本, 建表和插入数据
    脚本文件
链接:https://pan.baidu.com/s/1iTbUaFek7A8z5PD4NXYbMQ 
提取码:8jow

方法一:
1.右键数据库
在这里插入图片描述
2.在这里插入图片描述
3.成功了!在这里插入图片描述
方法二:
1.gmall1602.sql文件在当前目录下

mysql -uroot -p -Dgmall1602 < gmall1602.sql

在这里插入图片描述
2.登入mysql中输入

source /export/data/gmall1602.sql
  • 步骤 3: 插入数据
# 日期  订单个数 用户数 是否删除以前的数据
call init_data("2020-09-23", 10,2,false)

代码实现

在idea中 运行CanalClient(记得启动canal)
在这里插入图片描述

读取数据发送到Kafka

  • 实现 Kafka 生产者
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}


object MyKafkaUtil {
  val props = new Properties()
  props.put("bootstrap.servers","hadoop12:9092,hadoop13:9092,hadoop14:9092")
  // key序列化
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // value序列化
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  def send(topic: String,content:String)={
    producer.send(new ProducerRecord[String,String](topic,content))

  }


}
  • 在gmall-common下增添下Constant增添一条数据
public final static String TOPIC_ORDER_INFO="topic_order_info";
  • 使用生产者向 Kafka 生成数据
    考虑到将来存储到 ES 中的数据是 Json 格式, 所以, 我们在 Kafka 存储的的时候也存储为 Json 格式的.
import java.net.InetSocketAddress
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, EventType, RowChange}

import scala.collection.JavaConversions._

object CanalClient {
  def main(args: Array[String]): Unit = {
    //1.连接到canal
    val address = new InetSocketAddress("hadoop102",11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()  //连接
    //1.1 订阅数据  gmall1602.*表示gmall1602数据下所有的表
    connector.subscribe("gmall1602.*")
    //2.读数据,解析数据
    while (true) { //2.1使用循环的方式持续的从canal服务中读取数据
      val msg = connector.get(100) //2.2表示一次从canal拉去最多100条sql数据引起的变化
      //2.3一个entry封装一条sql的变化结果   . 做非空的判断
      val entriesOption = if (msg != null) Some(msg.getEntries) else None
      if (entriesOption.isDefined && entriesOption.get.nonEmpty) {
        val entries = entriesOption.get
        for (entry <- entries){
          // entryType应该是RowData类型
          if (entry != null && entry.hasEntryType && entry.getEntryType == EntryType.ROWDATA){
            //2.4 从每个entry中获取一个StoreValue
            val storeValue = entry.getStoreValue
            //2.5 把storeValue解析出来rowChange
            val rowChange = RowChange.parseFrom(storeValue)
            //2.6 一个storeValue中有多个RowData,每个RowData表示一行数据的变化
            val rowDatas = rowChange.getRowDatasList
            //2.7 解析rowDatas中的每行的每列的数据
            handleData(entry.getHeader.getTableName,rowDatas,rowChange.getEventType)
          }
        }
      }else{
        println("没有拉取到数据,2s之后重新拉取")
        Thread.sleep(2000)
      }
    }

  }
  //处理RowData数据
  def handleData(tableName: String,
                 rowDatas: util.List[CanalEntry.RowData],
                 eventType: CanalEntry.EventType) = {
    if ("order_info" == tableName && eventType == EventType.INSERT && rowDatas != null && rowDatas.nonEmpty){
      for (rowData <- rowDatas){
        val result = new JSONObject()
        //1. 一行所有的变化后的列
        val columnsList = rowData.getAfterColumnsList
        //2.一行数据将来在kafka中,应该放一样,多列中封装到一个json字符串中
        for (column <- columnsList){
          val key = column.getName //列名
          val value = column.getValue  //列值
          result.put(key,value)
        }
        //println(result.toJSONString)
       //3把数据写入到kafka
       MyKafkaUtil.send(Constant.TOPIC_ORDER_INFO,result.toJSONString)
      }
    }

  }

}

  • 查看结果
    1.运行idea中的CanalClient
    2.在kafka是否接收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop12:9092 --topic topic_order_info

3.模拟数据
在这里插入图片描述

4.查看结果
在这里插入图片描述

如果canal拉取不了数据,请看点击这里

在canal中conf/example下的instance.properties看看mysql下的账号密码是否正确
在这里插入图片描述

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐