styx/plugins/certstream.go
Christopher Talib f61fe566a5 Basic connection to Dgraph DB
The first work and input to the graph db is set up in this work. It's
for the moment very basic and doesn't cover relations and only works for
certstream data.
2020-03-04 15:16:59 +01:00

129 lines
3 KiB
Go

package plugins
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/CaliDog/certstream-go"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/jmoiron/jsonq"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// CertStreamPlugin defines the general CertStreamPlugin structure.
type CertStreamPlugin struct {
Stream chan jsonq.JsonQuery
ErrStream chan error
StopChan chan bool
StoppedChan chan bool
Running bool
}
// Initialize initialises the certstream configuration.
func (c *CertStreamPlugin) Initialize() bool {
if !viper.GetBool("certstream.activated") {
return false
}
logrus.Info("certstream plugin is activated")
stream, errStream := certstream.CertStreamEventStream(false)
c.Stream = stream
c.ErrStream = errStream
return true
}
// Run runs the Certstream plugin.
func (c *CertStreamPlugin) Run(wg *sync.WaitGroup, dgraphClient *dgo.Dgraph) {
if !c.Running {
c.StopChan = make(chan bool)
wg.Add(1)
go c.doRun(dgraphClient)
c.Running = true
}
}
// Stop stops the Certstream plugin.
func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) {
if c.Running {
c.StopChan = make(chan bool)
close(c.StopChan)
<-c.StopChan
wg.Done()
c.Running = false
}
}
func (c *CertStreamPlugin) doRun(graphClient *dgo.Dgraph) {
for {
select {
case jq := <-c.Stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
allDomains := data.Data.LeafCert.AllDomains
for _, domain := range allDomains {
if filters.RunDomainFilters(domain) {
rawNode := models.WrapCertStreamData(*data)
models.SaveCertStreamRaw("raw_certstream.json", rawNode)
certNode := models.BuildCertNode(rawNode)
models.SaveCertNode("cert_nodes.json", certNode)
mainNode := models.BuildNode("node", "certstream", certNode.ID)
models.SaveNode("nodes.json", mainNode)
edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID)
models.SaveEdge(edge)
edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID)
models.SaveEdge(edge)
// saveSingleValues(conn, "certstream", "domain", certNode.ID, domain)
mu := &api.Mutation{
CommitNow: true,
}
marshaled, err := json.Marshal(mainNode)
if err != nil {
logrus.Fatal(err)
}
mu.SetJson = marshaled
assigned, err := graphClient.NewTxn().Mutate(context.Background(), mu)
if err != nil {
logrus.Fatal(err)
}
variables := map[string]string{"$id": assigned.Uids[mainNode.ID]}
q := `query Node($id: string){
node(func: uid($id)) {
uid
ID
Type
Data
Created
Modified
}
}`
resp, err := graphClient.NewTxn().QueryWithVars(context.Background(), q, variables)
if err != nil {
logrus.Fatal(err)
}
fmt.Println(resp)
}
}
}
case err := <-c.ErrStream:
logrus.Error(err)
}
}
}
// Destroy closes the channel to the certstream websocket.
func (c *CertStreamPlugin) Destroy() {
close(c.Stream)
}