Commit 31d25dca authored by wuerqiQs's avatar wuerqiQs

cdp -> base all

parent c20c223e
Pipeline #18954 failed with stages
......@@ -16,8 +16,8 @@ import (
"github.com/labstack/echo/v4"
"github.com/spf13/viper"
cmq "github.com/yougg/cmq-go-tdmq"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat"
"gopkg.in/gomail.v2"
)
......
......@@ -5,7 +5,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/spf13/viper"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/version"
)
var host, _ = os.Hostname()
......
......@@ -10,7 +10,7 @@ import (
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/rs/zerolog"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
......
......@@ -7,4 +7,4 @@ if [ -x "$(command -v git)" ];then
fi
basepath=$(cd `dirname $0`; pwd)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -tags netgo -ldflags "-X 'gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version.GitHash=$(git show -s --format=%h)' -X 'gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version.Version=1.0.0'"
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -tags netgo -ldflags "-X 'gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/version.GitHash=$(git show -s --format=%h)' -X 'gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version.Version=1.0.0'"
......@@ -17,7 +17,7 @@ package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
)
// mailCmd represents the mail command
......
......@@ -20,8 +20,8 @@ import (
"github.com/jmoiron/sqlx"
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat"
// mysql
_ "github.com/go-sql-driver/mysql"
......
......@@ -20,8 +20,8 @@ import (
"os"
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/version"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/version"
"github.com/spf13/viper"
)
......
package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/supervisor"
)
/*
由于Supervirsor 通过stdin/stdout 和EventListener 进行协议通信,所以EventListener 不能通过stdout输入其他非协议内容,不然EventListener的状态就会不正常
这个暂时还没找到解决方案,暂时把initConfig里的输出注释掉
*/
// mailCmd represents the mail command
var spListenerCmd = &cobra.Command{
Use: "splisten",
Short: "supervisor 进程状态监听报警",
Long: `supervisor 进程状态监听报警`,
Run: func(cmd *cobra.Command, args []string) {
exception.SetAlertViper()
supervisor.NewListener().Start()
},
}
func init() {
rootCmd.AddCommand(spListenerCmd)
}
package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/supervisor"
)
/*
由于Supervirsor 通过stdin/stdout 和EventListener 进行协议通信,所以EventListener 不能通过stdout输入其他非协议内容,不然EventListener的状态就会不正常
这个暂时还没找到解决方案,暂时把initConfig里的输出注释掉
*/
// mailCmd represents the mail command
var spListenerCmd = &cobra.Command{
Use: "splisten",
Short: "supervisor 进程状态监听报警",
Long: `supervisor 进程状态监听报警`,
Run: func(cmd *cobra.Command, args []string) {
exception.SetAlertViper()
supervisor.NewListener().Start()
},
}
func init() {
rootCmd.AddCommand(spListenerCmd)
}
......@@ -17,8 +17,8 @@ package cmd
import (
"github.com/spf13/cobra"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/sdktool/tkserver"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/sdktool/tkserver"
)
// tkserverCmd represents the tkserver command
......
......@@ -15,7 +15,7 @@ limitations under the License.
*/
package main
import "gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/sdktool/cmd"
import "gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/sdktool/cmd"
func main() {
cmd.Execute()
......
......@@ -8,10 +8,10 @@ import (
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/getsentry/sentry-go"
"github.com/spf13/viper"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/client"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/client"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/crypt"
)
type tkServerEngine struct {
......
package supervisor
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/schedule"
"go.uber.org/multierr"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/exception"
)
const (
READY = "READY\n"
RESULTOK = "RESULT 2\nOK"
)
type deamon struct {
pid string
pname string
nextAlert int64
alertTime int
}
func (d *deamon) resetState() {
d.nextAlert = 0
d.alertTime = 0
}
func (d *deamon) alert() {
now := time.Now().Unix()
if d.nextAlert < now {
log(fmt.Sprintf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
d.alertTime += 1
if d.alertTime < 20 {
d.nextAlert = now + 60
} else if d.alertTime < 50 {
d.nextAlert = now + 300
} else if d.alertTime < 100 {
d.nextAlert = now + 1800
} else {
d.nextAlert = now + 7200
}
}
log(fmt.Sprintf("%s %s 下次报警时间:%d", d.pid, d.pname, d.nextAlert))
}
func NewListener() *Listener {
return &Listener{
deamons: make(map[string]*deamon, 0),
}
}
//Listener supervisor 子进程状态监听
type Listener struct {
deamons map[string]*deamon
lock sync.Mutex
}
func log(s string) {
fmt.Fprintf(os.Stderr, fmt.Sprintf("info:%s\n", s))
os.Stderr.Sync()
}
func (sl *Listener) state(s string) {
fmt.Fprintf(os.Stdout, s)
os.Stdout.Sync()
}
func (sl *Listener) registerDeamon(pname string, pid string) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.deamons[pname] = &deamon{pid: pid, pname: pname, nextAlert: 0, alertTime: 0}
}
func (sl *Listener) unRegisterDeamon(pname string, pid string) {
if _, ok := sl.deamons[pname]; ok {
sl.lock.Lock()
defer sl.lock.Unlock()
delete(sl.deamons, pname)
}
}
func (sl *Listener) init() {
cmd := exec.Command("supervisorctl", "status", "")
out, err := cmd.Output()
if err != nil {
log(err.Error())
}
outlines := strings.Split(string(out), "\n")
sl.lock.Lock()
defer sl.lock.Unlock()
for _, outline := range outlines {
spitem := strings.Split(outline, ",")
pinfo := strings.Split(deleteExtraSpace(spitem[0]), " ")
if len(pinfo) == 4 {
if pinfo[1] == "RUNNING" {
sl.deamons[pinfo[0]] = &deamon{pid: pinfo[3], pname: pinfo[0], nextAlert: 0, alertTime: 0}
}
}
}
}
func (sl *Listener) checkDeamonState() {
tt := schedule.NewTimerTask(schedule.WithErrHandler(func(name string, err error) {
exception.SendAlert("supervisor_listener", fmt.Errorf("%s:%s", name, err.Error()))
}), schedule.WithRunHandler(func(name string, nextAt time.Time) {
// log(name)
}))
tt.Do(time.Now(), time.Second*10, func(args ...interface{}) error {
sl.lock.Lock()
defer sl.lock.Unlock()
var err error
for _, deamon := range sl.deamons {
pid, e := strconv.Atoi(deamon.pid)
if e != nil {
err = multierr.Append(err, e)
continue
}
p, err := os.FindProcess(pid)
if err != nil {
deamon.alert()
} else {
if p == nil {
deamon.alert()
} else {
if err := p.Signal(syscall.Signal(0)); err != nil {
deamon.alert()
} else {
deamon.resetState()
}
}
}
}
return err
}, "进程状态监测")
tt.Run()
}
func (sl *Listener) recover(process, from, to string) {
log(fmt.Sprintf("进程 %s 被拉起", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 拉起成功", process))
}
func (sl *Listener) panic(process, from, to string) {
log(fmt.Sprintf("进程 %s 异常退出,请注意排查", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 异常退出,请注意排查", process))
}
func stringSliceContain(slice []string, value string) bool {
for _, item := range slice {
if item == value {
return true
}
}
return false
}
func deleteExtraSpace(s string) string {
s1 := strings.Replace(s, " ", " ", -1)
regstr := "\\s{2,}"
reg, _ := regexp.Compile(regstr)
s2 := make([]byte, len(s1))
copy(s2, s1)
spc_index := reg.FindStringIndex(string(s2))
for len(spc_index) > 0 {
s2 = append(s2[:spc_index[0]+1], s2[spc_index[1]:]...)
spc_index = reg.FindStringIndex(string(s2))
}
return string(s2)
}
//Start 开始监听子进程状态
func (sl *Listener) Start() {
sl.init()
sl.checkDeamonState()
lastSerial := "0"
lastPoolSerial := "0"
psExit := make(map[string]string, 0)
var body [4096]byte
reader := bufio.NewReader(os.Stdin)
for {
sl.state(READY)
line, _, err := reader.ReadLine()
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
log(string(line))
header := make(map[string]string, 0)
items := strings.Split(string(line), " ")
for _, item := range items {
kv := strings.Split(item, ":")
header[kv[0]] = kv[1]
}
serial := header["serial"]
poolSerial := header["poolserial"]
bodylen, err := strconv.ParseFloat(header["len"], 32)
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
_, err = io.ReadFull(reader, body[:int(bodylen)])
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
data := body[:int(bodylen)]
log("body+" + string(data))
if serial == lastSerial && poolSerial == lastPoolSerial {
sl.state(RESULTOK)
}
lastSerial = serial
lastPoolSerial = poolSerial
bodys := make(map[string]string, 0)
items = strings.Split(string(data), " ")
for _, item := range items {
kv := strings.Split(item, ":")
if len(kv) > 1 {
bodys[kv[0]] = kv[1]
}
}
process := bodys["processname"]
fromState := bodys["from_state"]
pid := bodys["pid"]
toState := header["eventname"]
log(fmt.Sprintf("\nprocessname:[%s] from_state:[%s] to_state:[%s] last_serial:[%s] last_pool_serial:[%s]\n", process, fromState, toState, lastSerial, lastPoolSerial))
if stringSliceContain([]string{"PROCESS_STATE_RUNNING"}, toState) {
sl.registerDeamon(process, pid)
if _, ok := psExit[process]; ok {
delete(psExit, process)
sl.recover(process, fromState, toState)
}
}
if stringSliceContain([]string{"PROCESS_STATE_STOPPED"}, toState) {
sl.unRegisterDeamon(process, pid)
}
if stringSliceContain([]string{"PROCESS_STATE_EXITED", "PROCESS_STATE_FATAL", "PROCESS_STATE_BACKOFF"}, toState) {
psExit[process] = ""
sl.panic(process, fromState, toState)
}
sl.state(RESULTOK)
}
}
package supervisor
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/schedule"
"go.uber.org/multierr"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/exception"
)
const (
READY = "READY\n"
RESULTOK = "RESULT 2\nOK"
)
type deamon struct {
pid string
pname string
nextAlert int64
alertTime int
}
func (d *deamon) resetState() {
d.nextAlert = 0
d.alertTime = 0
}
func (d *deamon) alert() {
now := time.Now().Unix()
if d.nextAlert < now {
log(fmt.Sprintf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程:pid:%s pname:%s 未存活", d.pid, d.pname))
d.alertTime += 1
if d.alertTime < 20 {
d.nextAlert = now + 60
} else if d.alertTime < 50 {
d.nextAlert = now + 300
} else if d.alertTime < 100 {
d.nextAlert = now + 1800
} else {
d.nextAlert = now + 7200
}
}
log(fmt.Sprintf("%s %s 下次报警时间:%d", d.pid, d.pname, d.nextAlert))
}
func NewListener() *Listener {
return &Listener{
deamons: make(map[string]*deamon, 0),
}
}
//Listener supervisor 子进程状态监听
type Listener struct {
deamons map[string]*deamon
lock sync.Mutex
}
func log(s string) {
fmt.Fprintf(os.Stderr, fmt.Sprintf("info:%s\n", s))
os.Stderr.Sync()
}
func (sl *Listener) state(s string) {
fmt.Fprintf(os.Stdout, s)
os.Stdout.Sync()
}
func (sl *Listener) registerDeamon(pname string, pid string) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.deamons[pname] = &deamon{pid: pid, pname: pname, nextAlert: 0, alertTime: 0}
}
func (sl *Listener) unRegisterDeamon(pname string, pid string) {
if _, ok := sl.deamons[pname]; ok {
sl.lock.Lock()
defer sl.lock.Unlock()
delete(sl.deamons, pname)
}
}
func (sl *Listener) init() {
cmd := exec.Command("supervisorctl", "status", "")
out, err := cmd.Output()
if err != nil {
log(err.Error())
}
outlines := strings.Split(string(out), "\n")
sl.lock.Lock()
defer sl.lock.Unlock()
for _, outline := range outlines {
spitem := strings.Split(outline, ",")
pinfo := strings.Split(deleteExtraSpace(spitem[0]), " ")
if len(pinfo) == 4 {
if pinfo[1] == "RUNNING" {
sl.deamons[pinfo[0]] = &deamon{pid: pinfo[3], pname: pinfo[0], nextAlert: 0, alertTime: 0}
}
}
}
}
func (sl *Listener) checkDeamonState() {
tt := schedule.NewTimerTask(schedule.WithErrHandler(func(name string, err error) {
exception.SendAlert("supervisor_listener", fmt.Errorf("%s:%s", name, err.Error()))
}), schedule.WithRunHandler(func(name string, nextAt time.Time) {
// log(name)
}))
tt.Do(time.Now(), time.Second*10, func(args ...interface{}) error {
sl.lock.Lock()
defer sl.lock.Unlock()
var err error
for _, deamon := range sl.deamons {
pid, e := strconv.Atoi(deamon.pid)
if e != nil {
err = multierr.Append(err, e)
continue
}
p, err := os.FindProcess(pid)
if err != nil {
deamon.alert()
} else {
if p == nil {
deamon.alert()
} else {
if err := p.Signal(syscall.Signal(0)); err != nil {
deamon.alert()
} else {
deamon.resetState()
}
}
}
}
return err
}, "进程状态监测")
tt.Run()
}
func (sl *Listener) recover(process, from, to string) {
log(fmt.Sprintf("进程 %s 被拉起", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 拉起成功", process))
}
func (sl *Listener) panic(process, from, to string) {
log(fmt.Sprintf("进程 %s 异常退出,请注意排查", process))
exception.SendAlert("supervisor_listener", fmt.Errorf("进程 %s 异常退出,请注意排查", process))
}
func stringSliceContain(slice []string, value string) bool {
for _, item := range slice {
if item == value {
return true
}
}
return false
}
func deleteExtraSpace(s string) string {
s1 := strings.Replace(s, " ", " ", -1)
regstr := "\\s{2,}"
reg, _ := regexp.Compile(regstr)
s2 := make([]byte, len(s1))
copy(s2, s1)
spc_index := reg.FindStringIndex(string(s2))
for len(spc_index) > 0 {
s2 = append(s2[:spc_index[0]+1], s2[spc_index[1]:]...)
spc_index = reg.FindStringIndex(string(s2))
}
return string(s2)
}
//Start 开始监听子进程状态
func (sl *Listener) Start() {
sl.init()
sl.checkDeamonState()
lastSerial := "0"
lastPoolSerial := "0"
psExit := make(map[string]string, 0)
var body [4096]byte
reader := bufio.NewReader(os.Stdin)
for {
sl.state(READY)
line, _, err := reader.ReadLine()
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
log(string(line))
header := make(map[string]string, 0)
items := strings.Split(string(line), " ")
for _, item := range items {
kv := strings.Split(item, ":")
header[kv[0]] = kv[1]
}
serial := header["serial"]
poolSerial := header["poolserial"]
bodylen, err := strconv.ParseFloat(header["len"], 32)
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
_, err = io.ReadFull(reader, body[:int(bodylen)])
if err != nil {
exception.SendAlert("supervisor_listener", err)
continue
}
data := body[:int(bodylen)]
log("body+" + string(data))
if serial == lastSerial && poolSerial == lastPoolSerial {
sl.state(RESULTOK)
}
lastSerial = serial
lastPoolSerial = poolSerial
bodys := make(map[string]string, 0)
items = strings.Split(string(data), " ")
for _, item := range items {
kv := strings.Split(item, ":")
if len(kv) > 1 {
bodys[kv[0]] = kv[1]
}
}
process := bodys["processname"]
fromState := bodys["from_state"]
pid := bodys["pid"]
toState := header["eventname"]
log(fmt.Sprintf("\nprocessname:[%s] from_state:[%s] to_state:[%s] last_serial:[%s] last_pool_serial:[%s]\n", process, fromState, toState, lastSerial, lastPoolSerial))
if stringSliceContain([]string{"PROCESS_STATE_RUNNING"}, toState) {
sl.registerDeamon(process, pid)
if _, ok := psExit[process]; ok {
delete(psExit, process)
sl.recover(process, fromState, toState)
}
}
if stringSliceContain([]string{"PROCESS_STATE_STOPPED"}, toState) {
sl.unRegisterDeamon(process, pid)
}
if stringSliceContain([]string{"PROCESS_STATE_EXITED", "PROCESS_STATE_FATAL", "PROCESS_STATE_BACKOFF"}, toState) {
psExit[process] = ""
sl.panic(process, fromState, toState)
}
sl.state(RESULTOK)
}
}
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/glutwins/webclient"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
)
type WachatReq map[string]interface{}
type Client struct {
*crypt.WechatConfig
data cache.Cache
tkexpire bool
}
func NewClient(cfg *crypt.WechatConfig, d cache.Cache) *Client {
c := &Client{WechatConfig: cfg, data: d}
return c
}
type JsConfig struct {
AppID string
TimeStamp int64
NonceStr string
Signature string
}
type cacheClient struct {
Token accessToken
Ticket resTicket
}
type tokenReq struct {
Sign string `json:"Auth-Sign"`
OverdueToken string `json:"overdue_token"`
Eid int `json:"enterprise_id"`
}
type tokenRsp struct {
Token string `json:"access_token"`
}
//GetAccessToken 获取access_token
func (c *Client) GetAccessToken() (string, error) {
var token accessToken
if !c.tkexpire {
if err := c.data.Get("token", &token); err == nil {
if token.valid() {
return token.AccessToken, nil
}
}
}
c.data.Lock("token")
defer c.data.Unlock("token")
url := fmt.Sprintf(accessTokenURL, c.AppID, c.AppSecret)
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("GetAccessToken httpcode=%d", resp.StatusCode)
}
if b, err := ioutil.ReadAll(resp.Body); err != nil {
return "", err
} else if err = json.Unmarshal(b, &token); err != nil {
return "", err
}
token.LastUpTs = time.Now().Unix()
c.data.Set("token", token)
c.tkexpire = false
return token.AccessToken, token.Error()
}
// ExpireToken 使token失效
func (c *Client) ExpireToken() {
c.tkexpire = true
}
func (c *Client) formatUrlWithAccessToken(base string, args ...interface{}) (string, error) {
var token string
var err error
token, err = c.GetAccessToken()
if err != nil {
return "", err
}
return fmt.Sprintf(base, append([]interface{}{token}, args...)...), nil
}
func (c *Client) postJsonUrlFormat(req interface{}, res interface{}, url string, args ...interface{}) error {
if uri, err := c.formatUrlWithAccessToken(url, args...); err != nil {
return err
} else if response, err := c.jsonPost(uri, req); err != nil {
return err
} else if err := json.Unmarshal(response, res); err != nil {
return err
}
return nil
}
func (c *Client) jsonPost(uri string, obj interface{}) ([]byte, error) {
var jsonData []byte
var err error
if str, ok := obj.(string); ok {
jsonData = []byte(str)
} else {
jsonData, err = json.Marshal(obj)
if err != nil {
return nil, err
}
}
return c.doPost(uri, "application/json;charset=utf-8", bytes.NewBuffer(jsonData))
}
func (c *Client) formPost(uri string, data map[string]string) ([]byte, error) {
body := bytes.NewBuffer(nil)
for k, v := range data {
body.WriteString(k)
body.WriteString("=")
body.WriteString(url.QueryEscape(v))
body.WriteString("&")
}
body.Truncate(body.Len() - 1)
return c.doPost(uri, "application/x-www-form-urlencoded;charset=utf-8", body)
}
func (c *Client) doPost(uri, contentType string, r io.Reader) ([]byte, error) {
response, err := http.Post(uri, contentType, r)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
}
return ioutil.ReadAll(response.Body)
}
func (c *Client) getJsonUrlFormat(res interface{}, url string, args ...interface{}) error {
uri, err := c.formatUrlWithAccessToken(url, args...)
if err != nil {
return err
}
if b, err := webclient.DoGet(uri); err != nil {
return err
} else {
return json.Unmarshal(b, res)
}
}
//uri 为当前网页地址
func (js *Client) GetJsConfig(uri string) (config *JsConfig, err error) {
config = new(JsConfig)
var ticketStr string
ticketStr, err = js.getTicket()
if err != nil {
return
}
nonceStr := crypt.RandomStr(16)
timestamp := time.Now().Unix()
str := fmt.Sprintf("jsapi_ticket=%s&noncestr=%s&timestamp=%d&url=%s", ticketStr, nonceStr, timestamp, uri)
sigStr := crypt.Signature(str)
config.AppID = js.AppID
config.NonceStr = nonceStr
config.TimeStamp = timestamp
config.Signature = sigStr
return
}
//getTicket 获取jsapi_tocket全局缓存
func (c *Client) getTicket() (string, error) {
var ticket resTicket
if err := c.data.Get("ticket", &ticket); err == nil {
if ticket.valid() {
return ticket.Ticket, nil
}
}
c.data.Lock("ticket")
defer c.data.Unlock("ticket")
if err := c.getJsonUrlFormat(&ticket, getTicketURL); err != nil {
return "", err
}
ticket.LastUpTs = time.Now().Unix()
c.data.Set("ticket", ticket)
return ticket.Ticket, ticket.Error()
}
func (c *Client) NewQrCode(sceneId interface{}, expire int) (*QrCode, error) {
var msg string
if _, ok := sceneId.(string); ok {
if expire == 0 {
msg = fmt.Sprintf(kQrStrLimitFormat, sceneId)
} else {
return nil, fmt.Errorf("not support expire str qrcode")
}
} else {
if expire == 0 {
msg = fmt.Sprintf(kQrIntLimitFormat, sceneId)
} else {
msg = fmt.Sprintf(kQrIntFormat, expire, sceneId)
}
}
var qrcode = &QrCode{}
if err := c.postJsonUrlFormat(msg, qrcode, qrcodeURL); err != nil {
return nil, err
}
return qrcode, qrcode.Error()
}
type CustomerText struct {
Content string `json:"content"`
}
type CustomerImage struct {
MediaID string `json:"media_id"`
}
type CustomerMessage struct {
ToUser string `json:"touser"`
MsgType string `json:"msgtype"`
Text *CustomerText `json:"text,omitempty"`
Image *CustomerImage `json:"image,omitempty"`
}
func (c *Client) SendMessage(msg *CustomerMessage) error {
resp := &CommonResp{}
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
if resp.ErrCode == 40001 {
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
return resp.Error()
}
return resp.Error()
}
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/glutwins/webclient"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/cache"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/crypt"
)
type WachatReq map[string]interface{}
type Client struct {
*crypt.WechatConfig
data cache.Cache
tkexpire bool
}
func NewClient(cfg *crypt.WechatConfig, d cache.Cache) *Client {
c := &Client{WechatConfig: cfg, data: d}
return c
}
type JsConfig struct {
AppID string
TimeStamp int64
NonceStr string
Signature string
}
type cacheClient struct {
Token accessToken
Ticket resTicket
}
type tokenReq struct {
Sign string `json:"Auth-Sign"`
OverdueToken string `json:"overdue_token"`
Eid int `json:"enterprise_id"`
}
type tokenRsp struct {
Token string `json:"access_token"`
}
//GetAccessToken 获取access_token
func (c *Client) GetAccessToken() (string, error) {
var token accessToken
if !c.tkexpire {
if err := c.data.Get("token", &token); err == nil {
if token.valid() {
return token.AccessToken, nil
}
}
}
c.data.Lock("token")
defer c.data.Unlock("token")
url := fmt.Sprintf(accessTokenURL, c.AppID, c.AppSecret)
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("GetAccessToken httpcode=%d", resp.StatusCode)
}
if b, err := ioutil.ReadAll(resp.Body); err != nil {
return "", err
} else if err = json.Unmarshal(b, &token); err != nil {
return "", err
}
token.LastUpTs = time.Now().Unix()
c.data.Set("token", token)
c.tkexpire = false
return token.AccessToken, token.Error()
}
// ExpireToken 使token失效
func (c *Client) ExpireToken() {
c.tkexpire = true
}
func (c *Client) formatUrlWithAccessToken(base string, args ...interface{}) (string, error) {
var token string
var err error
token, err = c.GetAccessToken()
if err != nil {
return "", err
}
return fmt.Sprintf(base, append([]interface{}{token}, args...)...), nil
}
func (c *Client) postJsonUrlFormat(req interface{}, res interface{}, url string, args ...interface{}) error {
if uri, err := c.formatUrlWithAccessToken(url, args...); err != nil {
return err
} else if response, err := c.jsonPost(uri, req); err != nil {
return err
} else if err := json.Unmarshal(response, res); err != nil {
return err
}
return nil
}
func (c *Client) jsonPost(uri string, obj interface{}) ([]byte, error) {
var jsonData []byte
var err error
if str, ok := obj.(string); ok {
jsonData = []byte(str)
} else {
jsonData, err = json.Marshal(obj)
if err != nil {
return nil, err
}
}
return c.doPost(uri, "application/json;charset=utf-8", bytes.NewBuffer(jsonData))
}
func (c *Client) formPost(uri string, data map[string]string) ([]byte, error) {
body := bytes.NewBuffer(nil)
for k, v := range data {
body.WriteString(k)
body.WriteString("=")
body.WriteString(url.QueryEscape(v))
body.WriteString("&")
}
body.Truncate(body.Len() - 1)
return c.doPost(uri, "application/x-www-form-urlencoded;charset=utf-8", body)
}
func (c *Client) doPost(uri, contentType string, r io.Reader) ([]byte, error) {
response, err := http.Post(uri, contentType, r)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
}
return ioutil.ReadAll(response.Body)
}
func (c *Client) getJsonUrlFormat(res interface{}, url string, args ...interface{}) error {
uri, err := c.formatUrlWithAccessToken(url, args...)
if err != nil {
return err
}
if b, err := webclient.DoGet(uri); err != nil {
return err
} else {
return json.Unmarshal(b, res)
}
}
//uri 为当前网页地址
func (js *Client) GetJsConfig(uri string) (config *JsConfig, err error) {
config = new(JsConfig)
var ticketStr string
ticketStr, err = js.getTicket()
if err != nil {
return
}
nonceStr := crypt.RandomStr(16)
timestamp := time.Now().Unix()
str := fmt.Sprintf("jsapi_ticket=%s&noncestr=%s&timestamp=%d&url=%s", ticketStr, nonceStr, timestamp, uri)
sigStr := crypt.Signature(str)
config.AppID = js.AppID
config.NonceStr = nonceStr
config.TimeStamp = timestamp
config.Signature = sigStr
return
}
//getTicket 获取jsapi_tocket全局缓存
func (c *Client) getTicket() (string, error) {
var ticket resTicket
if err := c.data.Get("ticket", &ticket); err == nil {
if ticket.valid() {
return ticket.Ticket, nil
}
}
c.data.Lock("ticket")
defer c.data.Unlock("ticket")
if err := c.getJsonUrlFormat(&ticket, getTicketURL); err != nil {
return "", err
}
ticket.LastUpTs = time.Now().Unix()
c.data.Set("ticket", ticket)
return ticket.Ticket, ticket.Error()
}
func (c *Client) NewQrCode(sceneId interface{}, expire int) (*QrCode, error) {
var msg string
if _, ok := sceneId.(string); ok {
if expire == 0 {
msg = fmt.Sprintf(kQrStrLimitFormat, sceneId)
} else {
return nil, fmt.Errorf("not support expire str qrcode")
}
} else {
if expire == 0 {
msg = fmt.Sprintf(kQrIntLimitFormat, sceneId)
} else {
msg = fmt.Sprintf(kQrIntFormat, expire, sceneId)
}
}
var qrcode = &QrCode{}
if err := c.postJsonUrlFormat(msg, qrcode, qrcodeURL); err != nil {
return nil, err
}
return qrcode, qrcode.Error()
}
type CustomerText struct {
Content string `json:"content"`
}
type CustomerImage struct {
MediaID string `json:"media_id"`
}
type CustomerMessage struct {
ToUser string `json:"touser"`
MsgType string `json:"msgtype"`
Text *CustomerText `json:"text,omitempty"`
Image *CustomerImage `json:"image,omitempty"`
}
func (c *Client) SendMessage(msg *CustomerMessage) error {
resp := &CommonResp{}
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
if resp.ErrCode == 40001 {
if err := c.postJsonUrlFormat(msg, resp, customerMsgURL); err != nil {
return err
}
return resp.Error()
}
return resp.Error()
}
package context
import (
"encoding/xml"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/message"
)
var (
// ErrSign wechat sign error
ErrSign = errors.New("invalid sign")
)
type Context interface {
Query(key string) (string, bool)
ParseRawMessage() (*message.MixMessage, error)
Render([]byte)
RenderString(string)
RenderXML(interface{})
RenderMessage(message.Reply)
}
func NewContext(req *http.Request, w http.ResponseWriter, cfg *crypt.WechatConfig) Context {
ctx := &ctxImpl{Writer: w, Request: req, cfg: cfg}
ctx.parse()
return ctx
}
// Context struct
type ctxImpl struct {
Writer http.ResponseWriter
Request *http.Request
isSafeMode bool
params url.Values
random []byte
cfg *crypt.WechatConfig
}
func (ctx *ctxImpl) parse() {
ctx.params = ctx.Request.URL.Query()
if enctype, ok := ctx.Query("encrypt_type"); ok {
ctx.isSafeMode = enctype == "aes"
}
}
func (ctx *ctxImpl) ParseRawMessage() (*message.MixMessage, error) {
timestamp, _ := ctx.Query("timestamp")
nonce, _ := ctx.Query("nonce")
signature, _ := ctx.Query("signature")
if signature != crypt.Signature(ctx.cfg.Token, timestamp, nonce) {
return nil, ErrSign
}
var rawXMLMsgBytes []byte
var err error
if ctx.isSafeMode {
var encryptedXMLMsg message.EncryptedXMLMsg
if err = xml.NewDecoder(ctx.Request.Body).Decode(&encryptedXMLMsg); err != nil {
return nil, err
}
msgSignature, _ := ctx.Query("msg_signature")
msgSignatureGen := crypt.Signature(ctx.cfg.Token, timestamp, nonce, encryptedXMLMsg.EncryptedMsg)
if msgSignature != msgSignatureGen {
return nil, ErrSign
}
//解密
ctx.random, rawXMLMsgBytes, err = crypt.DecryptMsg(ctx.cfg.AppID, encryptedXMLMsg.EncryptedMsg, ctx.cfg.AESKey)
if err != nil {
return nil, err
}
} else {
rawXMLMsgBytes, err = ioutil.ReadAll(ctx.Request.Body)
if err != nil {
return nil, err
}
}
msg := &message.MixMessage{}
err = xml.Unmarshal(rawXMLMsgBytes, msg)
return msg, err
}
// GetQuery is like Query(), it returns the keyed url query value
func (ctx *ctxImpl) Query(key string) (string, bool) {
if values, ok := ctx.params[key]; ok && len(values) > 0 {
return values[0], true
}
return "", false
}
//Render render from bytes
func (ctx *ctxImpl) Render(bytes []byte) {
ctx.Writer.WriteHeader(200)
_, err := ctx.Writer.Write(bytes)
if err != nil {
panic(err)
}
}
//String render from string
func (ctx *ctxImpl) RenderString(str string) {
ctx.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
ctx.Render([]byte(str))
}
//XML render to xml
func (ctx *ctxImpl) RenderXML(obj interface{}) {
ctx.Writer.Header().Set("Content-Type", "application/xml; charset=utf-8")
bytes, err := xml.Marshal(obj)
if err != nil {
panic(err)
}
ctx.Render(bytes)
}
func (ctx *ctxImpl) RenderMessage(reply message.Reply) {
var replyMsg interface{} = reply
if ctx.isSafeMode {
rawMsg, err := xml.Marshal(replyMsg)
if err != nil {
return
}
//安全模式下对消息进行加密
encryptedMsg, err := crypt.EncryptMsg(ctx.random, rawMsg, ctx.cfg.AppID, ctx.cfg.AESKey)
if err != nil {
return
}
timestamp := time.Now().Unix()
strts := strconv.FormatInt(timestamp, 64)
nonce := crypt.RandomStr(16)
msgSignature := crypt.Signature(ctx.cfg.Token, strts, nonce, string(encryptedMsg))
replyMsg = message.ResponseEncryptedXMLMsg{
EncryptedMsg: string(encryptedMsg),
MsgSignature: msgSignature,
Timestamp: timestamp,
Nonce: nonce,
}
}
ctx.RenderXML(replyMsg)
return
}
package context
import (
"encoding/xml"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/message"
)
var (
// ErrSign wechat sign error
ErrSign = errors.New("invalid sign")
)
type Context interface {
Query(key string) (string, bool)
ParseRawMessage() (*message.MixMessage, error)
Render([]byte)
RenderString(string)
RenderXML(interface{})
RenderMessage(message.Reply)
}
func NewContext(req *http.Request, w http.ResponseWriter, cfg *crypt.WechatConfig) Context {
ctx := &ctxImpl{Writer: w, Request: req, cfg: cfg}
ctx.parse()
return ctx
}
// Context struct
type ctxImpl struct {
Writer http.ResponseWriter
Request *http.Request
isSafeMode bool
params url.Values
random []byte
cfg *crypt.WechatConfig
}
func (ctx *ctxImpl) parse() {
ctx.params = ctx.Request.URL.Query()
if enctype, ok := ctx.Query("encrypt_type"); ok {
ctx.isSafeMode = enctype == "aes"
}
}
func (ctx *ctxImpl) ParseRawMessage() (*message.MixMessage, error) {
timestamp, _ := ctx.Query("timestamp")
nonce, _ := ctx.Query("nonce")
signature, _ := ctx.Query("signature")
if signature != crypt.Signature(ctx.cfg.Token, timestamp, nonce) {
return nil, ErrSign
}
var rawXMLMsgBytes []byte
var err error
if ctx.isSafeMode {
var encryptedXMLMsg message.EncryptedXMLMsg
if err = xml.NewDecoder(ctx.Request.Body).Decode(&encryptedXMLMsg); err != nil {
return nil, err
}
msgSignature, _ := ctx.Query("msg_signature")
msgSignatureGen := crypt.Signature(ctx.cfg.Token, timestamp, nonce, encryptedXMLMsg.EncryptedMsg)
if msgSignature != msgSignatureGen {
return nil, ErrSign
}
//解密
ctx.random, rawXMLMsgBytes, err = crypt.DecryptMsg(ctx.cfg.AppID, encryptedXMLMsg.EncryptedMsg, ctx.cfg.AESKey)
if err != nil {
return nil, err
}
} else {
rawXMLMsgBytes, err = ioutil.ReadAll(ctx.Request.Body)
if err != nil {
return nil, err
}
}
msg := &message.MixMessage{}
err = xml.Unmarshal(rawXMLMsgBytes, msg)
return msg, err
}
// GetQuery is like Query(), it returns the keyed url query value
func (ctx *ctxImpl) Query(key string) (string, bool) {
if values, ok := ctx.params[key]; ok && len(values) > 0 {
return values[0], true
}
return "", false
}
//Render render from bytes
func (ctx *ctxImpl) Render(bytes []byte) {
ctx.Writer.WriteHeader(200)
_, err := ctx.Writer.Write(bytes)
if err != nil {
panic(err)
}
}
//String render from string
func (ctx *ctxImpl) RenderString(str string) {
ctx.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
ctx.Render([]byte(str))
}
//XML render to xml
func (ctx *ctxImpl) RenderXML(obj interface{}) {
ctx.Writer.Header().Set("Content-Type", "application/xml; charset=utf-8")
bytes, err := xml.Marshal(obj)
if err != nil {
panic(err)
}
ctx.Render(bytes)
}
func (ctx *ctxImpl) RenderMessage(reply message.Reply) {
var replyMsg interface{} = reply
if ctx.isSafeMode {
rawMsg, err := xml.Marshal(replyMsg)
if err != nil {
return
}
//安全模式下对消息进行加密
encryptedMsg, err := crypt.EncryptMsg(ctx.random, rawMsg, ctx.cfg.AppID, ctx.cfg.AESKey)
if err != nil {
return
}
timestamp := time.Now().Unix()
strts := strconv.FormatInt(timestamp, 64)
nonce := crypt.RandomStr(16)
msgSignature := crypt.Signature(ctx.cfg.Token, strts, nonce, string(encryptedMsg))
replyMsg = message.ResponseEncryptedXMLMsg{
EncryptedMsg: string(encryptedMsg),
MsgSignature: msgSignature,
Timestamp: timestamp,
Nonce: nonce,
}
}
ctx.RenderXML(replyMsg)
return
}
......@@ -5,7 +5,7 @@ import (
"encoding/xml"
"io/ioutil"
"gitlab-ce.k8s.tools.vchangyi.com/cdp/cy-sdk-go/wechat/crypt"
"gitlab-ce.k8s.tools.vchangyi.com/base/cy-sdk-go/wechat/crypt"
)
type MchBaseRes struct {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment