Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Hook RPC extrinsic import into propagation (#158)
Browse files Browse the repository at this point in the history
* call `on_new_transactions` when we import

* fix trace

* pass correct bytes to network

* clean up

* cull before repropagating; repropagate on timer

* add a little tracing
  • Loading branch information
rphmeier authored and gavofyork committed May 15, 2018
1 parent e1bb6bc commit 58223b0
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 44 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ tokio-core = "0.1.12"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
substrate-client = { path = "../../substrate/client" }
substrate-network = { path = "../../substrate/network" }
substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-state-machine = { path = "../../substrate/state-machine" }
substrate-executor = { path = "../../substrate/executor" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-rpc = { path = "../../substrate/rpc" }
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
polkadot-primitives = { path = "../primitives" }
polkadot-executor = { path = "../executor" }
polkadot-runtime = { path = "../runtime" }
polkadot-service = { path = "../service" }
polkadot-transaction-pool = { path = "../transaction-pool" }
39 changes: 38 additions & 1 deletion polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;

extern crate substrate_codec as codec;
extern crate substrate_state_machine as state_machine;
extern crate substrate_client as client;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_rpc;
extern crate substrate_rpc_servers as rpc;
extern crate substrate_runtime_support as runtime_support;
extern crate polkadot_primitives;
extern crate polkadot_executor;
extern crate polkadot_runtime;
extern crate polkadot_service as service;
extern crate polkadot_transaction_pool as txpool;

#[macro_use]
extern crate lazy_static;
Expand All @@ -57,10 +61,39 @@ mod informant;
use std::io;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
use parking_lot::Mutex;
use service::ChainSpec;
use primitives::block::Extrinsic;

struct RpcTransactionPool {
inner: Arc<Mutex<txpool::TransactionPool>>,
network: Arc<network::Service>,
}

impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
use primitives::hexdisplay::HexDisplay;
use polkadot_runtime::UncheckedExtrinsic;
use codec::Slicable;

info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;

info!("Correctly formatted: {:?}", decoded);

self.inner.lock().import(decoded)
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;

self.network.trigger_repropagate();
Ok(())
}
}

/// Parse command line arguments and start the node.
///
Expand Down Expand Up @@ -172,7 +205,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where

let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
let pool = RpcTransactionPool {
inner: service.transaction_pool(),
network: service.network(),
};
rpc::rpc_handler(service.client(), chain, pool)
};
(
start_server(http_address, |address| rpc::start_http(address, handler())),
Expand Down
16 changes: 0 additions & 16 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
extern crate ed25519;
extern crate ethereum_types;
extern crate substrate_codec as codec;
extern crate substrate_rpc;
extern crate substrate_primitives as substrate_primitives;
extern crate substrate_runtime_primitives as substrate_runtime_primitives;
extern crate polkadot_runtime as runtime;
Expand All @@ -35,10 +34,8 @@ use std::collections::HashMap;
use std::cmp::Ordering;
use std::sync::Arc;

use codec::Slicable;
use polkadot_api::PolkadotApi;
use primitives::{AccountId, Timestamp};
use substrate_primitives::block::Extrinsic;
use runtime::{Block, UncheckedExtrinsic, TimestampCall, Call};
use substrate_runtime_primitives::traits::Checkable;
use transaction_pool::{Pool, Readiness};
Expand Down Expand Up @@ -380,19 +377,6 @@ impl TransactionPool {
}
}

impl substrate_rpc::author::AsyncAuthorApi for TransactionPool {
fn submit_extrinsic(&mut self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
use substrate_primitives::hexdisplay::HexDisplay;
info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
let xt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
info!("Correctly formatted: {:?}", xt);
self.import(xt)
.map(|_| ())
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError.into())
}
}

#[cfg(test)]
mod tests {
}
11 changes: 8 additions & 3 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl Protocol {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
return;
}
trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len());
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), peer_id);
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
for t in transactions {
Expand All @@ -445,12 +445,17 @@ impl Protocol {
}
}

/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
/// Called when we propagate ready transactions to peers.
pub fn propagate_transactions(&self, io: &mut SyncIo) {
debug!(target: "sync", "Propagating transactions");

// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
}

let transactions = self.transaction_pool.transactions();

let mut peers = self.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
Expand Down
25 changes: 20 additions & 5 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::sync::Arc;
use std::collections::{BTreeMap};
use std::io;
use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
Expand All @@ -41,6 +42,12 @@ pub type StatementStream = mpsc::UnboundedReceiver<Statement>;
/// Type that represents bft messages stream.
pub type BftMessageStream = mpsc::UnboundedReceiver<LocalizedBftMessage>;

const TICK_TOKEN: TimerToken = 0;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);

const PROPAGATE_TOKEN: TimerToken = 1;
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);

bitflags! {
/// Node roles bitmask.
pub struct Role: u32 {
Expand Down Expand Up @@ -162,9 +169,9 @@ impl Service {
}

/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
pub fn trigger_repropagate(&self) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context));
});
}

Expand Down Expand Up @@ -268,7 +275,11 @@ impl ConsensusService for Service {

impl NetworkProtocolHandler for ProtocolHandler {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(0, ::std::time::Duration::from_millis(1000)).expect("Error registering sync timer");
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
.expect("Error registering sync timer");

io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT)
.expect("Error registering transaction propagation timer");
}

fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
Expand All @@ -283,8 +294,12 @@ impl NetworkProtocolHandler for ProtocolHandler {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}

fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.protocol.tick(&mut NetSyncIo::new(io));
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)),
PROPAGATE_TOKEN => self.protocol.propagate_transactions(&mut NetSyncIo::new(io)),
_ => {}
}
}
}

Expand Down
14 changes: 0 additions & 14 deletions substrate/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

//! Substrate block-author/full-node API.
use std::sync::Arc;
use parking_lot::Mutex;
use primitives::block::Extrinsic;

pub mod error;
Expand All @@ -35,15 +33,3 @@ build_rpc_trait! {
fn submit_extrinsic(&self, Extrinsic) -> Result<()>;
}
}

/// Variant of the AuthorApi that doesn't need to be Sync + Send + 'static.
pub trait AsyncAuthorApi: Send + 'static {
/// Submit extrinsic for inclusion in block.
fn submit_extrinsic(&mut self, Extrinsic) -> Result<()>;
}

impl<T: AsyncAuthorApi> AuthorApi for Arc<Mutex<T>> {
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
self.as_ref().lock().submit_extrinsic(xt)
}
}
14 changes: 9 additions & 5 deletions substrate/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use primitives::block;
use super::*;
use super::error::*;

use std::sync::Arc;
use parking_lot::Mutex;
use primitives::block;

#[derive(Default)]
struct DummyTxPool {
submitted: Vec<block::Extrinsic>,
}

impl AsyncAuthorApi for DummyTxPool {
impl AuthorApi for Arc<Mutex<DummyTxPool>> {
/// Submit extrinsic for inclusion in block.
fn submit_extrinsic(&mut self, xt: Extrinsic) -> Result<()> {
if self.submitted.len() < 1 {
self.submitted.push(xt);
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
let mut s = self.lock();
if s.submitted.len() < 1 {
s.submitted.push(xt);
Ok(())
} else {
Err(ErrorKind::PoolError.into())
Expand Down

0 comments on commit 58223b0

Please sign in to comment.