提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

跟随公司项目学习已经有四五个月了,想在这里记录一下我在跟随项目学习的过程中学到的一些关于Influxdb2的心得。我没有系统的学习过Influxdb2的知识,所以我的心得不一定百分百正确,只能说是我的个人经验,记录下来供大家参考,如有错误请指出,谢谢。


一、项目使用Maven管理插件包

我们项目使用的是influxdb-client-java7.1.0

	<dependency>
      <groupId>com.influxdb</groupId>
      <artifactId>influxdb-client-java</artifactId>
      <version>7.1.0</version>
    </dependency>

二、使用步骤

1.初始化

代码如下(示例):

	// 写入配置
	@Value("${spring.influxdb.url}")
    private String url;

    @Value("${spring.influxdb.token}")
    private String token;

    @Value("${spring.influxdb.org}")
    private String organization;

    @Value("${spring.influxdb.bucket}")
    private String bucket;
    
    private InfluxDBClient influxDBClient;
    
    @Bean
    public void init() {
    	// 如果查询的数据量大或者查询语句消耗大则有可能会导致 Influxdb 数据库的查询时间过长
        OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder()
                .protocols(Collections.singletonList(Protocol.HTTP_1_1))
                .connectTimeout(30, TimeUnit.SECONDS) // 连接超时时间
                .readTimeout(30, TimeUnit.SECONDS) // 读取超时时间
                .writeTimeout(30, TimeUnit.SECONDS); // 写入超时时间
        
        // 创建 InfluxDBClientOptions
        InfluxDBClientOptions options = InfluxDBClientOptions.builder()
                .url(url) // 设置 InfluxDB 的 URL
                .authenticateToken(token.toCharArray()) // 使用 Token 进行身份验证
                .org(organization) // 设置组织
                .bucket(bucket) // 设置存储桶
                .okHttpClient(okHttpClient) // 设置 OkHttpClient
                .build();
                
 		// 创建 InfluxDBClient 实例
        this.influxDBClient = InfluxDBClientFactory.create(options);
    }

2.写入数据

代码如下(示例):

	// 通过 Map 以 Point 形式写入数据点
	public void writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timeInstant) {
        Point point = Point.measurement(measurement).time(timeInstant, WritePrecision.NS);
        point.addTags(tags);
        point.addFields(fields);
        influxDBClient.makeWriteApi().writePoint(bucket, organization, point);
    }

	// 通过 实体类 以 Measurement 形式写入数据表
    public void writeMeasurement(Object object) {
        influxDBClient.makeWriteApi().writeMeasurement(bucket, organization, WritePrecision.NS, object);
    }

	// 通过 实体类 以 Measurement 形式写入数据表
    public void writeMeasurements(List<?> objects) {
        influxDBClient.makeWriteApi().writeMeasurements(bucket, organization, WritePrecision.NS, objects);
    }

上述写入数据表的实体类必须继承于下面的基类,其中的time是必填项,result是查询influxdb之后出来的一个表名,与 yield 这个相关,比如查询语句中如果存在 |> yield(name:“MqBase”) 这个语句,则查询出来的数据中result就是"MqBase"这个字符串。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqBase {
    /**
     * 查询结果 返回表名
     */
    private String result;

    /**
     * 索引
     */
    @Column(tag = true)
    private String myTag;
    
    /**
     * 时间
     */
    @Column(timestamp = true)
    private Instant time;
}

3.删除数据

代码如下(示例):

	// 通过 start stop measurement tag 删除对应时间的表索引数据
	public void deleteMeasurement(LocalDateTime start, LocalDateTime stop, String measurement, String tag) {
        OffsetDateTime startTime = ZonedDateTime.of(start, ZoneId.systemDefault()).toOffsetDateTime();
        OffsetDateTime endTime = ZonedDateTime.of(stop, ZoneId.systemDefault()).toOffsetDateTime();
        String predicate = "_measurement = \"" + measurement + "\"" ;
        predicate += " and myTag = \"" + tag + "\"";
        influxDBClient.getDeleteApi().delete(startTime, endTime, predicate, bucket, organization);
    }

4.查询数据

代码如下(示例):

	/**
     * influxDB数据库查询主要语句
     * 可以使用|> range(start: 2025-01-01T00:00:00Z, stop: 2025-01-08T00:00:00Z)
     * 也可以使用|> range(start: 1735660800, stop: 1736265600) 
     * @param measurement 查询表名
     * @param start 查询开始时间
     * @param stop 查询结束时间
     * @param filter 查询条件 tag
     *
     * @return influxQuery influxDB数据库查询语句
     **/
    private String getInfluxQueryString(String measurement, String start, String stop, Map<String, String> filter) {
        String influxQuery = "from(bucket: \"" + bucket + "\")";
        influxQuery += " |> range(start: " + start;
        if (StringUtils.isNotEmpty(stop)) {
            influxQuery += ", stop: " + stop;
        }
        influxQuery += ") |> filter(fn: (r) => r._measurement == \"" + measurement + "\"";
        for (Map.Entry<String, String> entry : filter.entrySet()) {
            influxQuery += " and r." + entry.getKey() + " == \"" + entry.getValue() + "\"";
        }
        influxQuery += ")";
        return influxQuery;
    }
    
    // 查询返回 List<FluxTable> extra 可写 last() first() limit(n: 1) 等等
	public List<FluxTable> doQuery(String measurement, String start, String stop, Map<String, String> filter, String extra) {
        String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
        if (StringUtils.isNotEmpty(extra)) {
            influxQuery += " |> " + extra;
        }
        log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
        return influxDBClient.getQueryApi().query(influxQuery, organization);
    }
    
    // 查询返回 List<T> 入参增加 Class<T> clazz
	public <T> List<T> doQuery(String measurement, String start, String stop, Map<String, String> filter, String extra, Class<T> clazz) {
        String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
        // 转换实体类 Class 需要以下语句 pivot (行转列, 将多个 field 合成一个 Record)
		influxQuery += " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";
        if (StringUtils.isNotEmpty(extra)) {
            influxQuery += " |> " + extra;
        }
        log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
        return influxDBClient.getQueryApi().query(influxQuery, organization, clazz);
    }

	// 查询返回 csv 格式的 String 方便直接导出 csv,当然数据量大的话导出不建议使用这个,需要的时间太久了,应该使用单独的java服务在数据库服务器中直接运行命令行导出
    public String doQueryRaw(String measurement, String start, String stop, Map<String, String> filter, String extra) {
        String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
        if (StringUtils.isNotEmpty(extra)) {
            influxQuery += " |> " + extra;
        }
        log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
        return influxDBClient.getQueryApi().queryRaw(influxQuery, organization);
    }

5.导出数据

influxdb2 命令行导出命令

D:/influxdb2/influx.exe query 'from(bucket: "bucket") |> range(start: 1735660800, stop: 1736265600) |> filter(fn: (r) => r.myTag== "tag") |> drop(columns: ["_start", "_stop"])' --raw > D:/influxdb2/test.csv  

当然如果你想使用命令行导出数据需要先对 influxdb2 的 influx.exe 进行配置。


总结

后续有时间的时候在进行补充吧,今天就先分享这么多,感谢各位大佬的关注与阅读!

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐