AnalyticDB for MySQLは、特定のオフセットに基づいてApsaraMQ for KafkaインスタンスからAnalyticDB for MySQLクラスターにデータをリアルタイムで同期できるデータ同期機能を提供します。 この機能は、ほぼリアルタイムのデータ取り込み、完全なデータアーカイブ、弾性分析などの要件を満たすのに役立ちます。 このトピックでは、ApsaraMQ for Kafkaデータソースの作成、データ同期ジョブの作成と開始、データの分析、およびデータソースの管理方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
AnalyticDB for MySQLクラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権アカウントを作成する必要があります。 詳細については、「データベースアカウントの作成」トピックの「特権アカウントの作成」セクションをご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権アカウントと標準アカウントの両方を作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
ApsaraMQ for Kafkaインスタンスは、AnalyticDB for MySQLと同じリージョンに作成されています。 Data Lakehouse Editionクラスター。
ApsaraMQ for KafkaインスタンスにKafkaトピックが作成され、メッセージが送信されます。 詳細については、概要トピックの「ApsaraMQ For Kafkaのクイックスタート」をご参照ください。
使用上の注意
トピックのデータが期限切れになると、システムは自動的にデータを削除します。 トピックデータの有効期限が切れ、データ同期ジョブが失敗した場合、データ同期ジョブを再起動したときに削除されたデータの読み取りに失敗します。 その結果、データが失われる可能性がある。 データ同期ジョブが失敗した場合は、トピックデータのライフサイクルを増やし、テクニカルサポートに連絡することを推奨します。
トピックから取得したサンプルデータのサイズが8 KBを超える場合、ApsaraMQ for Kafka APIはデータを切り捨て、データをJSON形式に解析できません。 その結果、フィールドマッピング情報を自動的に生成することができない。
課金ルール
AnalyticDB for MySQLのデータ移行機能を使用してデータをOSSに移行する場合、次の料金が発生します。
AnalyticDB for MySQLのAnalyticDBコンピューティングユニット (ACU) エラスティックリソース料金。 詳細については、「Data Lakehouse Editionの課金項目」をご参照ください。.
OSSのストレージ料金、およびGETやPUTなどのリクエスト数の料金。 詳細については、「課金の概要」をご参照ください。
手順
手順1: データソースの作成
手順2: データ同期ジョブの作成
ステップ3: データ同期ジョブの開始
ステップ4: データの分析
ステップ5: (オプション) データソースの管理
データソースの作成
既存のApsaraMQ for Kafkaデータソースからデータを同期する場合は、この手順をスキップしてデータ同期ジョブを作成します。 詳細については、このトピックの「データ同期ジョブの作成」をご参照ください。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、[データ取り込み]> [データソース] を選択します。
ページの右上隅にある データソースの新規作成 をクリックします。
データソースの新規作成 ページで、次の表に示すパラメーターを設定します。
パラメーター
説明
データソースのタイプ
データソースのタイプ。 Kafkaを選択します。
データソース名
データソースの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。
データソースの説明
データソースの説明 たとえば、ユースケースとビジネスの制限を入力できます。
デプロイモード
Alibaba Cloudインスタンスのみがサポートされています。
Kafka インスタンス
ApsaraMQ for KafkaインスタンスのID。
ApsaraMQ for Kafkaコンソールにログインし、[インスタンス] ページに移動してインスタンスIDを表示します。
Kafka Topic
ApsaraMQ for Kafkaインスタンスで作成するトピックの名前。
ApsaraMQ for Kafkaコンソールにログインし、インスタンスの [トピック] ページに移動してトピック名を表示します。
メッセージデータフォーマット
メッセージのデータ形式。 JSONのみがサポートされています。
クリック作成.
データ同期ジョブの作成
左側のナビゲーションウィンドウで、Simple Log Service / Kafka データ同期 をクリックします。
ページの右上隅にある 同期リンクの新規作成 をクリックします。
のKafkaデータソースタブ同期リンクの新規作成ページでパラメーターを設定します。データソースと宛先の設定,ターゲットデータベースとターゲットテーブルの設定、および同期設定セクションを使用します。
次の表に、データソースと宛先の設定 セクションのパラメーターを示します。
パラメーター
説明
データリンク名
データ同期ジョブの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。
データソース
データソース。 既存のApsaraMQ for Kafkaデータソースを選択するか、データソースを作成できます。
宛先タイプ
AnalyticDB for MySQLのデータストレージタイプ。 Data Lake - OSS Storageのみがサポートされています。
OSS パス
AnalyticDB for MySQLクラスターデータのOSSストレージパス。
重要AnalyticDB for MySQLクラスターと同じリージョンにあるすべてのバケットが表示されます。 ビジネス要件に基づいてこのパラメーターを設定します。 このパラメーターを設定した後、変更することはできません。
他のデータ同期ジョブのディレクトリとネストされた関係を持たない空のディレクトリを選択することを推奨します。 これにより、履歴データの上書きが防止されます。 たとえば、2つのデータ同期ジョブに関連するOSSストレージパスがoss:// adb_demo/test/sls1 /およびoss:// adb_demo/test /であるとします。 この場合、これら2つのパスは互いに入れ子の関係にあるため、データ同期中にデータの上書きが発生します。
次の表に、ターゲットデータベースとターゲットテーブルの設定 セクションのパラメーターを示します。
パラメーター
説明
ライブラリ名
AnalyticDB for MySQLクラスター内のターゲットデータベースの名前。 同じ名前のデータベースが存在しない場合は, データベースが作成されます。 同じ名前を使用するデータベースがすでに存在する場合、データは既存のデータベースと同期されます。 データベースの命名規則については、「制限」をご参照ください。
テーブル名
AnalyticDB for MySQLクラスター内のターゲットテーブルの名前。 同じ名前を使用するテーブルがデータベースに存在しない場合、テーブルが作成されます。 同じ名前を使用するテーブルがデータベースにすでに存在する場合、データの同期に失敗します。 テーブルの命名規則については、「制限」をご参照ください。
サンプルデータ
ApsaraMQ for Kafkaトピックから自動的に取得される最新のデータ。
説明ApsaraMQ for KafkaトピックのデータはJSON形式である必要があります。 それ以外の場合、データ同期中にエラーが報告されます。
解析されたJSONレイヤー
ネストされたJSONフィールドに対して解析されるレイヤーの数。 有効な値:
0: ネストされたJSONフィールドは解析されません。
1 (デフォルト): 1つのレイヤーが解析されます。
2: 2つのレイヤーが解析されます。
3: 3つのレイヤが解析される。
4: 4つのレイヤーが解析されます。
ネストされたJSONフィールドの解析方法の詳細については、このトピックの「異なる数のレイヤーで解析されたスキーマフィールドの例」をご参照ください。
スキーマフィールドマッピング
サンプルデータから解析されるスキーマフィールドに関する情報。 宛先フィールド名またはタイプを変更し、フィールドを追加または削除できます。
パーティションキーの設定
ターゲットテーブルのパーティションフィールドの設定。 データが取り込まれ、クエリが期待どおりに実行されるようにするには、ログ時間またはビジネスロジックに基づいてパーティションを構成することをお勧めします。 パーティションを設定しない場合、ターゲットテーブルにパーティションは存在しません。
Format Processing Methodパラメータの有効な値:
フォーマットされた時間: ソースパーティションフィールドドロップダウンリストからdatetimeフィールドを選択し、フォーマット処理方法パラメータをフォーマットされた時間に設定し、ソースフィールドフォーマットおよび宛先パーティションフォーマットパラメータを設定します。 AnalyticDB for MySQLは、指定されたソースフィールド形式に基づいてパーティションフィールドの値を識別し、その値をパーティション分割のために指定された宛先パーティション形式に変換します。 たとえば、ソースフィールドがgmt_createdで、値が1711358834、ソースフィールド形式パラメーターがTimestamp Accurate to Secondsに設定され、Destination Partition FormatパラメーターがyyyyMMddに設定されている場合、値は20240325に基づいてパーティション分割されます。
指定されたパーティションフィールド: Format Processing Methodパラメーターを指定されたパーティションフィールドに設定し、その他の必須パラメーターを設定します。
次の表に、同期設定 セクションのパラメーターを示します。
パラメーター
説明
増分同期の最初のコンシューマーオフセット
データ同期ジョブの開始時にシステムがApsaraMQ for Kafkaデータを消費する時点。 有効な値:
最古のオフセット: システムは、最古のデータレコードが生成された時点から、KafkaデータのApsaraMQを消費します。
最新のオフセット: システムは、最新のデータレコードが生成された時点から、Kafkaデータ用のApsaraMQを消費します。
カスタムオフセット: 時点を選択できます。 その後、システムは、選択した時点で生成された最初のデータエントリからのApsaraMQ for Kafkaデータを消費します。
ジョブ型リソースグループ
データ同期ジョブを実行するジョブリソースグループ。
増分同期に必要な ACU の数
ジョブリソースグループがデータ同期ジョブを実行するために必要なAnalyticDBコンピューティングユニット (ACU) の数。 値の範囲は2から、ジョブリソースグループで使用可能なコンピューティングリソースの最大数です。 データ取り込みの安定性とパフォーマンスを向上させるために、より多くのACUを指定することを推奨します。
説明ジョブリソースグループでデータ同期ジョブを作成すると、リソースグループ内のエラスティックリソースが使用され、システムは使用されたリソースをリソースグループから除外します。 たとえば、ジョブリソースグループが48のACUの予約済みコンピューティングリソースを有し、8つのACUを消費する同期ジョブが作成されると仮定する。 リソースグループで別の同期ジョブを作成する場合、最大40のACUを選択できます。
詳細設定
データ同期ジョブのカスタム設定。 カスタム設定を構成する場合は、テクニカルサポートにお問い合わせください。
上記のパラメーター設定を完了したら、送信をクリックします。
データ同期ジョブの開始
Simple Log Service / Kafka データ同期ページで、作成したデータ同期ジョブを見つけて、開始で、アクション列を作成します。
ページの右上隅にある [検索] をクリックします。 ジョブの状態がStartingに変わると、データ同期ジョブが開始されます。
データの分析
データ同期ジョブが完了したら、Spark JAR Developmentを使用して、AnalyticDB for MySQLクラスターに同期されたデータを分析できます。 Spark開発の詳細については、「Sparkエディター」および「概要」をご参照ください。
左側のナビゲーションウィンドウで、 .
デフォルトのテンプレートにSQL文を入力し、実行.
-- Here is just an example of SparkSQL. Modify the content and run your spark program. conf spark.driver.resourceSpec=medium; conf spark.executor.instances=2; conf spark.executor.resourceSpec=medium; conf spark.app.name=Spark SQL Test; conf spark.adb.connectors=oss; -- Here are your sql statements show tables from lakehouse20220413156_adbTest;
(オプション) [アプリケーション] タブでアプリケーションを見つけ、[操作] 列の [ログ] をクリックして、アプリケーションのSpark SQL実行ログを表示します。
データソースの管理
[データソース] ページで、次の表の [操作] 列に記載されている操作を実行できます。
API 操作 | 説明 |
リンクの新規作成 | 同期ジョブの作成または移行ジョブの作成ページに移動して、データソースを使用するジョブを作成します。 |
表示 | データソースの詳細な設定を表示します。 |
編集 | データソース名や説明などのデータソースパラメーターを変更します。 |
削除 | データソースを削除します。 説明 データソースがデータ同期またはデータ移行ジョブで使用されている場合、データソースを削除することはできません。 この場合、最初に Simple Log Service / Kafka データ同期 ページに移動してジョブを見つけ、操作 列の 削除 をクリックしてジョブを削除する必要があります。 |
異なる数のレイヤーで解析されたスキーマフィールドの例
ネストされたJSONフィールドは、異なる数のレイヤーで解析できます。 この例では、次のJSONデータがApsaraMQ for Kafkaに送信されます。
{
"name" : "zhangle",
"age" : 18,
"device" : {
"os" : {
"test":lag,
"member":{
"fa":zhangsan,
"mo":limei
}
},
"brand" : "none",
"version" : "11.4.2"
}
}
前述のJSONデータは、異なる数のレイヤーで解析できます。
解析なし
JSONフィールドは解析されず、JSONデータは直接表示されます。
JSONフィールド | 例 | 宛先フィールド名 |
__value__ | {"name": "zhangle","age" : 18, "device": {"os": {"test":lag,"member": {"fa":zhangsan,"mo":limei }}, "brand": "none","version": "11.4.2" }} | __value__ |
1層解析
JSONフィールドの最初のレイヤーが解析されます。
JSONフィールド | 例 | 宛先フィールド名 |
name | zhangle | name |
age | 18 | age |
device | {"os": {"test":lag,"member": {"fa":zhangsan,"mo":limei }}, "brand": "none","version": "11.4.2"} | device |
2層解析
JSONフィールドの最初の2つのレイヤーが解析されます。 フィールドにネストされたフィールドがない場合、フィールドは直接表示されます。 たとえば、名前と年齢のフィールドが直接表示されます。 フィールドにネストされたフィールドがある場合、フィールドの子フィールドが表示されます。 たとえば、デバイスフィールドはdevice.os
、device.br and
、device.version
として解析されます。
宛先フィールド名では、ピリオド (.) が自動的にアンダースコア (_) に変更されます。
JSONフィールド | 例 | 宛先フィールド名 |
name | zhangle | name |
age | 18 | age |
device.os | {"test":lag,"member":{ "fa":zhangsan,"mo":limei }} | device_os |
device.brおよび | none | device_brand |
デバイス. version | 11.4.2 | device_version |
3層解析
JSONフィールド | 例 | 宛先フィールド名 |
name | zhangle | name |
age | 18 | age |
device.os.test | ラグ | device_os_test |
device.os.member | {"fa":zhangsan,"mo":limei} | device_os_member |
device.brおよび | none | device_brand |
デバイス. version | 11.4.2 | device_version |
4層解析
JSONフィールド | 例 | 宛先フィールド名 |
name | zhangle | name |
age | 18 | age |
device.os.test | ラグ | device_os_test |
device.os.member.fa | zhangsan | device_os_member_fa |
device.os.member.mo | ライム | device_os_member_mo |
device.brおよび | none | device_brand |
デバイス. version | 11.4.2 | device_version |