You can use a Kudu connector to query data in, insert data into, and delete data from Kudu tables.
Background information
This topic describes how to configure a Kudu connector and the operations that you can perform by using the connector.
Prerequisites
A Hadoop cluster that contains the Kudu service and a Trino cluster are created. For more information, see Create a cluster.
Limits
You can use a Kudu connector to connect only to Kudu 1.10 or later.
You must establish a network connection between the Trino cluster and the Hadoop cluster.
The names of Kudu tables and columns can contain only lowercase letters.
Modify the configurations of a Kudu connector
You can modify the configurations of a Kudu connector. For more information, see Configure a connector.
Log on to the E-MapReduce (EMR) console and go to the Configure tab of the Trino service page. On the Configure tab, click kudu.properties. Modify or add the configuration items that are described in the following table based on your business requirements.
Configuration item | Description |
kudu.client.master-addresses | The Kudu master address. If you want to configure multiple Kudu master addresses, separate the addresses with commas (,). The following address formats are supported: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051, [2001:db8::1], [2001:db8::1]:7051, and 2001:db8::1. Default value: localhost. Note To ensure that you can write data to and query data from a Kudu table, change the default value localhost to the IP address or hostname of the master node of the Kudu cluster. Example: master-1-1. |
kudu.schema-emulation.enabled | Specifies whether to enable the schema emulation feature. Valid values: true and false. Default value: false. Important You can click Add Configuration Item on the kudu.properties tab to add this configuration item. For more information about how to add a configuration item, see Add configuration items. |
kudu.schema-emulation.prefix | The prefix for the schema emulation feature. Important You must configure this parameter if you set the kudu.schema-emulation.enabled parameter to true. The standard prefix is presto::. |
kudu.client.default-admin-operation-timeout | The default timeout period for administrative operations, such as the operations to create and delete tables. Default value: 30. Unit: seconds. |
kudu.client.default-operation-timeout | The default timeout period for user operations. Default value: 30. Unit: seconds. |
kudu.client.default-socket-read-timeout | The default timeout period for waiting for data from a socket. Default value: 10. Unit: seconds. |
kudu.client.disable-statistics | Specifies whether to disable statistics collection by the Kudu client. Valid values: true (statistics collection is disabled) and false (statistics collection is enabled). |
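As a rough illustration of how these configuration items fit together, the following Python sketch renders the corresponding lines of a kudu.properties file. The helper name render_kudu_properties and the host names master-1-1 and master-1-2 are hypothetical. Only the configuration keys and the comma-separated address format come from the preceding table.

```python
def render_kudu_properties(master_addresses, schema_emulation=False, prefix="presto::"):
    """Render kudu.properties lines from the items in the table (illustrative helper)."""
    if not master_addresses:
        raise ValueError("at least one Kudu master address is required")
    lines = [
        # Multiple master addresses are separated with commas.
        "kudu.client.master-addresses=" + ",".join(master_addresses),
        "kudu.schema-emulation.enabled=" + ("true" if schema_emulation else "false"),
    ]
    if schema_emulation:
        # The table states that the prefix must be configured when emulation is enabled.
        lines.append("kudu.schema-emulation.prefix=" + prefix)
    return "\n".join(lines)


# Hypothetical host names; replace them with the master nodes of your Kudu cluster.
print(render_kudu_properties(["master-1-1:7051", "master-1-2:7051"], schema_emulation=True))
```

In EMR, you enter these values on the Configure tab instead of editing the file directly, so treat the output only as a way to check the values that you plan to enter.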
Query data
Apache Kudu does not support schemas. However, you can configure a Kudu connector to emulate schemas.
Schema emulation disabled (default)
By default, the schema emulation feature is disabled. In this case, all Kudu tables reside in the default schema.
For example, you can execute the SELECT * FROM kudu.default.orders statement to query data from the orders table. If you specify kudu as the catalog and default as the schema, you can execute the SELECT * FROM orders statement to query data from the orders table.
The name of a Kudu table can contain any characters. If a table name contains a special character, you must enclose the name in a pair of double quotation marks ("). For example, to query data from the special.table! table, execute the SELECT * FROM kudu.default."special.table!" statement.
Examples:
Create a table named users in the default schema.
CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  first_name varchar,
  last_name varchar
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);
Note: When you create a table, you must specify the required table information, such as the primary key, the encoding format or compression format of columns, and the hash partitions or range partitions.
View information about the table.
DESCRIBE kudu.default.users;
Information similar to the following output is returned:
   Column   |  Type   |                      Extra                      | Comment
------------+---------+-------------------------------------------------+---------
 user_id    | integer | primary_key, encoding=auto, compression=default |
 first_name | varchar | nullable, encoding=auto, compression=default    |
 last_name  | varchar | nullable, encoding=auto, compression=default    |
(3 rows)
Insert data into the table.
INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
Query data from the table.
SELECT * FROM kudu.default.users;
Schema emulation enabled
If you enable the schema emulation feature in the kudu.properties configuration file for the Kudu connector in the etc/catalog/ directory, Kudu tables are mapped to schemas based on the naming conventions.
If you configure kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix= (an empty prefix), the mappings listed in the following table apply.
Kudu table name | Presto table name |
orders | kudu.default.orders |
part1.part2 | kudu.part1.part2 |
x.y.z | kudu.x."y.z" |
Note: Kudu does not support schemas. Presto creates a special table named $schemas to manage schemas.
If you configure kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto::, the mappings listed in the following table apply.
Kudu table name | Presto table name |
orders | kudu.default.orders |
part1.part2 | kudu.default."part1.part2" |
x.y.z | kudu.default."x.y.z" |
presto::part1.part2 | kudu.part1.part2 |
presto::x.y.z | kudu.x."y.z" |
Note: Kudu does not support schemas. Presto creates a special table named presto::$schemas to manage schemas.
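The two mapping tables follow one rule, which the following Python sketch reproduces. It is not the connector's implementation. The function name to_presto_name is hypothetical, and the behavior for a prefixed name that contains no dot is an assumption because the tables do not cover that case.

```python
def to_presto_name(kudu_table, prefix="", catalog="kudu"):
    """Map a raw Kudu table name to a Presto table name, following the mapping tables."""

    def quote(name):
        # This sketch only quotes names that contain a dot, as in the tables.
        return '"' + name + '"' if "." in name else name

    if prefix:
        if not kudu_table.startswith(prefix):
            # Tables without the prefix all land in the default schema.
            return f"{catalog}.default.{quote(kudu_table)}"
        kudu_table = kudu_table[len(prefix):]
        if "." not in kudu_table:
            # Assumption: this case is not listed in the tables, so it is left unmapped.
            return None
    elif "." not in kudu_table:
        return f"{catalog}.default.{kudu_table}"
    # The part before the first dot becomes the schema. The rest is the table name.
    schema, table = kudu_table.split(".", 1)
    return f"{catalog}.{schema}.{quote(table)}"


for name in ["orders", "x.y.z", "presto::x.y.z"]:
    print(name, "->", to_presto_name(name, prefix="presto::"))
```

Calling the function with prefix="" reproduces the first table, and calling it with prefix="presto::" reproduces the second table.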
Data type mappings
The following table describes the mappings between Presto data types and Kudu data types.
Presto data type | Kudu data type | Remarks |
BOOLEAN | BOOL | None. |
TINYINT | INT8 | |
SMALLINT | INT16 | |
INTEGER | INT32 | |
BIGINT | INT64 | |
REAL | FLOAT | |
DOUBLE | DOUBLE | |
VARCHAR | STRING | When you execute the |
VARBINARY | BINARY | |
TIMESTAMP | UNIXTIME_MICROS | The precision of a Kudu column of this data type is reduced from µs to ms. |
DECIMAL | DECIMAL | This data type is supported only by Kudu servers of 1.7.0 or a later version. |
DATE | N/A | Kudu does not have a data type that matches this data type. When you execute the |
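CHAR | N/A | Kudu does not have a data type that matches this data type. |
TIME | N/A | Kudu does not have a data type that matches this data type. |
JSON | N/A | Kudu does not have a data type that matches this data type. |
TIME WITH TIME ZONE | N/A | Kudu does not have a data type that matches this data type. |
TIMESTAMP WITH TIME ZONE | N/A | Kudu does not have a data type that matches this data type. |
INTERVAL YEAR TO MONTH | N/A | Kudu does not have a data type that matches this data type. |
INTERVAL DAY TO SECOND | N/A | Kudu does not have a data type that matches this data type. |
ARRAY | N/A | Kudu does not have a data type that matches this data type. |
MAP | N/A | Kudu does not have a data type that matches this data type. |
IPADDRESS | N/A | Kudu does not have a data type that matches this data type. |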
Supported Presto SQL statements
The ALTER SCHEMA ... RENAME TO ... statement is not supported.
Statement | Remarks |
| None. |
| None. |
| None. |
| None. |
| This statement is supported only if schema emulation is enabled. |
| This statement is supported only if schema emulation is enabled. |
| For information about how to create a table, see Create a table. |
| None. |
| None. |
| None. |
| For information about how to add a column, see Add a column. |
| These statements are supported only if the column that you want to rename or drop is not a primary key column. |
| |
| None. |
| None. |
| None. |
| None. |
| This statement is equivalent to |
| This statement is used to add a range partition. For more information, see Range partitions. |
| This statement is used to drop a range partition. For more information, see Range partitions. |
Create a table
When you create a table, you must specify columns, data types, and partition information. You can also specify the column encoding format or compression format based on your business requirements. Example:
CREATE TABLE user_events (
user_id int WITH (primary_key = true),
event_name varchar WITH (primary_key = true),
message varchar,
details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 5,
number_of_replicas = 3
);
In this example, user_id and event_name are primary key columns. The table is divided into five partitions based on the hash values in the user_id column. The value of number_of_replicas is 3.
Take note of the following items when you configure parameters for the CREATE TABLE statement:
Primary key columns must be specified before other columns, and only a primary key column can be configured as a partition key column.
The number_of_replicas parameter is optional. This parameter specifies the number of tablet replicas and must be set to an odd number. If you do not configure this parameter, the default replication factor from the Kudu master configuration is used.
Kudu supports hash partitions and range partitions. A hash partition distributes rows by hash value to one of many buckets. A range partition distributes rows by using an ordered range partition key. Range partitions must be explicitly created. Kudu supports multi-level partitioning. A table must contain at least one hash or range partition. A table can contain only one range partition but multiple hash partitions.
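The rules above can be checked before you submit a CREATE TABLE statement. The following Python sketch shows one way to do this. The function name check_table_definition and its input format are hypothetical, and the connector performs its own validation independently of this helper.

```python
def check_table_definition(columns, hash_columns, number_of_replicas=None):
    """Return a list of rule violations for a planned CREATE TABLE statement.

    columns: list of (name, is_primary_key) tuples in declaration order.
    hash_columns: names of the hash partition key columns.
    """
    problems = []
    seen_non_key = False
    for name, is_primary_key in columns:
        # Primary key columns must be specified before other columns.
        if is_primary_key and seen_non_key:
            problems.append(f"primary key column {name} must precede non-key columns")
        seen_non_key = seen_non_key or not is_primary_key
    primary_keys = {name for name, is_primary_key in columns if is_primary_key}
    for name in hash_columns:
        # Only a primary key column can be a partition key column.
        if name not in primary_keys:
            problems.append(f"partition key column {name} is not a primary key column")
    # number_of_replicas is optional, but must be odd when it is specified.
    if number_of_replicas is not None and number_of_replicas % 2 == 0:
        problems.append("number_of_replicas must be an odd number")
    return problems


# The user_events table from the preceding example passes all three checks.
user_events = [("user_id", True), ("event_name", True), ("message", False), ("details", False)]
print(check_table_definition(user_events, ["user_id"], number_of_replicas=3))
```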
Column properties
In addition to column names and data types, you can also specify other column properties.
Column property | Data type | Description |
primary_key | BOOLEAN | If you set this property to true, the column is used as a primary key column. A Kudu primary key enforces the uniqueness constraint. If you insert a row that has the same primary key value as an existing row, the existing row is updated. For more information, see Primary Key Design. |
nullable | BOOLEAN | If you set this property to true, the column can contain null values. Important A primary key column cannot contain null values. |
encoding | VARCHAR | Specifies the column encoding format to save storage space and improve query performance. If you do not configure this property, Kudu encodes data in the column based on the column data type. Valid values: auto, plain, bitshuffle, runlength, prefix, dictionary, and group_varint. For more information, see Column Encoding. |
compression | VARCHAR | Specifies the column compression format. If you do not configure this property, Kudu uses the default compression format. Valid values: default, no, lz4, snappy, and zlib. For more information, see Column compression. |
Example:
CREATE TABLE mytable (
name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
...
) WITH (...);
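If you generate column definitions programmatically, a small helper can reject values that the preceding table does not list. The following Python sketch is one possible approach. The function name column_clause is hypothetical, and the lists of valid values are copied from the table.

```python
ENCODINGS = {"auto", "plain", "bitshuffle", "runlength", "prefix", "dictionary", "group_varint"}
COMPRESSIONS = {"default", "no", "lz4", "snappy", "zlib"}


def column_clause(name, data_type, primary_key=False, nullable=False,
                  encoding=None, compression=None):
    """Render one column definition with its WITH (...) properties."""
    if primary_key and nullable:
        raise ValueError("a primary key column cannot contain null values")
    if encoding is not None and encoding not in ENCODINGS:
        raise ValueError(f"unsupported encoding: {encoding}")
    if compression is not None and compression not in COMPRESSIONS:
        raise ValueError(f"unsupported compression: {compression}")
    props = []
    if primary_key:
        props.append("primary_key = true")
    if nullable:
        props.append("nullable = true")
    if encoding is not None:
        props.append(f"encoding = '{encoding}'")
    if compression is not None:
        props.append(f"compression = '{compression}'")
    clause = f"{name} {data_type}"
    if props:
        # Properties are emitted only when at least one is set.
        clause += " WITH (" + ", ".join(props) + ")"
    return clause


print(column_clause("name", "varchar", primary_key=True,
                    encoding="dictionary", compression="snappy"))
```

The call above renders the same definition as the name column in the preceding example.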
Partition design
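A table must contain at least one hash or range partition. A table can contain only one range partition definition but multiple hash partition groups, and the two partition types can be combined.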
Define hash partitions
Define one partition group
You can use the table property partition_by_hash_columns to specify partition key columns and use the table property partition_by_hash_buckets to specify the number of partitions. The partition key columns must be a subset of primary key columns. Example:
CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1', 'col2'],
  partition_by_hash_buckets = 4
)
Note: In this example, the col1 and col2 columns are defined as hash partition key columns, and data is distributed to four partitions.
Define two partition groups
If you want to define two independent hash partition groups, in addition to the table properties specified in the preceding example, you must specify the table properties partition_by_second_hash_columns and partition_by_second_hash_buckets. Example:
CREATE TABLE mytable (
  col1 varchar WITH (primary_key=true),
  col2 varchar WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['col2'],
  partition_by_second_hash_buckets = 3
)
Note: In this example, two hash partition groups are defined. In the first hash partition group, rows are distributed to two partitions based on the col1 column. In the second hash partition group, rows are distributed to three partitions based on the col2 column. In this case, the total number of partitions in the table is 6 (2 × 3).
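The partition count in this example is the product of the bucket counts of the hash partition groups. The following Python sketch performs the same calculation. The function name total_hash_partitions is hypothetical, and math.prod requires Python 3.8 or later.

```python
import math


def total_hash_partitions(bucket_counts):
    """Multiply the bucket counts of all hash partition groups."""
    if not bucket_counts or any(count < 1 for count in bucket_counts):
        raise ValueError("each hash partition group needs a positive bucket count")
    return math.prod(bucket_counts)


# Two groups with 2 and 3 buckets, as in the preceding example.
print(total_hash_partitions([2, 3]))  # 6
```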
Define range partitions
A Kudu table can contain at most one range partition definition. Use the table property partition_by_range_columns to specify the partition key columns. When you create a table, you can use the table property range_partitions to define the ranges. You can use the procedures kudu.system.add_range_partition and kudu.system.drop_range_partition to manage the range partitions of existing tables. Example:
CREATE TABLE events (
  rack varchar WITH (primary_key=true),
  machine varchar WITH (primary_key=true),
  event_time timestamp WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['rack'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['machine'],
  partition_by_second_hash_buckets = 3,
  partition_by_range_columns = ARRAY['event_time'],
  range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"}, {"lower": "2018-01-01T00:00:00", "upper": null}]'
)
Note: In this example, two hash partition groups and one range partition definition are specified. The table is range partitioned on the event_time column, and data is split at 2018-01-01T00:00:00.
Manage range partitions
You can use a stored procedure to add a range partition to or drop a range partition from an existing table.
Examples:
Add a range partition
CALL kudu.system.add_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)
Drop a range partition
CALL kudu.system.drop_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)
Parameter | Description |
<YOUR_SCHEMA_NAME> | The schema to which the table belongs. |
<YOUR_TABLE_NAME> | The name of the table. |
<range_partition_as_json_string> | The upper and lower bounds of the range partition. You must configure this parameter in the '{"lower": <value>, "upper": <value>}' JSON format. If the partition has multiple columns, you must configure this parameter in the '{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}' format. The specific value formats of the upper and lower bounds depend on the data types of columns. |
Mappings between data types and JSON string formats:
BIGINT: '{"lower": 0, "upper": 1000000}'
SMALLINT: '{"lower": 10, "upper": null}'
VARCHAR: '{"lower": "A", "upper": "M"}'
TIMESTAMP: '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'
BOOLEAN: '{"lower": false, "upper": true}'
VARBINARY: Base64-encoded strings
Note: If you set the lower bound or upper bound to null, the partition is unbounded on that side.
Example:
CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
Note: In this example, a range partition is added to the events table in the myschema schema. The lower bound of the partition is 2018-01-01, which is interpreted as 2018-01-01T00:00:00.000. The upper bound of the partition is 2018-06-01.
You can execute the SHOW CREATE TABLE statement to query the existing range partitions of the table. In the returned results, the table property range_partitions indicates the partition information of the table.
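Writing the JSON string by hand is error-prone, so you may prefer to generate it. The following Python sketch builds the JSON string for a single-column range partition and the matching CALL statement. The function names are hypothetical. The sketch only produces the statement text and does not connect to a cluster.

```python
import base64
import json
from datetime import datetime


def to_json_value(value):
    """Convert a Python value to the JSON representation listed for each data type."""
    if isinstance(value, datetime):
        # TIMESTAMP bounds use the form 2018-02-01T00:00:00.000.
        return value.isoformat(timespec="milliseconds")
    if isinstance(value, (bytes, bytearray)):
        # VARBINARY bounds are Base64-encoded strings.
        return base64.b64encode(value).decode("ascii")
    # bool, int, str, and None (an unbounded side) map directly to JSON.
    return value


def range_partition_json(lower, upper):
    """Build the '{"lower": ..., "upper": ...}' string for a single-column partition."""
    return json.dumps({"lower": to_json_value(lower), "upper": to_json_value(upper)})


def add_range_partition_sql(schema, table, lower, upper):
    """Build the CALL statement text for kudu.system.add_range_partition."""

    def literal(text):
        # Double any single quotation marks so that the SQL string literal stays valid.
        return "'" + text.replace("'", "''") + "'"

    return "CALL kudu.system.add_range_partition({}, {}, {})".format(
        literal(schema), literal(table), literal(range_partition_json(lower, upper)))


print(add_range_partition_sql("myschema", "events", "2018-01-01", "2018-06-01"))
```

The printed statement matches the preceding example. To build a statement that drops a partition, replace the procedure name with kudu.system.drop_range_partition.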
Add a column
You can execute the ALTER TABLE ... ADD COLUMN ... statement to add a column to an existing table. You can also specify column properties for the new column. For more information about the column properties, see Create a table.
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')