package broker import ( "context" "encoding/json" "time" "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" "github.com/spf13/viper" "gitlab.dcso.lolcat/LABS/styx/balboa" "gitlab.dcso.lolcat/LABS/styx/models" ) var ( Topic string Partition int Protocol string Host string Port string ) func init() { viper.SetConfigName("config") viper.SetConfigType("yaml") viper.AddConfigPath(".") err := viper.ReadInConfig() if err != nil { panic(err) } Topic = viper.GetString("kafka.topic") Partition = viper.GetInt("kafka.partition") Protocol = viper.GetString("kafka.protocol") Host = viper.GetString("kafka.host") Port = viper.GetString("kafka.port") } // SetUpKafkaConnecter builds the connection to Kafka with a timeout. func SetUpKafkaConnecter() (*kafka.Conn, error) { conn, err := kafka.DialLeader(context.Background(), Protocol, Host+":"+Port, Topic, Partition) if err != nil { return nil, err } conn.SetDeadline(time.Now().Add(10 * time.Second)) return conn, nil } // CreateTopic creates a topic in the Kafka Broker based on a target. Don't // forget to close them as they are channels. func CreateTopic(conn *kafka.Conn, target string) *kafka.Reader { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{Host + ":" + Port}, Topic: target, Partition: Partition, MinBytes: 10e3, MaxBytes: 10e6, }) r.SetOffset(42) return r } // CloseReader closes the Kafka reader. func CloseReader(r kafka.Reader) { r.Close() } // SendEventToKafka sends a node to the broker. func SendEventToKafka(conn *kafka.Conn, node models.Node) { conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) packaged, _ := json.Marshal(node) _, err := conn.WriteMessages(kafka.Message{Value: packaged, Topic: node.Type}) if err != nil { logrus.Error(err) } } // ReadEventFromKafka read the event sent to Kafka and acts upon it. func ReadEventFromKafka() { if !viper.GetBool("kafka.activated") { return } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{Host + ":" + Port}, Topic: Topic, Partition: Partition, MinBytes: 10e3, MaxBytes: 10e6, }) r.SetOffset(42) defer r.Close() for { m, err := r.ReadMessage(context.Background()) if err != nil { logrus.Error("error reading message:", err) } var c *balboa.Client if viper.GetBool("balboa.activated") { c, err = balboa.GetClient() if err != nil { logrus.Warn("cannot get balboa client:", err) } } var node models.Node json.Unmarshal(m.Value, &node) if viper.GetBool("balboa.activated") { if len(node.ID) != 0 { // TODO: refactor this context ctx := context.Background() entries, err := c.GetAllEntries(ctx, node.NData, "", "", int32(1)) if err != nil { logrus.Error("error from balboa", err) } if len(entries) != 0 { balboaNode := models.BuildBalboaNode(entries) models.SaveBalboaNode("bnodes.json", balboaNode) // edge := models.BuildEdge("balboa", node.ID, balboaNode.ID) // models.SaveEdge(edge) } } } } } // Helpers // func SaveSingleValues(brokerConn *kafka.Conn, source string, datatype string, originNodeID string, values []string) { // for _, value := range values { // domainNode := models.BuildNode(source, datatype, value) // models.SaveNode("nodes.json", domainNode) // if domainNode.Type == "domain" || domainNode.Type == "hostname" { // broker.SendEventToKafka(brokerConn, *domainNode) // } // edge := models.BuildEdge(source, originNodeID, domainNode.ID) // models.SaveEdge(edge) // } // }