Commit 50fda9aa authored by wuerqiQs's avatar wuerqiQs

init

parents
Pipeline #18951 canceled with stages
package mycanal
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/gomodule/redigo/redis"
)
type Canal struct {
name string
redisPool *redis.Pool
rowEvCanal RowEventHandler
c *canal.Canal
pos mysql.Position
last bool
}
type RowEventHandler interface {
OnRow(ev *canal.RowsEvent) error
}
func NewCanal(name, my_host, my_user, my_pass string, my_port int, redis_conn, redis_pass string, redis_db int, last bool, rowEvCanal RowEventHandler) (*Canal, error) {
pool := &redis.Pool{
Dial: func() (redis.Conn, error) {
opts := []redis.DialOption{
redis.DialConnectTimeout(time.Millisecond * 120),
redis.DialReadTimeout(time.Millisecond * 120),
redis.DialWriteTimeout(time.Second),
redis.DialDatabase(redis_db),
}
if redis_pass != "" {
opts = append(opts, redis.DialPassword(redis_pass))
}
return redis.Dial("tcp", redis_conn, opts...)
},
MaxIdle: 10,
MaxActive: 1024,
IdleTimeout: time.Minute,
}
cfgCanal := canal.NewDefaultConfig()
cfgCanal.Addr = fmt.Sprintf("%s:%d", my_host, my_port)
cfgCanal.User = my_user
cfgCanal.Password = my_pass
c, err := canal.NewCanal(cfgCanal)
if err != nil {
return nil, err
}
// Register a handler to handle RowsEvent
self := &Canal{
name: name,
redisPool: pool,
rowEvCanal: rowEvCanal,
c: c,
last: last,
}
conn := pool.Get()
defer conn.Close()
log.Println("redis name", name)
b, err := redis.Bytes(conn.Do("HGET", "binlog_connection", name))
if err == nil {
json.Unmarshal(b, &self.pos)
} else if err != redis.ErrNil {
return nil, err
}
c.SetEventHandler(self)
return self, nil
}
func (h *Canal) OnRotate(*replication.RotateEvent) error { return nil }
func (h *Canal) OnTableChanged(schema string, table string) error { return nil }
func (h *Canal) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}
func (h *Canal) OnXID(mysql.Position) error { return nil }
func (h *Canal) OnGTID(mysql.GTIDSet) error { return nil }
func (h *Canal) OnPosSynced(pos mysql.Position, gtidset mysql.GTIDSet, force bool) error {
if b, err := json.Marshal(pos); err == nil {
conn := h.redisPool.Get()
defer conn.Close()
conn.Do("HSET", "binlog_connection", h.name, string(b))
}
return nil
}
func (h *Canal) OnRow(ev *canal.RowsEvent) error {
return h.rowEvCanal.OnRow(ev)
}
func (h *Canal) String() string { return h.name }
func (h *Canal) Run() (err error) {
if h.pos.Name != "" {
return h.c.RunFrom(h.pos)
}
if h.last {
pos, err := h.c.GetMasterPos()
if err != nil {
return err
}
if b, err := json.Marshal(pos); err == nil {
conn := h.redisPool.Get()
defer conn.Close()
conn.Do("HSET", "binlog_connection", h.name, string(b))
}
h.pos = pos
h.c.RunFrom(pos)
}
return h.c.Run()
}
func (h *Canal) Close() (err error) {
h.c.Close()
return nil
}
package clickhouse
import (
"database/sql"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/jmoiron/sqlx"
)
// CKNode clickhouse节点增强功能
type CKNode struct {
*sqlx.DB
}
// NewCKTable 初始化数据表
func (node *CKNode) NewCKTable(database, table string) *CKTable {
ckt := &CKTable{
database: database,
table: table,
node: node,
}
ckt.SetColumn()
return ckt
}
type rowDesc struct {
Name string `db:"name"`
Type string `db:"type"`
DefaultKind string `db:"default_kind"`
DefaultExpression string `db:"default_expression"`
DefaultValue interface{} `db:"-"`
}
// CKTable clickhouse数据表
type CKTable struct {
insertSQL string
desc []rowDesc
database string
table string
node *CKNode
}
// SetColumn 初始化字段
func (ckt *CKTable) SetColumn() error {
ckt.desc = ckt.desc[0:0]
if err := ckt.node.Select(&ckt.desc, "select name, type, default_kind, default_expression from system.columns where `database`=? and `table`=?", ckt.database, ckt.table); err != nil {
return err
}
for i, d := range ckt.desc {
if d.DefaultKind != "" {
ckt.desc[i].DefaultValue = d.DefaultExpression
continue
}
switch d.Type {
case "String":
ckt.desc[i].DefaultValue = ""
case "Date", "DateTime":
ckt.desc[i].DefaultValue = time.Unix(0, 0)
case "UInt8", "UInt16", "UInt32", "UInt64", "Int8", "Int16", "Int32", "Int64":
ckt.desc[i].DefaultValue = 0
case "Float32", "Float64":
ckt.desc[i].DefaultValue = 0.0
case "Array(String)":
ckt.desc[i].DefaultValue = []string{}
default:
ckt.desc[i].DefaultValue = ""
}
}
fields := make([]string, len(ckt.desc))
questionMarks := make([]string, len(ckt.desc))
for i := range fields {
fields[i] = fmt.Sprintf("`%s`", ckt.desc[i].Name)
questionMarks[i] = "?"
}
ckt.insertSQL = fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES (%s)", ckt.database, ckt.table, strings.Join(fields, ","), strings.Join(questionMarks, ","))
log.Println("query: ", ckt.insertSQL)
return nil
}
func (ckt *CKTable) setArgs(ev map[string]interface{}) []interface{} {
args := make([]interface{}, len(ckt.desc))
for i, field := range ckt.desc {
if v, ok := ev[field.Name]; ok && v != nil {
switch field.Type {
case "String", "Date", "DateTime", "Array(String)":
args[i] = fmt.Sprintf("%v", v)
case "UInt8", "UInt16", "UInt32", "UInt64", "Int8", "Int16", "Int32", "Int64":
if f, ok := v.(float64); ok {
args[i] = int64(f)
} else if s, ok := v.(string); ok {
args[i], _ = strconv.ParseInt(s, 10, 64)
} else {
args[i] = v
}
default:
args[i] = v
}
} else {
args[i] = field.DefaultValue
}
}
return args
}
func (ckt *CKTable) doBatchInsert(events []map[string]interface{}) error {
var tx *sql.Tx
var err error
if tx, err = ckt.node.Begin(); err != nil {
return err
}
var stmt *sql.Stmt
if stmt, err = tx.Prepare(ckt.insertSQL); err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, event := range events {
if _, err = stmt.Exec(ckt.setArgs(event)...); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
// BatchInsert 批量插入
func (ckt *CKTable) BatchInsert(batchSize int, events []map[string]interface{}) ([]map[string]interface{}, error) {
for {
if len(events) > batchSize {
if err := ckt.doBatchInsert(events[0:batchSize]); err != nil {
return events, err
}
events = events[batchSize:]
} else {
if err := ckt.doBatchInsert(events); err != nil {
return events, err
}
return nil, nil
}
}
}
package clickhouse
import (
"database/sql"
"fmt"
"sync"
"time"
"github.com/jmoiron/sqlx"
// clichouse
_ "github.com/ClickHouse/clickhouse-go"
)
type ckNode struct {
Host string `db:"host_address"`
Port int `db:"port"`
IsLocal int `db:"is_local"`
}
// ClickHouseCluster clickhouse集群
type ClickHouseCluster struct {
Name string
dbs map[string]*sqlx.DB
hosts []string
masters []*sqlx.DB
robinIdx int
robinLocker sync.Mutex
}
// NewCluster 创建集群句柄
func NewCluster(name string, addrs []string, user, pass string) (*ClickHouseCluster, error) {
cluster := &ClickHouseCluster{Name: name, dbs: make(map[string]*sqlx.DB)}
var errs []error
for _, addr := range addrs {
host := addr[len("tcp://"):]
if db, err := sqlx.Connect("clickhouse", fmt.Sprintf("%s?username=%s&password=%s", addr, user, pass)); err == nil {
cluster.masters = append(cluster.masters, db)
cluster.dbs[host] = db
cluster.hosts = append(cluster.hosts, host)
} else {
errs = append(errs, err)
}
}
if len(cluster.masters) == 0 {
return nil, fmt.Errorf("no valid master: %s", errs[0].Error())
}
if err := cluster.setCluster(user, pass); err != nil {
return nil, err
}
if len(cluster.dbs) == 0 {
return nil, fmt.Errorf("no valid node")
}
return cluster, nil
}
func (ck *ClickHouseCluster) setCluster(user, pass string) error {
for _, db := range ck.masters {
var ckNodes []*ckNode
if err := db.Select(&ckNodes, "select host_address, port,is_local from system.clusters where cluster=?", ck.Name); err == nil {
if len(ckNodes) == len(ck.dbs) {
return nil
}
for _, node := range ckNodes {
if node.IsLocal == 0 {
if _, ok := ck.dbs[node.Host]; ok {
continue
}
ndb, err := sqlx.Connect("clickhouse", fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", node.Host, node.Port, user, pass))
if err != nil {
return err
}
ck.dbs[node.Host] = ndb
ck.hosts = append(ck.hosts, node.Host)
}
}
return nil
}
}
return fmt.Errorf("query node all fail: %d", len(ck.masters))
}
// ExecOnLeader leader节点执行语句
func (ck *ClickHouseCluster) ExecOnLeader(database, table, query string, args ...interface{}) error {
for _, db := range ck.dbs {
var isLeader int
if err := db.Get(&isLeader, "select is_leader from system.replicas where database=? and table=?", database, table); err != nil {
return err
}
println("ExecOnLeader")
if _, err := db.Exec(query, args...); err != nil {
return err
}
}
return nil
}
func (ck *ClickHouseCluster) Select(dest interface{}, query string, args ...interface{}) (err error) {
for _, db := range ck.masters {
err = db.Select(dest, query, args...)
if err == nil {
return nil
}
}
return err
}
func (ck *ClickHouseCluster) Get(dest interface{}, query string, args ...interface{}) (err error) {
for _, db := range ck.masters {
err = db.Get(dest, query, args...)
if err == nil {
return nil
}
}
return err
}
// ClickHouseClusterFunc 执行函数
type ClickHouseClusterFunc func(db *sqlx.DB) error
// ExecFunc 在所有节点上执行任务
func (ck *ClickHouseCluster) ExecFunc(ckfunc ClickHouseClusterFunc) error {
for _, db := range ck.dbs {
if err := ckfunc(db); err != nil {
return err
}
}
return nil
}
// RobinDB round robin 方式获取一个节点
func (ck *ClickHouseCluster) RobinDB() (db *sqlx.DB) {
ck.robinLocker.Lock()
defer ck.robinLocker.Unlock()
ck.robinIdx++
ck.robinIdx = ck.robinIdx % len(ck.dbs)
host := ck.hosts[ck.robinIdx]
db = ck.dbs[host]
return
}
// ExecFuncOnRobinNode 在Round robin 一个节点上执行
func (ck *ClickHouseCluster) ExecFuncOnRobinNode(ckfunc ClickHouseClusterFunc) (err error) {
for i := 0; i < len(ck.dbs); i++ {
db := ck.RobinDB()
err = ckfunc(db)
if err == nil {
break
}
}
return err
}
// RobinGet 在Round robin 一个节点上执行Get
func (ck *ClickHouseCluster) RobinGet(dest interface{}, query string, args ...interface{}) (err error) {
for i := 0; i < len(ck.dbs); i++ {
db := ck.RobinDB()
err = db.Get(dest, query, args...)
if err == nil {
break
}
}
return err
}
// RobinSelect 在Round robin 一个节点上执行Select
func (ck *ClickHouseCluster) RobinSelect(dest interface{}, query string, args ...interface{}) (err error) {
for i := 0; i < len(ck.dbs); i++ {
db := ck.RobinDB()
err = db.Select(dest, query, args...)
if err == nil {
break
}
}
return err
}
// ExecFuncOnLeader 在leader节点上执行任务
func (ck *ClickHouseCluster) ExecFuncOnLeader(database, table string, ckfunc ClickHouseClusterFunc) error {
for _, db := range ck.dbs {
var isLeader int
if err := db.Get(&isLeader, "select is_leader from system.replicas where database=? and table=?", database, table); err != nil && err != sql.ErrNoRows {
return err
}
if isLeader == 1 {
if err := ckfunc(db); err != nil {
return err
}
}
}
return nil
}
func (ck *ClickHouseCluster) ExecFuncOnMaster(ckfunc ClickHouseClusterFunc) (err error) {
for _, master := range ck.masters {
err = ckfunc(master)
if err == nil {
return
}
}
return
}
// CheckMutation 检查mutation数量
func (ck *ClickHouseCluster) CheckMutation(dbname, table string) (int, error) {
for _, db := range ck.dbs {
var count int
if err := db.Get(&count, "select count() from system.mutations where database=? and table=? and is_done=0", dbname, table); err != nil && err != sql.ErrNoRows {
return 0, err
}
if count > 0 {
return count, nil
}
}
return 0, nil
}
// ErrMutationDuplicate mutation未结束
var ErrMutationDuplicate = fmt.Errorf("mutation duplicate")
// WaitMutation 等待Mutation结束
func (ck *ClickHouseCluster) WaitMutation(dbname, table string, retry int) error {
for i := 0; i < retry; i++ {
time.Sleep(time.Second)
count, err := ck.CheckMutation(dbname, table)
if err != nil {
return err
}
if count == 0 {
return nil
}
}
return ErrMutationDuplicate
}
// KillMutation 杀死mutation进程
func (ck *ClickHouseCluster) KillMutation(dbname, table string) error {
_, err := ck.masters[0].Exec(fmt.Sprintf("kill mutation on cluster %s where database=? and table=? sync", ck.Name), dbname, table)
return err
}
package crypt
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"errors"
"io"
)
//PKCS7Padding 使用PKCS7进行填充
func PKCS7Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padtext...)
}
//PKCS7UnPadding PKCS7反填充
func PKCS7UnPadding(origData []byte) []byte {
length := len(origData)
unpadding := int(origData[length-1])
return origData[:(length - unpadding)]
}
//AESCBCEncryptPKCS7 aes加密,填充秘钥key的16位,24,32分别对应AES-128, AES-192, or AES-256.
func AESCBCEncryptPKCS7(rawData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
//填充原文
blockSize := block.BlockSize()
rawData = PKCS7Padding(rawData, blockSize)
//初始向量IV必须是唯一,但不需要保密
cipherText := make([]byte, blockSize+len(rawData))
//block大小 16
iv := cipherText[:blockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return nil, err
}
//block大小和初始向量大小一定要一致
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(cipherText[blockSize:], rawData)
return cipherText, nil
}
// AESCBCDecryptPKCS7 AES解密CBC模式
func AESCBCDecryptPKCS7(encryptData, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
blockSize := block.BlockSize()
if len(encryptData) < blockSize {
return nil, errors.New("ciphertext too short")
}
iv := encryptData[:aes.BlockSize]
encryptData = encryptData[aes.BlockSize:]
// CBC mode always works in whole blocks.
if len(encryptData)%blockSize != 0 {
panic("ciphertext is not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
// CryptBlocks can work in-place if the two arguments are the same.
mode.CryptBlocks(encryptData, encryptData)
//解填充
encryptData = PKCS7UnPadding(encryptData)
return encryptData, nil
}
// AESCBCDecryptPKCS7WithIV AES解密CBC模式
func AESCBCDecryptPKCS7WithIV(encryptData, key, iv []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
blockSize := block.BlockSize()
if len(encryptData) < blockSize {
return nil, errors.New("ciphertext too short")
}
// CBC mode always works in whole blocks.
if len(encryptData)%blockSize != 0 {
panic("ciphertext is not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
// CryptBlocks can work in-place if the two arguments are the same.
mode.CryptBlocks(encryptData, encryptData)
//解填充
encryptData = PKCS7UnPadding(encryptData)
return encryptData, nil
}
package crypt
import (
"crypto/md5"
"encoding/hex"
"regexp"
"unicode"
)
// MD5Hash md5值
func MD5Hash(v string) string {
h := md5.New()
h.Write([]byte(v))
return hex.EncodeToString(h.Sum(nil))
}
//是否包含中文
func IsChineseChar(str string) bool {
for _, r := range str {
if unicode.Is(unicode.Scripts["Han"], r) || (regexp.MustCompile("[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]").MatchString(string(r))) {
return true
}
}
return false
}
package exception
import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
"io/ioutil"
"log"
"net/http"
"strconv"
"time"
"github.com/getsentry/sentry-go"
"github.com/labstack/echo/v4"
"github.com/spf13/viper"
cmq "github.com/yougg/cmq-go-tdmq"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat"
"gopkg.in/gomail.v2"
)
// AlertContent 邮件内容
type AlertContent struct {
Topic string `json:"topc"`
Host string `json:"host"`
PID int `json:"pid"`
Content string `json:"content"`
Time string `json:"time"`
Args []interface{} `json:"args,omitempty"`
}
type cmdQueue struct {
client *cmq.Client
queue string
}
var defaultTemplate = template.Must(template.New("default").Parse("主题: {{.Topic}} 发生报警 {{.Contents | len}} 次\n{{range .Contents}}报警时间:\t{{.Time}}\n主机名称:\t{{.Host}}\n进程ID:\t{{.PID}}\n报警内容:\t{{.Content}}\n{{range .Args}}\t-\t{{.}}\n {{end}} {{end}}"))
// Hash 生成过滤hash
func (ac AlertContent) Hash(hashAlgo string) string {
switch hashAlgo {
case "topic":
return crypt.MD5Hash(ac.Topic + ac.Host)
case "process":
return crypt.MD5Hash(ac.Topic + ac.Host + strconv.Itoa(ac.PID))
case "content":
return crypt.MD5Hash(ac.Topic + ac.Host + ac.Content)
case "arg0":
return crypt.MD5Hash(fmt.Sprintf("%s%s%s%v", ac.Topic, ac.Host, ac.Content, ac.Args[0]))
}
return ""
}
var alertChan = make(chan *AlertContent, 64)
var alertQueue *cmdQueue
func sendContents(queue string, contents []*AlertContent) {
if alertQueue == nil {
b, err := json.Marshal(contents)
if err != nil {
sentry.CaptureException(err)
} else {
sentry.CaptureMessage(string(b))
}
} else {
var msgs []string
for _, content := range contents {
b, _ := json.Marshal(content)
msgs = append(msgs, string(b))
}
rsp, err := alertQueue.client.BatchSendMessage(queue, msgs, 0)
log.Println(rsp, err)
}
}
// SetAlertConfig 显式设置消息队列
func SetAlertConfig(queue string, endpoint string, secretID string, secretKey string) {
alertQueue = &cmdQueue{
client: cmq.NewClient(endpoint, secretID, secretKey, 5*time.Second),
queue: queue,
}
var contents []*AlertContent
go func() {
for {
select {
case content := <-alertChan:
contents = append(contents, content)
if len(contents) >= 16 {
go sendContents(queue, contents)
contents = contents[0:0]
}
case <-time.After(time.Second * 5):
if len(contents) > 0 {
go sendContents(queue, contents)
contents = contents[0:0]
}
}
}
}()
}
// SetAlertViper 自动获取viper配置
func SetAlertViper() {
SetAlertConfig(viper.GetString("alert_queue"), viper.GetString("alert_endpoint"), viper.GetString("alert_secret_id"), viper.GetString("alert_secret_key"))
}
// SendAlert 发送报警内容
func SendAlert(topic string, err error, args ...interface{}) {
SendContent(topic, err.Error(), args...)
}
// SendContent 发送内容
func SendContent(topic string, content string, args ...interface{}) {
alertChan <- &AlertContent{
Topic: topic,
Content: content,
Host: host,
PID: pid,
Time: time.Now().Format(time.RFC3339),
Args: args,
}
}
func receiveAlert() ([]cmq.Message, bool) {
rsp, err := alertQueue.client.BatchReceiveMessage(alertQueue.queue, 10, 1)
if err == context.DeadlineExceeded {
return nil, true
}
if err != nil {
return nil, true
}
return rsp.MsgInfos(), len(rsp.MsgInfos()) < 10
}
// alertTopic 告警主题配置
type alertTopic struct {
Mails []string `yaml:"mails"`
Robots []string `yaml:"robots"`
Filter string `yaml:"filter"`
Template string `yaml:"template"`
tpl *template.Template
}
// 异常监听
func HttpServer() {
address := viper.GetString("http_server")
if len(address) > 0 {
SetAlertViper()
e := echo.New()
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) (err error) {
req := c.Request()
res := c.Response()
reqBody := []byte{}
defer func() {
fmt.Printf(`{"timestamp":%d,"method":"%s","url":"%s","status":%d,"body":%s}\n`, time.Now().Unix(), req.Method, req.URL.Path, res.Status, string(reqBody))
}()
if c.Request().Body != nil { // Read
reqBody, _ = ioutil.ReadAll(c.Request().Body)
}
c.Request().Body = ioutil.NopCloser(bytes.NewBuffer(reqBody)) // Reset
if err = next(c); err != nil {
c.Error(err)
}
return
}
})
e.POST("/exception/report", func(c echo.Context) error {
req := struct {
Topic string `json:"topic"`
Message string `json:"message"`
Host string `json:"host,omitempty"`
PID int `json:"pid,omitempty"`
}{}
c.Bind(&req)
if len(req.Topic) == 0 || len(req.Message) == 0 {
return c.JSON(http.StatusBadRequest, "error: topic和message 不能为空")
}
alertChan <- &AlertContent{
Topic: req.Topic,
Content: req.Message,
Host: req.Host,
PID: req.PID,
Time: time.Now().Format(time.RFC3339),
}
return c.JSON(http.StatusOK, "ok")
})
if err := e.Start(address); err != nil {
log.Println("start http server on ", address, "error", err)
return
}
log.Println("start http server on ", address)
}
}
// ProcessAlert 处理异常
func ProcessAlert() {
var alertTopicMap map[string]*alertTopic
Assert(viper.UnmarshalKey("alert_topics", &alertTopicMap))
for topic, at := range alertTopicMap {
if at.Template != "" {
at.tpl = template.Must(template.New(topic).Parse(at.Template))
} else {
at.tpl = defaultTemplate
}
}
alertQueue = &cmdQueue{
client: cmq.NewClient(viper.GetString("alert_endpoint"), viper.GetString("alert_secret_id"), viper.GetString("alert_secret_key"), 5*time.Second),
queue: viper.GetString("alert_queue"),
}
mailConn := gomail.NewPlainDialer(viper.GetString("alert_mail_host"), viper.GetInt("alert_mail_port"), viper.GetString("alert_mail_user"), viper.GetString("alert_mail_pass"))
hashMap := make(map[string]int64)
for {
var candidate []cmq.Message
for {
msgs, send := receiveAlert()
if len(msgs) > 0 {
candidate = append(candidate, msgs...)
}
if send || len(candidate) >= 1024 {
break
}
}
if len(candidate) == 0 {
time.Sleep(time.Minute)
continue
}
var contents = make(map[string][]*AlertContent)
var receipts []string
for _, msg := range candidate {
log.Println(msg.MsgId(), msg.Handle(), msg.MsgBody())
receipts = append(receipts, msg.Handle())
content := &AlertContent{}
if err := json.Unmarshal([]byte(msg.MsgBody()), content); err != nil {
log.Println(msg.MsgBody(), "unmarshal error", err)
sentry.CaptureException(err)
continue
}
contents[content.Topic] = append(contents[content.Topic], content)
}
ts := time.Now().Unix()
for topic, topicContents := range contents {
if alertTp, ok := alertTopicMap[topic]; ok {
hasMessage := false
for _, content := range topicContents {
if hashVal := content.Hash(alertTp.Filter); hashVal != "" {
if hashMap[hashVal] > ts-1800 {
continue
} else {
hashMap[hashVal] = ts
hasMessage = true
}
} else {
hasMessage = true
}
}
if !hasMessage {
log.Println("topic:", topic, "has no message")
continue
}
msg := bytes.NewBuffer(nil)
alertTp.tpl.Execute(msg, struct {
Topic string
Contents []*AlertContent
}{Topic: topic, Contents: topicContents})
if len(alertTp.Mails) > 0 {
m := gomail.NewMessage()
m.SetHeader("From", viper.GetString("alert_mail_user"))
m.SetHeader("To", alertTp.Mails...)
m.SetHeader("Subject", fmt.Sprintf("服务器报警: %s", topic))
m.SetBody("text/plain", msg.String())
err := mailConn.DialAndSend(m)
log.Println("send mail", alertTp.Mails, msg.String(), err)
if err != nil {
sentry.CaptureException(err)
}
}
if len(alertTp.Robots) > 0 {
for _, robot := range alertTp.Robots {
err := wechat.SendQYWechatRobotMarkDown(robot, msg.String())
log.Println("send robot", robot, msg.String(), err)
if err != nil {
sentry.CaptureException(err)
}
}
}
} else {
log.Println("can't find topic:", topic)
}
}
for i := 0; i < len(receipts); i += 16 {
j := i + 16
if j > len(receipts) {
j = len(receipts)
}
rsp, err := alertQueue.client.BatchDeleteMessage(alertQueue.queue, receipts[i:j])
if err == nil && rsp.Code() != 0 {
err = fmt.Errorf("requestId(%s),response(%v)", rsp.RequestId(), rsp.Code())
}
if err != nil && err != context.DeadlineExceeded {
log.Println("delete receipts ", receipts[i:j], err)
}
}
}
}
package exception
import (
"fmt"
"os"
"testing"
"time"
"github.com/spf13/viper"
)
func TestSendContent(t *testing.T) {
viper.SetConfigFile("../sdktool/config.yaml")
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
fmt.Println("Using config file:", viper.ConfigFileUsed())
}
SetAlertViper()
SendContent("test", "测试一下报警邮件", "111", "222", 3333)
SendContent("test", "测试一下报警邮件", "111", "333", 3333)
time.Sleep(time.Minute)
}
func TestDefault(t *testing.T) {
data := struct {
Topic string
Contents []*AlertContent
}{"ttt", []*AlertContent{
{
Content: "你好",
Args: []interface{}{"fjaljf", "fjlf"},
},
}}
defaultTemplate.Execute(os.Stdout, data)
}
package exception
// Assert 断言
func Assert(err error) {
if err != nil {
panic(err)
}
}
package exception
import (
"os"
"github.com/getsentry/sentry-go"
"github.com/spf13/viper"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version"
)
var host, _ = os.Hostname()
var pid = os.Getpid()
// SentryInit 初始化sentry
func SentryInit(dsn string) error {
return sentry.Init(sentry.ClientOptions{
Dsn: dsn,
ServerName: host,
})
}
// SentryInitWithViper 通过viper获取配置
func SentryInitWithViper() error {
return sentry.Init(sentry.ClientOptions{
Dsn: viper.GetString("sentry_dsn"),
Release: version.Version + "-" + version.GitHash,
Environment: viper.GetString("sentry_env"),
ServerName: host,
Debug: viper.GetBool("sentry_debug"),
})
}
package exception
import (
"errors"
"testing"
"time"
"github.com/getsentry/sentry-go"
)
func TestSentryInit(t *testing.T) {
Assert(SentryInit("http://c823130ef5094713a34e58280767080f@sentry.vchangyi.com/16"))
sentry.CaptureException(errors.New("TestSentryInit"))
sentry.Flush(time.Second * 5)
}
module gitlab-ce.k8s.tools.vchangyi.com/project/ncrs/backend/cy-sdk-go
go 1.12
require (
github.com/ClickHouse/clickhouse-go v1.4.1
github.com/OneOfOne/xxhash v1.2.8
github.com/douyu/jupiter v0.2.2
github.com/edwingeng/wuid v0.0.0-20200721021842-1655b1f205ea
github.com/frankban/quicktest v1.10.0 // indirect
github.com/getsentry/sentry-go v0.7.0
github.com/glutwins/webclient v0.0.0-20170120042132-419c6416fbd4
github.com/go-mysql-org/go-mysql v1.3.0
github.com/go-redis/redis v6.15.8+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/protobuf v1.4.2
github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38
github.com/jmoiron/sqlx v1.3.3
github.com/json-iterator/go v1.1.10
github.com/juju/ratelimit v1.0.1
github.com/labstack/echo/v4 v4.1.16
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/lestrrat-go/strftime v1.0.3 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/robfig/cron v1.2.0
github.com/rs/zerolog v1.19.0
github.com/simukti/sqldb-logger v0.0.0-20200602044015-843152fd150e
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.1
github.com/tebeka/strftime v0.1.5 // indirect
github.com/yougg/cmq-go-tdmq v0.0.0-20211110083622-e09ffe63fedd
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/text v0.3.6
google.golang.org/grpc v1.31.0
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
This source diff could not be displayed because it is too large. You can view the blob instead.
package logger
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/rs/zerolog"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
// RotateType 轮转类型
type RotateType time.Duration
const (
// RotateWeek 按周轮转
RotateWeek RotateType = RotateType(time.Hour * 24 * 7)
// RotateDay 按日轮转
RotateDay RotateType = RotateType(time.Hour * 24)
// RotateHour 按日轮转
RotateHour RotateType = RotateType(time.Hour)
// RotateMinute 按分钟轮转
RotateMinute RotateType = RotateType(time.Minute)
)
// NewRotateWriter 轮转日志写入
func NewRotateWriter(name string, rt RotateType) io.Writer {
var pattern string = name + ".%Y%m%d%H.log"
var duration time.Duration = time.Duration(rt)
switch rt {
case RotateWeek:
pattern = name + ".%Y%W.log"
case RotateDay:
pattern = name + ".%Y%m%d.log"
case RotateMinute:
pattern = name + ".%Y%m%d%H%M.log"
}
fd, err := rotatelogs.New(
pattern,
rotatelogs.WithLinkName(name+".log"),
rotatelogs.WithRotationTime(duration),
rotatelogs.WithMaxAge(-1),
rotatelogs.WithLocation(time.Local),
rotatelogs.WithRotationCount(30),
rotatelogs.WithLinkName(""),
)
exception.Assert(err)
return fd
}
// NewRotateWriterWithHostName 增加主机名称
func NewRotateWriterWithHostName(path string, name string, rt RotateType) io.Writer {
h, _ := os.Hostname()
return NewRotateWriter(filepath.Join(path, name+"-"+h), rt)
}
// NewSimpleRotateLogger 新建轮转日志
func NewSimpleRotateLogger(name string, rt RotateType) *log.Logger {
return log.New(NewRotateWriter(name, rt), "", 0)
}
// NewSimpleRotateLoggerWithHostName 新建轮转日志
func NewSimpleRotateLoggerWithHostName(path string, name string, rt RotateType) *log.Logger {
return log.New(NewRotateWriterWithHostName(path, name, rt), "", 0)
}
// NewZeroRotateLogger zerolog轮转日志
func NewZeroRotateLogger(name string, rt RotateType) zerolog.Logger {
return zerolog.New(NewRotateWriter(name, rt)).With().Timestamp().Logger()
}
// NewCurrentLogger 新建本次启动日志
func NewCurrentLogger(name string) *log.Logger {
fd, err := os.OpenFile(fmt.Sprintf("%s.%d", name, time.Now().Unix()), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
exception.Assert(err)
return log.New(fd, "", 0)
}
// NewRotateLogger 创建轮转日志
func NewRotateLogger(filename string, level zapcore.Level) *zap.Logger {
hook := lumberjack.Logger{
Filename: filename, // 日志文件路径
MaxBackups: 7,
MaxAge: 28, //days
LocalTime: true,
Compress: true, // 是否压缩 disabled by default
}
w := zapcore.AddSync(&hook)
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
return zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig), w, level))
}
package logger
import (
"log"
"testing"
)
func TestNewRotateWriterWithHostName(t *testing.T) {
w := NewRotateWriterWithHostName(".", "test", RotateDay)
w.Write([]byte("xxxxx"))
dlog := log.New(w, "", 0)
dlog.Println("test1")
dlog.Println("test2")
}
package logger
import (
"bufio"
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
// EchoLoggerConfig defines the config for Logger middleware.
type EchoLoggerConfig struct {
// Skipper defines a function to skip middleware.
Skipper middleware.Skipper
Output io.Writer
// WithBody 是否记录消息内容
WithBody bool
// Header 消息头
Header map[string]string
}
type bodyDumpResponseWriter struct {
io.Writer
http.ResponseWriter
}
func (w *bodyDumpResponseWriter) WriteHeader(code int) {
w.ResponseWriter.WriteHeader(code)
}
func (w *bodyDumpResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}
func (w *bodyDumpResponseWriter) Flush() {
w.ResponseWriter.(http.Flusher).Flush()
}
func (w *bodyDumpResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.ResponseWriter.(http.Hijacker).Hijack()
}
// EchoLogger returns a middleware that logs HTTP requests.
func EchoLogger() echo.MiddlewareFunc {
return EchoLoggerWithConfig(EchoLoggerConfig{})
}
type logEvent struct {
Ts int64 `json:"ts"`
IP string `json:"ip"`
Protocol string `json:"protocol"`
Method string `json:"method"`
Host string `json:"host"`
Path string `json:"path"`
IB int `json:"ib"`
OB int64 `json:"ob"`
Req json.RawMessage `json:"req,omitempty"`
Resp json.RawMessage `json:"resp,omitempty"`
Status int `json:"status"`
Latency int `json:"latency"`
User interface{} `json:"user,omitempty"`
Option interface{} `json:"option,omitempty"`
Query map[string]string `json:"query,omitempty"`
Header map[string]string `json:"header,omitempty"`
Error error `json:"err,omitempty"`
}
// EchoLoggerWithConfig returns a Logger middleware with config.
// See: `Logger()`.
func EchoLoggerWithConfig(config EchoLoggerConfig) echo.MiddlewareFunc {
// Defaults
if config.Skipper == nil {
config.Skipper = middleware.DefaultSkipper
}
if config.Output == nil {
config.Output = NewRotateWriter("echo", RotateDay)
}
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) (err error) {
if config.Skipper(c) {
return next(c)
}
req := c.Request()
res := c.Response()
start := time.Now()
ev := &logEvent{}
if cl := req.Header.Get(echo.HeaderContentLength); cl != "" {
ev.IB, _ = strconv.Atoi(cl)
}
ev.Ts = start.Unix()
ev.Protocol = req.Proto
ev.Host = req.Host
ev.Method = req.Method
ev.Path = req.URL.Path
ev.IP = c.RealIP()
defer func() {
b, _ := json.Marshal(ev)
config.Output.Write(b)
config.Output.Write([]byte("\n"))
}()
if config.WithBody {
reqBody := []byte{}
if c.Request().Body != nil { // Read
reqBody, _ = ioutil.ReadAll(c.Request().Body)
c.Request().Body.Close()
}
c.Request().Body = ioutil.NopCloser(bytes.NewBuffer(reqBody)) // Reset
ev.Req = reqBody
// Response
resBody := new(bytes.Buffer)
mw := io.MultiWriter(res.Writer, resBody)
res.Writer = &bodyDumpResponseWriter{Writer: mw, ResponseWriter: res.Writer}
defer func() { ev.Resp = resBody.Bytes() }()
}
query := c.QueryParams()
if len(query) > 0 {
ev.Query = make(map[string]string)
for k := range query {
ev.Query[k] = query.Get(k)
}
}
if len(config.Header) > 0 {
ev.Header = make(map[string]string)
for k, flattern := range config.Header {
if v := req.Header.Get(k); v != "" {
ev.Header[flattern] = v
}
}
}
if err = next(c); err != nil {
ev.Error = err
c.Error(err)
}
ev.User = c.Get("user")
ev.Option = c.Get("option")
ev.Status = res.Status
ev.Latency = int(time.Now().Sub(start) / time.Millisecond)
ev.OB = res.Size
return
}
}
}
package logger
import (
"context"
"github.com/rs/zerolog"
sqldblogger "github.com/simukti/sqldb-logger"
)
type zerologAdapter struct {
logger zerolog.Logger
}
// NewSQLLogger sql日志
func NewSQLLogger() sqldblogger.Logger {
return &zerologAdapter{logger: NewZeroRotateLogger("sql", RotateDay)}
}
// Log implement sqldblogger.Logger and log it as is.
// To use context.Context values, please copy this file and adjust to your needs.
func (zl *zerologAdapter) Log(_ context.Context, level sqldblogger.Level, msg string, data map[string]interface{}) {
var lvl zerolog.Level
switch level {
case sqldblogger.LevelError:
lvl = zerolog.ErrorLevel
case sqldblogger.LevelInfo:
lvl = zerolog.InfoLevel
case sqldblogger.LevelDebug:
lvl = zerolog.DebugLevel
case sqldblogger.LevelTrace:
lvl = zerolog.TraceLevel
default:
lvl = zerolog.DebugLevel
}
zl.logger.WithLevel(lvl).Fields(data).Msg(msg)
}
# Golang 开发工具库
## 使用方式
1. 参考go第三方库使用方法
2. 配置git将请求从ssh转化为http:
```git config --global url."ssh://git@gitlab-ce.k8s.tools.vchangyi.com:32201".insteadOf "http://gitlab-ce.k8s.tools.vchangyi.com"```
3. 如果定义了GOPROXY环境变量,额外定义 ```GOPRIVATE=gitlab-ce.k8s.tools.vchangyi.com```
## 模块说明
### splisten
Supervisor 子进程异常状态监控
```由于Supervirsor 通过stdin/stdout 和EventListener 进行协议通信,所以EventListener 不能通过stdout输入其他非协议内容,不然EventListener的状态就会不正常 这个暂时还没找到解决方案,暂时把initConfig里的输出代码注释,然后编译```
\ No newline at end of file
package region
import (
"fmt"
"log"
"time"
"github.com/jmoiron/sqlx"
)
const (
LevelCountry = 0
LevelProvince = 1
LevelCity = 2
LevelDistrict = 3
)
type Region struct {
ID int `db:"id"`
Name string `db:"sname"`
Pinyin string `db:"pinyin"`
Pid int `db:"pid"`
Level int `db:"level"`
subRegionNameMap map[string]*Region `db:"-"`
subRegionPinyinMap map[string]*Region `db:"-"`
}
func NewRegion(db *sqlx.DB) (region *Region, err error) {
region = &Region{
subRegionNameMap: make(map[string]*Region, 0),
subRegionPinyinMap: make(map[string]*Region, 0),
}
err = region.init(db)
return region, err
}
func (r *Region) init(db *sqlx.DB) (err error) {
now := time.Now()
defer func() {
log.Println("region init use time:", time.Now().Sub(now))
}()
regions := make([]*Region, 0)
pmap := make(map[int][]*Region, 0)
err = db.Select(&regions, "select id,pid,sname,level,pinyin from region")
if err != nil {
return err
}
for _, r := range regions {
r.subRegionNameMap = make(map[string]*Region, 0)
r.subRegionPinyinMap = make(map[string]*Region, 0)
if _, ok := pmap[r.Pid]; !ok {
pmap[r.Pid] = make([]*Region, 0)
}
pmap[r.Pid] = append(pmap[r.Pid], r)
}
var addsubRegion func(*Region) error
addsubRegion = func(base *Region) error {
if subrs, ok := pmap[base.ID]; ok {
for _, subr := range subrs {
if err := addsubRegion(subr); err != nil {
return err
}
if err := base.AddsubRegion(subr); err != nil {
return err
}
}
}
return nil
}
return addsubRegion(r)
}
func (a *Region) AddsubRegion(subRegion *Region) error {
if len(subRegion.Name) == 0 || len(subRegion.Pinyin) == 0 {
log.Println(fmt.Sprintf("Aare 信息不完整:(id:%d,name:%s,pinyin:%s)", subRegion.ID, subRegion.Name, subRegion.Pinyin))
return nil
}
a.subRegionNameMap[subRegion.Name] = subRegion
a.subRegionPinyinMap[subRegion.Pinyin] = subRegion
return nil
}
func (a *Region) GetSubRegionByName(name string) *Region {
return a.subRegionNameMap[name]
}
func (a *Region) GetSubRegionByPinyin(pinyin string) *Region {
return a.subRegionPinyinMap[pinyin]
}
func (a *Region) RangeSub(fun func(sub *Region) bool) {
for _, sub := range a.subRegionNameMap {
if !fun(sub) {
break
}
}
}
package schedule
import (
"reflect"
"time"
gocron "github.com/robfig/cron"
)
type TimeTaskOption func(c *TimerTask)
type ErrHandler func(name string, err error)
type TaskStatusHandler func(name string, nextAt time.Time)
type ErrHandlerWithContext func(context TaskContext, err error)
type TaskStatusHandlerWithContext func(context TaskContext)
type TimeTaskFunc func(...interface{}) error
type TaskContext struct {
id int64
Name string
fuc TimeTaskFunc
args []interface{}
Duration time.Duration
}
func WithErrHandler(errHandler ErrHandler) TimeTaskOption {
return func(tt *TimerTask) {
tt.errHandler = errHandler
}
}
func WithRunHandler(runHandler TaskStatusHandler) TimeTaskOption {
return func(tt *TimerTask) {
tt.runHandler = runHandler
}
}
func WithEndHandler(runHandler TaskStatusHandler) TimeTaskOption {
return func(tt *TimerTask) {
tt.endHandler = runHandler
}
}
func WithContextErrHandler(errHandler ErrHandlerWithContext) TimeTaskOption {
return func(tt *TimerTask) {
tt.errContextHandler = errHandler
}
}
func WithContextRunHandler(runHandler TaskStatusHandlerWithContext) TimeTaskOption {
return func(tt *TimerTask) {
tt.runContextHandler = runHandler
}
}
func WithContextEndHandler(runHandler TaskStatusHandlerWithContext) TimeTaskOption {
return func(tt *TimerTask) {
tt.endContextHandler = runHandler
}
}
type TimerTask struct {
tws []*TaskContext
cases []reflect.SelectCase
durations map[*TaskContext]time.Duration
errHandler ErrHandler
runHandler TaskStatusHandler
endHandler TaskStatusHandler
errContextHandler ErrHandlerWithContext
runContextHandler TaskStatusHandlerWithContext
endContextHandler TaskStatusHandlerWithContext
cron *gocron.Cron
}
// type taskWrapper struct {
// id int64
// fuc func() error
// name string
// }
func NewTimerTask(options ...TimeTaskOption) *TimerTask {
tt := &TimerTask{
tws: make([]*TaskContext, 0, 0),
cases: make([]reflect.SelectCase, 0, 0),
durations: make(map[*TaskContext]time.Duration, 0),
}
for _, opt := range options {
opt(tt)
}
tt.cron = gocron.New()
return tt
}
/*
firstAt :首次执行时间,<=当前时间,立即执行
duration: 之后执行的时间间隔
task: 任务
taskName: 任务名称
*/
func (tt *TimerTask) Do(firstAt time.Time, duration time.Duration, task TimeTaskFunc, taskName string, args ...interface{}) error {
now := time.Now()
at := now
if firstAt.After(now) {
at = firstAt
}
t := time.NewTimer(at.Sub(now))
tw := &TaskContext{id: at.Unix(), fuc: task, Name: taskName, args: args}
tt.cases = append(tt.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(t.C)})
tt.tws = append(tt.tws, tw)
tt.durations[tw] = duration
return nil
}
/*
表达式说明:
Usage
Callers may register Funcs to be invoked on a given schedule. Cron will run
them in their own goroutines.
Cron("0 30 * * * *", func(arg ...interface{}) { fmt.Println("Every hour on the half hour") }("args..."))
Cron("@hourly", func(arg ...interface{}) { fmt.Println("Every hour") }("args..."))
Cron("@every 1h30m", func(arg ...interface{}) { fmt.Println("Every hour thirty")("args...") })
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
Cron("@daily", func(arg ...interface{}) { fmt.Println("Every day") }("args..."))
CRON Expression Format
A cron expression represents a set of times, using 6 space-separated fields.
Field name | Mandatory? | Allowed values | Allowed special characters
---------- | ---------- | -------------- | --------------------------
Seconds | Yes | 0-59 | * / , -
Minutes | Yes | 0-59 | * / , -
Hours | Yes | 0-23 | * / , -
Day of month | Yes | 1-31 | * / , - ?
Month | Yes | 1-12 or JAN-DEC | * / , -
Day of week | Yes | 0-6 or SUN-SAT | * / , - ?
Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun",
and "sun" are equally accepted.
Special Characters
Asterisk ( * )
The asterisk indicates that the cron expression will match for all values of the
field; e.g., using an asterisk in the 5th field (month) would indicate every
month.
Slash ( / )
Slashes are used to describe increments of ranges. For example 3-59/15 in the
1st field (minutes) would indicate the 3rd minute of the hour and every 15
minutes thereafter. The form "*\/..." is equivalent to the form "first-last/...",
that is, an increment over the largest possible range of the field. The form
"N/..." is accepted as meaning "N-MAX/...", that is, starting at N, use the
increment until the end of that specific range. It does not wrap around.
Comma ( , )
Commas are used to separate items of a list. For example, using "MON,WED,FRI" in
the 5th field (day of week) would mean Mondays, Wednesdays and Fridays.
Hyphen ( - )
Hyphens are used to define ranges. For example, 9-17 would indicate every
hour between 9am and 5pm inclusive.
Question mark ( ? )
Question mark may be used instead of '*' for leaving either day-of-month or
day-of-week blank.
Predefined schedules
You may use one of several pre-defined schedules in place of a cron expression.
Entry | Description | Equivalent To
----- | ----------- | -------------
@yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 *
@monthly | Run once a month, midnight, first of month | 0 0 0 1 * *
@weekly | Run once a week, midnight on Sunday | 0 0 0 * * 0
@daily (or @midnight) | Run once a day, midnight | 0 0 0 * * *
@hourly | Run once an hour, beginning of hour | 0 0 * * * *
Intervals
You may also schedule a job to execute at fixed intervals, starting at the time it's added
or cron is run. This is supported by formatting the cron spec like this:
*******************************************************************************************
@every <duration>
where "duration" is a string accepted by time.ParseDuration
(http://golang.org/pkg/time/#ParseDuration).
For example, "@every 1h30m10s" would indicate a schedule that activates immediately,
and then every 1 hour, 30 minutes, 10 seconds.
Note: The interval does not take the job runtime into account. For example,
if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes,
it will have only 2 minutes of idle time between each run.
Time zones
All interpretation and scheduling is done in the machine's local time zone (as
provided by the Go time package (http://www.golang.org/pkg/time).
Be aware that jobs scheduled during daylight-savings leap-ahead transitions will
not be run!
Thread safety
Since the Cron service runs concurrently with the calling code, some amount of
care must be taken to ensure proper synchronization.
All cron methods are designed to be correctly synchronized as long as the caller
ensures that invocations have a clear happens-before ordering between them.
Implementation
Cron entries are stored in an array, sorted by their next activation time. Cron
sleeps until the next job is due to be run.
Upon waking:
- it runs each entry that is active on that second
- it calculates the next run times for the jobs that were run
- it re-sorts the array of entries by next activation time.
- it goes to sleep until the soonest job.
*/
func (tt *TimerTask) Cron(spec string, task TimeTaskFunc, taskName string, args ...interface{}) error {
ctx := TaskContext{id: time.Now().Unix(), fuc: task, Name: taskName, args: args}
return tt.cron.AddFunc(spec, func() {
sch, _ := gocron.Parse(spec)
next := sch.Next(time.Now())
if tt.runHandler != nil {
tt.runHandler(taskName, next)
}
if tt.runContextHandler != nil {
tt.runContextHandler(ctx)
}
err := task(args)
if tt.endHandler != nil {
tt.endHandler(taskName, next)
}
if tt.endContextHandler != nil {
tt.endContextHandler(ctx)
}
if err != nil {
if tt.errHandler != nil {
tt.errHandler(taskName, err)
}
if tt.errContextHandler != nil {
tt.errContextHandler(ctx, err)
}
}
})
}
func (tt *TimerTask) Run() {
tt.cron.Start()
go func() {
for {
chosen, _, _ := reflect.Select(tt.cases)
tw := tt.tws[chosen]
tt.cases[chosen] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.NewTimer(tt.durations[tw]).C)}
go func(tw *TaskContext) {
if tt.runHandler != nil {
tt.runHandler(tw.Name, time.Now().Add(tt.durations[tw]))
}
if tt.runContextHandler != nil {
tt.runContextHandler(*tw)
}
err := tw.fuc(tw.args...)
if tt.endHandler != nil {
tt.endHandler(tw.Name, time.Now().Add(tt.durations[tw]))
}
if tt.endContextHandler != nil {
tt.endContextHandler(*tw)
}
if err != nil {
if tt.errHandler != nil {
tt.errHandler(tw.Name, err)
}
if tt.errContextHandler != nil {
tt.errContextHandler(*tw, err)
}
}
}(tw)
}
}()
}
package schedule
import (
"fmt"
"testing"
"time"
)
func TestTimeTask(t *testing.T) {
tt := NewTimerTask(WithErrHandler(func(name string, err error) {
fmt.Sprintln("task ", name, err.Error())
}), WithRunHandler(func(name string, nextAt time.Time) {
fmt.Sprintln("task ", name, " nexttime run at ", nextAt)
}))
tt.Do(time.Now().Add(time.Second*5), time.Second*5, func(arg ...interface{}) error {
println("task run")
println(time.Now().Unix())
return nil
}, "test task 1")
tt.Do(time.Now().Add(-time.Second*50), time.Second*5, func(arg ...interface{}) error {
println("task run")
println(time.Now().Unix())
return nil
}, "test task 2")
tt.Cron("*/5 * * * * ?", func(arg ...interface{}) error {
println("cron task run")
println(time.Now().Unix())
return nil
}, "cron test task 1")
tt.Run()
time.Sleep(time.Second * 30)
}
This diff is collapsed.
#cgo的dnslookup时,调用glibc getaddrinfo会存在bug
#可以通过netgo tag调用go 原生的dnslookup
if [ -x "$(command -v git)" ];then
branch=`git branch -vv | grep "*" | awk '{print $2}'`
echo -e "\033[31m\033[01m\033[05m[ 当前分支:$branch ]\033[0m"
fi
basepath=$(cd `dirname $0`; pwd)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -tags netgo -ldflags "-X 'gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version.GitHash=$(git show -s --format=%h)' -X 'gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version.Version=1.0.0'"
/*
Copyright © 2020 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
)
// mailCmd represents the mail command
var mailCmd = &cobra.Command{
Use: "mail",
Short: "错误报警分发",
Long: `错误报警分发,参考配置`,
Run: func(cmd *cobra.Command, args []string) {
go exception.HttpServer()
exception.ProcessAlert()
},
}
func init() {
rootCmd.AddCommand(mailCmd)
}
/*
Copyright © 2020 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat"
// mysql
_ "github.com/go-sql-driver/mysql"
)
var partnerStockID string
var partnerMchID string
var sourceMchID string
var sourceDsn string
// partnerCmd represents the partner command
var partnerCmd = &cobra.Command{
Use: "partner",
Short: "建立委托营销关系",
Long: `建立委托营销关系:参数商户号和批次号必须指定`,
Run: func(cmd *cobra.Command, args []string) {
db := sqlx.MustConnect("mysql", sourceDsn)
cfg := &wechat.ClientV3Config{}
exception.Assert(db.Get(cfg, "select mch_id, serial_no, pemkey, pemcert, appid, api_secret, service_mch_id from hwj_mch_conf where mch_id=?", sourceMchID))
wc, err := wechat.NewWechatClientV3WithPem(cfg, nil)
exception.Assert(err)
if partnerStockID != "" && partnerMchID != "" {
result, err := wc.PartnerShip(wechat.StockTypeBusiFavor, partnerMchID, partnerStockID)
if err != nil {
fmt.Println("建立失败", err)
} else {
fmt.Println("建立成功", result)
}
}
},
}
func init() {
rootCmd.AddCommand(partnerCmd)
// Here you will define your flags and configuration settings.
partnerCmd.Flags().StringVar(&sourceDsn, "dsn", "", "批次号配置数据库")
partnerCmd.Flags().StringVar(&sourceMchID, "source-mchid", "1561018721", "批次号所属商户")
partnerCmd.Flags().StringVar(&partnerStockID, "partner-stock", "", "批次号")
partnerCmd.Flags().StringVar(&partnerMchID, "partner-mchid", "", "授权商户号")
}
/*
Copyright © 2020 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"fmt"
"os"
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version"
"github.com/spf13/viper"
)
var cfgFile string
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "sdktool",
Short: "sdk工具",
Long: `sdk支持工具,如报警邮件处理程序`,
// Uncomment the following line if your bare application
// has an action associated with it:
// Run: func(cmd *cobra.Command, args []string) { },
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func init() {
rootCmd.Version = version.Version + "-" + version.GitHash
cobra.OnInitialize(initConfig)
// Here you will define your flags and configuration settings.
// Cobra supports persistent flags, which, if defined here,
// will be global for your application.
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "config.yaml", "config file (default is config.yaml)")
}
// initConfig reads in config file and ENV variables if set.
func initConfig() {
viper.SetConfigFile(cfgFile)
viper.AutomaticEnv() // read in environment variables that match
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
fmt.Println("Using config file:", viper.ConfigFileUsed())
}
fmt.Println("version: ", rootCmd.Version)
exception.SentryInitWithViper()
}
package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/supervisor"
)
/*
由于Supervirsor 通过stdin/stdout 和EventListener 进行协议通信,所以EventListener 不能通过stdout输入其他非协议内容,不然EventListener的状态就会不正常
这个暂时还没找到解决方案,暂时把initConfig里的输出注释掉
*/
// mailCmd represents the mail command
var spListenerCmd = &cobra.Command{
Use: "splisten",
Short: "supervisor 进程状态监听报警",
Long: `supervisor 进程状态监听报警`,
Run: func(cmd *cobra.Command, args []string) {
exception.SetAlertViper()
supervisor.NewListener().Start()
},
}
func init() {
rootCmd.AddCommand(spListenerCmd)
}
/*
Copyright © 2020 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/sdktool/tkserver"
)
// tkserverCmd represents the tkserver command
var tkserverCmd = &cobra.Command{
Use: "tkserver",
Short: "微信access_token获取服务",
Long: `微信access_token获取服务`,
Run: func(cmd *cobra.Command, args []string) {
exception.Assert(exception.SentryInitWithViper())
engine := tkserver.NewEngine()
exception.Assert(engine.Run())
},
}
func init() {
rootCmd.AddCommand(tkserverCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// tkserverCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// tkserverCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
alert_secret_id: "AKIDDXGlBPuRHwRVKt8OdonX0Yx5xZfzaJYo"
alert_secret_key: "9D9pIY5Xoqrc7XiDMFpE5mDnBOfkgZq5"
# alert_endpoint: "http://cmq-queue-sh.api.tencentyun.com"
# alert_endpoint: "https://cmq-sh.public.tencenttdmq.com"
#alert_endpoint: "https://cmq-queue-sh.api.qcloud.com"
alert_endpoint: "http://sh.mqadapter.cmq.tencentyun.com"
alert_queue: prod-cdp-alert
alert_topics:
test:
mails:
- luhong@vchangyi.com
robots:
- https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8f5e75de-6117-41ec-bdf8-c8256fef6256
filter: arg0
alert_mail_host: smtp.exmail.qq.com
alert_mail_port: 465
alert_mail_user: data-report@vchangyi.com
alert_mail_pass: ZAQ12wsx
sentry_dsn: "http://0755d5cb42ee4f13adf0515b00c70e42@sentry.vchangyi.com/19"
sentry_env: "test"
\ No newline at end of file
/*
Copyright © 2020 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import "gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/sdktool/cmd"
func main() {
cmd.Execute()
}
package tkserver
import (
context "context"
"errors"
"github.com/douyu/jupiter"
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/getsentry/sentry-go"
"github.com/spf13/viper"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/client"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
)
type tkServerEngine struct {
jupiter.Application
apps map[string]*client.Client
apikey string
}
func (engine *tkServerEngine) serverRPC() error {
cfg := xgrpc.DefaultConfig()
cfg.Host = viper.GetString("tkserver.addr")
cfg.Port = viper.GetInt("tkserver.port")
server := cfg.Build()
RegisterTKServerServer(server.Server, &impTKServer{engine: engine})
return engine.Serve(server)
}
// NewEngine 创建服务
func NewEngine() *jupiter.Application {
engine := &tkServerEngine{
apps: make(map[string]*client.Client),
apikey: viper.GetString("apikey"),
}
apps := viper.GetStringMapString("apps")
for appid, appsecret := range apps {
engine.apps[appid] = client.NewClient(&crypt.WechatConfig{
AppID: appid,
AppSecret: appsecret,
}, cache.NewJsonFileCache(appid+".json"))
}
exception.Assert(engine.Startup(engine.serverRPC))
return &engine.Application
}
type impTKServer struct {
engine *tkServerEngine
}
// ErrAuth 未授权
var ErrAuth = errors.New("noauth")
// ErrAppID 应用不支持
var ErrAppID = errors.New("appid")
func (g impTKServer) AccessToken(context context.Context, request *AccessTokenRequest) (*AccessTokenReply, error) {
if request.Apikey != g.engine.apikey {
return nil, ErrAuth
}
c, ok := g.engine.apps[request.Appid]
if !ok {
return nil, ErrAppID
}
reply := &AccessTokenReply{}
var err error
if reply.Token, err = c.GetAccessToken(); err != nil {
sentry.CaptureException(err)
return nil, err
}
if reply.Token == request.Expired {
c.ExpireToken()
if reply.Token, err = c.GetAccessToken(); err != nil {
sentry.CaptureException(err)
return nil, err
}
}
return reply, nil
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: tkserver.proto
package tkserver
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type AccessTokenRequest struct {
Apikey string `protobuf:"bytes,1,opt,name=apikey,proto3" json:"apikey,omitempty"`
Appid string `protobuf:"bytes,2,opt,name=appid,proto3" json:"appid,omitempty"`
Expired string `protobuf:"bytes,3,opt,name=expired,proto3" json:"expired,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AccessTokenRequest) Reset() { *m = AccessTokenRequest{} }
func (m *AccessTokenRequest) String() string { return proto.CompactTextString(m) }
func (*AccessTokenRequest) ProtoMessage() {}
func (*AccessTokenRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_2c3d75d18c198611, []int{0}
}
func (m *AccessTokenRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AccessTokenRequest.Unmarshal(m, b)
}
func (m *AccessTokenRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AccessTokenRequest.Marshal(b, m, deterministic)
}
func (m *AccessTokenRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_AccessTokenRequest.Merge(m, src)
}
func (m *AccessTokenRequest) XXX_Size() int {
return xxx_messageInfo_AccessTokenRequest.Size(m)
}
func (m *AccessTokenRequest) XXX_DiscardUnknown() {
xxx_messageInfo_AccessTokenRequest.DiscardUnknown(m)
}
var xxx_messageInfo_AccessTokenRequest proto.InternalMessageInfo
func (m *AccessTokenRequest) GetApikey() string {
if m != nil {
return m.Apikey
}
return ""
}
func (m *AccessTokenRequest) GetAppid() string {
if m != nil {
return m.Appid
}
return ""
}
func (m *AccessTokenRequest) GetExpired() string {
if m != nil {
return m.Expired
}
return ""
}
type AccessTokenReply struct {
Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AccessTokenReply) Reset() { *m = AccessTokenReply{} }
func (m *AccessTokenReply) String() string { return proto.CompactTextString(m) }
func (*AccessTokenReply) ProtoMessage() {}
func (*AccessTokenReply) Descriptor() ([]byte, []int) {
return fileDescriptor_2c3d75d18c198611, []int{1}
}
func (m *AccessTokenReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AccessTokenReply.Unmarshal(m, b)
}
func (m *AccessTokenReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AccessTokenReply.Marshal(b, m, deterministic)
}
func (m *AccessTokenReply) XXX_Merge(src proto.Message) {
xxx_messageInfo_AccessTokenReply.Merge(m, src)
}
func (m *AccessTokenReply) XXX_Size() int {
return xxx_messageInfo_AccessTokenReply.Size(m)
}
func (m *AccessTokenReply) XXX_DiscardUnknown() {
xxx_messageInfo_AccessTokenReply.DiscardUnknown(m)
}
var xxx_messageInfo_AccessTokenReply proto.InternalMessageInfo
func (m *AccessTokenReply) GetToken() string {
if m != nil {
return m.Token
}
return ""
}
func init() {
proto.RegisterType((*AccessTokenRequest)(nil), "AccessTokenRequest")
proto.RegisterType((*AccessTokenReply)(nil), "AccessTokenReply")
}
func init() {
proto.RegisterFile("tkserver.proto", fileDescriptor_2c3d75d18c198611)
}
var fileDescriptor_2c3d75d18c198611 = []byte{
// 162 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0xc9, 0x2e, 0x4e,
0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x8a, 0xe1, 0x12, 0x72, 0x4c,
0x4e, 0x4e, 0x2d, 0x2e, 0x0e, 0xc9, 0xcf, 0x4e, 0xcd, 0x0b, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e,
0x11, 0x12, 0xe3, 0x62, 0x4b, 0x2c, 0xc8, 0xcc, 0x4e, 0xad, 0x94, 0x60, 0x54, 0x60, 0xd4, 0xe0,
0x0c, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0x13, 0x0b, 0x0a, 0x32, 0x53, 0x24, 0x98, 0xc0, 0xc2,
0x10, 0x8e, 0x90, 0x04, 0x17, 0x7b, 0x6a, 0x45, 0x41, 0x66, 0x51, 0x6a, 0x8a, 0x04, 0x33, 0x58,
0x1c, 0xc6, 0x55, 0xd2, 0xe0, 0x12, 0x40, 0x31, 0xbd, 0x20, 0x07, 0x6c, 0x46, 0x09, 0x88, 0x07,
0x35, 0x1a, 0xc2, 0x31, 0x72, 0xe6, 0xe2, 0x08, 0xf1, 0x0e, 0x06, 0xbb, 0x4c, 0xc8, 0x9c, 0x8b,
0x1b, 0x49, 0x97, 0x90, 0xb0, 0x1e, 0xa6, 0x0b, 0xa5, 0x04, 0xf5, 0xd0, 0x0d, 0x56, 0x62, 0x48,
0x62, 0x03, 0xfb, 0xc9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x6b, 0x0b, 0x59, 0x23, 0xe5, 0x00,
0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// TKServerClient is the client API for TKServer service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type TKServerClient interface {
// 获取access_token
AccessToken(ctx context.Context, in *AccessTokenRequest, opts ...grpc.CallOption) (*AccessTokenReply, error)
}
type tKServerClient struct {
cc grpc.ClientConnInterface
}
func NewTKServerClient(cc grpc.ClientConnInterface) TKServerClient {
return &tKServerClient{cc}
}
func (c *tKServerClient) AccessToken(ctx context.Context, in *AccessTokenRequest, opts ...grpc.CallOption) (*AccessTokenReply, error) {
out := new(AccessTokenReply)
err := c.cc.Invoke(ctx, "/TKServer/AccessToken", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// TKServerServer is the server API for TKServer service.
type TKServerServer interface {
// 获取access_token
AccessToken(context.Context, *AccessTokenRequest) (*AccessTokenReply, error)
}
// UnimplementedTKServerServer can be embedded to have forward compatible implementations.
type UnimplementedTKServerServer struct {
}
func (*UnimplementedTKServerServer) AccessToken(ctx context.Context, req *AccessTokenRequest) (*AccessTokenReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method AccessToken not implemented")
}
func RegisterTKServerServer(s *grpc.Server, srv TKServerServer) {
s.RegisterService(&_TKServer_serviceDesc, srv)
}
func _TKServer_AccessToken_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AccessTokenRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TKServerServer).AccessToken(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/TKServer/AccessToken",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TKServerServer).AccessToken(ctx, req.(*AccessTokenRequest))
}
return interceptor(ctx, in, info, handler)
}
var _TKServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "TKServer",
HandlerType: (*TKServerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "AccessToken",
Handler: _TKServer_AccessToken_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "tkserver.proto",
}
syntax = "proto3";
service TKServer {
// 获取access_token
rpc AccessToken (AccessTokenRequest) returns (AccessTokenReply) {}
}
message AccessTokenRequest {
string apikey = 1;
string appid = 2;
string expired = 3;
}
message AccessTokenReply {
string token = 1;
}
\ No newline at end of file
package supervisor
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/schedule"
"go.uber.org/multierr"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
)
const (
READY = "READY\n"
RESULTOK = "RESULT 2\nOK"
)
type deamon struct {
pid string
pname string
nextAlert int64
alertTime int
}
func (d *deamon) resetState() {
d.nextAlert = 0
d.alertTime = 0
}
func (d *deamon) alert() {
now := time.Now().Unix()
if d.nextAlert < now {
log(fmt.Sprintf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
d.alertTime += 1
if d.alertTime < 20 {
d.nextAlert = now + 60
} else if d.alertTime < 50 {
d.nextAlert = now + 300
} else if d.alertTime < 100 {
d.nextAlert = now + 1800
} else {
d.nextAlert = now + 7200
}
}
log(fmt.Sprintf("%s %s 下次报警时间:%d", d.pid, d.pname, d.nextAlert))
}
func NewListener() *Listener {
return &Listener{
deamons: make(map[string]*deamon, 0),
}
}
//Listener supervisor 子进程状态监听
type Listener struct {
deamons map[string]*deamon
lock sync.Mutex
}
func log(s string) {
fmt.Fprintf(os.Stderr, fmt.Sprintf("info:%s\n", s))
os.Stderr.Sync()
}
func (sl *Listener) state(s string) {
fmt.Fprintf(os.Stdout, s)
os.Stdout.Sync()
}
func (sl *Listener) registerDeamon(pname string, pid string) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.deamons[pname] = &deamon{pid: pid, pname: pname, nextAlert: 0, alertTime: 0}
}
func (sl *Listener) unRegisterDeamon(pname string, pid string) {
if _, ok := sl.deamons[pname]; ok {
sl.lock.Lock()
defer sl.lock.Unlock()
delete(sl.deamons, pname)
}
}
func (sl *Listener) init() {
cmd := exec.Command("supervisorctl", "status", "")
out, err := cmd.Output()
if err != nil {
log(err.Error())
}
outlines := strings.Split(string(out), "\n")
sl.lock.Lock()
defer sl.lock.Unlock()
for _, outline := range outlines {
spitem := strings.Split(outline, ",")
pinfo := strings.Split(deleteExtraSpace(spitem[0]), " ")
if len(pinfo) == 4 {
if pinfo[1] == "RUNNING" {
sl.deamons[pinfo[0]] = &deamon{pid: pinfo[3], pname: pinfo[0], nextAlert: 0, alertTime: 0}
}
}
}
}
func (sl *Listener) checkDeamonState() {
tt := schedule.NewTimerTask(schedule.WithErrHandler(func(name string, err error) {
exception.SendAlert("supervisor_listener", fmt.Errorf("%s:%s", name, err.Error()))
}), schedule.WithRunHandler(func(name string, nextAt time.Time) {
// log(name)
}))
tt.Do(time.Now(), time.Second*10, func(args ...interface{}) error {
sl.lock.Lock()
defer sl.lock.Unlock()
var err error
for _, deamon := range sl.deamons {
pid, e := strconv.Atoi(deamon.pid)
if e != nil {
err = multierr.Append(err, e)
continue
}
p, err := os.FindProcess(pid)
if err != nil {
deamon.alert()
} else {
if p == nil {
deamon.alert()
} else {
if err := p.Signal(syscall.Signal(0)); err != nil {
deamon.alert()
} else {
deamon.resetState()
}
}
}
}
return err
}, "进程状态监测")
tt.Run()
}
func (sl *Listener) recover(process, from, to string) {
log(fmt.Sprintf("进程 %s 被拉起", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 拉起成功", process))
}
func (sl *Listener) panic(process, from, to string) {
log(fmt.Sprintf("进程 %s 异常退出,请注意排查", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 异常退出,请注意排查", process))
}
func stringSliceContain(slice []string, value string) bool {
for _, item := range slice {
if item == value {
return true
}
}
return false
}
func deleteExtraSpace(s string) string {
s1 := strings.Replace(s, " ", " ", -1)
regstr := "\\s{2,}"
reg, _ := regexp.Compile(regstr)
s2 := make([]byte, len(s1))
copy(s2, s1)
spc_index := reg.FindStringIndex(string(s2))
for len(spc_index) > 0 {
s2 = append(s2[:spc_index[0]+1], s2[spc_index[1]:]...)
spc_index = reg.FindStringIndex(string(s2))
}
return string(s2)
}
//Start 开始监听子进程状态
func (sl *Listener) Start() {
sl.init()
sl.checkDeamonState()
lastSerial := "0"
lastPoolSerial := "0"
psExit := make(map[string]string, 0)
var body [4096]byte
reader := bufio.NewReader(os.Stdin)
for {
sl.state(READY)
line, _, err := reader.ReadLine()
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
log(string(line))
header := make(map[string]string, 0)
items := strings.Split(string(line), " ")
for _, item := range items {
kv := strings.Split(item, ":")
header[kv[0]] = kv[1]
}
serial := header["serial"]
poolSerial := header["poolserial"]
bodylen, err := strconv.ParseFloat(header["len"], 32)
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
_, err = io.ReadFull(reader, body[:int(bodylen)])
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
data := body[:int(bodylen)]
log("body+" + string(data))
if serial == lastSerial && poolSerial == lastPoolSerial {
sl.state(RESULTOK)
}
lastSerial = serial
lastPoolSerial = poolSerial
bodys := make(map[string]string, 0)
items = strings.Split(string(data), " ")
for _, item := range items {
kv := strings.Split(item, ":")
if len(kv) > 1 {
bodys[kv[0]] = kv[1]
}
}
process := bodys["processname"]
fromState := bodys["from_state"]
pid := bodys["pid"]
toState := header["eventname"]
log(fmt.Sprintf("\nprocessname:[%s] from_state:[%s] to_state:[%s] last_serial:[%s] last_pool_serial:[%s]\n", process, fromState, toState, lastSerial, lastPoolSerial))
if stringSliceContain([]string{"PROCESS_STATE_RUNNING"}, toState) {
sl.registerDeamon(process, pid)
if _, ok := psExit[process]; ok {
delete(psExit, process)
sl.recover(process, fromState, toState)
}
}
if stringSliceContain([]string{"PROCESS_STATE_STOPPED"}, toState) {
sl.unRegisterDeamon(process, pid)
}
if stringSliceContain([]string{"PROCESS_STATE_EXITED", "PROCESS_STATE_FATAL", "PROCESS_STATE_BACKOFF"}, toState) {
psExit[process] = ""
sl.panic(process, fromState, toState)
}
sl.state(RESULTOK)
}
}
package uuid
import (
"fmt"
"net"
"os"
"time"
"github.com/OneOfOne/xxhash"
wuid "github.com/edwingeng/wuid/callback/wuid"
rwuid "github.com/edwingeng/wuid/redis/wuid"
"github.com/go-redis/redis"
"github.com/spf13/viper"
)
// NewWuidWithHost 唯一标识生成器,主机标识
func NewWuidWithHost(tag string) (*wuid.WUID, error) {
g := wuid.NewWUID(tag, nil)
err := g.LoadH28WithCallback(func() (int64, func(), error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return 0, nil, err
}
hostname, _ := os.Hostname()
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
ipv4 := ipnet.IP.To4()
if ipv4 != nil {
hostname = hostname + ipv4.String()
break
}
}
}
h := xxhash.New32()
h.WriteString(hostname)
return int64(h.Sum32() & 0x07FFFFFF), nil, nil
})
return g, err
}
// NewWuidWithRedis 唯一标识生成器,基于redis,key:生产id在redis中key
func NewWuidWithRedis(tag string, addr, pwd, key string, db int) (*rwuid.WUID, error) {
newClient := func() (redis.Cmdable, bool, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: pwd,
DB: db,
DialTimeout: 3 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
})
return client, true, nil
}
g := rwuid.NewWUID(tag, nil)
_ = g.LoadH28FromRedis(newClient, key)
return g, nil
}
// NewWuidWithRedisViper 唯一标识生成器,基于redis,key:生产id在redis中key
func NewWuidWithRedisViper(vk string) (*rwuid.WUID, error) {
v := viper.Sub(vk)
if v == nil {
return nil, fmt.Errorf("invalid config key: %s", vk)
}
newClient := func() (redis.Cmdable, bool, error) {
client := redis.NewClient(&redis.Options{
Addr: v.GetString("redis_addr"),
Password: v.GetString("redis_pass"),
DB: v.GetInt("redis_index"),
DialTimeout: 3 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
})
return client, true, nil
}
v.SetDefault("tag", "default")
g := rwuid.NewWUID(v.GetString("tag"), nil)
v.SetDefault("wuid_key", "wuid")
err := g.LoadH28FromRedis(newClient, v.GetString("wuid_key"))
return g, err
}
package uuid
import (
"fmt"
"log"
"os"
"sync"
"testing"
"time"
)
func genID(id int) {
g, err := NewWuidWithRedis("default", "127.0.0.1:6379", "", "session_id", 0)
if err != nil {
log.Println(err)
}
f, err := os.OpenFile(fmt.Sprintf("c:/Users/EDZ/Downloads/sessionids_%d.txt", id+1), os.O_WRONLY|os.O_TRUNC, 0600)
defer f.Close()
if err != nil {
fmt.Println(err.Error())
return
}
for i := 0; i < 10000000; i++ {
id := g.Next()
log.Println("write:", id)
_, err = f.Write([]byte(fmt.Sprintf("%d\n", id)))
if err != nil {
fmt.Println(err.Error())
}
time.Sleep(time.Millisecond * 50)
}
}
func TestUUID(t *testing.T) {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
genID(id)
}(i)
}
wg.Wait()
}
package version
// Version 版本
var Version string = "1.0.0"
// GitHash githash
var GitHash string
package cache
type Cache interface {
Set(string, interface{}) error
Get(string, interface{}) error
Lock(string) error
Unlock(string) error
}
package cache
import (
"bytes"
//"encoding/gob"
"encoding/json"
"errors"
"io/ioutil"
"os"
"sync"
)
func NewJsonFileCache(dsn string) Cache {
cache := &fileCache{}
cache.lockers = make(map[string]*sync.Mutex)
cache.data = make(map[string][]byte)
cache.path = dsn
cache.load()
return cache
}
type fileCache struct {
gmutex sync.RWMutex
lockers map[string]*sync.Mutex
data map[string][]byte
path string
}
func (cache *fileCache) load() error {
b, err := ioutil.ReadFile(cache.path)
if err != nil {
return err
}
return json.NewDecoder(bytes.NewReader(b)).Decode(&cache.data)
}
func (cache *fileCache) save() error {
w, err := os.OpenFile(cache.path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.ModePerm)
if err != nil {
return err
}
defer w.Close()
return json.NewEncoder(w).Encode(cache.data)
}
func (cache *fileCache) Get(key string, val interface{}) error {
cache.gmutex.RLock()
defer cache.gmutex.RUnlock()
if b, ok := cache.data[key]; ok {
return json.NewDecoder(bytes.NewReader(b)).Decode(val)
}
return errors.New("key not exists")
}
func (cache *fileCache) Set(key string, val interface{}) error {
cache.gmutex.Lock()
defer cache.gmutex.Unlock()
w := bytes.NewBuffer(nil)
if err := json.NewEncoder(w).Encode(val); err != nil {
return err
}
cache.data[key] = w.Bytes()
return cache.save()
}
func (cache *fileCache) Lock(key string) error {
cache.gmutex.RLock()
mu, ok := cache.lockers[key]
cache.gmutex.RUnlock()
if ok {
mu.Lock()
} else {
cache.gmutex.Lock()
defer cache.gmutex.Unlock()
if mu, ok = cache.lockers[key]; ok {
mu.Lock()
} else {
mu = &sync.Mutex{}
cache.lockers[key] = mu
mu.Lock()
}
}
return nil
}
func (cache *fileCache) Unlock(key string) error {
cache.gmutex.RLock()
mu, ok := cache.lockers[key]
cache.gmutex.RUnlock()
if ok {
mu.Unlock()
}
return nil
}
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/glutwins/webclient"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
)
type WachatReq map[string]interface{}
type Client struct {
*crypt.WechatConfig
data cache.Cache
tkexpire bool
}
func NewClient(cfg *crypt.WechatConfig, d cache.Cache) *Client {
c := &Client{WechatConfig: cfg, data: d}
return c
}
type JsConfig struct {
AppID string
TimeStamp int64
NonceStr string
Signature string
}
type cacheClient struct {
Token accessToken
Ticket resTicket
}
type tokenReq struct {
Sign string `json:"Auth-Sign"`
OverdueToken string `json:"overdue_token"`
Eid int `json:"enterprise_id"`
}
type tokenRsp struct {
Token string `json:"access_token"`
}
//GetAccessToken 获取access_token
func (c *Client) GetAccessToken() (string, error) {
var token accessToken
if !c.tkexpire {
if err := c.data.Get("token", &token); err == nil {
if token.valid() {
return token.AccessToken, nil
}
}
}
c.data.Lock("token")
defer c.data.Unlock("token")
url := fmt.Sprintf(accessTokenURL, c.AppID, c.AppSecret)
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("GetAccessToken httpcode=%d", resp.StatusCode)
}
if b, err := ioutil.ReadAll(resp.Body); err != nil {
return "", err
} else if err = json.Unmarshal(b, &token); err != nil {
return "", err
}
token.LastUpTs = time.Now().Unix()
c.data.Set("token", token)
c.tkexpire = false
return token.AccessToken, token.Error()
}
// ExpireToken 使token失效
func (c *Client) ExpireToken() {
c.tkexpire = true
}
func (c *Client) formatUrlWithAccessToken(base string, args ...interface{}) (string, error) {
var token string
var err error
token, err = c.GetAccessToken()
if err != nil {
return "", err
}
return fmt.Sprintf(base, append([]interface{}{token}, args...)...), nil
}
func (c *Client) postJsonUrlFormat(req interface{}, res interface{}, url string, args ...interface{}) error {
if uri, err := c.formatUrlWithAccessToken(url, args...); err != nil {
return err
} else if response, err := c.jsonPost(uri, req); err != nil {
return err
} else if err := json.Unmarshal(response, res); err != nil {
return err
}
return nil
}
func (c *Client) jsonPost(uri string, obj interface{}) ([]byte, error) {
var jsonData []byte
var err error
if str, ok := obj.(string); ok {
jsonData = []byte(str)
} else {
jsonData, err = json.Marshal(obj)
if err != nil {
return nil, err
}
}
return c.doPost(uri, "application/json;charset=utf-8", bytes.NewBuffer(jsonData))
}
func (c *Client) formPost(uri string, data map[string]string) ([]byte, error) {
body := bytes.NewBuffer(nil)
for k, v := range data {
body.WriteString(k)
body.WriteString("=")
body.WriteString(url.QueryEscape(v))
body.WriteString("&")
}
body.Truncate(body.Len() - 1)
return c.doPost(uri, "application/x-www-form-urlencoded;charset=utf-8", body)
}
func (c *Client) doPost(uri, contentType string, r io.Reader) ([]byte, error) {
response, err := http.Post(uri, contentType, r)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
}
return ioutil.ReadAll(response.Body)
}
func (c *Client) getJsonUrlFormat(res interface{}, url string, args ...interface{}) error {
uri, err := c.formatUrlWithAccessToken(url, args...)
if err != nil {
return err
}
if b, err := webclient.DoGet(uri); err != nil {
return err
} else {
return json.Unmarshal(b, res)
}
}
//uri 为当前网页地址
func (js *Client) GetJsConfig(uri string) (config *JsConfig, err error) {
config = new(JsConfig)
var ticketStr string
ticketStr, err = js.getTicket()
if err != nil {
return
}
nonceStr := crypt.RandomStr(16)
timestamp := time.Now().Unix()
str := fmt.Sprintf("jsapi_ticket=%s&noncestr=%s&timestamp=%d&url=%s", ticketStr, nonceStr, timestamp, uri)
sigStr := crypt.Signature(str)
config.AppID = js.AppID
config.NonceStr = nonceStr
config.TimeStamp = timestamp
config.Signature = sigStr
return
}
//getTicket 获取jsapi_tocket全局缓存
func (c *Client) getTicket() (string, error) {
var ticket resTicket
if err := c.data.Get("ticket", &ticket); err == nil {
if ticket.valid() {
return ticket.Ticket, nil
}
}
c.data.Lock("ticket")
defer c.data.Unlock("ticket")
if err := c.getJsonUrlFormat(&ticket, getTicketURL); err != nil {
return "", err
}
ticket.LastUpTs = time.Now().Unix()
c.data.Set("ticket", ticket)
return ticket.Ticket, ticket.Error()
}
func (c *Client) NewQrCode(sceneId interface{}, expire int) (*QrCode, error) {
var msg string
if _, ok := sceneId.(string); ok {
if expire == 0 {
msg = fmt.Sprintf(kQrStrLimitFormat, sceneId)
} else {
return nil, fmt.Errorf("not support expire str qrcode")
}
} else {
if expire == 0 {
msg = fmt.Sprintf(kQrIntLimitFormat, sceneId)
} else {
msg = fmt.Sprintf(kQrIntFormat, expire, sceneId)
}
}
var qrcode = &QrCode{}
if err := c.postJsonUrlFormat(msg, qrcode, qrcodeURL); err != nil {
return nil, err
}
return qrcode, qrcode.Error()
}
type CustomerText struct {
Content string `json:"content"`
}
type CustomerImage struct {
MediaID string `json:"media_id"`
}
type CustomerMessage struct {
ToUser string `json:"touser"`
MsgType string `json:"msgtype"`
Text *CustomerText `json:"text,omitempty"`
Image *CustomerImage `json:"image,omitempty"`
}
func (c *Client) SendMessage(msg *CustomerMessage) error {
resp := &CommonResp{}
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
if resp.ErrCode == 40001 {
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
return resp.Error()
}
return resp.Error()
}
package client
const (
accessTokenURL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s"
getTicketURL = "https://api.weixin.qq.com/cgi-bin/ticket/getticket?access_token=%s&type=jsapi"
addNewsURL = "https://api.weixin.qq.com/cgi-bin/material/add_news?access_token=%s"
addMaterialURL = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token=%s&type=%s"
delMaterialURL = "https://api.weixin.qq.com/cgi-bin/material/del_material?access_token=%s"
mediaUploadURL = "https://api.weixin.qq.com/cgi-bin/media/upload?access_token=%s&type=%s"
mediaUploadImageURL = "https://api.weixin.qq.com/cgi-bin/media/uploadimg?access_token=%s"
mediaGetURL = "https://api.weixin.qq.com/cgi-bin/media/get?access_token=%s&media_id=%s"
menuCreateURL = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token=%s"
menuGetURL = "https://api.weixin.qq.com/cgi-bin/menu/get?access_token=%s"
menuDeleteURL = "https://api.weixin.qq.com/cgi-bin/menu/delete?access_token=%s"
menuAddConditionalURL = "https://api.weixin.qq.com/cgi-bin/menu/addconditional?access_token=%s"
menuDeleteConditionalURL = "https://api.weixin.qq.com/cgi-bin/menu/delconditional?access_token=%s"
menuTryMatchURL = "https://api.weixin.qq.com/cgi-bin/menu/trymatch?access_token=%s"
menuSelfMenuInfoURL = "https://api.weixin.qq.com/cgi-bin/get_current_selfmenu_info?access_token=%s"
redirectOauthURL = "https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=%s&state=%s#wechat_redirect"
userAccessTokenURL = "https://api.weixin.qq.com/sns/oauth2/access_token?appid=%s&secret=%s&code=%s&grant_type=authorization_code"
code2SessionURL = "https://api.weixin.qq.com/sns/jscode2session?appid=%s&secret=%s&js_code=%s&grant_type=authorization_code"
refreshAccessTokenURL = "https://api.weixin.qq.com/sns/oauth2/refresh_token?appid=%s&grant_type=refresh_token&refresh_token=%s"
userInfoURL = "https://api.weixin.qq.com/sns/userinfo?access_token=%s&openid=%s&lang=zh_CN"
checkAccessTokenURL = "https://api.weixin.qq.com/sns/auth?access_token=%s&openid=%s"
qrcodeURL = "https://api.weixin.qq.com/cgi-bin/qrcode/create?access_token=%s"
customerMsgURL = "https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token=%s"
)
//MediaType 媒体文件类型
type MediaType string
const (
//MediaTypeImage 媒体文件:图片
MediaTypeImage MediaType = "image"
//MediaTypeVoice 媒体文件:声音
MediaTypeVoice = "voice"
//MediaTypeVideo 媒体文件:视频
MediaTypeVideo = "video"
//MediaTypeThumb 媒体文件:缩略图
MediaTypeThumb = "thumb"
)
const (
kQrIntFormat = `{"expire_seconds": %d, "action_name": "QR_SCENE", "action_info": {"scene": {"scene_id": %d}}}`
kQrIntLimitFormat = `{"action_name": "QR_LIMIT_SCENE", "action_info": {"scene": {"scene_id": %d}}}`
kQrStrLimitFormat = `{"action_name": "QR_LIMIT_STR_SCENE", "action_info": {"scene": {"scene_str": "%s"}}}`
)
package client
// MsgSecCheck 内容检测
func (c *Client) MsgSecCheck(content string) *CommonResp {
resp := &CommonResp{}
if err := c.postJsonUrlFormat(map[string]string{"content": content}, resp, "https://api.weixin.qq.com/wxa/msg_sec_check?access_token=%s"); err != nil {
resp.ErrCode = -1
resp.ErrMsg = err.Error()
}
return resp
}
package client
import (
"encoding/json"
"log"
"github.com/glutwins/webclient"
)
//Article 永久图文素材
type Article struct {
Title string `json:"title"`
ThumbMediaID string `json:"thumb_media_id"`
Author string `json:"author"`
Digest string `json:"digest"`
ShowCoverPic int `json:"show_cover_pic"`
Content string `json:"content"`
ContentSourceURL string `json:"content_source_url"`
}
//AddNews 新增永久图文素材
func (c *Client) AddNews(articles []*Article) (string, error) {
var res Material
if err := c.postJsonUrlFormat(WachatReq{"articles": articles}, &res, addNewsURL); err != nil {
return "", err
}
return res.MediaID, res.Error()
}
func (c *Client) upload(filename string, resp interface{}, urlfmt string, args ...interface{}) error {
url, err := c.formatUrlWithAccessToken(urlfmt, args...)
if err != nil {
return err
}
log.Println(url)
b, err := webclient.PostMultipartForm(nil, map[string]string{"media": filename}, url)
if err != nil {
return err
}
return json.Unmarshal(b, resp)
}
// AddMaterial 上传永久性素材(处理视频需要单独上传)
func (c *Client) AddMaterial(mediaType MediaType, filename string) (string, string, error) {
var resMaterial Material
if err := c.upload(filename, &resMaterial, addMaterialURL, mediaType); err != nil {
return "", "", err
}
return resMaterial.MediaID, resMaterial.URL, resMaterial.Error()
}
//AddVideo 永久视频素材文件上传
func (c *Client) AddVideo(filename, title, introduction string) (string, string, error) {
req := WachatReq{"title": title, "introduction": introduction}
uri, err := c.formatUrlWithAccessToken(addMaterialURL, MediaTypeVideo)
if err != nil {
return "", "", err
}
fieldValue, _ := json.Marshal(req)
response, err := webclient.PostMultipartForm(
map[string]string{"description": string(fieldValue)},
map[string]string{"video": filename},
uri)
if err != nil {
return "", "", err
}
var resMaterial Material
if err = json.Unmarshal(response, &resMaterial); err != nil {
return "", "", err
}
return resMaterial.MediaID, resMaterial.URL, resMaterial.Error()
}
//DeleteMaterial 删除永久素材
func (c *Client) DeleteMaterial(mediaID string) error {
var result CommonResp
if err := c.postJsonUrlFormat(WachatReq{"media_id": mediaID}, &result, delMaterialURL); err != nil {
return err
}
return result.Error()
}
//MediaUpload 临时素材上传
func (c *Client) MediaUpload(mediaType MediaType, filename string) (*Material, error) {
media := &Material{}
if err := c.upload(filename, media, mediaUploadURL, mediaType); err != nil {
return nil, err
}
return media, media.Error()
}
//GetMediaURL 返回临时素材的下载地址供用户自己处理
//NOTICE: URL 不可公开,因为含access_token 需要立即另存文件
func (c *Client) GetMediaURL(mediaID string) (string, error) {
return c.formatUrlWithAccessToken(mediaGetURL, mediaID)
}
//ImageUpload 图片上传
func (c *Client) ImageUpload(filename string) (string, error) {
var image Material
if err := c.upload(filename, &image, mediaUploadImageURL); err != nil {
return "", err
}
return image.URL, image.Error()
}
package client
//Button 菜单按钮
type Button struct {
Type string `json:"type,omitempty"`
Name string `json:"name,omitempty"`
Key string `json:"key,omitempty"`
URL string `json:"url,omitempty"`
MediaID string `json:"media_id,omitempty"`
SubButtons []*Button `json:"sub_button,omitempty"`
}
type Menu struct {
pbtn *Button
btns []*Button
}
func (menu *Menu) AddSub(name string) *Menu {
if menu.pbtn != nil {
return nil
}
sub := &Menu{pbtn: &Button{Name: name}}
menu.btns = append(menu.btns, sub.pbtn)
return sub
}
func (menu *Menu) addButton(btn *Button) {
if menu.pbtn != nil {
menu.pbtn.SubButtons = append(menu.pbtn.SubButtons, btn)
} else {
menu.btns = append(menu.btns, btn)
}
}
// 添加click类型按钮
func (menu *Menu) AddClickButton(name, key string) {
menu.addButton(&Button{Type: "click", Name: name, Key: key})
}
//添加view类型
func (menu *Menu) AddViewButton(name, url string) {
menu.addButton(&Button{Type: "view", Name: name, URL: url})
}
// SetScanCodePushButton 扫码推事件
func (menu *Menu) AddScanCodePushButton(name, key string) {
menu.addButton(&Button{Type: "scancode_push", Name: name, Key: key})
}
//SetScanCodeWaitMsgButton 设置 扫码推事件且弹出"消息接收中"提示框
func (menu *Menu) AddScanCodeWaitMsgButton(name, key string) {
menu.addButton(&Button{Type: "scancode_waitmsg", Name: name, Key: key})
}
//SetPicSysPhotoButton 设置弹出系统拍照发图按钮
func (menu *Menu) AddPicSysPhotoButton(name, key string) {
menu.addButton(&Button{Type: "pic_sysphoto", Name: name, Key: key})
}
//SetPicPhotoOrAlbumButton 设置弹出拍照或者相册发图类型按钮
func (menu *Menu) AddPicPhotoOrAlbumButton(name, key string) {
menu.addButton(&Button{Type: "pic_photo_or_album", Name: name, Key: key})
}
// SetPicWeixinButton 设置弹出微信相册发图器类型按钮
func (menu *Menu) AddPicWeixinButton(name, key string) {
menu.addButton(&Button{Type: "pic_weixin", Name: name, Key: key})
}
// SetLocationSelectButton 设置 弹出地理位置选择器 类型按钮
func (menu *Menu) AddLocationSelectButton(name, key string) {
menu.addButton(&Button{Type: "location_select", Name: name, Key: key})
}
//SetMediaIDButton 设置 下发消息(除文本消息) 类型按钮
func (menu *Menu) AddMediaIDButton(name, mediaID string) {
menu.addButton(&Button{Type: "media_id", Name: name, MediaID: mediaID})
}
//SetViewLimitedButton 设置 跳转图文消息URL 类型按钮
func (menu *Menu) AddViewLimitedButton(name, mediaID string) {
menu.addButton(&Button{Type: "view_limited", Name: name, MediaID: mediaID})
}
//resMenuTryMatch 菜单匹配请求结果
type resMenuTryMatch struct {
CommonResp
Button []Button `json:"button"`
}
//ResMenu 查询菜单的返回数据
type ResMenu struct {
CommonResp
Menu struct {
Button []Button `json:"button"`
MenuID int64 `json:"menuid"`
} `json:"menu"`
Conditionalmenu []struct {
Button []Button `json:"button"`
MatchRule MatchRule `json:"matchrule"`
MenuID int64 `json:"menuid"`
} `json:"conditionalmenu"`
}
//ResSelfMenuInfo 自定义菜单配置返回结果
type ResSelfMenuInfo struct {
CommonResp
IsMenuOpen int32 `json:"is_menu_open"`
SelfMenuInfo struct {
Button []SelfMenuButton `json:"button"`
} `json:"selfmenu_info"`
}
//SelfMenuButton 自定义菜单配置详情
type SelfMenuButton struct {
Type string `json:"type"`
Name string `json:"name"`
Key string `json:"key"`
URL string `json:"url,omitempty"`
Value string `json:"value,omitempty"`
SubButton struct {
List []SelfMenuButton `json:"list"`
} `json:"sub_button,omitempty"`
NewsInfo struct {
List []struct {
Title string `json:"title"`
Author string `json:"author"`
Digest string `json:"digest"`
ShowCover int32 `json:"show_cover"`
CoverURL string `json:"cover_url"`
ContentURL string `json:"content_url"`
SourceURL string `json:"source_url"`
} `json:"list"`
} `json:"news_info,omitempty"`
}
//MatchRule 个性化菜单规则
type MatchRule struct {
GroupID int32 `json:"group_id,omitempty"`
Sex int32 `json:"sex,omitempty"`
Country string `json:"country,omitempty"`
Province string `json:"province,omitempty"`
City string `json:"city,omitempty"`
ClientPlatformType int32 `json:"client_platform_type,omitempty"`
Language string `json:"language,omitempty"`
}
//SetMenu 设置按钮
func (c *Client) SetMenu(menu *Menu) error {
var commError CommonResp
if err := c.postJsonUrlFormat(WachatReq{"button": menu.btns}, &commError, menuCreateURL); err != nil {
return err
}
return commError.Error()
}
//GetMenu 获取菜单配置
func (c *Client) GetMenu() (*ResMenu, error) {
resMenu := &ResMenu{}
if err := c.getJsonUrlFormat(resMenu, menuGetURL); err != nil {
return nil, err
}
return resMenu, resMenu.Error()
}
//DeleteMenu 删除菜单
func (c *Client) DeleteMenu() error {
var commError CommonResp
if err := c.getJsonUrlFormat(&commError, menuDeleteURL); err != nil {
return err
}
return commError.Error()
}
//AddConditional 添加个性化菜单
func (c *Client) AddConditional(menu *Menu, matchRule *MatchRule) error {
var commError CommonResp
req := WachatReq{
"button": menu.btns,
"matchrule": matchRule,
}
if err := c.postJsonUrlFormat(req, &commError, menuAddConditionalURL); err != nil {
return err
}
return commError.Error()
}
//DeleteConditional 删除个性化菜单
func (c *Client) DeleteConditional(menuID int64) error {
var commError CommonResp
if err := c.postJsonUrlFormat(WachatReq{"menuid": menuID}, &commError, menuDeleteConditionalURL); err != nil {
return err
}
return commError.Error()
}
//MenuTryMatch 菜单匹配
func (c *Client) MenuTryMatch(userID string) ([]Button, error) {
var res resMenuTryMatch
if err := c.postJsonUrlFormat(WachatReq{"user_id": userID}, &res, menuTryMatchURL); err != nil {
return nil, err
}
return res.Button, res.Error()
}
//GetCurrentSelfMenuInfo 获取自定义菜单配置接口
func (c *Client) GetCurrentSelfMenuInfo() (*ResSelfMenuInfo, error) {
resSelfMenuInfo := &ResSelfMenuInfo{}
if err := c.getJsonUrlFormat(resSelfMenuInfo, menuSelfMenuInfoURL); err != nil {
return nil, err
}
return resSelfMenuInfo, resSelfMenuInfo.Error()
}
package client
import (
"encoding/json"
"fmt"
"net/url"
"github.com/glutwins/webclient"
)
//GetRedirectURL 获取跳转的url地址
func (c *Client) GetRedirectURL(redirectURI, scope, state string) string {
//url encode
urlStr := url.QueryEscape(redirectURI)
return fmt.Sprintf(redirectOauthURL, c.AppID, urlStr, scope, state)
}
// GetUserAccessToken 通过网页授权的code 换取access_token(区别于context中的access_token)
func (c *Client) GetUserAccessToken(code string) (*UserAccessToken, error) {
b, err := webclient.DoGet(fmt.Sprintf(userAccessTokenURL, c.AppID, c.AppSecret, code))
if err != nil {
return nil, err
}
token := &UserAccessToken{}
if err = json.Unmarshal(b, token); err != nil {
return nil, err
}
return token, token.Error()
}
//RefreshAccessToken 刷新access_token
func (c *Client) RefreshAccessToken(refreshToken string) (*UserAccessToken, error) {
if b, err := webclient.DoGet(fmt.Sprintf(refreshAccessTokenURL, c.AppID, refreshToken)); err != nil {
return nil, err
} else {
token := &UserAccessToken{}
if err = json.Unmarshal(b, token); err != nil {
return nil, err
}
return token, token.Error()
}
}
//CheckAccessToken 检验access_token是否有效
func (c *Client) CheckAccessToken(accessToken, openID string) error {
var result CommonResp
if err := c.getJsonUrlFormat(&result, checkAccessTokenURL, openID); err != nil {
return err
}
return result.Error()
}
// Code2Session 登录凭证校验
func (c *Client) Code2Session(code string) (*UserSession, error) {
b, err := webclient.DoGet(fmt.Sprintf(code2SessionURL, c.AppID, c.AppSecret, code))
if err != nil {
return nil, err
}
session := &UserSession{}
if err = json.Unmarshal(b, session); err != nil {
return nil, err
}
return session, session.Error()
}
//GetUserInfo 如果scope为 snsapi_userinfo 则可以通过此方法获取到用户基本信息
func (c *Client) GetUserInfo(accessToken, openID string) (*UserInfo, error) {
user := &UserInfo{}
uri := fmt.Sprintf(userInfoURL, accessToken, openID)
if b, err := webclient.DoGet(uri); err != nil {
return nil, err
} else if err = json.Unmarshal(b, &user); err != nil {
return nil, err
}
return user, user.Error()
}
package client
import (
"fmt"
"time"
)
//ResAccessToken struct
type accessToken struct {
ExpireMessage
AccessToken string `json:"access_token"`
}
// resTicket 请求jsapi_tikcet返回结果
type resTicket struct {
ExpireMessage
Ticket string `json:"ticket"`
}
type CommonResp struct {
ErrCode int64 `json:"errcode"`
ErrMsg string `json:"errmsg"`
}
func (msg *CommonResp) Error() error {
if msg.ErrCode != 0 {
return fmt.Errorf("result code=%d: %s", msg.ErrCode, msg.ErrMsg)
}
return nil
}
type ExpireMessage struct {
CommonResp
ExpiresIn int64 `json:"expires_in"`
LastUpTs int64 `json:"update_in"`
}
func (msg *ExpireMessage) valid() bool {
return msg.ErrCode == 0 && time.Now().Unix() < msg.LastUpTs+msg.ExpiresIn-60
}
// 素材返回结果
type Material struct {
CommonResp
Type MediaType `json:"type"`
MediaID string `json:"media_id"`
CreatedAt int64 `json:"created_at"`
URL string `json:"url"`
}
//UserInfo 用户授权获取到用户信息
type UserInfo struct {
CommonResp
OpenID string `json:"openid"`
Nickname string `json:"nickname"`
Sex int `json:"sex"`
Province string `json:"province"`
City string `json:"city"`
Country string `json:"country"`
HeadImgURL string `json:"headimgurl"`
Privilege []string `json:"privilege"`
Unionid string `json:"unionid"`
}
type QrCode struct {
CommonResp
ExpiresSeconds int64 `json:"expire_seconds"`
Ticket string `json:"ticket"`
Url string `json:"url"`
}
// UserAccessToken 获取用户授权access_token的返回结果
type UserAccessToken struct {
ExpireMessage
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
OpenID string `json:"openid"`
Scope string `json:"scope"`
}
// UserSession 登录凭证校验结果
type UserSession struct {
CommonResp
OpenID string `json:"openid"`
SessionKey string `json:"session_key"`
UnionID string `json:"unionid"`
}
package context
import (
"encoding/xml"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/message"
)
var (
// ErrSign wechat sign error
ErrSign = errors.New("invalid sign")
)
type Context interface {
Query(key string) (string, bool)
ParseRawMessage() (*message.MixMessage, error)
Render([]byte)
RenderString(string)
RenderXML(interface{})
RenderMessage(message.Reply)
}
func NewContext(req *http.Request, w http.ResponseWriter, cfg *crypt.WechatConfig) Context {
ctx := &ctxImpl{Writer: w, Request: req, cfg: cfg}
ctx.parse()
return ctx
}
// Context struct
type ctxImpl struct {
Writer http.ResponseWriter
Request *http.Request
isSafeMode bool
params url.Values
random []byte
cfg *crypt.WechatConfig
}
func (ctx *ctxImpl) parse() {
ctx.params = ctx.Request.URL.Query()
if enctype, ok := ctx.Query("encrypt_type"); ok {
ctx.isSafeMode = enctype == "aes"
}
}
func (ctx *ctxImpl) ParseRawMessage() (*message.MixMessage, error) {
timestamp, _ := ctx.Query("timestamp")
nonce, _ := ctx.Query("nonce")
signature, _ := ctx.Query("signature")
if signature != crypt.Signature(ctx.cfg.Token, timestamp, nonce) {
return nil, ErrSign
}
var rawXMLMsgBytes []byte
var err error
if ctx.isSafeMode {
var encryptedXMLMsg message.EncryptedXMLMsg
if err = xml.NewDecoder(ctx.Request.Body).Decode(&encryptedXMLMsg); err != nil {
return nil, err
}
msgSignature, _ := ctx.Query("msg_signature")
msgSignatureGen := crypt.Signature(ctx.cfg.Token, timestamp, nonce, encryptedXMLMsg.EncryptedMsg)
if msgSignature != msgSignatureGen {
return nil, ErrSign
}
//解密
ctx.random, rawXMLMsgBytes, err = crypt.DecryptMsg(ctx.cfg.AppID, encryptedXMLMsg.EncryptedMsg, ctx.cfg.AESKey)
if err != nil {
return nil, err
}
} else {
rawXMLMsgBytes, err = ioutil.ReadAll(ctx.Request.Body)
if err != nil {
return nil, err
}
}
msg := &message.MixMessage{}
err = xml.Unmarshal(rawXMLMsgBytes, msg)
return msg, err
}
// GetQuery is like Query(), it returns the keyed url query value
func (ctx *ctxImpl) Query(key string) (string, bool) {
if values, ok := ctx.params[key]; ok && len(values) > 0 {
return values[0], true
}
return "", false
}
//Render render from bytes
func (ctx *ctxImpl) Render(bytes []byte) {
ctx.Writer.WriteHeader(200)
_, err := ctx.Writer.Write(bytes)
if err != nil {
panic(err)
}
}
//String render from string
func (ctx *ctxImpl) RenderString(str string) {
ctx.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
ctx.Render([]byte(str))
}
//XML render to xml
func (ctx *ctxImpl) RenderXML(obj interface{}) {
ctx.Writer.Header().Set("Content-Type", "application/xml; charset=utf-8")
bytes, err := xml.Marshal(obj)
if err != nil {
panic(err)
}
ctx.Render(bytes)
}
func (ctx *ctxImpl) RenderMessage(reply message.Reply) {
var replyMsg interface{} = reply
if ctx.isSafeMode {
rawMsg, err := xml.Marshal(replyMsg)
if err != nil {
return
}
//安全模式下对消息进行加密
encryptedMsg, err := crypt.EncryptMsg(ctx.random, rawMsg, ctx.cfg.AppID, ctx.cfg.AESKey)
if err != nil {
return
}
timestamp := time.Now().Unix()
strts := strconv.FormatInt(timestamp, 64)
nonce := crypt.RandomStr(16)
msgSignature := crypt.Signature(ctx.cfg.Token, strts, nonce, string(encryptedMsg))
replyMsg = message.ResponseEncryptedXMLMsg{
EncryptedMsg: string(encryptedMsg),
MsgSignature: msgSignature,
Timestamp: timestamp,
Nonce: nonce,
}
}
ctx.RenderXML(replyMsg)
return
}
package crypt
type WechatConfig struct {
AppID string
AppSecret string
Token string
AESKey string
}
package crypt
import (
"crypto/aes"
"crypto/cipher"
"encoding/base64"
"encoding/binary"
"fmt"
)
//EncryptMsg 加密消息
func EncryptMsg(random, rawXMLMsg []byte, appID, aesKey string) (encrtptMsg []byte, err error) {
var key []byte
key, err = aesKeyDecode(aesKey)
if err != nil {
return nil, err
}
ciphertext := AESEncryptMsg(random, rawXMLMsg, appID, key)
encrtptMsg = []byte(base64.StdEncoding.EncodeToString(ciphertext))
return
}
//AESEncryptMsg ciphertext = AES_Encrypt[random(16B) + msg_len(4B) + rawXMLMsg + appId]
//参考:github.com/chanxuehong/wechat.v2
func AESEncryptMsg(random, rawXMLMsg []byte, appID string, aesKey []byte) (ciphertext []byte) {
const (
BlockSize = 32 // PKCS#7
BlockMask = BlockSize - 1 // BLOCK_SIZE 为 2^n 时, 可以用 mask 获取针对 BLOCK_SIZE 的余数
)
appIDOffset := 20 + len(rawXMLMsg)
contentLen := appIDOffset + len(appID)
amountToPad := BlockSize - contentLen&BlockMask
plaintextLen := contentLen + amountToPad
plaintext := make([]byte, plaintextLen)
// 拼接
copy(plaintext[:16], random)
binary.BigEndian.PutUint32(plaintext[16:20], uint32(len(rawXMLMsg)))
copy(plaintext[20:], rawXMLMsg)
copy(plaintext[appIDOffset:], appID)
// PKCS#7 补位
for i := contentLen; i < plaintextLen; i++ {
plaintext[i] = byte(amountToPad)
}
// 加密
block, err := aes.NewCipher(aesKey[:])
if err != nil {
panic(err)
}
mode := cipher.NewCBCEncrypter(block, aesKey[:16])
mode.CryptBlocks(plaintext, plaintext)
ciphertext = plaintext
return
}
//DecryptMsg 消息解密
func DecryptMsg(appID, encryptedMsg, aesKey string) (random, rawMsgXMLBytes []byte, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic error: err=%v", e)
return
}
}()
var encryptedMsgBytes, key, getAppIDBytes []byte
encryptedMsgBytes, err = base64.StdEncoding.DecodeString(encryptedMsg)
if err != nil {
return
}
key, err = aesKeyDecode(aesKey)
if err != nil {
panic(err)
}
random, rawMsgXMLBytes, getAppIDBytes, err = AESDecryptMsg(encryptedMsgBytes, key)
if err != nil {
err = fmt.Errorf("消息解密失败,%v", err)
return
}
if appID != string(getAppIDBytes) {
err = fmt.Errorf("消息解密校验APPID失败")
return
}
return
}
func aesKeyDecode(encodedAESKey string) (key []byte, err error) {
if len(encodedAESKey) != 43 {
err = fmt.Errorf("the length of encodedAESKey must be equal to 43")
return
}
key, err = base64.StdEncoding.DecodeString(encodedAESKey + "=")
if err != nil {
return
}
if len(key) != 32 {
err = fmt.Errorf("encodingAESKey invalid")
return
}
return
}
// AESDecryptMsg ciphertext = AES_Encrypt[random(16B) + msg_len(4B) + rawXMLMsg + appId]
//参考:github.com/chanxuehong/wechat.v2
func AESDecryptMsg(ciphertext []byte, aesKey []byte) (random, rawXMLMsg, appID []byte, err error) {
const (
BlockSize = 32 // PKCS#7
BlockMask = BlockSize - 1 // BLOCK_SIZE 为 2^n 时, 可以用 mask 获取针对 BLOCK_SIZE 的余数
)
if len(ciphertext) < BlockSize {
err = fmt.Errorf("the length of ciphertext too short: %d", len(ciphertext))
return
}
if len(ciphertext)&BlockMask != 0 {
err = fmt.Errorf("ciphertext is not a multiple of the block size, the length is %d", len(ciphertext))
return
}
plaintext := make([]byte, len(ciphertext)) // len(plaintext) >= BLOCK_SIZE
// 解密
block, err := aes.NewCipher(aesKey)
if err != nil {
panic(err)
}
mode := cipher.NewCBCDecrypter(block, aesKey[:16])
mode.CryptBlocks(plaintext, ciphertext)
// PKCS#7 去除补位
amountToPad := int(plaintext[len(plaintext)-1])
if amountToPad < 1 || amountToPad > BlockSize {
err = fmt.Errorf("the amount to pad is incorrect: %d", amountToPad)
return
}
plaintext = plaintext[:len(plaintext)-amountToPad]
// 反拼接
if len(plaintext) <= 20 {
err = fmt.Errorf("plaintext too short, the length is %d", len(plaintext))
return
}
rawXMLMsgLen := int(binary.BigEndian.Uint32(plaintext[16:20]))
if rawXMLMsgLen < 0 {
err = fmt.Errorf("incorrect msg length: %d", rawXMLMsgLen)
return
}
appIDOffset := 20 + rawXMLMsgLen
if len(plaintext) <= appIDOffset {
err = fmt.Errorf("msg length too large: %d", rawXMLMsgLen)
return
}
random = plaintext[:16:20]
rawXMLMsg = plaintext[20:appIDOffset:appIDOffset]
appID = plaintext[appIDOffset:]
return
}
package crypt
import (
"bytes"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"math/rand"
"reflect"
"sort"
"strings"
"time"
)
//Signature sha1签名
func Signature(params ...string) string {
sort.Strings(params)
h := sha1.New()
for _, s := range params {
io.WriteString(h, s)
}
return fmt.Sprintf("%x", h.Sum(nil))
}
func Md5Sign(val map[string]interface{}, mchSecret string) string {
strs := make([]string, 0, len(val) + 1)
for k, v := range val {
if !reflect.DeepEqual(v, nil) && !reflect.DeepEqual(v, "") {
// 不为空的参数才参与sign计算
strs = append(strs, fmt.Sprintf("%s=%v", k, v))
}
}
sort.Strings(strs)
// 商户secret加到最后参数sign计算
strs = append(strs, fmt.Sprintf("key=%s", mchSecret))
h := md5.New()
h.Write([]byte(strings.Join(strs, "&")))
return strings.ToUpper(hex.EncodeToString(h.Sum(nil))) // 计算出的md5大写
}
func EncodeXml(val map[string]interface{}) string {
var buf bytes.Buffer
buf.WriteString("<xml>")
for k, v := range val {
buf.WriteString("<" + k + ">")
buf.WriteString(fmt.Sprint(v))
buf.WriteString("</" + k + ">")
}
buf.WriteString("</xml>")
return buf.String()
}
func RandomStr(length int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < length; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
package wechat
import (
"bytes"
"encoding/xml"
"io/ioutil"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
)
type MchBaseRes struct {
ReturnCode string `xml:"return_code"`
ReturnMsg string `xml:"return_msg"`
ResultCode string `xml:"result_code"`
ErrCode string `xml:"err_code"`
ErrCodeDes string `xml:"err_code_des"`
}
func (r MchBaseRes) Error() string {
if r.ReturnCode != "SUCCESS" {
return r.ReturnMsg
}
if r.ResultCode != "SUCCESS" {
return r.ErrCode + " " + r.ErrCodeDes
}
return ""
}
type MchTransRes struct {
MchBaseRes
AppId string `xml:"mch_appid"`
MchId string `xml:"mchid"`
DeviceInfo string `xml:"device_info"`
NonceStr string `xml:"nonce_str"`
TradeNo string `xml:"partner_trade_no"`
PayNo string `xml:"payment_no"`
PayTime string `xml:"payment_time"`
}
type MchTransferInfoRes struct {
MchBaseRes
TradeNo string `xml:"partner_trade_no"`
AppId string `xml:"appid"`
MchId string `xml:"mchid"`
PayNo string `xml:"detail_id"`
Status string `xml:"status"`
Reason string `xml:"reason"`
OpenId string `xml:"openid"`
Name string `xml:"transfer_name"`
Amount int `xml:"payment_amount"`
TransTime string `xml:"transfer_time"`
PayTime string `xml:"payment_time"`
Desc string `xml:"desc"`
}
type mchRepackRes struct {
MchBaseRes
Sign string `xml:"sign"`
TradeNo string `xml:"mch_billno"`
MchId string `xml:"mch_id"`
AppId string `xml:"wxappid"`
OpenId string `xml:"re_openid"`
Amount int `xml:"total_amount"`
PayNo string `xml:"send_listid"`
}
func (c *WechatClientV3) mchPost(url string, req map[string]interface{}, res error) error {
sign := crypt.Md5Sign(req, c.apiSecret)
req["sign"] = sign
resp, err := c.secuClient.Post(url, "application/xml", bytes.NewReader([]byte(crypt.EncodeXml(req))))
if err != nil {
return err
}
b, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return err
}
if err = xml.Unmarshal(b, res); err != nil {
return err
}
return nil
}
func (c *WechatClientV3) Transfer(openid string, tradeno string, amount int, desc string, checkName string, name string) (*MchTransRes, error) {
val := make(map[string]interface{})
val["mch_appid"] = c.AppID
val["mchid"] = c.mchid
val["nonce_str"] = crypt.RandomStr(10)
val["partner_trade_no"] = tradeno
val["openid"] = openid
val["check_name"] = checkName
val["re_user_name"] = name
val["amount"] = amount
val["desc"] = desc
val["spbill_create_ip"] = ""
res := &MchTransRes{}
if err := c.mchPost("https://api.mch.weixin.qq.com/mmpaymkttransfers/promotion/transfers", val, res); err != nil {
return nil, err
}
return res, nil
}
func (c *WechatClientV3) TransferInfo(tradeNo string) (*MchTransferInfoRes, error) {
val := make(map[string]interface{})
val["appid"] = c.AppID
val["mch_id"] = c.mchid
val["nonce_str"] = crypt.RandomStr(10)
val["partner_trade_no"] = tradeNo
res := &MchTransferInfoRes{}
if err := c.mchPost("https://api.mch.weixin.qq.com/mmpaymkttransfers/gettransferinfo", val, res); err != nil {
return nil, err
}
return res, nil
}
func (c *WechatClientV3) Redpack(openid, tradeno, mchname, wishing, act, remark, scene_id string, amount int, num int) (string, error) {
val := make(map[string]interface{})
val["nonce_str"] = crypt.RandomStr(10)
val["mch_billno"] = tradeno
val["mch_id"] = c.mchid
val["wxappid"] = c.AppID
val["send_name"] = mchname
val["re_openid"] = openid
val["total_amount"] = amount
val["total_num"] = num
val["wishing"] = wishing
val["client_ip"] = ""
val["act_name"] = act
val["remark"] = remark
if amount > 20000 {
val["scene_id"] = scene_id
}
var res mchRepackRes
if err := c.mchPost("https://api.mch.weixin.qq.com/mmpaymkttransfers/sendredpack", val, &res); err != nil {
return "", nil
}
return res.PayNo, nil
}
func (c *WechatClientV3) GroupRedpack(openid, tradeno, mchname, wishing, act, remark, scene_id string, amount int, num int) (string, error) {
val := make(map[string]interface{})
val["nonce_str"] = crypt.RandomStr(10)
val["mch_billno"] = tradeno
val["mch_id"] = c.mchid
val["wxappid"] = c.AppID
val["send_name"] = mchname
val["re_openid"] = openid
val["total_amount"] = amount
val["total_num"] = num
val["wishing"] = wishing
val["client_ip"] = ""
val["act_name"] = act
val["remark"] = remark
val["amt_type"] = "ALL_RAND"
val["scene_id"] = scene_id
var res mchRepackRes
if err := c.mchPost("https://api.mch.weixin.qq.com/mmpaymkttransfers/sendgroupredpack", val, &res); err != nil {
return "", nil
}
return res.PayNo, nil
}
package wechat
import (
"fmt"
"strconv"
"time"
)
var wechatTimeFormat = "2006-01-02T15:04:05-07:00"
// WechatTime 微信格式时间序列化
type WechatTime time.Time
// UnmarshalJSON 反序列化
func (t *WechatTime) UnmarshalJSON(data []byte) error {
if len(data) <= 2 {
return nil
}
tm, err := time.Parse(wechatTimeFormat, string(data[1:len(data)-1]))
if err == nil {
*t = WechatTime(tm)
}
return err
}
// MarshalJSON 序列化
func (t WechatTime) MarshalJSON() ([]byte, error) {
return []byte(time.Time(t).Format(`"` + wechatTimeFormat + `"`)), nil
}
type SingleTypeMsg struct {
SinglePriceMax int64 `json:"single_price_max"`
CutToPrice int64 `json:"cut_to_price"`
}
type FixedValueStockMsg struct {
CouponAmount int64 `json:"coupon_amount"`
TransactionMin int64 `json:"transaction_minimum"`
}
type CouponConsumeInfo struct {
ConsumeTime WechatTime `json:"consume_time"`
ConsumeMchID string `json:"consume_mchid"`
TransactionID string `json:"transaction_id"`
GoodsDetail []map[string]interface{} `json:"goods_detail"`
}
type CouponEntity struct {
StockCreatorMchID string `json:"stock_creator_mchid"`
StockID string `json:"stock_id"`
CouponID string `json:"coupon_id"`
CutToMessage *SingleTypeMsg `json:"cut_to_message,omitempty"`
CouponName string `json:"coupon_name"`
Status string `json:"status"`
Description string `json:"description"`
CreateTime WechatTime `json:"create_time"`
CouponType string `json:"coupon_type"`
NoCash bool `json:"no_cash"`
AvlBeginTime WechatTime `json:"available_begin_time"`
AvlEndTime WechatTime `json:"available_end_time"`
SingleItem bool `json:"singleitem"`
NormalCouponInfo *FixedValueStockMsg `json:"normal_coupon_information,omitempty"`
ConsumeInfo *CouponConsumeInfo `json:"consume_information,omitempty"`
}
type StockUseRule struct {
MaxCoupons int `json:"max_coupons"`
MaxAmount int `json:"max_amount"`
MaxAmountByDay int `json:"max_amount_by_day"`
FixedNormalCoupon *FixedValueStockMsg `json:"fixed_normal_coupon,omitempty"`
MaxCouponsPerUser int `json:"max_coupons_per_user"`
CouponType string `json:"coupon_type"`
CombineUse bool `json:"combine_use"`
}
type StockEntity struct {
StockCreatorMchID string `json:"stock_creator_mchid"`
StockID string `json:"stock_id"`
StockName string `json:"stock_name"`
Status string `json:"status"`
CreateTime string `json:"create_time"`
Description string `json:"description"`
StockUseRule *StockUseRule `json:"stock_use_rule,omitempty"`
AvlBeginTime WechatTime `json:"available_begin_time"`
AvlEndTime WechatTime `json:"available_end_time"`
NoCash bool `json:"no_cash"`
StartTime WechatTime `json:"start_time"`
StopTime WechatTime `json:"stop_time"`
CutToMessage *SingleTypeMsg `json:"cut_to_message,omitempty"`
SingleItem bool `json:"singleitem"`
StockType string `json:"stock_type"`
}
func (stock StockEntity) NewCoupon(couponID string) *CouponEntity {
entity := &CouponEntity{}
entity.AvlBeginTime = stock.AvlBeginTime
entity.AvlEndTime = stock.AvlEndTime
entity.CouponID = couponID
entity.CouponName = stock.StockName
entity.CouponType = stock.StockType
entity.CreateTime = WechatTime(time.Now())
entity.CutToMessage = stock.CutToMessage
entity.Description = stock.Description
entity.NoCash = stock.NoCash
entity.NormalCouponInfo = stock.StockUseRule.FixedNormalCoupon
entity.SingleItem = stock.SingleItem
entity.Status = "SENDED"
entity.StockCreatorMchID = stock.StockCreatorMchID
entity.StockID = stock.StockID
return entity
}
type QueryByUseMchResponse struct {
TotalCount int `json:"total_count"`
Data []*CouponEntity `json:"data"`
}
func (resp QueryByUseMchResponse) HasStock(stockid string) bool {
for _, stock := range resp.Data {
if stock.StockID == stockid {
return true
}
}
return false
}
type StockListResponse struct {
TotalCount int `json:"total_count"`
Data []*StockEntity `json:"data"`
}
type StockUseFlowResponse struct {
Url string `json:"url"`
HashValue string `json:"hash_value"`
HashType string `json:"hash_type"`
}
type ErrResponse struct {
Link string `json:"-"`
AppID string `json:"-"`
Code string `json:"code"`
Message string `json:"message"`
}
func (resp ErrResponse) Error() string {
return fmt.Sprintf("%s request %s (%s: %s)", resp.AppID, resp.Link, resp.Code, resp.Message)
}
// WechatOption 微信appOnShow参数结构体
type WechatOption struct {
Path string `json:"path"`
Scene int `json:"scene"`
Query interface{} `json:"query"`
ShareTicket string `json:"shareTicket"`
ReferrerInfo interface{} `json:"referrerInfo"`
}
func (opt *WechatOption) GetExtraData() (string, map[string]interface{}) {
if opt.ReferrerInfo == nil {
return "", nil
}
refer, ok := opt.ReferrerInfo.(map[string]interface{})
if !ok {
return "", nil
}
var appid string
var query map[string]interface{}
if v, ok := refer["appId"].(string); ok {
appid = v
}
if v, ok := refer["extraData"].(map[string]interface{}); ok {
query = v
}
return appid, query
}
func (opt *WechatOption) GetQueryString(key string) string {
query, ok := opt.Query.(map[string]interface{})
if !ok {
return ""
}
v, ok := query[key].(string)
if !ok {
return ""
}
return v
}
func (opt *WechatOption) GetQueryInt(key string) int {
query, ok := opt.Query.(map[string]interface{})
if !ok {
return 0
}
if v, ok := query[key].(float64); ok {
return int(v)
}
if v, ok := query[key].(int); ok {
return v
}
if v, ok := query[key].(string); ok {
iv, _ := strconv.Atoi(v)
return iv
}
return 0
}
// StockWithValue 指定面额券
type StockWithValue struct {
StockID string // 批次号
CrtMchID string // 创建商户号
AppID string // appid
CouponValue int // 金额
CouponMinimum int // 门槛
}
type MchStockResp struct {
Data []*MchStock `json:"data"`
TotalCount int `json:"total_count"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
type MchStock struct {
BelongMerchant string `json:"belong_merchant"`
StockName string `json:"stock_name"`
Comment string `json:"comment"`
GoodsName string `json:"goods_name"`
StockType string `json:"stock_type"`
Transferable string `json:"transferable"`
Shareable string `json:"shareable"`
CouponState string `json:"coupon_state"`
DisplayPatternInfo struct {
MerchantLogoURL string `json:"merchant_logo_url"`
MerchantName string `json:"merchant_name"`
BackgroundColor string `json:"background_color"`
CouponImageURL string `json:"coupon_image_url"`
} `json:"display_pattern_info"`
CouponUseRule struct {
CouponAvailableTime struct {
AvailableBeginTime time.Time `json:"available_begin_time"`
AvailableEndTime time.Time `json:"available_end_time"`
AvailableDayAfterReceive int `json:"available_day_after_receive"`
AvailableWeek struct {
WeekDay []string `json:"week_day"`
AvailableDayTime []struct {
BeginTime int `json:"begin_time"`
EndTime int `json:"end_time"`
} `json:"available_day_time"`
} `json:"available_week"`
IrregularyAvaliableTime []struct {
BeginTime time.Time `json:"begin_time"`
EndTime time.Time `json:"end_time"`
} `json:"irregulary_avaliable_time"`
} `json:"coupon_available_time"`
FixedNormalCoupon struct {
DiscountAmount int `json:"discount_amount"`
TransactionMinimum int `json:"transaction_minimum"`
} `json:"fixed_normal_coupon"`
DiscountCoupon struct {
DiscountPercent int `json:"discount_percent"`
TransactionMinimum int `json:"transaction_minimum"`
} `json:"discount_coupon"`
ExchangeCoupon struct {
ExchangePrice int `json:"exchange_price"`
TransactionMinimum int `json:"transaction_minimum"`
} `json:"exchange_coupon"`
UseMethod string `json:"use_method"`
MiniProgramsAppid string `json:"mini_programs_appid"`
MiniProgramsPath string `json:"mini_programs_path"`
} `json:"coupon_use_rule"`
CustomEntrance struct {
MiniProgramsInfo struct {
MiniProgramsAppid string `json:"mini_programs_appid"`
MiniProgramsPath string `json:"mini_programs_path"`
EntranceWords string `json:"entrance_words"`
GuidingWords string `json:"guiding_words"`
} `json:"mini_programs_info"`
Appid string `json:"appid"`
HallID string `json:"hall_id"`
StoreID string `json:"store_id"`
} `json:"custom_entrance"`
CouponCode string `json:"coupon_code"`
StockID string `json:"stock_id"`
AvailableStartTime time.Time `json:"available_start_time"`
ExpireTime time.Time `json:"expire_time"`
ReceiveTime time.Time `json:"receive_time"`
SendRequestNo string `json:"send_request_no"`
UseRequestNo string `json:"use_request_no"`
UseTime time.Time `json:"use_time"`
}
This diff is collapsed.
This diff is collapsed.
package message
//Image 图片消息
type Image struct {
CommonToken
Image struct {
MediaID string `xml:"MediaId"`
} `xml:"Image"`
}
//NewImage 回复图片消息
func NewImage(mediaID string) *Image {
image := new(Image)
image.MsgType = MsgTypeImage
image.Image.MediaID = mediaID
return image
}
package message
import "encoding/xml"
// MsgType 基本消息类型
type MsgType string
// EventType 事件类型
type EventType string
const (
//MsgTypeText 表示文本消息
MsgTypeText MsgType = "text"
//MsgTypeImage 表示图片消息
MsgTypeImage = "image"
//MsgTypeVoice 表示语音消息
MsgTypeVoice = "voice"
//MsgTypeVideo 表示视频消息
MsgTypeVideo = "video"
//MsgTypeShortVideo 表示短视频消息[限接收]
MsgTypeShortVideo = "shortvideo"
//MsgTypeLocation 表示坐标消息[限接收]
MsgTypeLocation = "location"
//MsgTypeLink 表示链接消息[限接收]
MsgTypeLink = "link"
//MsgTypeMusic 表示音乐消息[限回复]
MsgTypeMusic = "music"
//MsgTypeNews 表示图文消息[限回复]
MsgTypeNews = "news"
//MsgTypeTransfer 表示消息消息转发到客服
MsgTypeTransfer = "transfer_customer_service"
//MsgTypeEvent 表示事件推送消息
MsgTypeEvent = "event"
)
const (
//EventSubscribe 订阅
EventSubscribe EventType = "subscribe"
//EventUnsubscribe 取消订阅
EventUnsubscribe = "unsubscribe"
//EventScan 用户已经关注公众号,则微信会将带场景值扫描事件推送给开发者
EventScan = "SCAN"
//EventLocation 上报地理位置事件
EventLocation = "LOCATION"
//EventClick 点击菜单拉取消息时的事件推送
EventClick = "CLICK"
//EventView 点击菜单跳转链接时的事件推送
EventView = "VIEW"
//EventScancodePush 扫码推事件的事件推送
EventScancodePush = "scancode_push"
//EventScancodeWaitmsg 扫码推事件且弹出“消息接收中”提示框的事件推送
EventScancodeWaitmsg = "scancode_waitmsg"
//EventPicSysphoto 弹出系统拍照发图的事件推送
EventPicSysphoto = "pic_sysphoto"
//EventPicPhotoOrAlbum 弹出拍照或者相册发图的事件推送
EventPicPhotoOrAlbum = "pic_photo_or_album"
//EventPicWeixin 弹出微信相册发图器的事件推送
EventPicWeixin = "pic_weixin"
//EventLocationSelect 弹出地理位置选择器的事件推送
EventLocationSelect = "location_select"
)
//MixMessage 存放所有微信发送过来的消息和事件
type MixMessage struct {
CommonToken
//基本消息
MsgID int64 `xml:"MsgId"`
Content string `xml:"Content"`
PicURL string `xml:"PicUrl"`
MediaID string `xml:"MediaId"`
Format string `xml:"Format"`
ThumbMediaID string `xml:"ThumbMediaId"`
LocationX float64 `xml:"Location_X"`
LocationY float64 `xml:"Location_Y"`
Scale float64 `xml:"Scale"`
Label string `xml:"Label"`
Title string `xml:"Title"`
Description string `xml:"Description"`
URL string `xml:"Url"`
//事件相关
Event EventType `xml:"Event"`
EventKey string `xml:"EventKey"`
Ticket string `xml:"Ticket"`
Latitude string `xml:"Latitude"`
Longitude string `xml:"Longitude"`
Precision string `xml:"Precision"`
MenuID string `xml:"MenuId"`
ScanCodeInfo struct {
ScanType string `xml:"ScanType"`
ScanResult string `xml:"ScanResult"`
} `xml:"ScanCodeInfo"`
SendPicsInfo struct {
Count int32 `xml:"Count"`
PicList []EventPic `xml:"PicList>item"`
} `xml:"SendPicsInfo"`
SendLocationInfo struct {
LocationX float64 `xml:"Location_X"`
LocationY float64 `xml:"Location_Y"`
Scale float64 `xml:"Scale"`
Label string `xml:"Label"`
Poiname string `xml:"Poiname"`
}
}
//EventPic 发图事件推送
type EventPic struct {
PicMd5Sum string `xml:"PicMd5Sum"`
}
//EncryptedXMLMsg 安全模式下的消息体
type EncryptedXMLMsg struct {
XMLName struct{} `xml:"xml" json:"-"`
ToUserName string `xml:"ToUserName" json:"ToUserName"`
EncryptedMsg string `xml:"Encrypt" json:"Encrypt"`
}
//ResponseEncryptedXMLMsg 需要返回的消息体
type ResponseEncryptedXMLMsg struct {
XMLName struct{} `xml:"xml" json:"-"`
EncryptedMsg string `xml:"Encrypt" json:"Encrypt"`
MsgSignature string `xml:"MsgSignature" json:"MsgSignature"`
Timestamp int64 `xml:"TimeStamp" json:"TimeStamp"`
Nonce string `xml:"Nonce" json:"Nonce"`
}
// CommonToken 消息中通用的结构
type CommonToken struct {
XMLName xml.Name `xml:"xml"`
ToUserName string `xml:"ToUserName"`
FromUserName string `xml:"FromUserName"`
CreateTime int64 `xml:"CreateTime"`
MsgType MsgType `xml:"MsgType"`
}
//SetToUserName set ToUserName
func (msg *CommonToken) SetToUserName(toUserName string) {
msg.ToUserName = toUserName
}
//SetFromUserName set FromUserName
func (msg *CommonToken) SetFromUserName(fromUserName string) {
msg.FromUserName = fromUserName
}
//SetCreateTime set createTime
func (msg *CommonToken) SetCreateTime(createTime int64) {
msg.CreateTime = createTime
}
This diff is collapsed.
This diff is collapsed.
package message
type Reply interface {
SetToUserName(string)
SetFromUserName(string)
SetCreateTime(int64)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment