本文為您介紹如何使用時序資料庫InfluxDB連接器。
背景資訊
時序資料庫InfluxDB®版是一款專門處理高寫入和查詢負載的時序資料庫,用於儲存大規模的時序資料並進行即時分析,包括來自DevOps監控、應用指標和IoT感應器上的資料。時序資料庫InfluxDB®版詳情請參見InfluxDB®️介紹。
InfluxDB連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 結果表 |
運行模式 | 流模式 |
資料格式 | Point |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 否 |
前提條件
已建立InfluxDB的資料庫,詳情請參見系統管理使用者帳號和資料庫。
使用限制
僅Flink計算引擎VVR 2.1.5及以上版本支援InfluxDB Connector。
文法結構
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
建表預設格式:
第0列:metric(VARCHAR),必填。
第1列:timestamp(BIGINT),必填,單位為毫秒。
第2列:tag_value1(VARCHAR),必填,最少填寫一個。
第3列:field_fieldValue1(DOUBLE),必填,最少填寫一個。
寫入多個field_fieldValue時,您需要按照如下格式填寫。
field_fieldValue1 類型, field_fieldValue2 類型, ... field_fieldValueN 類型
樣本如下。
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueNINTEGER
結果表中只支援metric、timestamp、tag_*和field_*,不能出現其他的欄位。
WITH參數
參數 | 說明 | 是否必填 | 備忘 |
connector | 結果表類型。 | 是 | 固定值為influxdb。 |
url | InfluxDB的服務地址。 | 是 | 在InfluxDB中,URL為VPC網路地址,例如:https://localhost:8086或http://localhost:3242。 URL支援HTTP和HTTPS。 |
database | InfluxDB的資料庫名稱。 | 是 | 例如db-flink。 |
username | 資料庫的使用者名稱。 | 是 | 需要對目標資料庫有寫入權限。使用者名稱詳情請參見系統管理使用者帳號和資料庫。 |
password | 資料庫的密碼。 | 是 | 密碼詳情請參見系統管理使用者帳號和資料庫。 |
batchSize | 批量提交的記錄條數。 | 否 | 預設每次批量提交300條記錄。 |
retentionPolicy | 保留原則。 | 否 | 如果您不配置該參數時,該參數會被預設填寫為每個資料庫的預設保留原則autogen,保留原則詳情請參見系統管理使用者帳號和資料庫。 |
ignoreErrorData | 是否忽略異常資料。 | 否 | 參數取值如下:
|
類型映射
InfluxDB欄位類型 | Flink版欄位類型 |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
使用樣本
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`filedvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.filedvalue.min' = '1',
'fields.filedvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`filedvalue`,
`tagvalue`
FROM datahub_source;