styx/broker/main.go
Christopher Talib 84e4937f85 Major version update
This new work implements the server and the loader as two separate
binaries, allowing the code to keep running while the IOC list is updated.

It also updates the documentation to reflect the new changes.
2020-08-24 17:20:07 +02:00

package broker

import (
	"context"
	"encoding/json"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gitlab.dcso.lolcat/LABS/styx/balboa"
	"gitlab.dcso.lolcat/LABS/styx/models"
)

// Kafka connection parameters read from the configuration file.
var (
	Topic     string
	Partition int
	Protocol  string
	Host      string
	Port      string
)

func init() {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	Topic = viper.GetString("kafka.topic")
	Partition = viper.GetInt("kafka.partition")
	Protocol = viper.GetString("kafka.protocol")
	Host = viper.GetString("kafka.host")
	Port = viper.GetString("kafka.port")
}
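
// The init function above expects a config.yaml in the working directory. A
// minimal sketch of that file, with key names taken from the viper lookups in
// this package and placeholder values:
//
//	kafka:
//	  activated: true
//	  topic: styx
//	  partition: 0
//	  protocol: tcp
//	  host: localhost
//	  port: "9092"
//	balboa:
//	  activated: false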

// SetUpKafkaConnecter builds the connection to Kafka with a timeout.
func SetUpKafkaConnecter() (*kafka.Conn, error) {
	conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition)
	if err != nil {
		return nil, err
	}
	// Reads and writes on this connection fail once the deadline has passed.
	if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return nil, err
	}
	return conn, nil
}
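
// A hypothetical caller (not part of this file) would close the connection
// when done with it:
//
//	conn, err := broker.SetUpKafkaConnecter()
//	if err != nil {
//		logrus.Fatal(err)
//	}
//	defer conn.Close()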

// CreateTopic builds a kafka.Reader consuming the given target topic. The
// reader holds network connections, so close it with CloseReader when done.
func CreateTopic(conn *kafka.Conn, target string) *kafka.Reader {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{Host + ":" + Port},
		Topic:     target,
		Partition: Partition,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Start consuming from a fixed offset.
	r.SetOffset(42)
	return r
}

// CloseReader closes the Kafka reader.
func CloseReader(r *kafka.Reader) {
	if err := r.Close(); err != nil {
		logrus.Error(err)
	}
}
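
// A hypothetical pairing of the two helpers above (the topic name is a
// placeholder):
//
//	r := broker.CreateTopic(conn, "domain")
//	defer broker.CloseReader(r)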

// SendEventToKafka sends a node to the broker.
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	packaged, err := json.Marshal(node)
	if err != nil {
		logrus.Error(err)
		return
	}
	if _, err := conn.WriteMessages(kafka.Message{Value: packaged, Topic: node.Type}); err != nil {
		logrus.Error(err)
	}
}
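
// A hypothetical send, mirroring the commented-out helper at the bottom of
// this file (the models.BuildNode arguments are placeholders):
//
//	node := models.BuildNode("misp", "domain", "example.org")
//	broker.SendEventToKafka(conn, *node)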

// ReadEventFromKafka reads the events sent to Kafka and acts upon them.
func ReadEventFromKafka() {
	if !viper.GetBool("kafka.activated") {
		return
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{Host + ":" + Port},
		Topic:     Topic,
		Partition: Partition,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Start consuming from a fixed offset.
	r.SetOffset(42)
	defer r.Close()
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			logrus.Error("error reading message:", err)
			continue
		}
		// Only query balboa when it is activated in the configuration.
		var c *balboa.Client
		if viper.GetBool("balboa.activated") {
			c, err = balboa.GetClient()
			if err != nil {
				logrus.Warn("cannot get balboa client:", err)
				continue
			}
		}
		var node models.Node
		if err := json.Unmarshal(m.Value, &node); err != nil {
			logrus.Error("error unmarshalling node:", err)
			continue
		}
		if viper.GetBool("balboa.activated") && len(node.ID) != 0 {
			// TODO: refactor this context
			ctx := context.Background()
			entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1))
			if err != nil {
				logrus.Error("error from balboa:", err)
				continue
			}
			if len(entries) != 0 {
				balboaNode := models.BuildBalboaNode(entries)
				models.SaveBalboaNode("bnodes.json", balboaNode)
				// edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
				// models.SaveEdge(edge)
			}
		}
	}
}
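
// ReadEventFromKafka blocks in an endless loop, so a hypothetical caller would
// typically run it in its own goroutine:
//
//	go broker.ReadEventFromKafka()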

// Helpers
//
// func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) {
// 	for _, value := range values {
// 		domainNode := models.BuildNode(source, datatype, value)
// 		models.SaveNode("nodes.json", domainNode)
// 		if domainNode.Type == "domain" || domainNode.Type == "hostname" {
// 			broker.SendEventToKafka(brokerConn, *domainNode)
// 		}
// 		edge := models.BuildEdge(source, originNodeID, domainNode.ID)
// 		models.SaveEdge(edge)
// 	}
// }