frequency_service/
service.rs

1#![allow(unused_imports)]
2//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
3
4// Originally from https://github.com/paritytech/cumulus/blob/master/parachain-template/node/src/service.rs
5
6// std
7use cumulus_client_cli::CollatorOptions;
8use frequency_runtime::RuntimeApi;
9use sc_client_api::Backend;
10use std::{ptr::addr_eq, sync::Arc, time::Duration};
11
12// RPC
13use common_primitives::node::{AccountId, Balance, Block, Hash, Index as Nonce};
14use jsonrpsee::RpcModule;
15use substrate_prometheus_endpoint::Registry;
16
17// Cumulus Imports
18use cumulus_client_collator::service::CollatorService;
19use cumulus_client_consensus_aura::{AuraConsensus, SlotProportion};
20use cumulus_client_consensus_common::{
21	ParachainBlockImport as TParachainBlockImport, ParachainConsensus,
22};
23use cumulus_client_consensus_proposer::Proposer;
24use cumulus_client_network::RequireSecondedInBlockAnnounce;
25use cumulus_client_service::{
26	build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
27	BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartCollatorParams,
28	StartFullNodeParams, StartRelayChainTasksParams,
29};
30use cumulus_primitives_core::{
31	relay_chain::{CollatorPair, ValidationCode},
32	ParaId,
33};
34use cumulus_relay_chain_interface::{OverseerHandle, RelayChainError, RelayChainInterface};
35use sc_service::TransactionPoolOptions;
36
37// Substrate Imports
38use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
39use futures::FutureExt;
40use sc_consensus::{ImportQueue, LongestChain};
41use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
42
43use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock, NetworkService};
44use sc_network_sync::SyncingService;
45use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
46use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
47use sc_transaction_pool_api::OffchainTransactionPoolFactory;
48use sp_blockchain::HeaderBackend;
49use sp_keystore::KeystorePtr;
50
51use common_runtime::prod_or_testnet_or_local;
52
53type FullBackend = TFullBackend<Block>;
54
55type MaybeFullSelectChain = Option<LongestChain<FullBackend, Block>>;
56
57#[cfg(not(any(feature = "runtime-benchmarks", feature = "no-custom-host-functions")))]
58type HostFunctions = (
59	cumulus_client_service::ParachainHostFunctions,
60	common_primitives::offchain::custom::HostFunctions,
61);
62
63#[cfg(feature = "runtime-benchmarks")]
64type HostFunctions = (
65	cumulus_client_service::ParachainHostFunctions,
66	frame_benchmarking::benchmarking::HostFunctions,
67	common_primitives::offchain::custom::HostFunctions,
68);
69
70#[cfg(feature = "no-custom-host-functions")]
71type HostFunctions = (cumulus_client_service::ParachainHostFunctions,);
72
73use crate::common::start_offchain_workers;
74pub use frequency_runtime;
75
76type ParachainExecutor = WasmExecutor<HostFunctions>;
77
78/// Frequency parachain
79pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
80
81type ParachainBackend = TFullBackend<Block>;
82
83type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
84
85/// Starts a `ServiceBuilder` for a full service.
86///
87/// Use this macro if you don't actually need the full service, but just the builder in order to
88/// be able to perform chain operations.
89#[allow(deprecated, clippy::result_large_err)]
90pub fn new_partial(
91	config: &Configuration,
92	instant_sealing: bool,
93	override_pool_config: Option<TransactionPoolOptions>,
94) -> Result<
95	PartialComponents<
96		ParachainClient,
97		ParachainBackend,
98		MaybeFullSelectChain,
99		sc_consensus::DefaultImportQueue<Block>,
100		sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>,
101		(ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>),
102	>,
103	sc_service::Error,
104> {
105	let telemetry = config
106		.telemetry_endpoints
107		.clone()
108		.filter(|x| !x.is_empty())
109		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
110			let worker = TelemetryWorker::new(16)?;
111			let telemetry = worker.handle().new_telemetry(endpoints);
112			Ok((worker, telemetry))
113		})
114		.transpose()?;
115
116	let heap_pages = config
117		.executor
118		.default_heap_pages
119		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
120
121	let executor = ParachainExecutor::builder()
122		.with_execution_method(config.executor.wasm_method)
123		.with_onchain_heap_alloc_strategy(heap_pages)
124		.with_offchain_heap_alloc_strategy(heap_pages)
125		.with_max_runtime_instances(config.executor.max_runtime_instances)
126		.with_runtime_cache_size(config.executor.runtime_cache_size)
127		.build();
128
129	let (client, backend, keystore_container, task_manager) =
130		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
131			config,
132			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
133			executor,
134			true,
135		)?;
136	let client = Arc::new(client);
137
138	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
139
140	let telemetry = telemetry.map(|(worker, telemetry)| {
141		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
142		telemetry
143	});
144
145	let transaction_pool_option = override_pool_config.unwrap_or(config.transaction_pool.clone());
146
147	// See https://github.com/paritytech/polkadot-sdk/pull/4639 for how to enable the fork-aware pool.
148	let transaction_pool = Arc::from(
149		sc_transaction_pool::Builder::new(
150			task_manager.spawn_essential_handle(),
151			client.clone(),
152			config.role.is_authority().into(),
153		)
154		.with_options(transaction_pool_option)
155		.with_prometheus(config.prometheus_registry())
156		.build(),
157	);
158
159	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
160
161	#[cfg(feature = "frequency-no-relay")]
162	let import_queue = sc_consensus_manual_seal::import_queue(
163		Box::new(client.clone()),
164		&task_manager.spawn_essential_handle(),
165		config.prometheus_registry(),
166	);
167
168	#[cfg(not(feature = "frequency-no-relay"))]
169	let import_queue = build_import_queue(
170		client.clone(),
171		block_import.clone(),
172		config,
173		telemetry.as_ref().map(|telemetry| telemetry.handle()),
174		&task_manager,
175	);
176
177	let select_chain =
178		if instant_sealing { Some(LongestChain::new(backend.clone())) } else { None };
179
180	let params = PartialComponents {
181		backend,
182		client,
183		import_queue,
184		keystore_container,
185		task_manager,
186		transaction_pool,
187		select_chain,
188		other: (block_import, telemetry, telemetry_worker_handle),
189	};
190
191	Ok(params)
192}
193
194/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
195///
196/// This is the actual implementation that is abstract over the executor and the runtime api.
197#[allow(clippy::expect_used)]
198#[sc_tracing::logging::prefix_logs_with("Parachain")]
199#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
200pub async fn start_parachain_node(
201	parachain_config: Configuration,
202	polkadot_config: Configuration,
203	collator_options: CollatorOptions,
204	para_id: ParaId,
205	hwbench: Option<sc_sysinfo::HwBench>,
206	override_pool_config: Option<TransactionPoolOptions>,
207) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
208	use crate::common::listen_addrs_to_normalized_strings;
209	use common_primitives::offchain::OcwCustomExt;
210	use sc_client_db::Backend;
211
212	let parachain_config = prepare_node_config(parachain_config);
213
214	let params = new_partial(&parachain_config, false, override_pool_config)?;
215	let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
216
217	let prometheus_registry = parachain_config.prometheus_registry().cloned();
218	let net_config = FullNetworkConfiguration::<_, _, sc_network::NetworkWorker<Block, Hash>>::new(
219		&parachain_config.network,
220		prometheus_registry,
221	);
222
223	let client = params.client.clone();
224	let backend = params.backend.clone();
225	let mut task_manager = params.task_manager;
226
227	let (relay_chain_interface, collator_key, _relay_chain_network, _paranode_rx) =
228		build_relay_chain_interface(
229			polkadot_config,
230			&parachain_config,
231			telemetry_worker_handle,
232			&mut task_manager,
233			collator_options.clone(),
234			hwbench.clone(),
235		)
236		.await
237		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
238
239	let validator = parachain_config.role.is_authority();
240	let prometheus_registry = parachain_config.prometheus_registry().cloned();
241	let transaction_pool = params.transaction_pool.clone();
242	let import_queue_service = params.import_queue.service();
243
244	// NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant`
245	// when starting the network.
246	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
247		build_network(BuildNetworkParams {
248			parachain_config: &parachain_config,
249			net_config,
250			client: client.clone(),
251			transaction_pool: transaction_pool.clone(),
252			para_id,
253			spawn_handle: task_manager.spawn_handle(),
254			relay_chain_interface: relay_chain_interface.clone(),
255			import_queue: params.import_queue,
256			sybil_resistance_level: CollatorSybilResistance::Resistant,
257			metrics: sc_network::NetworkWorker::<Block, Hash>::register_notification_metrics(
258				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
259			),
260		})
261		.await?;
262
263	// Start off-chain workers if enabled
264	if parachain_config.offchain_worker.enabled {
265		log::info!("OFFCHAIN WORKER is Enabled!");
266		start_offchain_workers(
267			&client,
268			&parachain_config,
269			Some(params.keystore_container.keystore()),
270			&backend,
271			Some(OffchainTransactionPoolFactory::new(transaction_pool.clone())),
272			Arc::new(network.clone()),
273			&task_manager,
274		);
275	}
276
277	let rpc_builder = {
278		let client = client.clone();
279		let transaction_pool = transaction_pool.clone();
280
281		let backend = match parachain_config.offchain_worker.enabled {
282			true => backend.offchain_storage(),
283			false => None,
284		};
285
286		Box::new(move |_| {
287			let deps = crate::rpc::FullDeps {
288				client: client.clone(),
289				pool: transaction_pool.clone(),
290				command_sink: None,
291			};
292
293			crate::rpc::create_full(deps, backend.clone()).map_err(Into::into)
294		})
295	};
296
297	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
298		rpc_builder,
299		client: client.clone(),
300		transaction_pool: transaction_pool.clone(),
301		task_manager: &mut task_manager,
302		config: parachain_config,
303		keystore: params.keystore_container.keystore(),
304		backend: backend.clone(),
305		network: network.clone(),
306		sync_service: sync_service.clone(),
307		system_rpc_tx,
308		tx_handler_controller,
309		telemetry: telemetry.as_mut(),
310	})?;
311
312	if let Some(hwbench) = hwbench {
313		sc_sysinfo::print_hwbench(&hwbench);
314		let is_rc_authority = false;
315		// Here you can check whether the hardware meets your chains' requirements. Putting a link
316		// in there and swapping out the requirements for your own are probably a good idea. The
317		// requirements for a para-chain are dictated by its relay-chain.
318		if validator &&
319			SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, is_rc_authority).is_err()
320		{
321			log::warn!(
322				"⚠️  The hardware does not meet the minimal requirements for role 'Authority'."
323			);
324		}
325
326		if let Some(ref mut telemetry) = telemetry {
327			let telemetry_handle = telemetry.handle();
328			task_manager.spawn_handle().spawn(
329				"telemetry_hwbench",
330				None,
331				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
332			);
333		}
334	}
335
336	let announce_block = {
337		let sync_service = sync_service.clone();
338		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
339	};
340
341	let relay_chain_slot_duration = Duration::from_secs(6);
342
343	let overseer_handle = relay_chain_interface
344		.overseer_handle()
345		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
346
347	start_relay_chain_tasks(StartRelayChainTasksParams {
348		client: client.clone(),
349		announce_block: announce_block.clone(),
350		para_id,
351		relay_chain_interface: relay_chain_interface.clone(),
352		task_manager: &mut task_manager,
353		da_recovery_profile: if validator {
354			DARecoveryProfile::Collator
355		} else {
356			DARecoveryProfile::FullNode
357		},
358		import_queue: import_queue_service,
359		relay_chain_slot_duration,
360		recovery_handle: Box::new(overseer_handle.clone()),
361		sync_service: sync_service.clone(),
362		prometheus_registry: prometheus_registry.as_ref(),
363	})?;
364
365	if validator {
366		start_consensus(
367			client.clone(),
368			backend.clone(),
369			block_import,
370			prometheus_registry.as_ref(),
371			telemetry.as_ref().map(|t| t.handle()),
372			&task_manager,
373			relay_chain_interface.clone(),
374			transaction_pool,
375			sync_service.clone(),
376			params.keystore_container.keystore(),
377			relay_chain_slot_duration,
378			para_id,
379			collator_key.expect("Command line arguments do not allow this. qed"),
380			overseer_handle,
381			announce_block,
382		)?;
383	}
384
385	Ok((task_manager, client))
386}
387
388/// Build the import queue for the parachain runtime.
389#[cfg(not(feature = "frequency-no-relay"))]
390/// Build the import queue for the parachain runtime.
391fn build_import_queue(
392	client: Arc<ParachainClient>,
393	block_import: ParachainBlockImport,
394	config: &Configuration,
395	telemetry: Option<TelemetryHandle>,
396	task_manager: &TaskManager,
397) -> sc_consensus::DefaultImportQueue<Block> {
398	cumulus_client_consensus_aura::equivocation_import_queue::fully_verifying_import_queue::<
399		sp_consensus_aura::sr25519::AuthorityPair,
400		_,
401		_,
402		_,
403		_,
404	>(
405		client,
406		block_import,
407		move |_, _| async move {
408			let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
409			Ok(timestamp)
410		},
411		&task_manager.spawn_essential_handle(),
412		config.prometheus_registry(),
413		telemetry,
414	)
415}
416
417#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
418#[allow(clippy::result_large_err)]
419fn start_consensus(
420	client: Arc<ParachainClient>,
421	backend: Arc<ParachainBackend>,
422	block_import: ParachainBlockImport,
423	prometheus_registry: Option<&Registry>,
424	telemetry: Option<TelemetryHandle>,
425	task_manager: &TaskManager,
426	relay_chain_interface: Arc<dyn RelayChainInterface>,
427	transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>>,
428	_sync_oracle: Arc<SyncingService<Block>>,
429	keystore: KeystorePtr,
430	relay_chain_slot_duration: Duration,
431	para_id: ParaId,
432	collator_key: CollatorPair,
433	overseer_handle: OverseerHandle,
434	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
435) -> Result<(), sc_service::Error> {
436	use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
437
438	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
439		task_manager.spawn_handle(),
440		client.clone(),
441		transaction_pool,
442		prometheus_registry,
443		telemetry.clone(),
444	);
445
446	let proposer = Proposer::new(proposer_factory);
447
448	let collator_service = CollatorService::new(
449		client.clone(),
450		Arc::new(task_manager.spawn_handle()),
451		announce_block,
452		client.clone(),
453	);
454
455	let params = AuraParams {
456		create_inherent_data_providers: move |_, ()| async move { Ok(()) },
457		block_import,
458		para_client: client.clone(),
459		para_backend: backend.clone(),
460		relay_client: relay_chain_interface,
461		code_hash_provider: move |block_hash| {
462			client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
463		},
464		keystore,
465		collator_key,
466		para_id,
467		overseer_handle,
468		relay_chain_slot_duration,
469		proposer,
470		collator_service,
471		authoring_duration: Duration::from_millis(prod_or_testnet_or_local!(500, 2000, 2000)),
472		reinitialize: false,
473		max_pov_percentage: None, // default 50%
474	};
475
476	let fut = aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _>(
477		params,
478	);
479
480	task_manager.spawn_essential_handle().spawn("aura", None, fut);
481
482	Ok(())
483}