styx/plugins/certstream.go
2020-03-04 16:34:14 +01:00

127 lines
3.1 KiB
Go

package plugins
import (
"context"
"encoding/json"
"sync"
"github.com/CaliDog/certstream-go"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/jmoiron/jsonq"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// CertStreamPlugin defines the general CertStreamPlugin structure.
type CertStreamPlugin struct {
Stream chan jsonq.JsonQuery
ErrStream chan error
StopChan chan bool
StoppedChan chan bool
Running bool
}
// Initialize initialises the certstream configuration.
func (c *CertStreamPlugin) Initialize() bool {
if !viper.GetBool("certstream.activated") {
return false
}
logrus.Info("certstream plugin is activated")
stream, errStream := certstream.CertStreamEventStream(false)
c.Stream = stream
c.ErrStream = errStream
return true
}
// Run runs the Certstream plugin.
func (c *CertStreamPlugin) Run(wg *sync.WaitGroup, dgraphClient *dgo.Dgraph) {
if !c.Running {
c.StopChan = make(chan bool)
wg.Add(1)
go c.doRun(dgraphClient)
c.Running = true
}
}
// Stop stops the Certstream plugin.
func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) {
if c.Running {
c.StopChan = make(chan bool)
close(c.StopChan)
<-c.StopChan
wg.Done()
c.Running = false
}
}
func (c *CertStreamPlugin) doRun(graphClient *dgo.Dgraph) {
for {
select {
case jq := <-c.Stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
allDomains := data.Data.LeafCert.AllDomains
for _, domain := range allDomains {
if filters.RunDomainFilters(domain) {
rawNode := models.WrapCertStreamData(*data)
models.SaveCertStreamRaw("raw_certstream.json", rawNode)
certNode := models.BuildCertNode(rawNode)
models.SaveCertNode("cert_nodes.json", certNode)
mainNode := models.BuildNode("node", "certstream", certNode.ID)
models.SaveNode("nodes.json", mainNode)
edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID)
models.SaveEdge(edge)
edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID)
models.SaveEdge(edge)
// saveSingleValues(conn, "certstream", "domain", certNode.ID, domain)
mu := &api.Mutation{
CommitNow: true,
}
marshaled, err := json.Marshal(mainNode)
if err != nil {
logrus.Fatal(err)
}
mu.SetJson = marshaled
_, err = graphClient.NewTxn().Mutate(context.Background(), mu)
if err != nil {
logrus.Fatal(err)
}
// variables := map[string]string{"$id": assigned.Uids[mainNode.ID]}
// q := `query Node($id: string){
// node(func: uid($id)) {
// uid
// ID
// type
// data
// dreated
// modified
// }
// }`
// _, err := graphClient.NewTxn().QueryWithVars(context.Background(), q, variables)
// if err != nil {
// logrus.Fatal(err)
// }
}
}
}
case err := <-c.ErrStream:
logrus.Error(err)
}
}
}
// Destroy closes the channel to the certstream websocket.
func (c *CertStreamPlugin) Destroy() {
close(c.Stream)
}