Moving SendToKafka function to broker package

This commit is contained in:
Christopher Talib 2020-01-17 13:51:11 +01:00
parent e9c065bcc8
commit 06f42fc472
3 changed files with 13 additions and 24 deletions

View file

@ -2,9 +2,11 @@ package broker
import (
"context"
"fmt"
"time"
"github.com/segmentio/kafka-go"
"gitlab.dcso.lolcat/LABS/styx/models"
)
func SetUpKafkaConnecter() (*kafka.Conn, error) {
@ -38,6 +40,12 @@ func ProduceEvent(conn *kafka.Conn, message string) error {
return nil
}
func consumeEvent() {
func SendEventToKafka(conn *kafka.Conn, node models.Node) error {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err := conn.WriteMessages(kafka.Message{Value: []byte(fmt.Sprintf("%v", node))})
if err != nil {
panic(err)
}
return nil
}

View file

@ -28,7 +28,7 @@ func main() {
}
fmt.Println(node)
err = models.SendEventToKafka(Conn, *node)
err = broker.SendEventToKafka(Conn, *node)
if err != nil {
panic(err)
}

View file

@ -2,13 +2,10 @@ package models
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
)
@ -50,10 +47,9 @@ func SaveData(flag string, data CertStreamStruct) {
// res := CertStreamStruct{}
// json.Unmarshal(data, &res)
node := &Node{
ID: flag + "--" + uuid.New().String(),
Type: flag,
Data: data,
node, err := BuildNode(flag, data)
if err != nil {
logrus.Error(err)
}
// edge := &Edge{
@ -87,11 +83,6 @@ func SaveData(flag string, data CertStreamStruct) {
}
func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
err := fileExists(nodesFilename)
if err != nil {
return nil, err
}
return &Node{
ID: flag + "--" + uuid.New().String(),
Type: flag,
@ -100,16 +91,6 @@ func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
}
func SendEventToKafka(conn *kafka.Conn, node Node) error {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err := conn.WriteMessages(kafka.Message{Value: []byte(fmt.Sprintf("%v", node))})
if err != nil {
panic(err)
}
return nil
}
// Helpers
func fileExists(filename string) error {