styx/plugins/certstream.go
Christopher Talib d0c8deae99 saving
2020-03-19 09:27:15 +01:00

160 lines
4 KiB
Go

package plugins
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/CaliDog/certstream-go"
"github.com/christalib/structs"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/jmoiron/jsonq"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// CertStreamPlugin bundles the runtime state of the certstream plugin: the
// channels delivering certstream events and errors, the channels used for
// stop signalling, and a flag recording whether the worker was started.
type CertStreamPlugin struct {
	// Stream delivers the certstream events; it is assigned by Initialize.
	Stream chan jsonq.JsonQuery
	// ErrStream delivers errors from the event stream; assigned by Initialize.
	ErrStream chan error
	// StopChan is the stop-signal channel allocated by Run.
	// StoppedChan is not assigned anywhere in the visible code; presumably
	// it is meant to acknowledge shutdown — TODO confirm.
	StopChan, StoppedChan chan bool
	// Running is set by Run and cleared by Stop.
	Running bool
}
// Initialize reports whether the certstream plugin is enabled through the
// "certstream.activated" configuration key. When it is enabled, the
// certstream event and error channels are opened and stored on the plugin;
// otherwise the plugin is left untouched.
func (c *CertStreamPlugin) Initialize() bool {
	enabled := viper.GetBool("certstream.activated")
	if enabled {
		logrus.Info("certstream plugin is activated")
		c.Stream, c.ErrStream = certstream.CertStreamEventStream(false)
	}
	return enabled
}
// Run starts the Certstream worker goroutine unless the plugin is already
// marked as running. It allocates a fresh StopChan, registers one unit of
// work on wg, launches doRun with dgraphClient and sets Running.
func (c *CertStreamPlugin) Run(wg *sync.WaitGroup, dgraphClient *dgo.Dgraph) {
	if c.Running {
		// Already started: calling Run again is a no-op.
		return
	}

	c.StopChan = make(chan bool)
	wg.Add(1)
	go c.doRun(dgraphClient)
	c.Running = true
}
// Stop signals the Certstream plugin to stop. It is a no-op when the plugin
// is not marked as running. Otherwise it closes the existing StopChan (the
// one allocated by Run), releases the plugin's unit of work on wg and clears
// Running.
//
// The previous implementation replaced StopChan with a brand-new channel
// before closing it, so the channel handed out by Run was never closed and
// no stop signal could ever be observed; the follow-up receive on the
// just-closed channel returned immediately and had no effect.
func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) {
	if !c.Running {
		return
	}

	// Guard against Running having been set without Run allocating the
	// channel: closing a nil channel panics.
	if c.StopChan != nil {
		close(c.StopChan)
	}

	wg.Done()
	c.Running = false
}
// doRun is the worker loop of the plugin. It consumes certstream events from
// Stream, logs errors received on ErrStream, and returns when either the
// stop channel that was current at start-up is closed or Stream is closed.
//
// Checking the "ok" flag on Stream matters: receiving from a closed channel
// never blocks, so without it the loop would spin on zero-value events once
// Stream has been closed.
func (c *CertStreamPlugin) doRun(graphClient *dgo.Dgraph) {
	// Read the field once so the loop never races with later writes to it.
	stop := c.StopChan

	for {
		select {
		case <-stop:
			return
		case jq, ok := <-c.Stream:
			if !ok {
				return
			}
			c.processEvent(graphClient, jq)
		case err := <-c.ErrStream:
			logrus.Error(err)
		}
	}
}

// processEvent handles one certstream message: for every domain of the
// certificate that passes the domain filters it builds the raw, cert and
// main nodes plus their edges, saves them through the models package and
// then persists the nodes to Dgraph.
//
// Messages that cannot be decoded are skipped without logging, as before.
// Persistence failures are logged and the item is skipped; they no longer
// terminate the whole process via logrus.Fatal.
func (c *CertStreamPlugin) processEvent(graphClient *dgo.Dgraph, jq jsonq.JsonQuery) {
	data, err := models.ExtractCertFromStream(jq)
	if err != nil {
		return
	}

	for _, domain := range data.CSData.LeafCert.AllDomains {
		if !filters.RunDomainFilters(domain) {
			continue
		}

		rawNode := models.WrapCertStreamData(*data)
		// models.SaveCertStreamRaw("raw_certstream.json", rawNode)
		certNode := models.BuildCertNode(rawNode)
		models.SaveCertNode("cert_nodes.json", certNode)

		mainNode := models.BuildNode("node", "certstream", certNode.ID)
		models.SaveNode("nodes.json", mainNode)

		rawEdge := models.BuildEdge("certstream", structs.Map(rawNode), structs.Map(mainNode))
		models.SaveEdge(rawEdge)
		edge := models.BuildEdge("certstream", structs.Map(mainNode), structs.Map(certNode))
		models.SaveEdge(edge)
		// saveSingleValues(conn, "certstream", "domain", certNode.ID, domain)

		if err := c.persistToGraph(graphClient, mainNode, mainNode.ID, certNode, certNode.ID); err != nil {
			logrus.Error(err)
		}
	}
}

// persistToGraph writes mainNode and certNode to Dgraph as JSON mutations
// and then links them with an upsert. The nodes are passed as interface{}
// because they are only JSON-encoded here; the IDs are passed separately.
// The first failing step is returned wrapped with context.
func (c *CertStreamPlugin) persistToGraph(graphClient *dgo.Dgraph, mainNode interface{}, mainNodeID string, certNode interface{}, certNodeID string) error {
	ctx := context.Background()

	if err := c.setJSON(ctx, graphClient, mainNode); err != nil {
		return fmt.Errorf("storing main node %q: %w", mainNodeID, err)
	}
	if err := c.setJSON(ctx, graphClient, certNode); err != nil {
		return fmt.Errorf("storing cert node %q: %w", certNodeID, err)
	}
	if err := c.linkCertNode(ctx, graphClient, mainNodeID, certNodeID); err != nil {
		return fmt.Errorf("linking cert node %q to main node %q: %w", certNodeID, mainNodeID, err)
	}
	return nil
}

// setJSON marshals v to JSON and commits it to Dgraph as a single
// set-mutation in its own transaction.
func (c *CertStreamPlugin) setJSON(ctx context.Context, graphClient *dgo.Dgraph, v interface{}) error {
	payload, err := json.Marshal(v)
	if err != nil {
		return err
	}

	mu := &api.Mutation{
		CommitNow: true,
		SetJson:   payload,
	}
	_, err = graphClient.NewTxn().Mutate(ctx, mu)
	return err
}

// linkCertNode runs an upsert that finds the node whose "id" predicate
// equals mainNodeID and sets its <CertNode> predicate to certNodeID, then
// prints the response.
//
// Two defects of the previous version are fixed here:
//   - the query variable was filled from the Uids map of a *query* response,
//     which is only populated by mutations, so the lookup value was always
//     empty; the node ID itself is now passed, matching eq(id, ...);
//   - "$subNodeID" was written inside the N-Quad, where query variables are
//     not substituted, so the literal text would have been stored; the
//     actual ID is now embedded as a quoted literal.
func (c *CertStreamPlugin) linkCertNode(ctx context.Context, graphClient *dgo.Dgraph, mainNodeID, certNodeID string) error {
	query := `
query Node($mainNodeID: string) {
	node as var(func: eq(id, $mainNodeID))
}
`
	// NOTE(review): %q uses Go string escaping, which matches N-Quad literal
	// escaping for ordinary identifiers — confirm if IDs can contain control
	// characters.
	mu := &api.Mutation{
		SetNquads: []byte(fmt.Sprintf(`uid(node) <CertNode> %q .`, certNodeID)),
	}
	req := &api.Request{
		Query:     query,
		Mutations: []*api.Mutation{mu},
		CommitNow: true,
		Vars:      map[string]string{"$mainNodeID": mainNodeID},
	}

	res, err := graphClient.NewTxn().Do(ctx, req)
	if err != nil {
		return err
	}
	fmt.Println(res)
	return nil
}
// Destroy closes the Stream channel holding the certstream events. It is a
// no-op when Stream was never assigned (for example when Initialize returned
// false), because closing a nil channel panics.
//
// NOTE(review): Stream is obtained from certstream.CertStreamEventStream in
// Initialize; if that library keeps sending on the channel after it has been
// closed here, the send will panic — confirm who owns the channel.
func (c *CertStreamPlugin) Destroy() {
	if c.Stream == nil {
		return
	}
	close(c.Stream)
}