Goodps

阿里云MaxCompute Go-SDK

Vonng Golang SDK Github Repo

GOODPS

ODPS SQL Driver for golang

MaxCompute(ODPS) 非官方Golang SDK

作者:墨航

当前版本:0.0.3


提供了了与Go标准库database/sql兼容的ODPS SQL执行查询接口。采用原生HTTP API。

Install

go get github.com/Vonng/goodps

Usage

因为我只需要执行SQL的功能,最多加上一个下载上传,所以其他乱七八糟的功能看情况慢慢加吧。

连接串

这里因为Go要求使用一个字符串表示连接信息,通常就是数据库的URL。所以这里ODPS连接串的格式是以endpoint为基础URL,将access_id,access_key,project分别作为Username,Password,QueryParam填入,拼成的一整个URL。

例如,办公网的endpoint是http://service-corp.odps.aliyun-inc.com/api,那么应当使用的ODPS URL格式就是:http://<access_id>:<access_key>@service-corp.odps.aliyun-inc.com/api?curr_project=<project>

这里给一个测试用例,其他例子test里都有(或者慢慢补吧)

	// cred is my secret, define your own credential somewhere
	db, err := sql.Open("goodps", cred)
	if err != nil {
		t.Error(err)
	}

	const sql = `SELECT
					cast('1'                   AS BIGINT)  AS a,
					cast(TRUE                  AS BOOLEAN) AS b,
					cast('hi'                  AS STRING)  AS c,
					cast('3.14'                AS DOUBLE)  AS d,
					cast('2017-11-11 03:12:11' AS DATETIME) AS e,
					cast('100.01' AS DECIMAL)  AS f;`
	rows, err := db.Query(sql)
	if err != nil {
		t.Error(err)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var (
			a Bigint
			b Boolean
			c String
			d Double
			e Datetime
			f Decimal
		)

		err := rows.Scan(&a, &b, &c, &d, &e, &f)
		if err != nil {
			t.Error(err)
		}

		if a.V != 1 {
			t.Errorf("bigint a should be 1, got %d", a.V)
		}

		if b.V != true {
			t.Errorf("boolean b should be true, got %v", b.V)
		}

		if c.V != "hi" {
			t.Errorf("string c should be `hi`, got `%s`", c.V)
		}

		if d.V != 3.14 {
			t.Errorf("double d should be 3.14, got `%v`", d.V)
		}

		if e.V.Format(StdTimeFmt) != "2017-11-11 03:12:11" {
			t.Errorf("datetime e should be `2017-11-11 03:12:11`, got `%s`", e.V)
		}

		if f.V != 100.01 {
			t.Errorf("decimal f should be `100.01`, got `%v`", f.V)
		}
	}

	if err := rows.Err(); err != nil {
		t.Error(err)
	}

很多和Project,Instance,Table相关的API虽然包装了一些,但因为还不稳定,就不写文档了。

介绍

想在集团内愉快地使用Go语言,一些基础设施的支持不能少。不过无论是HSF,Diamond,或者ODPS,都没有Go的SDK,路漫漫兮呀。

使用Shell命令直接调odpscmd的方式,任何语言都可以执行ODPS SQL,但这种方式实在是太土鳖了。毕竟ODPS本身也是通过HTTP API提供服务的,能不能直接解析这个协议,包装一个SDK呢?

好在ODPS有一个Python SDK,Python藏不住源码,通过研究pyodps的实现,就可以逆向出ODPS的协议来。这里功能不贪多,只要执行ODPS SQL就好。我写了一个简单的Wrapper,放在Gitlab和Github上。

使用Go来实现这个有什么好处呢?最大的好处当然是无依赖!一个ODPS SQL的工作流,可以用Go编写命令,实现诸如一键上传,一键下载,一键启动指定工作流之类的效果。然后编译成二进制,分发出去执行。不需要安装什么Python啊,Java运行时啊,拿着二进制就可以跑。

1. Short Ver

Install

 go get github.com/Vonng/goodps

Github Repo

集团内部也有一个分支:

 go get gitlab.alibaba-inc.com/ruohang.frh/goodps

Gitlab Repo

Usage

目前goodps实现了标准库database/sql/driver的相关接口,所以可以使用Go访问数据库的标准姿势:

需要注意的是这里Credential不再拆成四个参数access_id,access_key,project,endpoint,而是将它们合并为一个ODPSURL,即将access_id,access_key,project分别作为Username,Password,QueryParam填入endpoint,拼成一整个URL。

package main

import "fmt"
import "database/sql"
import _ "github.com/Vonng/goodps"

const cred = "http://<access_id>:<access_key>@service-corp.odps.aliyun-inc.com/api?curr_project=<project>"

var ODPS, _ = sql.Open("goodps", cred)

func main() {
	var (
		a Bigint
		b Boolean
		c String
		d Double
		e Datetime
		f Decimal
	)

	if err := ODPS.QueryRow(`SELECT
		cast('1'    AS BIGINT)  AS a,
		cast(TRUE   AS BOOLEAN) AS b,
		'hi'					AS c,
		cast('3.14' AS DOUBLE)  AS d,
		cast('2017-11-11 03:12:11' AS DATETIME) AS e,
		cast('100.01' AS DECIMAL) AS f;`).Scan(&a, &b, &c, &d, &e, &f); err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println(a, b, c, d, e, f)
}

要注意目前从ODPS Endpoint直接拉取数据有一万条的限制,但不需要Tunnel奇奇怪怪的权限。可以用一些聚合函数,窗口函数,临时表曲线救国实现花式下载。例如首字母分桶+分隔符聚合的方式:

func Query() {
	// table udmp_package_os_increase_di(package string, os string) partition (ds string)
	rows, err := ODPS.Query(`SELECT
		ASCII(package) AS head,WM_CONCAT('\t',package) AS pkgs
		FROM ump_ads.udmp_package_os_increase_di WHERE ds = '20170821'
		GROUP BY  ASCII(package);`)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	defer rows.Close()
	for rows.Next() {
		var pkgs String
		var head Bigint
		err := rows.Scan(&head, &pkgs)
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		fmt.Println(pkgs, head)
	}

	if err := rows.Err(); err != nil {
		fmt.Println(err.Error())
		return
	}
}

其他标准方法,例如Query,Exec,也都会符合预期地工作,不再赘述。

ODPS.Exec(`CREATE TABLE haha(id STRING);`)
ODPS.Exec(`DROP TABLE haha;`)

其他例如列出Partition,获取TableSchema,都不是标准的SQL,用Ad Hoc API的方式提供。不过那些地方还没弄完,好多API我也没权限调用。

2. 实现原理

实际上阿里云的一系列产品对外提供的都是HTTP API,使用HMAC认证。关键是解决两个问题,第一个是认证,第二个是协议。

2.1 API的认证

认证一直是API对接里比较蛋疼的事情,尤其是很多时候大家都爱造轮子,弄一些自己的认证协议。总体而言,阿里云的认证方式还是比较蛋疼的。通过解析Python中的认证逻辑,还原出了认证的具体算法,简单说包括以下几个Point:

  • 使用复杂的方式构造一个待签名消息msg,使用AccessSecret对其签名,与AccessID一起按指定格式填入Authorization首部。
  • 待签名消息包括:固定MagicString:ODPS,首部Content-MD5, Content-Type,Date,以x-odps开头的首部。以x-odps开头的参数,标准化的resource(就是url path后面去掉/api的部分)。
  • 细节是魔鬼,show you the code:
// Sign will add necessary operations to make request valid for odps server
func (client *Client) Sign(r *http.Request) {
	var canonPath, msg, auth bytes.Buffer
	var signKeyList []string = []string{"content-md5", "content-type", "date"}
	signParams := make(map[string]string, len(r.Header)+3)

	// fill date header in RFC1123
	if dateStr := r.Header.Get("Date"); dateStr == "" {
		gmtTime := time.Now().In(location).Format(time.RFC1123)
		r.Header.Set("Date", gmtTime)
	}

	// build canonical resource.
	canonPath.WriteString(r.URL.Path)
	if urlParams := r.URL.Query(); len(urlParams) > 0 {
		canonPath.WriteByte('?')
		var paramKeys []string
		for k, _ := range urlParams {
			paramKeys = append(paramKeys, k)
		}
		sort.Strings(paramKeys)
		for i, k := range paramKeys {
			if i > 0 {
				canonPath.WriteByte('&')
			}
			canonPath.WriteString(k)
			if v := urlParams.Get(k); v != "" {
				canonPath.WriteByte('=')
				canonPath.WriteString(v)
			}
		}
	}
	canonPathStr := strings.TrimPrefix(canonPath.String(), "/api")

	// add headers to signParamsMap
	for k, v := range r.Header {
		lk := strings.ToLower(k)
		switch {
		case lk == "content-md5":
			signParams["content-md5"] = v[0]
		case lk == "content-type":
			signParams["content-type"] = v[0]
		case lk == "date":
			signParams["date"] = v[0]
		case strings.HasPrefix(lk, "x-odps"):
			signKeyList = append(signKeyList, lk)
			signParams[lk] = v[0]
		}
	}

	// add url query params with prefix "x-odps-" to singParamsMap
	for k, v := range r.URL.Query() {
		lk := strings.ToLower(k)
		if strings.HasPrefix(lk, "x-odps-") {
			signKeyList = append(signKeyList, lk)
			signParams[lk] = v[0]
		}
	}

	// build signing message
	msg.WriteString(r.Method)
	sort.Strings(signKeyList)
	for _, k := range signKeyList {
		msg.WriteByte('\n')
		v := signParams[k]
		if strings.HasPrefix(k, "x-odps-") {
			msg.WriteString(k)
			msg.WriteByte(':')
		}
		msg.WriteString(v)
	}
	msg.WriteByte('\n')
	msg.WriteString(canonPathStr)

	// calculate hmac-sha1 of msg
	hasher := hmac.New(sha1.New, []byte(client.AccessKey))
	hasher.Write(msg.Bytes())

	// build authorization header: `ODPS <access_id>:<signature>`
	auth.WriteString("ODPS ")
	auth.WriteString(client.AccessID)
	auth.WriteByte(':')
	auth.WriteString(base64.StdEncoding.EncodeToString(hasher.Sum(nil)))

	log.Debugf("[Client.Sign] Authorization: %s", auth.String())
	r.Header.Set("Authorization", auth.String())
}

这是一个通用的认证方式,发往ODPS任何请求都需要这样签名。当我意识到这一点的时候突然想到OSS里说不定也是这样签名的……?

2.2 协议

发往ODPS的请求都使用XML格式,Instance里面套着JobJOB里面套着Task。大概长这样

<?xml version="1.0" ?>
<Instance>
    <Job>
        <Priority>1</Priority>
        <Tasks><SQL>
                <Name>AnonymousSQLTask</Name>
                <Query><![CDATA[SELECT 1;]]></Query>
                <Config>
                    <Property>
                        <Name>settings</Name>
                        <Value>{"odps.sql.udf.strict.mode": "true"}</Value>
                    </Property>
                </Config>
            </SQL></Tasks>
        <DAG><RunMode>Sequence</RunMode></DAG>
    </Job>
</Instance>

其中//Instance/Job/Tasks/SQL/Query/text()里面填的是要执行的SQL。就是这么简单……。

而ODPS的响应也是一个XML。比较恍惚的是执行SELECT查询出来的结果竟然是放在XML里面的CSV。而且没有双引号扩起转义,这实在是让人无比头大……

3. 后续

这是一个半成品啊半成品。一些RestAPI包装了一些,但还没弄完。执行SQL是够用了。

数据上传下载理论上当然用Tunnel更好,但没找到Tunnel的RestAPI文档…………

逆向又太费时间了,哪位同志愿意提供一下详细的Restful API 文档,不胜感激哈。

有时间继续搞一下吧。