本文介绍通过Java Native SDK连接并使用Lindorm时序引擎的方法。
前提条件
准备工作
通过Java Native SDK连接Lindorm时序引擎前,需要安装Java Native SDK。以1.0.0版本为例,您可以通过以下三种方式安装Java Native SDK:
(推荐)在Maven项目中使用Lindorm TSDB Java SDK。在pom.xml文件的
dependencies
中添加以下依赖项。<dependency> <groupId>com.aliyun.lindorm</groupId> <artifactId>lindorm-tsdb-client</artifactId> <version>1.0.4</version> </dependency>
说明Lindorm TSDB Java SDK提供了一个基于Maven的示例工程,您可以直接下载示例工程并在本地编译和运行,也可以以示例工程为基础开发您的项目工程。
在Eclipse项目中导入JAR包。
下载Java SDK开发包。
解压下载的Java SDK开发包。
将解压后JAR包添加至Eclipse项目中。
在Eclipse中打开您的项目,右键单击该项目,选择Properties。
在弹出的对话框中,单击lindorm-tsdb-client-1.0.0.jar和lib文件中的JAR包。
,选择解压后的单击Apply and Close。
在IntelliJ IDEA项目中导入JAR包。
下载Java SDK开发包。
解压下载的Java SDK开发包。
将解压后JAR包添加至IntelliJ IDEA项目中。
在IntelliJ IDEA中打开您的项目,在菜单栏单击
。在Project Structure对话框的左边选择
。单击右边,选择JARs or directories。
在弹出的对话框中,选择解压后的lindorm-tsdb-client-1.0.0.jar和lib文件中的JAR包,并单击OK。
单击Apply。
单击OK。
Lindorm时序引擎的Java Native SDK各版本可以通过Maven中央仓库获取,更多信息请参见Maven Repository。
Lindorm时序引擎的Java Native SDK各版本说明请参见版本说明。
操作步骤
创建数据库实例。新建LindormTSDBClient时,需要指定Lindorm时序引擎的连接地址,获取方法请参见获取连接串。
String url = "http://ld-bp17j28j2y7pm****-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242"; // LindormTSDBClient线程安全,可以重复使用,无需频繁创建和销毁 ClientOptions options = ClientOptions.newBuilder(url).build(); LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
创建数据库demo和时序表sensor。关于创建数据库和时序表的SQL语句说明,请参见CREATE DATABASE和CREATE TABLE。
lindormTSDBClient.execute("CREATE DATABASE demo"); lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
在表中写入数据。
说明默认情况下,为了提高写入数据的性能,LindormTSDBClient通过异步攒批的方式进行数据写入。如果需要通过同步的方式进行数据写入,可以调用
write()
方法返回的CompletableFuture<WriteResult>
的join()
方法。int numRecords = 10; List<Record> records = new ArrayList<>(numRecords); long currentTime = System.currentTimeMillis(); for (int i = 0; i < numRecords; i++) { Record record = Record .table("sensor") .time(currentTime + i * 1000) .tag("device_id", "F07A1260") .tag("region", "north-cn") .addField("temperature", 12.1 + i) .addField("humidity", 45.0 + i) .build(); records.add(record); } CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records); // 处理异步写入结果 future.whenComplete((r, ex) -> { // 处理写入失败 if (ex != null) { System.out.println("Failed to write."); if (ex instanceof LindormTSDBException) { LindormTSDBException e = (LindormTSDBException) ex; System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, " + "but was rejected with an error response for some reason."); System.out.println("Error Code: " + e.getCode()); System.out.println("SQL State: " + e.getSqlstate()); System.out.println("Error Message: " + e.getMessage()); } else { ex.printStackTrace(); } } else { System.out.println("Write successfully."); } }); // 这里作为示例, 简单同步处理写入结果 System.out.println(future.join());
查询时序表的数据。关于查询操作的SQL语句说明,请参见基本查询。
String sql = "select * from sensor limit 10"; ResultSet resultSet = lindormTSDBClient.query("demo", sql); try { // 处理查询结果 QueryResult result = null; // 查询结果使用分批的方式返回,默认每批1000行 // 当resultSet的next()方法返回为null,表示已经读取完所有的查询结果 while ((result = resultSet.next()) != null) { List<String> columns = result.getColumns(); System.out.println("columns: " + columns); List<String> metadata = result.getMetadata(); System.out.println("metadata: " + metadata); List<List<Object>> rows = result.getRows(); for (int i = 0, size = rows.size(); i < size; i++) { List<Object> row = rows.get(i); System.out.println("row #" + i + " : " + row); } } } finally { // 查询结束后,需确保调用ResultSet的close方法,以释放IO资源 resultSet.close(); }
完整的代码示例
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.LindormTSDBFactory;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.QueryResult;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.ResultSet;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.utils.ExceptionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class QuickStart {
public static void main(String[] args) {
// 1.创建客户端实例
String url = "http://ld-xxxx-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242";
// LindormTSDBClient线程安全,可以重复使用,无需频繁创建和销毁
ClientOptions options = ClientOptions.newBuilder(url).build();
LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
// 2.创建数据库demo和表sensor
lindormTSDBClient.execute("CREATE DATABASE demo");
lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
// 3.写入数据
int numRecords = 10;
List<Record> records = new ArrayList<>(numRecords);
long currentTime = System.currentTimeMillis();
for (int i = 0; i < numRecords; i++) {
Record record = Record
.table("sensor")
.time(currentTime + i * 1000)
.tag("device_id", "F07A1260")
.tag("region", "north-cn")
.addField("temperature", 12.1 + i)
.addField("humidity", 45.0 + i)
.build();
records.add(record);
}
CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records);
// 处理异步写入结果
future.whenComplete((r, ex) -> {
// 处理写入失败
if (ex != null) {
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
System.out.println("Write successfully.");
}
});
// 这里作为示例, 简单同步等待
System.out.println(future.join());
// 4.查询数据
String sql = "select * from sensor limit 10";
ResultSet resultSet = lindormTSDBClient.query("demo", sql);
try {
// 处理查询结果
QueryResult result = null;
// 查询结果使用分批的方式返回,默认每批1000行
// 当resultSet的next()方法返回为null,表示已经读取完所有的查询结果
while ((result = resultSet.next()) != null) {
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("row #" + i + " : " + row);
}
}
} finally {
// 查询结束后,需确保调用ResultSet的close方法,以释放IO资源
resultSet.close();
}
lindormTSDBClient.shutdown();
}
}