67aecd65c1
This work extracts fingerprints and domains from CertStream data structure received through the stream. It builds nodes and edges and saves them to the relevant files. It sends this data to Kafka but no logic is implemented at the exit of the broker yet.
59 lines
1.2 KiB
Go
59 lines
1.2 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
"github.com/sirupsen/logrus"
|
|
"gitlab.dcso.lolcat/LABS/styx/models"
|
|
)
|
|
|
|
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
|
topic := "styx"
|
|
partition := 0
|
|
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
packaged, _ := json.Marshal(node)
|
|
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
|
|
}
|
|
|
|
func ReadEventFromKafka() {
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{"localhost:9092"},
|
|
Topic: "styx",
|
|
Partition: 0,
|
|
MinBytes: 10e3,
|
|
MaxBytes: 10e6,
|
|
})
|
|
r.SetOffset(42)
|
|
defer r.Close()
|
|
|
|
for {
|
|
m, err := r.ReadMessage(context.Background())
|
|
if err != nil {
|
|
logrus.Error("error reading message:", err)
|
|
}
|
|
|
|
var node models.Node
|
|
json.Unmarshal(m.Value, &node)
|
|
if len(node.ID) != 0 {
|
|
fmt.Println(node)
|
|
}
|
|
}
|
|
}
|