Cleaning and working write/read to kafka and saving objects

This commit is contained in:
Christopher Talib 2020-01-23 14:36:24 +01:00
parent 2548c19ca4
commit 7cbb7e9180
2 changed files with 10 additions and 12 deletions

View file

@@ -3,7 +3,6 @@ package broker
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/segmentio/kafka-go"
@@ -26,7 +25,6 @@ func SetUpKafkaConnecter() (*kafka.Conn, error) {
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
utils.SaveNode(&node)
packaged, _ := json.Marshal(node)
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
if err != nil {
@@ -52,14 +50,14 @@ func ReadEventFromKafka() {
logrus.Error("error reading message:", err)
}
var node map[string]models.Node
var node models.Node
json.Unmarshal(m.Value, &node)
// fmt.Println(string(m.Value))
// utils.SaveNode(node)
fmt.Println("$$$$$$$", node)
err = json.Unmarshal(m.Value, &node)
if err != nil {
logrus.Error("error unmarshalling kafka object:", err)
}
// utils.SaveDomains(node.Data.Data.LeafCert.AllDomains)
utils.SaveNode(&node)
}

View file

@@ -10,11 +10,11 @@ import (
// Styx terminology
// (https://docs.google.com/document/d/1dIrh1Lp3KAjEMm8o2VzAmuV0Peu-jt9aAh1IHrjAroM/pub#h.xzbicbtscatx)
type Node struct {
ID string `json:""`
Type string `json:""`
ID string `json:"id"`
Type string `json:"type"`
Data CertStreamStruct `json:"data"`
Created time.Time `json:""`
Modified time.Time `json:""`
Created time.Time `json:"created"`
Modified time.Time `json:"modified"`
}
// Edge defines a relation between two nodes.