first commit
This commit is contained in:
159
pkg/partner/mq/rocket/client.go
Normal file
159
pkg/partner/mq/rocket/client.go
Normal file
@ -0,0 +1,159 @@
|
||||
package rocket
|
||||
|
||||
import (
|
||||
"servicebase/pkg/common"
|
||||
"servicebase/pkg/partner/mq/message"
|
||||
"servicebase/pkg/partner/mq/pusher"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/anxpp/beego/logs"
|
||||
"github.com/apache/rocketmq-client-go/v2"
|
||||
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||
"github.com/apache/rocketmq-client-go/v2/producer"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var (
|
||||
mqProducer rocketmq.Producer
|
||||
mP sync.Mutex
|
||||
topicName string
|
||||
topicGroup string
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
}
|
||||
|
||||
func init() {
|
||||
//rlog.SetLogger()
|
||||
}
|
||||
|
||||
func NewRocketMQClient(topic, group string) pusher.PushClient {
|
||||
mP.Lock()
|
||||
defer mP.Unlock()
|
||||
|
||||
if mqProducer != nil {
|
||||
return &Client{}
|
||||
}
|
||||
|
||||
topicName = topic
|
||||
topicGroup = group
|
||||
|
||||
nsList := strings.Split(viper.GetString("mq.default.nameserver"), ";")
|
||||
var ns []string
|
||||
if len(nsList) > 0 {
|
||||
for _, item := range nsList {
|
||||
ns = append(ns, item)
|
||||
}
|
||||
} else {
|
||||
ns = []string{"47.97.157.234:9876"}
|
||||
}
|
||||
|
||||
logs.Info("NewRocketMQClient: %s %s %v", topicName, topicGroup, ns)
|
||||
|
||||
var e error
|
||||
mqProducer, e = producer.NewDefaultProducer(
|
||||
producer.WithNameServer(ns),
|
||||
producer.WithRetry(2),
|
||||
producer.WithGroupName(topicGroup),
|
||||
)
|
||||
if e != nil {
|
||||
logs.Error("producer.NewDefaultProducer error: " + e.Error())
|
||||
panic(e)
|
||||
}
|
||||
if mqProducer == nil {
|
||||
logs.Error("producer.NewDefaultProducer error: mqProducer==nil")
|
||||
}
|
||||
err := mqProducer.Start()
|
||||
if err != nil {
|
||||
logs.Error("start producer error: %s", err.Error())
|
||||
panic(err)
|
||||
} else {
|
||||
logs.Info("NewRocketMQClient success")
|
||||
}
|
||||
return &Client{}
|
||||
}
|
||||
|
||||
func (client *Client) Push(tag, key string, message []byte) error {
|
||||
|
||||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
logs.Error(common.LOG_QUOTE_STRING+"服务器异常 Push Message 获取到Panic @%"+common.LOG_QUOTE_STRING, x)
|
||||
return
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
|
||||
if nil == mqProducer {
|
||||
NewRocketMQClient(topicName, topicName)
|
||||
}
|
||||
|
||||
msg := primitive.NewMessage(topicName, message)
|
||||
msg.WithTag(tag)
|
||||
msg.WithKeys([]string{key})
|
||||
res, err := mqProducer.SendSync(context.Background(), msg)
|
||||
if err != nil {
|
||||
fmt.Printf("send message error: %s\n", err)
|
||||
} else {
|
||||
fmt.Printf("send message success: result=%s\n", res.String())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *Client) PushString(tag, key, message string) error {
|
||||
msg := primitive.NewMessage(topicName, []byte(message))
|
||||
msg.WithTag(tag)
|
||||
msg.WithKeys([]string{key})
|
||||
res, err := mqProducer.SendSync(context.Background(), msg)
|
||||
if err != nil {
|
||||
fmt.Printf("send message error: %s\n", err)
|
||||
} else {
|
||||
fmt.Printf("send message success: result=%s\n", res.String())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 发布注册消息
|
||||
func (client *Client) PushRegisterMessage(message message.RegisterMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("register", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布事件消息
|
||||
func (client *Client) PushEventTopicMessage(message message.EventMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("event", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布交易消息
|
||||
func (client *Client) PushTransactionMessage(message message.TransactionMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("transaction", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布交易更新消息
|
||||
func (client *Client) PushTransactionUpdateMessage(message message.TransactionUpdateMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("transaction-update", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布活跃消息
|
||||
func (client *Client) PushActiveTopicMessage(message message.ActiveMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("active", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布错误消息
|
||||
func (client *Client) PushErrorTopicMessage(message message.ErrorMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("error", message.MessageId, bytes)
|
||||
}
|
||||
|
||||
// 发布激活消息
|
||||
func (client *Client) PushActiveNewTopicMessage(message message.ActiveNewMessage) (e error) {
|
||||
bytes, _ := json.Marshal(message)
|
||||
return client.Push("active-new", message.MessageId, bytes)
|
||||
}
|
||||
Reference in New Issue
Block a user