From fe01a9240f18400c2ff2d26cd154874d8d04f949 Mon Sep 17 00:00:00 2001 From: Christopher Talib Date: Wed, 10 Jun 2020 10:48:47 +0200 Subject: [PATCH] Parsing shodan, not droppping DB * Adding Certstream and Shodan matchers * Insert or skip for new matchers (working without having to drop the DB and not more duplicate matchers) * Closing files after using them * Adding Match model to schema and Node (for unmarshalling purposes) --- DEMO.md | 2 +- filters/main.go | 3 + graph/main.go | 16 +++-- matcher/main.go | 166 ++++++++++++++++++++++++++++++++++++++-------- models/main.go | 1 + plugins/shodan.go | 4 +- 6 files changed, 156 insertions(+), 36 deletions(-) diff --git a/DEMO.md b/DEMO.md index edbfc28..853f347 100644 --- a/DEMO.md +++ b/DEMO.md @@ -17,7 +17,7 @@ ## Notes -* Currently only runs matchers on Pastebin data +* There is TOO MUCH junk data * Upsert is not optimal * What do we do with the data so it can be exploitable by analysts * Sould we store matched data in an SQL-like db? diff --git a/filters/main.go b/filters/main.go index a2495d2..176114f 100644 --- a/filters/main.go +++ b/filters/main.go @@ -35,6 +35,8 @@ func RunIPFilters(inputIP string) bool { if err != nil { logrus.Warn("filters#OpenFile#", err) } + defer f.Close() + scanner := bufio.NewScanner(f) for scanner.Scan() { _, ipNet, err := net.ParseCIDR(scanner.Text()) @@ -69,6 +71,7 @@ func RunDomainFilters(domain string) bool { if err != nil { logrus.Warn("filters#OpenFile#", err) } + defer f.Close() scanner := bufio.NewScanner(f) for scanner.Scan() { diff --git a/graph/main.go b/graph/main.go index 48975ed..afcbe3f 100644 --- a/graph/main.go +++ b/graph/main.go @@ -25,14 +25,14 @@ func ConnectToDgraph() (*dgo.Dgraph, error) { } func setupDgraphSchema(c *dgo.Dgraph) error { - err := c.Alter(context.Background(), &api.Operation{ - DropAll: true, - }) - if err != nil { - return err - } + // err := c.Alter(context.Background(), &api.Operation{ + // DropAll: true, + // }) + // if err != nil { + // return err + // } - err = c.Alter(context.Background(), &api.Operation{ + err := c.Alter(context.Background(), &api.Operation{ Schema: ` id: string @index(exact, term) . type: string @index(term) . @@ -47,6 +47,7 @@ modified: string . certNode: uid . shodanNode: uid . pasteNode: uid . +match: uid . type Node { id: string @@ -57,6 +58,7 @@ modified: string certNode: CertNode shodanNode: ShodanNode pasteNode: PasteNode +match: Match } nodes: [uid] . diff --git a/matcher/main.go b/matcher/main.go index 2d66646..ef2cebc 100644 --- a/matcher/main.go +++ b/matcher/main.go @@ -90,34 +90,54 @@ func loadTargets(graphClient *dgo.Dgraph) ([]string, error) { Type: "matcher", } ctx := context.Background() - mu := &api.Mutation{ - CommitNow: true, - } - - pb, err := json.Marshal(matcher) - if err != nil { - logrus.Error(err) - return nil, err - } - - mu.SetJson = pb + query := `query eq($a: string){ +Node(func: eq(target, $a)){ +uid +} + }` txn := graphClient.NewTxn() - defer txn.Discard(ctx) - - _, err = txn.Mutate(ctx, mu) + ret, err := txn.QueryWithVars(ctx, query, map[string]string{"$a": scanner.Text()}) if err != nil { + logrus.Warn(err) + } + + n := Result{} + json.Unmarshal([]byte(ret.Json), &n) + + // Check if the target already exists, if so, skipping not inserting + // the data + if len(n.Result) == 0 { + logrus.Info("new matcher, charging...") + mu := &api.Mutation{ + CommitNow: true, + } + + pb, err := json.Marshal(matcher) + if err != nil { + logrus.Error(err) + return nil, err + } + + mu.SetJson = pb + + txn = graphClient.NewTxn() + defer txn.Discard(ctx) + + _, err = txn.Mutate(ctx, mu) + if err != nil { + logrus.Error(err) + return nil, err + } + + } + + res = append(res, scanner.Text()) + if err := scanner.Err(); err != nil { logrus.Error(err) return nil, err } - res = append(res, scanner.Text()) - } - if err := scanner.Err(); err != nil { - logrus.Error(err) - return nil, err - } - } return res, nil @@ -125,7 +145,7 @@ func loadTargets(graphClient *dgo.Dgraph) ([]string, error) { // Run runs the routine trying to find matches in the ingested Pastebin data. func (m *Matcher) Run(wg *sync.WaitGroup, graphClient *dgo.Dgraph) { - // logrus.Info("loading matcher targets, this might take some time...") + logrus.Info("loading matcher targets, this might take some time...") targets, err := loadTargets(graphClient) if err != nil { logrus.Error(err) @@ -135,9 +155,11 @@ func (m *Matcher) Run(wg *sync.WaitGroup, graphClient *dgo.Dgraph) { if !m.Running { m.StoppedChan = make(chan bool) wg.Add(1) + logrus.Info("Running matchers") for _, target := range targets { - go runPasteMatcher(target, graphClient) + // go runPasteMatcher(target, graphClient) go runCertstreamMatcher(target, graphClient) + go runShodanMatcher(target, graphClient) } // TODO: probably not the best design here @@ -147,7 +169,6 @@ func (m *Matcher) Run(wg *sync.WaitGroup, graphClient *dgo.Dgraph) { } func runPasteMatcher(target string, graphClient *dgo.Dgraph) { - logrus.Info("Running matcher for ", target) for { q := `query allofterms($a: string) { Node(func: allofterms(full, $a)) { @@ -198,7 +219,7 @@ Node(func: allofterms(full, $a)) { query := fmt.Sprintf(`query { match as var(func: eq(target, "%s")) }`, target) - pb, err := json.Marshal(models.Match{UID: "uid(match)", ID: matcher.ID, Target: target, Nodes: matcher.Nodes, Type: "matcher"}) + pb, err := json.Marshal(models.Match{UID: "uid(match)", ID: matcher.ID, Target: target, Nodes: matcher.Nodes, Type: "matcher", Timestamp: rfc3339time}) if err != nil { logrus.Fatal(err) } @@ -226,7 +247,6 @@ Node(func: allofterms(full, $a)) { } func runCertstreamMatcher(target string, graphClient *dgo.Dgraph) { - logrus.Info("Running Certstream matcher") for { q := `query allofterms($a: string){ Node(func: allofterms(cn, $a)){ @@ -304,6 +324,100 @@ Node(func: allofterms(cn, $a)){ } } +func runShodanMatcher(target string, graphClient *dgo.Dgraph) { + for { + q := `query allofterms($a: string){ + Node(func: eq(hostnames, $a)){ + uid + type + hostnames + product + version + title + ip + os + organization + isp + cpe + asn + port + html + banner + link + transport + domains + timestamp + } +} +` + + ctx := context.Background() + txn := graphClient.NewTxn() + res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": target}) + if err != nil { + logrus.Warn(err) + } + + n := Result{} + json.Unmarshal([]byte(res.Json), &n) + uuid := uuid.New().String() + t := time.Now() + rfc3339time := t.Format(time.RFC3339) + matcher := models.Match{ + ID: uuid, + Timestamp: rfc3339time, + Target: target, + Nodes: []models.Node{}, + Type: "matcher", + } + if len(n.Result) != 0 { + time.Sleep(time.Duration(2) * time.Second) + logrus.Info("Found match for ", target) + // TODO: review time and id to be updated on new resulsts + + for _, res := range n.Result { + if len(matcher.Nodes) == 0 { + matcher.Nodes = append(matcher.Nodes, res) + continue + } + + for _, node := range matcher.Nodes { + if res.UID != node.UID { + matcher.Nodes = append(matcher.Nodes, res) + + } + } + } + + query := fmt.Sprintf(`query { match as var(func: eq(target, "%s")) }`, target) + + pb, err := json.Marshal(models.Match{UID: "uid(match)", ID: matcher.ID, Target: target, Nodes: matcher.Nodes, Type: "matcher", Timestamp: rfc3339time}) + if err != nil { + logrus.Fatal(err) + } + + mu := &api.Mutation{ + SetJson: pb, + } + + req := &api.Request{ + Query: query, + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + + txn := graphClient.NewTxn() + _, err = txn.Do(ctx, req) + if err != nil { + logrus.Warn(err) + } + // txn.Discard(ctx) + time.Sleep(time.Duration(2) * time.Second) + + } + } +} + // RunDomainMatch looks for a target within the identified IOCs in /matcher/data. // the function could be refactored with RunDomainFilters func RunDomainMatch(domain string) bool { diff --git a/models/main.go b/models/main.go index 316fbf9..49aabe1 100644 --- a/models/main.go +++ b/models/main.go @@ -35,6 +35,7 @@ type Node struct { CertNode CertNode `json:"certNode,omitempty"` ShodanNode ShodanNode `json:"shodanNode,omitempty"` PasteNode PasteNode `json:"pasteNode,omitempty"` + Match Match `json:"match,omitempty"` } // BuildNode builds a node to send to MQ instance. diff --git a/plugins/shodan.go b/plugins/shodan.go index dc27a5a..9decda4 100644 --- a/plugins/shodan.go +++ b/plugins/shodan.go @@ -66,7 +66,7 @@ func (s *ShodanPlugin) doRun(graphClient *dgo.Dgraph) { default: banner, ok := <-s.ShodanChan if !ok { - logrus.Error("channel is closed") + logrus.Error("channel error: ", ok) break } @@ -124,7 +124,7 @@ func (s *ShodanPlugin) doRun(graphClient *dgo.Dgraph) { _, err = graphClient.NewTxn().Mutate(ctx, mu) if err != nil { - logrus.Fatal(err) + logrus.Error(err) } } } else {