
Unverified commit 75532ebf, authored by Alex Sharov, committed by GitHub

remove (#2459)

parent 21cb7bef
pub mod types {
    use arrayref::array_ref;

    tonic::include_proto!("types");

    // Generate the U128/U256/U512 <-> protobuf conversions by round-tripping
    // through the corresponding fixed-size hash type.
    macro_rules! U {
        ($proto:ty, $h:ty, $u:ty) => {
            impl From<$u> for $proto {
                fn from(value: $u) -> Self {
                    Self::from(<$h>::from(<[u8; <$h>::len_bytes()]>::from(value)))
                }
            }

            impl From<$proto> for $u {
                fn from(value: $proto) -> Self {
                    Self::from(<$h>::from(value).0)
                }
            }
        };
    }
    // to PB
    impl From<ethereum_types::H128> for H128 {
        fn from(value: ethereum_types::H128) -> Self {
            Self {
                hi: u64::from_be_bytes(*array_ref!(value, 0, 8)),
                lo: u64::from_be_bytes(*array_ref!(value, 8, 8)),
            }
        }
    }

    impl From<ethereum_types::H160> for H160 {
        fn from(value: ethereum_types::H160) -> Self {
            Self {
                hi: Some(ethereum_types::H128::from_slice(&value[..16]).into()),
                lo: u32::from_be_bytes(*array_ref!(value, 16, 4)),
            }
        }
    }

    impl From<ethereum_types::H256> for H256 {
        fn from(value: ethereum_types::H256) -> Self {
            Self {
                hi: Some(ethereum_types::H128::from_slice(&value[..16]).into()),
                lo: Some(ethereum_types::H128::from_slice(&value[16..]).into()),
            }
        }
    }

    impl From<ethereum_types::H512> for H512 {
        fn from(value: ethereum_types::H512) -> Self {
            Self {
                hi: Some(ethereum_types::H256::from_slice(&value[..32]).into()),
                lo: Some(ethereum_types::H256::from_slice(&value[32..]).into()),
            }
        }
    }
    // from PB
    impl From<H128> for ethereum_types::H128 {
        fn from(value: H128) -> Self {
            let mut v = [0; Self::len_bytes()];
            v[..8].copy_from_slice(&value.hi.to_be_bytes());
            v[8..].copy_from_slice(&value.lo.to_be_bytes());
            v.into()
        }
    }

    impl From<H160> for ethereum_types::H160 {
        fn from(value: H160) -> Self {
            type H = ethereum_types::H128;
            let mut v = [0; Self::len_bytes()];
            v[..H::len_bytes()]
                .copy_from_slice(H::from(value.hi.unwrap_or_default()).as_fixed_bytes());
            v[H::len_bytes()..].copy_from_slice(&value.lo.to_be_bytes());
            v.into()
        }
    }

    impl From<H256> for ethereum_types::H256 {
        fn from(value: H256) -> Self {
            type H = ethereum_types::H128;
            let mut v = [0; Self::len_bytes()];
            v[..H::len_bytes()]
                .copy_from_slice(H::from(value.hi.unwrap_or_default()).as_fixed_bytes());
            v[H::len_bytes()..]
                .copy_from_slice(H::from(value.lo.unwrap_or_default()).as_fixed_bytes());
            v.into()
        }
    }

    impl From<H512> for ethereum_types::H512 {
        fn from(value: H512) -> Self {
            type H = ethereum_types::H256;
            let mut v = [0; Self::len_bytes()];
            v[..H::len_bytes()]
                .copy_from_slice(H::from(value.hi.unwrap_or_default()).as_fixed_bytes());
            v[H::len_bytes()..]
                .copy_from_slice(H::from(value.lo.unwrap_or_default()).as_fixed_bytes());
            v.into()
        }
    }
    U!(H128, ethereum_types::H128, ethereum_types::U128);
    U!(H256, ethereum_types::H256, ethereum_types::U256);
    U!(H512, ethereum_types::H512, ethereum_types::U512);
}
#[cfg(feature = "consensus")]
pub mod consensus {
    tonic::include_proto!("consensus");
}

#[cfg(feature = "sentry")]
pub mod sentry {
    tonic::include_proto!("sentry");
}

#[cfg(feature = "remotekv")]
pub mod remotekv {
    tonic::include_proto!("remote");
}

#[cfg(feature = "snapshotsync")]
pub mod snapshotsync {
    tonic::include_proto!("snapshotsync");
}

#[cfg(feature = "txpool")]
pub mod txpool {
    tonic::include_proto!("txpool");
    tonic::include_proto!("txpool_control");
}
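The generated messages carry fixed-size hashes split into big-endian hi/lo halves: H128 as two u64s, and H160/H256/H512 as nested smaller hashes. A minimal Go sketch of the same split for the 16-byte case, using a local hypothetical type rather than the generated protobuf one:

// Hypothetical, self-contained illustration of the hi/lo encoding above;
// this H128 is local to the sketch, not the generated message type.
package main

import (
	"encoding/binary"
	"fmt"
)

type H128 struct{ Hi, Lo uint64 }

// fromBytes splits a 16-byte hash into two big-endian u64 halves.
func fromBytes(b [16]byte) H128 {
	return H128{
		Hi: binary.BigEndian.Uint64(b[:8]),
		Lo: binary.BigEndian.Uint64(b[8:]),
	}
}

// toBytes reassembles the original 16 bytes from the two halves.
func (h H128) toBytes() [16]byte {
	var b [16]byte
	binary.BigEndian.PutUint64(b[:8], h.Hi)
	binary.BigEndian.PutUint64(b[8:], h.Lo)
	return b
}

func main() {
	var in [16]byte
	in[0], in[15] = 0xAB, 0xCD
	fmt.Println(fromBytes(in).toBytes() == in) // true: the split is lossless
}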
package txpropagate

import (
	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/core"
	"github.com/ledgerwatch/erigon/core/types"
)
func (tp *TxPropagate) DeliverTransactions(peerID string, txs []types.Transaction, direct bool) {
	tp.lock.Lock()
	defer tp.lock.Unlock()

	// Keep track of all the propagated transactions
	//if direct {
	//	txReplyInMeter.Mark(int64(len(txs)))
	//} else {
	//	txBroadcastInMeter.Mark(int64(len(txs)))
	//}

	// Push all the transactions into the pool, tracking underpriced ones to avoid
	// re-requesting them and dropping the peer in case of malicious transfers.
	var (
		duplicate   int64
		underpriced int64
		otherreject int64
	)
	errs := tp.txpool.AddRemotes(txs)
	for i, err := range errs {
		if err != nil {
			// Track the transaction hash if the price is too low for us.
			// Avoid re-requesting this transaction when we receive another
			// announcement.
			if err == core.ErrUnderpriced || err == core.ErrReplaceUnderpriced {
				for tp.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
					tp.underpriced.Pop()
				}
				tp.underpriced.Add(txs[i].Hash())
			}
			// Track a few interesting failure types
			switch err {
			case nil: // Noop, but need to handle to not count these
			case core.ErrAlreadyKnown:
				duplicate++
			case core.ErrUnderpriced, core.ErrReplaceUnderpriced:
				underpriced++
			default:
				otherreject++
			}
		}
		// TODO: remove this transaction from all other maps
		/*
			case delivery := <-f.cleanup:
				// Independent if the delivery was direct or broadcast, remove all
				// traces of the hash from internal trackers
				for _, hash := range delivery.hashes {
					if _, ok := f.waitlist[hash]; ok {
						for peer, txset := range f.waitslots {
							delete(txset, hash)
							if len(txset) == 0 {
								delete(f.waitslots, peer)
							}
						}
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					} else {
						for peer, txset := range f.announces {
							delete(txset, hash)
							if len(txset) == 0 {
								delete(f.announces, peer)
							}
						}
						delete(f.announced, hash)
						delete(f.alternates, hash)

						// If a transaction currently being fetched from a different
						// origin was delivered (delivery stolen), mark it so the
						// actual delivery won't double schedule it.
						if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
							stolen := f.requests[origin].stolen
							if stolen == nil {
								f.requests[origin].stolen = make(map[common.Hash]struct{})
								stolen = f.requests[origin].stolen
							}
							stolen[hash] = struct{}{}
						}
						delete(f.fetching, hash)
					}
				}
				// In case of a direct delivery, also reschedule anything missing
				// from the original query
				if delivery.direct {
					// Mark the requesting successful (independent of individual status)
					//txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

					// Make sure something was pending, nuke it
					req := f.requests[delivery.origin]
					if req == nil {
						log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
						break
					}
					delete(f.requests, delivery.origin)

					// Anything not delivered should be re-scheduled (with or without
					// this peer, depending on the response cutoff)
					delivered := make(map[common.Hash]struct{})
					for _, hash := range delivery.hashes {
						delivered[hash] = struct{}{}
					}
					cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
					for i, hash := range req.hashes {
						if _, ok := delivered[hash]; ok {
							cutoff = i
						}
					}
					// Reschedule missing hashes from alternates, not-fulfilled from alt+self
					for i, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						if _, ok := delivered[hash]; !ok {
							if i < cutoff {
								delete(f.alternates[hash], delivery.origin)
								delete(f.announces[delivery.origin], hash)
								if len(f.announces[delivery.origin]) == 0 {
									delete(f.announces, delivery.origin)
								}
							}
							if len(f.alternates[hash]) > 0 {
								if _, ok := f.announced[hash]; ok {
									panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
								}
								f.announced[hash] = f.alternates[hash]
							}
						}
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					// Something was delivered, try to reschedule requests
					f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
				}
		*/
	}
	//if direct {
	//	txReplyKnownMeter.Mark(duplicate)
	//	txReplyUnderpricedMeter.Mark(underpriced)
	//	txReplyOtherRejectMeter.Mark(otherreject)
	//} else {
	//	txBroadcastKnownMeter.Mark(duplicate)
	//	txBroadcastUnderpricedMeter.Mark(underpriced)
	//	txBroadcastOtherRejectMeter.Mark(otherreject)
	//}
}
func (tp *TxPropagate) DeliverAnnounces(peerID string, hashes []common.Hash) {
	tp.lock.Lock()
	defer tp.lock.Unlock()

	// Skip any transaction announcements that we already know of, or that we've
	// previously marked as cheap and discarded. This check is of course racy,
	// because multiple concurrent notifies will still manage to pass it, but it's
	// still valuable to check here because it runs concurrent to the internal
	// loop, so anything caught here is time saved internally.
	var (
		duplicate, underpriced int64
	)
	for _, hash := range hashes {
		if tp.txpool.Has(hash) {
			duplicate++
			continue
		}
		if tp.underpriced.Contains(hash) {
			underpriced++
			continue
		}
		// Lazily create the per-peer set: assigning into a nil map would panic.
		if tp.deliveredAnnounces[peerID] == nil {
			tp.deliveredAnnounces[peerID] = make(map[common.Hash]struct{})
		}
		tp.deliveredAnnounces[peerID][hash] = struct{}{}
	}
	//txAnnounceKnownMeter.Mark(duplicate)
	//txAnnounceUnderpricedMeter.Mark(underpriced)
}
func (tp *TxPropagate) RequestTransactions(peerID string, hashes []common.Hash) error {
	// Guard the shared request queue, as the other methods do.
	tp.lock.Lock()
	defer tp.lock.Unlock()
	tp.requests = append(tp.requests, TxsRequest{PeerID: []byte(peerID), Hashes: hashes})
	return nil
}
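DeliverAnnounces only records which peer announced which hash; nothing in this file yet turns those records into fetch requests. A sketch of how a scheduler could bridge the two, assuming it lives in the same package (the method name and the one-batch-per-peer policy are hypothetical, not part of this commit):

// Hypothetical scheduler sketch: convert recorded announcements into one
// TxsRequest per peer and clear the bookkeeping.
func (tp *TxPropagate) scheduleAnnouncedFetches() {
	tp.lock.Lock()
	defer tp.lock.Unlock()
	for peerID, hashes := range tp.deliveredAnnounces {
		batch := make([]common.Hash, 0, len(hashes))
		for hash := range hashes {
			batch = append(batch, hash)
		}
		if len(batch) > 0 {
			tp.requests = append(tp.requests, TxsRequest{PeerID: []byte(peerID), Hashes: batch})
		}
		delete(tp.deliveredAnnounces, peerID) // deleting during range is safe in Go
	}
}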
package txpropagate

import (
	"sync"

	mapset "github.com/deckarep/golang-set"

	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/core"
)
// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
// is used to track recent transactions that have been dropped so we don't
// re-request them.
const maxTxUnderpricedSetSize = 32768
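// The pattern that enforces this bound (see DeliverTransactions) evicts
// arbitrary entries until there is room, then records the new hash. A
// hypothetical helper, not part of this commit, capturing it in one place:
func addBounded(set mapset.Set, hash common.Hash) {
	for set.Cardinality() >= maxTxUnderpricedSetSize {
		set.Pop() // evicts an arbitrary element
	}
	set.Add(hash)
}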
type TxPropagate struct {
	lock   sync.RWMutex
	txpool *core.TxPool

	knownTxs           map[string]mapset.Set
	requests           []TxsRequest
	underpriced        mapset.Set // Transactions discarded as too cheap (don't re-fetch)
	deliveredAnnounces map[string]map[common.Hash]struct{}

	/*
		// ID 1: Waiting lists for newly discovered transactions that might be
		// broadcast without needing explicit request/reply round trips.
		waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
		waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
		waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)

		// ID 2: Queue of transactions that are waiting to be allocated to some peer
		// to be retrieved directly.
		announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
		announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
	*/
}
func NewTxPropagate(txpool *core.TxPool) *TxPropagate {
	return &TxPropagate{
		txpool:             txpool,
		underpriced:        mapset.NewSet(),
		knownTxs:           map[string]mapset.Set{},
		deliveredAnnounces: map[string]map[common.Hash]struct{}{}, // written to by DeliverAnnounces
		/*
			waitlist:  make(map[common.Hash]map[string]struct{}),
			waittime:  make(map[common.Hash]mclock.AbsTime),
			waitslots: make(map[string]map[common.Hash]struct{}),
			announces: make(map[string]map[common.Hash]struct{}),
			announced: make(map[common.Hash]map[string]struct{}),
		*/
	}
}
type TxsRequest struct {
	Hashes []common.Hash
	PeerID []byte
}
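A hypothetical end-to-end usage sketch (the function name and the nil-pool shortcut are illustrative, not part of this commit): queue a fetch for a peer, then drain the queue the way a sentry send loop might.

// Only the request queue is exercised here, so a nil pool is passed for brevity.
func drainRequestsSketch() []TxsRequest {
	tp := NewTxPropagate(nil)
	_ = tp.RequestTransactions("peer-1", []common.Hash{common.HexToHash("0x01")})

	// Swap out the pending requests under the lock.
	tp.lock.Lock()
	pending := tp.requests
	tp.requests = nil
	tp.lock.Unlock()

	// The caller would now send one GetPooledTransactions message per request,
	// addressed to string(req.PeerID) with req.Hashes as the payload.
	return pending
}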