このトピックでは、Simple Log Service(SLS)コネクタの使用方法について説明します。
背景情報
Simple Log Service はエンドツーエンドのデータロギングサービスです。ログデータを効率的に収集・消費・配信・照会・分析でき、運用保守(O&M)および業務効率を向上させ、大量のログデータを処理する能力を構築できます。
SLS コネクタは、以下の種類の情報をサポートしています。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブルおよび結果テーブル |
実行モード | ストリーミングモードのみ |
特定の監視メトリック | 該当なし |
データ形式 | なし |
APIタイプ | SQL、DataStream、データインジェスト YAML |
結果テーブルのデータを更新または削除できますか? | 結果テーブルのデータを更新または削除することはできません。挿入のみ可能です。 |
機能
ソーステーブル用の SLS コネクタは、メッセージの属性フィールドを直接読み取ることをサポートしています。サポートされる属性フィールドは次のとおりです。
フィールド名 | フィールドタイプ | フィールドの説明 |
__source__ | STRING METADATA VIRTUAL | メッセージソース。 |
__topic__ | STRING METADATA VIRTUAL | メッセージトピック。 |
__timestamp__ | BIGINT METADATA VIRTUAL | ログの時刻。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | メッセージタグ。 属性 |
前提条件
Simple Log Service プロジェクトおよび Logstore を作成済みである必要があります。詳細については、「プロジェクトおよび Logstore の作成」をご参照ください。
制限事項
YAML によるデータインジェストで同期データソースとして Simple Log Service (SLS) を使用できるのは、Realtime Compute for Apache Flink VVR 11.1 以降のみです。
SLS コネクタは、at-least-once セマンティクスのみを保証します。
ソースの同時実行数をシャード数より大きい値に設定しないことを強く推奨します。この設定はリソースを無駄にし、VVR 8.0.5 以前ではシャード数が変更された場合に自動フェールオーバー機能が失敗し、一部のシャードが消費されなくなる可能性があります。
SQL
構文
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);WITH パラメーター
一般
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルタイプ。
文字列
はい
なし
このパラメーターを sls に設定します。
endPoint
エンドポイント。
文字列
はい
なし
SLS のプライベートネットワークエンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。
説明デフォルトでは、Realtime Compute for Apache Flink はパブリックネットワークにアクセスできません。ただし、Alibaba Cloud では NAT Gateway を提供しており、VPC とパブリックネットワーク間の通信を可能にします。詳細については、「パブリックネットワークへのアクセス方法」をご参照ください。
SLS へのパブリックネットワーク経由でのアクセスは推奨しません。パブリックネットワーク経由でアクセスする必要がある場合は、HTTPS プロトコルを使用し、SLS の Global Accelerator (GA) を有効にしてください。詳細については、「転送アクセラレーションの管理」をご参照ください。
project
SLS プロジェクトの名前。
文字列
はい
なし
なし。
logStore
SLS Logstore または metricstore の名前。
String
はい
なし
Logstore のデータは metricstore と同様に消費されます。
accessId
Alibaba CloudアカウントのAccessKey ID。
String
はい
なし
詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、AccessKey ペアを指定する際に変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。
accessKey
Alibaba CloudアカウントのAccessKeyシークレット。
String
はい
なし
ソーステーブル専用
パラメーター
説明
データ型
必須
デフォルト値
備考
enableNewSource
FLIP-27 インターフェイスを実装する新しいデータソースを有効にするかどうかを指定します。
Boolean
いいえ
false
新しいデータソースは、シャードの変更に自動的に適応し、すべてのソース同時実行性にわたってシャードを可能な限り均等に分散させることができます。
重要このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.9 以降でのみサポートされています。VVR 11.1 以降では、デフォルトで true に設定されています。
この設定項目を変更した後は、ジョブを状態から復元できません。この問題を解決するには、まず consumerGroup 設定項目を設定してジョブを起動し、SLS 使用者グループに消費進捗を記録します。その後、consumeFromCheckpoint 設定項目を true に設定して、ステートレスな方法でジョブを起動します。これにより、ジョブは過去の進捗から消費を再開できます。
SLS に読み取り専用のシャードが存在する場合、一部の同時実行 Flink タスクは読み取り専用シャードからのデータ消費を完了した後も、他の未完了のシャードからデータを要求し続けます。その結果、一部の同時実行タスクが複数のシャードに割り当てられ、シャードの分散が不均衡になります。この不均衡は全体的な消費効率およびシステムパフォーマンスに影響を与えます。この問題を軽減するには、同時実行性を調整するか、タスクスケジューリングポリシーを最適化するか、小さなシャードをマージしてシャード数およびタスク割り当ての複雑さを減らしてください。
shardDiscoveryIntervalMs
シャードの変更を動的に検出する間隔。単位:ミリ秒。
Long
いいえ
60000
このパラメーターを負の値に設定すると、動的検出が無効になります。
説明このパラメーターの値は 1 分(60,000 ミリ秒)未満にできません。
このパラメーターは、enableNewSource パラメーターが true に設定されている場合にのみ有効です。
このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.9 以降でのみサポートされています。
startupMode
ソーステーブルの起動モード。
String
いいえ
timestamp
timestamp(デフォルト):指定された開始時刻からログを消費します。latest:最新のオフセットからログを消費します。earliest:最も古いオフセットからログを消費します。consumer_group:使用者グループに記録されたオフセットからログを消費します。使用者グループがシャードのコンシューマオフセットを記録していない場合、最も古いオフセットからログを消費します。
重要VVR 11.1 より前のバージョンでは、consumer_group 値はサポートされていません。
consumeFromCheckpointをtrueに設定する必要があります。この場合、指定された使用者グループに記録されたオフセットからログを消費し、ここで指定された起動モードは有効になりません。
startTime
ログ消費を開始する時刻。
String
いいえ
現在時刻
形式は
yyyy-MM-dd hh:mm:ssです。このパラメーターは、
startupModeがtimestampに設定されている場合にのみ有効です。説明startTime パラメーターおよび stopTime パラメーターは、SLS の __timestamp__ 属性ではなく、__receive_time__ 属性に基づいています。
stopTime
ログ消費を終了する時刻。
文字列
いいえ
なし
形式は
yyyy-MM-dd hh:mm:ssです。説明このパラメーターは過去のログを消費する場合にのみ使用され、過去の時点に設定する必要があります。将来の時刻に設定すると、新しいログが書き込まれない場合に消費が早期に停止する可能性があります。これはエラーメッセージなしでデータストリームが中断されたように見えます。
ログ消費完了後に Flink プログラムを終了させるには、exitAfterFinish=true も設定する必要があります。
consumerGroup
コンシューマグループの名前。
String
いいえ
なし
使用者グループは消費進捗を記録するために使用されます。使用者グループの名前は任意に指定できます。形式は固定されていません。
説明複数のジョブが同じ使用者グループを使用して協調的に消費することはできません。異なる Flink ジョブには異なる使用者グループを設定する必要があります。異なる Flink ジョブが同じ使用者グループを使用すると、すべてのデータが各ジョブによって消費されます。これは、Flink が SLS からデータを消費する際に、SLS 使用者グループを通じてパーティション割り当てを行わないためです。その結果、使用者グループが同じであっても、各コンシューマーは独自にメッセージを消費します。
consumeFromCheckpoint
指定された使用者グループに保存されたチェックポイントからログを消費するかどうかを指定します。
String
いいえ
false
true:使用者グループも指定する必要があります。Flink プログラムは、使用者グループに保存されたチェックポイントからログを消費します。使用者グループに対応するチェックポイントがない場合、startTime パラメーターで指定された時刻からログを消費します。false(デフォルト):指定された使用者グループに保存されたチェックポイントからログを消費しません。
重要このパラメーターは、VVR 11.1 以降ではサポートされていません。VVR 11.1 以降では、
startupModeをconsumer_groupに設定する必要があります。maxRetries
SLS からのデータ読み取りに失敗した後の再試行回数。
文字列
いいえ
3
なし。
batchGetSize
1 回のリクエストで読み取るロググループの数。
文字列
いいえ
100
batchGetSizeの値は 1,000 を超えることはできません。超えるとエラーが報告されます。exitAfterFinish
データ消費完了後に Flink プログラムを終了するかどうかを指定します。
文字列
いいえ
false
true:データ消費完了後に Flink プログラムが終了します。false(デフォルト):データ消費完了後も Flink プログラムは終了しません。
query
重要このパラメーターは VVR 11.3 以降で非推奨となりましたが、後続のバージョンとの互換性は維持されています。
SLS データ消費の前処理文。
String
いいえ
なし
query パラメーターを使用すると、SLS から消費される前にデータをフィルターできます。これにより、すべてのデータが Flink に消費されるのを防ぎ、コストを節約し、処理速度を向上させることができます。
たとえば、
'query' = '*| where request_method = ''GET'''は、Flink が SLS からデータを読み取る前に、request_method フィールドの値が 'get' であるデータを一致させることを示します。説明query パラメーターには、Simple Log Service の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。
重要このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.1 以降でのみサポートされています。
この機能は Simple Log Service から課金されます。詳細については、「課金」をご参照ください。
processor
SLS Consum Processor。このパラメーターと `query` の両方が設定されている場合、`query` が優先され、`processor` は有効になりません。
String
いいえ
なし
processor パラメーターを使用すると、Flink がデータを消費する前にデータをフィルターできます。これにより、コストを節約し、処理速度を向上させることができます。query パラメーターよりも processor パラメーターの使用を推奨します。
たとえば、
'processor' = 'test-filter-processor'は、Flink がデータを読み取る前に SLS Consum Processor によってデータがフィルターされることを示します。説明processor には Simple Log Service (SLS) の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。SLS Consum Processor の作成および更新方法については、「Consum Processor の管理」をご参照ください。
重要このパラメーターは、Realtime Compute for Apache Flink VVR 11.3 以降でのみサポートされています。
この機能は Simple Log Service から課金されます。詳細については、「課金」をご参照ください。
結果テーブル専用
パラメーター
説明
データ型
必須
デフォルト値
備考
topicField
フィールド名を指定します。このフィールドの値は __topic__ 属性フィールドの値を上書きし、ログのトピックを示します。
文字列
いいえ
なし
このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があります。
timeField
フィールド名を指定します。このフィールドの値は __timestamp__ 属性フィールドの値を上書きし、ログが書き込まれる時刻を示します。
文字列
いいえ
現在時刻
このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があり、フィールドタイプは INT である必要があります。このパラメーターが指定されていない場合、デフォルトで現在時刻が使用されます。
sourceField
フィールド名を指定します。このフィールドの値は __source__ 属性フィールドの値を上書きし、ログのソース(ログを生成するマシンの IP アドレスなど)を示します。
String
いいえ
なし
このパラメーターの値は、テーブル内の既存フィールドのいずれかである必要があります。
partitionField
フィールド名を指定します。データを書き込む際に、この列の値に基づいてハッシュ値が計算されます。同じハッシュ値を持つデータは同じシャードに書き込まれます。
String
いいえ
なし
このパラメーターが指定されていない場合、各データはランダムに現在利用可能なシャードに書き込まれます。
buckets
partitionField が指定されている場合、ハッシュ値に基づいて再グループ化されるグループの数。
文字列
いいえ
64
このパラメーターの値は [1, 256] の範囲内であり、2 の整数乗である必要があります。バケット数はシャード数以上である必要があります。そうでない場合、一部のシャードにデータが書き込まれません。
flushIntervalMs
データ書き込みをトリガーする期間。
String
いいえ
2000
単位:ミリ秒。
writeNullProperties
null 値を空の文字列として SLS に書き込むかどうかを指定します。
Boolean
いいえ
true
true(デフォルト):null 値はログに空文字列として書き込まれます。false:null 値を持つフィールドはログに書き込まれません。
説明このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.6 以降でのみサポートされています。
型マッピング
Flink フィールドタイプ | SLS フィールドタイプ |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
整数 | |
BIGINT | |
フロート | |
DOUBLE | |
DECIMAL |
データインジェスト(パブリックプレビュー)
制限事項
この機能は、Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされています。
構文
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
accessKey: <accessKey>設定項目
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
type | データソースタイプ。 | 文字列 | はい | なし | このパラメーターを sls に設定します。 |
endpoint | エンドポイント。 | 文字列 | はい | なし | SLS のプライベートネットワークエンドポイントを入力します。詳細については、「エンドポイント」をご参照ください。 説明
|
accessId | Alibaba Cloud アカウントの AccessKey ID。 | 文字列 | はい | なし | 詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、AccessKey ペアを指定する際に変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。 |
accessKey | Alibaba Cloud アカウントの AccessKey シークレット。 | 文字列 | はい | なし | |
project | SLS プロジェクトの名前。 | 文字列 | はい | なし | なし。 |
logStore | SLS Logstore または metricstore の名前。 | 文字列 | はい | なし | Logstore のデータは metricstore と同様に消費されます。 |
schema.inference.strategy | スキーマ推論戦略。 | 文字列 | いいえ | continuous |
|
maxPreFetchLogGroups | 初期スキーマ推論中に各シャードから読み取りおよび解析を試行するロググループの最大数。 | 整数 | いいえ | 50 | ジョブが実際にデータを読み取りおよび処理する前に、各シャードから指定された数のロググループを事前消費してスキーマ情報を初期化しようと試みます。 |
shardDiscoveryIntervalMs | シャードの変更を動的に検出する間隔。単位:ミリ秒。 | Long | いいえ | 60000 | このパラメーターを負の値に設定すると、動的検出が無効になります。 説明 このパラメーターの値は 1 分(60,000 ミリ秒)未満にできません。 |
startupMode | 起動モード。 | 文字列 | いいえ | なし |
|
startTime | ログ消費を開始する時刻。 | 文字列 | いいえ | 現在時刻 | 形式は yyyy-MM-dd hh:mm:ss です。 このパラメーターは、startupMode が timestamp に設定されている場合にのみ有効です。 説明 startTime パラメーターおよび stopTime パラメーターは、SLS の __timestamp__ 属性ではなく、__receive_time__ 属性に基づいています。 |
stopTime | ログ消費を終了する時刻。 | 文字列 | いいえ | なし | 形式は yyyy-MM-dd hh:mm:ss です。 説明 ログ消費完了後に Flink プログラムを終了させるには、exitAfterFinish=true も設定する必要があります。 |
consumerGroup | コンシューマーグループの名前。 | 文字列 | いいえ | なし | 使用者グループは消費進捗を記録するために使用されます。使用者グループの名前は任意に指定できます。形式は固定されていません。 |
batchGetSize | 1 回のリクエストで読み取るロググループの数。 | 整数 | いいえ | 100 | batchGetSize の値は 1,000 を超えることはできません。超えるとエラーが報告されます。 |
maxRetries | SLS からのデータ読み取りに失敗した後の再試行回数。 | 整数 | いいえ | 3 | なし。 |
exitAfterFinish | データ消費完了後に Flink プログラムを終了するかどうかを指定します。 | ブール値 | いいえ | false |
|
query | SLS データ消費の前処理文。 | 文字列 | いいえ | なし | query パラメーターを使用すると、SLS から消費される前にデータをフィルターできます。これにより、すべてのデータが Flink に消費されるのを防ぎ、コストを節約し、処理速度を向上させることができます。 たとえば、 説明 query パラメーターには、Simple Log Service の構造化プロセス言語(SPL)を使用する必要があります。詳細については、「SPL 構文」をご参照ください。 重要
|
compressType | SLS の圧縮タイプ。 | 文字列 | いいえ | なし | サポートされる圧縮タイプは次のとおりです。
|
timeZone | startTime および stopTime のタイムゾーン。 | 文字列 | いいえ | なし | デフォルトでは、オフセットは追加されません。 |
regionId | SLS サービスが有効になっているリージョン。 | 文字列 | いいえ | なし | 設定については、「サポート対象リージョン」をご参照ください。 |
signVersion | SLS リクエスト署名のバージョン。 | 文字列 | いいえ | なし | 設定の詳細については、「リクエスト署名」をご参照ください。 |
shardModDivisor | SLS Logstore の読み取りパーティションの除数。 | Int | いいえ | -1 | 詳細については、「シャード」のドキュメントをご参照ください。 |
shardModRemainder | SLS Logstore の読み取りパーティションの剰余。 | Int | いいえ | -1 | このオプションを構成するには、「シャード」をご参照ください。 |
metadata.list | ダウンストリームに渡すメタデータ列。 | 文字列 | いいえ | なし | 利用可能なメタデータフィールドには、 |
型マッピング
データインジェストの型マッピングは次のとおりです。
SLS フィールドタイプ | CDC フィールドタイプ |
STRING | STRING |
テーブルスキーマ推論および変更同期
シャードデータの事前消費およびテーブルスキーマの初期化
SLS コネクタは、読み取り中の Logstore のスキーマを維持します。Logstore からデータを読み取る前に、SLS コネクタは各シャードから最大 maxPreFetchLogGroups 個のロググループを事前消費しようと試みます。その後、これらのロググループ内の各ログのスキーマを解析し、スキーマをマージしてテーブルスキーマを初期化します。実際のデータ消費が開始される前に、初期化されたスキーマに基づいて対応するテーブル作成イベントが生成されます。
説明各シャードについて、SLS コネクタは現在時刻の 1 時間前からデータを消費し、ログスキーマを解析しようと試みます。
プライマリキー情報
SLS ログにはプライマリキー情報が含まれていません。変換ルールを使用して、手動でテーブルにプライマリキーを追加できます。
transform: - source-table: <project>.<logstore> projection: \* primary-keys: key1, key2スキーマ推論およびスキーマ進化
テーブルスキーマが初期化された後、schema.inference.strategy が static に設定されている場合、SLS コネクタは各ログを初期テーブルスキーマに基づいて解析し、スキーマ変更イベントを生成しません。schema.inference.strategy が continuous に設定されている場合、SLS コネクタは各ログのデータを解析し、物理列を推論して、現在記録されているスキーマと比較します。推論されたスキーマが現在のスキーマと不一致の場合、次のルールに基づいてスキーマをマージします。
推論された物理列に現在のスキーマにないフィールドが含まれている場合、これらのフィールドがスキーマに追加され、NULL 可能な列を追加するイベントが生成されます。
推論された物理列に現在のスキーマにすでに存在するフィールドが含まれていない場合、これらのフィールドは保持され、データは NULL で埋められ、列を削除するイベントは生成されません。
SLS コネクタは、各ログ内のすべてのフィールドの型を String として推論します。現在、列の追加のみがサポートされています。新しい列は現在のスキーマの末尾に追加され、NULL 可能な列として設定されます。
コード例
SQL ソーステーブルおよび結果テーブル
CREATE TEMPORARY TABLE sls_input( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'starttime' = '2023-08-30 00:00:00', 'project' ='sls-test', 'logstore' ='sls-input' ); CREATE TEMPORARY TABLE sls_sink( `time` BIGINT, url STRING, dt STRING, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, `__topic__` STRING, `__source__` STRING, `__timestamp__` BIGINT , receive_time BIGINT ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = '${ak_id}', 'accessKey' = '${ak_secret}', 'project' ='sls-test', 'logstore' ='sls-output' ); INSERT INTO sls_sink SELECT `time`, url, dt, float_field, double_field, boolean_field, `__topic__` , `__source__` , `__timestamp__` , cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;データインジェストデータソース
SLS をデータインジェストジョブのデータソースとして使用し、SLS データをサポートされているダウンストリームシステムにリアルタイムで書き込むことができます。たとえば、次のとおりにデータインジェストジョブを構成して、Logstore のデータを paimon 形式で DLF データレイクに書き込むことができます。ジョブは自動的にフィールドのデータ型およびダウンストリームテーブルのスキーマを推論します。また、実行時の動的スキーマ進化もサポートしています。
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# テーブルにプライマリキー情報を追加します。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# test_log のすべてのデータを test_database.inventory テーブルに書き込みます。
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション)削除ベクターを有効にして読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: trueDataStream API
DataStream API を使用してデータを読み書きする場合は、対応する DataStream コネクタを使用して Flink に接続する必要があります。詳細については、「DataStream コネクタの使用」をご参照ください。
VVR 8.0.10 より前のバージョンを使用している場合、ジョブを起動する際に依存関係 JAR パッケージが不足している可能性があります。この問題を解決するには、追加の依存関係に対応する uber パッケージを追加できます。
SLS からデータを読み取る
Realtime Compute for Apache Flink は、SLS からデータを読み取るための SourceFunction の実装である SlsSourceFunction クラスを提供しています。次のコードはその例です。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// ストリーミング実行環境を設定します
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// SLS ソースとシンクを作成して追加します。
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source"); // SLSストリームソース
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint"); // エンドポイント
accessInfo.setProjectName("yourProject"); // プロジェクト名
accessInfo.setLogstore("yourLogStore"); // ログストア
accessInfo.setAccessId("yourAccessId"); // アクセスID
accessInfo.setAccessKey("yourAccessKey"); // アクセスキー
// バッチ取得サイズを指定する必要があります。
accessInfo.setBatchGetSize(10);
// オプションパラメータ
accessInfo.setConsumerGroup("yourConsumerGroup"); // コンシューマグループ
accessInfo.setMaxRetries(3); // 最大再試行回数
// 消費開始時刻、現在時刻に設定。
int startInSec = (int) (new Date().getTime() / 1000);
// 消費停止時刻、-1 は停止しないことを意味します。
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}SLS にデータを書き込む
SLS にデータを書き込むための OutputFormat の実装である SLSOutputFormat クラスを提供しています。次のコードはその例です。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}XML
SLS DataStream コネクタ は Maven セントラルリポジトリで入手可能です。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>