frequency_service/
service.rs

1#![allow(unused_imports)]
2//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
3
4// Originally from https://github.com/paritytech/cumulus/blob/master/parachain-template/node/src/service.rs
5
6// std
7use cumulus_client_cli::CollatorOptions;
8use frequency_runtime::RuntimeApi;
9use sc_client_api::Backend;
10use std::{ptr::addr_eq, sync::Arc, time::Duration};
11
12// RPC
13use common_primitives::node::{AccountId, Balance, Block, Hash, Index as Nonce};
14use jsonrpsee::RpcModule;
15use substrate_prometheus_endpoint::Registry;
16
17// Cumulus Imports
18use cumulus_client_collator::service::CollatorService;
19use cumulus_client_consensus_aura::{AuraConsensus, SlotProportion};
20use cumulus_client_consensus_common::{
21	ParachainBlockImport as TParachainBlockImport, ParachainConsensus,
22};
23use cumulus_client_consensus_proposer::Proposer;
24use cumulus_client_network::RequireSecondedInBlockAnnounce;
25use cumulus_client_service::{
26	build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
27	BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartCollatorParams,
28	StartFullNodeParams, StartRelayChainTasksParams,
29};
30use cumulus_primitives_core::{
31	relay_chain::{CollatorPair, ValidationCode},
32	ParaId,
33};
34use cumulus_relay_chain_interface::{OverseerHandle, RelayChainError, RelayChainInterface};
35use sc_service::TransactionPoolOptions;
36
37// Substrate Imports
38use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
39use futures::FutureExt;
40use sc_consensus::{ImportQueue, LongestChain};
41use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
42
43use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock, NetworkService};
44use sc_network_sync::SyncingService;
45use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
46use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
47use sc_transaction_pool_api::OffchainTransactionPoolFactory;
48use sp_blockchain::HeaderBackend;
49use sp_keystore::KeystorePtr;
50
51use common_runtime::prod_or_testnet_or_local;
52
53type FullBackend = TFullBackend<Block>;
54
55type MaybeFullSelectChain = Option<LongestChain<FullBackend, Block>>;
56
57#[cfg(not(feature = "runtime-benchmarks"))]
58type HostFunctions = (
59	cumulus_client_service::ParachainHostFunctions,
60	common_primitives::offchain::custom::HostFunctions,
61);
62
63#[cfg(feature = "runtime-benchmarks")]
64type HostFunctions = (
65	cumulus_client_service::ParachainHostFunctions,
66	frame_benchmarking::benchmarking::HostFunctions,
67	common_primitives::offchain::custom::HostFunctions,
68);
69
70use crate::common::start_offchain_workers;
71pub use frequency_runtime;
72
73type ParachainExecutor = WasmExecutor<HostFunctions>;
74
75/// Frequency parachain
76pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
77
78type ParachainBackend = TFullBackend<Block>;
79
80type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
81
82/// Starts a `ServiceBuilder` for a full service.
83///
84/// Use this macro if you don't actually need the full service, but just the builder in order to
85/// be able to perform chain operations.
86#[allow(deprecated)]
87pub fn new_partial(
88	config: &Configuration,
89	instant_sealing: bool,
90	override_pool_config: Option<TransactionPoolOptions>,
91) -> Result<
92	PartialComponents<
93		ParachainClient,
94		ParachainBackend,
95		MaybeFullSelectChain,
96		sc_consensus::DefaultImportQueue<Block>,
97		sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>,
98		(ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>),
99	>,
100	sc_service::Error,
101> {
102	let telemetry = config
103		.telemetry_endpoints
104		.clone()
105		.filter(|x| !x.is_empty())
106		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
107			let worker = TelemetryWorker::new(16)?;
108			let telemetry = worker.handle().new_telemetry(endpoints);
109			Ok((worker, telemetry))
110		})
111		.transpose()?;
112
113	let heap_pages = config
114		.executor
115		.default_heap_pages
116		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
117
118	let executor = ParachainExecutor::builder()
119		.with_execution_method(config.executor.wasm_method)
120		.with_onchain_heap_alloc_strategy(heap_pages)
121		.with_offchain_heap_alloc_strategy(heap_pages)
122		.with_max_runtime_instances(config.executor.max_runtime_instances)
123		.with_runtime_cache_size(config.executor.runtime_cache_size)
124		.build();
125
126	let (client, backend, keystore_container, task_manager) =
127		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
128			config,
129			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
130			executor,
131			true,
132		)?;
133	let client = Arc::new(client);
134
135	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
136
137	let telemetry = telemetry.map(|(worker, telemetry)| {
138		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
139		telemetry
140	});
141
142	let transaction_pool_option = override_pool_config.unwrap_or(config.transaction_pool.clone());
143
144	// See https://github.com/paritytech/polkadot-sdk/pull/4639 for how to enable the fork-aware pool.
145	let transaction_pool = Arc::from(
146		sc_transaction_pool::Builder::new(
147			task_manager.spawn_essential_handle(),
148			client.clone(),
149			config.role.is_authority().into(),
150		)
151		.with_options(transaction_pool_option)
152		.with_prometheus(config.prometheus_registry())
153		.build(),
154	);
155
156	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
157
158	#[cfg(feature = "frequency-no-relay")]
159	let import_queue = sc_consensus_manual_seal::import_queue(
160		Box::new(client.clone()),
161		&task_manager.spawn_essential_handle(),
162		config.prometheus_registry(),
163	);
164
165	#[cfg(not(feature = "frequency-no-relay"))]
166	let import_queue = build_import_queue(
167		client.clone(),
168		block_import.clone(),
169		config,
170		telemetry.as_ref().map(|telemetry| telemetry.handle()),
171		&task_manager,
172	);
173
174	let select_chain =
175		if instant_sealing { Some(LongestChain::new(backend.clone())) } else { None };
176
177	let params = PartialComponents {
178		backend,
179		client,
180		import_queue,
181		keystore_container,
182		task_manager,
183		transaction_pool,
184		select_chain,
185		other: (block_import, telemetry, telemetry_worker_handle),
186	};
187
188	Ok(params)
189}
190
191/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
192///
193/// This is the actual implementation that is abstract over the executor and the runtime api.
194#[allow(clippy::expect_used)]
195#[sc_tracing::logging::prefix_logs_with("Parachain")]
196#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
197pub async fn start_parachain_node(
198	parachain_config: Configuration,
199	polkadot_config: Configuration,
200	collator_options: CollatorOptions,
201	para_id: ParaId,
202	hwbench: Option<sc_sysinfo::HwBench>,
203	override_pool_config: Option<TransactionPoolOptions>,
204) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
205	use crate::common::listen_addrs_to_normalized_strings;
206	use common_primitives::offchain::OcwCustomExt;
207	use sc_client_db::Backend;
208
209	let parachain_config = prepare_node_config(parachain_config);
210
211	let params = new_partial(&parachain_config, false, override_pool_config)?;
212	let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
213
214	let prometheus_registry = parachain_config.prometheus_registry().cloned();
215	let net_config = FullNetworkConfiguration::<_, _, sc_network::NetworkWorker<Block, Hash>>::new(
216		&parachain_config.network,
217		prometheus_registry,
218	);
219
220	let client = params.client.clone();
221	let backend = params.backend.clone();
222	let mut task_manager = params.task_manager;
223
224	let (relay_chain_interface, collator_key, _relay_chain_network, _paranode_rx) =
225		build_relay_chain_interface(
226			polkadot_config,
227			&parachain_config,
228			telemetry_worker_handle,
229			&mut task_manager,
230			collator_options.clone(),
231			hwbench.clone(),
232		)
233		.await
234		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
235
236	let validator = parachain_config.role.is_authority();
237	let prometheus_registry = parachain_config.prometheus_registry().cloned();
238	let transaction_pool = params.transaction_pool.clone();
239	let import_queue_service = params.import_queue.service();
240
241	// NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant`
242	// when starting the network.
243	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
244		build_network(BuildNetworkParams {
245			parachain_config: &parachain_config,
246			net_config,
247			client: client.clone(),
248			transaction_pool: transaction_pool.clone(),
249			para_id,
250			spawn_handle: task_manager.spawn_handle(),
251			relay_chain_interface: relay_chain_interface.clone(),
252			import_queue: params.import_queue,
253			sybil_resistance_level: CollatorSybilResistance::Resistant,
254			metrics: sc_network::NetworkWorker::<Block, Hash>::register_notification_metrics(
255				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
256			),
257		})
258		.await?;
259
260	// Start off-chain workers if enabled
261	if parachain_config.offchain_worker.enabled {
262		log::info!("OFFCHAIN WORKER is Enabled!");
263		start_offchain_workers(
264			&client,
265			&parachain_config,
266			Some(params.keystore_container.keystore()),
267			&backend,
268			Some(OffchainTransactionPoolFactory::new(transaction_pool.clone())),
269			Arc::new(network.clone()),
270			&task_manager,
271		);
272	}
273
274	let rpc_builder = {
275		let client = client.clone();
276		let transaction_pool = transaction_pool.clone();
277
278		let backend = match parachain_config.offchain_worker.enabled {
279			true => backend.offchain_storage(),
280			false => None,
281		};
282
283		Box::new(move |_| {
284			let deps = crate::rpc::FullDeps {
285				client: client.clone(),
286				pool: transaction_pool.clone(),
287				command_sink: None,
288			};
289
290			crate::rpc::create_full(deps, backend.clone()).map_err(Into::into)
291		})
292	};
293
294	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
295		rpc_builder,
296		client: client.clone(),
297		transaction_pool: transaction_pool.clone(),
298		task_manager: &mut task_manager,
299		config: parachain_config,
300		keystore: params.keystore_container.keystore(),
301		backend: backend.clone(),
302		network: network.clone(),
303		sync_service: sync_service.clone(),
304		system_rpc_tx,
305		tx_handler_controller,
306		telemetry: telemetry.as_mut(),
307	})?;
308
309	if let Some(hwbench) = hwbench {
310		sc_sysinfo::print_hwbench(&hwbench);
311		let is_rc_authority = false;
312		// Here you can check whether the hardware meets your chains' requirements. Putting a link
313		// in there and swapping out the requirements for your own are probably a good idea. The
314		// requirements for a para-chain are dictated by its relay-chain.
315		if validator &&
316			SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, is_rc_authority).is_err()
317		{
318			log::warn!(
319				"⚠️  The hardware does not meet the minimal requirements for role 'Authority'."
320			);
321		}
322
323		if let Some(ref mut telemetry) = telemetry {
324			let telemetry_handle = telemetry.handle();
325			task_manager.spawn_handle().spawn(
326				"telemetry_hwbench",
327				None,
328				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
329			);
330		}
331	}
332
333	let announce_block = {
334		let sync_service = sync_service.clone();
335		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
336	};
337
338	let relay_chain_slot_duration = Duration::from_secs(6);
339
340	let overseer_handle = relay_chain_interface
341		.overseer_handle()
342		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
343
344	start_relay_chain_tasks(StartRelayChainTasksParams {
345		client: client.clone(),
346		announce_block: announce_block.clone(),
347		para_id,
348		relay_chain_interface: relay_chain_interface.clone(),
349		task_manager: &mut task_manager,
350		da_recovery_profile: if validator {
351			DARecoveryProfile::Collator
352		} else {
353			DARecoveryProfile::FullNode
354		},
355		import_queue: import_queue_service,
356		relay_chain_slot_duration,
357		recovery_handle: Box::new(overseer_handle.clone()),
358		sync_service: sync_service.clone(),
359		prometheus_registry: prometheus_registry.as_ref(),
360	})?;
361
362	if validator {
363		start_consensus(
364			client.clone(),
365			backend.clone(),
366			block_import,
367			prometheus_registry.as_ref(),
368			telemetry.as_ref().map(|t| t.handle()),
369			&task_manager,
370			relay_chain_interface.clone(),
371			transaction_pool,
372			sync_service.clone(),
373			params.keystore_container.keystore(),
374			relay_chain_slot_duration,
375			para_id,
376			collator_key.expect("Command line arguments do not allow this. qed"),
377			overseer_handle,
378			announce_block,
379		)?;
380	}
381
382	Ok((task_manager, client))
383}
384
385/// Build the import queue for the parachain runtime.
386#[cfg(not(feature = "frequency-no-relay"))]
387/// Build the import queue for the parachain runtime.
388fn build_import_queue(
389	client: Arc<ParachainClient>,
390	block_import: ParachainBlockImport,
391	config: &Configuration,
392	telemetry: Option<TelemetryHandle>,
393	task_manager: &TaskManager,
394) -> sc_consensus::DefaultImportQueue<Block> {
395	cumulus_client_consensus_aura::equivocation_import_queue::fully_verifying_import_queue::<
396		sp_consensus_aura::sr25519::AuthorityPair,
397		_,
398		_,
399		_,
400		_,
401	>(
402		client,
403		block_import,
404		move |_, _| async move {
405			let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
406			Ok(timestamp)
407		},
408		&task_manager.spawn_essential_handle(),
409		config.prometheus_registry(),
410		telemetry,
411	)
412}
413
414#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
415fn start_consensus(
416	client: Arc<ParachainClient>,
417	backend: Arc<ParachainBackend>,
418	block_import: ParachainBlockImport,
419	prometheus_registry: Option<&Registry>,
420	telemetry: Option<TelemetryHandle>,
421	task_manager: &TaskManager,
422	relay_chain_interface: Arc<dyn RelayChainInterface>,
423	transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>>,
424	_sync_oracle: Arc<SyncingService<Block>>,
425	keystore: KeystorePtr,
426	relay_chain_slot_duration: Duration,
427	para_id: ParaId,
428	collator_key: CollatorPair,
429	overseer_handle: OverseerHandle,
430	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
431) -> Result<(), sc_service::Error> {
432	use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
433
434	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
435		task_manager.spawn_handle(),
436		client.clone(),
437		transaction_pool,
438		prometheus_registry,
439		telemetry.clone(),
440	);
441
442	let proposer = Proposer::new(proposer_factory);
443
444	let collator_service = CollatorService::new(
445		client.clone(),
446		Arc::new(task_manager.spawn_handle()),
447		announce_block,
448		client.clone(),
449	);
450
451	let params = AuraParams {
452		create_inherent_data_providers: move |_, ()| async move { Ok(()) },
453		block_import,
454		para_client: client.clone(),
455		para_backend: backend.clone(),
456		relay_client: relay_chain_interface,
457		code_hash_provider: move |block_hash| {
458			client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
459		},
460		keystore,
461		collator_key,
462		para_id,
463		overseer_handle,
464		relay_chain_slot_duration,
465		proposer,
466		collator_service,
467		authoring_duration: Duration::from_millis(prod_or_testnet_or_local!(500, 2000, 2000)),
468		reinitialize: false,
469		max_pov_percentage: None, // default 50%
470	};
471
472	let fut = aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _>(
473		params,
474	);
475
476	task_manager.spawn_essential_handle().spawn("aura", None, fut);
477
478	Ok(())
479}