本ドキュメントでは、DataWorks のデータ移行機能を使用して、OSS から MaxCompute へ JSON データを移行し、MaxCompute の組み込み文字列関数 GET_JSON_OBJECT を使用して JSON 情報を抽出する方法を説明します。
準備
- データを OSS にアップロードします。
JSON ファイルを TXT ファイルに変換し、OSS へアップロードします。 JSON ファイルの例は次のとおりです。
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }
applog.txt ファイルを OSS へアップロードします。 この例では、OSS バケットは中国 (上海) にあります。
DataWorks を使用した OSS から MaxCompute への JSON データの移行
- 1. OSS データソースの追加
DataWorks コンソールで、 [Data Integration] ページへ移動し、OSS データソースを追加します。 詳細については、「OSS データソースの設定」をご参照ください。次の図のようにパラメーターが表示されます。 接続性テストが成功した後に [Complete] をクリックします。 本ドキュメントのエンドポイントには、http://oss-cn-shanghai.aliyuncs.com と http://oss-cn-shanghai-internal.aliyuncs.com が含まれます。注 OSS プロジェクトと DataWorks プロジェクトは同じ地域にあるため、イントラネットエンドポイント
http://oss-cn-shanghai-internal.aliyuncs.com
が使用されています。 - 2. データ同期タスクの作成
DataWorks コンソールで、データ同期ノードを作成します。 詳細については、「OTSStream リーダーの設定」をご参照ください。 同時に、DataWorksでmqdataという名前のテーブルを作成して JSON デー タを格納します。 詳細については、「バケットの作成」をご参照ください。 グラフィカルインターフェイスでテーブルパラメータを設定できます。 mqdata テーブルには、MQ データという名前の列が 1 つだけあります。 データ型は String 型です。
- 3. パラメーターの設定
テーブルを作成したら、次の図に示すように、グラフィカルインターフェイスでデータ同期タスクパラメーターを設定します。 宛先データソースを odps_first に、宛先テーブルを mqdata に設定します。 元のデータを OSS に設定し、ファイルパスと名前をオブジェクトの接頭辞として入力します。注 列の区切り文字をキャレット (^) または TXT ファイルに含まれている他の文字に設定します。 DataWorks では、OSS の TXT データソースに対して複数の区切り文字がサポートされています。 したがって、列のデータを分割するために、%&%#^$$^% などの文字を使用します。
"fileFormat":"binary"
に設定します。 スクリプトモードのコードの例は次のとおりです。{ "type": "job", "steps": [ { "stepType": "oss", "parameter": { "fieldDelimiterOrigin": "^", "nullFormat": "", "compress": "", "datasource": "OSS_userlog", "column": [ { "name": 0, "type": "string", "index": 0 } ], "skipHeader": "false", "encoding": "UTF-8", "fieldDelimiter": "^", "fileFormat": "binary", "object": [ "applog.txt" ] }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "isCompress": false, "truncate": true, "datasource": "odps_first", "column": [ "mqdata" ], "emptyAsNull": false, "table": "mqdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false, "dmu": 1 } } }
注 その後、JSON ファイルが OSS から MaxCompute へ同期されたら、JSON ファイルのデータは同じ行に保存されます。 つまり、JSON ファイルのデータは同じフィールドを共有します。 他のパラメーターにはデフォルト値を使用します。
結果の検証
-
Business Flow で ODPS SQL ノードを作成します。
-
SELECT * from mqdata;
文を入力し、mqdata テーブルのデータを表示します。注 MaxCompute clientでSELECT * from mqdata;
を実行してデータを表示し、次の手順を実行することも可能です。 -
テーブルにインポートされたデータが正しいことを確認し
SELECT GET_JSON_OBJECT(mqdata.MQdata,'$.expensive') FROM mqdata;
を使ってJSONファイルの expensive の値を取得します。
追加情報
MaxCompute の組み込み文字列関数 GET_JSON_OBJECT を使用して、必要に応じて JSON データを取得します。