すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for ClickHouse:SQL文を使用してFlinkデータをApsaraDB for ClickHouseクラスターに書き込む

最終更新日:Oct 17, 2024

このトピックでは、SQL文を使用してFlinkデータをApsaraDB for ClickHouseに書き込む方法について説明します。

制限事項

Realtime compute for Apache FlinkのコンピューティングエンジンがVerverica Runtime (VVR) 3.0.2以降の場合にのみ、SQL文を使用してFlinkデータをApsaraDB for ClickHouseに書き込むことができます。

前提条件

  • ApsaraDB for ClickHouseでテーブルが作成されます。 詳細については、「バケットの作成」をご参照ください。

  • ホワイトリストはApsaraDB for ClickHouseで設定されています。 詳細については、「ホワイトリストの設定」をご参照ください。

  • フルマネージドFlinkが有効化されています。 詳細については、「サービスを有効化する手順」をご参照ください。

手順

  1. フルマネージドFlinkのコンソールにログインし、SQLドラフトを作成します。

    1. Realtime Compute for Apache Flinkコンソール.

    2. 管理するワークスペースを見つけて、コンソールで、アクション列を作成します。

    3. 左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。 SQLエディターページの左上隅で、[新規作成] をクリックします。

    4. [新しい下書き] ダイアログボックスの [SQLスクリプト] タブで、空のストリームドラフト.

      フルマネージドFlinkは、さまざまなSQLコードテンプレートを提供し、データ同期をサポートします。 各コードテンプレートは、特定のシナリオ、コードサンプル、および手順を提供します。 目的のテンプレートをクリックすると、Flinkの機能と関連する構文についてすばやく学習し、ビジネスロジックを実装できます。 詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。

    5. 次へ.

    6. [新しいドラフト] ダイアログボックスで、ドラフトのパラメーターを設定します。 下表にパラメーターを示します。

      パラメーター

      説明

      名前

      作成するドラフトの名前。

      説明

      ドラフト名は、現在のプロジェクトで一意である必要があります。

      場所

      ドラフトのコードファイルが保存されているフォルダ。

      既存のフォルダの右側にある新建文件夹アイコンをクリックして、サブフォルダを作成することもできます。

      エンジン版

      ドラフトで使用されるFlinkのエンジンバージョン。 エンジンのバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

    7. クリック作成.

  2. [ドラフトエディタ] ページで、コードを記述して実行します。

    1. ソーステーブルと結果テーブルを作成します。 次に、ソーステーブルのデータを結果テーブルに挿入します。

      -- Create a source table named sls_test_single_local. 
      CREATE TEMPORARY TABLE sls_test_single_local (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      -- Create a result table named clickhouse_output. 
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://demo.aliyuncs.com:8123',
        'userName' = 'test',
        'password' = '280226Ck',
        'tableName' = 'sls_test_single_local'
      );
      -- Insert data from the source table into the result table. 
      INSERT INTO clickhouse_output
      SELECT 
        id,
        name,
        age,
        rate
      FROM sls_test_single_local;
      説明

      結果テーブルを作成するための構文の詳細については、「ClickHouse結果テーブルの作成」をご参照ください。

    2. [保存] をクリックします。

    3. [検証] をクリックします。

    4. [公開] をクリックします。

    5. [下書きの公開] ダイアログボックスで、[OK] をクリックして、コードを本番環境に公開します。

    6. ドラフトが正常に公開されました。操作の詳細を確認してくださいメッセージで、操作をクリックします。

  3. [デプロイメント] ページで、ジョブを開始します。

    1. 右上隅の [開始] をクリックします。

    2. [デプロイ開始設定] ページで、[開始] をクリックします。

  4. ApsaraDB for ClickHouseのクエリテーブルをします。

    1. ApsaraDB for ClickHouseコンソールにログインします。

    2. 左上隅で、データ同期ジョブでデプロイされたクラスターが配置されているリージョンを選択します。

    3. [クラスター] ページで、[デフォルトインスタンス] タブをクリックします。 データ同期ジョブでデプロイされたクラスターを見つけ、クラスターIDをクリックします。

    4. [クラスターの詳細] ページで、右上隅の [データベースにログイン] をクリックします。

    5. [データベースインスタンスにログイン] ダイアログボックスで、データベースアカウントのユーザー名とパスワードを入力し、[ログイン] をクリックします。

    6. クエリステートメントを入力し、[実行 (F8)] をクリックします。 この例では、次のサンプル文を実行します。

      select * from db01.sls_test_single_local;

      次の結果が返されます。 Use Realtime Compute for Apache Flink to insert data into a table in ApsaraDB for ClickHouse