すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Realtime Compute for Apache Flinkを使用してログデータを消費する

最終更新日:Sep 29, 2024

Realtime Compute for Apache Flinkを使用して、Simple Log Serviceのログデータを消費するSimple log Serviceソーステーブルを作成できます。 このトピックでは、Realtime Compute for Apache Flinkを使用してSimple Log Serviceソーステーブルを作成する方法と、作成プロセスに関連する属性フィールドを抽出する方法について説明します。

背景情報

次の表に、Realtime Compute for Apache Flinkがログデータを使用するために構成する必要がある設定を示します。

カテゴリ

説明

サポートされているタイプ

ソーステーブルと結果テーブルを設定できます。

実行モード

ストリーミングモードのみがサポートされています。

メトリック

メトリックはサポートされていません。

データフォーマット

なし。

APIタイプ

SQL文がサポートされています。

結果テーブルでログデータを更新または削除できるかどうか

結果テーブルのログデータを更新または削除することはできません。 ログデータは結果テーブルにのみ挿入できます。

Realtime Compute For Apache Flinkを使用してログデータを使用する方法の詳細については、「Flink SQLデプロイの開始方法」をご参照ください。

前提条件

  • Resource Access Management (RAM) ユーザーまたはRAMロールを使用してログデータを消費する場合は、RAMユーザーまたはRAMロールがRealtime Compute for Apache Flinkコンソールで必要な権限を持っていることを確認してください。 詳細については、「権限管理」をご参照ください。

  • Realtime Compute for Apache Flinkワークスペースが作成されます。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。

  • プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトとLogstoreの作成」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 2.0.0以降を使用するRealtime Compute for Apache Flinkのみが、Simple Log Serviceコネクタをサポートしています。

  • Simple Log Serviceコネクタは、少なくとも1回のセマンティクスのみをサポートします。

  • VVR 4.0.13以降を使用するRealtime Compute for Apache Flinkのみが、シャード番号の変更によるデプロイの自動フェイルオーバーをサポートします。

  • ソースノードの配置並列処理をシャードの数より大きい値に設定しないことを推奨します。 ソースノードのデプロイの並列性がシャードの数よりも大きい場合、リソースが無駄になる可能性があります。 VVR 8.0.5以前を使用するRealtime Compute for Apache Flinkでは、シャードの数が変更されると、デプロイの自動フェイルオーバーが無効になり、特定のシャードが消費されない可能性があります。

Simple Log Serviceソーステーブルと結果テーブルの作成

重要

Realtime Compute for Apache Flinkを使用してSimple log Serviceでログデータを消費する前に、完全なSQLドラフトを作成する必要があります。 完全なSQLドラフトには、ソーステーブルと結果テーブルが含まれます。 ソーステーブルのログデータが処理された後、INSERT intoステートメントを使用して結果が結果テーブルに挿入されます。

Realtime Compute For Apache FlinkでSQLドラフトを開発する方法の詳細については、「SQLドラフトの開発」をご参照ください。

Simple Log Serviceはログデータをリアルタイムで保存します。 Realtime Compute for Apache Flinkは、入力データとしてストリーミングモードでデータを読み取ることができます。 次のコードは、ログの例を示しています。

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

サンプルコード

次のコードは、Simple log Serviceでログデータを消費するためにRealtime Compute for Apache Flinkで開発できるSQLドラフトの例を示しています。

重要

SQLドラフトのテーブル、列、および予約フィールドの名前が互いに競合する場合は、名前をbackticks (') で囲む必要があります。

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

WITH句のパラメーター

  • 共通パラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    補足

    コネクター

    テーブルのタイプ。

    STRING

    はい

    なし

    値をslsに設定します。

    endPoint

    Simple Log Serviceのエンドポイント。

    STRING

    はい

    なし

    Simple Log Serviceの内部エンドポイントを入力します。 詳細については、「エンドポイント」をご参照ください。

    説明
    • デフォルトでは、Realtime Compute for Apache Flinkはインターネットにアクセスできません。 この場合、Alibaba Cloudは、仮想プライベートクラウド (VPC) とインターネット間の通信を可能にするNATゲートウェイを提供します。 詳細については、リファレンストピックの「Realtime Compute For Apache Flinkがインターネットにアクセスする方法」を参照してください。

    • 必要な場合を除き、インターネット経由でSimple Log Serviceにアクセスしないことを推奨します。 インターネット経由でSimple Log Serviceにアクセスするには、HTTPSを使用してグローバルアクセラレーション機能を有効にします。 詳細については、「Endpoints」トピックの「Endpoint For global acceleration」セクションをご参照ください。

    project

    Simple Log Serviceプロジェクトの名前。

    STRING

    はい

    なし

    非該当

    logStore

    Simple Log ServiceのLogstoreまたはMetricstoreの名前。

    STRING

    はい

    なし

    Logstore内のデータは、Metricstoreと同じ方法で消費されます。

    accessId

    Alibaba CloudアカウントのAccessKey ID。

    STRING

    はい

    なし

    詳細については、「リファレンス」トピックの「アカウントのAccessKey IDとAccessKeyシークレットに関する情報の表示方法」を参照してください。

    重要

    AccessKeyペアを保護するために、鍵管理方法を使用してAccessKey IDを設定することを推奨します。 詳細については、「変数とキーの管理」をご参照ください。

    accessKey

    Alibaba CloudアカウントのAccessKeyシークレット。

    STRING

    はい

    なし

    詳細については、「リファレンス」トピックの「アカウントのAccessKey IDとAccessKeyシークレットに関する情報の表示方法」を参照してください。

    重要

    AccessKeyペアを保護するために、鍵管理方法を使用してAccessKeyシークレットを設定することを推奨します。 詳細については、「変数とキーの管理」をご参照ください。

  • ソーステーブルのみのパラメータ

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    補足

    startupMode

    ソーステーブルの起動モード。

    STRING

    いいえ

    timestamp

    有効な値:

    • timestamp: ログは指定された開始時刻から消費されます。 デフォルト値です。

    • latest: ログは最新のオフセットから消費されます。

    • least: ログは最も早いオフセットから消費されます。

    説明

    consumeFromCheckpointパラメーターをtrueに設定すると、ログは指定されたコンシューマーグループに格納されているチェックポイントから消費されます。 このパラメーターで指定した起動モードは有効になりません。

    startTime

    ログが消費され始める時刻。

    STRING

    いいえ

    現在の時刻

    形式: yyyy-MM-dd hh:mm:ss。

    このパラメーターは、startupModeパラメーターがtimestampに設定されている場合にのみ有効です。

    説明

    startTimeパラメーターとstopTimeパラメーターは、Simple Log Serviceソーステーブルの__receive_time__属性フィールドに基づいて設定されますが、__timestamp__属性フィールドには基づいていません。

    stopTime

    ログの消費が停止した時刻。

    STRING

    なし

    形式: yyyy-MM-dd hh:mm:ss。

    説明

    ログの消費が完了した後にRealtime Compute for Apache Flinkを終了する場合は、このパラメーターと一緒にexitAfterFinishパラメーターを設定し、exitAfterFinishパラメーターをtrueに設定する必要があります。

    consumerGroup

    消費者グループの名前です。

    STRING

    なし

    消費者グループは、消費の進捗を記録する。 カスタムコンシューマグループ名を指定できます。 名前の形式は固定されていません。

    説明

    消費者グループは、共同消費のために複数の展開で共有できません。 Realtime Compute for Apache Flink展開ごとに異なるコンシューマーグループを指定することを推奨します。 異なるRealtime Compute for Apache Flink展開に同じコンシューマーグループを指定すると、すべてのデータが消費されます。 Realtime Compute for Apache FlinkがSimple Log Serviceのデータを消費する場合、データはSimple Log Serviceコンシューマーグループでシャードされません。 したがって、デプロイメントが同じコンシューマグループを共有する場合、コンシューマグループ内のすべてのメッセージは各デプロイメントによって消費されます。

    consumeFromCheckpoint

    指定されたコンシューマーグループに保存されているチェックポイントからログを消費するかどうかを指定します。

    STRING

    いいえ

    false

    有効な値:

    • true: このパラメーターをtrueに設定した場合、コンシューマーグループを指定する必要があります。 コンシューマーグループを指定すると、Realtime Compute for Apache Flinkは、コンシューマーグループに保存されているチェックポイントのログを消費します。 コンシューマーグループにチェックポイントが存在しない場合、Realtime Compute for Apache FlinkはstartTimeパラメーターで指定された時間からログを消費します。

    • false: Realtime Compute for Apache Flinkは、指定されたコンシューマーグループに保存されているチェックポイントのログを消費しません。 デフォルト値です。

    説明

    VVR 6.0.5以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。

    directMode

    Simple Log Serviceの直接接続モードを有効にするかどうかを指定します。

    STRING

    いいえ

    false

    有効な値:

    • true: 直接接続モードが有効です。

    • false: 直接接続モードは無効です。 デフォルト値です。

    maxRetries

    Simple Log Serviceからのデータの読み取りに失敗した場合に許可される再試行の回数。

    STRING

    いいえ

    3

    非該当

    batchGetSize

    リクエストでデータを読み取るロググループの数。

    STRING

    いいえ

    100

    batchGetSizeパラメーターの値は1000を超えることはできません。 それ以外の場合は、エラーが返されます。

    exitAfterFinish

    データ消費が完了した後にRealtime Compute for Apache Flinkが終了するかどうかを指定します。

    STRING

    いいえ

    false

    有効な値:

    • true: Realtime Compute for Apache Flinkは、データの消費が完了すると終了します。

    • false: Realtime Compute for Apache Flinkは、データの消費が完了すると終了しません。 デフォルト値です。

    説明

    VVR 4.0.13以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。

    query

    Simple Log Serviceでデータを消費する前にデータを前処理するために使用されるクエリステートメント。

    STRING

    なし

    クエリパラメーターを設定すると、Realtime Compute for Apache FlinkがSimple Log Serviceのデータを消費する前に、Realtime Compute for Apache FlinkはSimple Log Serviceからデータを除外できます。 これにより、Realtime Compute for Apache FlinkがSimple Log Serviceのすべてのデータを消費するのを防ぎます。 これにより、コストが削減され、データ処理効率が向上します。

    たとえば、'query' = '* | where request_method = ''GET''' ステートメントを実行すると、Realtime Compute For Apache Flinkは、Realtime Compute for Apache FlinkがSimple Log ServiceのLogstoreからデータを読み取る前に、request_methodフィールドの値がGET句の値に等しいデータと一致します。

    説明

    クエリ文を実行してデータを前処理する場合は、Simple Log Service Processing Language (SPL) が必要です。 詳細については、「SPLの概要」をご参照ください。

    重要
    • VVR 8.0.1以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。

    • Simple Log Serviceがこの機能をサポートしているリージョンの詳細については、「ルールに基づくログの消費」をご参照ください。

    • この機能は、パブリックプレビュー段階では無料です。 将来、Simple Log Serviceに対して課金される可能性があります。 詳細については、「ルールに基づくログの消費」トピックの「課金」セクションをご参照ください。

  • 結果テーブルのパラメーターのみ

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    補足

    topicField

    フィールド名を指定します。 このパラメーターの値は、__topic__ attributeフィールドの値を上書きして、ログのトピックを示します。

    STRING

    なし

    このパラメーターの値は、テーブルの既存のフィールドである必要があります。

    timeField

    フィールド名を指定します。 このパラメーターの値は、__timestamp__属性フィールドの値を上書きして、ログの書き込み時間を示します。

    STRING

    いいえ

    現在の時刻

    このパラメーターの値は、テーブル内のINT型の既存のフィールドである必要があります。 フィールドを指定しない場合は、デフォルトで現在の時刻が使用されます。

    sourceField

    フィールド名を指定します。 このパラメーターの値は、__source__属性フィールドの値を上書きして、ログの発生元を示します。 たとえば、値はログを生成するマシンのIPアドレスです。

    STRING

    なし

    このパラメーターの値は、テーブルの既存のフィールドである必要があります。

    partitionField

    フィールド名を指定します。 ハッシュ値は、データがSimple Log Serviceに書き込まれるときに、このパラメーターの値に基づいて計算されます。 同じハッシュ値を含むデータは、同じシャードに書き込まれます。

    STRING

    なし

    このパラメーターを指定しない場合、各データエントリは使用可能なシャードにランダムに書き込まれます。

    バケット

    partitionFieldパラメーターを指定したときにハッシュ値に基づいて再グループ化されるバケットの数。

    STRING

    いいえ

    64

    有効な値: [1,256] 。 このパラメーターの値は、2の整数乗である必要があります。 バケットの数は、シャードの数以上である必要があります。 それ以外の場合、データは特定のシャードに書き込まれません。

    説明

    VVR 6.0.5以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。

    flushIntervalMs

    データ書き込みがトリガーされる間隔。

    STRING

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null値を空の文字列としてSimple Log Serviceに書き込むかどうかを指定します。

    BOOLEAN

    いいえ

    true

    有効な値:

    • true: Null値は空の文字列としてSimple Log Serviceに書き込まれます。 デフォルト値です。

    • false: Null値はSimple Log Serviceに書き込まれません。

    説明

    VVR 8.0.6以降を使用するRealtime Compute for Apache Flinkのみがこのパラメーターをサポートしています。

属性フィールドの抽出

Realtime Compute for Apache Flinkは、ログフィールド、カスタムフィールド、および次の属性フィールドを抽出できます。

フィールド

種類

説明

__source__

流れのメタデータ仮想

メッセージソース。

__topic__

流れのメタデータ仮想

メッセージトピック。

__timestamp__

BIGINT METDATA VRTUAL

ログ時間。

__tag__

地図 <VARCHAR、VARCHAR> メタデータ仮想

メッセージタグ。

"__tag __:__ receive_time__":"1616742274" 属性の場合、__receive_time__ フィールドと1616742274フィールドは、キーと値のペアとしてマップに記録されます。 SQL文に __tag__['__receive_time__'] を含めて、タグを照会できます。

属性フィールドを抽出するには、SQL文でヘッダーを定義する必要があります。 例:

create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

関連ドキュメント

Realtime Compute For Apache FlinkのDataStream APIを使用してログデータを消費する方法の詳細については、「DataStream API」をご参照ください。