すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Python 用 SDK

最終更新日:Jul 22, 2024

公式のElastic Algorithm Service (EAS) SDKは、モデルに基づいてデプロイされたコールサービスに提供されます。 EAS SDKは、コールロジックの定義に必要な時間を短縮し、コールの安定性を向上させます。 このトピックでは、Python用EAS SDKについて説明します。 EAS SDK for Pythonを使用してサービスを呼び出す方法を示すデモが提供されています。 これらのデモでは、入力と出力は一般的に使用されるタイプです。

SDK のインストール

pip install -U eas-prediction -- user

変更方法

クラス

移動方法

説明

PredictClient

PredictClient(endpoint, service_name, custom_url)

  • PredictClientクラスのクライアントオブジェクトを作成します。

  • パラメーター:

    • endpoint: サーバーのエンドポイント。

      通常モードでサービスを呼び出すには、このパラメーターをデフォルトゲートウェイのエンドポイントに設定します。 例: 182848887922 *** .cn-shanghai.pai-eas.aliyuncs.com

      Virtual Private Cloud (VPC) 直接接続を使用する場合は、このパラメーターを現在のリージョンの共通エンドポイントに設定します。 たとえば、現在のリージョンが中国 (上海) の場合、このパラメーターをpai-eas-vpc.cn-shanghai.aliyuncs.comに設定します。

    • service_name: サービスの名前。

    • custom_url: サービスのURL。 このパラメーターはオプションで、web UIサービスなど、エンドポイントが <uid>.<regio n>.pai-eas.aliyuncs.comの形式ではないサービスにのみ必要です。 このパラメーターを設定してクライアントを作成できます。 例: client = PredictClient(custom_url='<url>')

set_endpoint(endpoint)

  • 説明: サーバーのエンドポイントを指定します。

  • パラメーター: endpoint: サーバーのエンドポイント。

    通常モードでサービスを呼び出すには、このパラメーターをデフォルトゲートウェイのエンドポイントに設定します。 例: 182848887922 *** .cn-shanghai.pai-eas.aliyuncs.com

    Virtual Private Cloud (VPC) 直接接続を使用する場合は、このパラメーターを現在のリージョンの共通エンドポイントに設定します。 たとえば、現在のリージョンが中国 (上海) の場合、このパラメーターをpai-eas-vpc.cn-shanghai.aliyuncs.comに設定します。

set_service_name(service_name)

  • 説明: サービスの名前を指定します。

  • パラメーター: service_name: サービスの名前。

set_endpoint_type(endpoint_type)

  • 説明: サーバーのゲートウェイタイプを指定します。

  • パラメーター: endpoint_type: 使用するゲートウェイの種類。 次のゲートウェイタイプがサポートされています。

    • ENDPOINT_TYPE_GATEWAY: デフォルトゲートウェイ。

    • ENDPOINT_TYPE_DIRECT: VPCダイレクト接続チャネル。 このパラメーターを設定しない場合、デフォルトのゲートウェイがサービスへのアクセスに使用されます。

set_token(token)

  • 説明: サービスアクセストークンを指定します。

  • パラメーター: token: サービスアクセスのトークン。

set_retry_count(max_retry_count)

  • 説明: リクエストの失敗後に許可される再試行の最大数を設定します。

  • パラメーター: max_retry_count: リクエストの失敗後に許可される再試行の最大数。 既定値:5

    重要

    サーバーでプロセスエラーが発生した場合、サーバーエラーが発生した場合、またはゲートウェイへの永続的な接続が閉じられた場合、クライアントは要求を再送信する必要があります。 したがって、このパラメーターを0に設定しないことを推奨します。

set_max_connection_count(max_connection_count)

  • 説明: クライアントの接続プールで許可される永続接続の最大数を設定します。 パフォーマンスを向上させるために、クライアントはサーバーへの永続的接続を確立し、永続的接続を接続プールに格納します。 リクエストを開始するたびに、クライアントは接続プールのアイドル接続を使用して必要なサービスにアクセスします。

  • パラメーター: max_connection_count: 接続プールで許可される永続的な接続の最大数。 デフォルト値:100

set_timeout (タイムアウト)

  • 説明: リクエストのタイムアウト期間を設定します。

  • パラメーター: timeout: リクエストのタイムアウト期間。 デフォルト値: 5000。 単位:ミリ秒。

init()

説明: クライアントオブジェクトを初期化します。 パラメーターの設定に使用される前述のすべてのメソッドが呼び出された後、パラメーターはInit() メソッドを呼び出した後にのみ有効になります。

predict (リクエスト)

  • 説明: 予測要求をオンライン予測サービスに送信します。

  • パラメーター: request: 抽象クラス。文字列を使用したリクエストやTensorFlowリクエストなど、さまざまなタイプのリクエストを使用できます。

  • 戻り値: 予測要求に対する応答。

StringRequest

StringRequest(request_data)

  • 説明: StringRequestクラスのオブジェクトを作成します。

  • パラメーター: request_data: 送信するリクエスト文字列。

StringResponse

to_string()

  • 説明: StringResponseクラスのレスポンスを文字列に変換します。

  • 戻り値: リクエストのレスポンスボディ。

TFRequest

TFRequest(signature_name)

  • 説明: TFRequestクラスのオブジェクトを作成します。

  • パラメーター: signature_name: 呼び出されるサービスのモデルの署名名。

add_feed(self, input_name, shape, data_type, content)

  • 説明: 呼び出すオンライン予測サービスのTensorFlowモデルの入力テンソルを指定します。

  • パラメーター:

    • input_name: 入力テンソルのエイリアス。

    • shape: 入力テンソルの形状。

    • data_type: 入力テンソルのデータ型。 次のデータ型がサポートされています。

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content: 入力テンソルのデータ。 値を1次元配列の形式で指定します。

add_fetch(self, output_name)

  • 説明: TensorFlowモデルからエクスポートする出力テンソルのエイリアスを指定します。

  • パラメーター: output_name: エクスポートする出力テンソルのエイリアス。

    TensorFlowモデルがSavedModel形式の場合、このパラメーターはオプションです。 このパラメーターを指定しない場合、すべての出力テンソルがエクスポートされます。

    TensorFlowモデルが凍結モデルの場合、このパラメーターは必須です。

to_string()

  • 説明: プロトコルバッファ (PB) オブジェクトを文字列にシリアル化します。 PBオブジェクトは、TFRequestクラスを使用して作成され、要求を送信するために使用されます。

  • 戻り値: TFRequestベースのシリアル化が完了した後に取得される文字列。

TFResponse

get_tensor_shape(output_name)

  • 説明: 指定されたエイリアスで識別される出力テンソルの形状を照会します。

  • パラメーター: output_name: クエリするシェイプの出力テンソルのエイリアス。

  • 戻り値: 出力テンソルの形状。

get_values(output_name)

  • 説明: 指定された出力テンソルのデータを照会します。

  • パラメーター: output_name: データを照会する出力テンソルのエイリアス。

  • 戻り値: 1次元配列。 このメソッドをget_tensor_shape() メソッドと一緒に呼び出して、出力テンソルの形状を照会できます。 戻り値は多次元配列です。 出力テンソルのデータ型によって、返される1次元配列のデータ型が決まります。

TorchRequest

TorchRequest()

説明: TorchRequestクラスのオブジェクトを作成します。

add_feed(self、index、shape、data_type、content)

  • 説明: 呼び出されるオンライン予測サービスのPyTorchモデルの入力テンソルを指定します。

  • パラメーター:

    • index: 入力テンソルのインデックス。

    • shape: 入力テンソルの形状。

    • data_type: 入力テンソルのデータ型。 次のデータ型がサポートされています。

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content: 入力テンソルのデータ。 値を1次元配列の形式で指定します。

add_fetch(self, output_index)

  • 説明: PyTorchモデルからエクスポートする出力テンソルのインデックスを指定します。 このメソッドはオプションです。 このメソッドを呼び出して出力テンソルのインデックスを設定しない場合、すべての出力テンソルがエクスポートされます。

  • パラメーター: output_index: エクスポートする出力テンソルのインデックス。

to_string()

  • 説明: PBオブジェクトを文字列にシリアル化します。 PBオブジェクトは、TorchRequestクラスを使用して作成され、要求を送信するために使用されます。

  • 戻り値: TorchRequestベースのシリアル化が完了した後に取得される文字列。

TorchResponse

get_tensor_shape(output_index)

  • 説明: 指定されたインデックスで識別される出力テンソルの形状を照会します。

  • パラメーター: output_index: クエリするシェイプの出力テンソルのインデックス。

  • 戻り値: 指定されたインデックスによって識別される出力テンソルの形状。

get_values(output_index)

  • 説明: 指定された出力テンソルのデータを照会します。 戻り値は1次元配列です。 このメソッドをget_tensor_shape() メソッドと一緒に呼び出して、出力テンソルの形状を照会できます。 戻り値は多次元配列です。 出力テンソルのデータ型によって、返される1次元配列のデータ型が決まります。

  • パラメーター: output_index: データを照会する出力テンソルのインデックス。

  • 戻り値: 1次元配列。

QueueClient

QueueClient (エンドポイント、queue_name)

  • 説明: QueueClientクラスのクライアントオブジェクトを作成します。

  • パラメーター:

    • endpoint: サーバーのエンドポイント。

    • queueName: 作成するキューの名前。

  • 戻り値: 作成されたクライアントオブジェクト。

set_token(token)

  • 説明: 認証用のQueueClientクラスのクライアントオブジェクトのトークンを作成します。

  • パラメーター: token: 作成するキューのトークン。

init(uid=なし、gid='eas')

  • 説明: QueueClientクラスのクライアントオブジェクトを初期化します。

  • パラメーター:

    • uid: クライアントのユーザーID。 クライアントは、サーバに登録するために使用される。 各クライアントインスタンスには一意のユーザーIDが必要で、各ユーザーIDは1回だけ登録できます。 サーバーによってプッシュされたデータは、ユーザーID間で均等に分散されます。

    • gid: クライアントのグループID。 クライアントは、サーバに登録するために使用される。 デフォルトでは、同じグループIDを持つクライアントは同じグループに属します。 異なるグループが存在する場合、1つのデータレコードがすべてのグループにプッシュされます。

set_logger(logger=なし)

  • 説明: キューのロガーを設定します。 デフォルトでは、ロガーは警告情報を通常の出力として表示します。 ロギングを無効にするには、Noneを長く設定します。

  • パラメーター: logger: 設定するロガー。

truncate (インデックス)

  • 説明: 特定のインデックス値の前のデータを切り捨て、インデックス値の後のデータのみを保持します。

  • パラメーター: index: データの切り捨てに使用されるインデックス値。

put (データ、タグ: dict ={})

  • 説明: データレコードをキューに書き込みます。

  • パラメーター:

    • data: キューに書き込むデータレコード。

    • tags: オプション。 キューに書き込むデータレコードのタグ。

  • 戻り値:

    • index: 書き込まれたデータレコードのインデックス値。 この値は、キュー内のデータを照会するために使用できます。

    • requestId: キューに書き込まれたデータレコードに対して自動的に生成されたリクエストID。 reuqestIdは、キュー内のデータを照会するための特別なタグとして使用できます。

get(request_id=なし、index=0、length=1、timeout='5s' 、auto_delete=True、tags ={})

  • 説明: 指定された条件に基づいてキュー内のデータを照会します。

  • パラメーター:

    • request_id: クエリするデータレコードのリクエストID。 このパラメーターが指定されている場合、システムはindexから始まるデータレコードの最大数を照会します。 データレコードにリクエストIDと一致するレコードが含まれている場合、レコードが返されます。 それ以外の場合は、nullが返されます。

    • index: クエリの開始インデックス。 デフォルト値: 0。これは、クエリが最初のデータレコードから開始することを示します。

    • length: クエリするデータレコードの数。 このパラメーターを指定すると、indexから始まるデータレコードの最大数が返されます。 インデックス値に一致するデータレコードも返されます。

    • timeout: クエリのタイムアウト期間。 タイムアウト期間中、キューにデータが含まれている場合、指定された条件を満たすデータレコードの数が返されます。 それ以外の場合、クエリはタイムアウト期間が終了した後に停止します。

    • auto_delete: 取得したデータレコードをキューから自動的に削除するかどうかを指定します。 auto_deleteをFalseに設定すると、データレコードを繰り返し照会できます。 この場合、Del() メソッドを使用してデータを手動で削除できます。

    • tags: データレコードのクエリに使用されるタグ。 データ型はDICTである必要があります。 このパラメーターを指定すると、指定されたタグが追加されたインデックスから始まるデータレコードが返されます。

  • 戻り値: DataFrame形式で取得したデータレコード。

attributes()

  • 説明: キューの属性を照会します。 属性には、キュー内のデータレコードの総数と、現在のキュー内のデータレコードの数が含まれます。

  • 戻り値: attrs: キューの属性。 データ型はDICTである必要があります。

delete (インデックス)

  • 説明: 指定したインデックス値に一致するデータレコードをキューから削除します。

  • パラメーター: indexes: データレコードの削除に使用される指定されたインデックス値。 単一のインデックス値を文字列として指定したり、複数のインデックス値をリストとして指定したりできます。

search (インデックス)

  • 説明: データレコードのキュー情報を照会します。

  • パラメーター: index: クエリするデータレコードのインデックス。

  • 戻り値: JSONObject型のデータレコードのキュー情報。 情報には、次のフィールドが含まれます。

    • ConsumerId: データレコードを処理するインスタンスのID。

    • IsPending: データレコードが処理されているかどうかを示します。

      • Trueは、データレコードが処理中であることを示します。

      • Falseは、データレコードがキューにあり、処理を待っていることを示します。

    • WaitCount: データレコードの前のデータレコードの数を示します。 このパラメーターは、IsPendingがFalseに設定されている場合にのみ有効です。 IsPendingがTrueに設定されている場合、このパラメーターの値は0です。

    レスポンス例:

    • {'ConsumerId': 'eas.**** ', 'IsPending': False, 'WaitCount':2} が返された場合、データレコードはキューにあります。

    • ログにsearch error:Code 404, Message: b'no data in stream 'が表示され、{} が返された場合、データレコードはキューに見つかりません。 これは、データレコードが処理され、結果がクライアントに返されたか、インデックスパラメータが誤って構成されているためです。

ウォッチ (インデックス、ウィンドウ、index_only=False、auto_commit=False)

  • 説明: キュー内のデータレコードをサブスクライブします。 次に、キューイングサービスは指定された条件に基づいてクライアントにデータをプッシュします。

  • パラメーター:

    • index: サブスクライブされているデータレコードの開始インデックス。

    • window: キューイングサービスによって1つのクライアントにプッシュできるデータレコードの最大数。

      説明

      データレコードがコミットされていない場合、サーバーは他のデータレコードをクライアントにプッシュしません。 次に、N個のデータレコードがコミットされると、N個のデータレコードがサーバにプッシュされる。 これにより、クライアントが処理するデータレコードの数がwindowに指定された値を超えないようにします。 このようにして、クライアント側の同時計算が制御される。

    • index_only: インデックス値のみをプッシュするかどうかを指定します。

    • auto_commit: レコードのプッシュ後にデータレコードを自動的にコミットするかどうかを指定します。 auto_commitをFalseに設定することを推奨します。 この場合、レコードを受信して計算した後に、データレコードを手動でコミットする必要があります。 計算が完了する前にインスタンスで例外が発生した場合、コミットされていないデータレコードはキューサービスによって他のインスタンスにプッシュされます。

  • 戻り値: プッシュされたデータを読み取るために使用されるウォッチャー。

commit(index)

  • 説明: 指定したデータレコードをコミットします。

    説明

    データレコードが処理され、他のインスタンスにプッシュする必要がない場合は、コミットされます。 次に、データレコードをキューから削除することができる。

  • パラメーター: index: コミットされたデータレコードと一致する指定されたインデックス値。 単一のインデックス値を文字列として指定したり、複数のインデックス値をリストとして指定したりできます。

ウォッチャー

run()

  • 説明: ウォッチャーを実行してWebSocketsを使用してサーバーへの接続を確立し、サーバーからプッシュされたデータを受信し、そのデータをリアルタイムで呼び出し元に返します。

  • 戻り値: DataFrame形式で呼び出し元にプッシュされたリアルタイムデータ。

close()

説明: バックエンド接続を閉じるためにウォッチャーを停止します。

説明

1つのクライアントに対して起動できるウォッチャーは1つだけです。 別のウォッチャーを開始する前に、ウォッチャーを閉じる必要があります。

デモ

  • 文字列としての入力と出力

    カスタムプロセッサを使用してモデルをサービスとして展開する場合、文字列は、予測モデルマークアップ言語 (PMML) モデルに基づいて展開されるサービスなど、サービスを呼び出すためによく使用されます。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • テンソルとしての入力と出力

    TensorFlowを使用してモデルをサービスとして展開する場合は、TFRequestクラスとTFResponseクラスを使用してサービスを呼び出す必要があります。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        #request = StringRequest('[{}]')
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • VPCダイレクト接続チャネルを使用してサービスを呼び出す

    VPCダイレクト接続チャネルを使用して、EAS専用リソースグループにデプロイされているサービスのみにアクセスできます。 さらに、チャネルを使用するには、EASの専用リソースグループと指定されたvSwitchがVPCに接続されている必要があります。 EAS専用リソースグループとネットワーク接続の購入方法については、「専用リソースグループの操作」および「ネットワーク接続の設定」をご参照ください。 通常モードと比較して、このモードにはclient.set_endpoint_type(ENDPOINT_TYPE_DIRECT) という追加のコード行が含まれています。 このモードは、同時実行性が高く、トラフィックが多いシナリオで使用できます。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • PyTorchモデルを呼び出す

    PyTorchを使用してモデルをサービスとして展開する場合は、TorchRequestクラスとTorchResponseクラスを使用してサービスを呼び出す必要があります。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • Bladeプロセッサベースモデルの呼び出し

    Bladeプロセッサを使用してモデルをサービスとして展開する場合は、BladeRequestクラスとBladeResponseクラスを使用してサービスを呼び出す必要があります。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • デフォルトのTensorFlowメソッドと互換性のあるBladeプロセッサベースモデルの呼び出し

    TFRequestクラスとTFResponseクラスを使用して、EASでサポートされている既定のTensorFlowメソッドと互換性のあるBladeプロセッサベースのモデルを呼び出すことができます。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest # Need Importing blade TFRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • キューイングサービスを使用したデータの送信とサブスクライブ

    キュー内のデータを送信および照会したり、キューの状態を照会したり、キューによってプッシュされたデータをサブスクライブしたりできます。 次のデモでは、スレッドがキューにデータをプッシュし、別のスレッドがウォッチャーを使用してプッシュされたデータをサブスクライブします。 詳細については、次のデモを参照してください。

    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()