84e4937f85
This new work implements the server and the loader as two separate binaries, allowing the code to keep running while the IOC list is being updated. It also updates the documentation to reflect the new changes.
146 lines
3.5 KiB
Go
146 lines
3.5 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"gitlab.dcso.lolcat/LABS/styx/balboa"
|
|
"gitlab.dcso.lolcat/LABS/styx/models"
|
|
)
|
|
|
|
// Kafka connection settings for this package. Each value is filled in from
// the matching "kafka.*" configuration key by this package's init function.
var (
	// Topic is the value of "kafka.topic".
	Topic string
	// Partition is the value of "kafka.partition".
	Partition int
	// Protocol is the value of "kafka.protocol"; it is handed to the Kafka
	// dialer as the network name.
	Protocol string
	// Host is the value of "kafka.host".
	Host string
	// Port is the value of "kafka.port"; it is kept as a string because it
	// is only ever concatenated with Host.
	Port string
)
|
|
|
|
func init() {
|
|
viper.SetConfigName("config")
|
|
viper.SetConfigType("yaml")
|
|
viper.AddConfigPath(".")
|
|
|
|
err := viper.ReadInConfig()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
Topic = viper.GetString("kafka.topic")
|
|
Partition = viper.GetInt("kafka.partition")
|
|
Protocol = viper.GetString("kafka.protocol")
|
|
Host = viper.GetString("kafka.host")
|
|
Port = viper.GetString("kafka.port")
|
|
}
|
|
|
|
// SetUpKafkaConnecter builds the connection to Kafka with a timeout.
|
|
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
|
conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// CreateTopic creates a topic in the Kafka Broker based on a target. Don't
|
|
// forget to close them as they are channels.
|
|
func CreateTopic(conn *kafka.Conn, target string) *kafka.Reader {
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{Host + ":" + Port},
|
|
Topic: target,
|
|
Partition: Partition,
|
|
MinBytes: 10e3,
|
|
MaxBytes: 10e6,
|
|
})
|
|
r.SetOffset(42)
|
|
|
|
return r
|
|
}
|
|
|
|
// CloseReader closes the Kafka reader.
|
|
func CloseReader(r kafka.Reader) {
|
|
r.Close()
|
|
}
|
|
|
|
// SendEventToKafka sends a node to the broker.
|
|
func SendEventToKafka(conn *kafka.Conn, node models.Node) {
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
packaged, _ := json.Marshal(node)
|
|
_, err := conn.WriteMessages(kafka.Message{Value: packaged, Topic: node.Type})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
|
|
}
|
|
|
|
// ReadEventFromKafka read the event sent to Kafka and acts upon it.
|
|
func ReadEventFromKafka() {
|
|
if !viper.GetBool("kafka.activated") {
|
|
return
|
|
}
|
|
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{Host + ":" + Port},
|
|
Topic: Topic,
|
|
Partition: Partition,
|
|
MinBytes: 10e3,
|
|
MaxBytes: 10e6,
|
|
})
|
|
r.SetOffset(42)
|
|
defer r.Close()
|
|
|
|
for {
|
|
m, err := r.ReadMessage(context.Background())
|
|
if err != nil {
|
|
logrus.Error("error reading message:", err)
|
|
}
|
|
|
|
var c *balboa.Client
|
|
if viper.GetBool("balboa.activated") {
|
|
c, err = balboa.GetClient()
|
|
if err != nil {
|
|
logrus.Warn("cannot get balboa client:", err)
|
|
}
|
|
}
|
|
|
|
var node models.Node
|
|
json.Unmarshal(m.Value, &node)
|
|
if viper.GetBool("balboa.activated") {
|
|
if len(node.ID) != 0 {
|
|
// TODO: refactor this context
|
|
ctx := context.Background()
|
|
entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1))
|
|
if err != nil {
|
|
logrus.Error("error from balboa", err)
|
|
}
|
|
if len(entries) != 0 {
|
|
balboaNode := models.BuildBalboaNode(entries)
|
|
models.SaveBalboaNode("bnodes.json", balboaNode)
|
|
// edge := models.BuildEdge("balboa", node.ID, balboaNode.ID)
|
|
// models.SaveEdge(edge)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Helpers
|
|
// func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) {
|
|
// for _, value := range values {
|
|
// domainNode := models.BuildNode(source, datatype, value)
|
|
// models.SaveNode("nodes.json", domainNode)
|
|
// if domainNode.Type == "domain" || domainNode.Type == "hostname" {
|
|
// broker.SendEventToKafka(brokerConn, *domainNode)
|
|
// }
|
|
// edge := models.BuildEdge(source, originNodeID, domainNode.ID)
|
|
// models.SaveEdge(edge)
|
|
// }
|
|
// }
|