262 lines
7.0 KiB
Go
262 lines
7.0 KiB
Go
package ess
|
|
|
|
import (
|
|
"servicebase/pkg/common/es/document"
|
|
"servicebase/pkg/common/messages"
|
|
"context"
|
|
"encoding/json"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/anxpp/beego/logs"
|
|
"github.com/olivere/elastic/v7"
|
|
"github.com/spf13/viper"
|
|
)
|
|
|
|
// client is the package-level Elasticsearch client. Init assigns it from
// _connect; the EsClient methods in this file declare their own local client
// rather than reading this variable.
var client *elastic.Client
|
|
|
|
// EsClient is a stateless handle whose methods wrap Elasticsearch document and
// search operations. It has no fields, so the zero value is ready to use.
type EsClient struct{}
|
|
|
|
func Init() {
|
|
index := messages.TagIndex(string(messages.EventTagUser))
|
|
client = _connect()
|
|
exists, e := client.IndexExists(index).Do(context.Background())
|
|
if e != nil {
|
|
logs.Error("IndexExists ", index, " error: ", e.Error())
|
|
return
|
|
}
|
|
if !exists {
|
|
logs.Info("es user not exist")
|
|
createIndex, e := client.CreateIndex(index).BodyJson(document.UserIndex).Do(context.Background())
|
|
if e != nil {
|
|
logs.Error("IndexCreate ", index, " error: ", e.Error())
|
|
return
|
|
}
|
|
if !createIndex.Acknowledged {
|
|
}
|
|
}
|
|
indexMessage := messages.TagIndex(string(messages.EventTagMessage))
|
|
existsMessage, e := client.IndexExists(indexMessage).Do(context.Background())
|
|
if e != nil {
|
|
logs.Error("IndexExists indexMessage", index, " error: ", e.Error())
|
|
return
|
|
}
|
|
if !existsMessage {
|
|
logs.Info("es indexMessage not exist")
|
|
createIndex, e := client.CreateIndex(indexMessage).BodyJson(document.MessageIndex).Do(context.Background())
|
|
if e != nil {
|
|
logs.Error("IndexCreate ", indexMessage, " error: ", e.Error())
|
|
return
|
|
}
|
|
if !createIndex.Acknowledged {
|
|
}
|
|
}
|
|
logs.Info("init elasticsearch finish")
|
|
}
|
|
|
|
func _connect() (c *elastic.Client) {
|
|
c, e := elastic.NewSimpleClient(
|
|
elastic.SetHealthcheck(true),
|
|
elastic.SetHealthcheckInterval(10*time.Second),
|
|
elastic.SetURL(viper.GetString("es.default.addr")),
|
|
// elastic.SetBasicAuth(beego.AppConfig.String("es_username"), beego.AppConfig.String("es_password")),
|
|
)
|
|
if e != nil {
|
|
logs.Error("NewClient_error: ", e.Error())
|
|
}
|
|
return c
|
|
}
|
|
|
|
// 创建记录
|
|
func (*EsClient) Create(index, id string, model interface{}) (body string, success bool, msg string) {
|
|
client := _connect()
|
|
_, e := client.Index().Index(index).Id(id).BodyJson(model).Do(context.Background())
|
|
if e != nil {
|
|
success = false
|
|
msg = e.Error()
|
|
b, _ := json.Marshal(model)
|
|
logs.Error("Document Create Error: ", e.Error(), " document: ", string(b), index, id)
|
|
return
|
|
}
|
|
success = true
|
|
return
|
|
}
|
|
|
|
// 判断存在
|
|
func (*EsClient) Exists(index, id string) (exists, success bool, msg string, e error) {
|
|
client := _connect()
|
|
exists, e = client.Exists().Index(index).Type("_doc").Id(id).Do(context.Background())
|
|
if e != nil {
|
|
success = false
|
|
msg = e.Error()
|
|
logs.Error("Id Exists Error: ", e.Error(), " id: ", id)
|
|
return
|
|
}
|
|
success = true
|
|
return
|
|
}
|
|
|
|
// 更新记录
|
|
func (*EsClient) Update(index, id string, model interface{}) (body string, success bool, msg string) {
|
|
client := _connect()
|
|
_, e := client.Update().Index(index).Id(id).Doc(model).Do(context.Background())
|
|
if e != nil {
|
|
success = false
|
|
msg = e.Error()
|
|
return
|
|
}
|
|
success = true
|
|
return
|
|
}
|
|
|
|
// 搜索
|
|
func (*EsClient) Search(index string, key string, fields ...string) (body []interface{}, success bool, msg string) {
|
|
client := _connect()
|
|
logs.Info("ES Search index =", index, " key =", key, " fields =", fields)
|
|
var list []*elastic.WildcardQuery
|
|
for _, field := range fields {
|
|
list = append(list, elastic.NewWildcardQuery(field, key))
|
|
}
|
|
s := client.Search().Index(index)
|
|
for _, query := range list {
|
|
s = s.Query(query)
|
|
}
|
|
res, e := s.From(0).Size(20).Pretty(true).Do(context.Background())
|
|
if e != nil {
|
|
success = false
|
|
msg = e.Error()
|
|
return
|
|
}
|
|
logs.Info(res.Status)
|
|
logs.Info(res.Hits.TotalHits)
|
|
var item document.User
|
|
for _, item := range res.Each(reflect.TypeOf(item)) {
|
|
if t, ok := item.(document.User); ok {
|
|
body = append(body, t)
|
|
}
|
|
}
|
|
//if res.Hits.TotalHits.Value > 0 {
|
|
// for _, hit := range res.Hits.Hits {
|
|
// var t document.User
|
|
// _ := json.Unmarshal(hit.Source, &t)
|
|
// body = append(body, t)
|
|
// }
|
|
//}
|
|
return
|
|
}
|
|
|
|
type ESFilter struct {
|
|
Queries []ESQuery
|
|
BoolMustInShouldQueries [][]ESQuery
|
|
Sort ESSort
|
|
}
|
|
|
|
// ESQuery is one search condition consumed by SearchMulti.
type ESQuery struct {
	// Key is the document field the condition applies to. It is not used for
	// the "multi_match" type.
	Key string
	// Value is the text to match, or the bound of a range condition.
	Value string
	// Type selects the kind of query: "match", "multi_match", "range_gte" or
	// "range_lte". Any other value makes SearchMulti skip the condition.
	Type string
}
|
|
|
|
// ESSort is the ordering applied by SearchMulti.
type ESSort struct {
	// Field is the field to sort by; an empty Field disables sorting.
	Field string
	// Ascending is passed to the search request as the sort direction.
	Ascending bool
}
|
|
|
|
// 搜索消息
|
|
func (*EsClient) SearchMulti(index string, filter ESFilter, page, size int) (result interface{}, success bool, msg string) {
|
|
client := _connect()
|
|
var list []elastic.Query
|
|
for _, item := range filter.Queries {
|
|
switch item.Type {
|
|
case "match":
|
|
list = append(list, elastic.NewMatchQuery(item.Key, item.Value))
|
|
case "multi_match":
|
|
list = append(list, elastic.NewMultiMatchQuery(item.Value).Type("best_fields").Lenient(true))
|
|
case "range_gte":
|
|
list = append(list, elastic.NewRangeQuery(item.Key).Gte(item.Value))
|
|
case "range_lte":
|
|
list = append(list, elastic.NewRangeQuery(item.Key).Lte(item.Value))
|
|
}
|
|
}
|
|
if len(filter.BoolMustInShouldQueries) > 0 {
|
|
BQ := elastic.NewBoolQuery()
|
|
var bqList []elastic.Query
|
|
for _, bq := range filter.BoolMustInShouldQueries {
|
|
var listBq []elastic.Query
|
|
boolQuery := elastic.NewBoolQuery()
|
|
for _, item := range bq {
|
|
switch item.Type {
|
|
case "match":
|
|
listBq = append(listBq, elastic.NewMatchQuery(item.Key, item.Value))
|
|
case "multi_match":
|
|
listBq = append(listBq, elastic.NewMultiMatchQuery(item.Value).Type("best_fields").Lenient(true))
|
|
case "range_gte":
|
|
listBq = append(listBq, elastic.NewRangeQuery(item.Key).Gte(item.Value))
|
|
case "range_lte":
|
|
listBq = append(listBq, elastic.NewRangeQuery(item.Key).Lte(item.Value))
|
|
}
|
|
}
|
|
boolQuery.Must(listBq...)
|
|
bqList = append(bqList, boolQuery)
|
|
}
|
|
BQ.Should(bqList...)
|
|
list = append(list, BQ)
|
|
}
|
|
query := elastic.NewBoolQuery().Filter(list...)
|
|
//logs.Info(len(list))
|
|
//a1, _ := query.Source()
|
|
//a2, _ := json.MarshalIndent(a1, "", " ")
|
|
//logs.Info(string(a2))
|
|
s := client.Search().Index(index).Query(query)
|
|
if len(filter.Sort.Field) > 0 {
|
|
s = s.Sort(filter.Sort.Field, filter.Sort.Ascending)
|
|
}
|
|
res, e := s.From(page * size).Size(size).Pretty(false).Do(context.Background())
|
|
if e != nil {
|
|
return
|
|
}
|
|
var body []interface{}
|
|
for _, item := range res.Hits.Hits {
|
|
b, _ := item.Source.MarshalJSON()
|
|
m := make(map[string]interface{})
|
|
_ = json.Unmarshal(b, &m)
|
|
body = append(body, m)
|
|
}
|
|
result = map[string]interface{}{
|
|
"Status": res.Status,
|
|
"Total": res.Hits.TotalHits,
|
|
"List": body,
|
|
}
|
|
return
|
|
}
|
|
|
|
// 搜索
|
|
func (*EsClient) QueryString(page, size int, index string, key string, fields ...string) (list []document.User, count int64, success bool, msg string) {
|
|
client := _connect()
|
|
logs.Info("ES QueryString index =", index, " key =", key, " fields =", fields)
|
|
query := elastic.NewQueryStringQuery(key)
|
|
for _, field := range fields {
|
|
query.Field(field)
|
|
}
|
|
query.AnalyzeWildcard(false)
|
|
s := client.Search().Index(index)
|
|
s = s.Query(query)
|
|
s = s.Sort("_score", false)
|
|
res, e := s.From(page * size).Size(size).Pretty(true).Do(context.Background())
|
|
if e != nil {
|
|
success = false
|
|
msg = e.Error()
|
|
return
|
|
}
|
|
count = res.TotalHits()
|
|
var user document.User
|
|
for _, item := range res.Each(reflect.TypeOf(user)) {
|
|
if t, ok := item.(document.User); ok {
|
|
list = append(list, t)
|
|
}
|
|
}
|
|
success = true
|
|
return
|
|
}
|