package rocket import ( "context" "encoding/json" "fmt" "strings" "sync" "gitea.ddegame.cn/open/servicebase/pkg/common" "gitea.ddegame.cn/open/servicebase/pkg/partner/mq/message" "gitea.ddegame.cn/open/servicebase/pkg/partner/mq/pusher" "github.com/anxpp/beego/logs" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "github.com/spf13/viper" ) var ( mqProducer rocketmq.Producer mP sync.Mutex topicName string topicGroup string ) type Client struct { } func init() { //rlog.SetLogger() } func NewRocketMQClient(topic, group string) pusher.PushClient { mP.Lock() defer mP.Unlock() if mqProducer != nil { return &Client{} } topicName = topic topicGroup = group nsList := strings.Split(viper.GetString("mq.default.nameserver"), ";") var ns []string if len(nsList) > 0 { for _, item := range nsList { ns = append(ns, item) } } else { ns = []string{"47.97.157.234:9876"} } logs.Info("NewRocketMQClient: %s %s %v", topicName, topicGroup, ns) var e error mqProducer, e = producer.NewDefaultProducer( producer.WithNameServer(ns), producer.WithRetry(2), producer.WithGroupName(topicGroup), ) if e != nil { logs.Error("producer.NewDefaultProducer error: " + e.Error()) panic(e) } if mqProducer == nil { logs.Error("producer.NewDefaultProducer error: mqProducer==nil") } err := mqProducer.Start() if err != nil { logs.Error("start producer error: %s", err.Error()) panic(err) } else { logs.Info("NewRocketMQClient success") } return &Client{} } func (client *Client) Push(tag, key string, message []byte) error { defer func() { if x := recover(); x != nil { logs.Error(common.LOG_QUOTE_STRING+"服务器异常 Push Message 获取到Panic @%"+common.LOG_QUOTE_STRING, x) return } }() return nil if nil == mqProducer { NewRocketMQClient(topicName, topicName) } msg := primitive.NewMessage(topicName, message) msg.WithTag(tag) msg.WithKeys([]string{key}) res, err := mqProducer.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } return err } func (client *Client) PushString(tag, key, message string) error { msg := primitive.NewMessage(topicName, []byte(message)) msg.WithTag(tag) msg.WithKeys([]string{key}) res, err := mqProducer.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } return err } // 发布注册消息 func (client *Client) PushRegisterMessage(message message.RegisterMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("register", message.MessageId, bytes) } // 发布事件消息 func (client *Client) PushEventTopicMessage(message message.EventMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("event", message.MessageId, bytes) } // 发布交易消息 func (client *Client) PushTransactionMessage(message message.TransactionMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("transaction", message.MessageId, bytes) } // 发布交易更新消息 func (client *Client) PushTransactionUpdateMessage(message message.TransactionUpdateMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("transaction-update", message.MessageId, bytes) } // 发布活跃消息 func (client *Client) PushActiveTopicMessage(message message.ActiveMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("active", message.MessageId, bytes) } // 发布错误消息 func (client *Client) PushErrorTopicMessage(message message.ErrorMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("error", message.MessageId, bytes) } // 发布激活消息 func (client *Client) PushActiveNewTopicMessage(message message.ActiveNewMessage) (e error) { bytes, _ := json.Marshal(message) return client.Push("active-new", message.MessageId, bytes) }