本文以一個樣本為您介紹Apache Nifi如何串連Hologres。
背景資訊
Apache NiFi是一個易用、可靠的資料處理與分發系統,Apache NiFi的設計目標是自動化管理系統間的資料流。Apache Nifi是一個基於WEB-UI使用者介面,具有很強互動性和易用性,為不同系統間或系統內提供資料流管理與處理的系統。
前提條件
- 開通Hologres,詳情請參見購買Hologres。
- 安裝Apache Nifi,詳情請參見How to install and start NiFi。
擷取本地JSON檔案寫入Hologres
擷取本地JSON檔案寫入Hologres的流程如下圖所示。
- GetFile:讀取JSON格式的檔案。
- ConvertJSONToSQL:將JSON中的元素轉化為SQL中的Insert語句。
- PutSQL:執行上一個Processor產生的SQL語句,將JSON中的元素插入到資料庫中。
- 建立資料庫和表
- 登入Hologres執行個體,並且建立資料庫,命名為demo,詳情請參見建立資料庫。
- 建立資料表。使用如下SQL建立資料表,之後會將資料寫入該表中。
DROP TABLE IF EXISTS user_info; CREATE TABLE IF NOT EXISTS user_info ( id int, first_name text, last_name text, email text );
- 配置GetFile Processor
- 添加一個GetFile Processor。詳情請參見Adding a Processor。
- 填寫JSON檔案存放的地址。在PROPERTIES選項卡中的Input Directory中填寫JSON檔案存放的地址,例如我們將JSON檔案存放在Apache Nifi伺服器的
/opt/nifi/nifi-current/file_source
位置,檔案名稱為user_info.json檔案內容如下。
配置範例如下所示。{ "id": 1, "first_name": "Sig", "last_name": "Olivo", "email": "solivo0@blinklist.com" }
- 單擊APPLY儲存配置。
- 添加一個GetFile Processor。
- 配置ConvertJSONToSQL Processor
- 添加一個ConvertJSONToSQL Processor。
- 在JDBC Connection Pool中增加一個Service,選擇Compatible Controller Services為DBCPConnectionPool,並且設定Controller Service Name,此處設定為hologres。
- 單擊JDBC Connection Pool行最右側的向右箭頭按鈕,開始配置連接字串。
- 從中找到剛才建立的DBCPConnectionPool,單擊設定按鈕。
- 在設定頁面的PROPERTIES選項卡中配置如下參數。
參數名稱 描述 說明 Database Connection URL 串連Hologres執行個體的JDBC連接字串格式為: jdbc:postgresql://<endpoint>/<database name>
,例如jdbc:postgresql://hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com:80/demo
。其中Endpoint需要使用公網或者VPC的Endpoint,進入Hologres管理主控台的執行個體詳情頁擷取Endpoint。 Database Driver Class Name org.postgresql.Driver 不涉及 Database Driver Location(s) PostgreSQL的JDBC存放地址,例如 /opt/nifi/nifi-current/jdbc_driver/postgresql-42.3.4.jar
。您可以從PostgreSQL官網下載JDBC,推薦使用42.2.25以上版本的JDBC。 Database User 當前阿里雲帳號的AccessKey ID。 您可以單擊AccessKey 管理,擷取AccessKey ID。 Password 當前帳號的AccessKey Secret。 您可以單擊AccessKey 管理,擷取AccessKey Secret。 - 單擊OK完成配置。
- 單擊ENABLE啟動該服務。
- 回到最初的ConvertJSONToSQL Processor,選擇修改如下參數,更多說明請參見官方手冊。
參數名稱 描述 Statement Type 需要執行的SQL語句,該樣本中使用 INSERT
。Table Name 需要寫入資料的表名稱,該樣本中使用 user_info
。Schema Name 需要寫入資料的表的Schema名稱,該樣本中使用 public
。 - 單擊APPLY完成配置。
- 配置PutSQL Processor
- 添加一個PutSQL Processor。
- 將JDBC Connection Pool設定為上一步配置的DBCPConnectionPool,該案例中DBCPConnectionPool名稱為
hologres
。 - 將Support Fragmented Transactions置為false。
- 單擊APPLY完成配置。
- 開始寫入資料至此,您就完成了所有配置,將所有節點置為運行狀態,Nifi即開始讀取JSON檔案寫入Hologres。
- 查詢資料使用如下命令在Hologres中查詢user_info表,即可看到寫入的資料。
查詢結果如下。SELECT * FROM user_info;