From 1018bf6a00be475cb9af2812c1f61fa8530068a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Fri, 24 Mar 2017 13:07:12 +0200
Subject: [PATCH] rpc: honour pending requests before tearing conn down (#3814)

---
 rpc/server.go | 42 ++++++++++++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/rpc/server.go b/rpc/server.go
index ca7e3c01a..8627b5592 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"reflect"
 	"runtime"
+	"sync"
 	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/log"
@@ -143,6 +144,8 @@ func hasOption(option CodecOption, options []CodecOption) bool {
 // requests until the codec returns an error when reading a request (in most cases
 // an EOF). It executes requests in parallel when singleShot is false.
 func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
+	var pend sync.WaitGroup
+
 	defer func() {
 		if err := recover(); err != nil {
 			const size = 64 << 10
@@ -150,7 +153,6 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
 			buf = buf[:runtime.Stack(buf, false)]
 			log.Error(fmt.Sprint(string(buf)))
 		}
-
 		s.codecsMu.Lock()
 		s.codecs.Remove(codec)
 		s.codecsMu.Unlock()
@@ -179,8 +181,13 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
 	for atomic.LoadInt32(&s.run) == 1 {
 		reqs, batch, err := s.readRequest(codec)
 		if err != nil {
-			log.Debug(fmt.Sprintf("read error %v\n", err))
-			codec.Write(codec.CreateErrorResponse(nil, err))
+			// If a parsing error occurred, send an error
+			if err.Error() != "EOF" {
+				log.Debug(fmt.Sprintf("read error %v\n", err))
+				codec.Write(codec.CreateErrorResponse(nil, err))
+			}
+			// Error or end of stream, wait for requests and tear down
+			pend.Wait()
 			return nil
 		}
 
@@ -199,20 +206,27 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
 			}
 			return nil
 		}
-
-		if singleShot && batch {
-			s.execBatch(ctx, codec, reqs)
-			return nil
-		} else if singleShot && !batch {
-			s.exec(ctx, codec, reqs[0])
+		// If a single shot request is executing, run and return immediately
+		if singleShot {
+			if batch {
+				s.execBatch(ctx, codec, reqs)
+			} else {
+				s.exec(ctx, codec, reqs[0])
+			}
 			return nil
-		} else if !singleShot && batch {
-			go s.execBatch(ctx, codec, reqs)
-		} else {
-			go s.exec(ctx, codec, reqs[0])
 		}
-	}
+		// For multi-shot connections, start a goroutine to serve and loop back
+		pend.Add(1)
 
+		go func(reqs []*serverRequest, batch bool) {
+			defer pend.Done()
+			if batch {
+				s.execBatch(ctx, codec, reqs)
+			} else {
+				s.exec(ctx, codec, reqs[0])
+			}
+		}(reqs, batch)
+	}
 	return nil
 }
 
-- 
GitLab