このトピックでは、API操作を使用してData Integrationでデータ同期ノードを作成、構成、管理する方法について説明します。
前提条件
Mavenプロジェクトが作成されていること。詳細については、Mavenプロジェクトの作成をご参照ください。
ワークフローが作成されていること。詳細については、自動トリガーワークフローの作成をご参照ください。
データソースが作成され、データ同期のためにDataWorksに追加されていること。詳細については、データソースの追加をご参照ください。
制限事項
DataWorksでは、API操作を使用してバッチ同期ノードのみを作成、構成、管理できます。
CreateDISyncTask API操作を呼び出すことによって作成されたデータ同期ノードは、コードエディターのみを使用して構成できます。詳細については、コードエディターを使用したバッチ同期タスクの構成をご参照ください。
DataWorksでは、API操作を使用してワークフローを作成することはできません。API操作を使用して同期ノードを作成できるのは、既存のワークフロー内のみです。
準備
Mavenの依存関係を構成します。
Mavenプロジェクトの pom.xml ファイルを開き、
aliyun-java-sdk-core
をファイルに追加します。<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.20</version> </dependency>
Mavenプロジェクトの pom.xml ファイルを開き、
aliyun-java-sdk-dataworks-public
をファイルに追加します。<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>3.3.18</version> </dependency>
Mavenプロジェクトの pom.xml ファイルを開き、
credentials-java
をファイルに追加します。 最新バージョンの credentials-java を使用することをお勧めします。<dependency> <groupId>com.aliyun</groupId> <artifactId>credentials-java</artifactId> <version>0.2.11</version> </dependency>
説明Alibaba Cloud Credentialsツールを使用してAccessKeyペアを管理できます。 Alibaba Cloud Credentialsツールの使用方法については、認証情報の構成をご参照ください。
アカウントを認証します。
API操作を使用してデータ同期ノードを作成する前に、次のコードを実行して、DataWorksへのアクセスに使用するAlibaba Cloudアカウントを認証する必要があります。アカウントが認証に合格した場合、後続の操作を実行できます。アカウントが認証に失敗した場合、エラーが返され、エラーに基づいて問題を解決する必要があります。
// 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659 IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
概要
上記の準備が完了したら、API操作を呼び出して次の手順を実行できます。
手順
Data Integrationでデータ同期ノードを作成します。
CreateDISyncTask 操作を呼び出して、Data Integrationでデータ同期ノードを作成できます。次のコードは、いくつかのパラメーターの設定例を示しています。詳細については、CreateDISyncTaskをご参照ください。
public void createFile() throws ClientException{ CreateDISyncTaskRequest request = new CreateDISyncTaskRequest(); request.setProjectId(181565L); request.setTaskType("DI_OFFLINE"); request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"テーブル same のコメント\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"テーブル same のコメント\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}"); // 同期タスクの内容を設定します。 request.setTaskParam("{\"FileFolderPath\":\"ビジネスフロー/new_biz/データ統合\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}"); // Data Integration 専用のリソースグループを使用します。 request.setTaskName("new_di_task_0607_1416"); String regionId = "cn-hangzhou"; // 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659 IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com"); IAcsClient client; client = new DefaultAcsClient(profile); CreateDISyncTaskResponse response1 = client.getAcsResponse(request); Gson gson1 = new Gson(); System.out.println(gson1.toJson(response1)); }
UpdateFile 操作を呼び出して、ノードのスケジューリング依存関係を構成します。
次の表に、操作のリクエストパラメーターを示します。
パラメーター
タイプ
必須
例
説明
Action
String
はい
UpdateFile
実行する操作。
FileFolderPath
String
いいえ
ビジネスフロー/ 1 /データ統合/フォルダー 1 /フォルダー 2
ファイルのパス。
ProjectId
Long
いいえ
10000
DataWorksワークスペースの ID。DataWorksコンソールにログインし、ワークスペース管理ページに移動してワークスペース ID を取得できます。
FileName
String
いいえ
ods_user_info_d
ファイルの名前。 FileName パラメーターを新しい値に設定して、ファイル名を変更できます。
ListFiles 操作を呼び出して、名前を変更するファイルの ID をクエリできます。次に、UpdateFile 操作を呼び出すときに、FileId パラメーターを ID に設定し、FileName パラメーターを新しい値に設定できます。
FileDescription
String
いいえ
ファイルの説明
ファイルの説明。
Content
String
いいえ
SELECT "1";
ファイルのコード。コード形式はファイルタイプによって異なります。特定のファイルタイプのコード形式を表示するには、オペレーションセンターに移動し、ファイルタイプのノードを右クリックして、コードの表示を選択します。
AutoRerunTimes
Integer
はい
3
エラー発生後に許可される自動再実行の回数。
AutoRerunIntervalMillis
Integer
いいえ
120000
エラー発生後、2 回連続する自動再実行の間隔。単位:ミリ秒。最大値:1800000(30 分)。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションで、エラー時に自動再実行チェックボックスが選択された後に表示される再実行間隔パラメーターに対応します。
DataWorksコンソールで指定する間隔は分単位です。操作を呼び出すときは、時間の単位の変換に注意してください。
RerunMode
String
いいえ
ALL_ALLOWED
ファイルに対応するノードを再実行できるかどうかを指定します。有効な値:
ALL_ALLOWED:ノードが正常に実行されたか、実行に失敗したかに関係なく、ノードを再実行できます。
FAILURE_ALLOWED:ノードは、実行に失敗した場合にのみ再実行できます。
ALL_DENIED:ノードが正常に実行されたか、実行に失敗したかに関係なく、ノードを再実行できません。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションにある再実行パラメーターに対応します。
Stop
Boolean
いいえ
false
ノードのスケジューリングを一時停止するかどうかを指定します。有効な値:
true:ノードのスケジューリングを一時停止します。
false:ノードのスケジューリングを一時停止しません。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションにある繰り返しパラメーターに対応します。
ParaValue
String
いいえ
x=a y=b z=c
ノードのスケジューリングパラメーター。
このパラメーターは、DataWorksコンソールのプロパティタブのパラメーターセクションに対応します。詳細については、スケジューリングパラメーターの構成をご参照ください。
StartEffectDate
Long
いいえ
936923400000
自動スケジューリングの開始時刻。このパラメーターは、1970 年 1 月 1 日 00:00:00 UTC から経過したミリ秒数を表す UNIX タイムスタンプに設定します。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションで、有効期間パラメーターに指定された開始時刻に対応します。
EndEffectDate
Long
いいえ
4155787800000
自動スケジューリングの終了時刻。このパラメーターは、1970 年 1 月 1 日 00:00:00 UTC から経過したミリ秒数を表す UNIX タイムスタンプに設定します。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションで、有効期間パラメーターに指定された終了時刻に対応します。
CronExpress
String
いいえ
00 00-59/5 1-23 * * ?
ノードの定期スケジューリングポリシーを表す CRON 式。このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションにある Cron 式パラメーターに対応します。 DataWorksコンソールでスケジューリングサイクルと実行時刻パラメーターを構成すると、DataWorks は Cron 式パラメーターの値を自動的に生成します。
例:
毎日 05:30 に実行するようにスケジュールされているノードの CRON 式:
00 30 05 * * ?
毎時 15 分に実行するようにスケジュールされているノードの CRON 式:
00 15 * * * ?
10 分ごとに実行するようにスケジュールされているノードの CRON 式:
00 00/10 * * * ?
毎日 08:00 から 17:00 まで 10 分ごとに実行するようにスケジュールされているノードの CRON 式:
00 00-59/10 8-23 * * * ?
毎月 1 日の 00:20 に実行するようにスケジュールされているノードの CRON 式:
00 20 00 1 * ?
1 月 1 日の 00:10 から 3 か月ごとに実行するようにスケジュールされているノードの CRON 式:
00 10 00 1 1-12/3 ?
毎週火曜日と金曜日の 00:05 に実行するようにスケジュールされているノードの CRON 式:
00 05 00 * * 2,5
DataWorks のスケジューリングシステムは、CRON 式に次の制限を課します。
ノードは、最短 5 分間隔で実行するようにスケジュールできます。
ノードを毎日実行するようにスケジュールできる最も早い時刻は 05:00 です。
CycleType
String
いいえ
NOT_DAY
ノードのスケジューリングサイクルのタイプ。有効な値:NOT_DAY と DAY。値 NOT_DAY は、ノードが分または時間単位で実行するようにスケジュールされていることを示します。値 DAY は、ノードが日、週、または月単位で実行するようにスケジュールされていることを示します。
このパラメーターは、DataWorksコンソールのプロパティタブのスケジュールセクションにあるスケジューリングサイクルパラメーターに対応します。
DependentType
String
いいえ
USER_DEFINE
ノードのクロスサイクルスケジューリング依存関係のタイプ。有効な値:
SELF:現在のサイクルでノードに対して生成されたインスタンスは、前のサイクルでノードに対して生成されたインスタンスに依存します。
CHILD:現在のサイクルでノードに対して生成されたインスタンスは、前のサイクルでノードの最も近いレベルの子孫ノードに対して生成されたインスタンスに依存します。
USER_DEFINE:現在のサイクルでノードに対して生成されたインスタンスは、前のサイクルで 1 つ以上の指定されたノードに対して生成されたインスタンスに依存します。
NONE:ノードにクロスサイクルスケジューリング依存関係タイプは指定されていません。
DependentNodeIdList
String
いいえ
5,10,15,20
DependentType パラメーターが USER_DEFINE に設定されている場合、ファイルに対応するノードが依存するノードの ID。複数の ID を指定する場合は、コンマ(,)で区切ります。
このパラメーターの値は、DataWorksコンソールのプロパティタブの依存関係セクションで、前のサイクルを選択し、依存関係を他のノードに設定した後に指定したノードの ID に対応します。
InputList
String
いいえ
project_root,project.file1,project.001_out
現在のファイルが依存する親ファイルの出力名。複数の出力名を指定する場合は、コンマ(,)で区切ります。
このパラメーターは、DataWorksコンソールのプロパティタブの依存関係セクションにある親ノードの下の出力名パラメーターに対応します。
ProjectIdentifier
String
いいえ
dw_project
DataWorks ワークスペースの名前。 DataWorks コンソールにログインし、[ワークスペース管理] ページに移動してワークスペース名を取得できます。
操作を適用する DataWorks ワークスペースを決定するには、このパラメーターまたは ProjectId パラメーターを構成する必要があります。
FileId
Long
はい
100000001
ファイルの ID。 ListFiles 操作を呼び出して ID を取得できます。
OutputList
String
いいえ
dw_project.ods_user_info_d
ファイルの出力名。
このパラメーターは、DataWorksコンソールのプロパティタブの依存関係セクションにある出力の下の出力名パラメーターに対応します。
ResourceGroupIdentifier
String
いいえ
default_group
ノードの実行に使用するリソースグループの識別子。 ListResourceGroups 操作を呼び出して、ワークスペースで使用可能なリソースグループをクエリできます。
ConnectionName
String
いいえ
odps_first
ノードの実行に使用するデータソースの名前。 ListDataSources 操作を呼び出して、ワークスペースで使用可能なデータソースをクエリできます。
Owner
String
いいえ
18023848927592
ファイル所有者の ID。
AutoParsing
Boolean
いいえ
true
ファイルの自動解析機能を有効にするかどうかを指定します。有効な値:
true:ファイルの自動解析機能を有効にします。
false:ファイルの自動解析機能を無効にします。
このパラメーターは、DataWorksコンソールのプロパティタブの依存関係セクションで、同じサイクルが選択された後に表示されるコードの分析パラメーターに対応します。
SchedulerType
String
いいえ
NORMAL
ノードのスケジューリングタイプ。有効な値:
NORMAL:ノードは自動トリガーノードです。
MANUAL:ノードは手動トリガーノードです。手動トリガーノードは自動的にスケジュールできません。手動トリガーワークフローペインに移動して、手動トリガーノードを表示できます。
PAUSE:ノードは一時停止ノードです。
SKIP:ノードはドライランノードです。ドライランノードはスケジュールどおりに開始されますが、システムはノードの実行を開始するときにノードのステータスを成功に設定します。
AdvancedSettings
String
いいえ
{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
ノードの高度な構成。
このパラメーターは、EMR Spark Streaming ノードまたは EMR Streaming SQL ノードに対してのみ有効です。このパラメーターは、DataWorksコンソールのノードの詳細設定タブに対応します。
このパラメーターの値は JSON 形式である必要があります。
StartImmediately
Boolean
いいえ
true
ノードが本番環境にデプロイされた直後にノードを実行するかどうかを指定します。有効な値:
true:ノードが本番環境にデプロイされた直後にノードを実行します。
false:ノードが本番環境にデプロイされた直後にノードを実行しません。
このパラメーターは、EMR Spark Streaming ノードまたは EMR Streaming SQL ノードに対してのみ有効です。このパラメーターは、DataWorksコンソールの構成タブのスケジュールセクションにある開始方法パラメーターに対応します。
InputParameters
String
いいえ
[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
ノードの入力パラメーター。このパラメーターの値は JSON 形式である必要があります。入力パラメーターの詳細については、GetFile 操作のレスポンスパラメーターセクションにある InputContextParameterList パラメーターを参照してください。
このパラメーターは、DataWorksコンソールのプロパティタブの入力および出力パラメーターセクションにある入力パラメーターテーブルに対応します。
OutputParameters
String
いいえ
[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
ノードの出力パラメーター。このパラメーターの値は JSON 形式である必要があります。出力パラメーターの詳細については、GetFile 操作のレスポンスパラメーターセクションにある OutputContextParameterList パラメーターを参照してください。
このパラメーターは、DataWorksコンソールのプロパティタブの入力および出力パラメーターセクションにある出力パラメーターテーブルに対応します。
ノードをコミットします。
SubmitFile 操作を呼び出して、ノードをスケジューリングシステムの開発環境にコミットします。ノードをコミットすると、システムは関連するデプロイタスクの ID を含むレスポンスを返します。 GetDeployment 操作を呼び出して、ID に基づいてデプロイタスクの詳細を取得できます。
public void submitFile() throws ClientException{ SubmitFileRequest request = new SubmitFileRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); // ID は、ノードの作成時に返される FileId パラメーターの値です。 request.setFileId(501576542L); request.setComment("コメント"); SubmitFileResponse acsResponse = client.getAcsResponse(request); // ID は、ノードのコミットまたはデプロイ後に返されるデプロイタスク ID です。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.toString()); }
上記のコードは、いくつかのパラメーターの設定例を示しています。詳細については、SubmitFile および GetDeployment をご参照ください。
本番環境にノードをデプロイします。
DeployFile 操作を呼び出して、ノードを本番環境にデプロイします。
説明ワークスペースが標準モードの場合は、この手順を実行する必要があります。
public void deploy() throws ClientException{ DeployFileRequest request = new DeployFileRequest(); request.setProjectIdentifier("zxy_8221431"); request.setFileId(501576542L); request.setComment("コメント"); // NodeId または FileId パラメーターを指定する必要があります。NodeId の値は、ノードの構成タブのプロパティタブの一般セクションにあるノード ID パラメーターの値です。 request.setNodeId(700004537241L); DeployFileResponse acsResponse = client.getAcsResponse(request); // ID は、ノードのコミットまたはデプロイ後に返されるデプロイタスク ID です。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.getData().toString()); }
上記のコードは、いくつかのパラメーターの設定例を示しています。詳細については、DeployFile をご参照ください。
ノードのステータスをクエリします。
ノードをデプロイすると、システムは関連するデプロイタスクの ID を含むレスポンスを返します。 GetDeployment 操作を呼び出して、ID に基づいてデプロイタスクの詳細を取得できます。 GetDeployment 操作のレスポンスの Status パラメーターの値が 1 の場合、ノードはデプロイされています。
public void getDeployment() throws ClientException{ GetDeploymentRequest request = new GetDeploymentRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); // ID は、ノードのコミットまたはデプロイ後に返されるデプロイタスク ID です。GetDeployment 操作を呼び出して、関連するデプロイタスクの詳細を取得します。 request.setDeploymentId(2776067L); GetDeploymentResponse acsResponse = client.getAcsResponse(request); log.info(acsResponse.getData().toString()); }
上記のコードは、いくつかのパラメーターの設定例を示しています。詳細については、GetDeployment をご参照ください。
ノードの構成を変更する
Data Integration でデータ同期ノードを作成および構成した後、UpdateDISyncTask 操作を呼び出してノードの構成を変更できます。 TaskParam パラメーターを使用して、ノードが使用する排他的リソースグループを変更することもできます。データ同期ノードの構成を変更した後、ノードを再度コミットしてデプロイする必要があります。詳細については、概要をご参照ください。
サンプルコード
POM 依存関係
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.20</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>3.4.1</version>
</dependency>
Java 用 SDK
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
public class createofflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" + // リーダーの名前を設定します。
" \"category\": \"reader\"\n" + // リーダーのカテゴリを設定します。
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_first\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" + // ライターの名前を設定します。
" \"category\": \"writer\"\n" + // ライターのカテゴリを設定します。
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
request.setTaskParam("{\"FileFolderPath\":\"ビジネスフロー/test/データ統合\",\"ResourceGroup\":\"S_res_group_XXX\"}"); // Data Integration 専用のリソースグループを使用します。
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
request.setResourceGroupIdentifier("S_res_group_XXX"); // スケジューリング専用のリソースグループを使用します。
request.setInputList("dataworks_di_autotest_root");
request.setAutoParsing(true);
request.setDependentNodeIdList("5,10,15,20");
request.setDependentType("SELF");
request.setStartEffectDate(0L);
request.setEndEffectDate(4155787800000L);
request.setFileDescription("説明");
request.setStop(false);
request.setParaValue("x=a y=b z=c");
request.setSchedulerType("NORMAL");
request.setAutoRerunIntervalMillis(120000);
UpdateFileResponse response1 = client.getAcsResponse(request);
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String akId = "XX";
// 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
Long fileId = createTask(taskName); // Data Integration でデータ同期ノードを作成します。
updateFile(fileId); // ノードのスケジューリングプロパティを構成します。
}
}