すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:SDK for Java

最終更新日:Jul 22, 2024

EAS SDKを使用すると、シンプルで安定した方法でモデルサービスを呼び出すことができます。 このトピックでは、EAS SDK for Javaのメソッドについて説明し、文字列入力と出力、テンソル入力と出力、キューサービス、リクエストデータ圧縮などの一般的なユースケースのサンプルコードを提供します。

依存関係の追加

EAS SDK for Javaをプロジェクトに統合するには、pom.xmlファイルにeas-sdk依存関係を追加します。 SDKの最新バージョンについては、Mavenリポジトリをご参照ください。 サンプルコード:

<dependency>
  <groupId>com.aliyun.openservices.eas</groupId>
  <artifactId>eas-sdk</artifactId>
  <version>2.0.14</version>
</dependency> 

バージョン2.0.5以降、SDKは非同期リクエストの優先度を管理するキューサービスをサポートしています。 互換性の問題なしにキューサービスを使用するには、次の依存関係の必要なバージョンを追加します。

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.1</version>
</dependency> 

変更方法

クラス

移動方法

説明

PredictClient

PredictClient(HttpConfig httpConfig)

  • 説明: PredictClientインスタンスを作成します。

  • パラメーター: httpConfigは、PredictClientインスタンスの構築に使用するHttpConfigインスタンスを指定します。

void setToken(String token)

  • 説明: HTTPリクエストで使用されるトークンを設定します。

  • パラメーター: tokenは、サービス認証に使用されるトークンを指定します。

void setModelName(String modelName)

  • 説明: オンライン予測に使用されるモデルの名前を設定します。

  • パラメーター: modelNameは、モデルの名前を指定します。

void setEndpoint (文字列エンドポイント)

  • 説明: サービスのエンドポイントを "host:port" 形式で設定します。

  • パラメーター: endpointは、リクエストを受信するエンドポイントを指定します。

void setDirectEndpoint (文字列エンドポイント)

  • 説明: 仮想プライベートクラウド (VPC) 直接接続チャネルを介してサービスにアクセスするために使用されるエンドポイントを設定します。 エンドポイントの例: pai-eas-vpc.cn-shanghai.aliyuncs.com

  • パラメーター: endpointは、VPCダイレクト接続のサービスのエンドポイントを指定します。

void setRetryCount(boolean int retryCount)

  • 説明: リクエストが失敗した場合の再試行の最大数を設定します。

  • パラメーター: retryCountは、リクエストが失敗した場合の最大再試行回数を指定します。

void setRetryConditions(EnumSet retryConditions)

  • 説明: リクエストが失敗した場合のリトライ条件を設定します。 このメソッドは、setRetryCountメソッドと一緒に使用できます。 デフォルトでは、失敗した場合、すべてのリクエストが再試行されます。 この方法を使用して、特定のリクエストエラーに対してのみ再試行条件を設定できます。

  • パラメーター: retryConditionsは、EnumSetタイプの1つ以上の再試行条件を指定します。 次の再試行条件がサポートされています。

    • RetryCondition.CONNECTION_FAILED: 接続要求が失敗しました。

    • RetryCondition.CONNECTION_TIMEOUT: 接続要求がタイムアウトしました。

    • RetryCondition.READ_TIMEOUT: リクエストのレスポンスがタイムアウトしました。

    • RetryCondition.RESPONSE_5XX: ステータスコード5xxが返されます。

    • RetryCondition.RESPONSE_4XX: ステータスコード4xxが返されます。

  • サンプルコード:

    client.setRetryConditions(
        EnumSet.of(
            RetryCondition.READ_TIMEOUT,    // Retry if a read timeout occurs.
            RetryCondition.RESPONSE_5XX // Retry if the status code 5xx is returned.
        )
    );

    上記のサンプルコードは、リクエストがタイムアウトするか、ステータスコード5xxが返された場合にのみ、リクエストが再試行されることを示しています。

void setContentType (文字列contentType)

  • 説明: HTTPクライアントのデータストリームタイプを設定します。 デフォルトでは、データストリームタイプは "application/octet-stream" に設定されています。

  • パラメーター: contentTypeは、送信するデータストリームのタイプを指定します。

void setUrl (文字列url)

説明: リクエストのURLをカスタマイズします。

void setCompressor (コンプレッサー)

  • 説明: リクエストデータの圧縮方法を設定します。

  • パラメーター: compressorは圧縮方法を指定します。 Compressor.GzipおよびCompressor.Zlibの圧縮方法がサポートされています。

  • 例: 詳細については、「リクエストデータの圧縮」をご参照ください。

void addExtraHeaders(Map<String, String> extraHeaders)

  • 説明: カスタムHTTPヘッダーを追加します。

  • パラメータ: Map<String, String> は、Map<String, String> 型のHTTPヘッダーを指定します。

void createChildClient(String token, String endpoint, String modelName)

  • 説明: 親クライアントのスレッドプールを使用する子クライアントを作成します。 マルチスレッド予測を実行するには、このメソッドを呼び出します。

  • パラメーター:

    • token: サービス認証に使用されるトークン。

    • endpoint: サービスのエンドポイント。

    • modelName: サービスの名前。

void predict(TFRequest runRequest)

  • 説明: TensorFlowリクエストをサービスに送信します。

  • パラメーター: runRequestは、TensorFlowリクエストインスタンスを指定します。

void predict (文字列requestContent)

  • 説明: リクエストの内容を文字列としてフォーマットして、サービスにリクエストを送信します。

  • パラメーター: requestContentは、リクエストコンテンツを含む文字列を指定します。

void predict(byte[] requestContent)

  • 説明: リクエストの内容をバイト配列としてフォーマットして、サービスにリクエストを送信します。

  • パラメーター: requestContentは、リクエストコンテンツを含むバイト配列を指定します。

HttpConfig

void setIoThreadNum(int ioThreadNum)

  • 説明: HTTPリクエストの送信に使用されるI/Oスレッドの数を設定します。 デフォルトでは、2つのI/Oスレッドが使用されます。

  • パラメーター: ioThreadNumは、HTTPリクエストの送信に使用されるI/Oスレッドの数を指定します。

void setReadTimeout(int readTimeout)

  • 説明: リクエストが送信された後、レスポンスを待つタイムアウト時間を設定します。 デフォルト値: 5000。5秒を示します。

  • パラメーター: readTimeoutは、リクエストコンテンツを読み取るためのタイムアウト期間を指定します。

void setConnectTimeout(int connectTimeout)

  • 説明: リクエストの接続タイムアウト期間を設定します。 デフォルト値: 5000。5秒を示します。

  • パラメーター: connectTimeoutは、リクエストの接続タイムアウト期間を指定します。

void setMaxConnectionCount(int maxConnectionCount)

  • 説明: 接続の最大数を設定します。 デフォルト値は 1000 です。

  • パラメーター: maxConnectionCountは、PredictClientインスタンスの接続プールで許可される接続の最大数を指定します。

void setMaxConnectionPerRoute(int maxConnectionPerRoute)

  • 説明: 各ルートのデフォルト接続の最大数を設定します。 デフォルト値は 1000 です。

  • パラメーター: maxConnectionPerRouteは、各ルートのデフォルト接続の最大数を指定します。

void setKeepAlive (ブールkeepAlive)

  • 説明: HTTP接続に対してkeep-aliveメカニズムを有効にするかどうかを指定します。

  • パラメーター: keepAliveは、HTTP接続に対してkeep-aliveメカニズムを有効にするかどうかを指定します。 デフォルト値: true

int getErrorCode()

最後の呼び出しのステータスコードを返します。

文字列getErrorMessage()

最後の呼び出しのエラーメッセージを返します。

TFRequest

void setSignatureName (文字列値)

  • 説明: サービスで使用され、SavedModel形式で保存されるTensorFlowモデルのSignatureDefを設定します。

  • Parameter: valueは、TensorFlowモデルのSignatureDefの名前を指定します。

void addFetch (文字列値)

  • 説明: サービスで使用されるTensorFlowモデルの出力テンソルのエイリアスを設定します。

  • パラメーター: valueは、出力テンソルのエイリアスを指定します。

void addFeed(String inputName, TFDataType dataType, long[]shape, ? [] コンテンツ)

  • 説明: サービスで使用されるTensorFlowモデルの入力テンソルを設定します。

  • パラメーター:

    • inputName: 入力テンソルのエイリアス。

    • dataType: 入力テンソルのデータ型。

    • shape: 入力テンソルの形状。

    • content: 入力テンソルの内容。 このパラメーターは1次元配列形式で指定します。

      dataTypeパラメーターをDT_FLOAT、DT_COMPLEX64、DT_BFLOAT16、またはDT_HALFに設定した場合、contentパラメーターはFLOAT型要素の1次元配列である必要があります。 dataTypeパラメーターをDT_COMPLEX64に設定した場合、配列内の隣接する2つの要素はすべて、複素数の実数部と虚数部を表します。

      dataTypeパラメーターをDT_DOUBLEまたはDT_COMPLEX128に設定した場合、contentパラメーターはDOUBLE型要素の1次元配列である必要があります。 dataTypeパラメーターをDT_COMPLEX128に設定した場合、配列内の隣接する2つの要素はすべて、複素数の実数部と虚数部を表します。

      dataTypeパラメーターをDT_INT32、DT_UINT8、DT_INT16、DT_INT8、DT_QINT8、DT_QUINT32、DT_QINT16、DT_QUINT16、またはDT_UINT16に設定した場合、contentパラメーターはINT型要素の1次元配列である必要があります。

      dataTypeパラメーターをDT_INT64に設定した場合、contentパラメーターはLONG型要素の1次元配列である必要があります。

      dataTypeパラメーターをDT_STRINGに設定した場合、contentパラメーターはSTRING型要素の1次元配列である必要があります。

      dataTypeパラメーターをDT_BOOLに設定した場合、contentパラメーターはBOOLEAN型要素の1次元配列である必要があります。

TFResponse

List <Long> getTensorShape(String outputName)

  • 説明: 出力テンソルのエイリアスを使用して、出力テンソルの形状を照会します。

  • パラメーター: outputNameは、形状を照会する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの形状を表す1次元配列。

List <Float> getFloatVals(String outputName)

  • 説明: データ型がDT_FLOAT、DT_COMPLEX64、DT_BFLOAT16、またはDT_HALFである出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

List <Double> getDoubleVals(String outputName)

  • 説明: データ型がDT_DOUBLEまたはDT_COMPLEX128である出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

List <Integer> getIntVals(String outputName)

  • 説明: データ型がDT_INT32、DT_UINT8、DT_INT16、DT_INT8、DT_QINT8、DT_QUINT8、DT_QINT32、DT_QINT16、DT_QUINT16、またはDT_UINT16である出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

List <String> getStringVals(String outputName)

  • 説明: データ型がDT_STRINGである出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

List <Long> getInt64Vals(String outputName)

  • 説明: データ型がDT_INT64である出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

List <Boolean> getBoolVals(String outputName)

  • 説明: データ型がDT_BOOLである出力テンソルの内容を抽出します。

  • パラメーター: outputNameは、コンテンツを抽出する出力テンソルのエイリアスを指定します。

  • 戻り値: 出力テンソルの内容を指定する1次元配列。

QueueClient

QueueClient(String endpoint, String queueName, String token, HttpConfig httpConfig, QueueUser user)

  • 説明: QueueClientインスタンスを作成します。

  • パラメーター:

    • endpoint: サービスのエンドポイント。

    • queueName: サービスの名前。

    • token: サービスのトークン。

    • httpConfig: サービスリクエストの設定。

    • user: UserIdパラメーターとGroupNameパラメーターが含まれます。 デフォルトでは、UserIdの値は乱数で、GroupNameの値はeasです。

JSONObject attributes()

  • 説明: キューサービスの詳細を取得します。

  • 戻り値: キューサービスに関する次の情報を含むJSONObjectインスタンス。

    • meta.maxPayloadBytes: キュー内の各データレコードの最大サイズ。

    • meta.name: キューの名前。

    • stream.approxMaxLength: キューに格納できるデータレコードの最大数。

    • stream.firstEntry: キュー内の最初のデータレコードのインデックス。

    • stream.lastEntry: キュー内の最後のデータレコードのインデックス。

    • stream.length: キューに格納されているデータレコードの数。

Pair<Long, String> put(byte[] data, long priority, Map<String, String> tags)

  • 説明: キューサービスにデータを書き込みます。

  • パラメーター:

    • data: Byte[] 型のデータレコード。

    • priority: データレコードの優先度。 デフォルト値は0で、データレコードの優先度が低いことを指定します。 1の値は、高い優先度を指定する。

    • tags: カスタムパラメーター。

  • 戻り値: 2つの要素の順序付きペア。 最初の要素はLong型で、データレコードのインデックスを示します。 2番目の要素はstring型で、リクエストIDを示します。

DataFrame[] get(long index, long length, long timeout, boolean autoDelete, Map<String, String> tags)

  • 説明: キューサービスからデータレコードを取得します。

  • パラメーター:

    • index: 取得の開始インデックス。 値-1は、最新のデータレコードから検索を開始することを指定します。

    • length: 取得するデータレコードの量。

    • timeout: 取得のタイムアウト期間。 単位は秒です。

    • autoDelete: データレコードが取得された後にキューから自動的に削除するかどうかを指定します。

    • tags: カスタムパラメーター。 たとえば、このパラメーターをリクエストIDに設定できます。

  • 戻り値: DataFrameの配列。

void truncate(Long index)

  • 説明: キュー内のインデックスよりも小さいインデックスを持つデータレコードを削除します。

String delete(Long index)

  • 説明: データレコードのインデックスを指定して、キュー内のデータレコードを削除します。

  • パラメーター: indexは、削除するデータレコードのインデックスを指定します。

  • 戻り値: データレコードが削除された場合はOKが返されます。

JSONObject検索 (ロングインデックス)

  • 説明: データレコードのキュー情報を照会します。

  • パラメーター: indexは、クエリするデータレコードのインデックスを指定します。

  • 戻り値: JSONObject型のデータレコードのキュー情報。 情報には、次のフィールドが含まれます。

    • ConsumerId: データレコードを処理するインスタンスのID。

    • IsPending: データレコードが処理されているかどうかを示します。

      • Trueは、データレコードが処理中であることを示します。

      • Falseは、データレコードがキューにあり、処理を待っていることを示します。

    • WaitCount: データレコードの前のデータレコードの数を示します。 このパラメーターは、IsPendingがFalseに設定されている場合にのみ有効です。 IsPendingがTrueに設定されている場合、このパラメーターの値は0です。

    レスポンス例:

    • {'ConsumerId': 'eas.**** ', 'IsPending': False, 'WaitCount':2} が返された場合、データレコードはキューにあり、処理されるのを待っています。

    • ログにストリームにデータが表示されず{} が返された場合、データレコードはキューに見つかりません。 これは、データレコードが処理され、結果がクライアントに返されたか、インデックスパラメータが誤って構成されているためです。

WebSocketWatcher watch(long index, long window, boolean indexOnly, boolean autoCommit, Map<String, String> tags)

  • 説明: 出力キュー内のデータをサブスクライブします。

  • パラメーター:

    • index: データが取得される開始インデックス。 値が-1の場合、最新のデータのみが取得されます。

    • window: 送信ウィンドウのサイズ。コミットされていないデータの最大長です。 コミットされていないデータの長さがこのパラメーターの値を超えると、キューサービスは送信を停止します。

    • indexOnly: 帯域幅を節約するためにインデックスとタグのパラメーターのみを返すかどうかを指定します。

    • autoCommit: コミット操作の呼び出しを回避するために、データの送信後にデータを自動的にコミットするかどうかを指定します。 autoCommittrue設定した場合、windowパラメーターは無効です。

    • tags: カスタムパラメーター。

  • 戻り値: サブスクライブされたデータを取得するために使用されるWebSocketWatcherインスタンス。 詳細については、このトピックの「キューサービスの使用」を参照してください。

String commit(Long index) またはString commit(Long[] index)

  • 説明: データが消費されたことを確認し、キュー内のデータを削除します。

  • 戻り値: 操作が成功したことを示します。

void end (ブール力)

説明: キューサービスを停止します。

DataFrame

byte[] getData()

  • 説明: データレコードの値を取得します。

  • 戻り値: Byte[] 型の値。

long getIndex()

  • 説明: データレコードのインデックスを取得します。

  • 戻り値: Long型のインデックス。

Map<String, String> getTags()

  • 説明: データレコードのタグを取得します。

  • 戻り値: Map<String,String> 型のタグ。リクエストIDの取得に使用できます。 例: df.getTags().get("requestId")

デモ

文字列入力と出力の使用

PMML (Predictive model Markup Language) モデルなどのモデルをデプロイするためにカスタムプロセッサを使用する場合、リクエストコンテンツは文字列としてフォーマットされることがよくあります。 サンプルコード:

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;

public class TestString {
    public static void main(String[] args) throws Exception {
        // Start and initialize a client. A PredictClient instance is shared by multiple requests. Do not create a PredictClient instance for each request. 
        PredictClient client = new PredictClient(new HttpConfig());
        client.setToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****");
        // To use the VPC direct connection feature, call the setDirectEndpoint method.
        // Example: client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // You must enable the VPC direct connection feature and configure a vSwitch in the PAI console. After you enable the feature, you can call the service without the need to passing a gateway, which improves stability and performance. 
        // Note: To call a service by using a gateway, use the endpoint that starts with your user ID. To obtain the endpoint, find the service that you want to call on the EAS-Online Model Services page and click Invocation Method in the Service Type column. In the dialog box that appears, you can view the endpoint. To call a service by using the VPC direct connection feature, use the endpoint in the pai-eas-vpc.{region_id}.aliyuncs.com format. 
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("scorecard_pmml_example");

        // Define the input string.
        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);

        // EAS returns a string.
        try {
            String response = client.predict(request);
            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Close the client.
        client.shutdown();
        return;
    }
}

上記のサンプルコードは、次の手順を実行します。

  1. PredictClientメソッドを呼び出して、サービスのクライアントを作成します。 複数のサービスが関係する場合は、複数のクライアントを作成します。

  2. クライアントのトークン、エンドポイント、モデル名のパラメーターを設定します。

  3. 入力としてSTRING型のrequest変数を作成し、client.predictメソッドを呼び出してHTTPリクエストを送信します。 サービスはresponseパラメーターを返します。

TensorFlow入出力の使用

サービスがTensorFlowモデルを使用している場合、入力はTFRequest形式を使用し、出力はTFResponse形式を使用する必要があります。 サンプルコード:

import java.util.List;

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;

public class TestTF {
    public static TFRequest buildPredictRequest() {
        TFRequest request = new TFRequest();
        request.setSignatureName("predict_images");
        float[] content = new float[784];
        for (int i = 0; i < content.length; i++) {
            content[i] = (float) 0.0;
        }
        request.addFeed("images", TFDataType.DT_FLOAT, new long[]{1, 784}, content);
        request.addFetch("scores");
        return request;
    }

    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());

        // To use the VPC direct connection feature, call the setDirectEndpoint method. 
        // Example: client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // You must enable the VPC direct connection feature and configure a vSwitch in the PAI console. After you enable the feature, you can call the service without the need to passing a gateway, which improves stability and performance. 
        // Note: To call a service by using a gateway, use the endpoint that starts with your user ID. To obtain the endpoint, find the service that you want to call on the EAS-Online Model Services page and click Invocation Method in the Service Type column. In the dialog box that appears, you can view the endpoint. To call a service by using the VPC direct connection feature, use the endpoint in the pai-eas-vpc.{region_id}.aliyuncs.com format. 
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("mnist_saved_model_example");
        client.setToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****");
        long startTime = System.currentTimeMillis();
        int count = 1000;
        for (int i = 0; i < count; i++) {
            try {
                TFResponse response = client.predict(buildPredictRequest());
                List<Float> result = response.getFloatVals("scores");
                System.out.print("Predict Result: [");
                for (int j = 0; j < result.size(); j++) {
                    System.out.print(result.get(j).floatValue());
                    if (j != result.size() - 1) {
                        System.out.print(", ");
                    }
                }
                System.out.print("]\n");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time: " + (endTime - startTime) + "ms");
        client.shutdown();
    }
}

上記のサンプルコードは、次の手順を実行します。

  1. PredictClientメソッドを呼び出して、サービスのクライアントを作成します。 複数のサービスが関係する場合は、複数のクライアントを作成します。

  2. クライアントのトークン、エンドポイント、モデル名のパラメーターを設定します。

  3. TFRequestクラスを使用して入力をカプセル化し、TFResponseクラスを使用して出力をカプセル化します。

キューサービスの使用

キューサービスを実装するには、QueueClientクラスを使用します。 サンプルコード:

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import com.aliyun.openservices.eas.predict.queue_client.QueueUser;
import com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher;

public class DemoWatch {
    public static void main(String[] args) throws Exception {
        /** Create a client for the queue service. */
        String queueEndpoint = "18*******.cn-hangzhou.pai-eas.aliyuncs.com";
        String inputQueueName = "test_queue_service";
        String sinkQueueName = "test_queue_service/sink";
        String queueToken = "test-token";

        /** Create the input queue. After you add data to the input queue, the inference service automatically reads the request data from the input queue. */
        QueueClient inputQueue =
            new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), new QueueUser());
        /** Create the output queue. After the inference service processes the input data, the result is written to the output queue. */
        QueueClient sinkQueue =
            new QueueClient(queueEndpoint, sinkQueueName, queueToken, new HttpConfig(), new QueueUser());
        /** Clear data in the queue. Use with caution. */
        inputQueue.clear();
        sinkQueue.clear();

        /** Add data to the input queue. */
        int count = 10;
        for (int i = 0; i < count; ++i) {
            String data = Integer.toString(i);
            inputQueue.put(data.getBytes(), null);
            /** The queue service supports multi-priority queues. You can call the put method to set the data priority. The default priority is 0. */
            //  inputQueue.put(data.getBytes(), 0L, null);
        }

        /** Call the watch method to subscribe to the data of the output queue. The window size is 5. */
        WebSocketWatcher watcher = inputQueue.watch(0L, 5L, false, true, null);
        /** You can configure the WatchConfig parameter to specify the number of retries, the retry interval (in seconds), and whether to retry indefinitely. If you do not configure the WatchConfig parameter, the default number of retries is 3 and the default retry interval is 5. */
        //  WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(3, 1));
        //  WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(true, 10));

        /** Obtain output data. */
        for (int i = 0; i < count; ++i) {
            try {
                /** Call the getDataFrame method to obtain data of the DataFrame type. If no data is available, the method blocks until data is available. */
                byte[] data = watcher.getDataFrame().getData();
                System.out.println("[watch] data = " + new String(data));
            } catch (RuntimeException ex) {
                System.out.println("[watch] error = " + ex.getMessage());
                break;
            }
        }
        /** Close the watcher. Each client can have only one watcher. If you do not close a watcher, an error is reported when you create another client for the queue service. */
        watcher.close();

        Thread.sleep(2000);
        JSONObject attrs = sinkQueue.attributes();
        System.out.println(attrs.toString());

        /** Close the client. */
        inputQueue.shutdown();
        sinkQueue.shutdown();
    }
}

上記のサンプルコードは、次の手順を実行します。

  1. QueueClientメソッドを呼び出して、キューサービス用のクライアントを作成します。 推論サービスに必要な入力キューと出力キューを必ず作成してください。

  2. put() メソッドを呼び出して入力キューにデータを送信し、watch() メソッドを呼び出して出力キューのデータをサブスクライブします。

    説明

    説明の便宜上、この例ではデータを送信し、同じスレッドでデータをサブスクライブします。 実際の実装では、さまざまなスレッドでデータを送信したり、データをサブスクライブしたりできます。

要求データの圧縮

リクエストデータのサイズが大きい場合、EAS SDKを使用すると、データをサーバーに送信する前にZlibまたはGzip形式でデータを圧縮できます。 データ圧縮機能を使用するには、サービスのデプロイ時にrpc.de compressorパラメーターを設定します。

サービス展開のサンプル構成:

"metadata": {
  "rpc": {
    "decompressor": "zlib"
  }
}

圧縮データを送信するためのサンプルコード:

package com.aliyun.openservices.eas.predict;
import com.aliyun.openservices.eas.predict.http.Compressor;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class TestString {
    public static void main(String[] args) throws Exception{
    	  // Start and initialize a client. 
        PredictClient client = new PredictClient(new HttpConfig());
        client.setEndpoint("18*******.cn-hangzhou.pai-eas.aliyuncs.com");
        client.setModelName("echo_compress");
        client.setToken("YzZjZjQwN2E4NGRkMDMxNDk5NzhhZDcwZDBjOTZjOGYwZDYxZGM2****");
        // You can also set the compressor to Compressor.Gzip. 
        client.setCompressor(Compressor.Zlib);  
        // Define the input string. 
        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);
        // EAS returns a string. 
        String response = client.predict(request);
        System.out.println(response);
        // Close the client. 
        client.shutdown();
        return;
    }
}

関連ドキュメント