Working Pastebin connector running in parallel with Certstream

Run the Certstream and Pastebin connectors in parallel goroutines,
using a wait group so that both can exit properly.
This commit is contained in:
Christopher Talib 2020-01-30 17:43:10 +01:00
parent b9a2e73e33
commit 0cd0faa6fa
5 changed files with 136 additions and 34 deletions

92
main.go
View file

@ -2,6 +2,9 @@ package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/CaliDog/certstream-go"
"github.com/sirupsen/logrus"
@ -17,37 +20,80 @@ func main() {
if err != nil {
panic(err)
}
// stop channel
stopChan := make(chan os.Signal)
var wg sync.WaitGroup
wg.Add(2)
go broker.ReadEventFromKafka()
// certstream
for {
select {
case jq := <-stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
go func() {
for {
select {
case jq := <-stream:
if data, err := models.ExtractCertFromStream(jq); err == nil {
rawNode := models.WrapCertStreamData(*data)
models.SaveRaw("raw_certstream.json", rawNode)
rawNode := models.WrapCertStreamData(*data)
models.SaveRaw("raw_certstream.json", rawNode)
fingerprintNode := models.BuildNode("certstream", "fingerprint", data.Data.LeafCert.Fingerprint)
models.SaveNode("nodes.json", fingerprintNode)
models.BuildEdge("certstream", rawNode.ID, fingerprintNode.ID)
fmt.Println(fingerprintNode)
allDomains := data.Data.LeafCert.AllDomains
var edge *models.Edge
for _, domain := range allDomains {
domainNode := models.BuildNode("certstream", "domain", domain)
models.SaveNode("nodes.json", domainNode)
edge = models.BuildEdge("certstream", fingerprintNode.ID, domainNode.ID)
fmt.Println(edge)
models.SaveEdge(edge)
fingerprintNode := models.BuildNode("certstream", "fingerprint", data.Data.LeafCert.Fingerprint)
models.SaveNode("nodes.json", fingerprintNode)
models.BuildEdge("certstream", rawNode.ID, fingerprintNode.ID)
fmt.Println(fingerprintNode)
allDomains := data.Data.LeafCert.AllDomains
var edge *models.Edge
for _, domain := range allDomains {
domainNode := models.BuildNode("certstream", "domain", domain)
models.SaveNode("nodes.json", domainNode)
edge = models.BuildEdge("certstream", fingerprintNode.ID, domainNode.ID)
fmt.Println(edge)
models.SaveEdge(edge)
}
broker.SendEventToKafka(Conn, *fingerprintNode)
}
broker.SendEventToKafka(Conn, *fingerprintNode)
}
case err := <-errStream:
logrus.Error(err)
case err := <-errStream:
logrus.Error(err)
case <-stopChan:
wg.Done()
return
}
}
}
}()
// pastebin
go func() {
for {
select {
default:
fmt.Println("Querying pastebin...")
pastes, err := models.QueryPastes()
if err != nil {
logrus.Panic(err)
}
for _, p := range pastes {
fmt.Println("fetching...", p.FullURL)
paste, err := models.FetchPaste(p)
if err != nil {
logrus.Error("cannot fetch paste", err)
}
fp := models.FullPaste{
Meta: p,
Full: paste,
}
res := models.BuildPasteNode(&fp)
models.SavePaste("paste_formatted.json", res)
time.Sleep(1 * time.Second)
}
time.Sleep(3 * time.Second)
case <-stopChan:
wg.Done()
return
}
}
}()
wg.Wait()
}

View file

@ -7,6 +7,16 @@ import (
"github.com/sirupsen/logrus"
)
// CertStreamWrapper wraps a CertStreamStruct together with an identifier, a
// type label and creation/modification timestamps, and defines the JSON
// layout of that envelope.
type CertStreamWrapper struct {
// ID identifies the wrapped record; serialized as "id".
ID string `json:"id"`
// Type is a label for the kind of record; serialized as "type".
Type string `json:"type"`
// Data is the wrapped CertStreamStruct payload; serialized as "data".
Data CertStreamStruct `json:"data"`
// Created is the creation timestamp, kept as a string.
// NOTE(review): the format is not visible here, presumably RFC 3339 — confirm.
Created string `json:"created"`
// Modified is the last-modification timestamp, kept as a string.
Modified string `json:"modified"`
}
// LeafCertExtensions extends the LeafCert object.
type LeafCertExtensions struct {
KeyUsage string `json:"keyUsage"`

View file

@ -48,6 +48,35 @@ func BuildEdge(source string, nodeOneUUID string, nodeTwoUUID string) *Edge {
}
}
// SavePaste appends data to the JSON array of pastes stored in filename.
//
// The whole file is read, decoded into a slice, extended with data and written
// back with mode 0644. The function has no error return, so failures are
// reported through logrus. Unlike a blind log-and-continue, it never writes
// when doing so would destroy existing content:
//
//   - a nil data pointer is rejected instead of panicking;
//   - an empty (or unreadable) file is treated as an empty list, without
//     logging a spurious JSON decoding error;
//   - a non-empty file that does not decode as a paste array is left untouched;
//   - a marshalling failure aborts the save instead of truncating the file.
func SavePaste(filename string, data *PasteWrapper) {
	if data == nil {
		logrus.Error("cannot save nil paste")
		return
	}

	// NOTE(review): utils.FileExists is defined elsewhere; it presumably makes
	// sure the file is present. Its error is logged and the save is still
	// attempted, as before, because WriteFile below can create the file.
	if err := utils.FileExists(filename); err != nil {
		logrus.Error(err)
	}

	existing, err := ioutil.ReadFile(filename)
	if err != nil {
		// Keep going with no previous content so a missing file still gets
		// created by the write below.
		logrus.Error(err)
	}

	var pastes []PasteWrapper
	if len(existing) > 0 {
		if err := json.Unmarshal(existing, &pastes); err != nil {
			// The file holds something that is not a paste array; overwriting
			// it would silently discard that content.
			logrus.Error(err)
			return
		}
	}

	pastes = append(pastes, *data)

	encoded, err := json.Marshal(pastes)
	if err != nil {
		// Writing a nil buffer would truncate the file.
		logrus.Error(err)
		return
	}

	if err := ioutil.WriteFile(filename, encoded, 0644); err != nil {
		logrus.Error(err)
	}
}
func SaveRaw(filename string, data *CertStreamWrapper) {
err := utils.FileExists(filename)
if err != nil {

View file

@ -6,10 +6,25 @@ import (
"io/ioutil"
"log"
"net/http"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// PasteWrapper is the JSON envelope stored for a paste: the paste itself plus
// an identifier, a type label and creation/modification timestamps.
type PasteWrapper struct {
	// ID identifies the record; BuildPasteNode sets it to "pastebin--"
	// followed by a UUID.
	ID string `json:"id"`
	// Type is the kind of record; BuildPasteNode sets it to "pastebin".
	Type string `json:"type"`
	// Data is the wrapped paste (metadata and full text).
	Data FullPaste `json:"data"`
	// Created is the creation time as an RFC 3339 string. It is serialized
	// as "created" (previously misspelled "create") so that it matches
	// CertStreamWrapper and the sibling "modified" key.
	Created string `json:"created"`
	// Modified is the last-modification time as an RFC 3339 string.
	Modified string `json:"modified"`
}
// FullPaste pairs a paste's metadata with its full text content.
type FullPaste struct {
// Meta is the descriptive metadata of the paste; serialized as "meta".
Meta PasteMeta `json:"meta"`
// Full is the paste body as a string (main fills it with the result of
// FetchPaste); serialized as "full".
Full string `json:"full"`
}
// PasteMeta is a set of descriptive information on a paste.
type PasteMeta struct {
ScrapeURL string `json:"scrape_url"`
@ -43,7 +58,7 @@ type PasteFull struct {
// QueryPastes returns metadata for the last 100 public pastes.
func QueryPastes() ([]PasteMeta, error) {
server := "pastebin.com"
server := "scrape.pastebin.com"
req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/api_scraping.php?limit=100", server), nil)
if err != nil {
@ -100,3 +115,15 @@ func FetchPaste(paste PasteMeta) (string, error) {
return string(body), nil
}
// BuildPasteNode wraps data in a new PasteWrapper. The wrapper gets a fresh
// identifier of the form "pastebin--<uuid>", the type "pastebin", a copy of
// *data, and Created/Modified both set to the current time in RFC 3339 form.
func BuildPasteNode(data *FullPaste) *PasteWrapper {
	// A single timestamp is shared so a new node has Created == Modified.
	stamp := time.Now().Format(time.RFC3339)

	node := new(PasteWrapper)
	node.ID = "pastebin--" + uuid.New().String()
	node.Type = "pastebin"
	node.Data = *data
	node.Created = stamp
	node.Modified = stamp
	return node
}

View file

@ -19,13 +19,3 @@ type Edge struct {
Timestamp string `json:"timestamp"`
Source string `json:"source"`
}
// CertStreamWrapper wraps a CertStreamStruct together with an identifier, a
// type label and creation/modification timestamps, and defines the JSON
// layout of that envelope.
type CertStreamWrapper struct {
// ID identifies the wrapped record; serialized as "id".
ID string `json:"id"`
// Type is a label for the kind of record; serialized as "type".
Type string `json:"type"`
// Data is the wrapped CertStreamStruct payload; serialized as "data".
Data CertStreamStruct `json:"data"`
// Created is the creation timestamp, kept as a string.
// NOTE(review): the format is not visible here, presumably RFC 3339 — confirm.
Created string `json:"created"`
// Modified is the last-modification timestamp, kept as a string.
Modified string `json:"modified"`
}