// Package ess wraps the olivere/elastic v7 client: it bootstraps the user and
// message indices and offers small create/exists/update/search helpers.
package ess

import (
	"context"
	"encoding/json"
	"errors"
	"reflect"
	"time"

	"gitea.ddegame.cn/open/servicebase/pkg/common/es/document"
	"gitea.ddegame.cn/open/servicebase/pkg/common/messages"
	"github.com/anxpp/beego/logs"
	"github.com/olivere/elastic/v7"
	"github.com/spf13/viper"
)

// client holds the connection created by Init.
var client *elastic.Client

// errClientUnavailable is reported by the helpers when _connect could not
// build a client, instead of dereferencing a nil *elastic.Client.
var errClientUnavailable = errors.New("elasticsearch client unavailable")

// EsClient groups the Elasticsearch helper methods; it carries no state.
type EsClient struct {
}

// Init connects to Elasticsearch and makes sure the user index and the
// message index exist, creating each from its mapping in package document
// when missing. It stops at the first failure; failures are logged only.
func Init() {
	index := messages.TagIndex(string(messages.EventTagUser))
	client = _connect()
	if client == nil {
		// _connect has already logged the cause.
		return
	}
	if !ensureIndex(client, index, "user", document.UserIndex) {
		return
	}
	indexMessage := messages.TagIndex(string(messages.EventTagMessage))
	if !ensureIndex(client, indexMessage, "indexMessage", document.MessageIndex) {
		return
	}
	logs.Info("init elasticsearch finish")
}

// ensureIndex creates index from mapping when it does not exist yet.
// label only appears in the "not exist" log line. It returns false when the
// existence check or the create request returned an error.
func ensureIndex(c *elastic.Client, index, label string, mapping interface{}) bool {
	ctx := context.Background()
	exists, e := c.IndexExists(index).Do(ctx)
	if e != nil {
		logs.Error("IndexExists ", index, " error: ", e.Error())
		return false
	}
	if exists {
		return true
	}
	logs.Info("es " + label + " not exist")
	created, e := c.CreateIndex(index).BodyJson(mapping).Do(ctx)
	if e != nil {
		logs.Error("IndexCreate ", index, " error: ", e.Error())
		return false
	}
	if !created.Acknowledged {
		// The request succeeded but was not acknowledged; record it and carry on.
		logs.Error("IndexCreate ", index, " not acknowledged")
	}
	return true
}

// _connect builds a new simple client for the address configured under
// "es.default.addr", with health checks enabled at a 10 second interval.
// It logs the error and returns nil when the client cannot be constructed.
func _connect() (c *elastic.Client) {
	c, e := elastic.NewSimpleClient(
		elastic.SetHealthcheck(true),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetURL(viper.GetString("es.default.addr")),
		// elastic.SetBasicAuth(beego.AppConfig.String("es_username"), beego.AppConfig.String("es_password")),
	)
	if e != nil {
		logs.Error("NewClient_error: ", e.Error())
		return nil
	}
	return c
}

// Create indexes model as document id in index.
// body is always empty; success reports whether the request succeeded and
// msg carries the error text on failure.
func (*EsClient) Create(index, id string, model interface{}) (body string, success bool, msg string) {
	client := _connect()
	if client == nil {
		return "", false, errClientUnavailable.Error()
	}
	_, e := client.Index().Index(index).Id(id).BodyJson(model).Do(context.Background())
	if e != nil {
		// The marshal error is ignored on purpose: b is only used for the log line.
		b, _ := json.Marshal(model)
		logs.Error("Document Create Error: ", e.Error(), " document: ", string(b), index, id)
		return "", false, e.Error()
	}
	return "", true, ""
}

// Exists reports whether document id is present in index.
// On failure success is false, msg holds the error text and e the error.
func (*EsClient) Exists(index, id string) (exists, success bool, msg string, e error) {
	client := _connect()
	if client == nil {
		return false, false, errClientUnavailable.Error(), errClientUnavailable
	}
	exists, e = client.Exists().Index(index).Type("_doc").Id(id).Do(context.Background())
	if e != nil {
		logs.Error("Id Exists Error: ", e.Error(), " id: ", id)
		return exists, false, e.Error(), e
	}
	return exists, true, "", nil
}

// Update applies model as a partial document to document id in index.
// body is always empty; msg carries the error text on failure.
func (*EsClient) Update(index, id string, model interface{}) (body string, success bool, msg string) {
	client := _connect()
	if client == nil {
		return "", false, errClientUnavailable.Error()
	}
	_, e := client.Update().Index(index).Id(id).Doc(model).Do(context.Background())
	if e != nil {
		return "", false, e.Error()
	}
	return "", true, ""
}

// Search runs a wildcard search for key over fields in index and returns up
// to 20 hits decoded as document.User values.
// With several fields a hit has to match at least one of them; with no
// fields no query is set on the request.
func (*EsClient) Search(index string, key string, fields ...string) (body []interface{}, success bool, msg string) {
	client := _connect()
	if client == nil {
		return nil, false, errClientUnavailable.Error()
	}
	logs.Info("ES Search index =", index, " key =", key, " fields =", fields)

	s := client.Search().Index(index)
	switch len(fields) {
	case 0:
		// No fields: leave the request without a query, as before.
	case 1:
		s = s.Query(elastic.NewWildcardQuery(fields[0], key))
	default:
		// SearchService.Query replaces any previously set query, so the
		// per-field wildcards must be combined into a single bool query.
		wildcards := make([]elastic.Query, 0, len(fields))
		for _, field := range fields {
			wildcards = append(wildcards, elastic.NewWildcardQuery(field, key))
		}
		s = s.Query(elastic.NewBoolQuery().Should(wildcards...))
	}

	res, e := s.From(0).Size(20).Pretty(true).Do(context.Background())
	if e != nil {
		return nil, false, e.Error()
	}
	logs.Info(res.Status)
	logs.Info(res.Hits.TotalHits)

	var sample document.User
	for _, hit := range res.Each(reflect.TypeOf(sample)) {
		if u, ok := hit.(document.User); ok {
			body = append(body, u)
		}
	}
	//if res.Hits.TotalHits.Value > 0 {
	//	for _, hit := range res.Hits.Hits {
	//		var t document.User
	//		_ := json.Unmarshal(hit.Source, &t)
	//		body = append(body, t)
	//	}
	//}
	return body, true, ""
}

// ESFilter describes the conditions and ordering used by SearchMulti.
type ESFilter struct {
	// Queries are conditions that every hit must satisfy.
	Queries []ESQuery
	// BoolMustInShouldQueries is a list of groups: all conditions inside one
	// group must match, and the groups are combined as "should" clauses.
	BoolMustInShouldQueries [][]ESQuery
	// Sort is applied only when Sort.Field is non-empty.
	Sort ESSort
}

// ESQuery is a single condition. Type selects the query kind: "match",
// "multi_match", "range_gte" or "range_lte"; any other value is ignored.
// Key is the field name (unused by "multi_match") and Value the operand.
type ESQuery struct {
	Key   string
	Value string
	Type  string
}

// ESSort names the sort field and direction.
type ESSort struct {
	Field     string
	Ascending bool
}

// newFilterQuery translates one ESQuery into the matching elastic query.
// ok is false when q.Type is not one of the supported kinds.
func newFilterQuery(q ESQuery) (query elastic.Query, ok bool) {
	switch q.Type {
	case "match":
		return elastic.NewMatchQuery(q.Key, q.Value), true
	case "multi_match":
		return elastic.NewMultiMatchQuery(q.Value).Type("best_fields").Lenient(true), true
	case "range_gte":
		return elastic.NewRangeQuery(q.Key).Gte(q.Value), true
	case "range_lte":
		return elastic.NewRangeQuery(q.Key).Lte(q.Value), true
	}
	return nil, false
}

// newFilterQueries translates items in order, dropping unsupported ones.
func newFilterQueries(items []ESQuery) []elastic.Query {
	var out []elastic.Query
	for _, item := range items {
		if q, ok := newFilterQuery(item); ok {
			out = append(out, q)
		}
	}
	return out
}

// SearchMulti searches index with the conditions in filter and returns one
// page of hits; the request offset is page*size.
// On success result is a map with the keys "Status", "Total" and "List",
// where List holds each hit's source decoded into a map[string]interface{}.
// On failure result is nil and msg carries the error text.
func (*EsClient) SearchMulti(index string, filter ESFilter, page, size int) (result interface{}, success bool, msg string) {
	client := _connect()
	if client == nil {
		return nil, false, errClientUnavailable.Error()
	}

	list := newFilterQueries(filter.Queries)
	if len(filter.BoolMustInShouldQueries) > 0 {
		var groups []elastic.Query
		for _, group := range filter.BoolMustInShouldQueries {
			groups = append(groups, elastic.NewBoolQuery().Must(newFilterQueries(group)...))
		}
		list = append(list, elastic.NewBoolQuery().Should(groups...))
	}
	query := elastic.NewBoolQuery().Filter(list...)
	//logs.Info(len(list))
	//a1, _ := query.Source()
	//a2, _ := json.MarshalIndent(a1, "", " ")
	//logs.Info(string(a2))

	s := client.Search().Index(index).Query(query)
	if len(filter.Sort.Field) > 0 {
		s = s.Sort(filter.Sort.Field, filter.Sort.Ascending)
	}
	res, e := s.From(page * size).Size(size).Pretty(false).Do(context.Background())
	if e != nil {
		return nil, false, e.Error()
	}

	var body []interface{}
	for _, hit := range res.Hits.Hits {
		raw, e := hit.Source.MarshalJSON()
		if e != nil {
			logs.Error("SearchMulti source error: ", e.Error(), " id: ", hit.Id)
			continue
		}
		doc := make(map[string]interface{})
		if e := json.Unmarshal(raw, &doc); e != nil {
			// Skip a hit whose source cannot be decoded rather than
			// returning a half-filled map.
			logs.Error("SearchMulti decode error: ", e.Error(), " id: ", hit.Id)
			continue
		}
		body = append(body, doc)
	}
	result = map[string]interface{}{
		"Status": res.Status,
		"Total":  res.Hits.TotalHits,
		"List":   body,
	}
	return result, true, ""
}

// QueryString runs a query_string search for key over fields in index,
// ordered by descending score, and returns one page of hits decoded as
// document.User together with the total hit count. The request offset is
// page*size.
func (*EsClient) QueryString(page, size int, index string, key string, fields ...string) (list []document.User, count int64, success bool, msg string) {
	client := _connect()
	if client == nil {
		return nil, 0, false, errClientUnavailable.Error()
	}
	logs.Info("ES QueryString index =", index, " key =", key, " fields =", fields)

	query := elastic.NewQueryStringQuery(key)
	for _, field := range fields {
		query.Field(field)
	}
	query.AnalyzeWildcard(false)

	s := client.Search().Index(index).Query(query).Sort("_score", false)
	res, e := s.From(page * size).Size(size).Pretty(true).Do(context.Background())
	if e != nil {
		return nil, 0, false, e.Error()
	}
	count = res.TotalHits()

	var sample document.User
	for _, hit := range res.Each(reflect.TypeOf(sample)) {
		if u, ok := hit.(document.User); ok {
			list = append(list, u)
		}
	}
	return list, count, true, ""
}