Read/write from Kafka, simpler structure with plain data and not pointers

This commit is contained in:
Christopher Talib 2020-01-22 15:01:07 +01:00
parent 48cc976595
commit d33b293e7c
5 changed files with 84 additions and 76 deletions

View file

@ -2,10 +2,12 @@ package broker
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"gitlab.dcso.lolcat/LABS/styx/models"
)
@ -42,7 +44,8 @@ func ProduceEvent(conn *kafka.Conn, message string) error {
func SendEventToKafka(conn *kafka.Conn, node models.Node) error {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err := conn.WriteMessages(kafka.Message{Value: []byte(fmt.Sprintf("%v", node))})
packaged, _ := json.Marshal(node)
_, err := conn.WriteMessages(kafka.Message{Value: packaged})
if err != nil {
panic(err)
}
@ -64,10 +67,13 @@ func ReadEventFromKafka() {
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
panic(err)
logrus.Error("error reading message:", err)
}
fmt.Printf("message at offset %d:%s = %s\n", m.Offset, string(m.Key), string(m.Value))
var node models.Node
json.Unmarshal(m.Value, &node)
fmt.Println(node.Data.Data.LeafCert.AllDomains)
}
}

View file

@ -12,18 +12,18 @@ import (
)
const (
nodesFilename = "nodes.json"
edgesFilename = "edges.json"
NodesFilename = "nodes.json"
EdgesFilename = "edges.json"
)
func main() {
// checking that data files exist
err := fileExists(nodesFilename)
err := fileExists(NodesFilename)
if err != nil {
logrus.Error(err)
}
err = fileExists(edgesFilename)
err = fileExists(EdgesFilename)
if err != nil {
logrus.Error(err)
}
@ -43,7 +43,6 @@ func main() {
if err != nil {
panic(err)
}
fmt.Println(node)
err = broker.SendEventToKafka(Conn, *node)
if err != nil {

View file

@ -1,6 +1,8 @@
package models
import (
"time"
"github.com/google/uuid"
)
@ -69,9 +71,11 @@ import (
// BuildNode builds a node to send to MQ instance.
func BuildNode(flag string, data CertStreamStruct) (*Node, error) {
return &Node{
ID: flag + "--" + uuid.New().String(),
Type: flag,
Data: data,
ID: flag + "--" + uuid.New().String(),
Type: flag,
Data: data,
Created: time.Now(),
Modified: time.Now(),
}, nil
}

View file

@ -10,9 +10,11 @@ import (
// Styx terminology
// (https://docs.google.com/document/d/1dIrh1Lp3KAjEMm8o2VzAmuV0Peu-jt9aAh1IHrjAroM/pub#h.xzbicbtscatx)
type Node struct {
ID string `json:"id"`
Type string `json:"type"`
Data CertStreamStruct `json:"data"`
ID string `json:"id"`
Type string `json:"type"`
Data CertStreamStruct `json:"data"`
Created time.Time `json:"created"`
Modified time.Time `json:"modified"`
}
// Edge defines a relation between two nodes.
@ -44,14 +46,14 @@ type LeafCertSubject struct {
}
type LeafCertStruct struct {
Subject *LeafCertSubject `json:"subject"`
Extensions *LeafCertExtensions `json:"extensions"`
NotBefore string `json:"not_before"`
NotAfter string `json:"not_after"`
SerialNumber string `json:"serial_number"`
Fingerprint string `json:"fingerprint"`
AsDer string `json:"as_der"`
AllDomains []string `json:"all_domains"`
Subject LeafCertSubject `json:"subject"`
Extensions LeafCertExtensions `json:"extensions"`
NotBefore int `json:"not_before"`
NotAfter int `json:"not_after"`
SerialNumber string `json:"serial_number"`
Fingerprint string `json:"fingerprint"`
AsDer string `json:"as_der"`
AllDomains []string `json:"all_domains"`
}
type Source struct {
@ -60,15 +62,15 @@ type Source struct {
}
type CertStreamData struct {
UpdateType string `json:"update_type"`
LeafCert *LeafCertStruct `json:"leaf_cert"`
Chain []*LeafCertStruct `json:"chain"`
CertIndex int `json:"cert_index"`
Seen int `json:"seen"`
Source *Source `json:"source"`
UpdateType string `json:"update_type"`
LeafCert LeafCertStruct `json:"leaf_cert"`
Chain []LeafCertStruct `json:"chain"`
CertIndex int `json:"cert_index"`
Seen int `json:"seen"`
Source Source `json:"source"`
}
type CertStreamStruct struct {
MessageType string `json:"message_data"`
Data *CertStreamData `json:"data"`
MessageType string `json:"message_data"`
Data CertStreamData `json:"data"`
}

View file

@ -35,22 +35,22 @@ func ExtractCertFromStream(input jsonq.JsonQuery) (*models.CertStreamStruct, err
certIndex, err := input.Int("data", "cert_index")
seen, err := input.Int("data", "seen")
chain, err := input.ArrayOfObjects("data", "chain")
chainSlice := []*models.LeafCertStruct{}
chainSlice := []models.LeafCertStruct{}
for i := 0; i < len(chain); i++ {
c, err := extractLeafCertChainStruct(input, strconv.Itoa(i))
if err != nil {
panic(err)
logrus.Error("error extractLeafCertChainStruct: ", err)
}
chainSlice = append(chainSlice, &c)
chainSlice = append(chainSlice, c)
}
csd := models.CertStreamData{
UpdateType: updateType,
LeafCert: &leafCertStruct,
LeafCert: leafCertStruct,
Chain: chainSlice,
CertIndex: certIndex,
Seen: seen,
Source: &source,
Source: source,
}
// CertStreamStruct
@ -61,7 +61,7 @@ func ExtractCertFromStream(input jsonq.JsonQuery) (*models.CertStreamStruct, err
res := models.CertStreamStruct{
MessageType: messageType,
Data: &csd,
Data: csd,
}
return &res, nil
@ -70,16 +70,13 @@ func ExtractCertFromStream(input jsonq.JsonQuery) (*models.CertStreamStruct, err
func extractLeafCertChainStruct(input jsonq.JsonQuery, index string) (models.LeafCertStruct, error) {
// LeafCertStruct > Subject
aggregated, err := input.String("data", "chain", index, "subject", "aggregated")
if err != nil {
panic(err)
}
c, err := input.String("data", "chain", index, "subject", "C")
st, err := input.String("data", "chain", index, "subject", "ST")
l, err := input.String("data", "chain", index, "subject", "L")
o, err := input.String("data", "chain", index, "subject", "O")
ou, err := input.String("data", "chain", index, "subject", "OU")
cn, err := input.String("data", "chain", index, "subject", "CN")
aggregated, _ := input.String("data", "chain", index, "subject", "aggregated")
c, _ := input.String("data", "chain", index, "subject", "C")
st, _ := input.String("data", "chain", index, "subject", "ST")
l, _ := input.String("data", "chain", index, "subject", "L")
o, _ := input.String("data", "chain", index, "subject", "O")
ou, _ := input.String("data", "chain", index, "subject", "OU")
cn, _ := input.String("data", "chain", index, "subject", "CN")
subject := models.LeafCertSubject{
Aggregated: aggregated,
@ -92,13 +89,13 @@ func extractLeafCertChainStruct(input jsonq.JsonQuery, index string) (models.Lea
}
// LeafCertStruct > Extensions
keyUsage, err := input.String("data", "chain", index, "extensions", "keyUsage")
extendedKeyUsage, err := input.String("data", "chain", index, "extensions", "extendedKeyUsage")
basicConstrains, err := input.String("data", "chain", index, "extensions", "basicConstrains")
subjectKeyIdentifier, err := input.String("data", "chain", index, "extensions", "subjectKeyIdentifier")
authorityInfoAccess, err := input.String("data", "chain", index, "extensions", "authorityInfoAccess")
subjectAltName, err := input.String("data", "chain", index, "extensions", "subjectAltName")
certificatePolicies, err := input.String("data", "chain", index, "extensions", "certificatePolicies")
keyUsage, _ := input.String("data", "chain", index, "extensions", "keyUsage")
extendedKeyUsage, _ := input.String("data", "chain", index, "extensions", "extendedKeyUsage")
basicConstrains, _ := input.String("data", "chain", index, "extensions", "basicConstrains")
subjectKeyIdentifier, _ := input.String("data", "chain", index, "extensions", "subjectKeyIdentifier")
authorityInfoAccess, _ := input.String("data", "chain", index, "extensions", "authorityInfoAccess")
subjectAltName, _ := input.String("data", "chain", index, "extensions", "subjectAltName")
certificatePolicies, _ := input.String("data", "chain", index, "extensions", "certificatePolicies")
extensions := models.LeafCertExtensions{
KeyUsage: keyUsage,
@ -110,16 +107,16 @@ func extractLeafCertChainStruct(input jsonq.JsonQuery, index string) (models.Lea
CertificatePolicies: certificatePolicies,
}
notBefore, err := input.String("data", "chain", "not_before")
notAfter, err := input.String("data", "chain", "not_after")
serialNumber, err := input.String("data", "chain", "serialNumber")
fingerprint, err := input.String("data", "chain", "fingerprint")
asDer, err := input.String("data", "chain", "as_der")
allDomains, err := input.ArrayOfStrings("data", "chain", "all_domains")
notBefore, _ := input.Int("data", "chain", "not_before")
notAfter, _ := input.Int("data", "chain", "not_after")
serialNumber, _ := input.String("data", "chain", "serialNumber")
fingerprint, _ := input.String("data", "chain", "fingerprint")
asDer, _ := input.String("data", "chain", "as_der")
allDomains, _ := input.ArrayOfStrings("data", "chain", "all_domains")
return models.LeafCertStruct{
Subject: &subject,
Extensions: &extensions,
Subject: subject,
Extensions: extensions,
NotBefore: notBefore,
NotAfter: notAfter,
SerialNumber: serialNumber,
@ -153,13 +150,13 @@ func extractLeafCertStruct(input jsonq.JsonQuery) (models.LeafCertStruct, error)
}
// LeafCertStruct > Extensions
keyUsage, err := input.String("data", "leaf_cert", "extensions", "keyUsage")
extendedKeyUsage, err := input.String("data", "leaf_cert", "extensions", "extendedKeyUsage")
basicConstrains, err := input.String("data", "leaf_cert", "extensions", "basicConstrains")
subjectKeyIdentifier, err := input.String("data", "leaf_cert", "extensions", "subjectKeyIdentifier")
authorityInfoAccess, err := input.String("data", "leaf_cert", "extensions", "authorityInfoAccess")
subjectAltName, err := input.String("data", "leaf_cert", "extensions", "subjectAltName")
certificatePolicies, err := input.String("data", "leaf_cert", "extensions", "certificatePolicies")
keyUsage, _ := input.String("data", "leaf_cert", "extensions", "keyUsage")
extendedKeyUsage, _ := input.String("data", "leaf_cert", "extensions", "extendedKeyUsage")
basicConstrains, _ := input.String("data", "leaf_cert", "extensions", "basicConstrains")
subjectKeyIdentifier, _ := input.String("data", "leaf_cert", "extensions", "subjectKeyIdentifier")
authorityInfoAccess, _ := input.String("data", "leaf_cert", "extensions", "authorityInfoAccess")
subjectAltName, _ := input.String("data", "leaf_cert", "extensions", "subjectAltName")
certificatePolicies, _ := input.String("data", "leaf_cert", "extensions", "certificatePolicies")
extensions := models.LeafCertExtensions{
KeyUsage: keyUsage,
@ -171,16 +168,16 @@ func extractLeafCertStruct(input jsonq.JsonQuery) (models.LeafCertStruct, error)
CertificatePolicies: certificatePolicies,
}
notBefore, err := input.String("data", "leaf_cert", "not_before")
notAfter, err := input.String("data", "leaf_cert", "not_after")
serialNumber, err := input.String("data", "leaf_cert", "serialNumber")
fingerprint, err := input.String("data", "leaf_cert", "fingerprint")
asDer, err := input.String("data", "leaf_cert", "as_der")
allDomains, err := input.ArrayOfStrings("data", "leaf_cert", "all_domains")
notBefore, _ := input.Int("data", "leaf_cert", "not_before")
notAfter, _ := input.Int("data", "leaf_cert", "not_after")
serialNumber, _ := input.String("data", "leaf_cert", "serialNumber")
fingerprint, _ := input.String("data", "leaf_cert", "fingerprint")
asDer, _ := input.String("data", "leaf_cert", "as_der")
allDomains, _ := input.ArrayOfStrings("data", "leaf_cert", "all_domains")
return models.LeafCertStruct{
Subject: &subject,
Extensions: &extensions,
Subject: subject,
Extensions: extensions,
NotBefore: notBefore,
NotAfter: notAfter,
SerialNumber: serialNumber,