From e9c065bcc8e4bbe6a54bcef1b2ffaff88f527945 Mon Sep 17 00:00:00 2001 From: Christopher Talib Date: Thu, 16 Jan 2020 15:56:57 +0100 Subject: [PATCH] Setting up connector to Kafka and sending consummed and formatted Nodes to Kafka --- broker/main.go | 43 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 11 +++++++++++ go.sum | 41 +++++++++++++++++++++++++++++++++++++++++ main.go | 24 +++++++++++++++++------- models/main.go | 27 +++++++++++++++++++++++++++ 5 files changed, 139 insertions(+), 7 deletions(-) create mode 100644 broker/main.go diff --git a/broker/main.go b/broker/main.go new file mode 100644 index 0000000..dd905e5 --- /dev/null +++ b/broker/main.go @@ -0,0 +1,43 @@ +package broker + +import ( + "context" + "time" + + "github.com/segmentio/kafka-go" +) + +func SetUpKafkaConnecter() (*kafka.Conn, error) { + topic := "styx" + partition := 0 + conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) + if err != nil { + return nil, err + } + conn.SetDeadline(time.Now().Add(10 * time.Second)) + + return conn, nil +} + +func ProduceEvent(conn *kafka.Conn, message string) error { + // SetWriteDeadline sets the deadline for future Write calls and any + // currently-blocked Write call. Even if write times out, it may return n > 0, + // indicating that some of the data was successfully written. A zero value for t + // means Write will not time out. + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + + _, err := conn.WriteMessages( + kafka.Message{Value: []byte("one!")}, + kafka.Message{Value: []byte("two!")}, + kafka.Message{Value: []byte("three!")}, + ) + if err != nil { + return nil + } + + return nil +} + +func consumeEvent() { + +} diff --git a/go.mod b/go.mod index 7eb545e..2ece238 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,21 @@ go 1.12 require ( github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366 + github.com/DataDog/zstd v1.4.4 // indirect github.com/google/uuid v1.1.1 github.com/gorilla/websocket v1.4.1 // indirect github.com/jmoiron/jsonq v0.0.0-20150511023944-e874b168d07e github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 + github.com/pierrec/lz4 v2.4.0+incompatible // indirect github.com/pkg/errors v0.9.0 // indirect + github.com/segmentio/kafka-go v0.3.4 github.com/sirupsen/logrus v1.4.2 + golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d // indirect + golang.org/x/mod v0.2.0 // indirect + golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect + golang.org/x/sys v0.0.0-20200116001909-b77594299b42 // indirect + golang.org/x/text v0.3.2 // indirect + golang.org/x/tools v0.0.0-20200116062425-473961ec044c // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect ) diff --git a/go.sum b/go.sum index 598e00b..553453e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,13 @@ github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366 h1:qjPX+NGqyjCTkoQqEctkfhU4C/B4LhFG0ugVLhM7Maw= github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366/go.mod h1:JBo69gi8JyPpZoLZgmZeXiq4o7Ib2qf2RiIxiWC0oYQ= +github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.4.4 h1:+IawcoXhCBylN7ccwdwf8LOH2jKq7NavGpEPanrlTzE= +github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= @@ -12,14 +18,49 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGi github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.4.0+incompatible h1:06usnXXDNcPvCHDkmPpkidf4jTc52UKld7UPfqKatY4= +github.com/pierrec/lz4 v2.4.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.9.0 h1:J8lpUdobwIeCI7OiSxHqEwJUKvJwicL5+3v1oe2Yb4k= github.com/pkg/errors v0.9.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.3.4 h1:Mv9AcnCgU14/cU6Vd0wuRdG1FBO0HzXQLnjBduDLy70= +github.com/segmentio/kafka-go v0.3.4/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d h1:2+ZP7EfsZV7Vvmx3TIqSlSzATMkTAKqM14YGFPoSKjI= +golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200116062425-473961ec044c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index bf39213..56c4ba4 100644 --- a/main.go +++ b/main.go @@ -4,28 +4,38 @@ import ( "fmt" "github.com/CaliDog/certstream-go" - "github.com/op/go-logging" + "github.com/sirupsen/logrus" + "gitlab.dcso.lolcat/LABS/styx/broker" "gitlab.dcso.lolcat/LABS/styx/models" "gitlab.dcso.lolcat/LABS/styx/utils" ) -var ( - log = logging.MustGetLogger("") -) - func main() { // The false flag specifies that we want heartbeat messages. stream, errStream := certstream.CertStreamEventStream(false) fmt.Println("Starting to get data from CertStream...") + Conn, err := broker.SetUpKafkaConnecter() + if err != nil { + panic(err) + } for { select { case jq := <-stream: if data, err := utils.ExtractCertFromStream(jq); err == nil { - models.SaveData("certStream", *data) + node, err := models.BuildNode("certstream", *data) + if err != nil { + panic(err) + } + fmt.Println(node) + + err = models.SendEventToKafka(Conn, *node) + if err != nil { + panic(err) + } } case err := <-errStream: - log.Error(err) + logrus.Error(err) } } } diff --git a/models/main.go b/models/main.go index e2d777f..ba319d8 100644 --- a/models/main.go +++ b/models/main.go @@ -2,10 +2,13 @@ package models import ( "encoding/json" + "fmt" "io/ioutil" "os" + "time" "github.com/google/uuid" + "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" ) @@ -83,6 +86,30 @@ func SaveData(flag string, data CertStreamStruct) { // } } +func BuildNode(flag string, data CertStreamStruct) (*Node, error) { + err := fileExists(nodesFilename) + if err != nil { + return nil, err + } + + return &Node{ + ID: flag + "--" + uuid.New().String(), + Type: flag, + Data: data, + }, nil + +} + +func SendEventToKafka(conn *kafka.Conn, node Node) error { + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _, err := conn.WriteMessages(kafka.Message{Value: []byte(fmt.Sprintf("%v", node))}) + if err != nil { + panic(err) + } + + return nil +} + // Helpers func fileExists(filename string) error {