すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:Apache Flinkを使用してOSS-HDFSにデータを書き込む

最終更新日:May 06, 2024

オープンソースのApache Flinkを使用して、ストリーミングモードでOSS-HDFSにデータを書き込んだり、1回だけのセマンティクスを使用してストレージメディアにデータを書き込んだりすることはできません。 Apache Flinkで1回だけセマンティクスを使用してストリーミングモードでOSS-HDFSにデータを書き込む場合は、Apache Flinkと一緒にJindoSDKを使用する必要があります。

説明

Apache Flinkを使用してストリーミングモードでデータをOSS-HDFSに書き込む前にJindoSDKをデプロイしたくない場合は、Realtime Compute for Apache Flinkを使用してデータをOSS-HDFSに書き込むことができます。 詳細については、「Realtime Compute For Apache Flinkを使用したOSSまたはOSS-HDFSへのデータの読み書き」をご参照ください。

前提条件

  • Elastic Compute Service (ECS) インスタンスが作成されました。 詳細は、インスタンスの作成をご参照ください。

  • バケットのOSS-HDFSが有効になり、OSS-HDFSに対するアクセス許可が付与されます。 詳細については、「OSS-HDFSの有効化とアクセス許可の付与」をご参照ください。

  • Apache Flink 1.10.1以降がダウンロードされ、インストールされます。 Flink 1.16.0以降の可用性は検証されていません。 Apache Flinkのインストールパッケージとバージョンの説明の詳細については、Apache Flinkをご参照ください。

JindoSDKの設定

  1. ECS インスタンスにログインします。 詳細については、「インスタンスへの接続」をご参照ください。

  2. 最新バージョンのJindoSDK JARパッケージをダウンロードして解凍します。 詳細については、『GitHub』をご参照ください。

  3. JindoSDKのplugins/flink/ ディレクトリにあるjindo-flink-${version}-full.jarファイルを、Apache Flinkが存在するルートディレクトリのlibディレクトリに移動します。

    mv plugins/flink/jindo-flink-${version}-full.jar lib/
重要
  • Apache FlinkにOSS (Object Storage Service) コネクタが組み込まれている場合は、libサブディレクトリまたはplugins/oss-fs-hadoopパスからflink-oss-fs-hadoop-${flink-version}.jarファイルを削除して削除する必要があります。

  • JindoSDKを設定した後、Flinkジョブを使用してストリーミングモードでOSS-HDFSにデータを書き込むことができます。 OSS-HDFSまたはossにデータを書き込むには、OSS: // プレフィックスを使用する必要があります。 JindoSDKは書き込まれたデータを自動的に認識します。

  1. 一般的な設定を行います。

    1回限りのセマンティクスを使用してデータをOSS-HDFSに書き込むには、次の操作を実行します。

    1. Apache Flinkのチェックポイントメカニズムを有効にします。

      サンプルコード:

      1. 次のコマンドを実行して、StreamExecutionEnvironmentクラスを作成します。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. 次のコマンドを実行して、チェックポイントメカニズムを有効にします。

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Kafkaなど、データの再送信をサポートするデータソースを使用します。

  2. Apache Flinkを使用するようにクイック設定を構成します。

    oss:// で始まるパスを含め、Apache Flinkを有効にするOSS-HDFSにバケットとエンドポイントを使用できます。 このメソッドは、追加の依存関係を必要としません。

    1. シンクを追加します。

      次のサンプルコードは、OutputStreamを使用してDataStream<String> オブジェクトをOSS-HDFSに書き込む方法の例を示しています。

      String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>"
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat (
              新しいパス (outputPath) 、
              新しいSimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink (シンク); 
      重要

      前の例では、. <oss-hdfs-endpoint>はオプションです。 このフィールドを無視する場合は、FlinkコンポーネントまたはHadoopコンポーネントでOSS-HDFSの正しいエンドポイントを指定してください。

    2. Flinkジョブを実行するには、env.exe cute() を使用します。

(オプション) カスタム設定の構成

Flinkジョブを送信するときに、カスタムパラメーターを設定して、特定の機能を有効または管理できます。

次のサンプルコードは、-yDを使用してYarnクラスターに基づいてFlinkジョブ送信の設定を構成する方法の例を示しています。

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2...

エントロピー注入を有効にできます。 エントロピー注入機能を使用すると、宛先パスの特定の文字列をランダムな文字列に置き換えることができます。 このように、書き込まれたデータは、指定されたパスに基づいて異なるパーティションに分散され、書き込みパフォーマンスが向上します。

OSS-HDFSにデータを書き込むには、次の設定を完了する必要があります。

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length> 

オブジェクトにデータを書き込むと、書き込みパスの <user-defined-key> 文字列がランダムな文字列に置き換えられます。 ランダム文字列の長さは、<user-defined-length> の値と同じである必要があります。 <user-defined-length> の値は0より大きくなければなりません。