influxdb-client-java使用心得(一)
java的influxdb2使用基础教程
·
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前言
跟随公司项目学习已经有四五个月了,想在这里记录一下我在跟随项目学习的过程中学到的一些关于Influxdb2的心得。我没有系统的学习过Influxdb2的知识,所以我的心得不一定百分百正确,只能说是我的个人经验,记录下来供大家参考,如有错误请指出,谢谢。
一、项目使用Maven管理插件包
我们项目使用的是influxdb-client-java7.1.0
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>7.1.0</version>
</dependency>
二、使用步骤
1.初始化
代码如下(示例):
// 写入配置
@Value("${spring.influxdb.url}")
private String url;
@Value("${spring.influxdb.token}")
private String token;
@Value("${spring.influxdb.org}")
private String organization;
@Value("${spring.influxdb.bucket}")
private String bucket;
private InfluxDBClient influxDBClient;
@Bean
public void init() {
// 如果查询的数据量大或者查询语句消耗大则有可能会导致 Influxdb 数据库的查询时间过长
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder()
.protocols(Collections.singletonList(Protocol.HTTP_1_1))
.connectTimeout(30, TimeUnit.SECONDS) // 连接超时时间
.readTimeout(30, TimeUnit.SECONDS) // 读取超时时间
.writeTimeout(30, TimeUnit.SECONDS); // 写入超时时间
// 创建 InfluxDBClientOptions
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(url) // 设置 InfluxDB 的 URL
.authenticateToken(token.toCharArray()) // 使用 Token 进行身份验证
.org(organization) // 设置组织
.bucket(bucket) // 设置存储桶
.okHttpClient(okHttpClient) // 设置 OkHttpClient
.build();
// 创建 InfluxDBClient 实例
this.influxDBClient = InfluxDBClientFactory.create(options);
}
2.写入数据
代码如下(示例):
// 通过 Map 以 Point 形式写入数据点
public void writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timeInstant) {
Point point = Point.measurement(measurement).time(timeInstant, WritePrecision.NS);
point.addTags(tags);
point.addFields(fields);
influxDBClient.makeWriteApi().writePoint(bucket, organization, point);
}
// 通过 实体类 以 Measurement 形式写入数据表
public void writeMeasurement(Object object) {
influxDBClient.makeWriteApi().writeMeasurement(bucket, organization, WritePrecision.NS, object);
}
// 通过 实体类 以 Measurement 形式写入数据表
public void writeMeasurements(List<?> objects) {
influxDBClient.makeWriteApi().writeMeasurements(bucket, organization, WritePrecision.NS, objects);
}
上述写入数据表的实体类必须继承于下面的基类,其中的time是必填项,result是查询influxdb之后出来的一个表名,与 yield 这个相关,比如查询语句中如果存在 |> yield(name:“MqBase”) 这个语句,则查询出来的数据中result就是"MqBase"这个字符串。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqBase {
/**
* 查询结果 返回表名
*/
private String result;
/**
* 索引
*/
@Column(tag = true)
private String myTag;
/**
* 时间
*/
@Column(timestamp = true)
private Instant time;
}
3.删除数据
代码如下(示例):
// 通过 start stop measurement tag 删除对应时间的表索引数据
public void deleteMeasurement(LocalDateTime start, LocalDateTime stop, String measurement, String tag) {
OffsetDateTime startTime = ZonedDateTime.of(start, ZoneId.systemDefault()).toOffsetDateTime();
OffsetDateTime endTime = ZonedDateTime.of(stop, ZoneId.systemDefault()).toOffsetDateTime();
String predicate = "_measurement = \"" + measurement + "\"" ;
predicate += " and myTag = \"" + tag + "\"";
influxDBClient.getDeleteApi().delete(startTime, endTime, predicate, bucket, organization);
}
4.查询数据
代码如下(示例):
/**
* influxDB数据库查询主要语句
* 可以使用|> range(start: 2025-01-01T00:00:00Z, stop: 2025-01-08T00:00:00Z)
* 也可以使用|> range(start: 1735660800, stop: 1736265600)
* @param measurement 查询表名
* @param start 查询开始时间
* @param stop 查询结束时间
* @param filter 查询条件 tag
*
* @return influxQuery influxDB数据库查询语句
**/
private String getInfluxQueryString(String measurement, String start, String stop, Map<String, String> filter) {
String influxQuery = "from(bucket: \"" + bucket + "\")";
influxQuery += " |> range(start: " + start;
if (StringUtils.isNotEmpty(stop)) {
influxQuery += ", stop: " + stop;
}
influxQuery += ") |> filter(fn: (r) => r._measurement == \"" + measurement + "\"";
for (Map.Entry<String, String> entry : filter.entrySet()) {
influxQuery += " and r." + entry.getKey() + " == \"" + entry.getValue() + "\"";
}
influxQuery += ")";
return influxQuery;
}
// 查询返回 List<FluxTable> extra 可写 last() first() limit(n: 1) 等等
public List<FluxTable> doQuery(String measurement, String start, String stop, Map<String, String> filter, String extra) {
String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
if (StringUtils.isNotEmpty(extra)) {
influxQuery += " |> " + extra;
}
log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
return influxDBClient.getQueryApi().query(influxQuery, organization);
}
// 查询返回 List<T> 入参增加 Class<T> clazz
public <T> List<T> doQuery(String measurement, String start, String stop, Map<String, String> filter, String extra, Class<T> clazz) {
String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
// 转换实体类 Class 需要以下语句 pivot (行转列, 将多个 field 合成一个 Record)
influxQuery += " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";
if (StringUtils.isNotEmpty(extra)) {
influxQuery += " |> " + extra;
}
log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
return influxDBClient.getQueryApi().query(influxQuery, organization, clazz);
}
// 查询返回 csv 格式的 String 方便直接导出 csv,当然数据量大的话导出不建议使用这个,需要的时间太久了,应该使用单独的java服务在数据库服务器中直接运行命令行导出
public String doQueryRaw(String measurement, String start, String stop, Map<String, String> filter, String extra) {
String influxQuery = getInfluxQueryString(measurement, start, stop, filter);
if (StringUtils.isNotEmpty(extra)) {
influxQuery += " |> " + extra;
}
log.info("Influxdb Query: \n{}", influxQuery.replace("|>", "\n|>"));
return influxDBClient.getQueryApi().queryRaw(influxQuery, organization);
}
5.导出数据
influxdb2 命令行导出命令
D:/influxdb2/influx.exe query 'from(bucket: "bucket") |> range(start: 1735660800, stop: 1736265600) |> filter(fn: (r) => r.myTag== "tag") |> drop(columns: ["_start", "_stop"])' --raw > D:/influxdb2/test.csv
当然如果你想使用命令行导出数据需要先对 influxdb2 的 influx.exe 进行配置。
总结
后续有时间的时候在进行补充吧,今天就先分享这么多,感谢各位大佬的关注与阅读!
更多推荐
所有评论(0)