すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Simple Log Service (SLS)

最終更新日:Feb 06, 2026

このトピックでは、Simple Log Service(SLS)コネクタの使用方法について説明します。

背景情報

Simple Log Service はエンドツーエンドのデータロギングサービスです。ログデータを効率的に収集・消費・配信・照会・分析でき、運用保守(O&M)および業務効率を向上させ、大量のログデータを処理する能力を構築できます。

SLS コネクタは、以下の種類の情報をサポートしています。

カテゴリ

詳細

サポートされるタイプ

ソーステーブルおよび結果テーブル

実行モード

ストリーミングモードのみ

特定の監視メトリック

該当なし

データ形式

なし

APIタイプ

SQL、DataStream、データインジェスト YAML

結果テーブルのデータを更新または削除できますか?

結果テーブルのデータを更新または削除することはできません。挿入のみ可能です。

機能

ソーステーブル用の SLS コネクタは、メッセージの属性フィールドを直接読み取ることをサポートしています。サポートされる属性フィールドは次のとおりです。

フィールド名

フィールドタイプ

フィールドの説明

__source__

STRING METADATA VIRTUAL

メッセージソース。

__topic__

STRING METADATA VIRTUAL

メッセージトピック。

__timestamp__

BIGINT METADATA VIRTUAL

ログの時刻。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

メッセージタグ。

属性 "__tag__:__receive_time__":"1616742274" の場合、'__receive_time__' および '1616742274' がマップ内のキーと値のペアとして記録されます。SQL では __tag__['__receive_time__'] を使用して値にアクセスできます。

前提条件

Simple Log Service プロジェクトおよび Logstore を作成済みである必要があります。詳細については、「プロジェクトおよび Logstore の作成」をご参照ください。

制限事項

  • YAML によるデータインジェストで同期データソースとして Simple Log Service (SLS) を使用できるのは、Realtime Compute for Apache Flink VVR 11.1 以降のみです。

  • SLS コネクタは、at-least-once セマンティクスのみを保証します。

  • ソースの同時実行数をシャード数より大きい値に設定しないことを強く推奨します。この設定はリソースを無駄にし、VVR 8.0.5 以前ではシャード数が変更された場合に自動フェールオーバー機能が失敗し、一部のシャードが消費されなくなる可能性があります。

SQL

構文

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH パラメーター

  • 一般

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルタイプ。

    文字列

    はい

    なし

    このパラメーターを sls に設定します。

    endPoint

    エンドポイント。

    文字列

    はい

    なし

    SLS のプライベートネットワークエンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。

    説明
    • デフォルトでは、Realtime Compute for Apache Flink はパブリックネットワークにアクセスできません。ただし、Alibaba Cloud では NAT Gateway を提供しており、VPC とパブリックネットワーク間の通信を可能にします。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。

    • SLS へのパブリックネットワーク経由でのアクセスは推奨しません。パブリックネットワーク経由でアクセスする必要がある場合は、HTTPS プロトコルを使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。

    project

    SLS プロジェクトの名前。

    文字列

    はい

    なし

    なし。

    logStore

    SLS Logstore または metricstore の名前。

    String

    はい

    なし

    Logstore のデータは metricstore と同様に消費されます。

    accessId

    Alibaba CloudアカウントのAccessKey ID。

    String

    はい

    なし

    詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、AccessKey ペアを指定する際に変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。

    accessKey

    Alibaba CloudアカウントのAccessKeyシークレット。

    String

    はい

    なし

  • ソーステーブル専用

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    enableNewSource

    FLIP-27 インターフェイスを実装する新しいデータソースを有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    新しいデータソースは、シャードの変更に自動的に適応し、すべてのソース同時実行性にわたってシャードを可能な限り均等に分散させることができます。

    重要
    • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.9 以降でのみサポートされています。VVR 11.1 以降では、デフォルトで true に設定されています。

    • この設定項目を変更した後は、ジョブを状態から復元できません。この問題を解決するには、まず consumerGroup 設定項目を設定してジョブを起動し、SLS 使用者グループに消費進捗を記録します。その後、consumeFromCheckpoint 設定項目を true に設定して、ステートレスな方法でジョブを起動します。これにより、ジョブは過去の進捗から消費を再開できます。

    • SLS に読み取り専用のシャードが存在する場合、一部の同時実行 Flink タスクは読み取り専用シャードからのデータ消費を完了した後も、他の未完了のシャードからデータを要求し続けます。その結果、一部の同時実行タスクが複数のシャードに割り当てられ、シャードの分散が不均衡になります。この不均衡は全体的な消費効率およびシステムパフォーマンスに影響を与えます。この問題を軽減するには、同時実行性を調整するか、タスクスケジューリングポリシーを最適化するか、小さなシャードをマージしてシャード数およびタスク割り当ての複雑さを減らしてください。

    shardDiscoveryIntervalMs

    シャードの変更を動的に検出する間隔。単位:ミリ秒。

    Long

    いいえ

    60000

    このパラメーターを負の値に設定すると、動的検出が無効になります。

    説明
    • このパラメーターの値は 1 分(60,000 ミリ秒)未満にできません。

    • このパラメーターは、enableNewSource パラメーターが true に設定されている場合にのみ有効です。

    • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.9 以降でのみサポートされています。

    startupMode

    ソーステーブルの起動モード。

    String

    いいえ

    timestamp

    • timestamp(デフォルト):指定された開始時刻からログを消費します。

    • latest:最新のオフセットからログを消費します。

    • earliest:最も古いオフセットからログを消費します。

    • consumer_group:使用者グループに記録されたオフセットからログを消費します。使用者グループがシャードのコンシューマオフセットを記録していない場合、最も古いオフセットからログを消費します。

    重要
    • VVR 11.1 より前のバージョンでは、consumer_group 値はサポートされていません。consumeFromCheckpointtrue に設定する必要があります。この場合、指定された使用者グループに記録されたオフセットからログを消費し、ここで指定された起動モードは有効になりません。

    startTime

    ログ消費を開始する時刻。

    String

    いいえ

    現在時刻

    形式は yyyy-MM-dd hh:mm:ss です。

    このパラメーターは、startupModetimestamp に設定されている場合にのみ有効です。

    説明

    startTime パラメーターおよび stopTime パラメーターは、SLS の __timestamp__ 属性ではなく、__receive_time__ 属性に基づいています。

    stopTime

    ログ消費を終了する時刻。

    文字列

    いいえ

    なし

    形式は yyyy-MM-dd hh:mm:ss です。

    説明
    • このパラメーターは過去のログを消費する場合にのみ使用され、過去の時点に設定する必要があります。将来の時刻に設定すると、新しいログが書き込まれない場合に消費が早期に停止する可能性があります。これはエラーメッセージなしでデータストリームが中断されたように見えます。

    • ログ消費完了後に Flink プログラムを終了させるには、exitAfterFinish=true も設定する必要があります。

    consumerGroup

    コンシューマグループの名前。

    String

    いいえ

    なし

    使用者グループは消費進捗を記録するために使用されます。使用者グループの名前は任意に指定できます。形式は固定されていません。

    説明

    複数のジョブが同じ使用者グループを使用して協調的に消費することはできません。異なる Flink ジョブには異なる使用者グループを設定する必要があります。異なる Flink ジョブが同じ使用者グループを使用すると、すべてのデータが各ジョブによって消費されます。これは、Flink が SLS からデータを消費する際に、SLS 使用者グループを通じてパーティション割り当てを行わないためです。その結果、使用者グループが同じであっても、各コンシューマーは独自にメッセージを消費します。

    consumeFromCheckpoint

    指定された使用者グループに保存されたチェックポイントからログを消費するかどうかを指定します。

    String

    いいえ

    false

    • true:使用者グループも指定する必要があります。Flink プログラムは、使用者グループに保存されたチェックポイントからログを消費します。使用者グループに対応するチェックポイントがない場合、startTime パラメーターで指定された時刻からログを消費します。

    • false(デフォルト):指定された使用者グループに保存されたチェックポイントからログを消費しません。

    重要

    このパラメーターは、VVR 11.1 以降ではサポートされていません。VVR 11.1 以降では、startupModeconsumer_group に設定する必要があります。

    maxRetries

    SLS からのデータ読み取りに失敗した後の再試行回数。

    文字列

    いいえ

    3

    なし。

    batchGetSize

    1 回のリクエストで読み取るロググループの数。

    文字列

    いいえ

    100

    batchGetSize の値は 1,000 を超えることはできません。超えるとエラーが報告されます。

    exitAfterFinish

    データ消費完了後に Flink プログラムを終了するかどうかを指定します。

    文字列

    いいえ

    false

    • true:データ消費完了後に Flink プログラムが終了します。

    • false(デフォルト):データ消費完了後も Flink プログラムは終了しません。

    query

    重要

    このパラメーターは VVR 11.3 以降で非推奨となりましたが、後続のバージョンとの互換性は維持されています。

    SLS データ消費の前処理文。

    String

    いいえ

    なし

    query パラメーターを使用すると、SLS から消費される前にデータをフィルターできます。これにより、すべてのデータが Flink に消費されるのを防ぎ、コストを節約し、処理速度を向上させることができます。

    たとえば、 'query' = '*| where request_method = ''GET''' は、Flink が SLS からデータを読み取る前に、request_method フィールドの値が 'get' であるデータを一致させることを示します。

    説明

    query パラメーターには、Simple Log Service の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。

    重要
    • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.1 以降でのみサポートされています。

    • この機能は Simple Log Service から課金されます。詳細については、「課金」をご参照ください。

    processor

    SLS Consum Processor。このパラメーターと `query` の両方が設定されている場合、`query` が優先され、`processor` は有効になりません。

    String

    いいえ

    なし

    processor パラメーターを使用すると、Flink がデータを消費する前にデータをフィルターできます。これにより、コストを節約し、処理速度を向上させることができます。query パラメーターよりも processor パラメーターの使用を推奨します。

    たとえば、 'processor' = 'test-filter-processor' は、Flink がデータを読み取る前に SLS Consum Processor によってデータがフィルターされることを示します。

    説明

    processor には Simple Log Service (SLS) の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。SLS Consum Processor の作成および更新方法については、「Consum Processor の管理」をご参照ください。

    重要

    このパラメーターは、Realtime Compute for Apache Flink VVR 11.3 以降でのみサポートされています。

    この機能は Simple Log Service から課金されます。詳細については、「課金」をご参照ください。

  • 結果テーブル専用

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    topicField

    フィールド名を指定します。このフィールドの値は __topic__ 属性フィールドの値を上書きし、ログのトピックを示します。

    文字列

    いいえ

    なし

    このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があります。

    timeField

    フィールド名を指定します。このフィールドの値は __timestamp__ 属性フィールドの値を上書きし、ログが書き込まれる時刻を示します。

    文字列

    いいえ

    現在時刻

    このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があり、フィールドタイプは INT である必要があります。このパラメーターが指定されていない場合、デフォルトで現在時刻が使用されます。

    sourceField

    フィールド名を指定します。このフィールドの値は __source__ 属性フィールドの値を上書きし、ログのソース(ログを生成するマシンの IP アドレスなど)を示します。

    String

    いいえ

    なし

    このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があります。

    partitionField

    フィールド名を指定します。データを書き込む際に、この列の値に基づいてハッシュ値が計算されます。同じハッシュ値を持つデータは同じシャードに書き込まれます。

    String

    いいえ

    なし

    このパラメーターが指定されていない場合、各データはランダムに現在利用可能なシャードに書き込まれます。

    buckets

    partitionField が指定されている場合、ハッシュ値に基づいて再グループ化されるグループの数。

    文字列

    いいえ

    64

    このパラメーターの値は [1, 256] の範囲内であり、2 の整数乗である必要があります。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードにデータが書き込まれません。

    flushIntervalMs

    データ書き込みをトリガーする期間。

    String

    いいえ

    2000

    単位:ミリ秒。

    writeNullProperties

    null 値を空の文字列として SLS に書き込むかどうかを指定します。

    Boolean

    いいえ

    true

    • true(デフォルト):null 値はログに空文字列として書き込まれます。

    • false:null 値を持つフィールドはログに書き込まれません。

    説明

    このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.6 以降でのみサポートされています。

型マッピング

Flink フィールドタイプ

SLS フィールドタイプ

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

整数

BIGINT

フロート

DOUBLE

DECIMAL

データインジェスト(パブリックプレビュー)

制限事項

この機能は、Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされています。

構文

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

設定項目

パラメーター

説明

データ型

必須

デフォルト値

備考

type

データソースタイプ。

文字列

はい

なし

このパラメーターを sls に設定します。

endpoint

エンドポイント。

文字列

はい

なし

SLS のプライベートネットワークエンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。

説明
  • デフォルトでは、Realtime Compute for Apache Flink はパブリックネットワークにアクセスできません。ただし、Alibaba Cloud では NAT Gateway を提供しており、VPC とパブリックネットワーク間の通信を可能にします。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。

  • SLS へのパブリックネットワーク経由でのアクセスは推奨しません。パブリックネットワーク経由でアクセスする必要がある場合は、HTTPS プロトコルを使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。

accessId

Alibaba Cloud アカウントの AccessKey ID。

文字列

はい

なし

詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、AccessKey ペアを指定する際に変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。

accessKey

Alibaba Cloud アカウントの AccessKey シークレット。

文字列

はい

なし

project

SLS プロジェクトの名前。

文字列

はい

なし

なし。

logStore

SLS Logstore または metricstore の名前。

文字列

はい

なし

Logstore のデータは metricstore と同様に消費されます。

schema.inference.strategy

スキーマ推論戦略。

文字列

いいえ

continuous

  • continuous:各データに対してスキーマ推論を実行します。前後のスキーマが互換性のない場合、より広いスキーマを推論し、スキーマ変更イベントを生成します。

  • static:ジョブ開始時に一度だけスキーマ推論を実行します。後続のデータは初期スキーマに基づいて解析され、スキーマ変更イベントは生成されません。

maxPreFetchLogGroups

初期スキーマ推論中に各シャードから読み取りおよび解析を試行するロググループの最大数。

整数

いいえ

50

ジョブが実際にデータを読み取りおよび処理する前に、各シャードから指定された数のロググループを事前消費してスキーマ情報を初期化しようと試みます。

shardDiscoveryIntervalMs

シャードの変更を動的に検出する間隔。単位:ミリ秒。

Long

いいえ

60000

このパラメーターを負の値に設定すると、動的検出が無効になります。

説明

このパラメーターの値は 1 分(60,000 ミリ秒)未満にできません。

startupMode

起動モード。

文字列

いいえ

なし

  • timestamp(デフォルト):指定された開始時刻からログを消費します。

  • latest:最新のオフセットからログを消費します。

  • earliest:最も古いオフセットからログを消費します。

  • consumer_group:使用者グループに記録されたオフセットからログを消費します。使用者グループがシャードのコンシューマオフセットを記録していない場合、最も古いオフセットからログを消費します。

startTime

ログ消費を開始する時刻。

文字列

いいえ

現在時刻

形式は yyyy-MM-dd hh:mm:ss です。

このパラメーターは、startupMode が timestamp に設定されている場合にのみ有効です。

説明

startTime パラメーターおよび stopTime パラメーターは、SLS の __timestamp__ 属性ではなく、__receive_time__ 属性に基づいています。

stopTime

ログ消費を終了する時刻。

文字列

いいえ

なし

形式は yyyy-MM-dd hh:mm:ss です。

説明

ログ消費完了後に Flink プログラムを終了させるには、exitAfterFinish=true も設定する必要があります。

consumerGroup

コンシューマーグループの名前。

文字列

いいえ

なし

使用者グループは消費進捗を記録するために使用されます。使用者グループの名前は任意に指定できます。形式は固定されていません。

batchGetSize

1 回のリクエストで読み取るロググループの数。

整数

いいえ

100

batchGetSize の値は 1,000 を超えることはできません。超えるとエラーが報告されます。

maxRetries

SLS からのデータ読み取りに失敗した後の再試行回数。

整数

いいえ

3

なし。

exitAfterFinish

データ消費完了後に Flink プログラムを終了するかどうかを指定します。

ブール値

いいえ

false

  • true:データ消費完了後に Flink プログラムが終了します。

  • false(デフォルト):データ消費完了後も Flink プログラムは終了しません。

query

SLS データ消費の前処理文。

文字列

いいえ

なし

query パラメーターを使用すると、SLS から消費される前にデータをフィルターできます。これにより、すべてのデータが Flink に消費されるのを防ぎ、コストを節約し、処理速度を向上させることができます。

たとえば、'query' = '*| where request_method = ''GET''' は、Flink が SLS からデータを読み取る前に、request_method フィールドの値が 'get' であるデータを照合することを示しています。

説明

query パラメーターには、Simple Log Service の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。

重要
  • この機能をサポートするリージョンについては、「ルールに基づくログの消費」をご参照ください。

  • この機能はパブリックプレビュー中であり、無料でご利用いただけます。将来的には課金される可能性があります。詳細については、「課金」をご参照ください。

compressType

SLS の圧縮タイプ。

文字列

いいえ

なし

サポートされる圧縮タイプは次のとおりです。

  • lz4

  • deflate

  • zstd

timeZone

startTime および stopTime のタイムゾーン。

文字列

いいえ

なし

デフォルトでは、オフセットは追加されません。

regionId

SLS サービスが有効になっているリージョン。

文字列

いいえ

なし

設定については、「サポート対象リージョン」をご参照ください。

signVersion

SLS リクエスト署名のバージョン。

文字列

いいえ

なし

設定の詳細については、「リクエスト署名」をご参照ください。

shardModDivisor

SLS Logstore の読み取りパーティションの除数。

Int

いいえ

-1

詳細については、「シャード」のドキュメントをご参照ください。

shardModRemainder

SLS Logstore の読み取りパーティションの剰余。

Int

いいえ

-1

このオプションを構成するには、「シャード」をご参照ください。

metadata.list

ダウンストリームに渡すメタデータ列。

文字列

いいえ

なし

利用可能なメタデータフィールドには、__source____topic____timestamp__、および __tag__ が含まれます。これらはカンマで区切ります。

型マッピング

データインジェストの型マッピングは次のとおりです。

SLS フィールドタイプ

CDC フィールドタイプ

STRING

STRING

テーブルスキーマ推論および変更同期

  • シャードデータの事前消費およびテーブルスキーマの初期化

    SLS コネクタは、読み取り中の Logstore のスキーマを維持します。Logstore からデータを読み取る前に、SLS コネクタは各シャードから最大 maxPreFetchLogGroups 個のロググループを事前消費しようと試みます。その後、これらのロググループ内の各ログのスキーマを解析し、スキーマをマージしてテーブルスキーマを初期化します。実際のデータ消費が開始される前に、初期化されたスキーマに基づいて対応するテーブル作成イベントが生成されます。

    説明

    各シャードについて、SLS コネクタは現在時刻の 1 時間前からデータを消費し、ログスキーマを解析しようと試みます。

  • プライマリキー情報

    SLS ログにはプライマリキー情報が含まれていません。変換ルールを使用して、手動でテーブルにプライマリキーを追加できます。

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • スキーマ推論およびスキーマ進化

    テーブルスキーマが初期化された後、schema.inference.strategy が static に設定されている場合、SLS コネクタは各ログを初期テーブルスキーマに基づいて解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が continuous に設定されている場合、SLS コネクタは各ログのデータを解析し、物理列を推論して、現在記録されているスキーマと比較します。推論されたスキーマが現在のスキーマと不一致の場合、次のルールに基づいてスキーマをマージします。

    • 推論された物理列に現在のスキーマにないフィールドが含まれている場合、これらのフィールドがスキーマに追加され、NULL 可能な列を追加するイベントが生成されます。

    • 推論された物理列に現在のスキーマにすでに存在するフィールドが含まれていない場合、これらのフィールドは保持され、データは NULL で埋められ、列を削除するイベントは生成されません。

    SLS コネクタは、各ログ内のすべてのフィールドの型を String として推論します。現在、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、NULL 可能な列として設定されます。

コード例

  • SQL ソーステーブルおよび結果テーブル

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • データインジェストデータソース

    SLS をデータインジェストジョブのデータソースとして使用し、SLS データをサポートされているダウンストリームシステムにリアルタイムで書き込むことができます。たとえば、次のとおりにデータインジェストジョブを構成して、Logstore のデータを paimon 形式で DLF データレイクに書き込むことができます。ジョブは自動的にフィールドのデータ型およびダウンストリームテーブルのスキーマを推論します。また、実行時の動的スキーマ進化もサポートしています。

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# テーブルにプライマリキー情報を追加します。 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# test_log のすべてのデータを test_database.inventory テーブルに書き込みます。
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション)削除ベクターを有効にして読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

DataStream API

重要

DataStream API を使用してデータを読み書きする場合は、対応する DataStream コネクタを使用して Flink に接続する必要があります。詳細については、「DataStream コネクタの使用」をご参照ください。

VVR 8.0.10 より前のバージョンを使用している場合、ジョブを起動する際に依存関係 JAR パッケージが不足している可能性があります。この問題を解決するには、追加の依存関係に対応する uber パッケージを追加できます。

SLS からデータを読み取る

Realtime Compute for Apache Flink は、SLS からデータを読み取るための SourceFunction の実装である SlsSourceFunction クラスを提供しています。次のコードはその例です。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境を設定します
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SLS ソースとシンクを作成して追加します。
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source"); // SLSストリームソース
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint"); // エンドポイント
        accessInfo.setProjectName("yourProject"); // プロジェクト名
        accessInfo.setLogstore("yourLogStore"); // ログストア
        accessInfo.setAccessId("yourAccessId"); // アクセスID
        accessInfo.setAccessKey("yourAccessKey"); // アクセスキー

        // バッチ取得サイズを指定する必要があります。
        accessInfo.setBatchGetSize(10);

        // オプションパラメータ
        accessInfo.setConsumerGroup("yourConsumerGroup"); // コンシューマグループ
        accessInfo.setMaxRetries(3); // 最大再試行回数


        // 消費開始時刻、現在時刻に設定。
        int startInSec = (int) (new Date().getTime() / 1000);

        // 消費停止時刻、-1 は停止しないことを意味します。
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

SLS にデータを書き込む

SLS にデータを書き込むための OutputFormat の実装である SLSOutputFormat クラスを提供しています。次のコードはその例です。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

SLS DataStream コネクタ は Maven セントラルリポジトリで入手可能です。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

Flink プログラムの復元時にソーステーブルで TaskManager がメモリ不足になり、「java.lang.OutOfMemoryError: Java heap space」エラーが発生した場合はどうすればよいですか?