Realtime Compute for Apache Flinkを使用すると、OSSまたはOSS-HDFSからデータを読み書きできます。 OSSまたはOSS-HDFSコネクタの入力属性を設定すると、Realtime Compute for Apache Flinkは指定されたパスからデータを自動的に読み取り、そのデータを入力ストリームとして使用します。 次に、Realtime Compute for Apache Flinkは、指定された形式でコンピューティング結果をOSSまたはOSS-HDFSの指定されたパスに書き込みます。
前提条件
フルマネージドFlinkが有効化されています。 詳細については、「フルマネージドFlinkの有効化」をご参照ください。
フルマネージドFlinkを有効にすると、作成されたワークスペースが [フルマネージドFlink] タブに5〜10分以内に表示されます。
SQLドラフトが作成されます。
SQLドラフトを作成するときは、エンジンバージョンがVVR 8.0.1以降のApache Flink用のRealtime Computeを選択する必要があります。 詳細については、「手順」をご参照ください。
制限
Realtime Compute for Apache Flinkを使用して、Alibaba Cloudアカウント内のOSSバケットまたはOSS-HDFSのみからデータを読み書きできます。
Avro、CSV、JSON、Rawなどの行指向のストレージ形式でデータをOSSまたはOSS-HDFSに書き込むことはできません。 詳細については、FLINK-30635をご覧ください。
手順
[新しい下書き] ダイアログボックスに移動します。
[Fully Managed Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[SQLエディター] をクリックします。
SQLエディターでDDLおよびDMLステートメントを記述します。
srcbucketバケットのdirパスのソーステーブルからdestbucketバケットのテストパスの結果テーブルにデータを書き込みます。
説明次のコードを使用してOSS-HDFSからデータを読み書きする場合は、srcbucketバケットとdestbucketバケットのOSS-HDFSが有効になっていることを確認してください。
TEMPORARYテーブルの作成source_table ( 'file.name' STRING NOT NULL、 'file.path' STRING NOT NULL METDATA ) WITH ( 'connector'='filesystem' 、 'path'='oss:// srcbucket/dir/'、 'format'='parquet' ); TEMPORARY TABLE target_tableの作成 ( 「名前」ストリング、 「パス」ストリング ) with ( 'connector'='filesystem' 、 'path'='oss:// destbucket/test/'、 'format'='parquet' ); INSERT INTO target_table SELECT * FROM source_table ;
d file.nameのfile.pathなど、ソーステーブルでサポートされているメタデータ列の詳細、およびWITHパラメーターの使用方法については、「OSSコネクタ」をご参照ください。
[保存] をクリックします。
[検証] をクリックします。
ドラフトのSQLセマンティクス、ネットワーク接続、およびドラフトで使用されるテーブルのメタデータ情報を確認します。 計算結果で [SQLアドバイス] をクリックして、SQLリスクと関連する最適化の提案に関する情報を表示することもできます。
[デプロイ] をクリックします。
ドラフト開発と構文チェックが完了したら、ドラフトをデプロイしてデータを本番環境に公開できます。
オプションです。 この手順は、OSS-HDFSからデータを読み取る必要がある場合にのみ実行してください。
下書きをクリックします。 [設定] タブの [パラメーター] セクションで、OSS-HDFSのAccessKeyペアとエンドポイントなどのパラメーターを設定し、[保存] をクリックします。
fs.oss.jindo.buckets: srcbucket;destbucket fs.oss.jindo.accessKeyId: LTAI5t7h6SgiLSganP2m **** fs.oss.jindo.accessKeySecret: KZo149BD9GLPNiDIEmdQ7dyNKG **** fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com
次の表に、上記のコードのパラメーターを示します。
パラメーター
説明
fs.oss.jindo.buckets
ソーステーブルが格納されているバケットの名前と、結果テーブルが格納されているバケットの名前。 バケット名はセミコロン (;) で区切ります。 例:
srcbucket;destbucket
fs.oss.jindo.accessKeyId
Alibaba CloudアカウントまたはRAMユーザーのAccessKey ID。 AccessKey IDの取得方法の詳細については、「RAMユーザーのAccessKeyペアに関する情報の表示」をご参照ください。
fs.oss.jindo.accessKeySecret
Alibaba CloudアカウントまたはRAMユーザーのAccessKeyシークレット。 AccessKeyシークレットの取得方法の詳細については、「RAMユーザーのAccessKeyペアに関する情報の表示」をご参照ください。
fs.oss.jindo.endpoint
OSS-HDFSへのアクセスに使用されるエンドポイント。 例: cn-hangzhou.oss-dls.aliyuncs.com。
[デプロイメント] ページで、[操作] 列の [開始] をクリックし、ドラフトが [実行中] 状態になるのを待ちます。
OSSまたはOSS-HDFS結果テーブルの指定されたストレージパスに書き込まれたデータを表示します。
OSSにデータを書き込む場合、OSSコンソールの [オブジェクト] ページの [OSSオブジェクト] タブで、書き込まれたデータを表示できます。 OSS-HDFSにデータを書き込む場合、OSSコンソールの [オブジェクト] ページの [HDFS] タブで書き込まれたデータを表示できます。