本文以一个示例为您介绍Apache Nifi如何连接Hologres。

背景信息

Apache NiFi是一个易用、可靠的数据处理与分发系统,Apache NiFi的设计目标是自动化管理系统间的数据流。Apache Nifi是一个基于WEB-UI用户界面,具有很强交互性和易用性,为不同系统间或系统内提供数据流管理与处理的系统。

前提条件

获取本地JSON文件写入Hologres

获取本地JSON文件写入Hologres的流程如下图所示。流程图
  1. GetFile:读取JSON格式的文件。
  2. ConvertJSONToSQL:将JSON中的元素转化为SQL中的Insert语句。
  3. PutSQL:执行上一个Processor生成的SQL语句,将JSON中的元素插入到数据库中。
  1. 创建数据库和表
    1. 登录Hologres实例,并且建立数据库,命名为demo,详情请参见创建数据库
    2. 创建数据表。
      使用如下SQL创建数据表,之后会将数据写入该表中。
      DROP TABLE IF EXISTS user_info;
      
      CREATE TABLE IF NOT EXISTS user_info (
          id int,
          first_name text,
          last_name text,
          email text
      );
  2. 配置GetFile Processor
    1. 添加一个GetFile Processor。
      详情请参见Adding a Processor
    2. 填写JSON文件存放的地址。
      在PROPERTIES选项卡中的Input Directory中填写JSON文件存放的地址,例如我们将JSON文件存放在Apache Nifi服务器的/opt/nifi/nifi-current/file_source位置,文件名为user_info.json文件内容如下。
      {
          "id": 1,
          "first_name": "Sig",
          "last_name": "Olivo",
          "email": "solivo0@blinklist.com"
      }
      配置样例如下所示。getfile processor
    3. 单击APPLY保存配置。
  3. 配置ConvertJSONToSQL Processor
    1. 添加一个ConvertJSONToSQL Processor。
    2. 在JDBC Connection Pool中增加一个Service,选择Compatible Controller ServicesDBCPConnectionPool,并且设置Controller Service Name,此处设置为hologres
      addcontrollerservice
    3. 单击JDBC Connection Pool行最右侧的向右箭头按钮,开始配置连接字符串。
    4. 从中找到刚才创建的DBCPConnectionPool,单击设置按钮。
      设置DBCP
    5. 在设置页面的PROPERTIES选项卡中配置如下参数。
      propertioes
      参数名称 描述 说明
      Database Connection URL 连接Hologres实例的JDBC连接字符串格式为:jdbc:postgresql://<endpoint>/<database name>,例如jdbc:postgresql://hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com:80/demo 其中Endpoint需要使用公网或者VPC的Endpoint,进入Hologres管理控制台的实例详情页获取Endpoint。
      Database Driver Class Name org.postgresql.Driver 不涉及
      Database Driver Location(s) PostgreSQL的JDBC存放地址,例如/opt/nifi/nifi-current/jdbc_driver/postgresql-42.3.4.jar 您可以从PostgreSQL官网下载JDBC,推荐使用42.2.25以上版本的JDBC。
      Database User 当前阿里云账号的AccessKey ID。 您可以单击AccessKey 管理,获取AccessKey ID。
      Password 当前账号的AccessKey Secret。 您可以单击AccessKey 管理,获取AccessKey Secret。
    6. 单击OK完成配置。
    7. 单击ENABLE启动该服务。
    8. 回到最初的ConvertJSONToSQL Processor,选择修改如下参数,更多说明请参见官方手册
      参数名称 描述
      Statement Type 需要执行的SQL语句,该示例中使用INSERT
      Table Name 需要写入数据的表名称,该示例中使用user_info
      Schema Name 需要写入数据的表的Schema名称,该示例中使用public
    9. 单击APPLY完成配置。
  4. 配置PutSQL Processor
    1. 添加一个PutSQL Processor。
    2. JDBC Connection Pool设置为上一步配置的DBCPConnectionPool,该案例中DBCPConnectionPool名称为hologres
    3. Support Fragmented Transactions置为false
    4. 单击APPLY完成配置。
  5. 开始写入数据
    至此,您就完成了所有配置,将所有节点置为运行状态,Nifi即开始读取JSON文件写入Hologres。开始写入数据
  6. 查询数据
    使用如下命令在Hologres中查询user_info表,即可看到写入的数据。
    SELECT * FROM user_info;
    查询结果如下。查询结果