目录

1 分布式任务调度

1.1 分布式任务调度介绍

1.2 elastic-job讲解

1.2.1 静态任务案例

1.2.2 动态任务案例

2 索引+静态页动态更新

2.1 分析

2.2 更新操作

3 日志收集

3.1 业务分析

3.2 Kafka

3.2.1 Kafka搭建

3.2.2 队列创建

3.2.3 消息发布

3.2.4 消息订阅

3.2.5 信息查看

3.3 收集日志-Lua

3.3.1 OpenRestry安装

3.3.2 详情页发布

3.3.3 Lua日志收集

4 Apache Druid日志实时分析

4.1 业务分析

4.2 Apache Druid

4.2.1 Apache Druid介绍

4.2.2 Apache Druid安装

4.2.3 数据摄入

4.2.3.1 离线数据摄入

4.2.3.2 实时数据摄入

4.2.4 Druid SQL

4.2.4.1 简介

4.2.4.2 语法

4.2.5 JDBC查询Apache Druid


1 分布式任务调度

1.1 分布式任务调度介绍

​ 很多时候,我们需要定时执行一些程序完成一些预定要完成的操作,如果手动处理,一旦任务量过大,就非常麻烦,所以用定时任务去操作是个非常不错的选项。

​ 现在的应用多数是分布式或者微服务,所以我们需要的是分布式任务调度,那么现在分布式任务调度流行的主要有elastic-job、xxl-job、quartz等,我们这里做一个对比:

feature quartz elastic-job xxl-job antares opencron
依赖 mysql jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ mysql ,jdk1.7+ , maven3.0+ jdk 1.7+ , redis , zookeeper jdk1.7+ , Tomcat8.0+
HA 多节点部署,通过竞争数据库锁来保证只有一个节点执行任务 通过zookeeper的注册与发现,可以动态的添加服务器。 支持水平扩容 集群部署 集群部署
任务分片 支持 支持 支持
文档完善 完善 完善 完善 文档略少 文档略少
管理界面 支持 支持 支持 支持
难易程度 简单 简单 简单 一般 一般
公司 OpenSymphony 当当网 个人 个人 个人
高级功能 弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持 弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化 任务分片, 失效转移,弹性扩容 , 时间规则支持quartz和crontab ,kill任务, 现场执行,查询任务运行状态
使用企业 大众化产品,对分布式调度要求不高的公司大面积使用 36氪,当当网,国美,金柚网,联想,唯品会,亚信,平安,猪八戒 大众点评,运满满,优信二手车,拍拍贷

1.2 elastic-job讲解

中文官网:ElasticJob - Distributed scheduled job solution

1.2.1 静态任务案例

​ 使用elastic-job很容易,我们接下来学习下elastic-job的使用,这里的案例我们先实现静态任务案例,静态任务案例也就是执行时间事先写好。

实现步骤:


1.引入依赖包
2.配置zookeeper节点以及任务名称命名空间
3.实现自定义任务,需要实现SimpleJob接口

1)在seckill-goods中引入依赖


<!-- ElasticJobAutoConfiguration自动配置类作用-->
<dependency>
    <groupId>com.github.kuhn-he</groupId>
    <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
    <version>2.1.5</version>
</dependency>

2)配置elastic-job

bootstrap.yml中配置elastic-job,如下:


elaticjob:
  zookeeper:
    server-lists: zk-server:3181
    namespace: updatetask

讲解:


server-lists:zookeeper的地址
namespace:定时任务命名空间

这里我们只展示了部分常用的参数,还有很多参数,但不一定常用,大家可以参考下面地址学习:

3)任务创建

创建com.seckill.goods.task.statictask.ElasticjobTask,代码如下:


@ElasticSimpleJob(
        cron = "5/10 * * * * ?",
        jobName = "updateTask",
        shardingTotalCount = 1
)
@Component
public class ElasticjobTask implements SimpleJob {

    /***
     * 执行任务
     * @param shardingContext
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("-----------执行!");
    }
}

讲解:


cron:定时表达式
jobName:这里和bootstrap.yml中的namespace保持一致
shardingTotalCount:分片数量
1.2.2 动态任务案例

​ 参考地址:https://github.com/LuoLiangDSGA/spring-learning/tree/master/boot-elasticjob

​ 动态任务案例主要是讲解程序在运行时,动态添加定时任务,这种场景应用非常广泛。使用elastic-job实现动态添加定时任务的实现有点复杂,我们接下来实际操作一次。

步骤:


1.配置初始化的zookeeper地址
2.配置的定时任务命名空间(不一定会使用)
3.注册初始化数据
4.监听器->任务执行前后监听(可有可无)
5.动态添加定时任务实现
6.自定义任务处理过程-实现SimpleJob

1)**监听器创建**

​ 监听器采用AOP模式,类似前置通知和后置通知,doBeforeJobExecutedAtLastStarteddoAfterJobExecutedAtLastCompleted分别会在任务执行前和执行后调用,我们创建一个监听器实现任务调度前后拦截,com.seckill.goods.task.dynamic.ElasticJobListener:


public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {

    /****
     * 构造函数
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /***
     * 任务初始化前要做的事情,类似前置通知
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("========doBeforeJobExecutedAtLastStarted========"+ TimeUtil.date2FormatHHmmss(new Date()));
    }

    /***
     * 任务执行完成后要做的事情,类似后置通知
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("=======doAfterJobExecutedAtLastCompleted============="+ TimeUtil.date2FormatHHmmss(new Date()));
    }
}

2)**注册中心配置**

在bootstrap.yml中配置zk和namespace


#配置动态任务案例的zk和namespace
zk: zk-server:3181
namesp: autotask

创建配置类配置注册中心信息,com.seckill.goods.task.dynamic.ElasticJobConfig:


@Configuration
public class ElasticJobConfig {

    //配置文件中的zookeeper的ip和端口
    @Value(value = "${zk}")
    private String serverlists;
    //指定一个命名空间
    @Value("${namesp}")
    private String namespace;

    /***
     * 配置Zookeeper和namespace
     * @return
     */
    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(serverlists, namespace);
    }

    /***
     * 向zookeeper注册初始化信息
     * @param config
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }

    /****
     * 创建ElasticJob的监听器实例
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        //初始化要给定超时多少秒重连
        return new ElasticJobListener(100L,100L);
    }
}

3)**任务构建**

​ 我们创建一个动态配置任务的类,任何逻辑代码需要创建定时任务,可以直接调用该类的指定方法即可。创建类:com.seckill.goods.task.dynamic.ElasticJobHandler,代码如下:


@Component
public class ElasticJobHandler {

    @Resource
    private ZookeeperRegistryCenter registryCenter;

    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @param jobName:任务的命名空间
     * @param jobClass:执行的定时任务对象
     * @param shardingTotalCount:分片个数
     * @param cron:定时周期表达式
     * @param id:自定义参数
     * @return
     */
    private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
                                                                       Class<? extends SimpleJob> jobClass,
                                                                       int shardingTotalCount,
                                                                       String cron,
                                                                       String id) {
        //创建任务构建对象
        LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.
                        //任务命名空间名字、任务执行周期表达式、分片个数
                        newBuilder(jobName, cron, shardingTotalCount).
                        //自定义参数
                        jobParameter(id).
                        build(),
                jobClass.getCanonicalName()));
        //本地配置是否可覆盖注册中心配置
        builder.overwrite(true);
        return builder;
    }

    /**
     * 添加一个定时任务
     * @param cron:周期执行表达式
     * @param id:自定义参数
     * @param jobName:命名空间
     * @param instance:任务对象
     */
    public void addPublishJob(String cron,String id,String jobName,SimpleJob instance) {
        LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
                jobName,
                instance.getClass(),
                1,
                cron,
                id).overwrite(true).build();
        //DynamicTask为具体的任务执行逻辑类
        new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
    }

    /***
     * Date转cron表达式
     */
    public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";

    /**
     * 获得定时
     * @param date
     * @return
     */
    public static String getCron(final Date date) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
        return simpleDateFormat.format(date);
    }
}

4)**执行逻辑**

我们接着创建一个类,用于执行自己所需要操作的逻辑,com.seckill.goods.task.dynamic.DynamicTask,代码如下:


public class DynamicTask implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        //传递的参数
        String id = shardingContext.getJobParameter();
        try {
            //具体任务逻辑
            System.out.println("执行你的逻辑代码!param:"+id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5)**调用测试**

创建com.seckill.goods.controller.TaskController动态调用创建任务的方法,代码如下:


@RestController
@RequestMapping(value = "/task")
public class TaskController {

    @Autowired
    ElasticJobHandler elasticJobHandler;

    /***
     * 动态创建任务
     * @param times:延迟时间,为了测试到效果,所以在当前时间往后延迟
     * @param jobname:任务名字
     * @param param:自定义参数
     * @return
     */
    @GetMapping
    public Result add(Long times,String jobname,String param){
        //在当前指定时间内延迟times毫秒执行任务
        Date date = new Date(System.currentTimeMillis()+times);
        //需要传递给定时任务的参数
        String cron = ElasticJobHandler.getCron(date);

        //执行任务
        elasticJobHandler.addPublishJob(cron,param,jobname,new DynamicTask());
        return new Result(true, StatusCode.OK,"添加任务成功!");
    }
}

6)**测试**

访问:http://localhost:18081/task?times=15000&jobname=asyncname&param=No001

后台执行效果如下:

2 索引+静态页动态更新

2.1 分析

​ 索引和静态资源的更新功能已经完成,所有秒杀商品都只是参与一段时间活动,活动时间过了需要将秒杀商品从索引中移除,同时删除静态页。我们需要有这么一个功能,在秒杀商品活动结束的时候,将静态页删除、索引库数据删除。

​ 此时我们可以使用elastic-job定时执行该操作,我们看如下活动表,活动表中有一个活动开始时间和活动结束时间,我们可以在每次增加、修改的时候,动态创建一个定时任务,把活动结束时间作为任务执行时间。


CREATE TABLE `tb_activity` (
  `id` varchar(60) NOT NULL,
  `name` varchar(100) NOT NULL,
  `status` int(2) NOT NULL DEFAULT '1' COMMENT '状态:1开启,2未开启',
  `startdate` date DEFAULT NULL,
  `begintime` datetime DEFAULT NULL COMMENT '开始时间',
  `endtime` datetime DEFAULT NULL COMMENT '结束时间',
  `total_time` float DEFAULT NULL,
  `is_del` int(1) DEFAULT '1' COMMENT '删除:1未删除,2已删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

实现步骤:


1.编写动态定时任务
2.修改/增加秒杀活动,添加动态定时任务,执行时间为秒杀活动结束时间

2.2 更新操作

​ 我们在上面动态案例上进行修改,当添加和修改活动时,执行一个定时任务,定时任务以活动结束的时间为任务执行时间,将活动ID作为任务名字。

1)创建定时任务对象

com.seckill.goods.task.dynamic.DynamicAsyncTask代码如下:



public class DynamicAsyncTask implements SimpleJob {

    @Autowired
    private SkuActMapper skuActMapper;

    @Autowired
    private SkuMapper skuMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        //传递的活动ID
        String id = shardingContext.getJobParameter();
        try {
            mldify(id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*****
     * 修改商品状态
     * @param id
     */
    public void mldify(String id){
        //根据活动ID查询所有秒杀商品ID
        SkuAct skuAct = new SkuAct();
        skuAct.setActivityId(id);
        List<SkuAct> skuActs = skuActMapper.select(skuAct);

        //获取所有id
        List<String> ids = new ArrayList<String>();
        for (SkuAct act : skuActs) {
            ids.add(act.getSkuId());
        }
        //活动结束,秒杀商品变更成普通商品
        Sku sku = new Sku();
        sku.setStatus("1");

        Example example = new Example(Sku.class);
        Example.Criteria criteria = example.createCriteria();
        criteria.andIn("id",ids);
        criteria.andEqualTo("status","2");
        skuMapper.updateByExample(sku,example);
    }
}

2)调用操作

​ 在活动修改和增加的时候,添加定时任务,修改com.seckill.goods.service.impl.ActivityServiceImpl的add和update方法,代码如下:


@Autowired
private ElasticJobHandler elasticJobHandler;


/**
 * 增加Activity
 * @param activity
 */
@Override
public void add(Activity activity){
    //选中的id集合
    List<Integer> seckillIds = activity.getSeckillIds();

    //循环添加活动到数据库中
    for (Integer seckillId : seckillIds) {
        Activity oneActivity = new Activity();
        BeanUtils.copyProperties(activity,oneActivity);
        //查询当前活动对应的信息
        SeckillTime seckillTime = seckillTimeMapper.selectByPrimaryKey(seckillId);
        oneActivity.setId("A"+idWorker.nextId());
        oneActivity.setBegintime(seckillTime.getStarttime());
        oneActivity.setEndtime(seckillTime.getEndtime());
        float times = TimeUtil.dif2hour(oneActivity.getBegintime(), oneActivity.getEndtime());
        oneActivity.setTotalTime(times);
        //添加
        activityMapper.insertSelective(oneActivity);

        //添加定时任务
        elasticJobHandler.addPublishJob(ElasticJobHandler.getCron(oneActivity.getEndtime()),oneActivity.getId(),oneActivity.getId(), new DynamicAsyncTask());
    }
}

3 日志收集

​ 日志在我们项目中是非常重要的,日志的作用也有差异,例如根据日志查找问题、根据日志做数据分析。在我们秒杀系统中,活跃的热点商品其实并不多,我们往往需要对热点商品进行额外处理。用户每次抢购商品的时候,都是从详情页发起的,因此统计热度商品,详情页的访问频次可以算一个方向,详情页访问的频次我们可以记录访问日志,然后统计某一段时间的访问量,根据访问量评判商品是否是热点商品。

3.1 业务分析

​ 日志收集流程如上图,用户请求经过nginx,此时已经留下了用户对某个商品访问的足迹,我们可以在这里将用户访问的商品信息发送给我们kafka,采用大数据实时分析工具Apache Druid实时存储访问信息,再通过程序分析计算访问情况。

3.2 Kafka

​ 从上面流程图中,可以看到实现日志收集中间件是MQ,我们秒杀系统中会搭建MQ服务。

​ 目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ、ActiveMQ,我们这里对每款MQ做一个简单介绍。

Kafka


Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
	1.快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
	2.高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
	3.高堆积:支持topic下消费者较长时间离线,消息堆积量大;
	4.完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;
	5.支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
	6.高并发:支持数千个客户端同时读写

RocketMQ


RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ。RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 :
	1.能够保证严格的消息顺序
 	2.提供丰富的消息拉取模式
 	3.高效的订阅者水平扩展能力
 	4.实时的消息订阅机制
 	5.支持事务消息
 	6.亿级消息堆积能力

RabbitMQ

使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

kafka官网:Apache Kafka

3.2.1 Kafka搭建

​ 单机版的kafka搭建非常简单,不过我们今天采用Docker搭建kafka。Kafka使用Zookeeper存储Consumer、Broker信息,安装kafak的时候,需要先安装Zookeeper。

Zookeeper安装:

docker run -d --name zookeeper -p 3181:3181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

讲解:

/etc/localtime:/etc/localtime:使容器与宿主机时间能够同步

Kafka安装:

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.223:3181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.223:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

讲解:


KAFKA_BROKER_ID:当前Kafka的唯一ID
KAFKA_ZOOKEEPER_CONNECT:当前Kafka使用的Zookeeper配置信息
KAFKA_ADVERTISED_LISTENERS:对外发布(暴露)的监听器,对外发布监听端口、地址
KAFKA_LISTENERS:监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。

IP更改:

外部程序如果想链接Kafka,需要根据IP链接,所以我们可以给Kafka一个IP名字,编辑:/opt/kafka_2.12-2.4.1/config/server.properties,在文件最末尾添加如下代码:

host.name=192.168.211.137
3.2.2 队列创建

进入kafka容器,创建队列:


docker exec -it kafka /bin/sh

cd /opt/kafka_2.12-2.4.1/bin

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic itemaccess

讲解:


解释:使用kafka-topics.sh创建队列
        --create:执行创建一个新的队列操作
        --bootstrap-server:需要链接的kafka配置,必填
        --replication-factor 1:设置分区的副本数量
        --topic itemaccess:队列的名字叫itemaccess
3.2.3 消息发布

在kafka容器中执行消息发送(接着上面的步骤执行):

./kafka-console-producer.sh --broker-list localhost:9092 --topic itemaccess

讲解:


解释:使用kafka-console-producer.sh实现向kafka的test队列发送消息
        --broker-list:指定将消息发给指定的Kafka服务的链接列表配置  HOST1:Port1,HOST2:Port2
        --topic itemaccess:指定要发送消息的队列名字

我们发送的消息如下(输入信息,回车即可发送):

{"actime":"2020-4-10 9:50:10","uri":"http://www-seckill.test.net/items/333.html","IP":"119.123.33.231","Token":"Bearer itcast"}
3.2.4 消息订阅

在kafka容器中执行消息订阅(接着上面的步骤执行,但要先按ctrl+c退出控制台):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic itemaccess --from-beginning

讲解:


解释:使用kafka-console-consumer.sh从kafka中消费test队列的数据
        --bootstrap-server:从指定的kafka中读取消息
        --topic itemaccess:读取队列的名字
        --from-beginning:从最开始的数据读取,也就是读取所有数据的意思

查看已经存在的主题:

./kafka-topics.sh --zookeeper localhost:3181 --list

删除主题:

./kafka-topics.sh --zookeeper localhost:3181 --delete --topic itemaccess

查看主题信息:

/kafka-topics.sh --zookeeper localhost:3181 --describe --topic itemaccess
3.2.5 信息查看

​ 上面执行整个流程如下图:

Kafka注册信息查看:

​ 我们进入到zookeeper中,可以查看到kafka的注册信息,相关操作命令如下:


docker exec -it zookeeper /bin/bash

cd bin

./zkCli.sh

ls /

效果如下:

关于Kafka的学习,大家可以直接参考:Apache Kafka

3.3 收集日志-Lua

​ Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

​ OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。

​ OpenResty® 通过汇聚各种设计精良的 Nginx 模块(主要由 OpenResty 团队自主开发),从而将 Nginx 有效地变成一个强大的通用 Web 应用平台。这样,Web 开发人员和系统工程师可以使用 Lua 脚本语言调动 Nginx 支持的各种 C 以及 Lua 模块,快速构造出足以胜任 10K 乃至 1000K 以上单机并发连接的高性能 Web 应用系统。

​ 关于Lua的基本知识,我们这里就不学习了,直接进入日志收集的使用操作。

3.3.1 OpenRestry安装

关于OpenRestry的学习,大家可以参考:OpenResty® - 开源官方站

下载OpenRestry:

wget https://openresty.org/download/openresty-1.11.2.5.tar.gz

解压:

tar -xf openresty-1.11.2.5.tar.gz

安装(进入到解压目录进行安装):


cd openresty-1.11.2.5

./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module

make

make install

软件会安装到/usr/local/openresty,这里面会包含nginx。

配置环境变量:


vi /etc/profile

export PATH=/usr/local/openresty/nginx/sbin:$PATH

source /etc/profile
3.3.2 详情页发布

​ 商品详情页生成后会存储在/usr/local/server/web/items目录下,详情页是静态网页,我们可以使用Nginx直接发布。

​ 商品详情页的访问:http://192.168.211.137/items/S1235433012716498944.html,我们可以让所有以`/items/`的请求直接到`/usr/local/server/web/`目录下找。

修改nginx.conf:


cd /usr/local/openresty/nginx/conf/

vi nginx.conf

修改内容如下:

启动nginx,并访问测试:http://192.168.211.137/items/S1235433012716498944.html

3.3.3 Lua日志收集

​ 使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka

1)收集流程

​ 日志收集流程如下:

​ 用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。

2)插件配置

lua-restry-kafka:https://github.com/doujiang24/lua-resty-kafka

资料\lua中已经提供了该包lua-resty-kafka-master.zip,我们需要将该文件上传到/usr/local/openrestry目录下,并解压,再配置使用。

解压:

unzip lua-resty-kafka-master.zip

配置:

修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:

lua_package_path "/usr/local/openresty/lua-resty-kafka-master/lib/?.lua;;";

配置效果图如下:

3)日志收集

​ 用户访问详情页的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。

我们定义一个消息格式:


{
  "actime": "2020-4-10 9:50:30",
  "uri": "http://192.168.211.137/items/S1235433012716498944.html",
  "ip": "119.123.33.231",
  "token": "Bearer TESTOOPJAVAITCAST"
}

生产者脚本:

定义好了消息格式后,创建一个生产者,往Kafka中发送详情页的访问信息。我们创建一个lua脚本,items-access.lua,脚本内容如下:

上图脚本内容如下:


--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
      { host = "192.168.211.137", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})

--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"

--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer TEST"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")

--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))

--页面跳转
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
ngx.exec(uri)

4)nginx配置

​ 按照上面的流程图,我们需要配置nginx的2个location,修改nginx.conf,代码如下:

上图代码如下:


server {
    listen       80;
    server_name  localhost;

    #/web开始的请求,做日志记录,然后跳转到下面的location
    location /web/items/ {
    content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
    }


    #商品详情页,以/items/开始的请求,直接在详情页目录下找文件
    location /items/ {
    #日志处理
    #content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
    root /usr/local/server/web/;
    }
}

5)日志收集测试

请求地址:http://192.168.211.137/web/items/S1235433012716498944.html

查看Kafka的itemaccess队列数据:

4 Apache Druid日志实时分析

4.1 业务分析

​ 秒杀业务中,通常会有很多用户同时蜂拥而上去抢购热卖商品,经常会出现抢购人数远大于商品库存。其实在秒杀过程中,热卖商品并不多,几乎只占1%,而99%的流量都源自热卖商品,很有可能因为这1%的热卖商品导致服务器宕机,因此针对热卖商品我们要做特殊处理。

​ 热卖商品我们这里称为热点商品,针对热点商品的处理,有这么几种思路,一是优化,二是限制,三是隔离。

​ 优化:优化热点数据最有效的办法就是缓存热点数据。

​ 限制:限制其实是一种削峰手段,我们可以把热点商品抢单采用队列来存储用户抢单信息,将热点抢单限制在一个队列里,防止热点商品抢单占用太多的资源服务,而使得其他服务无法获取抢单机会。

​ 隔离:隔离其实就是将热点商品和非热点商品进行数据源的隔离、操作流程的隔离,不要因为1%的热点数据影响到另外的99%数据。我们可以把热点商品数据存储到缓存中和非热点数据分开,抢单程序也可以和非热点抢单分开。

​ 热点数据又分为离线热点数据和实时热点数据,离线热点数据主要是分析过往热点商品信息,这个统计起来并无难度,可以直接从历史数据库中查询分析。但根据用户抢单实时数据进行分析是一个很困难的事,首先要存储大量的访问信息,同时还能高效的实时统计访问日志信息,从中获取热点商品信息。

4.2 Apache Druid

4.2.1 Apache Druid介绍

介绍

​ Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。

OLTP与OLAP的区别:

OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理。

OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的分析查询结果。

OLAP和OLTP区别:

OLTP OLAP
用户 面向操作人员,支持日常操作 面向决策人员,支持管理需要
功能 日常操作处理 分析决策
DB 设计 面向应用,事务驱动 面向主题,分析驱动
数据 当前的,最新的细节的 历史的,聚集的,多维的,集成的,统一的
存取 可更新,读/写数十条记录 不可更新,但周期性刷新,读上百万条记录
工作单位 简单的事务 复杂的查询
DB 大小 100MB-GB 100GB-TB

OLTP就是面向我们的应用系统数据库的,OLAP是面向数据仓库的。

Apache Druid 特性: 


亚秒响应的交互式查询,支持较高并发。
支持实时导入,导入即可被查询,支持高并发导入。
采用分布式 shared-nothing 的架构,可以扩展到PB级。
支持聚合函数,count 和 sum,以及使用 javascript 实现自定义 UDF。
支持复杂的 Aggregator,近似查询的 Aggregator 例如 HyperLoglog 以及 Yahoo 开源的 DataSketches。
支持Groupby,Select,Search查询。

开源OLAP数据处理系统性能方面我们做个对比:

Apache Druid 架构设计

Druid自身包含下面4类节点:


1.Realtime Node:即时摄入实时数据,生成Segment(LSM-Tree实现与Hbase基本一致)文件。
2.Historical Node:加载已生成好的数据文件,以供数据查询。
3.Broker Node:对外提供数据查询服务,并同时从Realtime Node和Historical Node查询数据,合并后返回给调用方。
4.Coordinator Node:负责Historical Node的数据负载均衡,以及通过Rule管理数据生命周期。

同时,Druid集群还包含以下3类外部依赖:


1.元数据库(Metastore):存储druid集群的元数据信息,如Segment的相关信息,一般使用MySQL或PostgreSQL
2.分布式协调服务(Coordination):为Druid集群提供一致性服务,通常为zookeeper
3.数据文件存储(DeepStorage):存储生成的Segment文件,供Historical Node下载,一般为使用HDFS

数据摄入

​ Apache Druid同时支持流式和批量数据摄入。通常通过像 Kafka 这样的消息总线(加载流式数据)或通过像 HDFS 这样的分布式文件系统(加载批量数据)来连接原始数据源。

4.2.2 Apache Druid安装

Apache Druid的安装方面,我们可以参考官方文档实现。

JDK:java8(8u92+)

下载地址:https://druid.apache.org/downloads.html

解压该压缩包:


tar -xf apache-druid-0.17.0-bin.tar.gz
cd apache-druid-0.17.0

包文件如下:

启动单机版Apache Druid:

./bin/start-micro-quickstart

启动后,访问:http://192.168.211.137:8888

4.2.3 数据摄入
4.2.3.1 离线数据摄入

从一个文件中将数据加载到Apache Druid,参考地址:W<https://druid.apache.org/docs/latest/tutorials/tutorial-batch.html>,如下操作:

1)点击Load data->Local disk->Connect data

2)选择要导入的数据

我们要导入的数据在/usr/local/server/apache-druid-0.17.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz,需要把该文件的相对路径填写到右边表单中,再点击Apply,如下图:

3)解析数据

在上一个步骤上点击Next:Parse data,此时会解析导入的数据,如下图:

4)解析时间

在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动帮助我们创建该列,如下图:

5)数据分区设置

点击下一步一直到Partition,我们根据需要设置数据分区方式,如下图:

讲解:


Type:数据粒度使用的类型
Segment granularity:分片文件每个segment包含的时间戳范围
Force guaranteed rollup:是否启用批量推送模式
Partitioning type:分区类型
Max rows per segment:用于分片。确定每个段中的行数。

更多参数如下图:

6)设置数据源

Publish设置,注意设置数据源名字,这里类似数据库中数据库名字。

7)提交配置

最后一步需要提交配置,如下图,点击submit即可。

4.2.3.2 实时数据摄入

​ 前面的案例是离线数据的摄入,接着我们实现实时数据摄入,我们以收集用户访问商品详情页的访问记录为例,如下图:

参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html

1)load data

2)配置Kafka源

3)配置数据源名字

其他的步骤和之前文件摄入一样,直到配置数据源名字,我们配置数据源名字叫itemlogs,最后一步submit和之前一样,如下图:

查询效果如下:

4.2.4 Druid SQL
4.2.4.1 简介

​ Apache Druid SQL是一个内置的SQL层,是Druid基于JSON的查询语言的替代品,由基于Apache Calcite的解析器和规划器提供支持。Druid SQL将SQL转换为Broker本机Druid查询,然后将其传递给数据进程。除了在Broker上转换SQL的(轻微)开销之外,与本机查询相比,没有额外的性能损失。

4.2.4.2 语法

每个Druid数据源都显示为“Druid”模式,这也是默认模式,Druid数据源引用为druid.dataSourceName或者简单引用dataSourceName

可以选择使用双引号引用数据源和列名等标识符。要在标识符中转义双引号,请使用另一个双引号,例如"My ""cat"" identifier",所有标识符都区分大小写。

文字字符串应引用单引号,如'foo',文字数字可以用100(表示整数),100.0(表示浮点值)或1.0e5(科学记数法)等形式编写。时间戳可以写成TIMESTAMP '2000-01-01 00:00:00'。时间算法,可以这样写INTERVAL '1' HOURINTERVAL '1 02:03' DAY TO MINUTEINTERVAL '1-2' YEAR TO MONTH,等等。

Druid SQL支持具有以下结构的SELECT查询:


[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]

查询所有:

SELECT * FROM "itemlogs"

查询count列:

SELECT "count" FROM "itemlogs"

查询前5条:

SELECT * FROM "itemlogs" LIMIT 5

分组查询:

SELECT ip FROM "itemlogs" GROUP BY ip

排序:

SELECT * FROM "itemlogs" ORDER BY __time DESC

求和:

SELECT SUM("count") FROM "itemlogs"

最大值:

SELECT MAX("count") FROM "itemlogs"

平均值:

SELECT AVG("count") FROM "itemlogs"

查询6年前的数据:

SELECT * FROM "wikiticker" WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '6' YEAR

去除重复查询:

SELECT DISTINCT "count" FROM "accessitem"
4.2.5 JDBC查询Apache Druid

​ Apache Calcite是面向Hadoop新的查询引擎,它提供了标准的SQL语言、多种查询优化和连接各种数据源的能力,除此之外,Calcite还提供了OLAP和流处理的查询引擎。

​ 如果使用java,可以使用Calcite JDBC驱动程序进行Druid SQL查询。可以下载Avatica客户端jar后,将其添加到类路径并使用连接字符串jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/

​ 如果是Maven项目,需要引入avatica-core包,如下:


<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.15.0</version>
</dependency>

使用案例:


public static void main(String[] args) throws Exception{
    //链接地址
    String url = "jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/";
    AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);

    //SQL语句,查询2020-4-10 11:50:30之后的访问uri和访问数量
    String sql="SELECT uri,count(*) AS \"viewcount\" FROM(SELECT * FROM \"itemlogs\" WHERE __time>'2020-4-10 11:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";

    //创建Statment
    AvaticaStatement statement = connection.createStatement();

    //执行查询
    ResultSet resultSet = statement.executeQuery(sql);

    while (resultSet.next()) {
        //获取uri
        String uri = resultSet.getString("uri");
        String viewcount = resultSet.getString("viewcount");
        System.out.println(uri+"--------->"+viewcount);
    }
}

知识点:

Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:

sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐