b1ca4b3c5f
Implementing first version for shodan node, missing yet some models, but the overal approach works and can be queried in Ratel.
110 lines
2.9 KiB
Go
110 lines
2.9 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"gitlab.dcso.lolcat/LABS/styx/balboa"
|
|
"gitlab.dcso.lolcat/LABS/styx/models"
|
|
)
|
|
|
|
// SetUpKafkaConnecter builds the connection to Kafka with a timeout.
|
|
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
|
Topic := viper.GetString("kafka.topic")
|
|
Partition := viper.GetInt("kafka.partition")
|
|
Protocol := viper.GetString("kafka.protocol")
|
|
Host := viper.GetString("kafka.host")
|
|
Port := viper.GetString("kafka.port")
|
|
conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// SendEventToKafka sends a node to the broker.
|
|
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
packaged, _ := json.Marshal(node)
|
|
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
|
|
}
|
|
|
|
// ReadEventFromKafka read the event sent to Kafka and acts upon it.
|
|
func ReadEventFromKafka() {
|
|
if !viper.GetBool("kafka.activated") {
|
|
return
|
|
}
|
|
|
|
Topic := viper.GetString("kafka.topic")
|
|
Partition := viper.GetInt("kafka.partition")
|
|
Host := viper.GetString("kafka.host")
|
|
Port := viper.GetString("kafka.port")
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{Host + ":" + Port},
|
|
Topic: Topic,
|
|
Partition: Partition,
|
|
MinBytes: 10e3,
|
|
MaxBytes: 10e6,
|
|
})
|
|
r.SetOffset(42)
|
|
defer r.Close()
|
|
|
|
for {
|
|
m, err := r.ReadMessage(context.Background())
|
|
if err != nil {
|
|
logrus.Error("error reading message:", err)
|
|
}
|
|
|
|
var c *balboa.Client
|
|
if viper.GetBool("balboa.activated") {
|
|
c, err = balboa.GetClient()
|
|
if err != nil {
|
|
logrus.Warn("cannot get balboa client:", err)
|
|
}
|
|
}
|
|
|
|
var node models.Node
|
|
json.Unmarshal(m.Value, &node)
|
|
if viper.GetBool("balboa.activated") {
|
|
if len(node.ID) != 0 {
|
|
// TODO: refactor this context
|
|
ctx := context.Background()
|
|
entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1))
|
|
if err != nil {
|
|
logrus.Error("error from balboa", err)
|
|
}
|
|
if len(entries) != 0 {
|
|
balboaNode := models.BuildBalboaNode(entries)
|
|
models.SaveBalboaNode("bnodes.json", balboaNode)
|
|
// edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
|
|
// models.SaveEdge(edge)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Helpers
|
|
// func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) {
|
|
// for _, value := range values {
|
|
// domainNode := models.BuildNode(source, datatype, value)
|
|
// models.SaveNode("nodes.json", domainNode)
|
|
// if domainNode.Type == "domain" || domainNode.Type == "hostname" {
|
|
// broker.SendEventToKafka(brokerConn, *domainNode)
|
|
// }
|
|
// edge := models.BuildEdge(source, originNodeID, domainNode.ID)
|
|
// models.SaveEdge(edge)
|
|
// }
|
|
// }
|