styx/broker/main.go

93 lines
2.3 KiB
Go

package broker
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gitlab.dcso.lolcat/LABS/styx/balboa"
"gitlab.dcso.lolcat/LABS/styx/models"
)
// SetUpKafkaConnecter dials the leader of the configured Kafka partition and
// returns the resulting connection.
//
// The endpoint is read from viper: kafka.protocol, kafka.host, kafka.port,
// kafka.topic and kafka.partition. Before the connection is returned, a single
// absolute deadline 10 seconds in the future is set on it. It is not a
// per-operation timeout, so callers that keep the connection around have to
// refresh the deadline before later I/O.
//
// On failure the returned connection is nil and the error is non-nil. On
// success the caller owns the connection and is responsible for closing it.
func SetUpKafkaConnecter() (*kafka.Conn, error) {
	topic := viper.GetString("kafka.topic")
	partition := viper.GetInt("kafka.partition")
	protocol := viper.GetString("kafka.protocol")
	host := viper.GetString("kafka.host")
	port := viper.GetString("kafka.port")

	conn, err := kafka.DialLeader(context.Background(), protocol, host+":"+port, topic, partition)
	if err != nil {
		return nil, err
	}

	if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		// Do not leak the socket when the connection cannot be configured.
		// The close error is dropped on purpose: the deadline error is the
		// one worth reporting.
		_ = conn.Close()
		return nil, err
	}
	return conn, nil
}
// SendEventToKafka serialises node as JSON and writes it to Kafka as a single
// message over conn.
//
// The write deadline of conn is pushed 10 seconds into the future before the
// write. Nothing is returned: every failure (nil connection, serialisation,
// deadline, write) is logged through logrus. A node that cannot be serialised
// is dropped rather than being sent as an empty message.
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
	if conn == nil {
		logrus.Error("cannot send event to kafka: nil connection")
		return
	}

	// Serialise first so that a bad node never touches the connection.
	packaged, err := json.Marshal(node)
	if err != nil {
		logrus.Error("cannot marshal node: ", err)
		return
	}

	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		logrus.Error(err)
		return
	}

	if _, err := conn.WriteMessages(kafka.Message{Value: packaged}); err != nil {
		logrus.Error(err)
	}
}
// ReadEventFromKafka reads the node events sent to Kafka and, when Balboa is
// enabled, looks each node up in Balboa.
//
// It returns immediately unless kafka.activated is true. Otherwise it builds a
// reader from kafka.host, kafka.port, kafka.topic and kafka.partition and
// loops forever, so the call blocks (presumably it is meant to run in its own
// goroutine; confirm against the caller).
//
// For each message, while balboa.activated is true and the decoded node has a
// non-empty ID, node.Data is queried in Balboa. When entries come back, a
// Balboa node is built from them and saved to "bnodes.json", and an edge from
// the original node to the Balboa node is saved. Read, decode, client and
// query failures are logged and the message is skipped.
func ReadEventFromKafka() {
	if !viper.GetBool("kafka.activated") {
		return
	}

	topic := viper.GetString("kafka.topic")
	partition := viper.GetInt("kafka.partition")
	host := viper.GetString("kafka.host")
	port := viper.GetString("kafka.port")

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{host + ":" + port},
		Topic:     topic,
		Partition: partition,
		MinBytes:  10e3,
		MaxBytes:  10e6,
	})
	defer r.Close()

	// NOTE(review): the start offset is hard-coded to 42, so every start of
	// this function begins reading at that offset. It looks like a leftover
	// from example code; confirm the intended start position.
	if err := r.SetOffset(42); err != nil {
		logrus.Error("cannot set kafka offset: ", err)
	}

	// TODO: refactor this context
	ctx := context.Background()

	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			logrus.Error("error reading message:", err)
			// There is no message to process; try the next read.
			continue
		}

		// The setting is re-read per message so a configuration change is
		// picked up without restarting the loop.
		if !viper.GetBool("balboa.activated") {
			continue
		}

		var node models.Node
		if err := json.Unmarshal(m.Value, &node); err != nil {
			logrus.Error("cannot decode node: ", err)
			continue
		}
		if len(node.ID) == 0 {
			continue
		}

		c, err := balboa.GetClient()
		if err != nil {
			logrus.Warn("cannot get balboa client:", err)
			// Without a client the query below cannot run.
			continue
		}

		entries, err := c.GetAllEntries(ctx, node.Data, "", "", int32(1))
		if err != nil {
			logrus.Error("error from balboa", err)
			continue
		}
		if len(entries) == 0 {
			continue
		}

		balboaNode := models.BuildBalboaNode(entries)
		models.SaveBalboaNode("bnodes.json", balboaNode)
		edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
		models.SaveEdge(edge)
	}
}