This topic describes how to use the Time Series Database (TSDB) for InfluxDB connector.
Background information
TSDB for InfluxDB is a time series database service that can process large numbers of write and query requests. This service is used to store and analyze large amounts of time series data in real time, including DevOps monitoring data, application metric data, and data that is collected from IoT sensors. For more information about TSDB for InfluxDB, see TSDB for InfluxDB.
The following table describes the capabilities supported by the TSDB for InfluxDB connector.
Item | Description |
Table type | Result table |
Running mode | Streaming mode |
Data format | Point |
Metric | For more information about the metrics, see Metrics. |
API type | SQL API |
Data update or deletion in a result table | Not supported |
Prerequisites
A TSDB for InfluxDB database is created. For more information, see Manage user accounts and databases.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.1.5 or later supports the TSDB for InfluxDB connector.
Syntax
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' = '300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
Default format for the created table:
Column 0: metric (VARCHAR). This column is required.
Column 1: timestamp (BIGINT). This column is required. Unit: milliseconds.
Column 2: tag_value1 (VARCHAR). A tag column. The table must define at least one tag_* column.
Column 3: field_fieldValue1 (DOUBLE). A field column. The table must define at least one field_* column.
To define multiple field columns, use the following format:
field_fieldValue1 <Data type>, field_fieldValue2 <Data type>, ... field_fieldValueN <Data type>
Example:
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueN INTEGER
A TSDB for InfluxDB result table can contain only the following fields: metric, timestamp, tag_*, and field_*.
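As a sketch of the naming rule above, a result table with two tags and two fields might be declared as follows. The table name and the column names tag_host, tag_region, field_cpu, and field_mem are hypothetical. Only the tag_ and field_ prefixes come from the rule. Replace the placeholders and the masked port with your own values.

```sql
-- Sketch only: the table name and the tag_/field_ column names are illustrative assumptions.
CREATE TEMPORARY TABLE influxdb_multi_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,      -- Unit: milliseconds
  `tag_host` VARCHAR,      -- first tag column
  `tag_region` VARCHAR,    -- second tag column
  `field_cpu` DOUBLE,      -- first field column
  `field_mem` DOUBLE       -- second field column
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```

Every column other than metric and timestamp carries one of the two prefixes, which is what the rule requires.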
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
connector | The type of the result table. | Yes | Set the value to influxdb. |
url | The URL of the TSDB for InfluxDB database. | Yes | Set this parameter to the virtual private cloud (VPC) endpoint of the TSDB for InfluxDB database, for example, https://localhost:8086 or http://localhost:3242. Both HTTP and HTTPS are supported. |
database | The name of the TSDB for InfluxDB database. | Yes | Example: db-flink. |
username | The username of the account that is used to access the database. | Yes | You must have write permissions on the TSDB for InfluxDB database. For more information about the username, see Manage user accounts and databases. |
password | The password that is used to access the database. | Yes | For more information about the password, see Manage user accounts and databases. |
batchSize | The number of data records that are written in each batch. | No | Default value: 300. |
retentionPolicy | The retention policy. | No | If you do not configure this parameter, the default retention policy autogen for each database is used. For more information about the retention policy, see Manage user accounts and databases. |
ignoreErrorData | Specifies whether to ignore abnormal data. | No | Valid values: true (abnormal data is ignored) and false (abnormal data is not ignored). |
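Because batchSize, retentionPolicy, and ignoreErrorData are optional, a table definition can list only the required parameters. The following is a minimal sketch, assuming that the defaults described in the table (a batch size of 300 and the autogen retention policy) are acceptable. The table name influxdb_minimal_sink is hypothetical, and the URL is the example value from the table, not a real endpoint.

```sql
-- Sketch only: lists just the parameters marked as required in the table above.
CREATE TEMPORARY TABLE influxdb_minimal_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `tag_value1` VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://localhost:8086',           -- example value; use your VPC endpoint
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',      -- account needs write permissions
  'password' = '<yourDatabasePassword>'
);
```

Setting the optional parameters explicitly, as the Syntax section does, may still be preferable when you want the job definition to document its own behavior.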
Data type mappings
Data type of TSDB for InfluxDB | Data type of Flink |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
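To illustrate how the mapping might be applied, the following sketch declares field columns with several of the Flink types listed above. The table name and the column names are hypothetical, and this topic does not state whether every listed type is accepted for every column, so treat the combination as an assumption to verify against your own database.

```sql
-- Sketch only: column names and the choice of types are illustrative assumptions.
CREATE TEMPORARY TABLE influxdb_typed_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `tag_value1` VARCHAR,
  `field_load` DOUBLE,       -- DOUBLE maps to DOUBLE
  `field_requests` BIGINT,   -- BIGINT maps to BIGINT
  `field_healthy` BOOLEAN    -- BOOLEAN maps to BOOLEAN
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```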
Sample code
-- Source table: the datagen connector generates random test data.
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`fieldvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.fieldvalue.min' = '1',
'fields.fieldvalue.max' = '100000',
'rows-per-second' = '50'
);
-- Result table: the TSDB for InfluxDB database.
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' = '100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
-- Columns in the SELECT list are matched to the result table by position, not by name.
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`fieldvalue`,
`tagvalue`
FROM datahub_source;