diff --git a/main.go b/main.go index 49e0d9e..2491d59 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,11 @@ package main import ( - "context" "fmt" "os" "sync" "time" - "github.com/CaliDog/certstream-go" - "github.com/jmoiron/jsonq" "github.com/ns3777k/go-shodan/v4/shodan" "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" @@ -17,6 +14,7 @@ import ( "gitlab.dcso.lolcat/LABS/styx/elasticsearch" "gitlab.dcso.lolcat/LABS/styx/filters" "gitlab.dcso.lolcat/LABS/styx/models" + "gitlab.dcso.lolcat/LABS/styx/plugins" ) func init() { @@ -40,84 +38,53 @@ func main() { fmt.Println("Starting to get data from the Internet...") // The false flag specifies that we want heartbeat messages. - conn, err := broker.SetUpKafkaConnecter() - if err != nil { - panic(err) - } + // conn, err := broker.SetUpKafkaConnecter() + // if err != nil { + // panic(err) + // } // stop channel stopChan := make(chan os.Signal) var wg sync.WaitGroup - wg.Add(3) - go broker.ReadEventFromKafka() + // go broker.ReadEventFromKafka() // certstream - stream, errStream := certstream.CertStreamEventStream(false) - if viper.GetBool("certstream.activated") { - go certstreamRoutine(stream, errStream, conn, stopChan, &wg) + c := plugins.CertStreamPlugin{} + if ok := c.Initialize(); !ok { + logrus.Error("Error initialising certstream module") } + c.Run(&wg) // pastebin - if viper.GetBool("pastebin.activated") { - go pastebinRoutine(stopChan, &wg) + p := plugins.PastebinPlugin{} + if ok := p.Initialize(); !ok { + logrus.Error("error initialising pastebin module") } + p.Run(&wg) // shodan - client := shodan.NewEnvClient(nil) - ch := make(chan *shodan.HostData) - err = client.GetBanners(context.Background(), ch) - if err != nil { - logrus.Panic(err) + s := plugins.ShodanPlugin{} + if ok := s.Initialize(); !ok { + logrus.Error("error initialising shodan plugin") } + p.Run(&wg) - if viper.GetBool("shodan.activated") { - go shodanRoutine(client, ch, conn, stopChan, &wg) - } + go func() { + <-stopChan + logrus.Println("Shutting down...") + c.Stop(&wg) + p.Stop(&wg) + s.Stop(&wg) + }() wg.Wait() + fmt.Println("done") } // routines -// CertstreamRoutine wraps the routine for grabbing Certstream data. -func certstreamRoutine(stream chan jsonq.JsonQuery, errStream chan error, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) { - fmt.Println("certstream is activated") - for { - select { - case jq := <-stream: - if data, err := models.ExtractCertFromStream(jq); err == nil { - - allDomains := data.Data.LeafCert.AllDomains - for _, domain := range allDomains { - if filters.RunDomainFilters(domain) { - rawNode := models.WrapCertStreamData(*data) - models.SaveCertStreamRaw("raw_certstream.json", rawNode) - - certNode := models.BuildCertNode(rawNode) - models.SaveCertNode("cert_nodes.json", certNode) - mainNode := models.BuildNode("node", "certstream", certNode.ID) - models.SaveNode("nodes.json", mainNode) - edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID) - models.SaveEdge(edge) - edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID) - models.SaveEdge(edge) - saveSingleValues(conn, "certstream", "domain", certNode.ID, domain) - } - } - - } - - case err := <-errStream: - logrus.Error(err) - case <-stopChan: - wg.Done() - return - } - } -} - func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) { fmt.Println("pastebin is activated") @@ -136,36 +103,6 @@ func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) { panic(err) } } - for { - select { - default: - pastes, err := models.QueryPastes() - if err != nil { - logrus.Panic(err) - } - for _, p := range pastes { - paste, err := models.FetchPaste(p) - if err != nil { - logrus.Error("cannot fetch paste", err) - } - fp := models.FullPaste{ - Meta: p, - Full: paste, - } - res := models.BuildPasteNode(&fp) - if elastic { - e.StorePaste(fp) - } - models.SavePaste("paste_formatted.json", res) - time.Sleep(1 * time.Second) - - } - time.Sleep(3 * time.Second) - case <-stopChan: - wg.Done() - return - } - } } func shodanRoutine(client *shodan.Client, shodanChan chan *shodan.HostData, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) { diff --git a/models/shodan.go b/models/shodan.go deleted file mode 100644 index 2640e7f..0000000 --- a/models/shodan.go +++ /dev/null @@ -1 +0,0 @@ -package models diff --git a/parser/main.go b/parser/main.go deleted file mode 100644 index 0bfe2c2..0000000 --- a/parser/main.go +++ /dev/null @@ -1 +0,0 @@ -package parser diff --git a/plugins/certstream.go b/plugins/certstream.go new file mode 100644 index 0000000..fe9cff6 --- /dev/null +++ b/plugins/certstream.go @@ -0,0 +1,93 @@ +package plugins + +import ( + "sync" + + "github.com/CaliDog/certstream-go" + "github.com/jmoiron/jsonq" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "gitlab.dcso.lolcat/LABS/styx/filters" + "gitlab.dcso.lolcat/LABS/styx/models" +) + +// CertStreamPlugin defines the general CertStreamPlugin structure. +type CertStreamPlugin struct { + Stream chan jsonq.JsonQuery + ErrStream chan error + StopChan chan bool + StoppedChan chan bool + Running bool +} + +// Initialize initialises the certstream configuration. +func (c *CertStreamPlugin) Initialize() bool { + if !viper.GetBool("certstream.activated") { + return false + } + + logrus.Info("certstream plugin is activated") + stream, errStream := certstream.CertStreamEventStream(false) + c.Stream = stream + c.ErrStream = errStream + return true +} + +// Run runs the Certstream plugin. +func (c *CertStreamPlugin) Run(wg *sync.WaitGroup) { + if !c.Running { + c.StopChan = make(chan bool) + wg.Add(1) + go c.doRun() + c.Running = true + } +} + +// Stop stops the Certstream plugin. +func (c *CertStreamPlugin) Stop(wg *sync.WaitGroup) { + if c.Running { + c.StopChan = make(chan bool) + close(c.StopChan) + <-c.StopChan + wg.Done() + c.Running = false + } +} + +func (c *CertStreamPlugin) doRun() { + for { + select { + case jq := <-c.Stream: + if data, err := models.ExtractCertFromStream(jq); err == nil { + + allDomains := data.Data.LeafCert.AllDomains + for _, domain := range allDomains { + if filters.RunDomainFilters(domain) { + rawNode := models.WrapCertStreamData(*data) + models.SaveCertStreamRaw("raw_certstream.json", rawNode) + + certNode := models.BuildCertNode(rawNode) + models.SaveCertNode("cert_nodes.json", certNode) + mainNode := models.BuildNode("node", "certstream", certNode.ID) + models.SaveNode("nodes.json", mainNode) + edge := models.BuildEdge("certstream", rawNode.ID, mainNode.ID) + models.SaveEdge(edge) + edge = models.BuildEdge("certstream", mainNode.ID, certNode.ID) + models.SaveEdge(edge) + // saveSingleValues(conn, "certstream", "domain", certNode.ID, domain) + } + } + + } + + case err := <-c.ErrStream: + logrus.Error(err) + + } + } +} + +// Destroy closes the channel to the certstream websocket. +func (c *CertStreamPlugin) Destroy() { + close(c.Stream) +} diff --git a/plugins/main.go b/plugins/main.go new file mode 100644 index 0000000..e5dbad5 --- /dev/null +++ b/plugins/main.go @@ -0,0 +1,23 @@ +package plugins + +var StyxPlugins = []StyxPlugin{} + +// StyxPlugin defines the general plugin architecture. +type StyxPlugin interface { + Initialize() + Run() + Stop() + Check() +} + +// RegisterStyxPlugin makes an enrichment plugin available for usage +func RegisterStyxPlugin(p StyxPlugin) { + StyxPlugins = append(StyxPlugins, p) +} + +// Worker is a generic set of fields to support graceful start/stop of a concurrent service. +type Worker struct { + StopChan chan bool + StoppedChan chan bool + Running bool +} diff --git a/plugins/pastebin.go b/plugins/pastebin.go new file mode 100644 index 0000000..18ccbf2 --- /dev/null +++ b/plugins/pastebin.go @@ -0,0 +1,77 @@ +package plugins + +import ( + "sync" + "time" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "gitlab.dcso.lolcat/LABS/styx/models" +) + +// PastebinPlugin defines the general PastebinPlugin structure. +type PastebinPlugin struct { + StopChan chan bool + StoppedChan chan bool + Running bool +} + +// Initialize initialises the certstream configuration. +func (p *PastebinPlugin) Initialize() bool { + if !viper.GetBool("pastebin.activated") { + return false + } + logrus.Info("pastebin plugin is activated") + return true +} + +// Run runs the Pastebin plugin. +func (p *PastebinPlugin) Run(wg *sync.WaitGroup) { + if !p.Running { + p.StoppedChan = make(chan bool) + wg.Add(1) + go p.doRun() + p.Running = true + } +} + +// Stop stops the Pastebin plugin. +func (p *PastebinPlugin) Stop(wg *sync.WaitGroup) { + if p.Running { + p.StoppedChan = make(chan bool) + close(p.StopChan) + <-p.StopChan + wg.Done() + p.Running = false + } +} + +func (p *PastebinPlugin) doRun() { + for { + select { + default: + pastes, err := models.QueryPastes() + if err != nil { + logrus.Error(err) + } + for _, p := range pastes { + paste, err := models.FetchPaste(p) + if err != nil { + logrus.Error("cannot fetch paste", err) + } + fp := models.FullPaste{ + Meta: p, + Full: paste, + } + res := models.BuildPasteNode(&fp) + // if elastic { + // e.StorePaste(fp) + // } + models.SavePaste("paste_formatted.json", res) + time.Sleep(1 * time.Second) + + } + time.Sleep(3 * time.Second) + } + } +} diff --git a/plugins/shodan.go b/plugins/shodan.go new file mode 100644 index 0000000..fb02eef --- /dev/null +++ b/plugins/shodan.go @@ -0,0 +1,97 @@ +package plugins + +import ( + "fmt" + "sync" + + "github.com/ns3777k/go-shodan/v4/shodan" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "gitlab.dcso.lolcat/LABS/styx/filters" + "gitlab.dcso.lolcat/LABS/styx/models" +) + +// ShodanPlugin defines the general ShodanPlugin structure. +type ShodanPlugin struct { + Client *shodan.Client + ShodanChan chan *shodan.HostData + StopChan chan bool + StoppedChan chan bool + Running bool +} + +// Initialize initialises the certstream configuration. +func (s *ShodanPlugin) Initialize() bool { + if !viper.GetBool("shodan.activated") { + return false + } + logrus.Info("shodan plugin is activated") + return true +} + +// Run runs the Shodan plugin. +func (s *ShodanPlugin) Run(wg *sync.WaitGroup) { + if !s.Running { + s.StoppedChan = make(chan bool) + wg.Add(1) + go s.doRun() + s.Running = true + } +} + +// Stop stops the Shodan plugin. +func (s *ShodanPlugin) Stop(wg *sync.WaitGroup) { + if s.Running { + s.StoppedChan = make(chan bool) + close(s.StopChan) + <-s.StopChan + wg.Done() + s.Running = false + } +} + +func (s *ShodanPlugin) doRun() { + for { + select { + default: + banner, ok := <-s.ShodanChan + if !ok { + logrus.Error("channel is closed") + break + } + + shodanNode := models.BuildShodanNode(banner) + // first filter poc + if shodanNode.Data.HTML != "" { + if !filters.RunIPFilters(shodanNode.Data.IP) { + hostnames := shodanNode.Data.Hostnames + var hostNotInFilters, domainNotInFilters bool + if len(hostnames) != 0 { + for _, hostname := range hostnames { + hostNotInFilters = filters.RunDomainFilters(hostname) + if hostNotInFilters { + // saveSingleValues(conn, "shodan_stream", "hostname", shodanNode.ID, hostname) + } + } + } + domains := shodanNode.Data.Domains + if len(domains) != 0 { + for _, domain := range domains { + domainNotInFilters = filters.RunDomainFilters(domain) + // saveSingleValues(conn, "shodan_stream", "domain", shodanNode.ID, domain) + } + } + if domainNotInFilters && hostNotInFilters { + models.SaveShodanNode("raw_shodan.json", shodanNode) + node := models.BuildNode("shodan", "shodan_stream", shodanNode.ID) + models.SaveNode("nodes.json", node) + edge := models.BuildEdge("shodan", shodanNode.ID, node.ID) + models.SaveEdge(edge) + } + } else { + fmt.Println("is akamai", shodanNode.Data.IP) + } + } + } + } +} diff --git a/utils/main.go b/utils/main.go deleted file mode 100644 index d4b585b..0000000 --- a/utils/main.go +++ /dev/null @@ -1 +0,0 @@ -package utils diff --git a/utils/saves.go b/utils/saves.go deleted file mode 100644 index d4b585b..0000000 --- a/utils/saves.go +++ /dev/null @@ -1 +0,0 @@ -package utils