From fa28de820e3ee70a2a0b6aa9c9cfa358675b3874 Mon Sep 17 00:00:00 2001
From: Ethan Koenig <ethantkoenig@gmail.com>
Date: Sun, 24 Sep 2017 17:08:48 -0700
Subject: [PATCH] Make indexer code more reusable (#2590)

---
 models/issue_indexer.go    | 37 +++++++++++++++++-----------
 modules/indexer/indexer.go | 49 ++++++++++++++++++++++++++++++++++++++
 modules/indexer/issue.go   | 32 +++++++++----------------
 3 files changed, 83 insertions(+), 35 deletions(-)

diff --git a/models/issue_indexer.go b/models/issue_indexer.go
index 1e14268a0e..b58c9dc2d1 100644
--- a/models/issue_indexer.go
+++ b/models/issue_indexer.go
@@ -25,6 +25,7 @@ func InitIssueIndexer() {
 
 // populateIssueIndexer populate the issue indexer with issue data
 func populateIssueIndexer() error {
+	batch := indexer.IssueIndexerBatch()
 	for page := 1; ; page++ {
 		repos, _, err := Repositories(&SearchRepoOptions{
 			Page:     page,
@@ -34,7 +35,7 @@ func populateIssueIndexer() error {
 			return fmt.Errorf("Repositories: %v", err)
 		}
 		if len(repos) == 0 {
-			return nil
+			return batch.Flush()
 		}
 		for _, repo := range repos {
 			issues, err := Issues(&IssuesOptions{
@@ -42,29 +43,37 @@ func populateIssueIndexer() error {
 				IsClosed: util.OptionalBoolNone,
 				IsPull:   util.OptionalBoolNone,
 			})
-			updates := make([]indexer.IssueIndexerUpdate, len(issues))
-			for i, issue := range issues {
-				updates[i] = issue.update()
+			if err != nil {
+				return err
 			}
-			if err = indexer.BatchUpdateIssues(updates...); err != nil {
-				return fmt.Errorf("BatchUpdate: %v", err)
+			for _, issue := range issues {
+				if err := batch.Add(issue.update()); err != nil {
+					return err
+				}
 			}
 		}
 	}
 }
 
 func processIssueIndexerUpdateQueue() {
+	batch := indexer.IssueIndexerBatch()
 	for {
+		var issueID int64
 		select {
-		case issueID := <-issueIndexerUpdateQueue:
-			issue, err := GetIssueByID(issueID)
-			if err != nil {
-				log.Error(4, "issuesIndexer.Index: %v", err)
-				continue
-			}
-			if err = indexer.UpdateIssue(issue.update()); err != nil {
-				log.Error(4, "issuesIndexer.Index: %v", err)
+		case issueID = <-issueIndexerUpdateQueue:
+		default:
+			// flush whatever updates we currently have, since we
+			// might have to wait a while
+			if err := batch.Flush(); err != nil {
+				log.Error(4, "IssueIndexer: %v", err)
 			}
+			issueID = <-issueIndexerUpdateQueue
+		}
+		issue, err := GetIssueByID(issueID)
+		if err != nil {
+			log.Error(4, "GetIssueByID: %v", err)
+		} else if err = batch.Add(issue.update()); err != nil {
+			log.Error(4, "IssueIndexer: %v", err)
 		}
 	}
 }
diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go
index 5ee813412d..d5bdd51f9c 100644
--- a/modules/indexer/indexer.go
+++ b/modules/indexer/indexer.go
@@ -9,6 +9,8 @@ import (
 	"strconv"
 
 	"github.com/blevesearch/bleve"
+	"github.com/blevesearch/bleve/analysis/token/unicodenorm"
+	"github.com/blevesearch/bleve/mapping"
 	"github.com/blevesearch/bleve/search/query"
 )
 
@@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase
 	q.Analyzer = analyzer
 	return q
 }
+
+const unicodeNormalizeName = "unicodeNormalize"
+
+func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
+	return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
+		"type": unicodenorm.Name,
+		"form": unicodenorm.NFC,
+	})
+}
+
+// Update represents an update to an indexer
+type Update interface {
+	addToBatch(batch *bleve.Batch) error
+}
+
+const maxBatchSize = 16
+
+// Batch is a batch of indexer updates that automatically flushes once it
+// reaches a certain size.
+type Batch struct {
+	batch *bleve.Batch
+	index bleve.Index
+}
+
+// Add adds update to the batch, flushing the batch if it has become full.
+func (batch *Batch) Add(update Update) error {
+	if err := update.addToBatch(batch.batch); err != nil {
+		return err
+	}
+	return batch.flushIfFull()
+}
+
+func (batch *Batch) flushIfFull() error {
+	if batch.batch.Size() >= maxBatchSize {
+		return batch.Flush()
+	}
+	return nil
+}
+
+// Flush flushes the batch to the index immediately, regardless of its size.
+func (batch *Batch) Flush() error {
+	if err := batch.index.Batch(batch.batch); err != nil {
+		return err
+	}
+	batch.batch.Reset()
+	return nil
+}
diff --git a/modules/indexer/issue.go b/modules/indexer/issue.go
index 050a623ce2..62a18e2b3b 100644
--- a/modules/indexer/issue.go
+++ b/modules/indexer/issue.go
@@ -13,7 +13,6 @@ import (
 	"github.com/blevesearch/bleve"
 	"github.com/blevesearch/bleve/analysis/analyzer/custom"
 	"github.com/blevesearch/bleve/analysis/token/lowercase"
-	"github.com/blevesearch/bleve/analysis/token/unicodenorm"
 	"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
 	"github.com/blevesearch/bleve/index/upsidedown"
 )
@@ -35,6 +34,10 @@ type IssueIndexerUpdate struct {
 	Data    *IssueIndexerData
 }
 
+func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
+	return batch.Index(indexerID(update.IssueID), update.Data)
+}
+
 const issueIndexerAnalyzer = "issueIndexer"
 
 // InitIssueIndexer initialize issue indexer
@@ -74,17 +77,13 @@ func createIssueIndexer() error {
 	docMapping.AddFieldMappingsAt("Content", textFieldMapping)
 	docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
 
-	const unicodeNormNFC = "unicodeNormNFC"
-	if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{
-		"type": unicodenorm.Name,
-		"form": unicodenorm.NFC,
-	}); err != nil {
+	if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
 		return err
 	} else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
 		"type":          custom.Name,
 		"char_filters":  []string{},
 		"tokenizer":     unicode.Name,
-		"token_filters": []string{unicodeNormNFC, lowercase.Name},
+		"token_filters": []string{unicodeNormalizeName, lowercase.Name},
 	}); err != nil {
 		return err
 	}
@@ -97,21 +96,12 @@ func createIssueIndexer() error {
 	return err
 }
 
-// UpdateIssue update the issue indexer
-func UpdateIssue(update IssueIndexerUpdate) error {
-	return issueIndexer.Index(indexerID(update.IssueID), update.Data)
-}
-
-// BatchUpdateIssues perform a batch update of the issue indexer
-func BatchUpdateIssues(updates ...IssueIndexerUpdate) error {
-	batch := issueIndexer.NewBatch()
-	for _, update := range updates {
-		err := batch.Index(indexerID(update.IssueID), update.Data)
-		if err != nil {
-			return err
-		}
+// IssueIndexerBatch returns a new batch for collecting issue indexer updates.
+func IssueIndexerBatch() *Batch {
+	return &Batch{
+		batch: issueIndexer.NewBatch(),
+		index: issueIndexer,
 	}
-	return issueIndexer.Batch(batch)
 }
 
 // SearchIssuesByKeyword searches for issues by given conditions.