全部產品
Search
文件中心

Realtime Compute for Apache Flink:時序資料庫InfluxDB

更新時間:Jul 13, 2024

本文為您介紹如何使用時序資料庫InfluxDB連接器。

背景資訊

時序資料庫InfluxDB®版是一款專門處理高寫入和查詢負載的時序資料庫,用於儲存大規模的時序資料並進行即時分析,包括來自DevOps監控、應用指標和IoT感應器上的資料。時序資料庫InfluxDB®版詳情請參見InfluxDB®️介紹

InfluxDB連接器支援的資訊如下。

類別

詳情

支援類型

結果表

運行模式

流模式

資料格式

Point

特有監控指標

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

已建立InfluxDB的資料庫,詳情請參見系統管理使用者帳號和資料庫

使用限制

僅Flink計算引擎VVR 2.1.5及以上版本支援InfluxDB Connector。

文法結構

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

建表預設格式:

  • 第0列:metric(VARCHAR),必填。

  • 第1列:timestamp(BIGINT),必填,單位為毫秒。

  • 第2列:tag_value1(VARCHAR),必填,最少填寫一個。

  • 第3列:field_fieldValue1(DOUBLE),必填,最少填寫一個。

    寫入多個field_fieldValue時,您需要按照如下格式填寫。

    field_fieldValue1 類型,
    field_fieldValue2 類型,
    ...  
    field_fieldValueN 類型

    樣本如下。

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...   
    field_fieldValueNINTEGER
說明

結果表中只支援metrictimestamptag_*field_*,不能出現其他的欄位。

WITH參數

參數

說明

是否必填

備忘

connector

結果表類型。

固定值為influxdb。

url

InfluxDB的服務地址。

在InfluxDB中,URL為VPC網路地址,例如:https://localhost:8086http://localhost:3242。

URL支援HTTP和HTTPS。

database

InfluxDB的資料庫名稱。

例如db-flink。

username

資料庫的使用者名稱。

需要對目標資料庫有寫入權限。使用者名稱詳情請參見系統管理使用者帳號和資料庫

password

資料庫的密碼。

密碼詳情請參見系統管理使用者帳號和資料庫

batchSize

批量提交的記錄條數。

預設每次批量提交300條記錄。

retentionPolicy

保留原則。

如果您不配置該參數時,該參數會被預設填寫為每個資料庫的預設保留原則autogen,保留原則詳情請參見系統管理使用者帳號和資料庫

ignoreErrorData

是否忽略異常資料。

參數取值如下:

  • true:忽略異常資料。

  • false(預設值):不忽略異常資料。

類型映射

InfluxDB欄位類型

Flink版欄位類型

BOOLEAN

BOOLEAN

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

使用樣本

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.filedvalue.min' = '1',
  'fields.filedvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `filedvalue`,
  `tagvalue`
FROM datahub_source;