Function ComputeのApsaraDB RDSトリガー機能は、2023年1月31日に廃止される予定です。 ApsaraDB RDSイベントを使用してFunction Computeの関数をトリガーする場合は、のパスを使用することを推奨します。 このトピックでは、ApsaraDB RDSトリガーの新しいパスを設定する方法について説明します。 このトピックでは、イベントパラメーターの違いについても説明します。
前提条件
DTSデータサブスクリプションタスクを構成するために使用できるデータベースアカウントが作成されます。 詳細については、「アカウントの作成」をご参照ください。
EventBridgeが有効になり、RAMユーザーに権限が付与されます。 詳細については、「EventBridgeの有効化とRAMユーザーへの権限付与」をご参照ください。
function Computeコンソールでサービスと関数が作成されます。 詳細については、「サービスの作成」および「関数の作成」をご参照ください。
手順
DTSコンソールでの設定
DTSコンソールにログインします。 上部のナビゲーションバーで、リージョンを選択します。
左側のナビゲーションウィンドウで、[追跡の変更] をクリックします。 [追跡変更] ページの右上隅にある [タスクの作成] をクリックします。
[タスクの作成] ページの [ソースデータベースと宛先データベースの設定] ステップで、[ソースデータベース] セクションと [消費者ネットワークタイプ] セクションのパラメーターを設定します。 次に、[テスト接続と続行] をクリックします。
次の表に、設定する必要があるパラメーターを示します。 他のパラメーターのデフォルト値を保持します。
説明テストの接続を確立するには、インターネットアクセスが必要です。 ApsaraDB RDSインスタンスのインターネットアクセス機能が有効になっていることを確認します。 インターネットアクセス機能は、ApsaraDB RDSコンソールの [データベース接続] ページで設定できます。
パラメーター
説明
例
ソースデータベースセクション
データベースタイプ
ソースデータベースのタイプを選択します。
MySQL
RDSインスタンスID
データの変更を追跡するApsaraDB RDSインスタンスを選択します。
rm-bp1pw60i18f2x ****
データベースアカウント
ソースApsaraDB RDSインスタンスのデータベースアカウント。
db_chi
データベースパスワード
ソースApsaraDB RDSデータベースのアカウントのパスワード。
*************
消費者ネットワークタイプセクション
[VPC]
変更追跡インスタンスが属する仮想プライベートクラウド (VPC) を選択します。
説明コンシューマーネットワークタイプセクションで指定するVPCおよびvSwitchは、ApsaraDB RDSインスタンスのVPCおよびvSwitchと同じである必要があります。 ApsaraDB RDSコンソールの [データベース接続] ページで、ApsaraDB RDSインスタンスのVPCおよびvSwitchに関する情報を表示できます。
vpc-bp12c5dzorfoizcez ****
vSwitch
変更追跡インスタンスが属するvSwitchを選択します。
説明コンシューマーネットワークタイプセクションで指定するVPCおよびvSwitchは、ApsaraDB RDSインスタンスのVPCおよびvSwitchと同じである必要があります。 ApsaraDB RDSコンソールの [データベース接続] ページで、ApsaraDB RDSインスタンスのVPCおよびvSwitchに関する情報を表示できます。
vsw-bp1lt2oxenx87jvc8 ****
[タスクの作成] ページの [オブジェクトの構成と詳細設定] ステップで、データ型とテーブルを選択します。 次に、画面の指示に従って、[詳細設定] ステップと [事前チェック] ステップで設定を完了します。
[タスクの作成] ページの [インスタンスの購入] ステップで、画面の指示に従ってインスタンスを購入します。 次に、タスクリストでタスクを開始します。
作成したタスクのIDをクリックします。 [タスク管理] ページで、[データの使用] をクリックします。 表示されるページで、[コンシューマーグループの追加] をクリックします。 [コンシューマーグループの追加] パネルで、[コンシューマーグループ名] 、[アカウント] 、[パスワード] のパラメーターを設定します。
EventBridgeコンソールの設定
EventBridgeコンソールにログインします。 左側のナビゲーションウィンドウで、[イベントストリーム] をクリックします。
上部のナビゲーションバーでリージョンを選択し、[イベントストリームの作成] をクリックします。
[EventStreamingの作成] パネルでパラメーターを設定し、[OK] をクリックします。
[基本情報] ステップで、[EventStreaming Name] パラメーターと [Description] パラメーターを設定し、[次のステップ] をクリックします。
[イベントソース] ステップで、[イベントプロバイダー] ドロップダウンリストから [データ送信サービス (DTS)] を選択し、前のセクションのステップ6の [コンシューマーグループ名] パラメータに指定したコンシューマーグループをコンシューマーグループドロップダウンリストから選択します。PasswordパラメーターとConsumer Offsetパラメーターを設定します。 [次のステップ] をクリックします。
オプション: [ルール] ステップで、イベントルールを設定し、[次のステップ] をクリックします。
では、イベント対象ステップ、セットサービスタイプパラメーターをFunction Computeを設定し、サービスパラメータと、関数パラメーターを使用します。
イベントストリームの作成後、[イベントストリーム] ページで、[イベントソース] がDTS、[イベントターゲット] が [Function Compute] のイベントストリームを表示できます。 イベントストリームが実行中の状態です。
上記の設定を完了すると、新しいパスが設定されます。
イベント形式に基づいて関数コードを変更する
ApsaraDB RDSトリガーの新しいパスのデータ形式は、元のApsaraDB RDSトリガーのデータ形式とは異なります。 以下の項目は違いを説明します。 新しいパスを使用する前に、コード内のイベントパラメーターを解析するために関数によって使用されるメソッドを変更する必要があります。
ApsaraDB RDSトリガーの元のパスは、です。 このパスでは、イベントデータはオブジェクト形式です。
ApsaraDB RDSトリガーの新しいパスは、です。 このパスでは、イベントデータは配列形式です。 配列内の各要素はstring型であり、文字列はオブジェクトから変換されます。 詳細については、「DTS」をご参照ください。
次のセクションでは、PythonおよびJavaで元のパスと新しいパスを使用したイベント解析のデモについて説明します。 パスの違いを表示したり、関数のイベント解析ロジックを変更したりできます。
Python
パスを使用するイベント解析のサンプルコード
# -*- coding: utf-8 -*- import json import logging def handler(event, context): logger = logging.getLogger() eventObj = json.loads(event) logger.info("rds trigger event = {}".format(eventObj)) # You can add your processing logic in the code. Example: Update Redis cache. return "OK"を返す
パスを使用するイベント解析のサンプルコード
# -*- coding: utf-8 -*- import logging import json def handler(event, context): logger = logging.getLogger() # Convert the input parameter of the event to a string list. evt_list = json.loads(event) for evt in evt_list: # Convert each element in the string list to data of the JSON type. evt_obj = json.loads(evt) logger.info(evt_obj) return "OK"を返す
Java
パスを使用するイベント解析のサンプルコード
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.google.gson.*; import com.google.gson.JsonParser; import com.google.gson.JsonObject; import com.aliyun.fc.runtime.Context; import com.aliyun.fc.runtime.StreamRequestHandler; import java.io.*; import java.nio.charset.StandardCharsets; public class App implements StreamRequestHandler { @Override public void handleRequest( InputStream inputStream, OutputStream outputStream, Context context) throws IOException { // Read data from the input parameters and store the data in ByteBuffer. ByteArrayOutputStream result = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; for (int length; (length = inputStream.read(buffer)) != -1; ) { result.write(buffer, 0, length); } // Convert data of the byte type to data of the object type. String stringData = result.toString(StandardCharsets.UTF_8.name()); JsonObject jsonObj = (JsonObject)(new JsonParser().parse(stringData)); System.out.println(jsonObj); outputStream.write(new String("OK").getBytes()); } }次のサンプルコードは、pom.xmlファイルの例を示しています。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>example</groupId> <artifactId>FCJavaDemo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>FCJavaDemo</name> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.aliyun.fc.runtime</groupId> <artifactId>fc-java-core</artifactId> <version>1.4.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-my-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <maven.test.skip>true</maven.test.skip> </properties> </project>パスを使用するイベント解析のサンプルコード
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.google.gson.*; import com.google.gson.JsonParser; import com.google.gson.JsonObject; import com.aliyun.fc.runtime.Context; import com.aliyun.fc.runtime.StreamRequestHandler; import java.io.*; import java.nio.charset.StandardCharsets; public class App implements StreamRequestHandler { @Override public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { // Read data from the input parameters and store the data in ByteBuffer. ByteArrayOutputStream result = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; for (int length; (length = inputStream.read(buffer)) != -1; ) { result.write(buffer, 0, length); } // Convert data of the byte type to data of the string type. String stringData = result.toString(StandardCharsets.UTF_8.name()); Gson gson =new Gson(); String[] stringArrayData = gson.fromJson(stringData, String[].class); // Scan each element in the string and convert each element into data of the JSON type. The data field is the original data of DTS. for(String elem : stringArrayData){ JsonObject jsonObj = (JsonObject)(new JsonParser().parse(elem)); System.out.println(jsonObj.get("data")); } outputStream.write((new String("OK")).getBytes()); } }次のサンプルコードは、pom.xmlファイルの例を示しています。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>example</groupId> <artifactId>FCJavaDemo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>FCJavaDemo</name> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.aliyun.fc.runtime</groupId> <artifactId>fc-java-core</artifactId> <version>1.4.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-my-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <maven.test.skip>true</maven.test.skip> </properties> </project>