Changing the plugin architecture for a modular architecture.

This commit is contained in:
Christopher Talib 2020-02-25 10:05:31 +01:00
commit d761e824f3
9 changed files with 316 additions and 93 deletions

115
main.go
View file

@ -1,14 +1,11 @@
package main
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/CaliDog/certstream-go"
"github.com/jmoiron/jsonq"
"github.com/ns3777k/go-shodan/v4/shodan"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
@ -17,6 +14,7 @@ import (
"gitlab.dcso.lolcat/LABS/styx/elasticsearch"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
"gitlab.dcso.lolcat/LABS/styx/plugins"
)
func init() {
@ -40,84 +38,53 @@ func main() {
fmt.Println("Starting to get data from the Internet...")
// The false flag specifies that we want heartbeat messages.
conn, err := broker.SetUpKafkaConnecter()
if err != nil {
panic(err)
}
// conn, err := broker.SetUpKafkaConnecter()
// if err != nil {
// panic(err)
// }
// stop channel
stopChan := make(chan os.Signal)
var wg sync.WaitGroup
wg.Add(3)
go broker.ReadEventFromKafka()
// go broker.ReadEventFromKafka()
// certstream
stream, errStream := certstream.CertStreamEventStream(false)
if viper.GetBool("certstream.activated") {
go certstreamRoutine(stream, errStream, conn, stopChan, &wg)
c := plugins.CertStreamPlugin{}
if ok := c.Initialize(); !ok {
logrus.Error("Error initialising certstream module")
}
c.Run(&wg)
// pastebin
if viper.GetBool("pastebin.activated") {
go pastebinRoutine(stopChan, &wg)
p := plugins.PastebinPlugin{}
if ok := p.Initialize(); !ok {
logrus.Error("error initialising pastebin module")
}
p.Run(&wg)
// shodan
client := shodan.NewEnvClient(nil)
ch := make(chan *shodan.HostData)
err = client.GetBanners(context.Background(), ch)
if err != nil {
logrus.Panic(err)
s := plugins.ShodanPlugin{}
if ok := s.Initialize(); !ok {
logrus.Error("error initialising shodan plugin")
}
p.Run(&wg)
if viper.GetBool("shodan.activated") {
go shodanRoutine(client, ch, conn, stopChan, &wg)
}
go func() {
<-stopChan
logrus.Println("Shutting down...")
c.Stop(&wg)
p.Stop(&wg)
s.Stop(&wg)
}()
wg.Wait()
fmt.Println("done")
}
// routines
// CertstreamRoutine wraps the routine for grabbing Certstream data.
func certstreamRoutine(stream chan jsonq.JsonQuery, errStream chan error, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) {
fmt.Println("certstream is activated")
for {
select {
case jq := <-stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
allDomains := data.Data.LeafCert.AllDomains
for _, domain := range allDomains {
if filters.RunDomainFilters(domain) {
rawNode := models.WrapCertStreamData(*data)
models.SaveCertStreamRaw("raw_certstream.json", rawNode)
certNode := models.BuildCertNode(rawNode)
models.SaveCertNode("cert_nodes.json", certNode)
mainNode := models.BuildNode("node", "certstream", certNode.ID)
models.SaveNode("nodes.json", mainNode)
edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID)
models.SaveEdge(edge)
edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID)
models.SaveEdge(edge)
saveSingleValues(conn, "certstream", "domain", certNode.ID, domain)
}
}
}
case err := <-errStream:
logrus.Error(err)
case <-stopChan:
wg.Done()
return
}
}
}
func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) {
fmt.Println("pastebin is activated")
@ -136,36 +103,6 @@ func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) {
panic(err)
}
}
for {
select {
default:
pastes, err := models.QueryPastes()
if err != nil {
logrus.Panic(err)
}
for _, p := range pastes {
paste, err := models.FetchPaste(p)
if err != nil {
logrus.Error("cannot fetch paste", err)
}
fp := models.FullPaste{
Meta: p,
Full: paste,
}
res := models.BuildPasteNode(&fp)
if elastic {
e.StorePaste(fp)
}
models.SavePaste("paste_formatted.json", res)
time.Sleep(1 * time.Second)
}
time.Sleep(3 * time.Second)
case <-stopChan:
wg.Done()
return
}
}
}
func shodanRoutine(client *shodan.Client, shodanChan chan *shodan.HostData, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) {

View file

@ -1 +0,0 @@
package models

View file

@ -1 +0,0 @@
package parser

93
plugins/certstream.go Normal file
View file

@ -0,0 +1,93 @@
package plugins
import (
"sync"
"github.com/CaliDog/certstream-go"
"github.com/jmoiron/jsonq"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// CertStreamPlugin defines the general CertStreamPlugin structure.
type CertStreamPlugin struct {
Stream chan jsonq.JsonQuery
ErrStream chan error
StopChan chan bool
StoppedChan chan bool
Running bool
}
// Initialize initialises the certstream configuration.
func (c *CertStreamPlugin) Initialize() bool {
if !viper.GetBool("certstream.activated") {
return false
}
logrus.Info("certstream plugin is activated")
stream, errStream := certstream.CertStreamEventStream(false)
c.Stream = stream
c.ErrStream = errStream
return true
}
// Run runs the Certstream plugin.
func (c *CertStreamPlugin) Run(wg *sync.WaitGroup) {
if !c.Running {
c.StopChan = make(chan bool)
wg.Add(1)
go c.doRun()
c.Running = true
}
}
// Stop stops the Certstream plugin.
func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) {
if c.Running {
c.StopChan = make(chan bool)
close(c.StopChan)
<-c.StopChan
wg.Done()
c.Running = false
}
}
func (c *CertStreamPlugin) doRun() {
for {
select {
case jq := <-c.Stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
allDomains := data.Data.LeafCert.AllDomains
for _, domain := range allDomains {
if filters.RunDomainFilters(domain) {
rawNode := models.WrapCertStreamData(*data)
models.SaveCertStreamRaw("raw_certstream.json", rawNode)
certNode := models.BuildCertNode(rawNode)
models.SaveCertNode("cert_nodes.json", certNode)
mainNode := models.BuildNode("node", "certstream", certNode.ID)
models.SaveNode("nodes.json", mainNode)
edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID)
models.SaveEdge(edge)
edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID)
models.SaveEdge(edge)
// saveSingleValues(conn, "certstream", "domain", certNode.ID, domain)
}
}
}
case err := <-c.ErrStream:
logrus.Error(err)
}
}
}
// Destroy closes the channel to the certstream websocket.
func (c *CertStreamPlugin) Destroy() {
close(c.Stream)
}

23
plugins/main.go Normal file
View file

@ -0,0 +1,23 @@
package plugins
var StyxPlugins = []StyxPlugin{}
// StyxPlugin defines the general plugin architecture.
type StyxPlugin interface {
Initialize()
Run()
Stop()
Check()
}
// RegisterStyxPlugin makes an enrichment plugin available for usage
func RegisterStyxPlugin(p StyxPlugin) {
StyxPlugins = append(StyxPlugins, p)
}
// Worker is a generic set of fields to support graceful start/stop of a concurrent service.
type Worker struct {
StopChan chan bool
StoppedChan chan bool
Running bool
}

77
plugins/pastebin.go Normal file
View file

@ -0,0 +1,77 @@
package plugins
import (
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// PastebinPlugin defines the general PastebinPlugin structure.
type PastebinPlugin struct {
StopChan chan bool
StoppedChan chan bool
Running bool
}
// Initialize initialises the certstream configuration.
func (p *PastebinPlugin) Initialize() bool {
if !viper.GetBool("pastebin.activated") {
return false
}
logrus.Info("pastebin plugin is activated")
return true
}
// Run runs the Pastebin plugin.
func (p *PastebinPlugin) Run(wg *sync.WaitGroup) {
if !p.Running {
p.StoppedChan = make(chan bool)
wg.Add(1)
go p.doRun()
p.Running = true
}
}
// Stop stops the Pastebin plugin.
func (p *PastebinPlugin) Stop(wg *sync.WaitGroup) {
if p.Running {
p.StoppedChan = make(chan bool)
close(p.StopChan)
<-p.StopChan
wg.Done()
p.Running = false
}
}
func (p *PastebinPlugin) doRun() {
for {
select {
default:
pastes, err := models.QueryPastes()
if err != nil {
logrus.Error(err)
}
for _, p := range pastes {
paste, err := models.FetchPaste(p)
if err != nil {
logrus.Error("cannot fetch paste", err)
}
fp := models.FullPaste{
Meta: p,
Full: paste,
}
res := models.BuildPasteNode(&fp)
// if elastic {
// e.StorePaste(fp)
// }
models.SavePaste("paste_formatted.json", res)
time.Sleep(1 * time.Second)
}
time.Sleep(3 * time.Second)
}
}
}

97
plugins/shodan.go Normal file
View file

@ -0,0 +1,97 @@
package plugins
import (
"fmt"
"sync"
"github.com/ns3777k/go-shodan/v4/shodan"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// ShodanPlugin defines the general ShodanPlugin structure.
type ShodanPlugin struct {
Client *shodan.Client
ShodanChan chan *shodan.HostData
StopChan chan bool
StoppedChan chan bool
Running bool
}
// Initialize initialises the certstream configuration.
func (s *ShodanPlugin) Initialize() bool {
if !viper.GetBool("shodan.activated") {
return false
}
logrus.Info("shodan plugin is activated")
return true
}
// Run runs the Shodan plugin.
func (s *ShodanPlugin) Run(wg *sync.WaitGroup) {
if !s.Running {
s.StoppedChan = make(chan bool)
wg.Add(1)
go s.doRun()
s.Running = true
}
}
// Stop stops the Shodan plugin.
func (s *ShodanPlugin) Stop(wg *sync.WaitGroup) {
if s.Running {
s.StoppedChan = make(chan bool)
close(s.StopChan)
<-s.StopChan
wg.Done()
s.Running = false
}
}
func (s *ShodanPlugin) doRun() {
for {
select {
default:
banner, ok := <-s.ShodanChan
if !ok {
logrus.Error("channel is closed")
break
}
shodanNode := models.BuildShodanNode(banner)
// first filter poc
if shodanNode.Data.HTML != "" {
if !filters.RunIPFilters(shodanNode.Data.IP) {
hostnames := shodanNode.Data.Hostnames
var hostNotInFilters, domainNotInFilters bool
if len(hostnames) != 0 {
for _, hostname := range hostnames {
hostNotInFilters = filters.RunDomainFilters(hostname)
if hostNotInFilters {
// saveSingleValues(conn, "shodan_stream", "hostname", shodanNode.ID, hostname)
}
}
}
domains := shodanNode.Data.Domains
if len(domains) != 0 {
for _, domain := range domains {
domainNotInFilters = filters.RunDomainFilters(domain)
// saveSingleValues(conn, "shodan_stream", "domain", shodanNode.ID, domain)
}
}
if domainNotInFilters && hostNotInFilters {
models.SaveShodanNode("raw_shodan.json", shodanNode)
node := models.BuildNode("shodan", "shodan_stream", shodanNode.ID)
models.SaveNode("nodes.json", node)
edge := models.BuildEdge("shodan", shodanNode.ID, node.ID)
models.SaveEdge(edge)
}
} else {
fmt.Println("is akamai", shodanNode.Data.IP)
}
}
}
}
}

View file

@ -1 +0,0 @@
package utils

View file

@ -1 +0,0 @@
package utils