AnalyticDB for MySQLでは、データ同期ジョブを作成して、Simple Log Service (SLS) LogstoreからAnalyticDB for MySQLクラスターに特定のオフセットに基づいてリアルタイムでデータを同期できます。 データ同期機能は、ほぼリアルタイムのデータ取り込み、完全なデータアーカイブ、柔軟な分析などの要件を満たすのに役立ちます。 このトピックでは、SLSデータソースの作成、データ同期ジョブの作成と開始、データの分析、およびデータソースの管理方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
AnalyticDB for MySQLクラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権アカウントを作成する必要があります。 詳細については、「データベースアカウントの作成」トピックの「特権アカウントの作成」セクションをご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権アカウントと標準アカウントの両方を作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
SLSは活性化される。 プロジェクトとLogstoreは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。 詳細については、「入門」をご参照ください。
使用上の注意
SLS Logstoreのデータは、AnalyticDB for MySQLクラスター内の単一のテーブルにのみ同期できます。
課金ルール
AnalyticDB for MySQLのデータ移行機能を使用してデータをOSSに移行する場合、次の料金が発生します。
AnalyticDB for MySQLのAnalyticDBコンピューティングユニット (ACU) エラスティックリソース料金。 詳細については、「Data Lakehouse Editionの課金項目」をご参照ください。.
OSSのストレージ料金、およびGETやPUTなどのリクエスト数の料金。 詳細については、「課金の概要」をご参照ください。
手順
ステップ1: (オプション) RAM権限付与の設定
手順2: データソースの作成
手順3: データ同期ジョブの作成
ステップ4: データ同期ジョブの開始
ステップ5: データの分析
ステップ6: (オプション) データソースの管理
RAM権限の設定
Alibaba Cloudアカウント全体でSLSデータをAnalyticDB for MySQLクラスターに同期する場合は、データソースでRAMロールを作成し、ポリシーをアタッチしてRAMロールに権限を付与し、RAMロールの信頼ポリシーを編集する必要があります。 同じAlibaba Cloudアカウント内でSLSデータを同期する場合は、この手順をスキップしてデータソースを作成できます。 詳細については、このトピックの「データソースの作成」をご参照ください。
RAM ロールを作成します。 詳細については、「信頼できるAlibaba CloudアカウントのRAMロールの作成」をご参照ください。
説明[信頼できるAlibaba Cloudアカウントの選択] パラメーターに [その他のAlibaba Cloudアカウント] を選択し、AnalyticDB for MySQLクラスターが属するAlibaba CloudアカウントのIDを入力します。 [アカウントセンター] コンソールにログインして、[セキュリティ設定] ページで [Alibaba CloudアカウントID] を表示します。
AliyunAnalyticDBAccesssingLogRolePolicyポリシーをRAMロールにアタッチします。 詳細については、「RAMロールへのアクセス許可の付与」トピックの「方法2: [ロール] ページの [正確なアクセス許可] 」セクションを参照してください。
RAMロールの信頼ポリシーを編集します。 詳細については、「RAMロールの信頼ポリシーの編集」をご参照ください。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::<Alibaba Cloud account ID>:root" ], "Service": [ "<Alibaba Cloud account ID>@ads.aliyuncs.com" ] } } ], "Version": "1" }
説明上記のコードでは、Alibaba CloudアカウントIDパラメーターは、ステップ1で入力したアカウントIDを指定します。 このパラメーターを指定するときは、角かっこ (<>) を削除します。
データソースの作成
既存のデータソースからデータを同期する場合は、この手順をスキップしてデータ同期ジョブを作成します。 詳細については、このトピックの「データ同期ジョブの作成」をご参照ください。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、[データ取り込み]> [データソース] を選択します。
ページの右上隅にある データソースの新規作成 をクリックします。
[データソースの作成] ページで、次の表に示すパラメーターを設定します。
パラメーター
説明
データソースのタイプ
データソースのタイプ。 [SLS] を選択します。
データソース名
データソースの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。
データソースの説明
データソースの説明 たとえば、ユースケースとビジネスの制限を入力できます。
デプロイモード
Alibaba Cloudインスタンスのみがサポートされています。
SLS プロジェクトのリージョン
ソースSLSプロジェクトが存在するリージョン。
Alibaba Cloud プライマリアカウントをクロスするかどうか
SLSデータをAlibaba Cloudアカウント間でAnalyticDB for MySQLクラスターに同期できるかどうかを指定します。 有効な値:
いいえ。
はい。 [はい] を選択した場合、Alibaba CloudアカウントおよびRAMロールパラメーターを指定する必要があります。
説明Alibaba Cloudアカウント: データソースが属するAlibaba CloudアカウントのID。
RAMロール: 手順1でデータソースに作成されたRAMロールの名前。
SLS project
ソースSLSプロジェクトの名前。
重要Alibaba CloudアカウントとRAMユーザーに属するすべてのプロジェクトが表示されます。 Alibaba Cloudアカウントに属し、RAMユーザーとしてデータを同期するプロジェクトを選択する場合は、RAMユーザーにプロジェクトに対する権限が付与されていることを確認してください。 そうしないと、データをAnalyticDB for MySQLに同期できません。
SLS Logstore
ソースSLS Logstoreの名前。
作成をクリックします。
データ同期ジョブの作成
左側のナビゲーションウィンドウで、Simple Log Service / Kafka データ同期 をクリックします。
ページの右上隅にある 同期リンクの新規作成 をクリックします。
同期リンクの新規作成 ページの [Simple Log Serviceデータソース] タブで、データソースと宛先の設定 、ターゲットデータベースとターゲットテーブルの設定 、同期設定 セクションのパラメーターを設定します。
次の表に、[ソースと宛先の設定] セクションのパラメーターを示します。
パラメーター
説明
データリンク名
データ同期ジョブの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。
データソース
データソースの名前。 既存のSLSデータソースを選択するか、データソースを作成できます。
宛先タイプ
宛先タイプ。 Data Lake - OSS Storageのみがサポートされています。
OSS パス
AnalyticDB for MySQLクラスターデータのObject Storage Service (OSS) ストレージパス。
重要AnalyticDB for MySQLクラスターと同じリージョンにあるすべてのバケットが表示されます。 ビジネス要件に基づいてこのパラメーターを設定します。 このパラメーターを設定した後、変更することはできません。
他のデータ同期ジョブのディレクトリとネストされた関係を持たない空のディレクトリを選択することを推奨します。 これにより、履歴データの上書きが防止されます。 たとえば、2つのデータ同期ジョブに関連するOSSストレージパスがoss:// adb_demo/test/sls1 /およびoss:// adb_demo/test /であるとします。 この場合、これら2つのパスは互いに入れ子の関係にあるため、データ同期中にデータの上書きが発生します。
次の表に、ターゲットデータベースとターゲットテーブルの設定 セクションのパラメーターを示します。
パラメーター
説明
ライブラリ名
AnalyticDB for MySQLクラスター内のターゲットデータベースの名前。 同じ名前のデータベースが存在しない場合は, データベースが作成されます。 同じ名前を使用するデータベースがすでに存在する場合、データは既存のデータベースと同期されます。 データベースの命名規則については、「制限」をご参照ください。
テーブル名
AnalyticDB for MySQLクラスター内のターゲットテーブルの名前。 同じ名前を使用するテーブルがデータベースに存在しない場合、テーブルが作成されます。 同じ名前を使用するテーブルがデータベースにすでに存在する場合、データの同期に失敗します。 テーブルの命名規則については、「制限」をご参照ください。
スキーマフィールドマッピング
ソースフィールドと宛先フィールド。 デフォルトでは、フィールドはSLSの出荷タスク設定に基づいて取得されます。 Logstoreに出荷タスクが設定されていない場合、フィールドは最新のログデータに基づいて取得されます。
BOOLEAN、INT、BIGINT、FLOAT、DOUBLE、STRINGのデータ型がサポートされています。
SLSの予約フィールドは同期させることができる。 詳細については、「予約済みフィールド」をご参照ください。
重要宛先フィールド名は変更できません。
データ同期ジョブが開始された場合、開始および実行中であるか、開始および完了済みであるかにかかわらず、既存の列に関する情報を変更することはできません。 ただし、列を追加することはできます。 データ同期ジョブが作成されてもまだ開始されていない場合は、既存の列に関する情報を変更できます。
パーティションキーの設定
ターゲットテーブルのパーティションフィールドの設定。 データが取り込まれ、クエリが期待どおりに実行されるようにするには、ログ時間またはビジネスロジックに基づいてパーティションを構成することをお勧めします。 パーティションを設定しない場合、ターゲットテーブルにパーティションは存在しません。
Format Processing Methodパラメータの有効な値:
フォーマットされた時間: ソースパーティションフィールドドロップダウンリストからdatetimeフィールドを選択し、フォーマット処理方法パラメータをフォーマットされた時間に設定し、ソースフィールドフォーマットおよび宛先パーティションフォーマットパラメータを設定します。 AnalyticDB for MySQLは、指定されたソースフィールド形式に基づいてパーティションフィールドの値を識別し、その値をパーティション分割のために指定された宛先パーティション形式に変換します。 たとえば、ソースフィールドがgmt_createdで、値が1711358834、ソースフィールド形式パラメーターがTimestamp Accurate to Secondsに設定され、Destination Partition FormatパラメーターがyyyyMMddに設定されている場合、値は20240325に基づいてパーティション分割されます。
指定されたパーティションフィールド: Format Processing Methodパラメーターを指定されたパーティションフィールドに設定し、その他の必須パラメーターを設定します。
次の表に、[同期設定] セクションのパラメーターを示します。
パラメーター
説明
増分同期の最初のコンシューマーオフセット
データ同期ジョブの開始時にシステムがSLSデータを消費する時点。 有効な値:
初期オフセット (Earliest Offset) (begin_cursor): システムは、初期データレコードが生成された時点からSLSデータを消費する。
最新オフセット (end_cursor): システムは、最新のデータレコードが生成された時点からSLSデータを消費します。
カスタムオフセット: 時点を選択できます。 次いで、システムは、選択された時点で生成された第1のエントリからのSLSデータを消費する。
求人リソースグループ
データ同期ジョブを実行するジョブリソースグループ。
増分同期のためのACU
ジョブリソースグループがデータ同期ジョブを実行するために必要なAnalyticDBコンピューティングユニット (ACU) の数。 値の範囲は2から、ジョブリソースグループで使用可能なコンピューティングリソースの最大数です。 データ取り込みの安定性とパフォーマンスを向上させるために、より多くのACUを指定することを推奨します。
説明ジョブリソースグループでデータ同期ジョブを作成すると、リソースグループ内のエラスティックリソースが使用され、システムは使用されたリソースをリソースグループから除外します。 たとえば、ジョブリソースグループが48のACUの予約済みコンピューティングリソースを有し、8つのACUを消費する同期ジョブが作成されると仮定する。 リソースグループで別の同期ジョブを作成する場合、最大40のACUを選択できます。
詳細設定
データ同期ジョブのカスタム設定。 カスタム設定を構成する場合は、テクニカルサポートにお問い合わせください。
送信 をクリックします。
データ同期ジョブの開始
On theSimple Log Service / Kafka データ同期ページで、作成したデータ同期ジョブを見つけて、開始で、アクション列を作成します。
ページの右上隅にある [検索] をクリックします。 ジョブの状態がStartingに変わると、データ同期ジョブが開始されます。
データの分析
データ同期ジョブが完了したら、Spark JAR Developmentを使用して、AnalyticDB for MySQLクラスターに同期されたデータを分析できます。 Spark開発の詳細については、「Sparkエディター」および「概要」をご参照ください。
左側のナビゲーションウィンドウで、 .
デフォルトのテンプレートにSQL文を入力し、実行.
-- Here is just an example of SparkSQL. Modify the content and run your spark program. conf spark.driver.resourceSpec=medium; conf spark.executor.instances=2; conf spark.executor.resourceSpec=medium; conf spark.app.name=Spark SQL Test; conf spark.adb.connectors=oss; -- Here are your sql statements show tables from lakehouse20220413156_adbTest;
(オプション) [アプリケーション] タブでアプリケーションを見つけ、[操作] 列の [ログ] をクリックして、アプリケーションのSpark SQL実行ログを表示します。
データソースの管理
[データソース] ページで、次の表の [操作] 列に記載されている操作を実行できます。
API 操作 | 説明 |
リンクの新規作成 | 同期ジョブの作成または移行ジョブの作成ページに移動して、データソースを使用するジョブを作成します。 |
表示 | データソースの詳細な設定を表示します。 |
編集 | データソース名や説明などのデータソースパラメーターを変更します。 |
削除 | データソースを削除します。 説明 データソースがデータ同期またはデータ移行ジョブで使用されている場合、データソースを削除することはできません。 この場合、最初に Simple Log Service / Kafka データ同期 ページに移動してジョブを見つけ、操作 列の 削除 をクリックしてジョブを削除する必要があります。 |