DataHub SDK for Java
1. Maven dependencies and JDK
Maven POM:
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.19.0-public</version>
</dependency>
JDK version: 1.8 or later.
2. Usage notes
If you upgrade your SDK from V2.9, note that the setTimestampInms method has been replaced. The timestamp value in the new version is the value in V2.9 multiplied by 1,000.
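As a hedged illustration of that unit change, the following helper is not an SDK method, just a sketch of the conversion described above:

```java
// Sketch only: converts a V2.9-style timestamp to the value expected by newer
// SDK versions, which is the V2.9 value multiplied by 1,000.
static long toNewTimestamp(long v29Timestamp) {
    return v29Timestamp * 1000L;
}
```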
In general, the putRecords, putRecordsByShard, and getRecords methods are the most frequently called methods for reading and writing data. Other methods, such as getTopic, getCursor, and listShard, are called only during initialization. You can initialize one or more DataHub clients in a project, and multiple DataHub clients can be used concurrently.
Different packages may contain classes of the same names but in different directories. DataHub SDK for Java V2.12 uses the classes in the com.aliyun.datahub.client package, whereas the classes of the same names in other packages are provided for versions earlier than V2.12. Examples:
The com.aliyun.datahub.client.model.RecordSchema package is used for DataHub SDK for Java V2.12.
The com.aliyun.datahub.common.data.RecordSchema package is used in versions earlier than V2.12. If you upgrade to V2.12 or later without modifying your code, the classes in this package can still be used.
If the "Parse body failed, Offset: 0" error occurs, you can set the enableBinary parameter to false.
3. Use DataHub SDK for Java
Initialization
You can use an Alibaba Cloud account to access DataHub. To access DataHub, you must provide your AccessKey ID and AccessKey secret, and the endpoint that is used to access DataHub. The following sample code provides an example on how to create a DataHub client by using a DataHub endpoint:
// In this example, an endpoint of the China (Hangzhou) region is used. You can also use an endpoint of another region as needed.
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "<YourAccessKeyId>";
String accessKey = "<YourAccessKeySecret>";
// Create a DataHub client.
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                new DatahubConfig(endpoint,
                        new AliyunAccount(accessId, accessKey),
                        // Specify whether to enable binary data transmission. In DataHub SDK for Java V2.12 and later, the server supports binary data transmission.
                        // If an error occurs in Apsara Stack DataHub, set this parameter to false.
                        true))
        // The HttpConfig parameter is optional. If you do not set it, the default values are used.
        .setHttpConfig(new HttpConfig()
                // We recommend that you use the LZ4 compression algorithm when you read data from or write data to DataHub.
                .setCompressType(HttpConfig.CompressType.LZ4)
                .setConnTimeout(10000))
        .build();
Configuration description: DatahubConfig

| Parameter | Description |
| --- | --- |
| endpoint | The endpoint that is used to access DataHub. |
| account | The information about the Alibaba Cloud account. |
| enableBinary | Specifies whether to use binary data transmission. In DataHub SDK for Java V2.12 and later, the server supports binary data transmission. If the SDK version is earlier than V2.12, set this parameter to false. If the "Parse body failed, Offset: 0" error occurs in Apsara Stack DataHub, set this parameter to false. |
HttpConfig

| Parameter | Description |
| --- | --- |
| readTimeout | The timeout period of a socket read/write. Unit: seconds. Default value: 10. |
| connTimeout | The timeout period of a TCP connection. Unit: seconds. Default value: 10. |
| maxRetryCount | The maximum number of retries after a request failure. Default value: 1. We recommend that you do not change this value; retries should be performed by the business layer. |
| debugRequest | Specifies whether to log requests. Default value: false. |
| compressType | The compression mode for data transmission. No compression is used by default. LZ4 and deflate are supported. |
| proxyUri | The uniform resource identifier (URI) of the proxy host. |
| proxyUsername | The username for proxy authentication. |
| proxyPassword | The password for proxy authentication. |
SDK statistics
You can use DataHub SDK for Java to collect statistics on data read/write requests, such as queries per second. Call the following method to start collecting statistics:
ClientMetrics.startMetrics();
By default, the metrics are written to log files, so you must configure Simple Logging Facade for Java (SLF4J). The metrics classes are in the com.aliyun.datahub.client.metrics package.
Write data to DataHub
In the following example, data is written to a tuple topic in DataHub.
// Write tuple records.
public static void tupleExample(String project, String topic, int retryTimes) {
// Obtain the schema.
RecordSchema recordSchema = datahubClient.getTopic(project, topic).getRecordSchema();
// Generate 10 records.
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// You can specify additional attributes, such as the IP address and machine name of the server, for each record. If you do not specify additional attributes, data writing is not affected.
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
}
}
// The retry mechanism: resend the failed records until they succeed or the retry quota is used up.
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
    boolean success = false;
    while (retryTimes > 0) {
        retryTimes = retryTimes - 1;
        PutRecordsResult recordsResult = client.putRecords(project, topic, records);
        if (recordsResult.getFailedRecordCount() == 0) {
            success = true;
            break;
        }
        // Retry only the records that failed in the previous attempt.
        records = recordsResult.getFailedRecords();
    }
    if (!success) {
        System.out.println("retryFailure");
    }
}
Create a subscription to consume DataHub data
// The following sample code provides an example on how to consume data from a saved offset and submit offsets during consumption.
public static void example() {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. Obtain the cursor of the record at the current offset. If the record expires or is not consumed, obtain the cursor of the first record within the time to live (TTL) of the topic.
String cursor = null;
// If the sequence number is smaller than 0, the record is not consumed.
if (subscriptionOffset.getSequence() < 0) {
// Obtain the cursor of the first record within the TTL of the topic.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// Obtain the cursor of the next record.
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// If the SeekOutOfRange error is returned after you obtain the cursor based on the sequence number, the record of the current cursor expires.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// Obtain the cursor of the first record within the TTL of the topic.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. Read records and save offsets. For example, read tuple records and save an offset each time 1,000 records are read.
long recordCount = 0L;
// Read 10 records each time.
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// If no record can be read, pause the thread for 1,000 ms and continue to read records.
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// Consume data.
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// Save the offset after the data is consumed.
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// Submit the offset.
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// The subscription session is exited. Offline: The subscription is offline. SubscriptionSessionInvalid: The subscription is also used on other clients.
break;
} catch (SubscriptionOffsetResetException e) {
// The offset has been reset. Obtain the offset information of the subscription again. In this example, the offset is reset by sequence number.
// If the offset is reset by timestamp, use the CursorType.SYSTEM_TIME parameter to obtain the cursor.
subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: Specify whether to exit when an error occurs.
} catch (Exception e) {
break;
}
}
}
4. Error types
This section describes the types of errors related to DataHub SDK for Java V2.12 and later. You can use a try-catch mechanism to catch errors by type and process them. Only DatahubClientException and LimitExceededException errors can be resolved by retries. Some DatahubClientException errors, such as those caused by a busy or unavailable server, can be resolved by retries. We recommend that you add retry logic for DatahubClientException and LimitExceededException errors, but limit the number of retries. The following table describes the error types. The exception classes are in the com.aliyun.datahub.client.exception package.
| Error type | Error message | Description |
| --- | --- | --- |
| InvalidParameterException | InvalidParameter, InvalidCursor | A specified parameter is invalid. |
| ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | The resource to be accessed does not exist. This error is also returned if you send a request immediately after you split or merge shards. |
| ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | The resource that you want to create already exists. |
| SeekOutOfRangeException | SeekOutOfRange | When you obtain a cursor, the specified sequence number is invalid or the specified timestamp is later than the current timestamp. The sequence number may become invalid because the record it points to has expired. |
| AuthorizationFailureException | Unauthorized | An error occurred while the authorization signature was being parsed. Check whether the AccessKey pair is valid. |
| NoPermissionException | NoPermission, OperationDenied | You do not have the required permissions. Check whether the RAM configurations are valid and the RAM user is authorized. |
| ShardSealedException | InvalidShardOperation | The shard is closed. Data cannot be written to it, and this error is returned if you continue to read after the last record in the shard has been read. |
| LimitExceededException | LimitExceeded | A limit of DataHub has been exceeded. For more information, see Limits. |
| SubscriptionOfflineException | SubscriptionOffline | The subscription is offline and cannot be used. |
| SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | The subscription session is abnormal. When a subscription is used, a session is established to submit offsets. This error is returned if the subscription is also used on another client. |
| SubscriptionOffsetResetException | OffsetReseted | The offset of the subscription has been reset. |
| MalformedRecordException | MalformedRecord, ShardNotReady | The record format is invalid. Possible causes: the schema is invalid, non-UTF-8 characters exist, or the client uses the Protocol Buffers (PB) protocol but the server does not support it. |
| DatahubClientException | All other errors; this type is the base class of all DataHub exceptions. | An error that does not fall into the preceding types. Such errors can be resolved by retries, but the number of retries must be limited. |
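The retry guidance above can be sketched as a small bounded-retry helper. This is an illustrative, SDK-independent sketch: RuntimeException stands in for DatahubClientException and LimitExceededException, and the method name is hypothetical.

```java
import java.util.function.Supplier;

// Bounded-retry sketch: retry a call a limited number of times, then rethrow.
// RuntimeException stands in for the retryable DataHub exceptions.
static <T> T callWithRetries(Supplier<T> call, int maxRetries) {
    RuntimeException last = null;
    for (int attempt = 0; attempt <= maxRetries; ++attempt) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            last = e; // treated as retryable; the attempt count is capped
        }
    }
    throw last; // retry quota exhausted: surface the last error
}
```

In production code you would typically also add a backoff delay between attempts.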
5. Methods
Manage projects
A project is a basic unit for managing data in DataHub. A project contains multiple topics. The projects in DataHub are independent of those in MaxCompute. You cannot reuse MaxCompute projects in DataHub. You must create projects in DataHub.
Create a project
Syntax: CreateProjectResult createProject(String projectName, String comment)
When you create a project, you must set the project name and enter the project description. The project name must be 3 to 32 characters in length, and can contain letters, digits, and underscores (_). The project name must start with a letter and is not case-sensitive.
Parameters
projectName: the name of the project.
comment: the comments on the project.
Errors
DatahubClientException
Sample code
public static void createProject(String projectName,String projectComment) {
try {
datahubClient.createProject(projectName, projectComment);
System.out.println("create project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
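The naming rule described above (3 to 32 characters; letters, digits, and underscores; starting with a letter) can be checked on the client before you call createProject. The following pattern and helper are illustrative and not part of the SDK; the server still performs the authoritative validation:

```java
import java.util.regex.Pattern;

// Illustrative client-side check for the project naming rule:
// 3 to 32 characters, letters/digits/underscores, starting with a letter.
static final Pattern PROJECT_NAME = Pattern.compile("^[A-Za-z][A-Za-z0-9_]{2,31}$");

static boolean isValidProjectName(String name) {
    return name != null && PROJECT_NAME.matcher(name).matches();
}
```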
Delete a project
Syntax: DeleteProjectResult deleteProject(String projectName)
Make sure that the project contains no topics before you delete it.
Parameters
projectName: the name of the project.
Errors
DatahubClientException
NoPermissionException: If the project contains a topic, this error is returned.
Sample code
public static void deleteProject(String projectName) {
try {
datahubClient.deleteProject(projectName);
System.out.println("delete project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update a project
Syntax: UpdateProjectResult updateProject(String projectName, String comment)
Only the comments on a project can be updated.
Parameters
projectName: the name of the project.
comment: the new comments on the project.
Errors
DatahubClientException
Sample code
public static void updateProject(String projectName,String newComment) {
try {
datahubClient.updateProject(projectName, newComment);
System.out.println("update project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
List projects
Syntax: ListProjectResult listProject()
The listProject method returns a ListProjectResult object, which contains a list of project names.
Parameters: none
Errors
DatahubClientException
Sample code
public static void listProject() {
try {
ListProjectResult listProjectResult = datahubClient.listProject();
if (listProjectResult.getProjectNames().size() > 0) {
for (String pName : listProjectResult.getProjectNames()) {
System.out.println(pName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Query a project
Syntax: GetProjectResult getProject(String projectName)
The getProject method returns the attribute information of the specified project.
Parameters
projectName: the name of the project.
Errors
DatahubClientException
Sample code
public static void getProject(String projectName) {
try {
GetProjectResult getProjectResult = datahubClient.getProject(projectName);
System.out.println(getProjectResult.getCreateTime() + "\t"
+ getProjectResult.getLastModifyTime() + "\t"
+ getProjectResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Manage topics
A topic is the smallest unit for data subscription and publishing in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: tuple and blob.
You can write a block of binary data as a record to blob topics.
Tuple topics contain records that are similar to records in database tables, and each record contains multiple columns. You must specify a record schema for a tuple topic: because data in tuple topics is transmitted as strings over the network, a schema is required for data type conversion. The following table describes the supported data types.
| Type | Description | Value range |
| --- | --- | --- |
| BIGINT | An eight-byte signed integer. | -9223372036854775807 to 9223372036854775807 |
| DOUBLE | An eight-byte double-precision floating-point number. | -1.0 × 10^308 to 1.0 × 10^308 |
| BOOLEAN | The Boolean type. | True and False, true and false, or 0 and 1 |
| TIMESTAMP | The timestamp type. | A timestamp that is accurate to microseconds. |
| STRING | A string. Only UTF-8 encoding is supported. | The total size of all values in a STRING column cannot exceed 2 MB. |
| TINYINT | A single-byte integer. | -128 to 127 |
| SMALLINT | A two-byte integer. | -32768 to 32767 |
| INTEGER | A four-byte integer. | -2147483648 to 2147483647 |
| FLOAT | A four-byte single-precision floating-point number. | -3.40292347 × 10^38 to 3.40292347 × 10^38 |
DataHub SDK for Java V2.16.1-public and later support TINYINT, SMALLINT, INTEGER, and FLOAT.
Create a tuple topic
Syntax: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment)
Parameters
projectName: the name of the project in which you want to create the topic.
topicName: the name of the topic.
shardCount: the number of initial shards in the topic.
lifeCycle: the TTL of the data. Unit: days. Data older than the TTL is no longer accessible.
recordType: the type of record that you want to write. Valid values: TUPLE and BLOB.
recordSchema: the record schema for the topic.
comment: the comments on the topic.
Errors
DatahubClientException
Sample code
public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
RecordSchema schema = new RecordSchema();
schema.addField(new Field("bigint_field", FieldType.BIGINT));
schema.addField(new Field("double_field", FieldType.DOUBLE));
schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
schema.addField(new Field("tinyint_field", FieldType.TINYINT));
schema.addField(new Field("smallint_field", FieldType.SMALLINT));
schema.addField(new Field("integer_field", FieldType.INTEGER));
schema.addField(new Field("float_field", FieldType.FLOAT));
schema.addField(new Field("decimal_field", FieldType.DECIMAL));
schema.addField(new Field("string_field", FieldType.STRING));
try {
datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
System.out.println("create topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Create a blob topic
Syntax: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment)
Parameters
projectName: the name of the project in which you want to create the topic.
topicName: the name of the topic.
shardCount: the number of initial shards in the topic.
lifeCycle: the TTL of the data. Unit: days. Data older than the TTL is no longer accessible.
recordType: the type of record that you want to write. Valid values: TUPLE and BLOB.
comment: the comments on the topic.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ResourceAlreadyExistException
Sample code
public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
    try {
        datahubClient.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
        System.out.println("create topic successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
Delete a topic
Make sure that the topic contains no subscription or DataConnector before you delete the topic. Otherwise, the NoPermission error is reported.
Syntax: DeleteTopicResult deleteTopic(String projectName, String topicName)
Parameters
projectName: the name of the project in which you want to delete the topic.
topicName: the name of the topic.
Errors
DatahubClientException
NoPermissionException: If the topic contains a subscription or DataConnector, this error is returned.
Sample code
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("delete topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
List topics
Syntax: ListTopicResult listTopic(String projectName)
Parameters
projectName: the name of the project whose topics you want to list.
Sample code
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update a topic
You can update the comments on and TTL of a topic.
Syntax: UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment)
Parameters
projectName: the name of the project in which you want to update the topic.
topicName: the name of the topic.
comment: the comments to be updated.
lifeCycle: the TTL of the topic.
Errors
DatahubClientException
Sample code
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
    try {
        datahubClient.updateTopic(projectName, topicName, lifeCycle, comment);
        System.out.println("update topic successful");
        // View the updated result.
        GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
        System.out.println(getTopicResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
Query a topic
Syntax: GetTopicResult getTopic(String projectName, String topicName)
The getTopic method returns the attribute information of the specified topic.
Parameters
projectName: the name of the project in which you want to query the topic.
topicName: the name of the topic.
Errors
DatahubClientException
Sample code
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Add fields to a tuple topic
You can add a single field or add multiple fields at a time.
Syntax: AppendFieldResult appendField(String projectName, String topicName, Field field)
Parameters
projectName: the name of the project where the topic to which you want to add fields resides.
topicName: the name of the topic.
field: the field to be added. An appended field must allow null values.
Errors
DatahubClientException
Sample code
public static void appendNewField(String projectName,String topicName) {
try {
Field newField = new Field("newField", FieldType.STRING, true,"comment");
datahubClient.appendField(projectName, topicName, newField);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Syntax: AppendFieldResult appendField(String projectName, String topicName, List<Field> fields)
Parameters
projectName: the name of the project where the topic to which you want to add fields resides.
topicName: the name of the topic.
fields: the fields to be added. All appended fields must allow null values.
Errors
DatahubClientException
Sample code
public static void appendNewField(String projectName,String topicName) {
try {
List<Field> list = new ArrayList<>();
Field newField1 = new Field("newField1", FieldType.STRING, true,"comment");
list.add(newField1);
datahubClient.appendField(projectName, topicName, list);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Manage shards
Shards are concurrent channels for data transmission in a topic. Each shard has an ID and can be in one of several states. Opening: the shard is being started. Active: the shard is started and can provide services. Each active shard consumes server resources, so we recommend that you create shards only as needed.
List shards
Syntax: ListShardResult listShard(String projectName, String topicName)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
Errors
DatahubClientException
Sample code
public static void listShard(String projectName, String topicName) {
try {
ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
if (listShardResult.getShards().size() > 0) {
for (ShardEntry entry : listShardResult.getShards()) {
System.out.println(entry.getShardId() + "\t"
+ entry.getState() + "\t"
+ entry.getLeftShardId() + "\t"
+ entry.getRightShardId());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Split a shard
You can split an active shard of a specified topic. After a shard is split, two active new shards are generated, whereas the original shard is closed. You can only read data from, but not write data to, a closed shard. You can use the default split key or specify a split key to split a shard.
Syntax: SplitShardResult splitShard(String projectName, String topicName, String shardId), or SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard to be split.
splitKey: the split key that is used to split the shard.
Errors
DatahubClientException
Sample code
public static void splitShard(String projectName, String topicName, String shardId) {
try {
SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards()) {
System.out.println(entry.getShardId());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Merge shards
The two active shards to be merged in a topic must be adjacent to each other. For more information about the two adjacent shards of a shard, see the result that is returned by the listShard method.
Syntax: MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard to be merged.
adjacentShardId: the ID of the shard that is adjacent to the specified shard.
Errors
DatahubClientException
Sample code
public static void mergeShard() {
try {
String shardId = "7";
// The values of the adjacentShardId and shardId parameters must be adjacent. For more information about the adjacent shards of a shard, see the result that is returned by the listShard method.
String adjacentShardId = "8";
MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
System.out.println("merge successful");
System.out.println(mergeShardResult.getShardId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Extend shards
The target number of shards must be greater than or equal to the current number of shards in the topic.
Syntax: ExtendShardResult extendShard(String projectName, String topicName, int shardCount)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardCount: the target number of shards after the extension.
Errors
DatahubClientException
Sample code
public static void extendTopic(String projectName, String topicName, int shardCount) {
    try {
        ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
Read and write data
You can read data from active and closed shards. However, you can write data only to active shards.
Read data
To read data, you must first obtain a cursor and then pass the cursor value to the getRecords method. Alternatively, you can use the subscription feature of DataHub to consume data through a subscription, in which case the server automatically saves consumption offsets. Reading data directly is useful, for example, when you want to sample records to check data quality.
Obtain a cursor
To read data from a topic, specify a shard and the cursor from which data starts to be read. You can obtain the cursor by using the following methods: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.
OLDEST: the cursor that points to the earliest valid record in the specified shard.
LATEST: the cursor that points to the latest record in the specified shard.
SEQUENCE: the cursor that points to the record of the specified sequence number.
SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value.
Select a method to obtain the cursor
The data to be read must be valid, which means that the data must be within the TTL. Otherwise, an error is reported.
Scenario 1: Read data from the beginning of a shard. In this case, we recommend that you use the OLDEST method. If all the data in the shard is valid, data starts to be read from the first record.
Scenario 2: Sample data to check whether the data whose timestamp value is greater than the specified timestamp value is valid. In this case, we recommend that you use the SYSTEM_TIME method. Data starts to be read from the first record that is subsequent to the record at the specified timestamp.
Scenario 3: View the latest data. In this case, we recommend that you use the LATEST method. You can use this method to read the latest record or the latest N records. To read the latest N records, first obtain the sequence number of the latest record by using the LATEST method. The start sequence number is the sequence number of the latest record minus (N - 1). Then, use the SEQUENCE method with that start sequence number to obtain the cursor.
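The sequence arithmetic for Scenario 3 can be sketched as a small plain-Java helper (the class and method names here are hypothetical; the actual cursor calls are shown in the sample code that follows):

```java
public class LatestN {
    // Start sequence for reading the latest n records, given the sequence
    // number of the latest record. Reading n records starting from this
    // sequence number ends at the latest record.
    static long startSequence(long latestSeq, int n) {
        return latestSeq - (n - 1);
    }

    public static void main(String[] args) {
        // If the latest record has sequence number 100, the latest 10
        // records are records 91 through 100.
        System.out.println(startSequence(100L, 10)); // 91
    }
}
```

The start sequence obtained this way is the value to pass as the last argument of getCursor with CursorType.SEQUENCE.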
Syntax: GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type), or GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard.
CursorType: the type of the cursor.
param: the sequence number (SEQUENCE) or the timestamp in milliseconds (SYSTEM_TIME). This parameter is required only for these two cursor types.
Errors
DatahubClientException
SeekOutOfRangeException
Sample code
If you want to sample data, convert the time to a timestamp. Then, obtain the cursor.
public static void getcursor(String projectName, String topicName) {
String shardId = "5";
// Convert the time to a timestamp.
String time = "2019-07-01 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime(); // Obtain the timestamp that corresponds to the time.
// Obtain the cursor from which data starts to be read after the timestamp.
String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (ParseException e) {
System.out.println(e.getErrorOffset());
}
}
Read data from the earliest record in the specified shard.
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* Use the OLDEST method. */
String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Read the latest data that is written to the specified shard, which involves the following two scenarios:
Read the latest record that is written to the specified shard.
Read the latest N records that are written to the specified shard.
You must first obtain the sequence number of the latest record and then obtain the cursor.
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* Use the LATEST method. */
String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
/* Use the SEQUENCE method. */
// Obtain the sequence number of the latest record.
long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
// Obtain the cursor for reading the latest 10 records.
String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
}
catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Data reading method
Syntax: GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit), or GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard.
schema: the schema that is required when you read records from a tuple topic.
cursor: the cursor from which data starts to be read.
limit: the maximum number of records to be read.
Errors
DatahubClientException
Sample code
Read records from a tuple topic
public static void example(String projectName,String topicName) {
// The maximum number of records to be read each time.
int recordLimit = 1000;
String shardId = "7";
// Obtain the schema of the tuple topic.
RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
// Obtain the cursor of the earliest valid record.
// Note: In general, you call the getCursor method only during initialization. After that, you can call the getNextCursor method to continue to consume data.
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// If no record can be read, pause the thread for 10,000 ms and continue to read records.
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// Obtain the next cursor.
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// The cursor is invalid or has expired. Specify another cursor to start consumption.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (InterruptedException e) {
// Thread.sleep was interrupted. Restore the interrupt flag and stop reading.
Thread.currentThread().interrupt();
break;
}
}
}
Read records from a blob topic
public static void example(String projectName,String topicName) {
// The maximum number of records to be read each time.
int recordLimit = 1000;
String shardId = "7";
// Obtain the cursor of the earliest valid record.
// Note: In general, you call the getCursor method only during initialization. After that, you can call the getNextCursor method to continue to consume data.
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// If no record can be read, pause the thread for 10,000 ms and continue to read records.
Thread.sleep(10000);
continue;
}
/* Consume data. */
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// Obtain the next cursor.
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// The cursor is invalid or has expired. Specify another cursor to start consumption.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (InterruptedException e) {
// Thread.sleep was interrupted. Restore the interrupt flag and stop reading.
Thread.currentThread().interrupt();
break;
}
}
}
Write data
In DataHub SDK for Java V2.12 and later, the server supports the putRecordsByShard method. In versions earlier than V2.12, only the putRecords method is supported. When you call the putRecordsByShard method, you must specify the shard to which you want to write data. When you call the putRecords method, records that do not specify a shard are written to the first active shard by default. The input parameter of both methods is a list of records of the same type, such as the tuple or blob type. If you call the putRecords method, you must check the return result to determine whether all records were written. If you call the putRecordsByShard method and the write fails, an error is reported directly. If your server supports the putRecordsByShard method, we recommend that you use it.
Syntax: PutRecordsResult putRecords(String projectName, String topicName, List records), or PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard.
records: the list of records to be written to DataHub.
Errors
DatahubClientException
Write records to a tuple topic
// Write tuple records.
public static void tupleExample(String project,String topic,int retryTimes) {
// Obtain the schema.
RecordSchema recordSchema = datahubClient.getTopic(project, topic).getRecordSchema();
// Generate 10 records.
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// You can specify additional attributes, such as the IP address and machine name of the server, for each record. If you do not specify additional attributes, data writing is not affected.
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
}
}
// The retry mechanism.
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean success = false;
List<RecordEntry> failedRecords = records;
while (retryTimes > 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, failedRecords);
if (recordsResult.getFailedRecordCount() == 0) {
success = true;
break;
}
// Retry only the records that failed to be written.
failedRecords = recordsResult.getFailedRecords();
}
if (!success) {
System.out.println("retryFailure");
}
}
Write records to a blob topic
// Write blob records.
public static void blobExample() {
// Generate 10 records.
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Specify additional attributes for each record.
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
}
while (true) {
try {
// In DataHub SDK for Java V2.12 and later, the server supports the putRecordsByShard method. If your SDK version is earlier than V2.12, use the putRecords method.
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
break;
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}
Write records in various modes
If your SDK version is earlier than V2.12, you can write records only by calling the putRecords method. The RecordEntry class contains three attributes that determine the shard to which a record is written: shardId, partitionKey, and hashKey. You can set one of these attributes to select the target shard.
In DataHub SDK for Java V2.12 and later, we recommend that you call the putRecordsByShard method to write records. This prevents the performance loss that is caused by repartitioning on the server.
Write records by shard ID. This mode is recommended. Sample code:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");
Write records by hash key. In this mode, specify a 128-bit message-digest algorithm 5 (MD5) value. If you write records by hash key, the values of the BeginHashKey and EndHashKey parameters are used to determine the shard to which records are written. Sample code:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
Write records by partition key. In this mode, specify a string as the partition key. Then, the shard to which records are written is determined based on the MD5 value of the string and the values of the BeginHashKey and EndHashKey parameters. Sample code:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
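As a rough illustration of the partition-key mode, the partition key string is hashed with MD5, and the resulting 128-bit value is mapped into a shard's hash key range. The hashing step can be sketched in plain Java. This sketch only mirrors the idea; the actual mapping from hash value to shard is performed on the DataHub server, and the class name here is hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashKeyDemo {
    // Compute the 128-bit MD5 of a partition key as a 32-character
    // uppercase hex string, the same shape as a DataHub hash key.
    static String md5HashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(32);
            for (byte b : digest) {
                sb.append(String.format("%02X", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is available in every standard JVM.
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        String key = md5HashKey("TestPartitionKey");
        System.out.println(key);          // 32 hex characters
        System.out.println(key.length()); // 32
    }
}
```

A shard whose [BeginHashKey, EndHashKey) range contains this value receives the record.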
Query metering information
Query metering information
Syntax: GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
shardId: the ID of the shard.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void getMeter(String projectName,String topicName) {
String shardId = "5";
try {
GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
System.out.println("get meter successful");
System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Manage subscriptions
DataHub allows the server to save the consumption offsets of a subscription. You can obtain highly available offset storage services by performing simple configurations.
Create a subscription
Syntax: CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment)
The comment on a subscription is in the following format: {"application":"Application","description":"Description"}.
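The comment string in the format above can be assembled as follows. This is a minimal sketch with a hypothetical helper; a real application should use a JSON library so that special characters in the inputs are escaped correctly:

```java
public class SubscriptionComment {
    // Build the comment JSON in the format DataHub expects:
    // {"application":"...","description":"..."}
    // Assumes plain inputs with no characters that need JSON escaping.
    static String comment(String application, String description) {
        return String.format("{\"application\":\"%s\",\"description\":\"%s\"}",
                application, description);
    }

    public static void main(String[] args) {
        System.out.println(comment("TestApp", "TestComment"));
        // {"application":"TestApp","description":"TestComment"}
    }
}
```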
Parameters
projectName: the name of the project.
topicName: the name of the topic.
comment: the comments on the subscription.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void createSubscription(String projectName,String topicName) {
try {
CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscriptionComment);
System.out.println("create subscription successful");
System.out.println(createSubscriptionResult.getSubId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Delete a subscription
Syntax: DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void deleteSubscription(String projectName,String topicName,String subId) {
try {
datahubClient.deleteSubscription(projectName, topicName, subId);
System.out.println("delete subscription successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update a subscription
You can update only the comments on an existing subscription.
Syntax: UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
comment: the comments to be updated.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateSubscription(String projectName, String topicName, String subId, String comment){
try {
datahubClient.updateSubscription(projectName, topicName, subId, comment);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
List subscriptions
The pageNum and pageSize parameters of the listSubscription method specify the range of subscriptions to be listed. For example, you can set the pageNum and pageSize parameters to 1 and 10 to list the first 10 subscriptions. For another example, you can set the pageNum and pageSize parameters to 2 and 5 to list the sixth to tenth subscriptions.
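The pagination above follows a simple rule: page pageNum covers the 1-based subscription indexes (pageNum - 1) × pageSize + 1 through pageNum × pageSize. A plain-Java sketch (hypothetical helper names):

```java
public class Paging {
    // First 1-based subscription index on the given page.
    static long firstIndex(int pageNum, int pageSize) {
        return (long) (pageNum - 1) * pageSize + 1;
    }

    // Last 1-based subscription index on the given page.
    static long lastIndex(int pageNum, int pageSize) {
        return (long) pageNum * pageSize;
    }

    public static void main(String[] args) {
        // pageNum=1, pageSize=10 -> subscriptions 1..10
        System.out.println(firstIndex(1, 10) + ".." + lastIndex(1, 10));
        // pageNum=2, pageSize=5 -> subscriptions 6..10
        System.out.println(firstIndex(2, 5) + ".." + lastIndex(2, 5));
    }
}
```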
Syntax: ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
pageNum: the number of the page to return.
pageSize: the number of entries to return on each page.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
try {
ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
if (listSubscriptionResult.getSubscriptions().size() > 0) {
System.out.println(listSubscriptionResult.getTotalCount());
System.out.println(listSubscriptionResult.getSubscriptions().size());
for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
System.out.println(entry.getSubId() + "\t"
+ entry.getState() + "\t"
+ entry.getType() + "\t"
+ entry.getComment());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Query a subscription
Syntax: GetSubscriptionResult getSubscription(String projectName, String topicName, String subId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void getSubscription(String projectName, String topicName, String subId) {
try {
GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
System.out.println(getSubscriptionResult.getSubId() + "\t"
+ getSubscriptionResult.getState() + "\t"
+ getSubscriptionResult.getType() + "\t"
+ getSubscriptionResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update the status of a subscription
A subscription can be in the ONLINE or OFFLINE state.
Syntax: UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
state: the state to be updated.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateSubscriptionState(String projectName, String topicName,String subId) {
try {
datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
System.out.println("update subscription state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Manage offsets
After a subscription is created, it is initially unconsumed. To use the offset storage feature of the subscription, perform the following operations on offsets:
Initialize an offset
To initialize an offset, call the openSubscriptionSession method only once. Each subsequent call generates a new consumption session ID, which invalidates the previous session so that offsets can no longer be submitted from it.
Syntax: OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
shardIds: the IDs of shards.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void openSubscriptionSession(String projectName, String topicName, String subId) {
String shardId = "4";
List<String> shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getSessionId() + "\t"
+ subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Obtain an offset
Syntax: GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds)
The return result of the getSubscriptionOffset method is a GetSubscriptionOffsetResult object, which is basically the same as the return result of the openSubscriptionSession method. However, the GetSubscriptionOffsetResult object does not contain the session ID of the offset. You can call the getSubscriptionOffset method only to view the information about the offset.
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
shardIds: the IDs of shards.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
// Obtain an offset.
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
String shardId = "4";
List<String> shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Submit an offset
Syntax: CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets)
When you submit an offset, DataHub verifies the values of the versionId and sessionId parameters. Make sure that they are the same as those in the current session. The offset content itself is not validated against actual records. We recommend that you submit the actual sequence number and timestamp of the consumed record.
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
offsets: the offset map of shards.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SubscriptionOffsetResetException
SubscriptionSessionInvalidException
SubscriptionOfflineException
Sample code
// Submit an offset.
public static void commitSubscriptionOffset(String projectName, String topicName, String subId) {
String shardId = "4";
List<String> shardIds = Arrays.asList("0", "4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// The sample code is used only for testing. For the complete code, see the sample code that provides an example on how to consume data from a saved consumption offset and submit offsets during consumption.
subscriptionOffset.setSequence(10);
subscriptionOffset.setTimestamp(100);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
offsets.put(shardId, subscriptionOffset);
// Submit the offset.
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("commit offset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Reset an offset
Syntax: ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets)
You can reset an offset to a specific point in time. If multiple records are involved at this point in time, the reset offset points to the first record that is involved at this point in time. After an offset is reset, the offset information changes, and the version ID is updated. If a task that is running submits offsets by using the previous version ID, the SubscriptionOffsetResetException error is reported. You can call the getSubscriptionOffset method to obtain a new version ID.
Parameters
projectName: the name of the project.
topicName: the name of the topic.
subId: the ID of the subscription.
offsets: the offset map of shards.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
// Reset an offset.
public static void resetSubscriptionOffset(String projectName, String topicName, String subId) throws ParseException {
List<String> shardIds = Arrays.asList("0");
// Specify the time to which you want to reset the offset and convert the time to a timestamp.
String time = "2019-07-09 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime(); // Obtain the timestamp that corresponds to the time.
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
for (String shardId : shardIds) {
// Obtain the sequence number of the first record at or after the specified timestamp.
long sequence = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getSequence();
SubscriptionOffset offset = new SubscriptionOffset();
offset.setTimestamp(timestamp);
offset.setSequence(sequence);
offsets.put(shardId, offset);
}
try {
datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("reset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Associate a subscription to consume data in DataHub
Similar to reading data from DataHub, you can associate a subscription to consume data in DataHub. A subscription saves consumption offsets. You can select consumption offsets as needed.
Usage notes:
Call the openSubscriptionSession method to initialize an offset and obtain the version ID and session ID of the subscription. Call this method only once. If you call it again, the previous session becomes invalid, and you can no longer submit offsets from it.
Call the getCursor method to obtain the cursor of the first record to consume. After you consume a batch of records, call the getNextCursor method to obtain the cursor of the next record and continue to consume data.
Call the commitSubscriptionOffset method to submit an offset. When you submit an offset, the version ID and session ID of the subscription are verified. Make sure that they are the same as those in the current session.
// The following sample code provides an example on how to consume data from a saved offset and submit offsets during consumption.
public static void example(String projectName, String topicName,String subId) {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. Obtain the cursor of the record at the current offset. If the record expires or is not consumed, obtain the cursor of the first record within the TTL of the topic.
String cursor = null;
// A sequence number smaller than 0 indicates that no record has been consumed yet.
if (subscriptionOffset.getSequence() < 0) {
// Obtain the cursor of the first record within the TTL of the topic.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// Obtain the cursor of the next record.
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// If the SeekOutOfRange error is returned after you obtain the cursor based on the sequence number, the record of the current cursor expires.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// Obtain the cursor of the first record within the TTL of the topic.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. Read records and save offsets. For example, read tuple records and save an offset each time 1,000 records are read.
// Obtain the schema of the tuple topic.
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
long recordCount = 0L;
// Read 10 records each time.
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// If no record can be read, pause the thread for 1,000 ms and continue to read records.
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// Consume data.
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// Save the offset after the data is consumed.
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// Submit the offset.
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
// Exit the consumption session. Offline: The subscription is offline. SessionChange: The subscription is also being consumed on another client.
break;
} catch (OffsetResetedException e) {
// The offset is reset. You must obtain the offset information of the subscription again. In this example, the offset is reset based on the sequence number.
// If the offset is reset based on the timestamp, you must use the CursorType.SYSTEM_TIME parameter to obtain the cursor.
subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: Specify whether to exit when an error occurs.
} catch (Exception e) {
break;
}
}
}
Manage DataConnectors
A DataConnector in DataHub synchronizes streaming data from DataHub to other cloud services. You can use DataConnectors to synchronize data from DataHub topics to MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, and Function Compute in real-time or near real-time mode. After DataConnectors are configured, the data you write to DataHub can be used in other Alibaba Cloud services.
Create a DataConnector
Syntax: CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config), or CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of DataConnector that you want to create.
columnFields: the fields that you want to synchronize.
sinkStartTime: the timestamp from which data starts to be synchronized. Unit: milliseconds.
config: the configuration details of the specific type of DataConnector.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to MaxCompute:
public static void createConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOdpsConfig config = new SinkOdpsConfig() {{
setEndpoint(Constant.odps_endpoint);
setProject(Constant.odps_project);
setTable(Constant.odps_table);
setAccessId(Constant.odps_accessId);
setAccessKey(Constant.odps_accessKey);
setPartitionMode(PartitionMode.SYSTEM_TIME);
setTimeRange(60);
}};
// Specify the partition format.
SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
addConfig("ds", "%Y%m%d");
addConfig("hh", "%H");
addConfig("mm", "%M");
}};
config.setPartitionConfig(partitionConfig);
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
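In the MaxCompute example above, the SYSTEM_TIME partition mode derives each record's target partition from its system time using the configured format specifiers. The following self-contained sketch (using java.time instead of the SDK; the class and method names are illustrative, not part of the SDK) shows the partition values the ds/hh/mm specifiers would produce for a given timestamp, assuming UTC:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionFormatDemo {
    // Illustrative mapping of the C-style specifiers passed to addConfig(...)
    // to equivalent java.time patterns. The actual formatting is performed
    // server-side by DataHub; this only previews the result.
    static final Map<String, DateTimeFormatter> FORMATS = new LinkedHashMap<>();
    static {
        FORMATS.put("ds", DateTimeFormatter.ofPattern("yyyyMMdd")); // %Y%m%d
        FORMATS.put("hh", DateTimeFormatter.ofPattern("HH"));       // %H
        FORMATS.put("mm", DateTimeFormatter.ofPattern("mm"));       // %M
    }

    // Compute the partition spec a record written at `ts` would fall into,
    // assuming SYSTEM_TIME partition mode and a UTC-based server clock.
    public static String partitionSpec(Instant ts) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, DateTimeFormatter> e : FORMATS.entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey()).append('=')
              .append(e.getValue().withZone(ZoneOffset.UTC).format(ts));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-05-06T08:09:10Z");
        System.out.println(partitionSpec(ts)); // ds=20240506,hh=08,mm=09
    }
}
```

With setTimeRange(60), records written within the same hour land in the same ds/hh partition; the mm column would only vary with a finer time range.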
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to OSS:
public static void createOssConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOssConfig config = new SinkOssConfig() {{
setAccessId(Constant.oss_accessId);
setAccessKey(Constant.oss_accessKey);
setAuthMode(AuthMode.STS);
setBucket(Constant.oss_bucket);
setEndpoint(Constant.oss_endpoint);
setPrefix(Constant.oss_prefix);
setTimeFormat(Constant.oss_timeFormat);
setTimeRange(60);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Tablestore:
public static void createOtsConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkOtsConfig config = new SinkOtsConfig() {{
setAccessId(Constant.ots_accessId);
setAccessKey(Constant.ots_accessKey);
setEndpoint(Constant.ots_endpoint);
setInstance(Constant.ots_instance);
setTable(Constant.ots_table);
setAuthMode(AuthMode.AK);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Hologres:
public static void createHoloConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkHologresConfig config = new SinkHologresConfig() {{
setAccessId(Constant.accessId);
setAccessKey(Constant.accessKey);
setProjectName(Constant.projectName);
setTopicName(Constant.topicName);
setAuthMode(AuthMode.AK);
setInstanceId(Constant.instanceId);
// Set the timestamp unit.
setTimestampUnit(TimestampUnit.MILLISECOND);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Elasticsearch:
public static void createEsConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkEsConfig config = new SinkEsConfig() {{
setEndpoint(Constant.es_endpoint);
setIdFields(Constant.es_fields);
setIndex(Constant.es_index);
setPassword(Constant.es_password);
setProxyMode(Constant.es_proxyMode);
setTypeFields(Constant.es_typeFields);
setUser(Constant.es_user);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Function Compute:
public static void createFcConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkFcConfig config = new SinkFcConfig() {{
setEndpoint(Constant.fc_endpoint);
setAccessId(Constant.fc_accessId);
setAccessKey(Constant.fc_accessKey);
setAuthMode(AuthMode.AK);
setFunction(Constant.fc_function);
setService(Constant.fc_service);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to a MySQL database:
public static void createMysqlConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkMysqlConfig config = new SinkMysqlConfig() {{
setDatabase(Constant.mysql_database);
setHost(Constant.mysql_host);
setInsertMode(InsertMode.OVERWRITE);
setPassword(Constant.mysql_password);
setPort(Constant.mysql_port);
setTable(Constant.mysql_table);
setUser(Constant.mysql_user);
}};
try {
// Create a DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Delete a DataConnector
Syntax: DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of DataConnector that you want to delete.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void deleteConnector(String projectName,String topicName) {
try {
datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("delete connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Query a DataConnector
Syntax: GetConnectorResult getConnector(String projectName, String topicName, ConnectorType connectorType)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of DataConnector that you want to query.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void getConnector(String projectName,String topicName) {
try {
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
for (String fieldName : getConnectorResult.getColumnFields()) {
System.out.println(fieldName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update a DataConnector
You can update the configurations of a DataConnector.
Syntax: UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of DataConnector that you want to update.
config: the configuration details of the specific type of DataConnector.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateConnector(String projectName,String topicName) {
SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
// Update the time range.
config.setTimeRange(100);
// Change the AccessKey pair.
config.setAccessId(accessId);
config.setAccessKey(accessKey);
// Modify the timestamp type.
config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
try {
datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update the fields to be synchronized by using a DataConnector
Syntax: UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List<String> columnFields)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
connectorId: the ID of the DataConnector that you want to update.
columnFields: the fields that you want to synchronize.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateConnector(String projectName,String topicName) {
String connectorId = "";
// The columnField list must contain all the fields to be synchronized to the downstream service, including, but not limited to, newly added fields.
List<String> columnField = new ArrayList<>();
columnField.add("f1");
try {
datahubClient.updateConnector(projectName, topicName, connectorId, columnField);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update the status of a DataConnector
Syntax: UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
connectorState: the state of the DataConnector. Valid values: STOPPED and RUNNING.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateConnectorState(String projectName,String topicName) {
try {
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update the offset of a DataConnector
Syntax: UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
shardId: the ID of the shard. If the shardId parameter is set to null, the offsets of all shards are updated.
offset: the offset of the DataConnector.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateConnectorOffset(String projectName,String topicName,String shardId) {
ConnectorOffset offset = new ConnectorOffset() {{
setSequence(10);
setTimestamp(1000);
}};
try {
// Before you update the offset of a DataConnector, stop the DataConnector.
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector offset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
List DataConnectors
Syntax: ListConnectorResult listConnector(String projectName, String topicName)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void listConnector(String projectName,String topicName) {
try {
ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
for (String cName : listConnectorResult.getConnectorNames()) {
System.out.println(cName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Query the shard status of a DataConnector
Syntax: GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType), or ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
shardId: the ID of the shard.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
try {
ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println(connectorShardStatusEntry.getState() + "\t"
+ connectorShardStatusEntry.getCurrSequence() + "\t"
+ connectorShardStatusEntry.getDiscardCount() + "\t"
+ connectorShardStatusEntry.getUpdateTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void getConnectorShardStatus(String projectName,String topicName) {
try {
GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
+ entry.getValue().getCurrSequence() + "\t"
+ entry.getValue().getDiscardCount() + "\t"
+ entry.getValue().getUpdateTime());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Restart a DataConnector
Syntax: ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType), or ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
shardId: the ID of the shard.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void reloadConnector(String projectName,String topicName ) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Query the completion time of a DataConnector
Syntax: GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType)
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void getDoneTime(String projectName,String topicName ) {
try {
GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorDoneTimeResult.getDoneTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Update the VPC whitelist
Syntax: UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds)
Parameters
projectName: the name of the project.
vpcIds: the IDs of the virtual private clouds (VPCs).
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void updateProjectVpcWhitelist(String projectName) {
String vpcid = "12345";
try {
datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
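The vpcIds argument is a single string. If you need to whitelist several VPCs at once, a sketch like the following builds the string (assuming the API accepts a comma-separated list, which you should verify for your SDK version; the helper class is illustrative, not part of the SDK):

```java
import java.util.Arrays;
import java.util.List;

public class VpcWhitelistDemo {
    // Join multiple VPC IDs into one string. Assumption: the vpcIds
    // parameter of updateProjectVpcWhitelist accepts a comma-separated
    // list; confirm this for your SDK version.
    public static String joinVpcIds(List<String> ids) {
        return String.join(",", ids);
    }

    public static void main(String[] args) {
        System.out.println(joinVpcIds(Arrays.asList("vpc-123", "vpc-456")));
    }
}
```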
Add a field
Syntax: AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName)
You can add a field to be synchronized by using a DataConnector, provided that a MaxCompute table contains the field.
Parameters
projectName: the name of the project.
topicName: the name of the topic.
ConnectorType: the type of the DataConnector.
fieldName: the name of the field to add. This parameter can be set to null.
Errors
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Sample code
public static void appendConnectorField(String projectName,String topicName) {
String newField = "newfield";
try {
// The field to be added must exist in both the topic and the MaxCompute table, and the topic schema must match the schema of the MaxCompute table.
datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Manage multiple objects at a time
We recommend that you use the command-line tool in the DataHub console.