From 9b99e3dfe04baabdb51917673bc046e33731caca Mon Sep 17 00:00:00 2001
From: Martin Holst Swende <martin@swende.se>
Date: Mon, 26 Apr 2021 18:19:07 +0200
Subject: [PATCH] core/rawdb: fix datarace in freezer (#22728)

The Append / truncate operations were racy. When a datafile reaches 2GB, a new file is needed. This operation requires a writelock, which is not needed in the 99.99% of cases where the data fits in the current head-file.

This transition from readlock to writelock was incorrect: once the readlock was released, a truncate operation could slip in between and truncate the data. That alone would have been fine; however, the Append operation then continued writing as if no truncation had occurred, e.g. writing item 5 where item 0 should reside.

This PR changes the behaviour so that when we run into the situation that a new file is needed, the append aborts and retries, this time with a writelock.

With this PR, the outcome of the situation described above is instead that the Append operation exits with a failure.
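
To illustrate the hazard, here is a minimal, self-contained sketch (hypothetical names, not the freezer code itself) of the old lock-upgrade pattern:

    package main

    import "sync"

    type table struct {
        lock  sync.RWMutex
        items uint64
    }

    // unsafeAppend sketches the broken upgrade: between RUnlock and Lock,
    // a concurrent truncate can reset t.items, yet the append carries on
    // with the stale view it validated under the read lock.
    func (t *table) unsafeAppend(item uint64) {
        t.lock.RLock()
        if item != t.items { // validated under the read lock only
            t.lock.RUnlock()
            return
        }
        t.lock.RUnlock()
        t.lock.Lock() // a truncate may slip in right here
        // ... roll over to a new head file ...
        t.lock.Unlock()
        t.lock.RLock()
        t.items++ // may write item N where item 0 should now reside
        t.lock.RUnlock()
    }

    func main() {
        t := new(table)
        t.unsafeAppend(0)
    }

The fix below never upgrades a held lock: the first attempt runs entirely under the read lock and bails out if a file rollover is needed, and the retry re-runs every validation from scratch under the write lock.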
---
 core/rawdb/freezer_table.go      | 92 ++++++++++++++++++++------------
 core/rawdb/freezer_table_test.go | 58 ++++++++++++++++++--
 2 files changed, 113 insertions(+), 37 deletions(-)

diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index b614c10d3..d7bfe18e0 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -465,35 +465,59 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
 // Note, this method will *not* flush any data to disk so be sure to explicitly
 // fsync before irreversibly deleting data from the database.
 func (t *freezerTable) Append(item uint64, blob []byte) error {
+	// Encode the blob before acquiring the lock
+	if !t.noCompression {
+		blob = snappy.Encode(nil, blob)
+	}
 	// Read lock prevents competition with truncate
-	t.lock.RLock()
+	retry, err := t.append(item, blob, false)
+	if err != nil {
+		return err
+	}
+	if retry {
+		// Read lock was insufficient, retry with a writelock
+		_, err = t.append(item, blob, true)
+	}
+	return err
+}
+
+// append injects a binary blob at the end of the freezer table.
+// Normally, inserts do not require holding the write-lock, so it should be
+// invoked with 'wlock' set to false.
+// However, if the data would grow the current file out of bounds, then this
+// method will return 'true, nil', indicating that the caller should retry,
+// this time with 'wlock' set to true.
+func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) {
+	if wlock {
+		t.lock.Lock()
+		defer t.lock.Unlock()
+	} else {
+		t.lock.RLock()
+		defer t.lock.RUnlock()
+	}
 	// Ensure the table is still accessible
 	if t.index == nil || t.head == nil {
-		t.lock.RUnlock()
-		return errClosed
+		return false, errClosed
 	}
 	// Ensure only the next item can be written, nothing else
 	if atomic.LoadUint64(&t.items) != item {
-		t.lock.RUnlock()
-		return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
-	}
-	// Encode the blob and write it into the data file
-	if !t.noCompression {
-		blob = snappy.Encode(nil, blob)
+		return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
 	}
-	bLen := uint32(len(blob))
+	bLen := uint32(len(encodedBlob))
 	if t.headBytes+bLen < bLen ||
 		t.headBytes+bLen > t.maxFileSize {
-		// we need a new file, writing would overflow
-		t.lock.RUnlock()
-		t.lock.Lock()
+		// Writing would overflow, so we need to open a new data file.
+		// If we don't already hold the writelock, abort and let the caller
+		// invoke this method a second time.
+		if !wlock {
+			return true, nil
+		}
 		nextID := atomic.LoadUint32(&t.headId) + 1
 		// We open the next file in truncated mode -- if this file already
 		// exists, we need to start over from scratch on it
 		newHead, err := t.openFile(nextID, openFreezerFileTruncated)
 		if err != nil {
-			t.lock.Unlock()
-			return err
+			return false, err
 		}
 		// Close old file, and reopen in RDONLY mode
 		t.releaseFile(t.headId)
@@ -503,13 +527,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
 		t.head = newHead
 		atomic.StoreUint32(&t.headBytes, 0)
 		atomic.StoreUint32(&t.headId, nextID)
-		t.lock.Unlock()
-		t.lock.RLock()
 	}
-
-	defer t.lock.RUnlock()
-	if _, err := t.head.Write(blob); err != nil {
-		return err
+	if _, err := t.head.Write(encodedBlob); err != nil {
+		return false, err
 	}
 	newOffset := atomic.AddUint32(&t.headBytes, bLen)
 	idx := indexEntry{
@@ -523,7 +543,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
 	t.sizeGauge.Inc(int64(bLen + indexEntrySize))
 
 	atomic.AddUint64(&t.items, 1)
-	return nil
+	return false, nil
 }
 
 // getBounds returns the indexes for the item
@@ -562,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
 // Retrieve looks up the data offset of an item with the given number and retrieves
 // the raw binary blob from the data file.
 func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
+	blob, err := t.retrieve(item)
+	if err != nil {
+		return nil, err
+	}
+	if t.noCompression {
+		return blob, nil
+	}
+	return snappy.Decode(nil, blob)
+}
+
+// retrieve looks up the data offset of an item with the given number and retrieves
+// the raw binary blob from the data file. Note, this method does not decode
+// compressed data.
+func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
 	t.lock.RLock()
+	defer t.lock.RUnlock()
 	// Ensure the table and the item is accessible
 	if t.index == nil || t.head == nil {
-		t.lock.RUnlock()
 		return nil, errClosed
 	}
 	if atomic.LoadUint64(&t.items) <= item {
-		t.lock.RUnlock()
 		return nil, errOutOfBounds
 	}
 	// Ensure the item was not deleted from the tail either
 	if uint64(t.itemOffset) > item {
-		t.lock.RUnlock()
 		return nil, errOutOfBounds
 	}
 	startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
 	if err != nil {
-		t.lock.RUnlock()
 		return nil, err
 	}
 	dataFile, exist := t.files[filenum]
 	if !exist {
-		t.lock.RUnlock()
 		return nil, fmt.Errorf("missing data file %d", filenum)
 	}
 	// Retrieve the data itself, decompress and return
 	blob := make([]byte, endOffset-startOffset)
 	if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
-		t.lock.RUnlock()
 		return nil, err
 	}
-	t.lock.RUnlock()
 	t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
-
-	if t.noCompression {
-		return blob, nil
-	}
-	return snappy.Decode(nil, blob)
+	return blob, nil
 }
 
 // has returns an indicator whether the specified number data
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index b8d3170c6..0df28f236 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -18,10 +18,13 @@ package rawdb
 
 import (
 	"bytes"
+	"encoding/binary"
 	"fmt"
+	"io/ioutil"
 	"math/rand"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
@@ -637,6 +640,55 @@ func TestOffset(t *testing.T) {
 // 1. have data files d0, d1, d2, d3
 // 2. remove d2,d3
 //
-// However, all 'normal' failure modes arising due to failing to sync() or save a file should be
-// handled already, and the case described above can only (?) happen if an external process/user
-// deletes files from the filesystem.
+// However, all 'normal' failure modes arising due to failing to sync() or save a file
+// should be handled already, and the case described above can only (?) happen if an
+// external process/user deletes files from the filesystem.
+
+// TestAppendTruncateParallel is a test to check if the Append/truncate operations are
+// racy.
+//
+// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent
+// on timing rather than 'clever' input -- there's no determinism.
+func TestAppendTruncateParallel(t *testing.T) {
+	dir, err := ioutil.TempDir("", "freezer")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	fill := func(mark uint64) []byte {
+		data := make([]byte, 8)
+		binary.LittleEndian.PutUint64(data, mark)
+		return data
+	}
+
+	for i := 0; i < 5000; i++ {
+		f.truncate(0)
+		data0 := fill(0)
+		f.Append(0, data0)
+		data1 := fill(1)
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			f.truncate(0)
+			wg.Done()
+		}()
+		go func() {
+			f.Append(1, data1)
+			wg.Done()
+		}()
+		wg.Wait()
+
+		if have, err := f.Retrieve(0); err == nil {
+			if !bytes.Equal(have, data0) {
+				t.Fatalf("have %x want %x", have, data0)
+			}
+		}
+	}
+}
-- 
GitLab