7785372e3a
This work moves the saving and extracting functions from the utils package to the models package, as they are a main component of the tool. `utils` will take care of functions not related to models (such as finding the files, for example). It also creates unique files for each type of source we are parsing.
60 lines
1.2 KiB
Go
60 lines
1.2 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
"github.com/sirupsen/logrus"
|
|
"gitlab.dcso.lolcat/LABS/styx/models"
|
|
)
|
|
|
|
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
|
topic := "styx"
|
|
partition := 0
|
|
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
packaged, _ := json.Marshal(node)
|
|
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
|
|
}
|
|
|
|
func ReadEventFromKafka() {
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{"localhost:9092"},
|
|
Topic: "styx",
|
|
Partition: 0,
|
|
MinBytes: 10e3,
|
|
MaxBytes: 10e6,
|
|
})
|
|
r.SetOffset(42)
|
|
defer r.Close()
|
|
|
|
for {
|
|
m, err := r.ReadMessage(context.Background())
|
|
if err != nil {
|
|
logrus.Error("error reading message:", err)
|
|
}
|
|
|
|
var node models.Node
|
|
json.Unmarshal(m.Value, &node)
|
|
if len(node.ID) != 0 {
|
|
fmt.Println(node)
|
|
models.SaveNode(&node)
|
|
}
|
|
}
|
|
}
|