全部產品
Search
文件中心

MaxCompute:多線程上傳範例

更新時間:Feb 28, 2024

本文通過程式碼範例向您介紹如何使用TableTunnel介面實現多線程上傳。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
 class UploadThread implements Callable<Boolean> {
         private long id;
         private RecordWriter recordWriter;
         private Record record;
         private TableSchema tableSchema;
         public UploadThread(long id, RecordWriter recordWriter, Record record,
                         TableSchema tableSchema) {
                 this.id = id;
                 this.recordWriter = recordWriter;
                 this.record = record;
                 this.tableSchema = tableSchema;
         }
         @Override
         public Boolean call() {
                 for (int i = 0; i < tableSchema.getColumns().size(); i++) {
                         Column column = tableSchema.getColumn(i);
                         switch (column.getType()) {
                         case BIGINT:
                                 record.setBigint(i, 1L);
                                 break;
                         case BOOLEAN:
                                 record.setBoolean(i, true);
                                 break;
                         case DATETIME:
                                 record.setDatetime(i, new Date());
                                 break;
                         case DOUBLE:
                                 record.setDouble(i, 0.0);
                                 break;
                         case STRING:
                                 record.setString(i, "sample");
                                 break;
                         default:
                                 throw new RuntimeException("Unknown column type: "
                                                 + column.getType());
                         }
                 }
                boolean success = true;
                try {
	            for (int i = 0; i < 10; i++) {
                         recordWriter.write(record);
	            }
                } catch (IOException e) {
 	            success = false;
                    e.printStackTrace();
                } finally {
 	            recordWriter.close();
                }
                    return success;
               
 }
 /**
  * Sample: upload data into one MaxCompute table partition using several threads, each
  * writing its own Tunnel block, then commit all blocks in a single upload session.
  */
 public class UploadThreadSample {
     // An Alibaba Cloud account AccessKey has permission to call every API, which is high risk.
     // We strongly recommend creating a RAM user in the RAM console and using it for API access
     // and day-to-day operations.
     // This sample reads the AccessKey ID and AccessKey Secret from environment variables; you
     // may instead keep them in a configuration file if your deployment requires it.
     // Do NOT hard-code the AccessKey ID and AccessKey Secret in source code: they may leak.
     private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
     private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
     private static final String odpsUrl = "http://service.odps.aliyun.com/api";
     // Tunnel endpoint. It must be set when using an internal network; if it is not set, the
     // public endpoint is used. The value here is the classic-network Tunnel endpoint for
     // East China 2 (cn-shanghai); see the Endpoint documentation for other regions.
     private static final String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
     private static final String project = "<your project>";
     private static final String table = "<your table name>";
     private static final String partition = "<your partition spec>";
     /** Number of worker threads; also the number of blocks (ids 0..threadNum-1) written. */
     private static final int threadNum = 10;

     /**
      * Creates an upload session, writes one block per worker thread in parallel, and
      * commits the blocks only if every worker reported success.
      */
     public static void main(String[] args) {
         Account account = new AliyunAccount(accessId, accessKey);
         Odps odps = new Odps(account);
         odps.setEndpoint(odpsUrl);
         odps.setDefaultProject(project);
         ExecutorService pool = null;
         try {
             TableTunnel tunnel = new TableTunnel(odps);
             tunnel.setEndpoint(tunnelUrl);
             PartitionSpec partitionSpec = new PartitionSpec(partition);
             UploadSession uploadSession = tunnel.createUploadSession(project,
                     table, partitionSpec);
             System.out.println("Session Status is : "
                     + uploadSession.getStatus().toString());

             TableSchema schema = uploadSession.getSchema();
             pool = Executors.newFixedThreadPool(threadNum);
             List<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(threadNum);
             for (int i = 0; i < threadNum; i++) {
                 // Worker i owns block i exclusively.
                 RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                 Record record = uploadSession.newRecord();
                 callers.add(new UploadThread(i, recordWriter, record, schema));
             }

             // invokeAll waits for every task; inspect each result so a failed block
             // is never committed as if it were complete.
             boolean allSucceeded = true;
             for (Future<Boolean> result : pool.invokeAll(callers)) {
                 try {
                     if (!Boolean.TRUE.equals(result.get())) {
                         allSucceeded = false;
                     }
                 } catch (ExecutionException e) {
                     allSucceeded = false;
                     e.printStackTrace();
                 }
             }

             if (allSucceeded) {
                 Long[] blockList = new Long[threadNum];
                 for (int i = 0; i < threadNum; i++) {
                     blockList[i] = Long.valueOf(i);
                 }
                 uploadSession.commit(blockList);
                 System.out.println("upload success!");
             } else {
                 System.out.println("upload failed: not all blocks were written, session not committed");
             }
         } catch (TunnelException e) {
             e.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             // Restore the interrupt flag so callers further up can observe it.
             Thread.currentThread().interrupt();
             e.printStackTrace();
         } finally {
             // Shut the pool down on every path, including early failures.
             if (pool != null) {
                 pool.shutdown();
             }
         }
     }
 }
說明

對於Tunnel Endpoint,支援指定或者不指定。

  • 如果指定,按照指定的Endpoint上傳資料。

  • 如果不指定,預設為公網Endpoint。

  • 文中使用的是華東2(上海)傳統網路Tunnel Endpoint,其他Region的Tunnel Endpoint設定請參見「配置Endpoint」文檔。