package plugins import ( "context" "encoding/json" "sync" "github.com/CaliDog/certstream-go" "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/jmoiron/jsonq" "github.com/sirupsen/logrus" "github.com/spf13/viper" "gitlab.dcso.lolcat/LABS/styx/filters" "gitlab.dcso.lolcat/LABS/styx/models" ) // CertStreamPlugin defines the general CertStreamPlugin structure. type CertStreamPlugin struct { Stream chan jsonq.JsonQuery ErrStream chan error StopChan chan bool StoppedChan chan bool Running bool } // Initialize initialises the certstream configuration. func (c *CertStreamPlugin) Initialize() bool { if !viper.GetBool("certstream.activated") { return false } logrus.Info("certstream plugin is activated") stream, errStream := certstream.CertStreamEventStream(false) c.Stream = stream c.ErrStream = errStream return true } // Run runs the Certstream plugin. func (c *CertStreamPlugin) Run(wg *sync.WaitGroup, dgraphClient *dgo.Dgraph) { if !c.Running { c.StopChan = make(chan bool) wg.Add(1) go c.doRun(dgraphClient) c.Running = true } } // Stop stops the Certstream plugin. func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) { if c.Running { c.StopChan = make(chan bool) close(c.StopChan) <-c.StopChan wg.Done() c.Running = false } } func (c *CertStreamPlugin) doRun(graphClient *dgo.Dgraph) { for { select { case jq := <-c.Stream: if data, err := models.ExtractCertFromStream(jq); err == nil { allDomains := data.CSData.LeafCert.AllDomains for _, domain := range allDomains { if filters.RunDomainFilters(domain) { rawNode := models.WrapCertStreamData(*data) // models.SaveCertStreamRaw("raw_certstream.json", rawNode) certNode := models.BuildCertNode(rawNode) // models.SaveCertNode("cert_nodes.json", certNode) mainNode := models.BuildNode("node", "certstream", certNode.ID) // models.SaveNode("nodes.json", mainNode) // rawEdge := models.BuildEdge("certstream", structs.Map(rawNode), structs.Map(mainNode)) // models.SaveEdge(rawEdge) // edge := models.BuildEdge("certstream", structs.Map(mainNode), structs.Map(certNode)) // models.SaveEdge(edge) // saveSingleValues(conn, "certstream", "domain", certNode.ID, domain) // edge between Node and CertNode e := models.Node{ ID: mainNode.ID, NodeType: mainNode.NodeType, NData: mainNode.NData, Created: mainNode.Created, Modified: mainNode.Modified, CertNode: *certNode, } ctx := context.Background() mu := &api.Mutation{ CommitNow: true, } pb, err := json.Marshal(e) if err != nil { logrus.Fatal(err) } mu.SetJson = pb _, err = graphClient.NewTxn().Mutate(ctx, mu) if err != nil { logrus.Fatal(err) } } } } case err := <-c.ErrStream: logrus.Error(err) } } }