本文介紹通過Java Native SDK串連並使用Lindorm時序引擎的方法。
前提條件
準備工作
通過Java Native SDK串連Lindorm時序引擎前,需要安裝Java Native SDK。以1.0.0版本為例,您可以通過以下三種方式安裝Java Native SDK:
(推薦)在Maven專案中使用Lindorm TSDB Java SDK。在pom.xml檔案的
dependencies
中添加以下依賴項。<dependency> <groupId>com.aliyun.lindorm</groupId> <artifactId>lindorm-tsdb-client</artifactId> <version>1.0.4</version> </dependency>
說明Lindorm TSDB Java SDK提供了一個基於Maven的樣本工程,您可以直接下載樣本工程並在本地編譯和運行,也可以以樣本工程為基礎開發您的專案工程。
在Eclipse專案中匯入JAR包。
下載Java SDK開發包。
解壓下載的Java SDK開發包。
將解壓後JAR包添加至Eclipse專案中。
在Eclipse中開啟您的專案,按右鍵該專案,選擇Properties。
在彈出的對話方塊中,單擊lindorm-tsdb-client-1.0.0.jar和lib檔案中的JAR包。
,選擇解壓後的單擊Apply and Close。
在IntelliJ IDEA專案中匯入JAR包。
下載Java SDK開發包。
解壓下載的Java SDK開發包。
將解壓後JAR包添加至IntelliJ IDEA專案中。
在IntelliJ IDEA中開啟您的專案,在功能表列單擊
。在Project Structure對話方塊的左邊選擇
。單擊右邊,選擇JARs or directories。
在彈出的對話方塊中,選擇解壓後的lindorm-tsdb-client-1.0.0.jar和lib檔案中的JAR包,並單擊OK。
單擊Apply。
單擊OK。
Lindorm時序引擎的Java Native SDK各版本可以通過Maven中央倉庫擷取,更多資訊請參見Maven Repository。
Lindorm時序引擎的Java Native SDK各版本說明請參見版本說明。
操作步驟
建立資料庫執行個體。建立LindormTSDBClient時,需要指定Lindorm時序引擎的串連地址,擷取方法請參見擷取串連串。
String url = "http://ld-bp17j28j2y7pm****-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242"; // LindormTSDBClient安全執行緒,可以重複使用,無需頻繁建立和銷毀 ClientOptions options = ClientOptions.newBuilder(url).build(); LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
建立資料庫demo和時序表sensor。關於建立資料庫和時序表的SQL語句說明,請參見CREATE DATABASE和CREATE TABLE。
lindormTSDBClient.execute("CREATE DATABASE demo"); lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
在表中寫入資料。
說明預設情況下,為了提高寫入資料的效能,LindormTSDBClient通過非同步攢批的方式進行資料寫入。如果需要通過同步的方式進行資料寫入,可以調用
write()
方法返回的CompletableFuture<WriteResult>
的join()
方法。int numRecords = 10; List<Record> records = new ArrayList<>(numRecords); long currentTime = System.currentTimeMillis(); for (int i = 0; i < numRecords; i++) { Record record = Record .table("sensor") .time(currentTime + i * 1000) .tag("device_id", "F07A1260") .tag("region", "north-cn") .addField("temperature", 12.1 + i) .addField("humidity", 45.0 + i) .build(); records.add(record); } CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records); // 處理非同步寫入結果 future.whenComplete((r, ex) -> { // 處理寫入失敗 if (ex != null) { System.out.println("Failed to write."); if (ex instanceof LindormTSDBException) { LindormTSDBException e = (LindormTSDBException) ex; System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, " + "but was rejected with an error response for some reason."); System.out.println("Error Code: " + e.getCode()); System.out.println("SQL State: " + e.getSqlstate()); System.out.println("Error Message: " + e.getMessage()); } else { ex.printStackTrace(); } } else { System.out.println("Write successfully."); } }); // 這裡作為樣本, 簡單同步處理寫入結果 System.out.println(future.join());
查詢時序表的資料。關於查詢操作的SQL語句說明,請參見基本查詢。
String sql = "select * from sensor limit 10"; ResultSet resultSet = lindormTSDBClient.query("demo", sql); try { // 處理查詢結果 QueryResult result = null; // 查詢結果使用分批的方式返回,預設每批1000行 // 當resultSet的next()方法返回為null,表示已經讀取完所有的查詢結果 while ((result = resultSet.next()) != null) { List<String> columns = result.getColumns(); System.out.println("columns: " + columns); List<String> metadata = result.getMetadata(); System.out.println("metadata: " + metadata); List<List<Object>> rows = result.getRows(); for (int i = 0, size = rows.size(); i < size; i++) { List<Object> row = rows.get(i); System.out.println("row #" + i + " : " + row); } } } finally { // 查詢結束後,需確保調用ResultSet的close方法,以釋放IO資源 resultSet.close(); }
完整的程式碼範例
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.LindormTSDBFactory;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.QueryResult;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.ResultSet;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.utils.ExceptionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class QuickStart {
public static void main(String[] args) {
// 1.建立用戶端執行個體
String url = "http://ld-xxxx-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242";
// LindormTSDBClient安全執行緒,可以重複使用,無需頻繁建立和銷毀
ClientOptions options = ClientOptions.newBuilder(url).build();
LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
// 2.建立資料庫demo和表sensor
lindormTSDBClient.execute("CREATE DATABASE demo");
lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
// 3.寫入資料
int numRecords = 10;
List<Record> records = new ArrayList<>(numRecords);
long currentTime = System.currentTimeMillis();
for (int i = 0; i < numRecords; i++) {
Record record = Record
.table("sensor")
.time(currentTime + i * 1000)
.tag("device_id", "F07A1260")
.tag("region", "north-cn")
.addField("temperature", 12.1 + i)
.addField("humidity", 45.0 + i)
.build();
records.add(record);
}
CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records);
// 處理非同步寫入結果
future.whenComplete((r, ex) -> {
// 處理寫入失敗
if (ex != null) {
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
System.out.println("Write successfully.");
}
});
// 這裡作為樣本, 簡單同步等待
System.out.println(future.join());
// 4.查詢資料
String sql = "select * from sensor limit 10";
ResultSet resultSet = lindormTSDBClient.query("demo", sql);
try {
// 處理查詢結果
QueryResult result = null;
// 查詢結果使用分批的方式返回,預設每批1000行
// 當resultSet的next()方法返回為null,表示已經讀取完所有的查詢結果
while ((result = resultSet.next()) != null) {
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("row #" + i + " : " + row);
}
}
} finally {
// 查詢結束後,需確保調用ResultSet的close方法,以釋放IO資源
resultSet.close();
}
lindormTSDBClient.shutdown();
}
}