Platform for AI: Deploy a Kohya-based training service as a scalable job

Last Updated: Nov 14, 2024

This topic uses a Kohya-based training service as an example to describe how to deploy a training service as a scalable job by using the independent deployment method or integrated deployment method. This topic also describes how to use the scalable job to perform training, obtain training results, terminate tasks, and query related logs.

Prerequisites

An Object Storage Service (OSS) bucket is created to store the model files and configuration files that are obtained from training. For information about how to create a bucket, see Create a bucket.
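
If you prefer to create the bucket programmatically instead of in the console, the following is a minimal sketch that uses the OSS SDK for Python (oss2). The AccessKey pair, endpoint, and bucket name are placeholders that you must replace with your own values.

    import oss2

    # Placeholders: replace with your own AccessKey pair, regional endpoint, and bucket name.
    auth = oss2.Auth('<your-access-key-id>', '<your-access-key-secret>')
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')

    # Create a private bucket to store the model files and configuration files.
    bucket.create_bucket(oss2.BUCKET_ACL_PRIVATE)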

Deploy a training service as a scalable job

In this topic, the kohya_ss image provided by Platform for AI (PAI) is used as an example to describe how to deploy a Kohya-based training service as a scalable job.

  1. Go to the EAS-Online Model Services page. For more information, see the "Step 1: Go to the EAS-Online Model Services page" section in the Model service deployment by using the PAI console topic.

  2. Deploy a training service.

    The following deployment methods are supported:

    Integrated deployment

    Deploy the queue service, the resident frontend service, and the scalable job of a Kohya-based training in an integrated manner. Procedure:

    1. On the Elastic Algorithm Service (EAS) page, click Deploy Service. In the dialog box that appears, select Custom Deployment and click OK.

    2. In the Configuration Editor section, click JSON Deployment and enter the configuration information of the training service.

      {
        "cloud": {
          "computing": {
            "instance_type": "ecs.gn6i-c4g1.xlarge"
          }
        },
        "containers": [
          {
            "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2"
          }
        ],
        "features": {
          "eas.aliyun.com/extra-ephemeral-storage": "30Gi"
        },
        "front_end": {
          "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2",
          "port": 8001,
          "script": "python -u kohya_gui.py --listen 0.0.0.0 --server_port 8001 --data-dir /workspace --headless --just-ui --job-service"
        },
        "metadata": {
          "cpu": 4,
          "enable_webservice": true,
          "gpu": 1,
          "instance": 1,
          "memory": 15000,
          "name": "kohya_job",
          "type": "ScalableJobService"
        },
        "name": "kohya_job",
        "storage": [
          {
            "mount_path": "/workspace",
            "oss": {
              "path": "oss://examplebucket/kohya/",
              "readOnly": false
            },
            "properties": {
              "resource_type": "model"
            }
          }
        ]
      }

      The following list describes the key parameters in the preceding code.

      metadata parameters:

      • name: The name of the service. The name must be unique within the region.

      • type: The type of the service. Set the value to ScalableJobService, which specifies the integrated deployment method.

      • enable_webservice: Set the value to true to deploy an AI-powered web application.

      front_end parameters:

      • image: The image that is used to run the frontend instance. Click PAI Image, select kohya_ss from the image drop-down list, and select 2.2 from the image version drop-down list.

        Note: The image version is frequently updated. We recommend that you select the latest version.

      • script: The command that is used to start the frontend instance. In this example, the value is python -u kohya_gui.py --listen 0.0.0.0 --server_port 8001 --data-dir /workspace --headless --just-ui --job-service. Description of the command parameters:

        • --listen: binds the application to the specified local IP address to receive and process external requests.

        • --server_port: the port number on which the application listens.

        • --just-ui: enables the independent frontend mode. The frontend service page displays only the service UI.

        • --job-service: runs the training as a scalable job.

      • port: The port number. The value must be the same as the value of the --server_port parameter in front_end.script.

      containers parameters:

      • image: The image that is used to run the scalable job. If you do not specify this parameter, the image that is used to run the frontend instance is used.

      • instance_type: The instance type that is used to run the scalable job. A GPU-accelerated instance type is required. If you do not specify this parameter, the instance type specified by the cloud.computing.instance_type parameter is used.

      storage parameters:

      • oss.path: In this topic, Object Storage Service (OSS) is used as an example. Specify an OSS path in the same region to store the model files that are generated by the training. Example: oss://examplebucket/kohya/.

      • oss.readOnly: Set this parameter to false. Otherwise, the model files cannot be stored in OSS.

      • mount_path: The mount path. You can specify a custom path. In this example, /workspace is used.

      cloud parameters:

      • computing.instance_type: The instance type that is used to run the frontend service. Select a CPU instance type.

    3. Click Deploy.

    Independent deployment

    Separately deploy the scalable job and the frontend service. This way, the scalable job can receive training requests from multiple frontend services. Procedure:

    1. Deploy a scalable job.

      1. On the Elastic Algorithm Service (EAS) page, click Deploy Service. In the dialog box that appears, select Custom Deployment and click OK.

      2. In the Configuration Editor section, click JSON Deployment and enter the configuration information of the scalable job.

        {
          "cloud": {
            "computing": {
              "instance_type": "ecs.gn6i-c4g1.xlarge"
            }
          },
          "containers": [
            {
              "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2"
            }
          ],
          "features": {
            "eas.aliyun.com/extra-ephemeral-storage": "30Gi"
          },
          "metadata": {
            "instance": 1,
            "name": "kohya_scalable_job",
            "type": "ScalableJob"
          },
          "storage": [
            {
              "mount_path": "/workspace",
              "oss": {
                "path": "oss://examplebucket/kohya/",
                "readOnly": false
              },
              "properties": {
                "resource_type": "model"
              }
            }
          ]
        }

        The following list describes the key parameters in the preceding code.

        metadata parameters:

        • name: The name of the service. The name must be unique within the region.

        • type: The type of the service. Set the value to ScalableJob, which specifies the independent deployment method.

        containers parameters:

        • image: The image that is used to run the scalable job. Click PAI Image, select kohya_ss from the image drop-down list, and select 2.2 from the image version drop-down list.

          Note: The image version is frequently updated. We recommend that you select the latest version.

        storage parameters:

        • oss.path: In this topic, OSS is used as an example. Specify an OSS path in the same region to store the model files that are generated by the training. Example: oss://examplebucket/kohya/.

        • oss.readOnly: Set this parameter to false. Otherwise, the model files cannot be stored in OSS.

        • mount_path: The mount path. You can specify a custom path. In this example, /workspace is used.

        cloud parameters:

        • computing.instance_type: The instance type that is used to run the scalable job. You must select a GPU-accelerated instance type to run Kohya-based training.

      3. Click Deploy.

      4. After you deploy the service, click Invocation Method in the Service Type column. On the Public Endpoint tab, obtain the endpoint and token of the service and save them to your on-premises device.

    2. Optional. Deploy the frontend service.

      1. On the Elastic Algorithm Service (EAS) page, click Deploy Service. In the dialog box that appears, select Custom Deployment and click OK.

      2. In the Configuration Editor section, click JSON Deployment and enter the configuration information of the frontend service.

        {
          "cloud": {
            "computing": {
              "instance_type": "ecs.g6.large"
            }
          },
          "containers": [
            {
              "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2",
              "port": 8000,
              "script": "python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scalable_job"
            }
          ],
          "metadata": {
            "enable_webservice": true,
            "instance": 1,
            "name": "kohya_scalable_job_front"
          },
          "storage": [
            {
              "mount_path": "/workspace",
              "oss": {
                "path": "oss://examplebucket/kohya/",
                "readOnly": false
              },
              "properties": {
                "resource_type": "model"
              }
            }
          ]
        }

        The following list describes the key parameters in the preceding code.

        metadata parameters:

        • name: The name of the frontend service.

        • enable_webservice: Set the value to true to deploy an AI-powered web application.

        containers parameters:

        • image: The image that is used to run the frontend service. Click PAI Image, select kohya_ss from the image drop-down list, and select 2.2 from the image version drop-down list.

          Note: The image version is frequently updated. We recommend that you select the latest version.

        • script: The command that is used to start the frontend service. Example: python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scalable_job. Description of the command parameters:

          • --listen: binds the application to the specified local IP address to receive and process external requests.

          • --server_port: the port number on which the application listens.

          • --just-ui: enables the independent frontend mode. The frontend service page displays only the service UI.

          • --job-service: runs the training as a scalable job.

          • --job-service-endpoint: the endpoint of the scalable job service.

          • --job-service-token: the token of the scalable job service.

          • --job-service-inputname: the name of the scalable job service.

        • port: The port number. The value must be the same as the value of the --server_port parameter in containers.script.

        storage parameters:

        • oss.path: In this topic, OSS is used as an example. Specify an OSS path in the same region to store the model files that are generated by the training. Example: oss://examplebucket/kohya/.

        • oss.readOnly: Set this parameter to false. Otherwise, the model files cannot be stored in OSS.

        • mount_path: The mount path. You can specify a custom path. In this example, /workspace is used.

        cloud parameters:

        • computing.instance_type: The instance type that is used to run the frontend service. Select a CPU instance type.

      3. Click Deploy.

Call the Kohya-based training service

Call the scalable job service by using the web application

If you deploy the frontend service by using the preset kohya_ss image (2.2 or later) of EAS, click View Web App in the Service Type column after you deploy the service. On the web application page, configure the parameters for Low-Rank Adaptation (LoRA) training. For more information, see Deploy a LoRA SD model by using Kohya_ss in EAS.


  • Click Start training to send a training request. Click the button only once, and do not click it again until the training is completed or terminated. The scalable job automatically scales based on the number of training sessions.

  • Click Stop training to terminate the current training task.

Call the scalable job service that uses a custom image

You can call the scalable job by using the SDK for Python to send command requests and obtain the task execution logs. If you deploy the frontend service by using a custom image and also want to call the service from the web UI, you must implement the following interfaces in your frontend. After you deploy the service, you can use the web UI to call the scalable job. Procedure:

  1. Obtain the endpoint and token of the scalable job service.

    Integrated deployment

    On the Elastic Algorithm Service (EAS) page, click the service name to go to the Service Details page. In the Basic Information section, click View Endpoint Information. On the Public Endpoint tab, obtain the endpoint and token of the service and save them to your on-premises device. Note:

    • The service endpoint is in the <queue_name>.<service_name>.<uid>.<region>.pai-eas.aliyuncs.com format. Example: kohya-job-queue-b****4f0.kohya-job.175805416243****.cn-beijing.pai-eas.aliyuncs.com. <queue_name> is the part before -0 in the instance name of the queue service. You can view the instance name in the service instance list on the Service Details page.

    • Sample token: OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==.

    Independent deployment

    On the Elastic Algorithm Service (EAS) page, click Invocation Method in the Service Type column of the service to obtain the endpoint and token. Note:

    • Sample service endpoint: 175805416243****.cn-beijing.pai-eas.aliyuncs.com.

    • Sample token: Njk5NDU5MGYzNmRlZWQ3ND****QyMDIzMGM4MjExNmQ1NjE1NzY5Mw==.

  2. Install the SDK for Python.

    pip install -U eas-prediction --user

    For more information about SDK for Python, see SDK for Python.

  3. Create clients for the input queue and the output queue.

    Integrated deployment

    from eas_prediction import QueueClient
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # Create an input queue to send training requests and termination requests for command tasks. 
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Create an output queue to obtain the execution status and logs of the command tasks. 
        sinkQueue = QueueClient(custom_url = sink_url)
        sinkQueue.set_token(token)
        sinkQueue.init()
    

    Description of parameters in the preceding code:

    • Replace token with the token that you obtained in the preceding step.

    • Replace input_url with the service endpoint that you obtained in the preceding step.

    Independent deployment

    from eas_prediction import QueueClient
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OT****c1MTUxNg=='
        input_name = 'kohya_scalable_job'
        sink_name = input_name + '/sink'
    
        # Create an input queue to send training requests and termination requests for command tasks. 
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init()
    
        # Create an output queue to obtain the execution status and logs of the command tasks. 
        sinkQueue = QueueClient(endpoint, sink_name)
        sinkQueue.set_token(token)
        sinkQueue.init()
    

    Description of parameters in the preceding code:

    • Replace endpoint with the service endpoint that you obtained in the preceding step.

    • Replace token with the service token that you obtained in the preceding step.

    • Set input_name to the name of the scalable job service.

  4. Send a training request to the input queue.

    Integrated deployment

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Generate a unique task ID for each task request. 
        task_id = uuid.uuid1().hex
        # Create a command string. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        # Set the taskType parameter to command and specify the task ID. 
        tags = {"taskType": "command", "taskId": task_id}
        # Send a training request to the input queue. 
        index, request_id = inputQueue.put(cmd, tags)
        print(f'send index: {index}, request id: {request_id}')
    

    Description of the parameters in the preceding code:

    • token: Replace the value with the token that you obtained in the preceding step.

    • input_url: Replace the value with the service endpoint that you obtained in the preceding step.

    • cmd: The command that you want to run. If you run a Python command, add the -u parameter so that the task execution logs are available in real time. For a more realistic training command, see the sketch at the end of this step.

    • tags: The tags of the training request:

      • taskType: Set the value to command.

      • taskId: the ID of the training task, which uniquely identifies the training task.

    Independent deployment

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYj****djN2IyODc1MTM5ZQ=='
        input_name = 'kohya_scalable_job'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Generate a unique task ID for each task request. 
        task_id = uuid.uuid1().hex
        # Create a command string. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        # Set the taskType parameter to command and specify the task ID. 
        tags = {"taskType": "command", "taskId": task_id}
        # Send a training request to the input queue. 
        index, request_id = inputQueue.put(cmd, tags)
        print(f'send index: {index}, request id: {request_id}')
    

    Description of the parameters in the preceding code:

    • endpoint: Replace the value with the endpoint that you obtained in the preceding step.

    • token: Replace the value with the service token that you obtained in the preceding step.

    • cmd: The command that you want to run. If you run a Python command, add the -u parameter so that the task execution logs are available in real time. For a more realistic training command, see the sketch at the end of this step.

    • tags: The tags of the training request:

      • taskType: Set the value to command.

      • taskId: the ID of the training task, which uniquely identifies the training task.
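
    The sample cmd in the preceding snippets runs a simple shell loop for demonstration. In practice, you send your actual Kohya training command as the cmd string. The following is an illustrative sketch only; the script path and arguments are placeholders, not the real Kohya entry point, and must be replaced with the training command that you normally run.

    import uuid

    # Illustrative placeholders: replace the script path and arguments with your own
    # Kohya training command. The -u flag disables Python output buffering so that
    # the logs stream to the output queue in real time.
    cmd = "python -u /workspace/train.py --config_file /workspace/config.toml"
    tags = {"taskType": "command", "taskId": uuid.uuid1().hex}

    # inputQueue is the input queue client that you created in the previous step.
    index, request_id = inputQueue.put(cmd, tags)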

  5. Query the queuing status of requests.

    Integrated deployment

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Send a command request to the input queue. 
        task_id = uuid.uuid1().hex
        cmd = "for i in {1..100}; do date; sleep 1; done;"
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # Query the queuing status of the requests. 
        search_info = inputQueue.search(index)
        print("index: {}, search info: {}".format(index, search_info))
    

    Description of parameters in the preceding code:

    • Replace token with the token that you obtained in the preceding step.

    • Replace input_url with the service endpoint that you obtained in the preceding step.

    Independent deployment

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2I****1MTM5ZQ=='
        input_name = 'kohya_scalable_job'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Send a command request to the input queue. 
        task_id = uuid.uuid1().hex
        cmd = "for i in {1..100}; do date; sleep 1; done;"
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # Query the queuing status of the requests. 
        search_info = inputQueue.search(index)
        print("index: {}, search info: {}".format(index, search_info))
    

    Description of parameters in the preceding code:

    • Replace endpoint with the endpoint that you obtained in the preceding step.

    • Replace token with the service token that you obtained in the preceding step.

    Sample response in the JSON format:

    {
        "IsPending": false,
        "WaitCount": 0
    }

    The following list describes the fields in the response.

    • IsPending: Indicates whether the request is being processed. Valid values:

      • True: The request is being processed.

      • False: The request is still in the queue and waiting to be processed.

    • WaitCount: The position of the request in the queue. This field is valid only when IsPending is False. When IsPending is True, the value is 0.
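
    If you want to wait until a queued request starts to run before you take further action, you can poll the search() method that is shown above. The following is a minimal sketch that assumes the inputQueue client and the index value from the preceding code.

    import time

    # Poll the queuing status of the request until an instance starts to process it.
    # inputQueue and index come from the preceding code.
    while True:
        info = inputQueue.search(index)
        if info.get("IsPending", False):
            # The request has left the queue and is being processed.
            break
        print("waiting, position in queue: {}".format(info.get("WaitCount", 0)))
        time.sleep(5)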

  6. Obtain the execution result from the output queue.

    The execution logs of training tasks are written to the output queue in real time. You can call the sinkQueue.get(length=1, timeout='0s', tags=tags) method, as shown in the following sample code, to obtain the logs of the training task whose task ID is specified in the tags. Sample code:

    Integrated deployment

    from eas_prediction import QueueClient
    import json
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Create an output queue client to obtain command execution logs. 
        sinkQueue = QueueClient(custom_url = sink_url)
        sinkQueue.set_token(token)
        sinkQueue.init()
    
        # Send a command request to the input queue. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # Obtain the logs of the training task that has the specified task ID from the output queue in real time. 
        running = True
        while running:
            dfs = sinkQueue.get(length=1, timeout='0s', tags=tags)
            if len(dfs) == 0:
                continue
            df = dfs[0]
            data = json.loads(df.data.decode())
            state = data["state"]
            print(data.get("log", ""))
            if state in {"Exited", "Stopped", "Fatal", "Backoff"}:
                running = False
    

    Description of parameters in the preceding code:

    • Replace token with the token that you obtained in the preceding step.

    • Replace input_url with the service endpoint that you obtained in the preceding step.

    Independent deployment

    from eas_prediction import QueueClient
    import json
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyOD****M5ZQ=='
        input_name = 'kohya_scalable_job'
        sink_name = input_name + '/sink'
    
        # Create an input queue client to send command requests. 
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Create an output queue client to obtain command execution logs. 
        sinkQueue = QueueClient(endpoint, sink_name)
        sinkQueue.set_token(token)
        sinkQueue.init()
    
        # Send a command request to the input queue. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # Obtain the logs of the training task that has the specified task ID from the output queue in real time. 
        running = True
        while running:
            dfs = sinkQueue.get(length=1, timeout='0s', tags=tags)
            if len(dfs) == 0:
                continue
            df = dfs[0]
            data = json.loads(df.data.decode())
            state = data["state"]
            print(data.get("log", ""))
            if state in {"Exited", "Stopped", "Fatal", "Backoff"}:
                running = False
    

    Description of parameters in the preceding code:

    • Replace endpoint with the endpoint that you obtained in the preceding step.

    • Replace token with the service token that you obtained in the preceding step.

    Sample response in the BYTES format:

    {
        "taskId": "e97409eea4a111ee9cb600163e08****",
        "command": "python3 -u test.py --args=xxx",
        "state": "Running",
        "log": "prepare tokenizer\n"
    }

    The following list describes the fields in the response.

    • taskId: The ID of the task.

    • command: The command that the task runs.

    • state: The execution status of the command. Valid values:

      • Running: The task is running.

      • Exited: The task has exited.

      • Fatal: An exception occurred in the task.

      • Stopping: The task is being stopped.

      • Stopped: The task is stopped.

    • log: The log fragment. Log fragments that have the same task ID are written to the output queue in the order in which they are generated.
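
    Because log fragments for the same task ID arrive in order, you can concatenate them to reconstruct the complete training log. The following is a minimal helper sketch built on the get() call shown above; the function name and the log file path are illustrative.

    import json

    def collect_task_logs(sink_queue, tags, log_path="training.log"):
        """Poll the output queue and append the log fragments of one task to a local file."""
        terminal_states = {"Exited", "Stopped", "Fatal", "Backoff"}
        with open(log_path, "a") as f:
            while True:
                frames = sink_queue.get(length=1, timeout='0s', tags=tags)
                if not frames:
                    continue
                data = json.loads(frames[0].data.decode())
                f.write(data.get("log", ""))
                if data["state"] in terminal_states:
                    return data["state"]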

  7. Stop the training task.

    If you want to stop a training task after you send the request, first check whether the request is still queuing or already running. Call the inputQueue.search(index) method to obtain the request status, and then delete the queued request or send a stop signal based on the result. Examples:

    Integrated deployment

    from eas_prediction.queue_client import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # Create an input queue client to send command requests and terminate requests. 
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Send a command request to the input queue. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex  # The task ID of the request. 
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
        print(f'cmd send, index: {index}, task_id: {task_id}')
    
        job_index = index  # The index returned after you send the task request. 
    
        pending_detail = inputQueue.search(job_index)
        print(f'search info: {pending_detail}')
        if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False:
            # The command task is still waiting in the input queue. Delete it from the queue. 
            inputQueue.delete(job_index)
            print(f'delete task index: {job_index}')
        else:
            # The command task is already running. Send a stop signal to the input queue. 
            stop_data = "stop"
            tags = {"_is_symbol_": "true", "taskId": task_id}
            inputQueue.put(stop_data, tags)
            print(f'stop task index: {job_index}')
    

    Description of the parameters in the preceding code:

    • token: Replace the value with the token that you obtained in the preceding step.

    • input_url: Replace the value with the service endpoint that you obtained in the preceding step.

    • stop_data: Set the value to stop.

    • tags: The tags of the termination request:

      • _is_symbol_: Set the value to true to terminate the task. This parameter is required.

      • taskId: the ID of the task that you want to terminate.

    Independent deployment

    from eas_prediction.queue_client import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyODc1MTM5****'
        input_name = 'kohya_scalable_job'
    
        # Create an input queue client to send command requests and terminate requests. 
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # Send a command request to the input queue. 
        cmd = "for i in {1..10}; do date; sleep 1; done;"
    
        # The task_id of the request. 
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
        print(f'cmd send, index: {index}, task_id: {task_id}')
    
        job_index = index  # The index returned after you send the task request. 
    
        pending_detail = inputQueue.search(job_index)
        print(f'search info: {pending_detail}')
        if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False:
            # The command task is still waiting in the input queue. Delete it from the queue. 
            inputQueue.delete(job_index)
            print(f'delete task index: {job_index}')
        else:
            # The command task is already running. Send a stop signal to the input queue. 
            stop_data = "stop"
            tags = {"_is_symbol_": "true", "taskId": task_id}
            inputQueue.put(stop_data, tags)
            print(f'stop task index: {job_index}')
    

    Description of the parameters in the preceding code:

    • endpoint: Replace the value with the endpoint that you obtained in the preceding step.

    • token: Replace the value with the service token that you obtained in the preceding step.

    • stop_data: Set the value to stop.

    • tags: The tags of the termination request:

      • _is_symbol_: Set the value to true to terminate the task. This parameter is required.

      • taskId: the ID of the task that you want to terminate.
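
    If you stop tasks frequently, you can wrap the delete-or-stop logic shown above in a small helper. The following is a sketch under the same assumptions as the preceding code; the function name is illustrative.

    def stop_task(input_queue, job_index, task_id):
        """Delete a queued task, or send a stop signal if the task is already running."""
        detail = input_queue.search(job_index)
        if len(detail) > 0 and detail.get("IsPending", True) == False:
            # The task is still waiting in the input queue. Remove it before it starts.
            input_queue.delete(job_index)
        else:
            # The task is already running. Ask the instance to stop it by sending a stop symbol.
            input_queue.put("stop", {"_is_symbol_": "true", "taskId": task_id})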

References