Adding a consummer for Kafka, WIP

This commit is contained in:
Christopher Talib 2020-01-21 16:50:50 +01:00
parent 06f42fc472
commit 48cc976595
3 changed files with 101 additions and 77 deletions

View file

@ -49,3 +49,25 @@ func SendEventToKafka(conn *kafka.Conn, node models.Node) error {
return nil
}
func ReadEventFromKafka() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "styx",
Partition: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
r.SetOffset(42)
defer r.Close()
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("message at offset %d:%s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
}

31
main.go
View file

@ -2,6 +2,7 @@ package main
import (
"fmt"
"os"
"github.com/CaliDog/certstream-go"
"github.com/sirupsen/logrus"
@ -10,7 +11,22 @@ import (
"gitlab.dcso.lolcat/LABS/styx/utils"
)
const (
nodesFilename = "nodes.json"
edgesFilename = "edges.json"
)
func main() {
// checking that data files exist
err := fileExists(nodesFilename)
if err != nil {
logrus.Error(err)
}
err = fileExists(edgesFilename)
if err != nil {
logrus.Error(err)
}
// The false flag specifies that we want heartbeat messages.
stream, errStream := certstream.CertStreamEventStream(false)
fmt.Println("Starting to get data from CertStream...")
@ -18,6 +34,7 @@ func main() {
if err != nil {
panic(err)
}
go broker.ReadEventFromKafka()
for {
select {
case jq := <-stream:
@ -39,3 +56,17 @@ func main() {
}
}
}
// Helpers
func fileExists(filename string) error {
_, err := os.Stat(filename)
if os.IsNotExist(err) {
_, err := os.Create(filename)
if err != nil {
return err
}
}
return nil
}

View file

@ -1,87 +1,72 @@
package models
import (
"encoding/json"
"io/ioutil"
"os"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
const (
nodesFilename = "nodes.json"
edgesFilename = "edges.json"
)
// SaveData is the main function used to save data. You need to pass a specific
// flag to it and the data recieved. It just saves data, it doesn't filter or
// look for it in the stream.
func SaveData(flag string, data CertStreamStruct) {
err := fileExists(nodesFilename)
if err != nil {
logrus.Error(err)
}
// func SaveData(flag string, data CertStreamStruct) {
// nodeFile, err := ioutil.ReadFile(NodesFilename)
// if err != nil {
// logrus.Error(err)
// }
err = fileExists(edgesFilename)
if err != nil {
logrus.Error(err)
}
// edgeFile, err := ioutil.ReadFile(EdgesFilename)
// if err != nil {
// logrus.Error(err)
// }
nodeFile, err := ioutil.ReadFile(nodesFilename)
if err != nil {
logrus.Error(err)
}
// nodeDatas := []Node{}
// edgeDatas := []Edge{}
edgeFile, err := ioutil.ReadFile(edgesFilename)
if err != nil {
logrus.Error(err)
}
// if err := json.Unmarshal(nodeFile, &nodeDatas); err != nil {
// logrus.Error(err)
// }
// if err := json.Unmarshal(edgeFile, &edgeDatas); err != nil {
// logrus.Error(err)
// }
nodeDatas := []Node{}
edgeDatas := []Edge{}
// // res := CertStreamStruct{}
// // json.Unmarshal(data, &res)
json.Unmarshal(nodeFile, &nodeDatas)
json.Unmarshal(edgeFile, &edgeDatas)
// node, err := BuildNode(flag, data)
// if err != nil {
// logrus.Error(err)
// }
// res := CertStreamStruct{}
// json.Unmarshal(data, &res)
// // edge := &Edge{
// // ID: uuid.New(),
// // NodeOneID: node.ID,
// // Timestamp: time.Now(),
// // }
node, err := BuildNode(flag, data)
if err != nil {
logrus.Error(err)
}
// nodeDatas = append(nodeDatas, *node)
// // edgeDatas = append(edgeDatas, *edge)
// edge := &Edge{
// ID: uuid.New(),
// NodeOneID: node.ID,
// Timestamp: time.Now(),
// }
// nodeBytes, err := json.Marshal(nodeDatas)
// if err != nil {
// logrus.Error(err)
// }
nodeDatas = append(nodeDatas, *node)
// edgeDatas = append(edgeDatas, *edge)
// // edgeBytes, err := json.Marshal(edgeDatas)
// // if err != nil {
// // logrus.Error(err)
// // }
nodeBytes, err := json.Marshal(nodeDatas)
if err != nil {
logrus.Error(err)
}
// err = ioutil.WriteFile(nodesFilename, nodeBytes, 0644)
// if err != nil {
// logrus.Error(err)
// }
// edgeBytes, err := json.Marshal(edgeDatas)
// if err != nil {
// logrus.Error(err)
// }
err = ioutil.WriteFile(nodesFilename, nodeBytes, 0644)
if err != nil {
logrus.Error(err)
}
// err = ioutil.WriteFile(edgesFilename, edgeBytes, 0644)
// if err != nil {
// logrus.Error(err)
// }
}
// // err = ioutil.WriteFile(edgesFilename, edgeBytes, 0644)
// // if err != nil {
// // logrus.Error(err)
// // }
// }
// BuildNode builds a node to send to MQ instance.
func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
return &Node{
ID: flag + "--" + uuid.New().String(),
@ -90,17 +75,3 @@ func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
}, nil
}
// Helpers
func fileExists(filename string) error {
_, err := os.Stat(filename)
if os.IsNotExist(err) {
_, err := os.Create(filename)
if err != nil {
return err
}
}
return nil
}