Setting up connector to Kafka and sending consummed and formatted Nodes to Kafka
This commit is contained in:
parent
e6cbfc6466
commit
e9c065bcc8
43
broker/main.go
Normal file
43
broker/main.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
func SetUpKafkaConnecter() (*kafka.Conn, error) {
|
||||
topic := "styx"
|
||||
partition := 0
|
||||
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.SetDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func ProduceEvent(conn *kafka.Conn, message string) error {
|
||||
// SetWriteDeadline sets the deadline for future Write calls and any
|
||||
// currently-blocked Write call. Even if write times out, it may return n > 0,
|
||||
// indicating that some of the data was successfully written. A zero value for t
|
||||
// means Write will not time out.
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
_, err := conn.WriteMessages(
|
||||
kafka.Message{Value: []byte("one!")},
|
||||
kafka.Message{Value: []byte("two!")},
|
||||
kafka.Message{Value: []byte("three!")},
|
||||
)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func consumeEvent() {
|
||||
|
||||
}
|
11
go.mod
11
go.mod
|
@ -4,10 +4,21 @@ go 1.12
|
|||
|
||||
require (
|
||||
github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366
|
||||
github.com/DataDog/zstd v1.4.4 // indirect
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/gorilla/websocket v1.4.1 // indirect
|
||||
github.com/jmoiron/jsonq v0.0.0-20150511023944-e874b168d07e
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/pierrec/lz4 v2.4.0+incompatible // indirect
|
||||
github.com/pkg/errors v0.9.0 // indirect
|
||||
github.com/segmentio/kafka-go v0.3.4
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d // indirect
|
||||
golang.org/x/mod v0.2.0 // indirect
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 // indirect
|
||||
golang.org/x/text v0.3.2 // indirect
|
||||
golang.org/x/tools v0.0.0-20200116062425-473961ec044c // indirect
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
|
||||
)
|
||||
|
|
41
go.sum
41
go.sum
|
@ -1,7 +1,13 @@
|
|||
github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366 h1:qjPX+NGqyjCTkoQqEctkfhU4C/B4LhFG0ugVLhM7Maw=
|
||||
github.com/CaliDog/certstream-go v0.0.0-20180219203951-6016c5462366/go.mod h1:JBo69gi8JyPpZoLZgmZeXiq4o7Ib2qf2RiIxiWC0oYQ=
|
||||
github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/DataDog/zstd v1.4.4 h1:+IawcoXhCBylN7ccwdwf8LOH2jKq7NavGpEPanrlTzE=
|
||||
github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
|
||||
|
@ -12,14 +18,49 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGi
|
|||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pierrec/lz4 v2.4.0+incompatible h1:06usnXXDNcPvCHDkmPpkidf4jTc52UKld7UPfqKatY4=
|
||||
github.com/pierrec/lz4 v2.4.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pkg/errors v0.9.0 h1:J8lpUdobwIeCI7OiSxHqEwJUKvJwicL5+3v1oe2Yb4k=
|
||||
github.com/pkg/errors v0.9.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/segmentio/kafka-go v0.3.4 h1:Mv9AcnCgU14/cU6Vd0wuRdG1FBO0HzXQLnjBduDLy70=
|
||||
github.com/segmentio/kafka-go v0.3.4/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
|
||||
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
||||
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
||||
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d h1:2+ZP7EfsZV7Vvmx3TIqSlSzATMkTAKqM14YGFPoSKjI=
|
||||
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200116062425-473961ec044c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
24
main.go
24
main.go
|
@ -4,28 +4,38 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/CaliDog/certstream-go"
|
||||
"github.com/op/go-logging"
|
||||
"github.com/sirupsen/logrus"
|
||||
"gitlab.dcso.lolcat/LABS/styx/broker"
|
||||
"gitlab.dcso.lolcat/LABS/styx/models"
|
||||
"gitlab.dcso.lolcat/LABS/styx/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
log = logging.MustGetLogger("")
|
||||
)
|
||||
|
||||
func main() {
|
||||
// The false flag specifies that we want heartbeat messages.
|
||||
stream, errStream := certstream.CertStreamEventStream(false)
|
||||
fmt.Println("Starting to get data from CertStream...")
|
||||
Conn, err := broker.SetUpKafkaConnecter()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case jq := <-stream:
|
||||
if data, err := utils.ExtractCertFromStream(jq); err == nil {
|
||||
models.SaveData("certStream", *data)
|
||||
node, err := models.BuildNode("certstream", *data)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(node)
|
||||
|
||||
err = models.SendEventToKafka(Conn, *node)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
case err := <-errStream:
|
||||
log.Error(err)
|
||||
logrus.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,10 +2,13 @@ package models
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/segmentio/kafka-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -83,6 +86,30 @@ func SaveData(flag string, data CertStreamStruct) {
|
|||
// }
|
||||
}
|
||||
|
||||
func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
|
||||
err := fileExists(nodesFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Node{
|
||||
ID: flag + "--" + uuid.New().String(),
|
||||
Type: flag,
|
||||
Data: data,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func SendEventToKafka(conn *kafka.Conn, node Node) error {
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
_, err := conn.WriteMessages(kafka.Message{Value: []byte(fmt.Sprintf("%v", node))})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helpers
|
||||
|
||||
func fileExists(filename string) error {
|
||||
|
|
Loading…
Reference in a new issue