Knative Eventingは、Broker-Triggerモデルを使用して、サーバーレスアプリケーションに堅牢なイベント駆動機能を提供します。 このコンポーネントは、イベントの転送とフィルタリングを可能にし、複雑なイベントフローを調整するために使用できます。 Broker-Triggerモデルは、ユーザーの行動に基づいて自動的に通知を送信したり、さまざまなソースからのデータストリームを処理したりするなど、外部トリガーまたは内部トリガーに応答するアプリケーションに最適です。 このトピックでは、Knative Eventingを使用してシンプルなイベント駆動型アーキテクチャを構築する方法について説明します。これには、サービスのデプロイ、メッセージ配信メカニズム、および機能検証が含まれます。
制御ポリシー機能の動作
Knative Eventingは、さまざまな外部イベントシステムとインターフェースする包括的なイベントモデルを提供します。 イベントが取り込まれた後、それらはクラウドイベント標準を使用して内部的に転送され、Broker-Triggerイベント駆動モデルを使用して処理されます。
次の図に示すように、Knativeイベント駆動型アーキテクチャでは、Brokerはメッセージ送信の仲介役として機能し、トリガーは特定の条件下でワークフローを自動的に開始します。 Broker-Triggerイベント駆動型モデルの原則は、TriggerにBrokerイベントをサブスクライブさせてフィルタリングさせ、その後、対応するサービスにイベントをディスパッチして消費させることです。
イベントソース: イベントの発生元。データベースの更新などの内部システム、またはクラウドメッセージングサービスなどの外部サービスを使用できます。
Ingress: Knativeクラスターに外部イベントを受信します。
チャネル: Knative Eventingでは、Broker-Triggerモデルを使用するには、イベントを転送する適切なチャネルを選択する必要があります。 サポートされているイベント転送チャネルには、ApsaraMQ for Kafka、NATS Streaming、およびInMemoryChannelが含まれます。 デフォルトのチャンネルはInMemoryChannelです。
ブローカー: イベント仲介者として機能し、設定されたトリガーに従ってさまざまなソースからイベントをルーティングします。 ブローカーは複雑なイベントフローと処理ロジックを実装し、NATSやApsaraMQ for Kafkaなどの複数のプロトコルとバックエンドイベントサービスをサポートし、イベントの管理と配布をより柔軟かつ効率的にします。
トリガー: イベントを特定のサービスにルーティングする方法を定義します。 各トリガは、通常はイベントタイプまたはコンテンツに基づくイベントフィルタ、およびサービスターゲットに関連付けられる。 フィルター条件に一致するイベントのみ、指定されたサービスに送信されます。
サービス: イベントの最終プロセッサ。 トリガーによってルーティングされたイベントを受信した後、対応するビジネスロジックを実行します。
前提条件
Knative ServingとKnative Eventingがインストールされています。 詳細については、「Knativeのデプロイ」「」および「Knative Eventingのデプロイ」「」をご参照ください。
以下のセクションでは、イベントの作成、送信、消費など、Knative Eventingフレームワークを使用して基本的なイベント駆動型システムを構築するプロセスについて説明します。
手順1: Knativeのデプロイサービス
このセクションでは、Knative Serviceの例としてevent-display
を使用します。 イベント表示
の主な役割は、イベントを受け取り、その内容を印刷することです。
次のYAMLテンプレートを使用してKnativeサービスを作成します。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:v1028
Knativeサービスをデプロイします。
kubectl apply -f event-display.yaml
Knativeサービスが作成されているかどうかを確認します。
kubectl get ksvc
期待される出力:
NAME URL LATESTCREATED LATESTREADY READY REASON event-display http://event-display.default.example.com event-display-00001 event-display-00001 True
手順2: ブローカーとトリガーの作成
default
という名前のブローカーとmy-service-Trigger
という名前のトリガーを作成します。 トリガーは、明示的に処理されないイベントが、手順1でデプロイされたイベント表示
サービスにルーティングされるようにします。
ブローカーを作成します。
次のYAMLテンプレートを使用してブローカーを作成します。
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: default namespace: default
ブローカーをデプロイします。
kubectl apply -f broker.yaml
ブローカーが作成されているかどうかを確認します。
kubectl get broker
期待される出力:
NAME URL AGE READY REASON default http://broker-ingress.knative-eventing.svc.cluster.local/default/default 9s True
出力には、ブローカーの情報とURLが表示されます。 URLは、イベントを送信する場所を示します。
トリガーを作成します。
以下のサンプルコードでトリガーを作成します。
デフォルトのBrokerに登録して、イベントを
event-display
Serviceにルーティングします。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: my-service-trigger spec: broker: default subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
上記のYAMLテンプレートによって作成された
my-service-trigger
はdefault Broker
にリンクされ、event-display
Serviceをサブスクライバーとして指定します。 つまり、default Broker
に送信されたイベントは、それらを処理するために他の特定のトリガーが指定されていない場合、event-display
Serviceに転送されます。トリガーをデプロイします。
kubectl apply -f trigger.yaml
トリガーが作成されたかどうかを確認します。
kubectl get trigger
期待される出力:
NAME BROKER SUBSCRIBER_URI AGE READY REASON my-service-trigger default http://event-display.default.svc.cluster.local 22s True
出力は、トリガーの準備ができていることを示します。つまり、トリガーが正しく設定されており、イベントの受信とルーティングが可能です。
手順3: イベントを送信し、サービスがイベントを受信できるかどうかを確認する
HTTP POSTリクエストを外部ソースからブローカーに直接送信して、イベントトリガーをシミュレートします。 curl
コマンドを使用して、特定のCloudEventsヘッダーを含むリクエストをブローカーに送信し、event-display
Serviceがこのメッセージを受信して期待どおりに出力するかどうかを確認します。
broker-ingress.knativeイベント
を使用して、ブローカーからイベントリクエストアドレスを取得します。 Brokerにイベントリクエストを送信することで、Knative EventingシステムのBroker Serviceが正しく構成され、イベントを受信できることを確認します。kubectlの
port-forward
機能を使用して、クラスター内の内部サービスポートをローカルマシンに転送し、サービスへのアクセスを許可します。kubectl port-forward svc/broker-ingress -n knative-eventing 8080:80
curl
コマンドを使用してPOSTリクエストをローカルポート8080に送信し、サービスがリクエストを正しく受信して処理できることを確認します。curl -v "http://localhost:8080/default/default" \ -X POST \ -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ -H "Ce-Specversion: 1.0" \ -H "Ce-Type: dev.knative.samples.helloworld" \ -H "Ce-Source: dev.knative.samples/helloworldsource" \ -H "Content-Type: application/json" \ -d '{"msg":"Hello World from the curl pod."}'
期待される出力:
Note: Unnecessary use of -X or --request, POST is already inferred. * Trying 127.0.0.1:8080... * Connected to localhost (127.0.0.1) port 8080 (#0) > POST /default/default HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/8.1.2 > Accept: */* > Ce-Id: 53****3-88be-4077-9d7a-a3f162******9 > Ce-Specversion: 1.0 > Ce-Type: dev.knative.samples.helloworld > Ce-Source: dev.knative.samples/helloworldsource > Content-Type: application/json > Content-Length: 40 > < HTTP/1.1 202 Accepted < Allow: POST, OPTIONS < Date: Tue, 15 Oct 2024 09:36:42 GMT < Content-Length: 0 < * Connection #0 to host localhost left intact
期待される出力は、サービスが
http:// localhost:8080/default/default
に送信されたPOSTリクエストを正しく受信し、リクエストが受け入れられたことを示す202 Accepted
ステータスコードを返したことを確認します。
event-display
ポッドのログを調べて、イベントソースを確認します。event-display
ポッドの詳細を取得し、そのログを表示します。kubectl get pods --namespace=default | grep event-display # The output is as follows: event-display-00001-deployment-766f7b9fd6-gfcz5 2/2 Running 0 3m43s
ログを表示します。
kubectl logs event-display-00001-deployment-766f7b9fd6-gfcz5 # The log output is as follows: Defaulted container "user-container" out of: user-container, queue-proxy ☁️ cloudevents.Event Context Attributes, specversion: 1.0 type: dev.knative.samples.helloworld source: dev.knative.samples/helloworldsource id: 536808d3-88be-4077-9d7a-a3f162705f79 datacontenttype: application/json Extensions, knativearrivaltime: 2024-10-28T11:48:56.929517041Z Data, { "msg": "Hello World from the curl pod1." }
期待される出力は、Kubernetesで実行されているアプリケーションがKnative Samples Hello WorldソースからCloudEventsイベントを受信しており、これらのイベントのコンテンツを印刷していることを示しています。