package broker import ( "context" "encoding/json" "time" "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" "gitlab.dcso.lolcat/LABS/styx/balboa" "gitlab.dcso.lolcat/LABS/styx/models" ) // SetUpKafkaConnecter builds the connection to Kafka with a timeout. func SetUpKafkaConnecter() (*kafka.Conn, error) { topic := "styx" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { return nil, err } conn.SetDeadline(time.Now().Add(10 * time.Second)) return conn, nil } // SendEventToKafka sends a node to the broker. func SendEventToKafka(conn *kafka.Conn, node models.Node) { conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) packaged, _ := json.Marshal(node) _, err := conn.WriteMessages(kafka.Message{Value: packaged}) if err != nil { logrus.Error(err) } } // ReadEventFromKafka read the event sent to Kafka and acts upon it. func ReadEventFromKafka() { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "styx", Partition: 0, MinBytes: 10e3, MaxBytes: 10e6, }) r.SetOffset(42) defer r.Close() for { m, err := r.ReadMessage(context.Background()) if err != nil { logrus.Error("error reading message:", err) } c, err := balboa.GetClient() if err != nil { logrus.Warn("cannot get balboa client:", err) } var node models.Node json.Unmarshal(m.Value, &node) if len(node.ID) != 0 { // TODO: refactor this context ctx := context.Background() entries, err := c.GetAllEntries(ctx, node.Data, "", "", int32(1)) if err != nil { logrus.Error("error from balboa", err) } if len(entries) != 0 { balboaNode := models.BuildBalboaNode(entries) models.SaveBalboaNode("bnodes.json", balboaNode) edge := models.BuildEdge("balboa", node.ID, balboaNode.ID) models.SaveEdge(edge) } } } }