Simple Log Serviceは、増分データをプルするときに、効率的に新しいデータまたは更新されたデータのみをプルします。 このトピックでは、res_rds_mysql関数を使用して、ApsaraDB RDS for MySQLインスタンスに作成されたデータベースから増分データを取得する方法について説明します。
前提条件
Simple Log Service
データはソースLogstoreにアップロードされます。 詳細については、「データ収集の概要」をご参照ください。
宛先Logstoreが作成されます。 詳細については、「Logstore の作成」をご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、RAMユーザーにデータを変換する権限があることを確認してください。 詳細については、「RAMユーザーにデータ変換ジョブを管理する権限を付与する」をご参照ください。
インデックスは、ソースおよび宛先ログストアに設定されます。 詳細については、「インデックスの作成」をご参照ください。
データ変換はインデックスに依存しません。 ただし、インデックスを設定しないと、クエリまたは分析操作を実行できません。
ApsaraDB RDS
ApsaraDB RDS for MySQLインスタンスにデータベースが作成されます。 データベースへの接続に使用されるアカウントが作成されます。 詳細については、「アカウントとデータベースの作成」をご参照ください。
データはデータベースのテーブルにアップロードされます。
ApsaraDB RDS for MySQLインスタンスにホワイトリストが設定されています。 詳細については、「データベースクライアントまたはCLIを使用したApsaraDB RDS For MySQLインスタンスへの接続」をご参照ください。
重要res_rds_mysql関数を使用してApsaraDB RDS for MySQLインスタンスに作成されたデータベースからデータをプルする場合、インスタンスのホワイトリストを設定し、0.0.0.0をホワイトリストに追加する必要があります。
背景情報
テクノロジーエンタープライズは、顧客情報をApsaraDB RDS for MySQLインスタンスのデータベースに保存し、顧客メンテナンスログをSimple Log ServiceのLogstoreに保存します。 2つのタイプのデータは連続的に更新される。 企業は、2種類のデータに対してJOIN操作を実行し、結果を新しいLogstoreに保存したいと考えています。
この場合、Simple Log Serviceが提供するres_rds_mysql関数を使用できます。 この関数を使用すると、データベースからデータをプルし、そのデータを宛先Logstoreに保存できます。 データ変換ジョブを実行してデータベースから増分データを取得すると、Simple Log Serviceはデータベースのタイムスタンプフィールドに基づいて新しいデータまたは更新されたデータのみを取得します。 これは高い効率を保証する。 この関数は、必要なデータベースに大量のデータが存在する、データが頻繁に更新される、データが頻繁に削除される、またはリアルタイムのデータ変換が必要なシナリオで使用できます。
増分プルおよびその他のプルモードの詳細については、「res_rds_mysql」をご参照ください。
リソースとデータサンプル
シンプルなLog Serviceリソース
プロジェクト: client-Project
ソースLogstore: client-log
次の図は、データサンプルを示しています。
宛先ログストア: クライアント情報
ApsaraDB RDSリソース
データベース: client-db
表: クライアント
次の図は、データサンプルを示しています。
データベースへの接続に使用されるアカウントのユーザー名とパスワード: testとtest1234 @ @ @
データベースのパブリックエンドポイント: rm-bp1k **** tp8o.mysql.rds.aliyuncs.com
手順
データ変換ページに移動します。
[プロジェクト] セクションで、client-projectプロジェクトを見つけてクリックします。
タブで、client-Log Logstoreを見つけてクリックします。
クエリと分析ページで、データ変換.
表示されるページの右上隅で、管理するデータの時間範囲を指定します。
[生ログ] タブにデータが存在することを確認します。
エディターボックスに、データ変換ステートメントを入力します。
パラメーターの詳細については、「res_rds_mysql」をご参照ください。
e_table_map( res_rds_mysql( "rm-bp1k****tp8o.mysql.rds.aliyuncs.com", "test", "test1234@@", "client-db", table="client", fields=["c_id", "name", "telephone", "update_time"], refresh_interval=1, primary_keys="c_id", update_time_key="update_time", deleted_flag_key=None, ), "c_id", ["name", "telephone"], )
クイックモードでデータをプレビューします。
指定されたデータ変換ステートメントが有効かどうかを確認します。 詳細については、「クイックプレビュー」をご参照ください。
クイック.
タブで、次の内容を入力します。
{ "__source__": "192.0.2.0", "__time__": 1624956516, "__topic__": "log", "__tag__:__client_ip__":"192.0.2.2", "c_id": "1", "staff_id": "002", "status": "Ongoing follow-up", "tag": "Follow-up visit", }
タブで、次の内容を入力します。
c_id,name,telephone,update_time,delete_flag 1,maki,010-123,1606358931,false
データのプレビュー.
プレビュー結果を表示します。
詳細モードでデータをプレビューします。
Simple Log ServiceがApsaraDB RDS for MySQLインスタンスに接続されているかどうかを確認します。 詳細については、「高度なプレビュー」をご参照ください。
高度な.
クリックデータのプレビュー.
プレビュー設定の追加パネルで、[認証方法] パラメーターを設定し、OK.
プレビュー結果を表示します。
データ変換ジョブを作成します。
クリック変換ジョブとして保存.
データ変換ジョブの作成パネル、パラメータを設定し、OK.
詳細については、「データ変換ジョブの作成」をご参照ください。
データ変換ジョブが作成された後、変換されたデータを宛先Logstoreで表示できます。
よくある質問
増分プルモードで削除機能を使用するにはどうすればよいですか?
増分プルモードでは、Simple Log Serviceは、必要なデータベーステーブルのプライマリキーと時間フィールドに基づいて、新しいデータまたは更新されたデータのみをプルします。 データベーステーブルのデータレコードにdelete_flag=true
を設定して、レコードを削除対象としてマークした場合、Simple Log Serviceは変換のためにレコードを引き続きプルします。 この問題を解決するために、Simple Log Serviceはdeleted_flag_keyパラメーターをres_rds_mysql関数に追加します。 データ変換ステートメントでdeleted_flag_keyパラメーターを設定してデータベーステーブルからデータをプルすると、ジョブがテーブルから更新されたデータを取得した後、データ変換ジョブはdelete_flagパラメーターがtrueに設定されているデータ行をジョブのメモリ内ディメンションテーブルから削除します。 これは、データベーステーブルの内容には影響しません。 インメモリディメンションテーブルから削除されたデータ行は、ログデータに対して実行される後続のJOIN操作から除外されます。
データ行のdelete_flagパラメーターがtrueまたは1に設定されている場合、データ行は削除対象としてマークされます。 詳細については、「res_rds_mysql」をご参照ください。
データ変換ステートメントでdeleted_flag_keyパラメーターを設定する場合は、update_time_keyパラメーターも設定する必要があります。
たとえば、name=miaおよびname=tomデータレコードがApsaraDB RDS For MySQLインスタンスのデータベースに挿入され、name=miaデータレコードのdelete_flagパラメーターがtrueに設定されているとします。 これは、データレコードが削除のためにマークされることを示す。 Simple Log Serviceがメモリ内ディメンションテーブルを更新すると、name=miaデータレコードはメモリ内ディメンションテーブルから削除され、変換されません。
次のサンプルコードは、データ変換ステートメントの例を示しています。
e_table_map(
res_rds_mysql(
"rm-bp1****l3tp.mysql.rds.aliyuncs.com",
"test",
"test1234@@",
"client-db",
table="client",
fields=["c_id", "name", "telephone", "update_time"],
refresh_interval=1,
primary_keys="c_id",
update_time_key="update_time",
deleted_flag_key="delete_flag",
),
"c_id",
["name", "telephone"],
)