good morning!!!!

Skip to content
Snippets Groups Projects
backend.go 6.86 KiB
Newer Older
package eth

import (
	"fmt"
	"sync"

	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/ethutil"
	"github.com/ethereum/go-ethereum/event"
	ethlogger "github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/pow/ezp"
	"github.com/ethereum/go-ethereum/rpc"
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	"github.com/ethereum/go-ethereum/whisper"
type Config struct {
	Name     string
	KeyStore string
	DataDir  string
	LogFile  string
	LogLevel int
	KeyRing  string

	MaxPeers   int
	Port       string
	NATType    string
	PMPGateway string

Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	Shh  bool
	Dial bool

	KeyManager *crypto.KeyManager
}

var logger = ethlogger.NewLogger("SERV")

type Ethereum struct {
	// Channel for shutting down the ethereum
	shutdownChan chan bool
	quit         chan bool

	// DB interface
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	db        ethutil.Database
	blacklist p2p.Blacklist

	//*** SERVICES ***
	// State manager for processing new blocks and managing the over all states
	blockProcessor *core.BlockProcessor
	txPool         *core.TxPool
	chainManager   *core.ChainManager
	blockPool      *BlockPool
	whisper        *whisper.Whisper
	net      *p2p.Server
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	eventMux *event.TypeMux
	txSub    event.Subscription
	blockSub event.Subscription
	RpcServer  rpc.RpcServer
	WsServer   rpc.RpcServer
	keyManager *crypto.KeyManager


	synclock  sync.Mutex
	syncGroup sync.WaitGroup

Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	Mining bool
func New(config *Config) (*Ethereum, error) {
	// Boostrap database
	logger := ethlogger.New(config.DataDir, config.LogFile, config.LogLevel)
	db, err := ethdb.NewLDBDatabase("blockchain")
	if err != nil {
		return nil, err
	}

	// Perform database sanity checks
	d, _ := db.Get([]byte("ProtocolVersion"))
	protov := ethutil.NewValue(d).Uint()
	if protov != ProtocolVersion && protov != 0 {
		return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, ethutil.Config.ExecPath+"/database")
	}

	// Create new keymanager
	var keyManager *crypto.KeyManager
	switch config.KeyStore {
	case "db":
		keyManager = crypto.NewDBKeyManager(db)
	case "file":
		keyManager = crypto.NewFileKeyManager(config.DataDir)
	default:
		return nil, fmt.Errorf("unknown keystore type: %s", config.KeyStore)
	}
	// Initialise the keyring
	keyManager.Init(config.KeyRing, 0, false)

	saveProtocolVersion(db)
	//ethutil.Config.Db = db
		shutdownChan: make(chan bool),
		quit:         make(chan bool),
		db:           db,
		keyManager:   keyManager,
		blacklist:    p2p.NewBlacklist(),
		eventMux:     &event.TypeMux{},
		logger:       logger,
	eth.chainManager = core.NewChainManager(db, eth.EventMux())
	eth.txPool = core.NewTxPool(eth.EventMux())
	eth.blockProcessor = core.NewBlockProcessor(db, eth.txPool, eth.chainManager, eth.EventMux())
	eth.chainManager.SetProcessor(eth.blockProcessor)
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	eth.whisper = whisper.New()

	hasBlock := eth.chainManager.HasBlock
	insertChain := eth.chainManager.InsertChain
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)

	ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()}
	nat, err := p2p.ParseNAT(config.NATType, config.PMPGateway)
	if err != nil {
		return nil, err
	}
	netprv, err := crypto.GenerateKey()
	if err != nil {
		return nil, fmt.Errorf("could not generate server key: %v", err)
	}
	eth.net = &p2p.Server{
		PrivateKey: netprv,
		Name:       config.Name,
		MaxPeers:   config.MaxPeers,
		Protocols:  protocols,
		Blacklist:  eth.blacklist,
		NAT:        nat,
		NoDial:     !config.Dial,
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	}
	if len(config.Port) > 0 {
		eth.net.ListenAddr = ":" + config.Port
	}

	return eth, nil
}

func (s *Ethereum) KeyManager() *crypto.KeyManager {
	return s.keyManager
}

func (s *Ethereum) Logger() ethlogger.LogSystem {
	return s.logger
}

func (s *Ethereum) Name() string {
	return s.net.Name
}

func (s *Ethereum) ChainManager() *core.ChainManager {
	return s.chainManager
}

func (s *Ethereum) BlockProcessor() *core.BlockProcessor {
	return s.blockProcessor
}

func (s *Ethereum) TxPool() *core.TxPool {
	return s.txPool
}

func (s *Ethereum) BlockPool() *BlockPool {
	return s.blockPool
}

Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
func (s *Ethereum) Whisper() *whisper.Whisper {
	return s.whisper
}

func (s *Ethereum) EventMux() *event.TypeMux {
	return s.eventMux
}
func (self *Ethereum) Db() ethutil.Database {
	return self.db
}

func (s *Ethereum) IsMining() bool {
	return s.Mining
}

func (s *Ethereum) IsListening() bool {
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	// XXX TODO
	return false
}

func (s *Ethereum) PeerCount() int {
	return s.net.PeerCount()
}

func (s *Ethereum) Peers() []*p2p.Peer {
	return s.net.Peers()
	return s.net.MaxPeers
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
func (s *Ethereum) Coinbase() []byte {
	return nil // TODO
}

// Start the ethereum
func (s *Ethereum) Start(seedNode string) error {
	err := s.net.Start()
	if err != nil {
		return err
	}
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed

	// Start services
	s.txPool.Start()
	s.blockPool.Start()
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed

	if s.whisper != nil {
		s.whisper.Start()
	}

	// broadcast transactions
	s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
	go s.txBroadcastLoop()

	// broadcast mined blocks
	s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go s.blockBroadcastLoop()

	logger.Infoln("Server started")
	return nil
}

func (self *Ethereum) SuggestPeer(nodeURL string) error {
	n, err := discover.ParseNode(nodeURL)
		return fmt.Errorf("invalid node URL: %v", err)
	return nil
}

func (s *Ethereum) Stop() {
	// Close the database
	defer s.db.Close()

	close(s.quit)

	s.txSub.Unsubscribe()    // quits txBroadcastLoop
	s.blockSub.Unsubscribe() // quits blockBroadcastLoop

	if s.RpcServer != nil {
		s.RpcServer.Stop()
	}
	if s.WsServer != nil {
		s.WsServer.Stop()
	}
	s.txPool.Stop()
	s.eventMux.Stop()
	s.blockPool.Stop()
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	if s.whisper != nil {
		s.whisper.Stop()
	}

	logger.Infoln("Server stopped")
	close(s.shutdownChan)
}

// This function will wait for a shutdown and resumes main thread execution
func (s *Ethereum) WaitForShutdown() {
	<-s.shutdownChan
}

// now tx broadcasting is taken out of txPool
// handled here via subscription, efficiency?
func (self *Ethereum) txBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range self.txSub.Chan() {
		event := obj.(core.TxPreEvent)
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
		self.net.Broadcast("eth", TxMsg, event.Tx.RlpData())
	}
}

func (self *Ethereum) blockBroadcastLoop() {
	// automatically stops if unsubscribe
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
	for obj := range self.blockSub.Chan() {
		switch ev := obj.(type) {
		case core.NewMinedBlockEvent:
Jeffrey Wilcke's avatar
Jeffrey Wilcke committed
			self.net.Broadcast("eth", NewBlockMsg, ev.Block.RlpData(), ev.Block.Td)
	}
}

func saveProtocolVersion(db ethutil.Database) {
	d, _ := db.Get([]byte("ProtocolVersion"))
	protocolVersion := ethutil.NewValue(d).Uint()

	if protocolVersion == 0 {
		db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
	}
}