全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:使用Go SDK

更新时间:Oct 22, 2024

成功构建ODPS对象后,您可对项目空间下的Tables和Instances等对象执行后续操作,包括SQL操作、数据上传/下载、表/分区管理,以及Instance管理等。

您可使用访问凭证配置方式中的任意一种方法来创建ODPS对象,为了方便,本文的示例代码中,均使用config.ini中加载AK的方法。

执行SQL

您可通过SQLTask对象的Run方法或MaxCompute SQL Driver执行各类MaxCompute SQL。

通过SDK执行SQL

您可通过SQLTask对象的Run方法执行各类MaxCompute SQL,该方法会返回Instance对象。当执行SELECT语句时,如果查询结果大于10000行数据,需要使用Tunnel下载全部的查询结果。当查询结果小于10000行数据时,可以直接从Instance对象获取查询结果。下面将以SELECT语句为例介绍SQL执行方法。

示例一:执行SELECT并获取查询结果

当查询结果小于10000行数据时,查询结果可用CSV Reader的形式直接读取。

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main runs a SELECT through a SqlTask, waits for the job to succeed, and
// prints every row of the result, which is read from the instance as CSV.
func main() {
	// Load the access key, endpoint and project name from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	query := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine hints, e.g. odps.sql.skewjoin; none are set here.
	var hints map[string]string

	task := odps.NewSqlTask("select_demo", query, hints)
	job, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	if err = job.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	rows, err := task.GetSelectResultAsCsv(job, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Print rows until the reader reports io.EOF.
	for {
		row, readErr := rows.Read()
		switch {
		case readErr == io.EOF:
			return
		case readErr != nil:
			log.Fatalf("%+v", readErr)
		}
		fmt.Printf("%v\n", row)
	}
}

示例二:执行SELECT,通过Tunnel获取查询结果

当查询结果大于10000行数据时,查询结果需要通过Tunnel获取。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main runs a SELECT through a SqlTask, prints the job's LogView URL, and
// then downloads the result page by page through an instance-result tunnel
// session, printing every record as "column=value" pairs.
func main() {
	// Path of the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"

	// SQL engine hints, e.g. odps.sql.skewjoin; none are set here.
	var hints map[string]string

	sqlTask := odps.NewSqlTask("select_demo", sql, hints)

	// Run the SQL with the quota associated with the project.
	projectName := odpsIns.DefaultProjectName()

	ins, err := sqlTask.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	if err = ins.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Generate a LogView URL to inspect the job in detail.
	lv := odpsIns.LogView()
	lvUrl, err := lv.GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(lvUrl)

	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Create the Tunnel and a download session bound to the finished instance.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	start := 0
	step := 200000
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0

	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			// Stop on a read error instead of silently dropping it; counting
			// a failed callback as a record would also skip data, because
			// count is what advances the read offset.
			if _err != nil {
				log.Fatalf("%+v", _err)
			}
			count++

			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}

				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})
		if err != nil {
			log.Fatalf("%+v", err)
		}

		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}

		// A page that yields nothing would never advance start; bail out
		// rather than loop forever.
		if count == 0 {
			log.Fatalf("no records read at offset %d, expected %d records in total", start, recordCount)
		}

		start += count
		total += count
	}

	println("total count ", total)
}

通过MaxCompute SQL Driver执行SQL

示例一:执行CREATE TABLE语句

package main

import (
	"database/sql"
	"log"

	_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
)

// main opens a database/sql handle with the "odps" driver and executes a
// CREATE TABLE statement.
//
// sql.Open fails with an "unknown driver" error unless a driver named "odps"
// has been registered, which database/sql drivers do when their package is
// imported.
func main() {
	// The access key is taken from environment variables:
	// ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
	// NOTE(review): the original comment named the second variable
	// ALIBABA_CLOUD_ACCESS_KEY_SECURITY — confirm the exact name against the driver.
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the handle's resources when main returns.
	defer db.Close()

	sqlStr := "create table table_with_date (date_col DATE);"
	if _, err = db.Exec(sqlStr); err != nil {
		log.Fatalf("%+v", err)
	}
}

示例二:执行SELECT语句,并获取结果

package main

import (
	"database/sql"
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
	"log"
	"reflect"
)

// main runs a parameterized SELECT through the "odps" database/sql driver and
// prints every row as "column=value" pairs.
//
// The scan targets are allocated from each column's ScanType, so every
// element of record is a pointer to one of the sqldriver nullable types.
func main() {
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	defer db.Close()

	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"

	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the result set even if iteration stops early.
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	record := make([]interface{}, len(columnTypes))

	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])

		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}

	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		if err = rows.Scan(record...); err != nil {
			log.Fatalf("%+v", err)
		}

		for i, r := range record {
			rr := r.(sqldriver.NullAble)

			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				// Bind the concrete value once instead of re-asserting it in
				// every case.
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Binary,
					*sqldriver.Decimal,
					*sqldriver.NullDate,
					*sqldriver.NullDateTime,
					*sqldriver.NullTimeStamp,
					*sqldriver.Map,
					*sqldriver.Array,
					*sqldriver.Struct,
					*sqldriver.Json:
					// These values are printed as a whole with %s.
					fmt.Printf("%s=%s", columns[i], v)
				default:
					// Do not silently drop a column of an unlisted type.
					fmt.Printf("%s=%v", columns[i], v)
				}
			}

			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}

	// rows.Next returns false both at the end of the data and on failure;
	// only rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}

数据上传与下载

您可使用Tunnel对表/分区的数据进行批量上传与下载,也可通过流式数据通道将数据写入表/分区。

初始化Tunnel

Tunnel的初始化示例代码如下。您可使用访问凭证配置方式中的任意一种方法来创建ODPS对象,为了方便,接下来的示例代码中,都使用config.ini中加载AK的方法。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main builds an ODPS client from config.ini, looks up the project's tunnel
// endpoint, creates a Tunnel for it, and prints the Tunnel value.
func main() {
	// Read the configuration from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS.
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Look up the tunnel endpoint.
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	// Initialize the Tunnel.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// The builtin println does not interpret format verbs (it would print the
	// literal "%+v"), so format with fmt.Printf instead.
	fmt.Printf("%+v\n", tunnelIns)
}

批量数据上传

上传表或分区数据时,在初始化Tunnel后,需要进行如下操作:

  1. 创建UploadSession,指定要将数据上传到哪张表/分区,以及指定上传数据使用的压缩算法等。

  2. 使用UploadSession创建Writer,Writer进行数据上传,一个Writer上传的数据被称为一个Block,使用一个INT类型值作为Block ID。为了提高上传速度,可以创建多个Writer进行并发数据上传。

  3. Writer上传数据完毕后,需要调用UploadSession的Commit方法来最终完成上传,Commit需要指定Block ID列表。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    // main uploads sample records into partition p1=20,p2='hangzhou' of
    // all_types_demo with three concurrent writers, one block per writer, and
    // then commits all blocks.
    func main() {
    	// Read the configuration from the ini file.
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS.
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Initialize the Tunnel.
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create an UploadSession for the target table/partition.
    	session, err := tunnelIns.CreateUploadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	writerNum := 3
    	blockIds := make([]int, writerNum)
    	for i := 0; i < writerNum; i++ {
    		blockIds[i] = i
    	}
    
    	// Buffered so that every writer can report its result without blocking,
    	// even after main has stopped receiving.
    	errChan := make(chan error, writerNum)
    
    	// Upload concurrently; each writer owns one block id that identifies the
    	// data it wrote.
    	for _, blockId := range blockIds {
    		blockId := blockId // per-iteration copy for the goroutine (needed before Go 1.22)
    		go func() {
    			schema := session.Schema()
    			record, err := makeRecord(schema)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			recordWriter, err := session.OpenRecordWriter(blockId)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			for i := 0; i < 100; i++ {
    				if err = recordWriter.Write(record); err != nil {
    					// Best-effort close; the write error is the one reported.
    					_ = recordWriter.Close()
    					errChan <- err
    					return
    				}
    			}
    
    			err = recordWriter.Close()
    			if err == nil {
    				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
    			}
    
    			errChan <- err
    		}()
    	}
    
    	// Wait for every writer to finish.
    	for i := 0; i < writerNum; i++ {
    		if err := <-errChan; err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	// Commit all blocks; the data becomes visible in the table afterwards.
    	// Report success only after Commit has actually succeeded.
    	if err = session.Commit(blockIds); err != nil {
    		log.Fatalf("%+v", err)
    	}
    	log.Println("success to commit all blocks")
    }
    
    // makeRecord builds one sample record with 18 values for the all_types_demo
    // table used by this example. The map, array and struct values are created
    // from the column types found at positions 15, 16 and 17 of schema.
    //
    // It returns an error, rather than panicking or carrying on with a
    // half-built value, when schema does not have that shape or when any sample
    // value cannot be constructed.
    func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
    	if len(schema.Columns) < 18 {
    		return nil, fmt.Errorf("expected at least 18 columns, got %d", len(schema.Columns))
    	}
    
    	varchar, err := data.NewVarChar(500, "varchar")
    	if err != nil {
    		return nil, err
    	}
    	// NOTE(review): the value named char is built with NewVarChar, exactly as
    	// in the original example — confirm whether a CHAR constructor is intended.
    	char, err := data.NewVarChar(254, "char")
    	if err != nil {
    		return nil, err
    	}
    	s := data.String("hello world")
    	date, err := data.NewDate("2022-10-19")
    	if err != nil {
    		return nil, err
    	}
    	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
    	if err != nil {
    		return nil, err
    	}
    	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
    	if err != nil {
    		return nil, err
    	}
    
    	mapType, ok := schema.Columns[15].Type.(datatype.MapType)
    	if !ok {
    		return nil, fmt.Errorf("column 15 (%s) is not a map column", schema.Columns[15].Name)
    	}
    	mapData := data.NewMapWithType(mapType)
    	if err = mapData.Set("hello", 1); err != nil {
    		return nil, err
    	}
    	if err = mapData.Set("world", 2); err != nil {
    		return nil, err
    	}
    
    	arrayType, ok := schema.Columns[16].Type.(datatype.ArrayType)
    	if !ok {
    		return nil, fmt.Errorf("column 16 (%s) is not an array column", schema.Columns[16].Name)
    	}
    	arrayData := data.NewArrayWithType(arrayType)
    	if err = arrayData.Append("a"); err != nil {
    		return nil, err
    	}
    	if err = arrayData.Append("b"); err != nil {
    		return nil, err
    	}
    
    	structType, ok := schema.Columns[17].Type.(datatype.StructType)
    	if !ok {
    		return nil, fmt.Errorf("column 17 (%s) is not a struct column", schema.Columns[17].Name)
    	}
    	structData := data.NewStructWithTyp(structType)
    
    	// The struct's "arr" field is itself an array and needs its own typed value.
    	arrFieldType, ok := structType.FieldType("arr").(datatype.ArrayType)
    	if !ok {
    		return nil, fmt.Errorf("struct field %q is not an array", "arr")
    	}
    	arr := data.NewArrayWithType(arrFieldType)
    	if err = arr.Append("x"); err != nil {
    		return nil, err
    	}
    	if err = arr.Append("y"); err != nil {
    		return nil, err
    	}
    	if err = structData.SetField("arr", arr); err != nil {
    		return nil, err
    	}
    	if err = structData.SetField("name", "tom"); err != nil {
    		return nil, err
    	}
    
    	// Values are listed in column order.
    	record := []data.Data{
    		data.TinyInt(1),
    		data.SmallInt(32767),
    		data.Int(100),
    		data.BigInt(100000000000),
    		data.Binary("binary"),
    		data.Float(3.14),
    		data.Double(3.1415926),
    		data.NewDecimal(38, 18, "3.1415926"),
    		varchar,
    		char,
    		s,
    		date,
    		datetime,
    		timestamp,
    		data.Bool(true),
    		mapData,
    		arrayData,
    		structData,
    	}
    
    	return record, nil
    }
    

批量数据下载

下载表/分区数据时,在初始化Tunnel后,需要进行如下操作:

  1. 创建DownloadSession,指定从哪个表、分区下载数据,以及指定传输数据使用的压缩算法等。

  2. 使用DownloadSession创建Reader,Reader可以用分页的形式,分批将数据下载到本地。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    // main downloads partition p1=20,p2='hangzhou' of all_types_demo through a
    // tunnel download session, page by page, and prints every record as
    // "column=value" pairs.
    func main() {
    	// Read the configuration from the ini file.
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS.
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Look up the tunnel endpoint.
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create a DownloadSession for the table/partition to read.
    	session, err := tunnelIns.CreateDownloadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	recordCount := session.RecordCount()
    	fmt.Printf("record count is %d\n", recordCount)
    
    	start := 0
    	step := 100001
    	total := 0
    	schema := session.Schema()
    
    	for start < recordCount {
    		reader, err := session.OpenRecordReader(start, step, nil)
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		count := 0
    		err = reader.Iterator(func(record data.Record, _err error) {
    			// Stop on a read error instead of silently dropping it.
    			if _err != nil {
    				log.Fatalf("%+v", _err)
    			}
    			// Count each record: count is what advances start below, and
    			// without it the outer loop never terminates.
    			count++
    
    			for i, d := range record {
    				if d == nil {
    					fmt.Printf("%s=null", schema.Columns[i].Name)
    				} else {
    					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
    				}
    
    				if i < record.Len()-1 {
    					fmt.Printf(", ")
    				} else {
    					fmt.Println()
    				}
    			}
    		})
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		log.Println(count)
    
    		if err = reader.Close(); err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		// A page that yields nothing would never advance start; bail out
    		// rather than loop forever.
    		if count == 0 {
    			log.Fatalf("no records read at offset %d, expected %d records in total", start, recordCount)
    		}
    
    		start += count
    		total += count
    	}
    
    	println("total count ", total)
    }

流式数据上传

MaxCompute支持通过流式数据通道将数据写入表/分区的能力。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main writes sample records into partition p1=20,p2='hangzhou' of
// all_types_demo through a stream upload session, filling and flushing the
// pack writer twice.
func main() {
	// Read the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS.
	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)
	proj := client.DefaultProject()

	// Look up the tunnel endpoint.
	endpoint, err := proj.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + endpoint)

	tunnelClient := tunnel.NewTunnel(client, endpoint)

	uploadSession, err := tunnelClient.CreateStreamUploadSession(
		proj.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithCreatePartition(), // create the partition if it does not exist yet
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	writer := uploadSession.OpenRecordPackWriter()

	for round := 0; round < 2; round++ {
		rec, err := makeRecord(uploadSession.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}

		// Keep appending the record until the writer's DataSize reaches 64.
		for writer.DataSize() < 64 {
			if err = writer.Append(rec); err != nil {
				log.Fatalf("%+v", err)
			}
		}

		// Flush the accumulated pack.
		traceId, recordCount, bytesSend, err := writer.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}

		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, recordCount, bytesSend,
		)
	}
}

// makeRecord builds one sample record with 18 values for the all_types_demo
// table used by this example. The map, array and struct values are created
// from the column types found at positions 15, 16 and 17 of schema.
//
// It returns an error, rather than panicking or carrying on with a half-built
// value, when schema does not have that shape or when any sample value cannot
// be constructed.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	if len(schema.Columns) < 18 {
		return nil, fmt.Errorf("expected at least 18 columns, got %d", len(schema.Columns))
	}

	varchar, err := data.NewVarChar(500, "varchar")
	if err != nil {
		return nil, err
	}
	// NOTE(review): the value named char is built with NewVarChar, exactly as
	// in the original example — confirm whether a CHAR constructor is intended.
	char, err := data.NewVarChar(254, "char")
	if err != nil {
		return nil, err
	}
	s := data.String("hello world")
	date, err := data.NewDate("2022-10-19")
	if err != nil {
		return nil, err
	}
	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
	if err != nil {
		return nil, err
	}
	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
	if err != nil {
		return nil, err
	}

	mapType, ok := schema.Columns[15].Type.(datatype.MapType)
	if !ok {
		return nil, fmt.Errorf("column 15 (%s) is not a map column", schema.Columns[15].Name)
	}
	mapData := data.NewMapWithType(mapType)
	if err = mapData.Set("hello", 1); err != nil {
		return nil, err
	}
	if err = mapData.Set("world", 2); err != nil {
		return nil, err
	}

	arrayType, ok := schema.Columns[16].Type.(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("column 16 (%s) is not an array column", schema.Columns[16].Name)
	}
	arrayData := data.NewArrayWithType(arrayType)
	if err = arrayData.Append("a"); err != nil {
		return nil, err
	}
	if err = arrayData.Append("b"); err != nil {
		return nil, err
	}

	structType, ok := schema.Columns[17].Type.(datatype.StructType)
	if !ok {
		return nil, fmt.Errorf("column 17 (%s) is not a struct column", schema.Columns[17].Name)
	}
	structData := data.NewStructWithTyp(structType)

	// The struct's "arr" field is itself an array and needs its own typed value.
	arrFieldType, ok := structType.FieldType("arr").(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("struct field %q is not an array", "arr")
	}
	arr := data.NewArrayWithType(arrFieldType)
	if err = arr.Append("x"); err != nil {
		return nil, err
	}
	if err = arr.Append("y"); err != nil {
		return nil, err
	}
	if err = structData.SetField("arr", arr); err != nil {
		return nil, err
	}
	if err = structData.SetField("name", "tom"); err != nil {
		return nil, err
	}

	// Values are listed in column order.
	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}

	return record, nil
}

表管理

获取表列表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main lists the tables of the configured project, filtered by name prefix
// and table type, and prints each table name.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tables := proj.Tables()

	// Called once per listed table; aborts on the first reported error.
	printName := func(t *odps.Table, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}

		println(t.Name())
	}

	tables.List(
		printName,
		// Filter by table-name prefix.
		odps.TableFilter.NamePrefix("all_type"),
		// Filter by table type; other types include VirtualView and ExternalTable.
		odps.TableFilter.Type(odps.ManagedTable),
	)
}

获取单个表信息

判断表是否存在

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main prints whether the table all_types_demo exists in the configured
// project.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	exists, err := tbl.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(exists)
}

获取表的大小、行数

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads the metadata of all_types_demo and prints the table's size and
// record count.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size, in bytes.
	println("size = ", tbl.Size())

	// Number of rows in the table.
	println("rowCount = ", tbl.RecordNum())
}

获取表的CreatedTime、LastDDLTime、ModifiedTime

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads the metadata of all_types_demo and reports its creation time,
// last DDL time and last modification time.
//
// The values are written with log.Printf and %v rather than the builtin
// println: println is a bootstrapping builtin that accepts only basic types,
// so it cannot print a struct value such as time.Time.
// NOTE(review): the accessors' return types are not visible here — %v is
// correct whichever type they return.
func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	if err = table.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Creation time of the table.
	log.Printf("create time = %v", table.CreatedTime())

	// Time of the most recent DDL operation.
	log.Printf("last ddl time = %v", table.LastDDLTime())

	// Time of the most recent modification.
	log.Printf("last modified time = %v", table.LastModifiedTime())
}

获取表的所有者

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads the metadata of all_types_demo and prints the table's owner.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Owner of the table.
	println("owner is ", tbl.Owner())
}

获取表类型

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads the metadata of all_types_demo and prints the table's type.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Type of the table.
	println("type is ", tbl.Type())
}

获取表结构

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"strings"
)

// main loads the metadata of test_cluster_table and prints its schema: table
// name, lifecycle (when positive), columns, partition columns and, when a
// cluster type is set, the cluster information.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("test_cluster_table")

	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Schema of the table.
	sch := tbl.Schema()

	println("table name = ", sch.TableName)
	if tbl.LifeCycle() > 0 {
		println("table lifecycle = ", tbl.LifeCycle())
	}

	// Regular columns.
	for _, col := range sch.Columns {
		fmt.Printf("column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Partition columns.
	for _, col := range sch.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Cluster information, printed only when a cluster type is set.
	cluster := sch.ClusterInfo
	if cluster.ClusterType == "" {
		return
	}
	println("cluster type = ", cluster.ClusterType)
	println("cluster columns = ", strings.Join(cluster.ClusterCols, ", "))
	println("cluster bucket num = ", cluster.BucketNum)
}

删除表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main deletes the table test_cluster_table from the configured project.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("test_cluster_table")

	if err = tbl.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}

分区管理

获取分区列表

通过MaxCompute SDK既可以获取一个表的所有“分区值”列表,也可以获取一个表的所有“分区对象”列表。“分区对象”中包含分区的一些基本信息,如size、lastModifiedTime等。

获取分区值列表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main prints every partition value of the table all_types_demo, one per
// line.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	values, err := tbl.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, value := range values {
		println(value)
	}
}

获取分区对象列表

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main fetches the partition objects of all_types_demo and prints, for each
// one, its value, its three timestamps and its size.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	parts, err := tbl.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(parts))

	for _, part := range parts {
		fmt.Printf(
			"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
			part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
		)
	}
}

获取单个分区信息

获取分区的基本信息

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main fetches the partition p1=20/p2=hangzhou of all_types_demo and prints
// its value, its three timestamps and its size.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
	)
}

获取分区的扩展信息

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main fetches the partition p1=20/p2=hangzhou of all_types_demo, prints its
// basic information, then loads and prints its extended information.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Basic partition information.
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
	)

	// Extended partition information; it has to be loaded before it is read.
	if err = part.LoadExtended(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d",
		part.IsArchivedEx(), part.LifeCycleEx(), part.PhysicalSizeEx(),
	)
}

添加分区

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main adds the partitions p1=23/p2=beijing and p1=24/p2=shanghai to the
// table all_types_demo.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	newPartitions := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	if err = tbl.AddPartitions(true, newPartitions); err != nil {
		log.Fatalf("%+v", err)
	}
}

删除分区

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main deletes the partitions p1=23/p2=beijing and p1=24/p2=shanghai from
// the table all_types_demo.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tableSet := proj.Tables()
	tbl := tableSet.Get("all_types_demo")

	stalePartitions := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	if err = tbl.DeletePartitions(true, stalePartitions); err != nil {
		log.Fatalf("%+v", err)
	}
}

Instance管理

MaxCompute执行SQL后返回Instance对象,Instance表示MaxCompute SQL作业,用于追踪SQL执行状态、结果。

获取Instance列表

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"time"
)

// main lists the terminated instances of the default project that fall in a
// fixed time range and prints id, owner, start time, end time and status of
// each one.
func main() {
	// Path of the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	timeFormat := "2006-01-02 15:04:05"
	// Check the parse results so that an edited, malformed timestamp is
	// reported instead of silently turning into the zero time.
	startTime, err := time.Parse(timeFormat, "2024-10-11 02:15:30")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	endTime, err := time.Parse(timeFormat, "2024-10-13 06:22:02")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Called once per listed instance.
	// NOTE(review): the original callback tested the enclosing err, which is
	// always nil at this point, so that check was dead code and is dropped.
	// If List can report per-item errors, confirm against the SDK how they
	// are delivered.
	var f = func(i *odps.Instance) {
		println(
			fmt.Sprintf(
				"%s, %s, %s, %s, %s",
				i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
			))
	}

	instances := odpsIns.Instances()
	instances.List(
		f,
		odps.InstanceFilter.TimeRange(startTime, endTime),
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}

获取Instance信息

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads one instance by id and prints its owner, status, start time,
// end time and task results.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	instances := client.Instances()
	job := instances.Get("<yourInstanceId>")
	if err = job.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", job.Owner())
	fmt.Printf("status=%s\n", job.Status())
	fmt.Printf("startTime=%s\n", job.StartTime())
	fmt.Printf("endTime=%s\n", job.EndTime())
	fmt.Printf("result=%+v\n", job.TaskResults())
}

权限管理

MaxCompute可以通过操作权限的相关命令进行权限管理,关于授权方案详情,请参见授权方案概述。下述示例通过DESC ROLE命令查看角色的相关信息。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
	"log"
)

// main runs "desc role role_project_admin;" through a SecurityManager for
// the configured project and prints the result.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	manager := security.NewSecurityManager(client.RestClient(), cfg.ProjectName)

	output, err := manager.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("ok: %s", output))
}

Logview

您可以通过Logview查看已提交的MaxCompute作业,并进行Debug调试,详情请参见使用Logview查看作业运行信息。

package main

import (
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main submits a SELECT through a SqlTask and prints the LogView URL
// generated for the resulting instance.
func main() {
	// Load the configuration from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	query := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine hints, e.g. odps.sql.skewjoin; none are set here.
	var hints map[string]string

	// Create the SqlTask.
	task := odps.NewSqlTask("select", query, hints)

	// Run the SQL with the quota associated with the project.
	projectName := client.DefaultProjectName()
	job, err := task.Run(client, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	viewer := client.LogView()
	url, err := viewer.GenerateLogView(job, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(url)
}