styx/main.go
2020-02-20 14:53:18 +01:00

231 lines
5.8 KiB
Go

package main
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/CaliDog/certstream-go"
	"github.com/jmoiron/jsonq"
	"github.com/ns3777k/go-shodan/v4/shodan"
	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"

	"gitlab.dcso.lolcat/LABS/styx/broker"
	"gitlab.dcso.lolcat/LABS/styx/elasticsearch"
	"gitlab.dcso.lolcat/LABS/styx/filters"
	"gitlab.dcso.lolcat/LABS/styx/models"
)
func init() {
// Setting up logging.
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
logrus.SetReportCaller(true)
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
}
// main reads the configuration, connects to Kafka and starts one collector
// goroutine per activated source (certstream, pastebin, shodan). It then
// blocks until an interrupt or termination signal arrives and waits for the
// collectors it started to return.
//
// Startup failures (unreadable config, Kafka connection, Shodan banner
// stream) panic, as there is nothing useful to do without them.
func main() {
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	// shodan.NewEnvClient picks its API key up from the environment.
	if err := os.Setenv("SHODAN_KEY", viper.GetString("shodan.key")); err != nil {
		panic(err)
	}
	fmt.Println("Starting to get data from the Internet...")

	conn, err := broker.SetUpKafkaConnecter()
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// stopChan is only ever closed, never sent on: a close releases every
	// routine selecting on it, whereas a single sent value would wake only one.
	stopChan := make(chan os.Signal)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigChan
		// Hand the signals back to the runtime so a second one terminates
		// the process even if a routine is slow to notice the stop request.
		signal.Stop(sigChan)
		close(stopChan)
	}()

	// The counter is incremented per started routine so that wg.Wait cannot
	// hang on sources that were never activated.
	var wg sync.WaitGroup

	// Not tracked by wg: it takes no stop channel and ends with the process.
	go broker.ReadEventFromKafka()

	// certstream
	if viper.GetBool("certstream.activated") {
		// The false flag specifies that we want heartbeat messages.
		stream, errStream := certstream.CertStreamEventStream(false)
		wg.Add(1)
		go certstreamRoutine(stream, errStream, conn, stopChan, &wg)
	}

	// pastebin
	if viper.GetBool("pastebin.activated") {
		wg.Add(1)
		go pastebinRoutine(stopChan, &wg)
	}

	// shodan: the banner stream is only opened when the source is activated,
	// so a missing or invalid key cannot bring down a run that does not use it.
	if viper.GetBool("shodan.activated") {
		client := shodan.NewEnvClient(nil)
		ch := make(chan *shodan.HostData)
		if err := client.GetBanners(context.Background(), ch); err != nil {
			logrus.Panic(err)
		}
		wg.Add(1)
		go shodanRoutine(client, ch, conn, stopChan, &wg)
	}

	// Keep running until shutdown is requested, then let the routines finish.
	<-stopChan
	wg.Wait()
}
// routines

// certstreamRoutine wraps the routine for grabbing Certstream data.
//
// It loops over three channels: events from stream, errors from errStream
// (logged), and stopChan, which makes it call wg.Done and return. Events that
// models.ExtractCertFromStream cannot decode are dropped without logging.
// For every domain of a certificate accepted by filters.RunDomainFilters the
// raw node, cert node, main node and two edges are saved, and the domain
// itself is recorded through saveSingleValues.
func certstreamRoutine(stream chan jsonq.JsonQuery, errStream chan error, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) {
	fmt.Println("certstream is activated")
	for {
		select {
		case <-stopChan:
			wg.Done()
			return

		case streamErr := <-errStream:
			logrus.Error(streamErr)

		case event := <-stream:
			cert, err := models.ExtractCertFromStream(event)
			if err != nil {
				continue
			}
			for _, name := range cert.Data.LeafCert.AllDomains {
				if !filters.RunDomainFilters(name) {
					continue
				}

				// Persist the certificate in its raw, cert and main node forms.
				raw := models.WrapCertStreamData(*cert)
				models.SaveCertStreamRaw("raw_certstream.json", raw)
				certNode := models.BuildCertNode(raw)
				models.SaveCertNode("cert_nodes.json", certNode)
				mainNode := models.BuildNode("node", "certstream", certNode.ID)
				models.SaveNode("nodes.json", mainNode)

				// Link raw -> main and main -> cert.
				models.SaveEdge(models.BuildEdge("certstream", raw.ID, mainNode.ID))
				models.SaveEdge(models.BuildEdge("certstream", mainNode.ID, certNode.ID))

				saveSingleValues(conn, "certstream", "domain", certNode.ID, name)
			}
		}
	}
}
// pastebinRoutine polls pastebin until stopChan fires and calls wg.Done on
// return.
//
// Each round queries the paste list with models.QueryPastes, fetches every
// paste, builds its node and saves it with models.SavePaste. When
// "elasticsearch.activated" is set the full paste is also handed to an
// ElasticStorageModule. The routine pauses one second after each paste and
// three seconds after each round; both pauses end early on a stop request.
//
// A failing Elasticsearch initialisation or paste-list query panics. A paste
// that cannot be fetched is logged and skipped.
func pastebinRoutine(stopChan chan os.Signal, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("pastebin is activated")

	elastic := viper.GetBool("elasticsearch.activated")
	var e *elasticsearch.ElasticStorageModule
	if elastic {
		fmt.Println("elasticsearch is activated")
		e = &elasticsearch.ElasticStorageModule{
			ElasticURL:   viper.GetString("elasticsearch.url"),
			DailyIndexes: true,
			UseIndex:     "pastebin",
			LastChk:      time.Now(),
		}
		if err := e.Initialize(); err != nil {
			panic(err)
		}
	}

	for {
		// Non-blocking check so a stop requested before the first round is honoured.
		select {
		case <-stopChan:
			return
		default:
		}

		pastes, err := models.QueryPastes()
		if err != nil {
			logrus.Panic(err)
		}
		for _, p := range pastes {
			paste, err := models.FetchPaste(p)
			if err != nil {
				// Do not build or store a node from a failed fetch.
				logrus.Errorf("cannot fetch paste: %v", err)
			} else {
				fp := models.FullPaste{
					Meta: p,
					Full: paste,
				}
				res := models.BuildPasteNode(&fp)
				if elastic {
					e.StorePaste(fp)
				}
				models.SavePaste("paste_formatted.json", res)
			}
			// The pause applies to failed fetches too, keeping the request pace.
			if pauseOrStop(1*time.Second, stopChan) {
				return
			}
		}
		if pauseOrStop(3*time.Second, stopChan) {
			return
		}
	}
}

// pauseOrStop waits for d to elapse and returns false, or returns true as
// soon as a receive from stopChan succeeds.
func pauseOrStop(d time.Duration, stopChan <-chan os.Signal) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-stopChan:
		return true
	}
}
// shodanRoutine consumes banners from shodanChan until stopChan fires or
// shodanChan is closed, and calls wg.Done on return.
//
// The receive and the stop request share one select, so a stop is honoured
// without waiting for the next banner. client is accepted for signature
// compatibility and is not used here.
func shodanRoutine(client *shodan.Client, shodanChan chan *shodan.HostData, conn *kafka.Conn, stopChan chan os.Signal, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("shodan is activated")
	for {
		select {
		case <-stopChan:
			return
		case banner, ok := <-shodanChan:
			if !ok {
				// A closed channel is always ready to receive; leaving the
				// loop prevents spinning on it and flooding the log.
				logrus.Error("channel is closed")
				return
			}
			processShodanBanner(conn, banner)
		}
	}
}

// processShodanBanner filters a single Shodan banner and persists what passes.
//
// Banners with empty HTML are ignored, and banners whose IP matches
// filters.RunIPFilters are reported on stdout and skipped. Otherwise
// hostnames accepted by filters.RunDomainFilters and all domains are recorded
// through saveSingleValues. The raw Shodan node, its main node and their
// edge are saved when both filter flags end up true.
func processShodanBanner(conn *kafka.Conn, banner *shodan.HostData) {
	shodanNode := models.BuildShodanNode(banner)

	// first filter poc
	if shodanNode.Data.HTML == "" {
		return
	}
	if filters.RunIPFilters(shodanNode.Data.IP) {
		fmt.Println("is akamai", shodanNode.Data.IP)
		return
	}

	// NOTE(review): both flags keep only the result for the LAST element of
	// their slice (and stay false for an empty slice). If "any" or "all" was
	// intended this needs changing — behaviour kept as-is pending confirmation.
	var hostNotInFilters, domainNotInFilters bool

	for _, hostname := range shodanNode.Data.Hostnames {
		hostNotInFilters = filters.RunDomainFilters(hostname)
		if hostNotInFilters {
			saveSingleValues(conn, "shodan_stream", "hostname", shodanNode.ID, hostname)
		}
	}

	for _, domain := range shodanNode.Data.Domains {
		domainNotInFilters = filters.RunDomainFilters(domain)
		// NOTE(review): unlike hostnames, domains are saved regardless of the
		// filter result — confirm whether this asymmetry is intentional.
		saveSingleValues(conn, "shodan_stream", "domain", shodanNode.ID, domain)
	}

	if domainNotInFilters && hostNotInFilters {
		models.SaveShodanNode("raw_shodan.json", shodanNode)
		node := models.BuildNode("shodan", "shodan_stream", shodanNode.ID)
		models.SaveNode("nodes.json", node)
		edge := models.BuildEdge("shodan", shodanNode.ID, node.ID)
		models.SaveEdge(edge)
	}
}
// helpers

// saveSingleValues records one extracted value (for example a domain or a
// hostname) and ties it to the node it came from.
//
// A node is built from source, datatype and values and saved with
// models.SaveNode under "nodes.json". Nodes whose Type is "domain" or
// "hostname" are additionally sent to Kafka over brokerConn. Finally an edge
// built from source, originNodeID and the new node's ID is saved.
func saveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values string) {
	node := models.BuildNode(source, datatype, values)
	models.SaveNode("nodes.json", node)

	switch node.Type {
	case "domain", "hostname":
		broker.SendEventToKafka(brokerConn, *node)
	}

	models.SaveEdge(models.BuildEdge(source, originNodeID, node.ID))
}