Files
servicebase/pkg/common/es/es_client.go
2025-11-19 14:24:13 +08:00

263 lines
7.0 KiB
Go

package ess
import (
"context"
"encoding/json"
"reflect"
"time"
"gitea.ddegame.cn/open/servicebase/pkg/common/es/document"
"gitea.ddegame.cn/open/servicebase/pkg/common/messages"
"github.com/anxpp/beego/logs"
"github.com/olivere/elastic/v7"
"github.com/spf13/viper"
)
// client is the package-level Elasticsearch client. Init assigns it from
// _connect(). The EsClient methods in this file declare their own local
// variable named client and therefore do not read this one.
var client *elastic.Client
// EsClient is a field-less receiver type that groups the Elasticsearch
// operations defined in this file (Create, Exists, Update, Search,
// SearchMulti, QueryString). Each of those methods obtains its own
// connection through _connect().
type EsClient struct {
}
func Init() {
index := messages.TagIndex(string(messages.EventTagUser))
client = _connect()
exists, e := client.IndexExists(index).Do(context.Background())
if e != nil {
logs.Error("IndexExists ", index, " error: ", e.Error())
return
}
if !exists {
logs.Info("es user not exist")
createIndex, e := client.CreateIndex(index).BodyJson(document.UserIndex).Do(context.Background())
if e != nil {
logs.Error("IndexCreate ", index, " error: ", e.Error())
return
}
if !createIndex.Acknowledged {
}
}
indexMessage := messages.TagIndex(string(messages.EventTagMessage))
existsMessage, e := client.IndexExists(indexMessage).Do(context.Background())
if e != nil {
logs.Error("IndexExists indexMessage", index, " error: ", e.Error())
return
}
if !existsMessage {
logs.Info("es indexMessage not exist")
createIndex, e := client.CreateIndex(indexMessage).BodyJson(document.MessageIndex).Do(context.Background())
if e != nil {
logs.Error("IndexCreate ", indexMessage, " error: ", e.Error())
return
}
if !createIndex.Acknowledged {
}
}
logs.Info("init elasticsearch finish")
}
// _connect builds an Elasticsearch client with elastic.NewSimpleClient,
// pointing it at the address stored under the "es.default.addr" config key
// and enabling health checks with a 10 second interval.
//
// A construction error is only logged; the caller receives whatever
// NewSimpleClient returned and is responsible for checking it for nil.
func _connect() (c *elastic.Client) {
	options := []elastic.ClientOptionFunc{
		elastic.SetHealthcheck(true),
		elastic.SetHealthcheckInterval(10 * time.Second),
		elastic.SetURL(viper.GetString("es.default.addr")),
		// elastic.SetBasicAuth(beego.AppConfig.String("es_username"), beego.AppConfig.String("es_password")),
	}

	var err error
	if c, err = elastic.NewSimpleClient(options...); err != nil {
		logs.Error("NewClient_error: ", err.Error())
	}
	return c
}
// Create indexes model as the document with the given id in index.
//
// Returns:
//   - body: never populated by this method; kept so the signature stays stable.
//   - success: true when the index request completed without error.
//   - msg: the error text when success is false, otherwise empty.
func (*EsClient) Create(index, id string, model any) (body string, success bool, msg string) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		msg = elastic.ErrNoClient.Error()
		return
	}

	if _, e := client.Index().Index(index).Id(id).BodyJson(model).Do(context.Background()); e != nil {
		msg = e.Error()
		// The document is serialised only for the log line; if that fails,
		// log the marshal error in its place instead of an empty string.
		b, marshalErr := json.Marshal(model)
		if marshalErr != nil {
			b = []byte(marshalErr.Error())
		}
		logs.Error("Document Create Error: ", e.Error(), " document: ", string(b), index, id)
		return
	}

	success = true
	return
}
// Exists reports whether a document with the given id is present in index.
//
// Returns:
//   - exists: the result of the existence check (false on failure).
//   - success: true when the request completed without error.
//   - msg: the error text when success is false, otherwise empty.
//   - e: the underlying error, or nil on success.
func (*EsClient) Exists(index, id string) (exists, success bool, msg string, e error) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		e = elastic.ErrNoClient
		msg = e.Error()
		return
	}

	exists, e = client.Exists().Index(index).Type("_doc").Id(id).Do(context.Background())
	if e != nil {
		msg = e.Error()
		logs.Error("Id Exists Error: ", e.Error(), " id: ", id)
		return
	}

	success = true
	return
}
// Update sends model as the partial document for the document with the given
// id in index.
//
// Returns:
//   - body: never populated by this method; kept so the signature stays stable.
//   - success: true when the update request completed without error.
//   - msg: the error text when success is false, otherwise empty.
func (*EsClient) Update(index, id string, model any) (body string, success bool, msg string) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		msg = elastic.ErrNoClient.Error()
		return
	}

	if _, e := client.Update().Index(index).Id(id).Doc(model).Do(context.Background()); e != nil {
		msg = e.Error()
		// Log the failure the same way Create and Exists do.
		logs.Error("Document Update Error: ", e.Error(), " index: ", index, " id: ", id)
		return
	}

	success = true
	return
}
// Search runs a wildcard search for key against the given fields of index and
// returns at most the first 20 hits decoded as document.User values.
//
// With no fields the search is sent without a query. With one field a single
// wildcard query is used. With several fields the wildcard queries are
// combined so that a match in any one field is enough.
//
// Returns:
//   - body: the hits that decode to document.User.
//   - success: true when the search request completed without error.
//   - msg: the error text when success is false, otherwise empty.
func (*EsClient) Search(index string, key string, fields ...string) (body []any, success bool, msg string) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		msg = elastic.ErrNoClient.Error()
		return
	}
	logs.Info("ES Search index =", index, " key =", key, " fields =", fields)

	s := client.Search().Index(index)
	switch len(fields) {
	case 0:
		// No field to search: leave the query unset.
	case 1:
		s = s.Query(elastic.NewWildcardQuery(fields[0], key))
	default:
		// SearchService.Query replaces the previously set query, so calling
		// it once per field would keep only the last field. Combine the
		// per-field queries as "should" clauses of one bool query instead.
		anyField := elastic.NewBoolQuery()
		for _, field := range fields {
			anyField.Should(elastic.NewWildcardQuery(field, key))
		}
		s = s.Query(anyField)
	}

	res, e := s.From(0).Size(20).Pretty(true).Do(context.Background())
	if e != nil {
		msg = e.Error()
		return
	}
	logs.Info(res.Status)
	if res.Hits != nil {
		logs.Info(res.Hits.TotalHits)
	}

	var user document.User
	for _, hit := range res.Each(reflect.TypeOf(user)) {
		if u, ok := hit.(document.User); ok {
			body = append(body, u)
		}
	}

	success = true
	return
}
// ESFilter describes the search criteria consumed by SearchMulti.
type ESFilter struct {
// Queries are translated one by one and all of them are applied as filter
// clauses of the final bool query.
Queries []ESQuery
// BoolMustInShouldQueries is a list of groups. The queries inside one group
// are combined with "must"; the groups themselves are combined with "should",
// and the resulting bool query is added as one more filter clause.
BoolMustInShouldQueries [][]ESQuery
// Sort is applied only when Sort.Field is non-empty.
Sort ESSort
}
// ESQuery is a single search condition used inside ESFilter.
type ESQuery struct {
// Key is the field name the condition applies to. SearchMulti does not pass
// it along for Type "multi_match".
Key string
// Value is the text to match, or the bound for the range types.
Value string
// Type selects the query kind. SearchMulti recognises "match", "multi_match",
// "range_gte" and "range_lte"; any other value is skipped.
Type string
}
// ESSort names the sort field and direction used by SearchMulti.
type ESSort struct {
// Field is the field to sort by; an empty Field disables sorting.
Field string
// Ascending is passed as the ascending flag of the search's Sort call.
Ascending bool
}
// SearchMulti runs a filtered, optionally sorted, paginated search on index.
//
// filter.Queries become filter clauses of one bool query. When
// filter.BoolMustInShouldQueries is non-empty, each group is turned into a
// bool "must" query, the groups are combined with "should", and that bool
// query is added as a further filter clause. Sorting is applied only when
// filter.Sort.Field is non-empty. The request starts at offset page*size and
// asks for size hits.
//
// Returns:
//   - result: on success a map[string]any with the keys "Status", "Total" and
//     "List" (each hit's source decoded into a map[string]any); nil on failure.
//   - success: true when the search request completed without error.
//   - msg: the error text when success is false, otherwise empty.
func (*EsClient) SearchMulti(index string, filter ESFilter, page, size int) (result any, success bool, msg string) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		msg = elastic.ErrNoClient.Error()
		return
	}

	list := buildESQueries(filter.Queries)
	if len(filter.BoolMustInShouldQueries) > 0 {
		groups := make([]elastic.Query, 0, len(filter.BoolMustInShouldQueries))
		for _, group := range filter.BoolMustInShouldQueries {
			groups = append(groups, elastic.NewBoolQuery().Must(buildESQueries(group)...))
		}
		list = append(list, elastic.NewBoolQuery().Should(groups...))
	}
	query := elastic.NewBoolQuery().Filter(list...)

	s := client.Search().Index(index).Query(query)
	if len(filter.Sort.Field) > 0 {
		s = s.Sort(filter.Sort.Field, filter.Sort.Ascending)
	}

	res, e := s.From(page * size).Size(size).Pretty(false).Do(context.Background())
	if e != nil {
		// Report the failure to the caller instead of returning an empty msg.
		msg = e.Error()
		logs.Error("SearchMulti Error: ", e.Error(), " index: ", index)
		return
	}

	var (
		body  []any
		total any
	)
	// A response without a hits section would otherwise cause a nil dereference.
	if res.Hits != nil {
		total = res.Hits.TotalHits
		for _, hit := range res.Hits.Hits {
			// MarshalJSON on the raw source does not fail in practice; a
			// problem would surface in the Unmarshal below.
			b, _ := hit.Source.MarshalJSON()
			m := make(map[string]any)
			if err := json.Unmarshal(b, &m); err != nil {
				// Keep the entry so List stays aligned with the hits, but
				// make the decoding problem visible.
				logs.Error("SearchMulti decode hit error: ", err.Error())
			}
			body = append(body, m)
		}
	}

	result = map[string]any{
		"Status": res.Status,
		"Total":  total,
		"List":   body,
	}
	success = true
	return
}

// buildESQueries translates each ESQuery into the matching elastic query,
// skipping entries whose Type is not recognised.
func buildESQueries(items []ESQuery) []elastic.Query {
	var out []elastic.Query
	for _, item := range items {
		if q := buildESQuery(item); q != nil {
			out = append(out, q)
		}
	}
	return out
}

// buildESQuery maps one ESQuery onto an elastic query by its Type and returns
// nil for an unrecognised Type.
func buildESQuery(item ESQuery) elastic.Query {
	switch item.Type {
	case "match":
		return elastic.NewMatchQuery(item.Key, item.Value)
	case "multi_match":
		// No field list is passed here, so item.Key is not used.
		return elastic.NewMultiMatchQuery(item.Value).Type("best_fields").Lenient(true)
	case "range_gte":
		return elastic.NewRangeQuery(item.Key).Gte(item.Value)
	case "range_lte":
		return elastic.NewRangeQuery(item.Key).Lte(item.Value)
	}
	return nil
}
// QueryString runs a query_string search for key, restricted to the given
// fields, on index. Results are sorted by descending _score and paginated:
// the request starts at offset page*size and asks for size hits.
//
// Returns:
//   - list: the hits that decode to document.User.
//   - count: the total hit count reported by the search result.
//   - success: true when the search request completed without error.
//   - msg: the error text when success is false, otherwise empty.
func (*EsClient) QueryString(page, size int, index string, key string, fields ...string) (list []document.User, count int64, success bool, msg string) {
	client := _connect()
	if client == nil {
		// _connect logs the connection failure and returns nil; running a
		// request on a nil client would panic.
		msg = elastic.ErrNoClient.Error()
		return
	}
	logs.Info("ES QueryString index =", index, " key =", key, " fields =", fields)

	query := elastic.NewQueryStringQuery(key)
	for _, field := range fields {
		query.Field(field)
	}
	query.AnalyzeWildcard(false)

	s := client.Search().Index(index).Query(query).Sort("_score", false)
	res, e := s.From(page * size).Size(size).Pretty(true).Do(context.Background())
	if e != nil {
		msg = e.Error()
		return
	}

	count = res.TotalHits()
	var user document.User
	for _, hit := range res.Each(reflect.TypeOf(user)) {
		if u, ok := hit.(document.User); ok {
			list = append(list, u)
		}
	}

	success = true
	return
}