Realtime Compute for Apache Flinkを使用して、Simple Log Serviceのログデータを消費するSimple log Serviceソーステーブルを作成できます。 このトピックでは、Realtime Compute for Apache Flinkを使用してSimple Log Serviceソーステーブルを作成する方法と、作成プロセスに関連する属性フィールドを抽出する方法について説明します。
背景情報
次の表に、Realtime Compute for Apache Flinkがログデータを使用するために構成する必要がある設定を示します。
カテゴリ | 説明 |
サポートされているタイプ | ソーステーブルと結果テーブルを設定できます。 |
実行モード | ストリーミングモードのみがサポートされています。 |
メトリック | メトリックはサポートされていません。 |
データフォーマット | なし。 |
APIタイプ | SQL文がサポートされています。 |
結果テーブルでログデータを更新または削除できるかどうか | 結果テーブルのログデータを更新または削除することはできません。 ログデータは結果テーブルにのみ挿入できます。 |
Realtime Compute For Apache Flinkを使用してログデータを使用する方法の詳細については、「Flink SQLデプロイの開始方法」をご参照ください。
前提条件
Resource Access Management (RAM) ユーザーまたはRAMロールを使用してログデータを消費する場合は、RAMユーザーまたはRAMロールがRealtime Compute for Apache Flinkコンソールで必要な権限を持っていることを確認してください。 詳細については、「権限管理」をご参照ください。
Realtime Compute for Apache Flinkワークスペースが作成されます。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。
プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトとLogstoreの作成」をご参照ください。
制限事項
Ververica Runtime (VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、Simple Log Serviceコネクタをサポートしています。
Simple Log Serviceコネクタは、少なくとも1回のセマンティクスのみをサポートします。
VVR 4.0.13以降を使用するRealtime Compute for Apache Flinkのみが、シャード番号の変更によるデプロイの自動フェイルオーバーをサポートします。
ソースノードの配置並列処理をシャードの数より大きい値に設定しないことを推奨します。 ソースノードのデプロイの並列性がシャードの数よりも大きい場合、リソースが無駄になる可能性があります。 VVR 8.0.5以前を使用するRealtime Compute for Apache Flinkでは、シャードの数が変更されると、デプロイの自動フェイルオーバーが無効になり、特定のシャードが消費されない可能性があります。
Simple Log Serviceソーステーブルと結果テーブルの作成
Realtime Compute for Apache Flinkを使用してSimple log Serviceでログデータを消費する前に、完全なSQLドラフトを作成する必要があります。 完全なSQLドラフトには、ソーステーブルと結果テーブルが含まれます。 ソーステーブルのログデータが処理された後、INSERT into
ステートメントを使用して結果が結果テーブルに挿入されます。
Realtime Compute For Apache FlinkでSQLドラフトを開発する方法の詳細については、「SQLドラフトの開発」をご参照ください。
Simple Log Serviceはログデータをリアルタイムで保存します。 Realtime Compute for Apache Flinkは、入力データとしてストリーミングモードでデータを読み取ることができます。 次のコードは、ログの例を示しています。
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200
サンプルコード
次のコードは、Simple log Serviceでログデータを消費するためにRealtime Compute for Apache Flinkで開発できるSQLドラフトの例を示しています。
SQLドラフトのテーブル、列、および予約フィールドの名前が互いに競合する場合は、名前をbackticks (') で囲む必要があります。
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
WITH句のパラメーター
共通パラメーター
パラメーター
説明
データ型
必須
デフォルト値
補足
コネクター
テーブルのタイプ。
STRING
はい
なし
値をslsに設定します。
endPoint
Simple Log Serviceのエンドポイント。
STRING
はい
なし
Simple Log Serviceの内部エンドポイントを入力します。 詳細については、「エンドポイント」をご参照ください。
説明デフォルトでは、Realtime Compute for Apache Flinkはインターネットにアクセスできません。 この場合、Alibaba Cloudは、仮想プライベートクラウド (VPC) とインターネット間の通信を可能にするNATゲートウェイを提供します。 詳細については、リファレンストピックの「Realtime Compute For Apache Flinkがインターネットにアクセスする方法」を参照してください。
必要な場合を除き、インターネット経由でSimple Log Serviceにアクセスしないことを推奨します。 インターネット経由でSimple Log Serviceにアクセスするには、HTTPSを使用してグローバルアクセラレーション機能を有効にします。 詳細については、「Endpoints」トピックの「Endpoint For global acceleration」セクションをご参照ください。
project
Simple Log Serviceプロジェクトの名前。
STRING
はい
なし
非該当
logStore
Simple Log ServiceのLogstoreまたはMetricstoreの名前。
STRING
はい
なし
Logstore内のデータは、Metricstoreと同じ方法で消費されます。
accessId
Alibaba CloudアカウントのAccessKey ID。
STRING
はい
なし
詳細については、「リファレンス」トピックの「アカウントのAccessKey IDとAccessKeyシークレットに関する情報の表示方法」を参照してください。
重要AccessKeyペアを保護するために、鍵管理方法を使用してAccessKey IDを設定することを推奨します。 詳細については、「変数とキーの管理」をご参照ください。
accessKey
Alibaba CloudアカウントのAccessKeyシークレット。
STRING
はい
なし
詳細については、「リファレンス」トピックの「アカウントのAccessKey IDとAccessKeyシークレットに関する情報の表示方法」を参照してください。
重要AccessKeyペアを保護するために、鍵管理方法を使用してAccessKeyシークレットを設定することを推奨します。 詳細については、「変数とキーの管理」をご参照ください。
ソーステーブルのみのパラメータ
パラメーター
説明
データ型
必須
デフォルト値
補足
startupMode
ソーステーブルの起動モード。
STRING
いいえ
timestamp
有効な値:
timestamp: ログは指定された開始時刻から消費されます。 デフォルト値です。
latest: ログは最新のオフセットから消費されます。
least: ログは最も早いオフセットから消費されます。
説明consumeFromCheckpointパラメーターをtrueに設定すると、ログは指定されたコンシューマーグループに格納されているチェックポイントから消費されます。 このパラメーターで指定した起動モードは有効になりません。
startTime
ログが消費され始める時刻。
STRING
いいえ
現在の時刻
形式: yyyy-MM-dd hh:mm:ss。
このパラメーターは、startupModeパラメーターがtimestampに設定されている場合にのみ有効です。
説明startTimeパラメーターとstopTimeパラメーターは、Simple Log Serviceソーステーブルの__receive_time__属性フィールドに基づいて設定されますが、__timestamp__属性フィールドには基づいていません。
stopTime
ログの消費が停止した時刻。
STRING
✕
なし
形式: yyyy-MM-dd hh:mm:ss。
説明ログの消費が完了した後にRealtime Compute for Apache Flinkを終了する場合は、このパラメーターと一緒にexitAfterFinishパラメーターを設定し、exitAfterFinishパラメーターをtrueに設定する必要があります。
consumerGroup
消費者グループの名前です。
STRING
✕
なし
消費者グループは、消費の進捗を記録する。 カスタムコンシューマグループ名を指定できます。 名前の形式は固定されていません。
説明消費者グループは、共同消費のために複数の展開で共有できません。 Realtime Compute for Apache Flink展開ごとに異なるコンシューマーグループを指定することを推奨します。 異なるRealtime Compute for Apache Flink展開に同じコンシューマーグループを指定すると、すべてのデータが消費されます。 Realtime Compute for Apache FlinkがSimple Log Serviceのデータを消費する場合、データはSimple Log Serviceコンシューマーグループでシャードされません。 したがって、デプロイメントが同じコンシューマグループを共有する場合、コンシューマグループ内のすべてのメッセージは各デプロイメントによって消費されます。
consumeFromCheckpoint
指定されたコンシューマーグループに保存されているチェックポイントからログを消費するかどうかを指定します。
STRING
いいえ
false
有効な値:
true: このパラメーターをtrueに設定した場合、コンシューマーグループを指定する必要があります。 コンシューマーグループを指定すると、Realtime Compute for Apache Flinkは、コンシューマーグループに保存されているチェックポイントのログを消費します。 コンシューマーグループにチェックポイントが存在しない場合、Realtime Compute for Apache FlinkはstartTimeパラメーターで指定された時間からログを消費します。
false: Realtime Compute for Apache Flinkは、指定されたコンシューマーグループに保存されているチェックポイントのログを消費しません。 デフォルト値です。
説明VVR 6.0.5以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。
directMode
Simple Log Serviceの直接接続モードを有効にするかどうかを指定します。
STRING
いいえ
false
有効な値:
true: 直接接続モードが有効です。
false: 直接接続モードは無効です。 デフォルト値です。
maxRetries
Simple Log Serviceからのデータの読み取りに失敗した場合に許可される再試行の回数。
STRING
いいえ
3
非該当
batchGetSize
リクエストでデータを読み取るロググループの数。
STRING
いいえ
100
batchGetSizeパラメーターの値は1000を超えることはできません。 それ以外の場合は、エラーが返されます。
exitAfterFinish
データ消費が完了した後にRealtime Compute for Apache Flinkが終了するかどうかを指定します。
STRING
いいえ
false
有効な値:
true: Realtime Compute for Apache Flinkは、データの消費が完了すると終了します。
false: Realtime Compute for Apache Flinkは、データの消費が完了すると終了しません。 デフォルト値です。
説明VVR 4.0.13以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。
query
Simple Log Serviceでデータを消費する前にデータを前処理するために使用されるクエリステートメント。
STRING
✕
なし
クエリパラメーターを設定すると、Realtime Compute for Apache FlinkがSimple Log Serviceのデータを消費する前に、Realtime Compute for Apache FlinkはSimple Log Serviceからデータを除外できます。 これにより、Realtime Compute for Apache FlinkがSimple Log Serviceのすべてのデータを消費するのを防ぎます。 これにより、コストが削減され、データ処理効率が向上します。
たとえば、
'query' = '* | where request_method = ''GET'''
ステートメントを実行すると、Realtime Compute For Apache Flinkは、Realtime Compute for Apache FlinkがSimple Log ServiceのLogstoreからデータを読み取る前に、request_methodフィールドの値がGET句の値に等しいデータと一致します。説明クエリ文を実行してデータを前処理する場合は、Simple Log Service Processing Language (SPL) が必要です。 詳細については、「SPLの概要」をご参照ください。
重要VVR 8.0.1以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。
Simple Log Serviceがこの機能をサポートしているリージョンの詳細については、「ルールに基づくログの消費」をご参照ください。
この機能は、パブリックプレビュー段階では無料です。 将来、Simple Log Serviceに対して課金される可能性があります。 詳細については、「ルールに基づくログの消費」トピックの「課金」セクションをご参照ください。
結果テーブルのパラメーターのみ
パラメーター
説明
データ型
必須
デフォルト値
補足
topicField
フィールド名を指定します。 このパラメーターの値は、__topic__ attributeフィールドの値を上書きして、ログのトピックを示します。
STRING
✕
なし
このパラメーターの値は、テーブルの既存のフィールドである必要があります。
timeField
フィールド名を指定します。 このパラメーターの値は、__timestamp__属性フィールドの値を上書きして、ログの書き込み時間を示します。
STRING
いいえ
現在の時刻
このパラメーターの値は、テーブル内のINT型の既存のフィールドである必要があります。 フィールドを指定しない場合は、デフォルトで現在の時刻が使用されます。
sourceField
フィールド名を指定します。 このパラメーターの値は、__source__属性フィールドの値を上書きして、ログの発生元を示します。 たとえば、値はログを生成するマシンのIPアドレスです。
STRING
✕
なし
このパラメーターの値は、テーブルの既存のフィールドである必要があります。
partitionField
フィールド名を指定します。 ハッシュ値は、データがSimple Log Serviceに書き込まれるときに、このパラメーターの値に基づいて計算されます。 同じハッシュ値を含むデータは、同じシャードに書き込まれます。
STRING
✕
なし
このパラメーターを指定しない場合、各データエントリは使用可能なシャードにランダムに書き込まれます。
バケット
partitionFieldパラメーターを指定したときにハッシュ値に基づいて再グループ化されるバケットの数。
STRING
いいえ
64
有効な値: [1,256] 。 このパラメーターの値は、2の整数乗である必要があります。 バケットの数は、シャードの数以上である必要があります。 それ以外の場合、データは特定のシャードに書き込まれません。
説明VVR 6.0.5以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。
flushIntervalMs
データ書き込みがトリガーされる間隔。
STRING
いいえ
2000
単位:ミリ秒。
writeNullProperties
null値を空の文字列としてSimple Log Serviceに書き込むかどうかを指定します。
BOOLEAN
いいえ
true
有効な値:
true: Null値は空の文字列としてSimple Log Serviceに書き込まれます。 デフォルト値です。
false: Null値はSimple Log Serviceに書き込まれません。
説明VVR 8.0.6以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。
属性フィールドの抽出
Realtime Compute for Apache Flinkは、ログフィールド、カスタムフィールド、および次の属性フィールドを抽出できます。
フィールド | 種類 | 説明 |
__source__ | 流れのメタデータ仮想 | メッセージソース。 |
__topic__ | 流れのメタデータ仮想 | メッセージトピック。 |
__timestamp__ | BIGINT METDATA VRTUAL | ログ時間。 |
__tag__ | 地図 <VARCHAR、VARCHAR> メタデータ仮想 | メッセージタグ。
|
属性フィールドを抽出するには、SQL文でヘッダーを定義する必要があります。 例:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
関連ドキュメント
Realtime Compute For Apache FlinkのDataStream APIを使用してログデータを消費する方法の詳細については、「DataStream API」をご参照ください。