styx/main.go
Christopher Talib b2da64a9d7 Enh/modular arch
2020-02-25 10:05:31 +01:00

168 lines
4.1 KiB
Go

package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/ns3777k/go-shodan/v4/shodan"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/broker"
"gitlab.dcso.lolcat/LABS/styx/elasticsearch"
"gitlab.dcso.lolcat/LABS/styx/filters"
"gitlab.dcso.lolcat/LABS/styx/models"
"gitlab.dcso.lolcat/LABS/styx/plugins"
)
// init configures process-wide logging and tells viper where to look for the
// configuration: a YAML config named "config" searched for in the current
// directory. The file itself is read later, not here.
func init() {
	// Full timestamps plus caller reporting on every log entry.
	formatter := &logrus.TextFormatter{FullTimestamp: true}
	logrus.SetFormatter(formatter)
	logrus.SetReportCaller(true)

	// Configuration lookup settings.
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
}
// main reads the configuration, exports the Shodan API key to the
// environment, initialises and runs the certstream, pastebin and shodan
// plugins, and then waits on the WaitGroup shared with those plugins.
//
// It panics when the configuration cannot be read. A plugin whose Initialize
// call reports failure is logged but still run, as before.
func main() {
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	os.Setenv("SHODAN_KEY", viper.GetString("shodan.key"))
	fmt.Println("Starting to get data from the Internet...")

	// Kafka connection set-up, currently disabled:
	// conn, err := broker.SetUpKafkaConnecter()
	// if err != nil {
	// panic(err)
	// }

	// stop channel
	// NOTE(review): nothing in this function ever sends on stopChan (there is
	// no signal.Notify call), so the shutdown goroutine below cannot fire as
	// written. Wiring it up needs a buffered channel and os/signal — confirm
	// the intended shutdown behaviour before adding it.
	stopChan := make(chan os.Signal)
	var wg sync.WaitGroup

	// go broker.ReadEventFromKafka()

	// certstream
	c := plugins.CertStreamPlugin{}
	if ok := c.Initialize(); !ok {
		logrus.Error("Error initialising certstream module")
	}
	c.Run(&wg)

	// pastebin
	p := plugins.PastebinPlugin{}
	if ok := p.Initialize(); !ok {
		logrus.Error("error initialising pastebin module")
	}
	p.Run(&wg)

	// shodan
	s := plugins.ShodanPlugin{}
	if ok := s.Initialize(); !ok {
		logrus.Error("error initialising shodan plugin")
	}
	// Run the shodan plugin itself; the previous code called p.Run here, which
	// started pastebin a second time and never started shodan.
	s.Run(&wg)

	// On a stop request, ask every plugin to stop.
	go func() {
		<-stopChan
		logrus.Println("Shutting down...")
		c.Stop(&wg)
		p.Stop(&wg)
		s.Stop(&wg)
	}()

	wg.Wait()
	fmt.Println("done")
}
// routines
// pastebinRoutine announces the pastebin module and, when the
// "elasticsearch.activated" setting is true, builds and initialises an
// Elasticsearch storage module for the "pastebin" index.
//
// It panics if that initialisation returns an error. stopChan and wg are
// accepted but not used by the current body.
func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) {
	fmt.Println("pastebin is activated")

	// Nothing more to set up when Elasticsearch storage is switched off.
	if !viper.GetBool("elasticsearch.activated") {
		return
	}

	fmt.Println("elasticsearch is activated")
	store := &elasticsearch.ElasticStorageModule{
		ElasticURL:   viper.GetString("elasticsearch.url"),
		DailyIndexes: true,
		UseIndex:     "pastebin",
		LastChk:      time.Now(),
	}
	if err := store.Initialize(); err != nil {
		panic(err)
	}
}
// shodanRoutine consumes banners from shodanChan until a value arrives on
// stopChan or shodanChan is closed, and calls wg.Done exactly once on exit.
//
// For every banner that carries HTML and whose IP does not match the IP
// filters, it saves the banner's hostnames and domains as single-value nodes
// and, when the filter flags allow it, persists the raw Shodan node together
// with a node and an edge describing it.
//
// client is accepted but not used by the current body.
func shodanRoutine(client *shodan.Client, shodanChan chan *shodan.HostData, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) {
	// Release the WaitGroup on every exit path, including a closed channel.
	defer wg.Done()

	fmt.Println("shodan is activated")
	for {
		// Wait on both channels at once so a stop request is honoured even
		// while no banner is arriving.
		select {
		case <-stopChan:
			return
		case banner, ok := <-shodanChan:
			if !ok {
				// A closed channel never delivers again; return instead of
				// breaking out of the select only, which looped forever.
				logrus.Error("channel is closed")
				return
			}

			shodanNode := models.BuildShodanNode(banner)

			// first filter poc
			if shodanNode.Data.HTML == "" {
				continue
			}
			if filters.RunIPFilters(shodanNode.Data.IP) {
				fmt.Println("is akamai", shodanNode.Data.IP)
				continue
			}

			// NOTE(review): each flag keeps only the result for the LAST
			// hostname/domain checked — confirm that is the intended rule.
			var hostNotInFilters, domainNotInFilters bool

			for _, hostname := range shodanNode.Data.Hostnames {
				hostNotInFilters = filters.RunDomainFilters(hostname)
				if hostNotInFilters {
					saveSingleValues(conn, "shodan_stream", "hostname", shodanNode.ID, hostname)
				}
			}

			for _, domain := range shodanNode.Data.Domains {
				domainNotInFilters = filters.RunDomainFilters(domain)
				// NOTE(review): unlike hostnames, domains are saved regardless
				// of the filter result — verify whether a guard is missing.
				saveSingleValues(conn, "shodan_stream", "domain", shodanNode.ID, domain)
			}

			if domainNotInFilters && hostNotInFilters {
				models.SaveShodanNode("raw_shodan.json", shodanNode)
				node := models.BuildNode("shodan", "shodan_stream", shodanNode.ID)
				models.SaveNode("nodes.json", node)
				edge := models.BuildEdge("shodan", shodanNode.ID, node.ID)
				models.SaveEdge(edge)
			}
		}
	}
}
// helpers
// saveSingleValues builds a node for a single value (values) of the given
// datatype coming from source, saves it to "nodes.json", and saves an edge
// from originNodeID to the new node.
//
// Nodes whose type is "domain" or "hostname" are additionally sent to Kafka
// through brokerConn before the edge is built.
func saveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values string) {
	node := models.BuildNode(source, datatype, values)
	models.SaveNode("nodes.json", node)

	// Only domain-like nodes are forwarded to the broker.
	switch node.Type {
	case "domain", "hostname":
		broker.SendEventToKafka(brokerConn, *node)
	}

	link := models.BuildEdge(source, originNodeID, node.ID)
	models.SaveEdge(link)
}