styx/plugins/certstream.go
2020-08-28 15:55:18 +02:00

120 lines
2.9 KiB
Go

package plugins
import (
"context"
"encoding/json"
"sync"
"github.com/CaliDog/certstream-go"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/jmoiron/jsonq"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// CertStreamPlugin bundles the channels and the run flag used by the
// certstream plugin.
type CertStreamPlugin struct {
	// Stream carries the certstream events; Initialize fills it in.
	Stream chan jsonq.JsonQuery
	// ErrStream carries the errors delivered alongside Stream; Initialize
	// fills it in.
	ErrStream chan error
	// StopChan and StoppedChan are the shutdown signalling channels.
	// NOTE(review): StoppedChan looks intended as the worker's "I have
	// exited" acknowledgement — confirm against its users.
	StopChan, StoppedChan chan bool
	// Running is set by Run and cleared by Stop.
	Running bool
}
// Initialize prepares the plugin according to the "certstream.activated"
// configuration key.
//
// When the key is false nothing is touched and false is returned. When it is
// true, the activation is logged, the certstream event stream is opened
// (passing false to certstream.CertStreamEventStream) and the resulting event
// and error channels are stored on the plugin; true is returned.
func (c *CertStreamPlugin) Initialize() bool {
	activated := viper.GetBool("certstream.activated")
	if activated {
		logrus.Info("certstream plugin is activated")
		c.Stream, c.ErrStream = certstream.CertStreamEventStream(false)
	}
	return activated
}
// Run starts the certstream worker goroutine unless the plugin is already
// running.
//
// On start it allocates a fresh StopChan, registers one unit of work on wg,
// launches doRun with dgraphClient and finally flags the plugin as running.
func (c *CertStreamPlugin) Run(wg *sync.WaitGroup, dgraphClient *dgo.Dgraph) {
	if c.Running {
		// Already started: a second worker must not be spawned.
		return
	}

	c.StopChan = make(chan bool)
	wg.Add(1)
	go c.doRun(dgraphClient)
	c.Running = true
}
// Stop asks the worker goroutine to exit, waits until it has actually
// returned, and only then calls wg.Done and clears the Running flag.
//
// It is a no-op when the plugin is not running. Because it waits for the
// worker, it blocks until the event currently being processed (if any) has
// been handled.
func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) {
	if !c.Running {
		return
	}

	// StoppedChan has to be in place before StopChan is closed: closing
	// StopChan is what makes the worker look at StoppedChan.
	c.StoppedChan = make(chan bool)

	// Close the channel the worker is selecting on. Replacing StopChan with a
	// new channel here would leave the worker waiting on the old one forever.
	close(c.StopChan)

	// Wait for the worker's acknowledgement so that wg.Done is not reported
	// while the goroutine is still writing to the graph database.
	<-c.StoppedChan

	wg.Done()
	c.Running = false
}

// doRun is the worker loop of the plugin. It returns when StopChan is closed;
// until then it handles certstream events and logs stream errors.
//
// For every event whose certificate can be extracted, each domain listed in
// the leaf certificate is run through the domain filters. For every domain
// that passes, a node wrapping the certificate is built, marshalled to JSON
// and written to graphClient in a transaction that commits immediately.
//
// A failure to marshal or to write is reported with logrus.Fatal, which
// terminates the process.
func (c *CertStreamPlugin) doRun(graphClient *dgo.Dgraph) {
	// StopChan is captured once so the loop keeps watching the channel that
	// was current when the worker started.
	stop := c.StopChan

	for {
		select {
		case <-stop:
			// Acknowledge the shutdown if somebody is waiting for it.
			if c.StoppedChan != nil {
				close(c.StoppedChan)
			}
			return

		case jq := <-c.Stream:
			data, err := models.ExtractCertFromStream(jq)
			if err != nil {
				// Events without an extractable certificate are skipped.
				continue
			}

			// NOTE(review): the stored node does not depend on the matching
			// domain, so a certificate with several matching domains is
			// written once per match — confirm this is intended.
			for _, domain := range data.CSData.LeafCert.AllDomains {
				if !filters.RunDomainFilters(domain) {
					continue
				}

				rawNode := models.WrapCertStreamData(*data)
				certNode := models.BuildCertNode(rawNode)
				mainNode := models.BuildNode("node", "certstream", certNode.ID)

				// Edge between Node and CertNode: the certificate node is
				// embedded in a copy of the main node.
				e := models.Node{
					ID:       mainNode.ID,
					NodeType: mainNode.NodeType,
					NData:    mainNode.NData,
					Created:  mainNode.Created,
					Modified: mainNode.Modified,
					CertNode: *certNode,
				}

				pb, err := json.Marshal(e)
				if err != nil {
					// NOTE(review): Fatal ends the whole process.
					logrus.Fatal(err)
				}

				mu := &api.Mutation{
					CommitNow: true,
					SetJson:   pb,
				}
				ctx := context.Background()
				if _, err = graphClient.NewTxn().Mutate(ctx, mu); err != nil {
					// NOTE(review): Fatal ends the whole process, even for a
					// transient database error.
					logrus.Fatal(err)
				}
			}

		case err := <-c.ErrStream:
			logrus.Error(err)
		}
	}
}