All Products
Search
Document Center

Cloud Parallel File Storage:Best practice of streaming dataflow tasks

Last Updated:Nov 26, 2024

You can create a streaming task to continuously migrate files between a Cloud Parallel File Storage (CPFS) for Lingjun file system and an Object Storage Service (OSS) bucket one by one.

Overview

To import files to and export files from a directory, you need to perform the following steps:

  1. Create a dataflow: You can create a dataflow to map a subdirectory of a CPFS for Lingjun file system to a prefix in an OSS bucket.

  2. Create a streaming task: You can call the CreateDataFlowTask operation to create a streaming import or export task. This task creates a tunnel between the source and destination directories. After a streaming task is created, the task remains in the Running state. However, no data is transmitted over the tunnel. You must create a streaming subtask to transmit data.

  3. Create a streaming subtask: You can call the CreateDataFlowSubTask operation to create an import or export subtask for each different file.

  4. Query the status of a streaming subtask: You can call the DescribeDataFlowSubTask operation to query the progress and status of a streaming subtask that has been submitted. When the value of the Status parameter is COMPLETE and the value of the Progress parameter is 10000 in the returned result, all the source data is imported to or exported from the specified destination directory.

Prerequisites

  • A CPFS for Lingjun file system is created. For more information, see Create a file system.

  • A tag with the key cpfs-dataflow and value true is added to the destination OSS bucket. When a dataflow is being used, do not remove or modify the tag. Otherwise, the CPFS for Lingjun dataflow cannot access the data in the OSS bucket. For more information, see Manage bucket tags.

  • The versioning feature is enabled for an OSS bucket if you need to use multiple dataflows to export data to the OSS bucket. This prevents data conflicts. For more information, see Overview.

Important

Only file systems of CPFS for Lingjun V2.6.0 or later support streaming dataflow tasks. For more information about how to view the version of a file system, see the View the version number of a file system section of the "View the details of a file system" topic.

Create a streaming import task

This example describes how to create a streaming import task and a streaming import subtask to migrate the file.xml file in the /bmcpfs/test/ directory of an OSS bucket named examplebucket to the /oss/mnt/ directory of a CPFS for Lingjun file system named bmcpfs-370jz26fkr2st9****. You can refer to this example to continuously migrate files one by one.

  1. Create a dataflow.

    You can create a dataflow for the destination file system by calling an API operation or performing operations in the console. After the dataflow is created, obtain the dataflow ID, such as df-37bae1804cc6****.

    • Create a dataflow by calling the CreateDataFlow operation.

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
        "SourceStorage": "oss://examplebucket", // The endpoint of the source OSS bucket. 
        "FileSystemPath": "/oss/",  // The directory in the CPFS for Lingjun file system that is linked to OSS. The directory must already exist. 
        "SourceStoragePath": "/bmcpfs/", // The path of the objects in the source OSS bucket. 
      }

      Expected output:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • Create a dataflow in the console. For more information, see Manage data flows.

  2. Create a streaming dataflow import task.

    Call the CreateDataFlowTask operation to create a streaming dataflow import task, and save the value returned for the TaskId parameter.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "DataFlowId": "df-37bae1804cc6****", // The ID of the dataflow. 
      "TaskAction": "StreamImport", // The type of the streaming dataflow task. The value is StreamImport for an import task and StreamExport for an export task. 
      "DataType": "MetaAndData", // The data type. Set the value to MetaAndData. 
      "Directory": "/test/", // The relative path of the directory in which the files to be migrated reside. In this example, the prefix of the OSS bucket is used. 
      "DstDirectory": "/mnt/", // The relative path of the directory in which the migrated files reside. In this example, the directory in the CPFS for Lingjun file system is used. 
      "ConflictPolicy": "SKIP_THE_FILE" // The conflict resolution policy for files with the same name. Valid values: OVERWRITE_EXISTING: forcibly overwrites the file with the same name. SKIP_THE_FILE: skips the file with the same name. KEEP_LATEST: keeps the latest version of the file with the same name. 
    }

    Expected output:

    {
      "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518",
      "TaskId": "task-376a61ab2d80****"
    }
  3. Create a streaming import subtask.

    Call the CreateDataFlowSubTask operation to create a streaming import subtask.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "DataFlowId": "df-37bae1804cc****", // The ID of the dataflow. 
      "DataFlowTaskId": "task-376a61ab2d80****", // The ID of the streaming import task. 
      "SrcFilePath": "/file.xml", // The path of the file to be migrated in the streaming task. In this example, the path of an object in the OSS bucket is used. 
      "DstFilePath": "/mnt/file.xml" // The path of the migrated file in the streaming task. In this example, the directory in the CPFS for Lingjun file system is used. 
    }

    Expected output:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****"
    }
  4. Query the progress and status of a streaming subtask.

    Call the DescribeDataFlowSubTasks operation to query the progress and status of a streaming subtask that has been submitted. Different keys correspond to different values. For more information, see DescribeDataFlowSubTasks.

    This example describes how to query the information about subtasks by dataflow ID.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    Expected output:

    {
      "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
            "DataFlowId": "df-37bae1804cc****", // The ID of the dataflow. 
            "DataFlowTaskId": "task-37b705830bcb****", // The ID of the streaming dataflow task. 
            "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",// The ID of the streaming dataflow subtask. 
            "SrcFilePath": "/bmcpfs/test/file.xml",// The path of the file to be migrated. 
            "DstFilePath": "/oss/mnt/file.xml", // The path of the migrated file. 
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 16:28:16",
            "StartTime": "2024-10-23 16:28:17",
            "EndTime": "2024-10-23 16:29:22",
            "ErrorMsg": "",// If this parameter is not returned or its returned result is empty, no error occurs. 
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"// The file checksum. 
            }
          }
        ]
      }
    }

    The values of the Progress and Status parameters indicate the progress and status of a streaming subtask. If the value of the Status parameter is COMPLETE, the subtask is complete. If the value of the Progress parameter is 10000, all the data is imported or exported to the destination directory.

Create a streaming export task

This example describes how to create a streaming export task and a streaming export subtask to migrate the file.png file from the /oss_test/yaml/test/ directory in a CPFS for Lingjun file system named bmcpfs-370jz26fkr2st9**** to the /bmcpfs_test/dataflows/mnt/ directory in an OSS bucket named examplebucket. You can refer to this example to continuously migrate files one by one.

  1. Create a dataflow.

    You can create a dataflow for the destination file system by calling an API operation or performing operations in the console. After the dataflow is created, obtain the dataflow ID, such as df-37bae1804cc6****.

    • Create a dataflow by calling the CreateDataFlow operation.

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
        "SourceStorage": "oss://examplebucket", // The endpoint of the source OSS bucket. 
        "FileSystemPath": "/oss/",  // The directory in the CPFS for Lingjun file system that is linked to OSS. The directory must already exist. 
        "SourceStoragePath": "/bmcpfs/", // The path of the objects in the source OSS bucket. 
      }

      Expected output:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • Create a dataflow in the console. For more information, see Manage data flows.

  2. Create a streaming dataflow export task.

    Call the CreateDataFlowTask operation to create a streaming dataflow export task, and save the value returned for the TaskId parameter.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "DataFlowId": "df-37bae1804cc6****", // The ID of the dataflow. 
      "TaskAction": "StreamImport", // The type of the streaming dataflow task. The value is StreamExport for an export task. 
      "DataType": "MetaAndData", // The data type. Set the value to MetaAndData. 
      "Directory": "/yaml/", // The relative path of the directory in which the files to be migrated reside. In this example, the relative path of the directory in the CPFS for Lingjun file system is used. 
      "DstDirectory": "/dataflows/", // The relative path of the directory in which the migrated files reside. In this example, the relative path of the OSS bucket prefix is used. 
      "ConflictPolicy": "SKIP_THE_FILE" // The conflict resolution policy for files with the same name. Valid values: OVERWRITE_EXISTING: forcibly overwrites the file with the same name. SKIP_THE_FILE: skips the file with the same name. KEEP_LATEST: keeps the latest version of the file with the same name. 
    }

    Expected output:

    {
      "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B",
      "TaskId": "task-37b705830bcb****"
    }
  3. Create a streaming export subtask.

    Call the CreateDataFlowSubTask operation to create a streaming export subtask.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "DataFlowId": "df-37bae1804cc****", // The ID of the dataflow. 
      "DataFlowTaskId": "task-37b705830bcb****", //The ID of the streaming export task. 
      "SrcFilePath": "/test/file.png", // The relative path of the directory in which the file to be migrated resides in the streaming task. 
      "DstFilePath": "/mnt/file.png" // The relative path of the directory in which the migrated file resides in the streaming task. 
    }

    Expected output:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****"
    }
  4. Query the progress and status of a streaming export subtask.

    Call the DescribeDataFlowSubTasks operation to query the progress and status of a streaming subtask that has been submitted. Different keys correspond to different values. For more information, see DescribeDataFlowSubTasks.

    This example describes how to query the information about subtasks by dataflow ID.

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    Expected output:

    {
      "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
            "DataFlowId": "df-37bae1804cc****", // The ID of the dataflow. 
            "DataFlowTaskId": "task-37b705830bcb****", // The ID of the streaming dataflow task. 
            "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",// The ID of the streaming dataflow subtask. 
            "SrcFilePath": "/oss_test/yaml/test/file.png",// The path of the file to be migrated. 
            "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", // The path of the migrated file. 
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 17:18:16",
            "StartTime": "2024-10-23 17:18:17",
            "EndTime": "2024-10-23 17:19:00",
            "ErrorMsg": "",// If this parameter is not returned or its returned result is empty, no error occurs. 
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"// The file checksum. 
            }
          }
        ]
      }
    }

    The values of the Progress and Status parameters indicate the progress and status of a streaming subtask. If the value of the Status parameter is COMPLETE, the subtask is complete. If the value of the Progress parameter is 10000, all the data is imported or exported to the destination directory.

What to do next

To cancel a streaming subtask, call the CancelDataFlowSubTask operation. You can cancel only streaming subtasks that are in the CREATED or RUNNING state.

{
  "FileSystemId": "bmcpfs-370jz26fkr2st9****", // The ID of the CPFS for Lingjun file system. 
  "DataFlowId": "df-37bae1804cc****", // The ID of the dataflow. 
  "DataFlowTaskId": "task-37b705830bcb****", // The ID of the streaming import or export task. 
  "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // The ID of the streaming dataflow subtask. 
}