このトピックでは、Data Integration を使用して、Simple Log Service の LogHub データソースから、MaxCompute、オブジェクトストレージサービス (OSS)、Tablestore、リレーショナルデータベース管理システム (RDBMS)、DataHub など、Data Integration でサポートされている宛先にデータを同期する方法について説明します。このトピックでは、宛先として MaxCompute データソースを使用します。
前提条件
MaxCompute データソースが追加されていること。詳細については、「MaxCompute データソースを追加する」をご参照ください。
ソース Logstore と宛先の MaxCompute テーブルが準備されていること。
背景情報
以下のシナリオでは、LogHub データソースから宛先にデータを同期できます。
LogHub データソースから、リージョンをまたがる MaxCompute データソースなどのデータソースにデータを同期する。
LogHub データソースから、Alibaba Cloud アカウントをまたがる MaxCompute データソースなどのデータソースにデータを同期する。
LogHub データソースから、同じ Alibaba Cloud アカウント内の MaxCompute データソースなどのデータソースにデータを同期する。
LogHub データソースから、Alibaba Cloud パブリッククラウドと Alibaba Finance Cloud 間の MaxCompute データソースなどのデータソースにデータを同期する。
Alibaba Cloud アカウント A と B がある場合、アカウント B を使用して Data Integration で同期タスクを作成できます。次に、同期タスクを使用して、アカウント A 内の LogHub データをアカウント B 内の MaxCompute データソースに同期できます。次の説明は詳細情報を提供します。
アカウント A の AccessKey ID と AccessKey Secret を使用して、LogHub データソースを追加します。
アカウント B を使用して、アカウント A を使用して作成されたすべての Simple Log Service プロジェクトのデータを同期できます。
アカウント A 内の RAM ユーザー A1 の AccessKey ID と AccessKey Secret を使用して、LogHub データソースを追加します。
アカウント A を使用して、
AliyunLogFullAccess
およびAliyunLogReadOnlyAccess
システムポリシーを RAM ユーザー A1 にアタッチし、Simple Log Service に対する一般的な権限を RAM ユーザー A1 に付与します。詳細については、「RAM ユーザーを作成し、RAM ユーザーに Simple Log Service へのアクセスを承認する」をご参照ください。説明AliyunLogFullAccess
およびAliyunLogReadOnlyAccess
システムポリシーが RAM ユーザー A1 にアタッチされると、RAM ユーザー A1 を使用して、アカウント A を使用して作成されたすべての Simple Log Service プロジェクトにアクセスできます。アカウント A を使用して、Simple Log Service に対するカスタム権限を RAM ユーザー A1 に付与します。
アカウント A を使用して [RAM コンソール] にログインします。左側のナビゲーションペインで、
を選択します。[ポリシー] ページで、[ポリシーの作成] をクリックします。RAM と Simple Log Service に対するカスタム権限を RAM ユーザーに付与する方法の詳細については、「RAM の概要」および「概要」をご参照ください。
次のポリシーが RAM ユーザー A1 にアタッチされている場合、アカウント B を使用して、RAM ユーザー A1 が Simple Log Service で権限を持っている project_name1 と project_name2 のデータのみを同期できます。
{ "Version": "1", "Statement": [ { "Action": [ "log:Get*", // log:Get* アクション "log:List*", // log:List* アクション "log:CreateConsumerGroup", // コンシューマーグループの作成アクション "log:UpdateConsumerGroup", // コンシューマーグループの更新アクション "log:DeleteConsumerGroup", // コンシューマーグループの削除アクション "log:ListConsumerGroup", // コンシューマーグループのリストアクション "log:ConsumerGroupUpdateCheckPoint", // コンシューマーグループのチェックポイント更新アクション "log:ConsumerGroupHeartBeat", // コンシューマーグループのハートビートアクション "log:GetConsumerGroupCheckPoint" // コンシューマーグループのチェックポイント取得アクション ], "Resource": [ "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*", "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*" ], "Effect": "Allow" } ] }
LogHub データソースを追加する
DataWorks コンソール にログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションペインで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[data Integration に移動] をクリックします。
Data Integration ページの左側のナビゲーションペインで、[データソース] をクリックします。
[データソース] ページで、[データソースを追加] をクリックします。
[データソースを追加] ダイアログボックスで、[loghub] をクリックします。
[loghub データソースを追加] ダイアログボックスで、パラメーターを設定します。
パラメーター
説明
データソース名
データソースの名前。名前には、文字、数字、アンダースコア (_) のみを含めることができ、文字で始める必要があります。
データソースの説明
データソースの説明。説明は 80 文字以内にする必要があります。
Loghub エンドポイント
Simple Log Service プロジェクトにアクセスするために使用される URL。
http://example.com
形式である必要があります。 example.com は Simple Log Service プロジェクトのエンドポイントを示します。詳細については、「エンドポイント」をご参照ください。プロジェクト
Simple Log Service プロジェクトの名前。
AccessKey ID
Simple Log Service プロジェクトへの接続に使用する Alibaba Cloud アカウントの AccessKey ID。[アクセスキーペア] ページで AccessKey ID をコピーできます。
AccessKey Secret
Simple Log Service プロジェクトへの接続に使用する Alibaba Cloud アカウントの AccessKey Secret。
使用するリソースグループを見つけて、[接続ステータス (本番環境)] 列の [ネットワーク接続テスト] をクリックします。
接続テストが成功したら、[完了] をクリックします。
バッチ同期タスクを作成する
[データソース] ページで、左上隅にある
アイコンをクリックし、 を選択します。
[datastudio] ページで、ポインターを
アイコンの上に移動し、[ワークフローを作成] を選択します。
[ワークフローを作成] ダイアログボックスで、[ワークフロー名] パラメーターと [説明] パラメーターを設定し、[作成] をクリックします。
[スケジュールされたワークフロー] ペインで作成されたワークフローの名前をクリックし、[data Integration] を右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、[名前] パラメーターを設定し、[パス] ドロップダウンリストからパスを選択します。
[確認] をクリックして、バッチ同期タスクの設定タブに移動します。
コードレス UI を使用して同期タスクを設定する
[ネットワーク接続とリソースグループの設定] ステップで、使用するソースと宛先を選択します。
パラメーター
説明
ソース
[loghub] を選択します。
データソース名
追加した LogHub データソースを選択します。
リソースグループ
使用する Data Integration 専用リソースグループを選択します。
宛先
MaxCompute を選択します。
データソース名
追加した MaxCompute データソースを選択します。
Data Integration 専用リソースグループとソースおよび宛先間のネットワーク接続をテストします。ネットワーク接続テストが成功したら、[次へ] をクリックします。
ソース Logstore や宛先テーブルなどの情報を設定します。
次の表は、ソースのパラメーターについて説明しています。
パラメーター
説明
Logstore
データを読み取る Logstore の名前。
ログ開始時刻
データ消費の開始時刻。このパラメーターは、yyyyMMddHHmmss 形式の時間範囲の左境界 (左閉右開) を定義します。例: 20180111013000。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。
ログ終了時刻
データ消費の終了時刻。このパラメーターは、yyyyMMddHHmmss 形式の時間範囲の右境界 (左閉右開) を定義します。例: 20180111013010。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。
バッチ数
一度に読み取るデータレコードの数。デフォルト値: 256。
説明[データプレビュー] をクリックしてデータをプレビューできます。少数の LogHub データレコードのみが表示されます。表示されるデータレコードは、指定した開始時刻と終了時刻のために、同期される実際のデータと異なる場合があります。
[フィールドマッピング] セクションで、ソースフィールドと宛先フィールド間のマッピングを設定します。
[チャネル制御] セクションで、[同期レート] や [ダーティデータレコードのポリシー] などのパラメーターを設定します。
同期タスクの設定タブの右側のナビゲーションペインで、[プロパティ] をクリックします。[プロパティ] タブで、再実行プロパティ、スケジューリング用リソースグループ、同期タスクの祖先タスクなどの設定を行います。
説明同期タスクの祖先タスクを設定する場合は、[ルートノードを追加] を選択します。
上記の設定が正しいことを確認し、トップツールバーの
アイコンをクリックします。
バッチ同期タスクを実行します。
バッチ同期タスクを実行するには、次のいずれかの方法を使用できます。
タスクを 1 回だけ実行します。
設定タブでトップツールバーの
アイコンをクリックしてタスクを実行します。
説明タスクを実行する前に、タスクに設定したカスタムパラメーターの値を指定する必要があります。
スケジューリングシステムを有効にして、スケジューリングプロパティに基づいてタスクを実行します。
タスクの設定タブの右側のナビゲーションペインで [プロパティ] タブをクリックし、タスクのスケジューリングサイクルなどの時間プロパティを設定します。
次に、トップツールバーの
アイコンと
アイコンを順番にクリックして、タスクをスケジューリングシステムにコミットします。スケジューリングシステムは、タスクに設定されたプロパティに基づいて、翌日以降にタスクを定期的に実行します。
コードエディターを使用してバッチ同期タスクを設定する
バッチ同期タスクの設定タブで、トップツールバーの [変換スクリプト] アイコンをクリックします。
[ヒント] メッセージで、[OK] をクリックしてコードエディターに切り替えます。
トップツールバーの [テンプレートのインポート] アイコンをクリックします。
[テンプレートのインポート] ダイアログボックスで、[ソースタイプ]、[データソース]、[ターゲットタイプ]、[データソース] パラメーターを設定し、[確認] をクリックしてテンプレートを適用します。
ビジネス要件に基づいて、コードエディターでコードを変更します。サンプルコード:
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "loghub", "parameter": { "datasource": "loghub_lzz", // データを読み取る LogHub データソースの名前。追加したデータソースの名前と同じである必要があります。 "logstore": "logstore-ut2", // データを読み取る Logstore の名前。Logstore は、ログデータを収集、保存、およびクエリするための Simple Log Service ユニットです。 "beginDateTime": "${startTime}", // データ消費の開始時刻。このパラメーターは、時間範囲の左境界 (左閉右開) を定義します。 "endDateTime": "${endTime}", // データ消費の終了時刻。このパラメーターは、時間範囲の右境界 (左閉右開) を定義します。 "batchSize": 256, // 一度に読み取るデータレコードの数。デフォルト値: 256。 "splitPk": "", "column": [ "key1", "key2", "key3" ] } }, "writer": { "plugin": "odps", "parameter": { "datasource": "odps_source", // データを書き込むデータソースの名前。追加したデータソースの名前と同じである必要があります。 "table": "test", // データを書き込むテーブルの名前。 "truncate": true, "partition": "", // 宛先テーブルのパーティション情報。 "column": [ // データを書き込む列の名前。 "key1", "key2", "key3" ] } }, "setting": { "speed": { "mbps": 8, // 最大伝送速度。単位: MB/s。 "concurrent": 7 // 並列スレッドの最大数。 } } } }