styx/elasticsearch/main.go
2020-02-17 12:08:49 +01:00

151 lines
3.4 KiB
Go

package elasticsearch
import (
"context"
"fmt"
"log"
"time"
"gitlab.dcso.lolcat/LABS/styx/models"
elastic "gopkg.in/olivere/elastic.v5"
)
// mapping is the index definition sent as the request body when
// makeIndexIfNotExists creates a new index. It configures one shard with no
// replicas, enables the _all field through the _default_ mapping, and declares
// the fields of the "paste" document type: key and syntax as keywords, data as
// stored text with fielddata enabled, size as a long, and date and expire as
// dates expressed in epoch seconds.
const mapping = `{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings":{
"_default_":{
"_all":{
"enabled":true
}
},
"paste":{
"properties":{
"key":{
"type":"keyword"
},
"data":{
"type":"text",
"store":true,
"fielddata":true
},
"size":{
"type":"long"
},
"syntax":{
"type":"keyword"
},
"date":{
"type":"date",
"format":"epoch_second"
},
"expire":{
"type":"date",
"format":"epoch_second"
}
}
}
}
}`
// ElasticStorageModule is a StorageModule that stores pastes in an
// Elasticsearch instance.
type ElasticStorageModule struct {
// Index is the base index name from which the index actually used is derived.
Index string
// ElasticURL is the URL of the Elasticsearch server that Initialize connects
// to and pings.
ElasticURL string
// DailyIndexes, when true, makes makeIndexIfNotExists append the current date
// to Index so that a separate index is used per day.
DailyIndexes bool
// UseIndex is the index name resolved by makeIndexIfNotExists; StorePaste
// writes documents to it.
UseIndex string
// LastChk is the time at which Initialize finished its index check.
LastChk time.Time
// Client is the Elasticsearch client created by Initialize.
Client *elastic.Client
}
// makeIndexIfNotExists resolves the name of the index to write to and creates
// that index, using the package-level mapping, when it does not exist yet.
//
// With DailyIndexes enabled the name is Index followed by a dash and the
// current local date in the "2006-1-2" layout (for example
// "pastebin-2020-2-17"); otherwise it is Index itself. On success the resolved
// name is stored in UseIndex. An error is returned when either the existence
// check or the index creation fails, in which case UseIndex is left untouched.
func (e *ElasticStorageModule) makeIndexIfNotExists() error {
	index := e.Index
	if e.DailyIndexes {
		// Format only the date part. Feeding the index name itself through
		// time.Format would rewrite any layout tokens it happens to contain
		// (digits such as "1" or "2", "Jan", "Mon", "PM", ...).
		index = e.Index + "-" + time.Now().Format("2006-1-2")
	}

	ctx := context.Background()
	exists, err := e.Client.IndexExists(index).Do(ctx)
	if err != nil {
		return err
	}
	if !exists {
		log.Printf("Creating new index %s", index)
		// A failed creation must not be reported as success: otherwise
		// UseIndex would point at an index that lacks the expected mapping.
		if _, err := e.Client.CreateIndex(index).Body(mapping).Do(ctx); err != nil {
			return fmt.Errorf("creating index %q: %w", index, err)
		}
	}
	e.UseIndex = index
	return nil
}
// Initialize prepares the storage module for use.
//
// It creates a client for ElasticURL, pings the server and logs the returned
// status code and version, and then ensures the target index exists via
// makeIndexIfNotExists. Index defaults to "pastebin" when the caller left it
// empty; a value set beforehand is kept. On success LastChk is set to the
// current time and UseIndex holds the index that StorePaste will write to.
//
// The first error encountered (client creation, ping, or index setup) is
// returned unchanged.
func (e *ElasticStorageModule) Initialize() error {
	log.Printf("Connecting to %v", e.ElasticURL)

	// Only fall back to the default name; do not clobber a configured one.
	if e.Index == "" {
		e.Index = "pastebin"
	}

	var err error
	e.Client, err = elastic.NewSimpleClient(elastic.SetURL(e.ElasticURL))
	if err != nil {
		return err
	}

	info, code, err := e.Client.Ping(e.ElasticURL).Do(context.Background())
	if err != nil {
		return err
	}
	log.Printf("Elasticsearch returned with code %d and version %s", code,
		info.Version.Number)

	if err := e.makeIndexIfNotExists(); err != nil {
		return err
	}
	e.LastChk = time.Now()
	log.Printf("Using index %s", e.UseIndex)
	return nil
}
// StorePaste writes one paste to the index named by UseIndex as a document of
// type "paste", using the paste key as the document ID. A failure is logged
// and otherwise ignored; nothing is returned to the caller.
func (e *ElasticStorageModule) StorePaste(paste models.FullPaste) {
	// The periodic re-check of the daily index is disabled, so UseIndex keeps
	// the value it was given by the last makeIndexIfNotExists call:
	//
	// if e.DailyIndexes && time.Since(e.LastChk) > 12*time.Hour {
	// 	e.makeIndexIfNotExists()
	// 	e.LastChk = time.Now()
	// }
	request := e.Client.Index().
		Index(e.UseIndex).
		Type("paste").
		Id(paste.Meta.Key).
		BodyJson(paste)

	if _, storeErr := request.Do(context.Background()); storeErr != nil {
		log.Printf("Could not store paste %s due to %v", paste.Meta.FullURL, storeErr)
	}
}
// Check returns true if the given paste is new in the current storage backend,
// false otherwise.
//
// The lookup matches the paste key against the document _id. When DailyIndexes
// is enabled, documents are written to date-suffixed indexes rather than to
// Index itself, so the search runs against the pattern "<Index>-*" to cover
// every daily index; otherwise it runs against Index.
//
// If the search itself fails the error is logged and the paste is reported as
// new (true), so a backend problem does not cause pastes to be skipped.
func (e *ElasticStorageModule) Check(paste models.PasteMeta) bool {
	index := e.Index
	if e.DailyIndexes {
		// Searching the bare base name would hit an index this module never
		// creates in daily mode, making every paste look new.
		index = e.Index + "-*"
	}

	q := elastic.NewMatchQuery("_id", paste.Key)
	searchResult, err := e.Client.Search().
		Index(index).
		Query(q).
		Size(0). // only the hit count is needed, not the documents
		Do(context.Background())
	if err != nil {
		log.Printf("Could not check paste due to %v", err)
		return true
	}
	return searchResult.TotalHits() == 0
}
// Destroy finishes all operations on the module by flushing the index that
// pastes were written to.
//
// The flush targets UseIndex, which differs from Index when DailyIndexes is
// enabled; Index is used only as a fallback when UseIndex was never resolved.
// If no client was ever created there is nothing to flush and nil is returned.
// Otherwise the error from the flush request, if any, is returned.
func (e *ElasticStorageModule) Destroy() error {
	if e.Client == nil {
		return nil
	}
	index := e.UseIndex
	if index == "" {
		index = e.Index
	}
	_, err := e.Client.Flush().Index(index).Do(context.Background())
	return err
}