// styx/broker/main.go (62 lines, 1.2 KiB, Go)

package broker
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"gitlab.dcso.lolcat/LABS/styx/models"
"gitlab.dcso.lolcat/LABS/styx/utils"
)
// SetUpKafkaConnecter dials the leader of partition 0 of the "styx" topic on
// the broker at localhost:9092 and returns the open connection.
//
// Before returning, a deadline 10 seconds in the future is set on the
// connection. If dialing fails, or the deadline cannot be set, a nil
// connection and the error are returned; in the latter case the connection is
// closed first so it does not leak. On success the connection is returned
// open and is not closed by this function.
func SetUpKafkaConnecter() (*kafka.Conn, error) {
	const (
		topic     = "styx"
		partition = 0
	)

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the deadline is a fixed point in time, not an idle
	// timeout. SendEventToKafka refreshes the write deadline on every call;
	// confirm whether anything reads from this connection after 10 seconds.
	if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		// The deadline error is the one worth reporting; a failure to close
		// an already unusable connection adds nothing for the caller.
		_ = conn.Close()
		return nil, err
	}
	return conn, nil
}
2020-01-22 16:33:07 +01:00
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
utils.SaveNode(&node)
packaged, _ := json.Marshal(node)
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
if err != nil {
2020-01-22 16:33:07 +01:00
logrus.Error(err)
}
}
2020-01-21 16:50:50 +01:00
func ReadEventFromKafka() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "styx",
Partition: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
r.SetOffset(42)
defer r.Close()
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
logrus.Error("error reading message:", err)
2020-01-21 16:50:50 +01:00
}
var node models.Node
json.Unmarshal(m.Value, &node)
utils.SaveDomains(node.Data.Data.LeafCert.AllDomains)
2020-01-21 16:50:50 +01:00
}
}