styx/broker/main.go

package broker

import (
	"context"
	"encoding/json"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gitlab.dcso.lolcat/LABS/styx/balboa"
	"gitlab.dcso.lolcat/LABS/styx/models"
)

// Kafka connection settings loaded from the configuration file.
var (
	Topic     string
	Partition int
	Protocol  string
	Host      string
	Port      string
)

func init() {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	Topic = viper.GetString("kafka.topic")
	Partition = viper.GetInt("kafka.partition")
	Protocol = viper.GetString("kafka.protocol")
	Host = viper.GetString("kafka.host")
	Port = viper.GetString("kafka.port")
}
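
// For reference, a minimal config.yaml covering the keys read in this package
// could look like the following sketch (the values are placeholders, not taken
// from a real deployment):
//
//	kafka:
//	  activated: true
//	  topic: styx
//	  partition: 0
//	  protocol: tcp
//	  host: localhost
//	  port: "9092"
//	balboa:
//	  activated: false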

// SetUpKafkaConnecter builds the connection to Kafka with a timeout.
func SetUpKafkaConnecter() (*kafka.Conn, error) {
	conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition)
	if err != nil {
		return nil, err
	}
	if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return nil, err
	}
	return conn, nil
}
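
// Example usage (sketch): dial the broker and make sure the connection gets
// closed when the caller is done with it.
//
//	conn, err := SetUpKafkaConnecter()
//	if err != nil {
//		logrus.Fatal("cannot connect to kafka:", err)
//	}
//	defer conn.Close()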

// CreateTopic sets up a Kafka reader for the given target topic. Don't forget
// to close the reader when you are done with it. The conn parameter is
// currently unused.
func CreateTopic(conn *kafka.Conn, target string) *kafka.Reader {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{Host + ":" + Port},
		Topic:     target,
		Partition: Partition,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Start reading at a fixed offset.
	if err := r.SetOffset(42); err != nil {
		logrus.Error("cannot set reader offset:", err)
	}
	return r
}

// CloseReader closes the Kafka reader.
func CloseReader(r *kafka.Reader) {
	if err := r.Close(); err != nil {
		logrus.Error("cannot close reader:", err)
	}
}
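
// Example usage (sketch): given a conn from SetUpKafkaConnecter, read a single
// message from a hypothetical "domains" topic and close the reader afterwards.
//
//	r := CreateTopic(conn, "domains")
//	m, err := r.ReadMessage(context.Background())
//	if err != nil {
//		logrus.Error("cannot read message:", err)
//	}
//	logrus.Info(string(m.Value))
//	CloseReader(r)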

// SendEventToKafka sends a node to the broker.
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		logrus.Error("cannot set write deadline:", err)
	}
	packaged, err := json.Marshal(node)
	if err != nil {
		logrus.Error("cannot marshal node:", err)
		return
	}
	if _, err := conn.WriteMessages(kafka.Message{Value: packaged, Topic: node.Type}); err != nil {
		logrus.Error(err)
	}
}
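
// Example usage (sketch), mirroring the commented-out helper at the bottom of
// this file; the source, type and value passed to models.BuildNode are
// illustrative only.
//
//	node := models.BuildNode("balboa", "domain", "example.com")
//	SendEventToKafka(conn, *node)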

// ReadEventFromKafka reads events sent to Kafka and acts upon them.
func ReadEventFromKafka() {
	if !viper.GetBool("kafka.activated") {
		return
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{Host + ":" + Port},
		Topic:     Topic,
		Partition: Partition,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Start reading at a fixed offset.
	if err := r.SetOffset(42); err != nil {
		logrus.Error("cannot set reader offset:", err)
	}
	defer r.Close()
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			logrus.Error("error reading message:", err)
			continue
		}

		var node models.Node
		if err := json.Unmarshal(m.Value, &node); err != nil {
			logrus.Error("cannot unmarshal node:", err)
			continue
		}

		if viper.GetBool("balboa.activated") && len(node.ID) != 0 {
			c, err := balboa.GetClient()
			if err != nil {
				logrus.Warn("cannot get balboa client:", err)
				continue
			}
			// TODO: refactor this context
			ctx := context.Background()
			entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1))
			if err != nil {
				logrus.Error("error from balboa:", err)
			}
			if len(entries) != 0 {
				balboaNode := models.BuildBalboaNode(entries)
				models.SaveBalboaNode("bnodes.json", balboaNode)
				// edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
				// models.SaveEdge(edge)
			}
		}
	}
}
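
// Example usage (sketch): the consumer loop blocks, so a caller would
// typically run it in its own goroutine.
//
//	go ReadEventFromKafka()
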
// Helpers
// func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) {
// for _, value := range values {
// domainNode := models.BuildNode(source, datatype, value)
// models.SaveNode("nodes.json", domainNode)
// if domainNode.Type == "domain" || domainNode.Type == "hostname" {
// broker.SendEventToKafka(brokerConn, *domainNode)
// }
// edge := models.BuildEdge(source, originNodeID, domainNode.ID)
// models.SaveEdge(edge)
// }
// }