diff --git a/swarm/shed/db.go b/swarm/shed/db.go
index 7377e12d2d35d0f33033420b3a61d3f2cd850e10..d4e5d1b2313bad0d5dc34693411d1d1051a06eca 100644
--- a/swarm/shed/db.go
+++ b/swarm/shed/db.go
@@ -18,7 +18,7 @@
 // more complex operations on storage data organized in fields and indexes.
 //
 // Only type which holds logical information about swarm storage chunks data
-// and metadata is IndexItem. This part is not generalized mostly for
+// and metadata is Item. This part is not generalized mostly for
 // performance reasons.
 package shed
 
diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go
index 908a1e446d64676cd13263a738041a8b6104e877..9a83855e70c196def305523fbaaaae8488685475 100644
--- a/swarm/shed/example_store_test.go
+++ b/swarm/shed/example_store_test.go
@@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) {
 	}
 	// Index storing actual chunk address, data and store timestamp.
 	s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
-		EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
 			return fields.Address, nil
 		},
-		DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
 			e.Address = key
 			return e, nil
 		},
-		EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
 			b := make([]byte, 8)
 			binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
 			value = append(b, fields.Data...)
 			return value, nil
 		},
-		DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
 			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
 			e.Data = value[8:]
 			return e, nil
@@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) {
 	// Index storing access timestamp for a particular address.
 	// It is needed in order to update gc index keys for iteration order.
 	s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
-		EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
 			return fields.Address, nil
 		},
-		DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
 			e.Address = key
 			return e, nil
 		},
-		EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
 			b := make([]byte, 8)
 			binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
 			return b, nil
 		},
-		DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
 			e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
 			return e, nil
 		},
@@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) {
 	}
 	// Index with keys ordered by access timestamp for garbage collection prioritization.
 	s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
-		EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
 			b := make([]byte, 16, 16+len(fields.Address))
 			binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
 			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
 			key = append(b, fields.Address...)
 			return key, nil
 		},
-		DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
 			e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
 			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
 			e.Address = key[16:]
 			return e, nil
 		},
-		EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
 			return nil, nil
 		},
-		DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
 			return e, nil
 		},
 	})
@@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) {
 
 // Put stores the chunk and sets it store timestamp.
 func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
-	return s.retrievalIndex.Put(shed.IndexItem{
+	return s.retrievalIndex.Put(shed.Item{
 		Address:        ch.Address(),
 		Data:           ch.Data(),
 		StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
 	batch := new(leveldb.Batch)
 
 	// Get the chunk data and storage timestamp.
-	item, err := s.retrievalIndex.Get(shed.IndexItem{
+	item, err := s.retrievalIndex.Get(shed.Item{
 		Address: addr,
 	})
 	if err != nil {
@@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
 	}
 
 	// Get the chunk access timestamp.
-	accessItem, err := s.accessIndex.Get(shed.IndexItem{
+	accessItem, err := s.accessIndex.Get(shed.Item{
 		Address: addr,
 	})
 	switch err {
 	case nil:
 		// Remove gc index entry if access timestamp is found.
-		err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{
+		err = s.gcIndex.DeleteInBatch(batch, shed.Item{
 			Address:         item.Address,
 			StoreTimestamp:  accessItem.AccessTimestamp,
 			AccessTimestamp: item.StoreTimestamp,
@@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
 	accessTimestamp := time.Now().UTC().UnixNano()
 
 	// Put new access timestamp in access index.
-	err = s.accessIndex.PutInBatch(batch, shed.IndexItem{
+	err = s.accessIndex.PutInBatch(batch, shed.Item{
 		Address:         addr,
 		AccessTimestamp: accessTimestamp,
 	})
@@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
 	}
 
 	// Put new access timestamp in gc index.
-	err = s.gcIndex.PutInBatch(batch, shed.IndexItem{
+	err = s.gcIndex.PutInBatch(batch, shed.Item{
 		Address:         item.Address,
 		AccessTimestamp: accessTimestamp,
 		StoreTimestamp:  item.StoreTimestamp,
@@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) {
 		// New batch for a new cg round.
 		trash := new(leveldb.Batch)
 		// Iterate through all index items and break when needed.
-		err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) {
+		err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 			// Remove the chunk.
 			err = s.retrievalIndex.DeleteInBatch(trash, item)
 			if err != nil {
@@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) {
 				return true, nil
 			}
 			return false, nil
-		})
+		}, nil)
 		if err != nil {
 			return err
 		}
diff --git a/swarm/shed/field_uint64.go b/swarm/shed/field_uint64.go
index 80e0069ae438196a27b38de44b574f4aa2860531..0417583ac3159a686fafdd7f382aac9dc0c5b354 100644
--- a/swarm/shed/field_uint64.go
+++ b/swarm/shed/field_uint64.go
@@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
 	return val, nil
 }
 
+// Dec decrements a uint64 value in the database.
+// This operation is not goroutine save.
+// The field is protected from overflow to a negative value.
+func (f Uint64Field) Dec() (val uint64, err error) {
+	val, err = f.Get()
+	if err != nil {
+		if err == leveldb.ErrNotFound {
+			val = 0
+		} else {
+			return 0, err
+		}
+	}
+	if val != 0 {
+		val--
+	}
+	return val, f.Put(val)
+}
+
+// DecInBatch decrements a uint64 value in the batch
+// by retreiving a value from the database, not the same batch.
+// This operation is not goroutine save.
+// The field is protected from overflow to a negative value.
+func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
+	val, err = f.Get()
+	if err != nil {
+		if err == leveldb.ErrNotFound {
+			val = 0
+		} else {
+			return 0, err
+		}
+	}
+	if val != 0 {
+		val--
+	}
+	f.PutInBatch(batch, val)
+	return val, nil
+}
+
 // encode transforms uint64 to 8 byte long
 // slice in big endian encoding.
 func encodeUint64(val uint64) (b []byte) {
diff --git a/swarm/shed/field_uint64_test.go b/swarm/shed/field_uint64_test.go
index 69ade71ba3a71c50c63a8d64d9f73373b132a20c..9462b56dd1846d6a1ca5e73396d98f7293001ddf 100644
--- a/swarm/shed/field_uint64_test.go
+++ b/swarm/shed/field_uint64_test.go
@@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) {
 		t.Errorf("got uint64 %v, want %v", got, want)
 	}
 }
+
+// TestUint64Field_Dec validates Dec operation
+// of the Uint64Field.
+func TestUint64Field_Dec(t *testing.T) {
+	db, cleanupFunc := newTestDB(t)
+	defer cleanupFunc()
+
+	counter, err := db.NewUint64Field("counter")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// test overflow protection
+	var want uint64
+	got, err := counter.Dec()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+
+	want = 32
+	err = counter.Put(want)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	want = 31
+	got, err = counter.Dec()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+}
+
+// TestUint64Field_DecInBatch validates DecInBatch operation
+// of the Uint64Field.
+func TestUint64Field_DecInBatch(t *testing.T) {
+	db, cleanupFunc := newTestDB(t)
+	defer cleanupFunc()
+
+	counter, err := db.NewUint64Field("counter")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	batch := new(leveldb.Batch)
+	var want uint64
+	got, err := counter.DecInBatch(batch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+	err = db.WriteBatch(batch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err = counter.Get()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+
+	batch2 := new(leveldb.Batch)
+	want = 42
+	counter.PutInBatch(batch2, want)
+	err = db.WriteBatch(batch2)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err = counter.Get()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+
+	batch3 := new(leveldb.Batch)
+	want = 41
+	got, err = counter.DecInBatch(batch3)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+	err = db.WriteBatch(batch3)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err = counter.Get()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != want {
+		t.Errorf("got uint64 %v, want %v", got, want)
+	}
+}
diff --git a/swarm/shed/index.go b/swarm/shed/index.go
index ba803e3c23b0e5666fc64a966c6633eb90514a02..df88b1b62dc36f3503ce5d6397253ad11b665885 100644
--- a/swarm/shed/index.go
+++ b/swarm/shed/index.go
@@ -17,22 +17,24 @@
 package shed
 
 import (
+	"bytes"
+
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
-// IndexItem holds fields relevant to Swarm Chunk data and metadata.
+// Item holds fields relevant to Swarm Chunk data and metadata.
 // All information required for swarm storage and operations
 // on that storage must be defined here.
 // This structure is logically connected to swarm storage,
 // the only part of this package that is not generalized,
 // mostly for performance reasons.
 //
-// IndexItem is a type that is used for retrieving, storing and encoding
+// Item is a type that is used for retrieving, storing and encoding
 // chunk data and metadata. It is passed as an argument to Index encoding
 // functions, get function and put function.
 // But it is also returned with additional data from get function call
 // and as the argument in iterator function definition.
-type IndexItem struct {
+type Item struct {
 	Address         []byte
 	Data            []byte
 	AccessTimestamp int64
@@ -43,9 +45,9 @@ type IndexItem struct {
 }
 
 // Merge is a helper method to construct a new
-// IndexItem by filling up fields with default values
-// of a particular IndexItem with values from another one.
-func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) {
+// Item by filling up fields with default values
+// of a particular Item with values from another one.
+func (i Item) Merge(i2 Item) (new Item) {
 	if i.Address == nil {
 		i.Address = i2.Address
 	}
@@ -67,26 +69,26 @@ func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) {
 // Index represents a set of LevelDB key value pairs that have common
 // prefix. It holds functions for encoding and decoding keys and values
 // to provide transparent actions on saved data which inclide:
-// - getting a particular IndexItem
-// - saving a particular IndexItem
+// - getting a particular Item
+// - saving a particular Item
 // - iterating over a sorted LevelDB keys
 // It implements IndexIteratorInterface interface.
 type Index struct {
 	db              *DB
 	prefix          []byte
-	encodeKeyFunc   func(fields IndexItem) (key []byte, err error)
-	decodeKeyFunc   func(key []byte) (e IndexItem, err error)
-	encodeValueFunc func(fields IndexItem) (value []byte, err error)
-	decodeValueFunc func(value []byte) (e IndexItem, err error)
+	encodeKeyFunc   func(fields Item) (key []byte, err error)
+	decodeKeyFunc   func(key []byte) (e Item, err error)
+	encodeValueFunc func(fields Item) (value []byte, err error)
+	decodeValueFunc func(keyFields Item, value []byte) (e Item, err error)
 }
 
 // IndexFuncs structure defines functions for encoding and decoding
 // LevelDB keys and values for a specific index.
 type IndexFuncs struct {
-	EncodeKey   func(fields IndexItem) (key []byte, err error)
-	DecodeKey   func(key []byte) (e IndexItem, err error)
-	EncodeValue func(fields IndexItem) (value []byte, err error)
-	DecodeValue func(value []byte) (e IndexItem, err error)
+	EncodeKey   func(fields Item) (key []byte, err error)
+	DecodeKey   func(key []byte) (e Item, err error)
+	EncodeValue func(fields Item) (value []byte, err error)
+	DecodeValue func(keyFields Item, value []byte) (e Item, err error)
 }
 
 // NewIndex returns a new Index instance with defined name and
@@ -105,7 +107,7 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
 		// by appending the provided index id byte.
 		// This is needed to avoid collisions between keys of different
 		// indexes as all index ids are unique.
-		encodeKeyFunc: func(e IndexItem) (key []byte, err error) {
+		encodeKeyFunc: func(e Item) (key []byte, err error) {
 			key, err = funcs.EncodeKey(e)
 			if err != nil {
 				return nil, err
@@ -115,7 +117,7 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
 		// This function reverses the encodeKeyFunc constructed key
 		// to transparently work with index keys without their index ids.
 		// It assumes that index keys are prefixed with only one byte.
-		decodeKeyFunc: func(key []byte) (e IndexItem, err error) {
+		decodeKeyFunc: func(key []byte) (e Item, err error) {
 			return funcs.DecodeKey(key[1:])
 		},
 		encodeValueFunc: funcs.EncodeValue,
@@ -123,10 +125,10 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
 	}, nil
 }
 
-// Get accepts key fields represented as IndexItem to retrieve a
+// Get accepts key fields represented as Item to retrieve a
 // value from the index and return maximum available information
-// from the index represented as another IndexItem.
-func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) {
+// from the index represented as another Item.
+func (f Index) Get(keyFields Item) (out Item, err error) {
 	key, err := f.encodeKeyFunc(keyFields)
 	if err != nil {
 		return out, err
@@ -135,16 +137,16 @@ func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) {
 	if err != nil {
 		return out, err
 	}
-	out, err = f.decodeValueFunc(value)
+	out, err = f.decodeValueFunc(keyFields, value)
 	if err != nil {
 		return out, err
 	}
 	return out.Merge(keyFields), nil
 }
 
-// Put accepts IndexItem to encode information from it
+// Put accepts Item to encode information from it
 // and save it to the database.
-func (f Index) Put(i IndexItem) (err error) {
+func (f Index) Put(i Item) (err error) {
 	key, err := f.encodeKeyFunc(i)
 	if err != nil {
 		return err
@@ -159,7 +161,7 @@ func (f Index) Put(i IndexItem) (err error) {
 // PutInBatch is the same as Put method, but it just
 // saves the key/value pair to the batch instead
 // directly to the database.
-func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) {
+func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error) {
 	key, err := f.encodeKeyFunc(i)
 	if err != nil {
 		return err
@@ -172,9 +174,9 @@ func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) {
 	return nil
 }
 
-// Delete accepts IndexItem to remove a key/value pair
+// Delete accepts Item to remove a key/value pair
 // from the database based on its fields.
-func (f Index) Delete(keyFields IndexItem) (err error) {
+func (f Index) Delete(keyFields Item) (err error) {
 	key, err := f.encodeKeyFunc(keyFields)
 	if err != nil {
 		return err
@@ -184,7 +186,7 @@ func (f Index) Delete(keyFields IndexItem) (err error) {
 
 // DeleteInBatch is the same as Delete just the operation
 // is performed on the batch instead on the database.
-func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err error) {
+func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error) {
 	key, err := f.encodeKeyFunc(keyFields)
 	if err != nil {
 		return err
@@ -193,32 +195,71 @@ func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err err
 	return nil
 }
 
-// IndexIterFunc is a callback on every IndexItem that is decoded
+// IndexIterFunc is a callback on every Item that is decoded
 // by iterating on an Index keys.
 // By returning a true for stop variable, iteration will
 // stop, and by returning the error, that error will be
 // propagated to the called iterator method on Index.
-type IndexIterFunc func(item IndexItem) (stop bool, err error)
+type IndexIterFunc func(item Item) (stop bool, err error)
+
+// IterateOptions defines optional parameters for Iterate function.
+type IterateOptions struct {
+	// StartFrom is the Item to start the iteration from.
+	StartFrom *Item
+	// If SkipStartFromItem is true, StartFrom item will not
+	// be iterated on.
+	SkipStartFromItem bool
+	// Iterate over items which keys have a common prefix.
+	Prefix []byte
+}
 
-// IterateAll iterates over all keys of the Index.
-func (f Index) IterateAll(fn IndexIterFunc) (err error) {
+// Iterate function iterates over keys of the Index.
+// If IterateOptions is nil, the iterations is over all keys.
+func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
+	if options == nil {
+		options = new(IterateOptions)
+	}
+	// construct a prefix with Index prefix and optional common key prefix
+	prefix := append(f.prefix, options.Prefix...)
+	// start from the prefix
+	startKey := prefix
+	if options.StartFrom != nil {
+		// start from the provided StartFrom Item key value
+		startKey, err = f.encodeKeyFunc(*options.StartFrom)
+		if err != nil {
+			return err
+		}
+	}
 	it := f.db.NewIterator()
 	defer it.Release()
 
-	for ok := it.Seek(f.prefix); ok; ok = it.Next() {
+	// move the cursor to the start key
+	ok := it.Seek(startKey)
+	if !ok {
+		// stop iterator if seek has failed
+		return it.Error()
+	}
+	if options.SkipStartFromItem && bytes.Equal(startKey, it.Key()) {
+		// skip the start from Item if it is the first key
+		// and it is explicitly configured to skip it
+		ok = it.Next()
+	}
+	for ; ok; ok = it.Next() {
 		key := it.Key()
-		if key[0] != f.prefix[0] {
+		if !bytes.HasPrefix(key, prefix) {
 			break
 		}
-		keyIndexItem, err := f.decodeKeyFunc(key)
+		// create a copy of key byte slice not to share leveldb underlaying slice array
+		keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
 		if err != nil {
 			return err
 		}
-		valueIndexItem, err := f.decodeValueFunc(it.Value())
+		// create a copy of value byte slice not to share leveldb underlaying slice array
+		valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
 		if err != nil {
 			return err
 		}
-		stop, err := fn(keyIndexItem.Merge(valueIndexItem))
+		stop, err := fn(keyItem.Merge(valueItem))
 		if err != nil {
 			return err
 		}
@@ -229,12 +270,27 @@ func (f Index) IterateAll(fn IndexIterFunc) (err error) {
 	return it.Error()
 }
 
-// IterateFrom iterates over Index keys starting from the key
-// encoded from the provided IndexItem.
-func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
+// Count returns the number of items in index.
+func (f Index) Count() (count int, err error) {
+	it := f.db.NewIterator()
+	defer it.Release()
+
+	for ok := it.Seek(f.prefix); ok; ok = it.Next() {
+		key := it.Key()
+		if key[0] != f.prefix[0] {
+			break
+		}
+		count++
+	}
+	return count, it.Error()
+}
+
+// CountFrom returns the number of items in index keys
+// starting from the key encoded from the provided Item.
+func (f Index) CountFrom(start Item) (count int, err error) {
 	startKey, err := f.encodeKeyFunc(start)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	it := f.db.NewIterator()
 	defer it.Release()
@@ -244,21 +300,7 @@ func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
 		if key[0] != f.prefix[0] {
 			break
 		}
-		keyIndexItem, err := f.decodeKeyFunc(key)
-		if err != nil {
-			return err
-		}
-		valueIndexItem, err := f.decodeValueFunc(it.Value())
-		if err != nil {
-			return err
-		}
-		stop, err := fn(keyIndexItem.Merge(valueIndexItem))
-		if err != nil {
-			return err
-		}
-		if stop {
-			break
-		}
+		count++
 	}
-	return it.Error()
+	return count, it.Error()
 }
diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go
index ba82216dfeb9c5c0b0bd5c8835e68e299a498f80..97d7c91f439f95934b43cc328ee8e8633c95c4b3 100644
--- a/swarm/shed/index_test.go
+++ b/swarm/shed/index_test.go
@@ -29,20 +29,20 @@ import (
 
 // Index functions for the index that is used in tests in this file.
 var retrievalIndexFuncs = IndexFuncs{
-	EncodeKey: func(fields IndexItem) (key []byte, err error) {
+	EncodeKey: func(fields Item) (key []byte, err error) {
 		return fields.Address, nil
 	},
-	DecodeKey: func(key []byte) (e IndexItem, err error) {
+	DecodeKey: func(key []byte) (e Item, err error) {
 		e.Address = key
 		return e, nil
 	},
-	EncodeValue: func(fields IndexItem) (value []byte, err error) {
+	EncodeValue: func(fields Item) (value []byte, err error) {
 		b := make([]byte, 8)
 		binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
 		value = append(b, fields.Data...)
 		return value, nil
 	},
-	DecodeValue: func(value []byte) (e IndexItem, err error) {
+	DecodeValue: func(keyItem Item, value []byte) (e Item, err error) {
 		e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
 		e.Data = value[8:]
 		return e, nil
@@ -60,7 +60,7 @@ func TestIndex(t *testing.T) {
 	}
 
 	t.Run("put", func(t *testing.T) {
-		want := IndexItem{
+		want := Item{
 			Address:        []byte("put-hash"),
 			Data:           []byte("DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -70,16 +70,16 @@ func TestIndex(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got, err := index.Get(IndexItem{
+		got, err := index.Get(Item{
 			Address: want.Address,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
-		checkIndexItem(t, got, want)
+		checkItem(t, got, want)
 
 		t.Run("overwrite", func(t *testing.T) {
-			want := IndexItem{
+			want := Item{
 				Address:        []byte("put-hash"),
 				Data:           []byte("New DATA"),
 				StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -89,18 +89,18 @@ func TestIndex(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			got, err := index.Get(IndexItem{
+			got, err := index.Get(Item{
 				Address: want.Address,
 			})
 			if err != nil {
 				t.Fatal(err)
 			}
-			checkIndexItem(t, got, want)
+			checkItem(t, got, want)
 		})
 	})
 
 	t.Run("put in batch", func(t *testing.T) {
-		want := IndexItem{
+		want := Item{
 			Address:        []byte("put-in-batch-hash"),
 			Data:           []byte("DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -112,16 +112,16 @@ func TestIndex(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got, err := index.Get(IndexItem{
+		got, err := index.Get(Item{
 			Address: want.Address,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
-		checkIndexItem(t, got, want)
+		checkItem(t, got, want)
 
 		t.Run("overwrite", func(t *testing.T) {
-			want := IndexItem{
+			want := Item{
 				Address:        []byte("put-in-batch-hash"),
 				Data:           []byte("New DATA"),
 				StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -133,13 +133,13 @@ func TestIndex(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			got, err := index.Get(IndexItem{
+			got, err := index.Get(Item{
 				Address: want.Address,
 			})
 			if err != nil {
 				t.Fatal(err)
 			}
-			checkIndexItem(t, got, want)
+			checkItem(t, got, want)
 		})
 	})
 
@@ -150,13 +150,13 @@ func TestIndex(t *testing.T) {
 		address := []byte("put-in-batch-twice-hash")
 
 		// put the first item
-		index.PutInBatch(batch, IndexItem{
+		index.PutInBatch(batch, Item{
 			Address:        address,
 			Data:           []byte("DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
 		})
 
-		want := IndexItem{
+		want := Item{
 			Address:        address,
 			Data:           []byte("New DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -168,17 +168,17 @@ func TestIndex(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got, err := index.Get(IndexItem{
+		got, err := index.Get(Item{
 			Address: address,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
-		checkIndexItem(t, got, want)
+		checkItem(t, got, want)
 	})
 
 	t.Run("delete", func(t *testing.T) {
-		want := IndexItem{
+		want := Item{
 			Address:        []byte("delete-hash"),
 			Data:           []byte("DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -188,15 +188,15 @@ func TestIndex(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got, err := index.Get(IndexItem{
+		got, err := index.Get(Item{
 			Address: want.Address,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
-		checkIndexItem(t, got, want)
+		checkItem(t, got, want)
 
-		err = index.Delete(IndexItem{
+		err = index.Delete(Item{
 			Address: want.Address,
 		})
 		if err != nil {
@@ -204,7 +204,7 @@ func TestIndex(t *testing.T) {
 		}
 
 		wantErr := leveldb.ErrNotFound
-		got, err = index.Get(IndexItem{
+		got, err = index.Get(Item{
 			Address: want.Address,
 		})
 		if err != wantErr {
@@ -213,7 +213,7 @@ func TestIndex(t *testing.T) {
 	})
 
 	t.Run("delete in batch", func(t *testing.T) {
-		want := IndexItem{
+		want := Item{
 			Address:        []byte("delete-in-batch-hash"),
 			Data:           []byte("DATA"),
 			StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -223,16 +223,16 @@ func TestIndex(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got, err := index.Get(IndexItem{
+		got, err := index.Get(Item{
 			Address: want.Address,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
-		checkIndexItem(t, got, want)
+		checkItem(t, got, want)
 
 		batch := new(leveldb.Batch)
-		index.DeleteInBatch(batch, IndexItem{
+		index.DeleteInBatch(batch, Item{
 			Address: want.Address,
 		})
 		err = db.WriteBatch(batch)
@@ -241,7 +241,7 @@ func TestIndex(t *testing.T) {
 		}
 
 		wantErr := leveldb.ErrNotFound
-		got, err = index.Get(IndexItem{
+		got, err = index.Get(Item{
 			Address: want.Address,
 		})
 		if err != wantErr {
@@ -250,8 +250,9 @@ func TestIndex(t *testing.T) {
 	})
 }
 
-// TestIndex_iterate validates index iterator functions for correctness.
-func TestIndex_iterate(t *testing.T) {
+// TestIndex_Iterate validates index Iterate
+// functions for correctness.
+func TestIndex_Iterate(t *testing.T) {
 	db, cleanupFunc := newTestDB(t)
 	defer cleanupFunc()
 
@@ -260,7 +261,7 @@ func TestIndex_iterate(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	items := []IndexItem{
+	items := []Item{
 		{
 			Address: []byte("iterate-hash-01"),
 			Data:    []byte("data80"),
@@ -290,7 +291,7 @@ func TestIndex_iterate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	item04 := IndexItem{
+	item04 := Item{
 		Address: []byte("iterate-hash-04"),
 		Data:    []byte("data0"),
 	}
@@ -306,31 +307,53 @@ func TestIndex_iterate(t *testing.T) {
 
 	t.Run("all", func(t *testing.T) {
 		var i int
-		err := index.IterateAll(func(item IndexItem) (stop bool, err error) {
+		err := index.Iterate(func(item Item) (stop bool, err error) {
 			if i > len(items)-1 {
 				return true, fmt.Errorf("got unexpected index item: %#v", item)
 			}
 			want := items[i]
-			checkIndexItem(t, item, want)
+			checkItem(t, item, want)
 			i++
 			return false, nil
-		})
+		}, nil)
 		if err != nil {
 			t.Fatal(err)
 		}
 	})
 
-	t.Run("from", func(t *testing.T) {
+	t.Run("start from", func(t *testing.T) {
 		startIndex := 2
 		i := startIndex
-		err := index.IterateFrom(items[startIndex], func(item IndexItem) (stop bool, err error) {
+		err := index.Iterate(func(item Item) (stop bool, err error) {
 			if i > len(items)-1 {
 				return true, fmt.Errorf("got unexpected index item: %#v", item)
 			}
 			want := items[i]
-			checkIndexItem(t, item, want)
+			checkItem(t, item, want)
 			i++
 			return false, nil
+		}, &IterateOptions{
+			StartFrom: &items[startIndex],
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	})
+
+	t.Run("skip start from", func(t *testing.T) {
+		startIndex := 2
+		i := startIndex + 1
+		err := index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
+			i++
+			return false, nil
+		}, &IterateOptions{
+			StartFrom:         &items[startIndex],
+			SkipStartFromItem: true,
 		})
 		if err != nil {
 			t.Fatal(err)
@@ -341,19 +364,19 @@ func TestIndex_iterate(t *testing.T) {
 		var i int
 		stopIndex := 3
 		var count int
-		err := index.IterateAll(func(item IndexItem) (stop bool, err error) {
+		err := index.Iterate(func(item Item) (stop bool, err error) {
 			if i > len(items)-1 {
 				return true, fmt.Errorf("got unexpected index item: %#v", item)
 			}
 			want := items[i]
-			checkIndexItem(t, item, want)
+			checkItem(t, item, want)
 			count++
 			if i == stopIndex {
 				return true, nil
 			}
 			i++
 			return false, nil
-		})
+		}, nil)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -369,46 +392,378 @@ func TestIndex_iterate(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		secondIndexItem := IndexItem{
+		secondItem := Item{
 			Address: []byte("iterate-hash-10"),
 			Data:    []byte("data-second"),
 		}
-		err = secondIndex.Put(secondIndexItem)
+		err = secondIndex.Put(secondItem)
 		if err != nil {
 			t.Fatal(err)
 		}
 
 		var i int
-		err = index.IterateAll(func(item IndexItem) (stop bool, err error) {
+		err = index.Iterate(func(item Item) (stop bool, err error) {
 			if i > len(items)-1 {
 				return true, fmt.Errorf("got unexpected index item: %#v", item)
 			}
 			want := items[i]
-			checkIndexItem(t, item, want)
+			checkItem(t, item, want)
 			i++
 			return false, nil
-		})
+		}, nil)
 		if err != nil {
 			t.Fatal(err)
 		}
 
 		i = 0
-		err = secondIndex.IterateAll(func(item IndexItem) (stop bool, err error) {
+		err = secondIndex.Iterate(func(item Item) (stop bool, err error) {
 			if i > 1 {
 				return true, fmt.Errorf("got unexpected index item: %#v", item)
 			}
-			checkIndexItem(t, item, secondIndexItem)
+			checkItem(t, item, secondItem)
+			i++
+			return false, nil
+		}, nil)
+		if err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
+// TestIndex_Iterate_withPrefix validates index Iterate
+// function for correctness.
+func TestIndex_Iterate_withPrefix(t *testing.T) {
+	db, cleanupFunc := newTestDB(t)
+	defer cleanupFunc()
+
+	index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	allItems := []Item{
+		{Address: []byte("want-hash-00"), Data: []byte("data80")},
+		{Address: []byte("skip-hash-01"), Data: []byte("data81")},
+		{Address: []byte("skip-hash-02"), Data: []byte("data82")},
+		{Address: []byte("skip-hash-03"), Data: []byte("data83")},
+		{Address: []byte("want-hash-04"), Data: []byte("data84")},
+		{Address: []byte("want-hash-05"), Data: []byte("data85")},
+		{Address: []byte("want-hash-06"), Data: []byte("data86")},
+		{Address: []byte("want-hash-07"), Data: []byte("data87")},
+		{Address: []byte("want-hash-08"), Data: []byte("data88")},
+		{Address: []byte("want-hash-09"), Data: []byte("data89")},
+		{Address: []byte("skip-hash-10"), Data: []byte("data90")},
+	}
+	batch := new(leveldb.Batch)
+	for _, i := range allItems {
+		index.PutInBatch(batch, i)
+	}
+	err = db.WriteBatch(batch)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	prefix := []byte("want")
+
+	items := make([]Item, 0)
+	for _, item := range allItems {
+		if bytes.HasPrefix(item.Address, prefix) {
+			items = append(items, item)
+		}
+	}
+	sort.SliceStable(items, func(i, j int) bool {
+		return bytes.Compare(items[i].Address, items[j].Address) < 0
+	})
+
+	t.Run("with prefix", func(t *testing.T) {
+		var i int
+		err := index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
 			i++
 			return false, nil
+		}, &IterateOptions{
+			Prefix: prefix,
 		})
 		if err != nil {
 			t.Fatal(err)
 		}
+		if i != len(items) {
+			t.Errorf("got %v items, want %v", i, len(items))
+		}
+	})
+
+	t.Run("with prefix and start from", func(t *testing.T) {
+		startIndex := 2
+		var count int
+		i := startIndex
+		err := index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
+			i++
+			count++
+			return false, nil
+		}, &IterateOptions{
+			StartFrom: &items[startIndex],
+			Prefix:    prefix,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		wantCount := len(items) - startIndex
+		if count != wantCount {
+			t.Errorf("got %v items, want %v", count, wantCount)
+		}
+	})
+
+	t.Run("with prefix and skip start from", func(t *testing.T) {
+		startIndex := 2
+		var count int
+		i := startIndex + 1
+		err := index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
+			i++
+			count++
+			return false, nil
+		}, &IterateOptions{
+			StartFrom:         &items[startIndex],
+			SkipStartFromItem: true,
+			Prefix:            prefix,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		wantCount := len(items) - startIndex - 1
+		if count != wantCount {
+			t.Errorf("got %v items, want %v", count, wantCount)
+		}
+	})
+
+	t.Run("stop", func(t *testing.T) {
+		var i int
+		stopIndex := 3
+		var count int
+		err := index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
+			count++
+			if i == stopIndex {
+				return true, nil
+			}
+			i++
+			return false, nil
+		}, &IterateOptions{
+			Prefix: prefix,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		wantItemsCount := stopIndex + 1
+		if count != wantItemsCount {
+			t.Errorf("got %v items, expected %v", count, wantItemsCount)
+		}
+	})
+
+	t.Run("no overflow", func(t *testing.T) {
+		secondIndex, err := db.NewIndex("second-index", retrievalIndexFuncs)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		secondItem := Item{
+			Address: []byte("iterate-hash-10"),
+			Data:    []byte("data-second"),
+		}
+		err = secondIndex.Put(secondItem)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		var i int
+		err = index.Iterate(func(item Item) (stop bool, err error) {
+			if i > len(items)-1 {
+				return true, fmt.Errorf("got unexpected index item: %#v", item)
+			}
+			want := items[i]
+			checkItem(t, item, want)
+			i++
+			return false, nil
+		}, &IterateOptions{
+			Prefix: prefix,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if i != len(items) {
+			t.Errorf("got %v items, want %v", i, len(items))
+		}
+	})
+}
+
+// TestIndex_count tests if Index.Count and Index.CountFrom
+// returns the correct number of items.
+func TestIndex_count(t *testing.T) {
+	db, cleanupFunc := newTestDB(t)
+	defer cleanupFunc()
+
+	index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	items := []Item{
+		{
+			Address: []byte("iterate-hash-01"),
+			Data:    []byte("data80"),
+		},
+		{
+			Address: []byte("iterate-hash-02"),
+			Data:    []byte("data84"),
+		},
+		{
+			Address: []byte("iterate-hash-03"),
+			Data:    []byte("data22"),
+		},
+		{
+			Address: []byte("iterate-hash-04"),
+			Data:    []byte("data41"),
+		},
+		{
+			Address: []byte("iterate-hash-05"),
+			Data:    []byte("data1"),
+		},
+	}
+	batch := new(leveldb.Batch)
+	for _, i := range items {
+		index.PutInBatch(batch, i)
+	}
+	err = db.WriteBatch(batch)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Run("Count", func(t *testing.T) {
+		got, err := index.Count()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		want := len(items)
+		if got != want {
+			t.Errorf("got %v items count, want %v", got, want)
+		}
+	})
+
+	t.Run("CountFrom", func(t *testing.T) {
+		got, err := index.CountFrom(Item{
+			Address: items[1].Address,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		want := len(items) - 1
+		if got != want {
+			t.Errorf("got %v items count, want %v", got, want)
+		}
+	})
+
+	// update the index with another item
+	t.Run("add item", func(t *testing.T) {
+		item04 := Item{
+			Address: []byte("iterate-hash-06"),
+			Data:    []byte("data0"),
+		}
+		err = index.Put(item04)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		count := len(items) + 1
+
+		t.Run("Count", func(t *testing.T) {
+			got, err := index.Count()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			want := count
+			if got != want {
+				t.Errorf("got %v items count, want %v", got, want)
+			}
+		})
+
+		t.Run("CountFrom", func(t *testing.T) {
+			got, err := index.CountFrom(Item{
+				Address: items[1].Address,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			want := count - 1
+			if got != want {
+				t.Errorf("got %v items count, want %v", got, want)
+			}
+		})
+	})
+
+	// delete some items
+	t.Run("delete items", func(t *testing.T) {
+		deleteCount := 3
+
+		for _, item := range items[:deleteCount] {
+			err := index.Delete(item)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		count := len(items) + 1 - deleteCount
+
+		t.Run("Count", func(t *testing.T) {
+			got, err := index.Count()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			want := count
+			if got != want {
+				t.Errorf("got %v items count, want %v", got, want)
+			}
+		})
+
+		t.Run("CountFrom", func(t *testing.T) {
+			got, err := index.CountFrom(Item{
+				Address: items[deleteCount+1].Address,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			want := count - 1
+			if got != want {
+				t.Errorf("got %v items count, want %v", got, want)
+			}
+		})
 	})
 }
 
-// checkIndexItem is a test helper function that compares if two Index items are the same.
-func checkIndexItem(t *testing.T, got, want IndexItem) {
+// checkItem is a test helper function that compares if two Index items are the same.
+func checkItem(t *testing.T, got, want Item) {
 	t.Helper()
 
 	if !bytes.Equal(got.Address, want.Address) {