すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Function Compute に基づいて拡張機能を開発およびデプロイする

最終更新日:Jan 14, 2025

DataWorks で実行される操作 (特定の操作のブロックなど) を管理するために、拡張機能のカスタムロジックを設定できます。拡張機能は、特定のイベントの処理結果を返し、DataWorks でのプロセス制御を実装します。このトピックでは、Function Compute に基づいて拡張機能を開発およびデプロイする方法について説明します。

背景情報

Function Compute は、イベント駆動型のフルマネージドコンピューティングサービスです。Function Compute に拡張機能をデプロイして、DataWorks が拡張ポイントイベントのメッセージを ExtensionRequest クラスにプッシュできるようにすることができます。拡張機能のコードでは、PojoRequestHandler インターフェースの handleRequest メソッドを実装することで、ExtensionRequest オブジェクトを構築して、拡張ポイントイベントのメッセージと context を受信できます。その後、拡張機能の処理ロジックを記述し、ExtensionResponse クラスを使用して、拡張ポイントイベントの処理結果を DataWorks に返すことができます。このようにして、DataWorks は ExtensionResponse クラスから拡張ポイントイベントの処理結果を自動的に取得し、プロセスをブロックするかどうかを決定します。

制限事項

  • 拡張機能モジュールを使用できるのは、DataWorks Enterprise Edition のユーザーのみです。

  • 拡張機能モジュールは、次のリージョンで使用できます。中国 (北京), 中国 (杭州), 中国 (上海), 中国 (張家口), 中国 (深圳), 中国 (成都), 米国 (シリコンバレー), 米国 (バージニア), ドイツ (フランクフルト), 日本 (東京), 中国 (香港), シンガポール

注意事項

  • [オープン プラットフォーム管理者][テナント管理者]、Alibaba Cloud アカウント、および [aliyundataworksfullaccess] ポリシーがアタッチされている RAM ユーザーのみが、開発者バックエンドに対する読み取りおよび書き込み権限を持っています。権限管理の詳細については、「グローバルレベルのサービスの権限を管理する」および「RAM ポリシーを使用して DataWorks サービスおよび DataWorks コンソールのエンティティの権限を管理する」をご参照ください。

  • DataWorks Enterprise Edition の有効期限が切れると、拡張機能は無効になり、拡張ポイントイベントをチェックするためにトリガーできなくなります。拡張機能がイベントをチェックするためにトリガーされ、DataWorks Enterprise Edition の有効期限が切れる前にチェックが完了しない場合、チェックは終了し、結果は「チェック合格」として返されます。

  • AI プラットフォームノードdo-while ノードfor-each ノード などの複合ノードが拡張チェックをトリガーする場合、後続の操作を実行する前に、複合ノードのすべての内部ノードがチェックに合格するまで待つ必要があります。

  • 同じ拡張ポイントイベントに複数の拡張機能を関連付けることができます。このようにして、イベントが発生したときに関連付けられた拡張機能がトリガーされます。

  • Function Compute に基づいてデプロイされた拡張機能は、次のイベントのみを処理できます。データダウンロードの事前イベントアセットの公開と非公開の事前イベントデータアップロードの事前イベント

プロセス

次の図は、Function Compute に基づいて開発およびデプロイされた拡張機能が拡張ポイントイベントを処理する方法を示しています。

image
説明

拡張機能が拡張ポイントイベントによってトリガーされると、イベントプロセスは [チェック中] 状態になります。拡張機能が API 操作を呼び出して処理結果を DataWorks に送信すると、DataWorks はプロセスをブロックするかどうかを決定します。

ユーザー

Function Compute に基づいて拡張機能をデプロイする前に、オンプレミス マシンで拡張機能を開発する必要があります。fc-java-core ライブラリを使用してハンドラーを実行し、fc_dataworks_demo01-1.0-SNAPSHOT.jar をダウンロードしてサンプルコードを取得できます。詳細については、「イベントハンドラー」をご参照ください。

ステップ 1: 拡張機能の依存関係を設定する

拡張機能を開発する場合は、pom.xml ファイルに次の依存関係を追加する必要があります。

DataWorks 依存関係ライブラリ

<dependency>
 <groupId>com.aliyun</groupId>
 <artifactId>dataworks_public20200518</artifactId>
 <version>5.6.0</version>
</dependency>

Function Compute 依存関係ライブラリ

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-core -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-core</artifactId>
    <version>1.4.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-event -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-event</artifactId>
    <version>1.2.0</version>
</dependency>

依存関係のパッケージ化

<build>
        <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <version>3.2.1</version>
                  <executions>
                    <execution>
                      <phase>package</phase>
                      <goals>
                        <goal>shade</goal>
                      </goals>
                      <configuration>
                        <filters>
                          <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                              <exclude>META-INF/*.SF</exclude>
                              <exclude>META-INF/*.DSA</exclude>
                              <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                          </filter>
                        </filters>
                      </configuration>
                    </execution>
                  </executions>
              </plugin>
        </plugins>
</build>

Apache Maven Shade または Apache Maven Assembly を使用して、依存関係ライブラリをパッケージ化できます。上記のサンプルコードでは、Apache Maven Shade が使用されています。

ステップ 2: 拡張機能を開発する

Function Compute に基づいて拡張機能を開発するには、PojoRequestHandler インターフェースを使用して handleRequest メソッドを実装し、ExtensionRequest クラスと context で指定されたリクエストを受信する必要があります。その後、拡張機能の処理ロジックを記述し、ExtensionResponse クラスを使用して、拡張ポイントイベントの処理結果を DataWorks に返すことができます。

  1. 拡張機能のコードを開発します。

    イベントメッセージを解析する

    DataWorks によって送信されるイベントメッセージの形式については、「開発リファレンス: イベントリストとイベントメッセージの形式」をご参照ください。イベントメッセージでは、messageBody フィールドにメッセージの内容が指定されています。実際の開発では、messageBody.eventCode フィールドを使用してメッセージタイプを確認し、messageId フィールドを使用してメッセージの詳細を取得できます。

    処理ロジックを記述する

    ビジネス要件に基づいて、DataWorks によってプッシュされるイベントメッセージの処理ロジックを記述します。拡張機能のコードを開発する場合は、次のメソッドを使用して、開発効率とアプリケーション効果を向上させることができます。

    • 拡張パラメーターを使用します。たとえば、extension.project.disabled パラメーターを設定して、指定されたワークスペースで拡張機能が有効にならないようにすることができます。詳細については、「高度な機能: 拡張パラメーターを設定する」をご参照ください。

    • DataStudio の拡張ポイントイベントの GetIDEEventDetail 操作で MessageId パラメーターを設定して、拡張ポイントイベントのスナップショットを取得します。

    説明

    MessageId パラメーターは、イベントメッセージの id フィールドに対応しています。詳細については、「開発リファレンス: イベントメッセージとイベントメッセージの形式」トピックの「付録: メッセージ形式」セクションをご参照ください。

    拡張機能の処理結果を DataWorks に返す

    ExtensionResponse クラスをカプセル化する場合は、CheckResult パラメーターを設定する必要があります。DataWorks は CheckResult パラメーターから処理結果を自動的に読み取り、プロセスを失敗させるかどうかを決定します。

    • OK: 拡張ポイントイベントは拡張機能のチェックに合格します。

    • FAIL: 拡張ポイントイベントは拡張機能のチェックに失敗します。報告されたエラーをできるだけ早くチェックして処理し、サービスが期待どおりに実行されるようにする必要があります。

    • WARN: 拡張ポイントイベントは拡張機能のチェックに合格しますが、イベントに対して警告が報告されます。

    をダウンロードして Function Compute にアップロードし、検証とテストを行うことができます。次のタブの内容では、コードの詳細について説明します。

    コードの詳細

    App

    次のサンプルコードは、PojoRequestHandler インターフェースを使用して handleRequest メソッドを実装し、ExtensionRequest クラスと context で指定されたリクエストを受信する方法を示しています。その後、拡張機能の処理ロジックを記述し、ExtensionResponse クラスを使用して、拡張ポイントイベントの処理結果を DataWorks に返すことができます。

    この例では、ADS テーブルへのデータのアップロードは禁止されています。

    package com.aliyun.example;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.ExtensionRequest;
    import com.aliyun.dataworks.ExtensionResponse;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.PojoRequestHandler;
    
    
    public class App implements PojoRequestHandler<ExtensionRequest, ExtensionResponse> {
    
        public ExtensionResponse handleRequest(ExtensionRequest extensionRequest, Context context) {
            // デバッグのためにリクエストの内容を出力します。
             System.out.println(JSON.toJSONString(extensionRequest));
            // レスポンスオブジェクトを作成します。
            ExtensionResponse extensionResponse = new ExtensionResponse();
            // eventType パラメーターの値が upload-data-to-table であるかどうかを確認します。
            if ("upload-data-to-table".equals(extensionRequest.getEventType())) {
                try {
                    // メッセージ本文を文字列に変換し、文字列を JSON オブジェクトに解析します。
                    String messageBodyStr = JSON.toJSONString(extensionRequest.getMessageBody());
                    JSONObject messageBody = JSON.parseObject(messageBodyStr);
                    String tableGuid = messageBody.getString("tableGuid");
                    // tableGuid パラメーターの値に ads が含まれているかどうかを確認します。
                    if (tableGuid != null && tableGuid.contains("ads")) {
                        extensionResponse.setCheckResult("FAIL");
                    } else {
                        extensionResponse.setCheckResult("OK");
                    }
                } catch (Exception e) {
                    extensionResponse.setCheckResult("FAIL");
                    extensionResponse.setErrorMessage("リクエストの処理中にエラーが発生しました: " + e.getMessage());
                    return extensionResponse;
                }
            } else {
                extensionResponse.setCheckResult("FAIL");
            }
    
            // エラーメッセージをフィードバック情報として指定します。
            extensionResponse.setErrorMessage("これはテストです!");
    
            // レスポンスオブジェクトを返します。
            return extensionResponse;
        }
    }
    
    

    ExtensionRequest

    DataWorks によって送信されるイベントメッセージをカプセル化するために、拡張機能のリクエスト構造を定義します。

    public class ExtensionRequest {
        private Object messageBody;
        private String messageId;
        private String extensionBizId;
        private String extensionBizName;
        private String eventType;
        private String eventCategoryType;
        private Boolean blockBusiness;
    
        public ExtensionRequest() {
        }
    
        public Object getMessageBody() {
            return this.messageBody;
        }
    
        public void setMessageBody(Object messageBody) {
            this.messageBody = messageBody;
        }
    
        public String getMessageId() {
            return this.messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getExtensionBizId() {
            return this.extensionBizId;
        }
    
        public void setExtensionBizId(String extensionBizId) {
            this.extensionBizId = extensionBizId;
        }
    
        public String getExtensionBizName() {
            return this.extensionBizName;
        }
    
        public void setExtensionBizName(String extensionBizName) {
            this.extensionBizName = extensionBizName;
        }
    
        public String getEventType() {
            return this.eventType;
        }
    
        public void setEventType(String eventType) {
            this.eventType = eventType;
        }
    
        public String getEventCategoryType() {
            return this.eventCategoryType;
        }
    
        public void setEventCategoryType(String eventCategoryType) {
            this.eventCategoryType = eventCategoryType;
        }
    
        public Boolean getBlockBusiness() {
            return this.blockBusiness;
        }
    
        public void setBlockBusiness(Boolean blockBusiness) {
            this.blockBusiness = blockBusiness;
        }
    }

    ExtensionResponse

    拡張機能の処理結果をカプセル化するために、拡張機能のレスポンス構造を定義します。

    public class ExtensionResponse {
        private String checkResult;
        private String errorMessage;
    
        public ExtensionResponse() {
        }
    
        public String getCheckResult() {
            return this.checkResult;
        }
    
        public void setCheckResult(String checkResult) {
            this.checkResult = checkResult;
        }
    
        public String getErrorMessage() {
            return this.errorMessage;
        }
    
        public void setErrorMessage(String errorMessage) {
            this.errorMessage = errorMessage;
        }
    }
  2. 拡張コードを実行可能な .jar パッケージまたは .zip ファイルにパッケージ化して、後で Function Compute で使用します。

    コードエディターの CLI を開き、ルートディレクトリに切り替えてから、mvn clean package コマンドを実行して、拡張コードをパッケージ化します。

    • コンパイルに失敗したというメッセージが表示された場合は、エラーメッセージに基づいてコードを修正してください。

    • コンパイルが成功した場合、コンパイルされた JAR ファイルは、プロジェクトフォルダーの target ディレクトリにあり、pom.xmlartifactId フィールドと version フィールドに基づいて java-example-1.0-SNAPSHOT.jar という名前が付けられます。

    説明

    macOS および Linux OS の場合、関連するコードファイルをパッケージ化する前に、そのファイルに対する 読み取り および 実行 権限があることを確認してください。

Function Compute

ステップ 1: 拡張機能をデプロイする

  1. Function Compute コンソール 2.0 にログオンします。左側のナビゲーションペインで、[タスク] をクリックします。

  2. [関数を作成] をクリックして、Function Compute サービスと必要な関数を作成し、拡張機能をデプロイします。開発した拡張機能は、特定のイベントメッセージを作成されたサービスに直接送信します。詳細については、「サービスを作成する」および「関数を作成する」をご参照ください。次の図は、関数を構成する方法を示しています。image

    • コードに基づいてランタイム環境を選択します。ステップ 1 で提供されているサンプルパッケージ fc_dataworks_demo01-1.0-SNAPSHOT.jar を使用する場合は、[ランタイム] パラメーターを Java 8 に設定します。fc_dataworks_demo01-1.0-SNAPSHOT.jar パッケージをオンプレミスのマシンにダウンロードし、[コードパッケージ] としてアップロードする必要があります。ビジネス要件に基づいてパラメーターを構成できます。

    • 関数で実行できる操作の詳細については、「関数を管理する」をご参照ください。

  3. 関数のハンドラーを構成します。

    Function Compute コンソールで関数のハンドラーを構成できます。Java 言語を使用する関数の場合、[パッケージ名].[クラス名]::[メソッド名] 形式でハンドラーを構成できます。例:

    • パッケージ名: example

    • クラス名: HelloFC

    • メソッド名: handleRequest

    ハンドラーは example.HelloFC::handleRequest です。

    説明

    デフォルトのハンドラーは example.App::handleRequest です。ビジネス要件に基づいてハンドラーを変更できます。詳細については、ハンドラー」をご参照ください。

ステップ 2: 関数をテストする

関数を作成した後、次の操作を実行して関数をテストできます。作成された関数を見つけ、関数の名前をクリックします。表示されるページで、[コード] タブをクリックします。次に、[関数のテスト] をクリックします。関数がテストに合格したら、DataWorks コンソールで拡張機能を登録できます。

DataWorks

ステップ 1:拡張機能を登録する

Function Compute に拡張機能をデプロイした後、DataWorks に拡張機能を登録する必要があります。 DataWorks に拡張機能を登録するには、次の手順を実行します。

  1. Open Platform ページに移動します。

    DataWorks コンソール にログオンします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションペインで、[その他] > [open Platform] を選択します。 [開発者バックエンド] タブが表示されます。

  2. 拡張機能を登録します。

    1. 表示されるページの左側のナビゲーションペインで、[拡張機能] をクリックします。

    2. [拡張機能の一覧] セクションの [拡張機能の登録] をクリックします。 拡張機能の登録ウィザードのデプロイ方法の選択ステップで、拡張機能のデプロイ方法の選択パラメーターを [function Compute に基づいてデプロイ (推奨)] に設定し、拡張機能の登録ステップでパラメーターを設定します。

    次の表に、拡張機能を登録するためのパラメーターを示します。 ビジネス要件に基づいてパラメーターを設定できます。

    パラメーター

    パラメーター

    説明

    拡張機能名

    拡張機能を識別するために使用されるカスタム拡張機能の名前。

    Function Compute の選択と関数の選択

    拡張機能をデプロイする Function Compute サービスと関数。 開発した拡張機能は、特定のイベントメッセージをサービスに直接送信します。

    処理された拡張ポイント

    データダウンロードの事前イベント、アセットの公開と非公開の事前イベント、およびデータアップロードの事前イベント によってトリガーされるメッセージのみを処理できます。

    説明

    このパラメーターを設定すると、システムは [イベント] パラメーターと [適用可能なサービス] パラメーターを自動的に設定します。

    所有者

    拡張機能の所有者。 拡張機能のユーザーは、問題が発生した場合に、できるだけ早く拡張機能の所有者に連絡できます。

    テスト用ワークスペース

    拡張機能をテストするために使用されるワークスペース。 拡張機能が有効かどうかを確認するために、拡張機能を公開する必要はありません。テストワークスペースは、拡張機能が期待どおりに動作するかどうかを確認できるエンドツーエンドのテストをサポートしているためです。

    テストワークスペースでは、開発者はイベントをトリガーして、DataWorks が EventBridge を使用して拡張機能に関連メッセージを送信するかどうか、および拡張機能がイベントをチェックしてチェック結果を DataWorks に送信するかどうかを確認できます。

    説明

    処理する拡張ポイントイベント がテナントレベルの拡張ポイントイベントである場合、[テスト用ワークスペース] パラメーターを設定する必要はありません。

    拡張機能の詳細アドレス

    拡張機能の詳細ページの URL。

    拡張機能を開発してデプロイした後、拡張機能の動作を表示する Web ページを開発できます。 ユーザーがこの Web ページにアクセスして拡張機能をよりよく理解し、使用できるように、このパラメーターをページ URL に設定します。 たとえば、ユーザーは Web ページにアクセスして、プロセスが拡張機能によってチェックおよびブロックされた理由を確認できます。

    拡張機能のドキュメントアドレス

    拡張機能のドキュメントページの URL。

    拡張機能を開発してデプロイした後、ヘルプドキュメントページを開発できます。 ユーザーが拡張機能のビジネスロジックとプロパティを理解できるように、このパラメーターをページ URL に設定します。

    拡張機能のパラメーター

    拡張機能の開発とアプリケーションの効率を向上させるために使用する拡張機能のパラメーター。

    一般的なシナリオの組み込みパラメーターと、参照用のカスタムパラメーターの両方を入力できます。

    key=value の形式で複数のパラメーターを入力できます。 各パラメーターが別の行を占めるようにしてください。 これらのパラメーターの使用方法の詳細については、「高度な機能:拡張機能パラメーターの設定」をご参照ください。

    拡張機能のオプション

    拡張機能の設定項目。 設定項目は、ユーザーがビジネス要件に基づいて異なるワークスペースでカスタムプロセス管理を実装するために提供されます。 拡張機能の開発者は、[拡張機能の登録] ダイアログボックスで各設定項目を JSON 文字列で定義する必要があります。

    たとえば、拡張機能の開発者は、このパラメーターに基づいて、ユーザーが SQL ステートメントの長さを管理できるようにすることができます。 JSON 形式の詳細については、「高度な機能:拡張機能のオプションの定義」をご参照ください。

  3. [OK] をクリックします。

    拡張機能を登録した後、[拡張機能] ページの [拡張機能の一覧] セクションで拡張機能を表示できます。

ステップ 2:拡張機能を公開する

DataWorks で拡張機能を開発、デプロイ、および登録した後、拡張機能をテストし、レビューのために拡張機能を送信してから、拡張機能を公開する必要があります。 その後、管理者は、拡張機能の所有者以外に、管理センターで拡張機能を有効にできます。 詳細については、「拡張機能の使用」をご参照ください。

付録: Function Compute に送信されるイベントメッセージの形式

次のサンプルコードは、Function Compute にプッシュされるイベントメッセージの種類の一般的な形式を示しています。 messageBody パラメーターは、DataWorks イベントメッセージの詳細を指定します。イベントメッセージの内容は、イベントの種類によって異なります。

{
	"blockBusiness": true,
	"eventCategoryType": "resources-download",// イベントカテゴリ。
	"eventType": "upload-data-to-table",// イベントタイプ。
	"extensionBizId": "job_6603643923728775070",
	"messageBody": {
             // イベントメッセージの内容はイベントタイプによって異なります。次のフィールドは、イベントメッセージ内の固定フィールドです。
             "tenantId": 28378****10656,// テナントの ID。DataWorks の各 Alibaba Cloud アカウントはテナントに対応しています。各テナントには独自のテナント ID があります。テナント ID を表示するには、DataStudio ページの右上隅にあるユーザー名をクリックし、[メニュー] セクションの [ユーザー情報] をクリックします。
             "eventCode": "xxxx"//
	},
	"messageId": "52d44ee7-b51f-4d4d-afeb-*******"// イベント ID。この ID は、イベントの一意の識別子です。
}
説明

messageBody フィールドの内容は、イベントの種類によって異なります。イベントメッセージの詳細については、「開発リファレンス: イベントリストとイベントメッセージの形式」をご参照ください。

参考資料