云原生数据仓库 AnalyticDB MySQL 版支持通过外表导入导出数据。本文介绍如何通过外表查询HDFS数据,并将HDFS、AWS S3、Azure Blob Storage或Google Cloud Storage数据导入至AnalyticDB for MySQL。
前提条件
AnalyticDB for MySQL集群内核版本需为3.1.4及以上。
说明请在云原生数据仓库AnalyticDB MySQL控制台集群信息页面,配置信息区域,查看和升级内核版本。
HDFS数据文件格式需为CSV、Parquet或ORC。
已创建HDFS集群并在HDFS文件夹中准备需要导入的数据,本文示例中所用文件夹为
hdfs_import_test_data.csv。已在HDFS集群中为AnalyticDB for MySQL集群配置如下服务访问端口:
namenode:用于读写文件系统元信息。您可以在fs.defaultFS参数中配置端口号,默认端口号为8020。详细配置方式,请参见core-default.xml。
datanode:用于读写数据。您可以在dfs.datanode.address参数中配置端口号,默认端口号为50010。详细配置方式,请参见hdfs-default.xml。
AnalyticDB for MySQL数仓版弹性模式已开启ENI访问。
重要登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面的网络信息区域,打开ENI网络开关。
开启和关闭ENI网络会导致数据库连接中断大约2分钟,无法读写。请谨慎评估影响后再开启或关闭ENI网络。
操作步骤
(可选)配置公网访问能力。
当您需要通过外表连接其他云厂商的对象存储(AWS S3、Azure Blob Storage或Google Cloud Storage)时,需确保AnalyticDB for MySQL集群具备公网访问能力。
为AnalyticDB for MySQL集群所在的VPC配置NAT网关和弹性公网IP(EIP)。
公网NAT网关需要与AnalyticDB for MySQL实例为同一个地域。
推荐按交换机粒度创建SNAT条目,指定任意交换机即可。
为AnalyticDB for MySQL集群开启ENI访问。
重要登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面的网络信息区域,打开ENI网络开关。
开启和关闭ENI网络会导致数据库连接中断大约2分钟,无法读写。请谨慎评估影响后再开启或关闭ENI网络。
创建目标数据库。本示例中,AnalyticDB for MySQL集群的目标库名为
adb_demo。CREATE DATABASE IF NOT EXISTS adb_demo;使用
CREATE TABLE语句在目标库adb_demo中创建CSV、Parquet或ORC格式的外表。使用HDFS数据源:
连接其他云厂商的对象存储(AWS S3、Azure Blob Storage或Google Cloud Storage):创建外部云存储外表。
创建目标表。
您可以使用以下语句在目标数据库
adb_demo中创建一张目标表,用于存储从HDFS导入的数据:创建普通外表对应的目标表(本文示例中目标表名为
adb_hdfs_import_test),语法如下。CREATE TABLE IF NOT EXISTS adb_hdfs_import_test ( uid string, other string ) DISTRIBUTED BY HASH(uid);创建带分区外表对应的目标表时(本文示例中目标表名为
adb_hdfs_import_parquet_partition),需要同时在创建语句中定义普通列(如uid和other)和分区列(如p1、p2和p3),语法如下。CREATE TABLE IF NOT EXISTS adb_hdfs_import_parquet_partition ( uid string, other string, p1 date, p2 int, p3 varchar ) DISTRIBUTED BY HASH(uid);
将HDFS中的数据导入至目标AnalyticDB for MySQL集群中。
您可以根据业务需要选择如下几种方式导入数据(分区表导入数据语法与普通表一致,如下示例中以普通表为例):
(推荐)方式一:使用
INSERT OVERWRITE导入数据。数据批量导入,性能好。导入成功后数据可见,导入失败数据会回滚,示例如下。INSERT OVERWRITE adb_hdfs_import_test SELECT * FROM hdfs_import_test_external_table;方式二:使用
INSERT INTO导入数据。数据插入实时可查,数据量较小时使用,示例如下。INSERT INTO adb_hdfs_import_test SELECT * FROM hdfs_import_test_external_table;方式三:异步执行导入数据,示例如下。
SUBMIT JOB INSERT OVERWRITE adb_hdfs_import_test SELECT * FROM hdfs_import_test_external_table;返回结果如下。
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+您还可以根据上述
job_id查看异步任务的状态,更多详情,请参见异步提交导入任务。
后续步骤
导入完成后,您可以登录AnalyticDB for MySQL的目标库adb_demo中,执行如下语句查看并验证源表数据是否成功导入至目标表adb_hdfs_import_test中:
SELECT * FROM adb_hdfs_import_test LIMIT 100;创建HDFS外表
创建文件格式为CSV的外表
语句如下:
CREATE TABLE IF NOT EXISTS hdfs_import_test_external_table ( uid string, other string ) ENGINE='HDFS' TABLE_PROPERTIES='{ "format":"csv", "delimiter":",", "hdfs_url":"hdfs://172.17.***.***:9000/adb/hdfs_import_test_csv_data/hdfs_import_test_data.csv" }';参数
是否必填
说明
ENGINE='HDFS'必填
外表的存储引擎说明。本示例使用的存储引擎为HDFS。
TABLE_PROPERTIESAnalyticDB for MySQL访问HDFS数据的方式。
format数据文件的格式。创建CSV格式文件的外表时需设置为
csv。delimiter定义CSV数据文件的列分隔符。本示例使用的分隔符为英文逗号(,)。
hdfs_urlHDFS集群中目标数据文件或文件夹的绝对地址,需要以
hdfs://开头。示例:
hdfs://172.17.***.***:9000/adb/hdfs_import_test_csv_data/hdfs_import_test_data.csvpartition_column选填
定义外表的分区列,用英文逗号(,)切分各列。定义分区列的方法,请参见创建带分区的HDFS外表。
compress_type定义数据文件的压缩类型,CSV格式的文件目前仅支持Gzip压缩类型。
skip_header_line_count定义导入数据时需要在开头跳过的行数。CSV文件第一行为表头,若设置该参数为1,导入数据时可自动跳过第一行的表头。
默认为0,即不跳过。
hdfs_ha_host_port如果HDFS集群配置了HA功能,创建外表时需配置
hdfs_ha_host_port参数,格式为ip1:port1,ip2:port2,参数中的IP与Port是主备namenode的IP与Port。示例:
192.168.xx.xx:8020,192.168.xx.xx:8021创建HDFS Parquet格式/HDFS ORC格式的外表
以Parquet格式为例,创建HDFS外表语句如下:
CREATE TABLE IF NOT EXISTS hdfs_import_test_external_table ( uid string, other string ) ENGINE='HDFS' TABLE_PROPERTIES='{ "format":"parquet", "hdfs_url":"hdfs://172.17.***.***:9000/adb/hdfs_import_test_parquet_data/" }';参数
是否必填
说明
ENGINE='HDFS'必填
外表的存储引擎说明。本示例使用的存储引擎为HDFS。
TABLE_PROPERTIESAnalyticDB for MySQL访问HDFS数据的方式。
format数据文件的格式。
创建Parquet格式文件的外表时需设置为
parquet。创建ORC格式文件的外表时需设置为
orc。
hdfs_urlHDFS集群中目标数据文件或文件夹的绝对地址,需要以
hdfs://开头。partition_column选填
定义表的分区列,用英文逗号(,)切分各列。定义分区列的方法,请参见创建带分区的HDFS外表。
hdfs_ha_host_port如果HDFS集群配置了HA功能,创建外表时需配置
hdfs_ha_host_port参数,格式为ip1:port1,ip2:port2,参数中的IP与Port是主备namenode的IP与Port。示例:
192.168.xx.xx:8020,192.168.xx.xx:8021说明外表创建语句中的列名需与Parquet或ORC文件中该列的名称完全相同(可忽略大小写),且列的顺序需要一致。
创建外表时,可以仅选择Parquet或ORC文件中的部分列作为外表中的列,未被选择的列不会被导入。
如果创建外表创建语句中出现了Parquet或ORC文件中不存在的列,针对该列的查询结果均会返回NULL。
Parquet文件与AnalyticDB for MySQL的数据类型映射关系
Parquet基本类型
Parquet的logicalType类型
AnalyticDB for MySQL的数据类型
BOOLEAN
无
BOOLEAN
INT32
INT_8
TINYINT
INT32
INT_16
SMALLINT
INT32
无
INT或INTEGER
INT64
无
BIGINT
FLOAT
无
FLOAT
DOUBLE
无
DOUBLE
FIXED_LEN_BYTE_ARRAY
BINARY
INT64
INT32
DECIMAL
DECIMAL
BINARY
UTF-8
VARCHAR
STRING
JSON(如果已知Parquet该列内容为JSON格式)
INT32
DATE
DATE
INT64
TIMESTAMP_MILLIS
TIMESTAMP或DATETIME
INT96
无
TIMESTAMP或DATETIME
重要Parquet格式外表暂不支持
STRUCT类型,会导致建表失败。ORC文件与AnalyticDB for MySQL的数据类型映射关系
ORC文件中的数据类型
AnalyticDB for MySQL中的数据类型
BOOLEAN
BOOLEAN
BYTE
TINYINT
SHORT
SMALLINT
INT
INT或INTEGER
LONG
BIGINT
DECIMAL
DECIMAL
FLOAT
FLOAT
DOUBLE
DOUBLE
BINARY
STRING
VARCHAR
VARCHAR
STRING
JSON(如果已知ORC该列内容为JSON格式)
TIMESTAMP
TIMESTAMP或DATETIME
DATE
DATE
重要ORC格式外表暂不支持
LIST、STRUCT和UNION等复合类型,会导致建表失败。ORC格式外表的列使用MAP类型可以建表,但ORC的查询会失败。
创建带分区的HDFS外表
HDFS支持对Parquet、CSV和ORC文件格式的数据进行分区,包含分区的数据会在HDFS上形成一个分层目录。在下方示例中,p1为第1级分区,p2为第2级分区,p3为第3级分区:
parquet_partition_classic/
├── p1=2020-01-01
│ ├── p2=4
│ │ ├── p3=SHANGHAI
│ │ │ ├── 000000_0
│ │ │ └── 000000_1
│ │ └── p3=SHENZHEN
│ │ └── 000000_0
│ └── p2=6
│ └── p3=SHENZHEN
│ └── 000000_0
├── p1=2020-01-02
│ └── p2=8
│ ├── p3=SHANGHAI
│ │ └── 000000_0
│ └── p3=SHENZHEN
│ └── 000000_0
└── p1=2020-01-03
└── p2=6
├── p2=HANGZHOU
└── p3=SHENZHEN
└── 000000_0以Parquet格式为例,创建外表时指定列的建表语句示例如下:
CREATE TABLE IF NOT EXISTS hdfs_parquet_partition_table
(
uid varchar,
other varchar,
p1 date,
p2 int,
p3 varchar
)
ENGINE='HDFS'
TABLE_PROPERTIES='{
"hdfs_url":"hdfs://172.17.***.**:9000/adb/parquet_partition_classic/",
"format":"parquet", //如需创建CSV或ORC格式外表,仅需将format的取值改为csv或orc。
"partition_column":"p1, p2, p3" //针对包含分区的HDFS数据,如需以分区的模式进行查询,那么在导入数据至AnalyticDB MySQL时就需要在外表创建语句中指定分区列partition_column。
}';TABLE_PROPERTIES中的partition_column参数用于指定分区列(本例中的p1、p2、p3)。且partition_column参数中的分区列必须按照第1级、第2级、第3级的顺序声明(本例中p1为第1级分区,p2为第2级分区,p3为第3级分区)。列定义中必须定义分区列(本例中的p1、p2、p3)及类型,且分区列需要置于列定义的末尾。
列定义中分区列的先后顺序需要与
partition_column中分区列的顺序保持一致。分区列支持的数据类型包括:
BOOLEAN、TINYINT、SMALLINT、INT、INTEGER、BIGINT、FLOAT、DOUBLE、DECIMAL、VARCHAR、STRING、DATE、TIMESTAMP。查询数据时,分区列和其它数据列的展示和用法没有区别。
不指定format时,默认格式为CSV。
其他参数的详细说明,请参见参数说明。
创建外部云存储外表
AWS S3
参数说明
参数 | 说明 |
hdfs_url | S3 文件目录,前缀为“s3a”。 |
s3.access_key | S3 访问密钥 key。管理访问密钥,请参见Manage access keys for IAM users |
s3.secret_key | S3 访问密钥 secret key。 |
s3.endpoint | S3 endpoint地址。 |
权限要求
操作场景 | 最小权限集合 | 推荐策略 |
从S3外表读数据 |
| 建议您使用AmazonS3ReadOnlyAccess策略: |
将数据导出到S3外表 |
| 建议您使用AmazonS3FullAccess策略: |
创建示例
创建不带分区的外表
CREATE TABLE t1(c1 int, c2 int) ENGINE='hdfs' TABLE_PROPERTIES='{ "format" : "parquet", "hdfs_url" : "s3a://adbtest/t1", "s3.access_key":"AKIA****************45P", "s3.secret_key":"XH41************************l0q", "s3.endpoint":"s3.cn-north-1.amazonaws.com.cn" }'创建带分区的外表。
CREATE TABLE t1(c1 int, c2 int, p1 int) ENGINE='hdfs' TABLE_PROPERTIES='{ "partition_column":"p1", "format" : "parquet", "hdfs_url" : "s3a://adbtest/t1", "s3.access_key":"AKIAS************5P", "s3.secret_key":"XH41pLbBbFb**************xDl0q", "s3.endpoint":"s3.cn-north-1.amazonaws.com.cn" }'
Azure Blob Storage
参数说明
参数 | 是否必填 | 说明 |
hdfs_url | 必填 | Azure 文件目录,格式为“abfss://{容器名}@{账号名}.{域名}/test”。 |
azure.endpoint | 必填 | Azure endpoint地址。 |
azure.accesskey | Shared Key 认证鉴权必填 | Azure 访问密钥 key。查看访问密钥,请参见storage-account-keys-manage。 |
azure.sas.token | SAS 认证鉴权必填 | 通过SAS方式访问Azure外表时需配置。 |
权限要求
操作场景 | 最小权限集合 |
从Azure外表导入数据 |
|
将数据导出到Azure外表 |
|
您可以在目标存储账户的左侧导航栏中,单击设置>访问策略,在存储的访问策略区域编辑策略。
创建示例
使用Shared Key 认证鉴权。
CREATE TABLE t2(c1 int, c2 int, p1 int) ENGINE='hdfs' TABLE_PROPERTIES='{ "partition_column":"p1", "format" : "parquet", "hdfs_url" : "abfss://{容器名}@{账号名}.{域名}/test", "azure.accesskey":"qss33o/fQ2lCCQ+d7******************************8fxq+7dbdzuPuZji+AStCERlsg==", "azure.endpoint":"{账号名}.{域名}" }'使用SAS认证鉴权。
CREATE TABLE t2(c1 int, c2 int, p1 int) ENGINE='hdfs' TABLE_PROPERTIES='{ "partition_column":"p1", "format" : "parquet", "hdfs_url" : "abfss://{容器名}@{账号名}.{域名}/tb1", "azure.sas.token":"sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupx&se=2026-04-02T20:01:51Z&st=2025-04-02T12:01:51Z&spr=https,http&sig=r6a3************p7rM%3D", "azure.endpoint":"{账号名}.{域名}" }'
Google Cloud Storage
参数说明
参数 | 说明 |
hdfs_url | GCS文件目录。 |
gcs.project_id | Google Cloud service账号 project_id。 |
gcs.client_email | Google Cloud service账号 client_email。 |
gcs.token_uri | Google Cloud service账号 token_uri。 |
gcs.private_key_id | Google Cloud service账号 private_key_id。 |
gcs.private_key | Google Cloud service账号 private_key。 |
创建服务账号后会得到相应的json文件,将文件中对应的key填入相应参数即可。创建服务账号,请参见iam-service-accounts-create-console。
权限要求
操作场景 | 最小权限集合 |
从GCS外表导入数据 | Storage Legacy Bucket Reader |
将数据导出到GCS外表 | Storage Legacy Object Owner |
GCS存储桶的访问权限控制,请参见access-control。
创建示例
CREATE TABLE t2(c1 int, c2 int, p1 int)
ENGINE='hdfs'
TABLE_PROPERTIES='{
"partition_column":"p1",
"format" : "parquet",
"hdfs_url" : "gs://adbtest2/tbls/table1",
"gcs.project_id":"test-project",
"gcs.client_email":"adbtest@test-project.iam.gserviceaccount.com",
"gcs.token_uri":"https://oauth2.googleapis.cn/token",
"gcs.private_key_id":"xxxx",
"gcs.private_key":"-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFA****-----END PRIVATE KEY-----\n"
}'