2020-01-16 15:56:57 +01:00
|
|
|
package broker
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-01-22 15:01:07 +01:00
|
|
|
"encoding/json"
|
2020-01-16 15:56:57 +01:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/segmentio/kafka-go"
|
2020-01-22 15:01:07 +01:00
|
|
|
"github.com/sirupsen/logrus"
|
2020-02-10 16:11:25 +01:00
|
|
|
"github.com/spf13/viper"
|
2020-02-07 17:39:33 +01:00
|
|
|
"gitlab.dcso.lolcat/LABS/styx/balboa"
|
2020-01-17 13:51:11 +01:00
|
|
|
"gitlab.dcso.lolcat/LABS/styx/models"
|
2020-01-16 15:56:57 +01:00
|
|
|
)
|
|
|
|
|
2020-08-24 16:25:49 +02:00
|
|
|
// Package-level Kafka connection settings, populated from the YAML
// config file by init().
var (
	// Topic is the default Kafka topic, read from "kafka.topic".
	Topic string
	// Partition is the Kafka partition index, read from "kafka.partition".
	Partition int
	// Protocol is the network protocol passed to kafka.DialLeader
	// (e.g. "tcp"), read from "kafka.protocol".
	Protocol string
	// Host is the Kafka broker host, read from "kafka.host".
	Host string
	// Port is the Kafka broker port, read from "kafka.port".
	Port string
)
|
|
|
|
|
|
|
|
// init loads ./config.yaml via viper and populates the package-level
// Kafka settings (Topic, Partition, Protocol, Host, Port).
//
// NOTE(review): it panics if the config file cannot be read, so merely
// importing this package without a valid config aborts the program —
// consider surfacing this as an explicit setup error instead.
func init() {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")

	err := viper.ReadInConfig()
	if err != nil {
		panic(err)
	}

	Topic = viper.GetString("kafka.topic")
	Partition = viper.GetInt("kafka.partition")
	Protocol = viper.GetString("kafka.protocol")
	Host = viper.GetString("kafka.host")
	Port = viper.GetString("kafka.port")
}
|
|
|
|
|
2020-02-07 17:39:33 +01:00
|
|
|
// SetUpKafkaConnecter builds the connection to Kafka with a timeout.
|
2020-01-16 15:56:57 +01:00
|
|
|
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
2020-02-10 16:11:25 +01:00
|
|
|
conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition)
|
2020-01-16 15:56:57 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
|
|
|
|
return conn, nil
|
|
|
|
}
|
|
|
|
|
2020-08-24 16:25:49 +02:00
|
|
|
// CreateTopic creates a topic in the Kafka Broker based on a target. Don't
|
|
|
|
// forget to close them as they are channels.
|
|
|
|
func CreateTopic(conn *kafka.Conn, target string) *kafka.Reader {
|
|
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
|
|
Brokers: []string{Host + ":" + Port},
|
|
|
|
Topic: target,
|
|
|
|
Partition: Partition,
|
|
|
|
MinBytes: 10e3,
|
|
|
|
MaxBytes: 10e6,
|
|
|
|
})
|
|
|
|
r.SetOffset(42)
|
|
|
|
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// CloseReader closes the Kafka reader.
|
|
|
|
func CloseReader(r kafka.Reader) {
|
|
|
|
r.Close()
|
|
|
|
}
|
|
|
|
|
2020-02-07 17:39:33 +01:00
|
|
|
// SendEventToKafka sends a node to the broker.
|
2020-01-22 16:33:07 +01:00
|
|
|
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
|
2020-01-17 13:51:11 +01:00
|
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
2020-01-22 15:01:07 +01:00
|
|
|
packaged, _ := json.Marshal(node)
|
2020-08-24 16:25:49 +02:00
|
|
|
_, err := conn.WriteMessages(kafka.Message{Value: packaged, Topic: node.Type})
|
2020-01-17 13:51:11 +01:00
|
|
|
if err != nil {
|
2020-01-22 16:33:07 +01:00
|
|
|
logrus.Error(err)
|
2020-01-17 13:51:11 +01:00
|
|
|
}
|
2020-01-16 15:56:57 +01:00
|
|
|
|
|
|
|
}
|
2020-01-21 16:50:50 +01:00
|
|
|
|
2020-02-07 17:39:33 +01:00
|
|
|
// ReadEventFromKafka read the event sent to Kafka and acts upon it.
|
2020-01-21 16:50:50 +01:00
|
|
|
func ReadEventFromKafka() {
|
2020-02-14 11:36:47 +01:00
|
|
|
if !viper.GetBool("kafka.activated") {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-01-21 16:50:50 +01:00
|
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
2020-02-10 16:11:25 +01:00
|
|
|
Brokers: []string{Host + ":" + Port},
|
|
|
|
Topic: Topic,
|
|
|
|
Partition: Partition,
|
2020-01-21 16:50:50 +01:00
|
|
|
MinBytes: 10e3,
|
|
|
|
MaxBytes: 10e6,
|
|
|
|
})
|
|
|
|
r.SetOffset(42)
|
|
|
|
defer r.Close()
|
|
|
|
|
|
|
|
for {
|
|
|
|
m, err := r.ReadMessage(context.Background())
|
|
|
|
if err != nil {
|
2020-01-22 15:01:07 +01:00
|
|
|
logrus.Error("error reading message:", err)
|
2020-01-21 16:50:50 +01:00
|
|
|
}
|
2020-02-19 15:26:47 +01:00
|
|
|
|
|
|
|
var c *balboa.Client
|
|
|
|
if viper.GetBool("balboa.activated") {
|
|
|
|
c, err = balboa.GetClient()
|
|
|
|
if err != nil {
|
|
|
|
logrus.Warn("cannot get balboa client:", err)
|
|
|
|
}
|
2020-02-07 17:39:33 +01:00
|
|
|
}
|
2020-01-21 16:50:50 +01:00
|
|
|
|
2020-01-23 14:36:24 +01:00
|
|
|
var node models.Node
|
2020-01-23 14:38:27 +01:00
|
|
|
json.Unmarshal(m.Value, &node)
|
2020-02-14 11:36:47 +01:00
|
|
|
if viper.GetBool("balboa.activated") {
|
|
|
|
if len(node.ID) != 0 {
|
|
|
|
// TODO: refactor this context
|
|
|
|
ctx := context.Background()
|
2020-05-18 16:09:04 +02:00
|
|
|
entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1))
|
2020-02-14 11:36:47 +01:00
|
|
|
if err != nil {
|
|
|
|
logrus.Error("error from balboa", err)
|
|
|
|
}
|
|
|
|
if len(entries) != 0 {
|
|
|
|
balboaNode := models.BuildBalboaNode(entries)
|
|
|
|
models.SaveBalboaNode("bnodes.json", balboaNode)
|
2020-05-18 16:09:04 +02:00
|
|
|
// edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
|
|
|
|
// models.SaveEdge(edge)
|
2020-02-14 11:36:47 +01:00
|
|
|
}
|
2020-02-07 17:39:33 +01:00
|
|
|
|
2020-02-14 11:36:47 +01:00
|
|
|
}
|
2020-01-23 15:01:08 +01:00
|
|
|
}
|
2020-01-21 16:50:50 +01:00
|
|
|
}
|
|
|
|
}
|
2020-05-18 16:09:04 +02:00
|
|
|
|
|
|
|
// Helpers
|
|
|
|
// func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) {
|
|
|
|
// for _, value := range values {
|
|
|
|
// domainNode := models.BuildNode(source, datatype, value)
|
|
|
|
// models.SaveNode("nodes.json", domainNode)
|
|
|
|
// if domainNode.Type == "domain" || domainNode.Type == "hostname" {
|
|
|
|
// broker.SendEventToKafka(brokerConn, *domainNode)
|
|
|
|
// }
|
|
|
|
// edge := models.BuildEdge(source, originNodeID, domainNode.ID)
|
|
|
|
// models.SaveEdge(edge)
|
|
|
|
// }
|
|
|
|
// }
|