1#![allow(unused_imports)]
2use cumulus_client_cli::CollatorOptions;
8use frequency_runtime::RuntimeApi;
9use sc_client_api::Backend;
10use std::{ptr::addr_eq, sync::Arc, time::Duration};
11
12use common_primitives::node::{AccountId, Balance, Block, Hash, Index as Nonce};
14use jsonrpsee::RpcModule;
15use substrate_prometheus_endpoint::Registry;
16
17use cumulus_client_collator::service::CollatorService;
19use cumulus_client_consensus_aura::{AuraConsensus, SlotProportion};
20use cumulus_client_consensus_common::{
21 ParachainBlockImport as TParachainBlockImport, ParachainConsensus,
22};
23use cumulus_client_consensus_proposer::Proposer;
24use cumulus_client_network::RequireSecondedInBlockAnnounce;
25use cumulus_client_service::{
26 build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
27 BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartCollatorParams,
28 StartFullNodeParams, StartRelayChainTasksParams,
29};
30use cumulus_primitives_core::{
31 relay_chain::{CollatorPair, ValidationCode},
32 ParaId,
33};
34use cumulus_relay_chain_interface::{OverseerHandle, RelayChainError, RelayChainInterface};
35use sc_service::TransactionPoolOptions;
36
37use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
39use futures::FutureExt;
40use sc_consensus::{ImportQueue, LongestChain};
41use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
42
43use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock, NetworkService};
44use sc_network_sync::SyncingService;
45use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
46use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
47use sc_transaction_pool_api::OffchainTransactionPoolFactory;
48use sp_blockchain::HeaderBackend;
49use sp_keystore::KeystorePtr;
50
51use common_runtime::prod_or_testnet_or_local;
52
/// Full-node backend specialised to this chain's [`Block`] type.
type FullBackend = TFullBackend<Block>;

/// Optional longest-chain selection rule over [`FullBackend`].
///
/// `new_partial` populates it only when instant sealing is requested and
/// leaves it as `None` otherwise.
type MaybeFullSelectChain = Option<LongestChain<FullBackend, Block>>;
56
// `HostFunctions` is selected at compile time by cargo features. Exactly one of the three
// definitions below is expected to be active.
// NOTE(review): enabling both `runtime-benchmarks` and `no-custom-host-functions` would
// activate two definitions at once — confirm these features are never combined.

/// Default host functions: the parachain set plus the project's custom offchain host functions.
#[cfg(not(any(feature = "runtime-benchmarks", feature = "no-custom-host-functions")))]
type HostFunctions = (
    cumulus_client_service::ParachainHostFunctions,
    common_primitives::offchain::custom::HostFunctions,
);

/// Benchmarking build: the default set extended with the `frame_benchmarking` host functions.
#[cfg(feature = "runtime-benchmarks")]
type HostFunctions = (
    cumulus_client_service::ParachainHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    common_primitives::offchain::custom::HostFunctions,
);

/// Build without the custom offchain host functions: only the parachain set is exposed.
#[cfg(feature = "no-custom-host-functions")]
type HostFunctions = (cumulus_client_service::ParachainHostFunctions,);
72
73use crate::common::start_offchain_workers;
74pub use frequency_runtime;
75
/// Wasm executor parameterised with the feature-selected [`HostFunctions`].
type ParachainExecutor = WasmExecutor<HostFunctions>;

/// Full client for this chain: [`Block`], the Frequency [`RuntimeApi`] and [`ParachainExecutor`].
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;

/// Full-node backend used by the parachain client (same underlying type as `FullBackend`).
type ParachainBackend = TFullBackend<Block>;

/// Parachain block import wrapping a shared [`ParachainClient`] and its [`ParachainBackend`].
type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
84
85#[allow(deprecated, clippy::result_large_err)]
90pub fn new_partial(
91 config: &Configuration,
92 instant_sealing: bool,
93 override_pool_config: Option<TransactionPoolOptions>,
94) -> Result<
95 PartialComponents<
96 ParachainClient,
97 ParachainBackend,
98 MaybeFullSelectChain,
99 sc_consensus::DefaultImportQueue<Block>,
100 sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>,
101 (ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>),
102 >,
103 sc_service::Error,
104> {
105 let telemetry = config
106 .telemetry_endpoints
107 .clone()
108 .filter(|x| !x.is_empty())
109 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
110 let worker = TelemetryWorker::new(16)?;
111 let telemetry = worker.handle().new_telemetry(endpoints);
112 Ok((worker, telemetry))
113 })
114 .transpose()?;
115
116 let heap_pages = config
117 .executor
118 .default_heap_pages
119 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
120
121 let executor = ParachainExecutor::builder()
122 .with_execution_method(config.executor.wasm_method)
123 .with_onchain_heap_alloc_strategy(heap_pages)
124 .with_offchain_heap_alloc_strategy(heap_pages)
125 .with_max_runtime_instances(config.executor.max_runtime_instances)
126 .with_runtime_cache_size(config.executor.runtime_cache_size)
127 .build();
128
129 let (client, backend, keystore_container, task_manager) =
130 sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
131 config,
132 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
133 executor,
134 true,
135 )?;
136 let client = Arc::new(client);
137
138 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
139
140 let telemetry = telemetry.map(|(worker, telemetry)| {
141 task_manager.spawn_handle().spawn("telemetry", None, worker.run());
142 telemetry
143 });
144
145 let transaction_pool_option = override_pool_config.unwrap_or(config.transaction_pool.clone());
146
147 let transaction_pool = Arc::from(
149 sc_transaction_pool::Builder::new(
150 task_manager.spawn_essential_handle(),
151 client.clone(),
152 config.role.is_authority().into(),
153 )
154 .with_options(transaction_pool_option)
155 .with_prometheus(config.prometheus_registry())
156 .build(),
157 );
158
159 let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
160
161 #[cfg(feature = "frequency-no-relay")]
162 let import_queue = sc_consensus_manual_seal::import_queue(
163 Box::new(client.clone()),
164 &task_manager.spawn_essential_handle(),
165 config.prometheus_registry(),
166 );
167
168 #[cfg(not(feature = "frequency-no-relay"))]
169 let import_queue = build_import_queue(
170 client.clone(),
171 block_import.clone(),
172 config,
173 telemetry.as_ref().map(|telemetry| telemetry.handle()),
174 &task_manager,
175 );
176
177 let select_chain =
178 if instant_sealing { Some(LongestChain::new(backend.clone())) } else { None };
179
180 let params = PartialComponents {
181 backend,
182 client,
183 import_queue,
184 keystore_container,
185 task_manager,
186 transaction_pool,
187 select_chain,
188 other: (block_import, telemetry, telemetry_worker_handle),
189 };
190
191 Ok(params)
192}
193
194#[allow(clippy::expect_used)]
198#[sc_tracing::logging::prefix_logs_with("Parachain")]
199#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
200pub async fn start_parachain_node(
201 parachain_config: Configuration,
202 polkadot_config: Configuration,
203 collator_options: CollatorOptions,
204 para_id: ParaId,
205 hwbench: Option<sc_sysinfo::HwBench>,
206 override_pool_config: Option<TransactionPoolOptions>,
207) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
208 use crate::common::listen_addrs_to_normalized_strings;
209 use common_primitives::offchain::OcwCustomExt;
210 use sc_client_db::Backend;
211
212 let parachain_config = prepare_node_config(parachain_config);
213
214 let params = new_partial(¶chain_config, false, override_pool_config)?;
215 let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
216
217 let prometheus_registry = parachain_config.prometheus_registry().cloned();
218 let net_config = FullNetworkConfiguration::<_, _, sc_network::NetworkWorker<Block, Hash>>::new(
219 ¶chain_config.network,
220 prometheus_registry,
221 );
222
223 let client = params.client.clone();
224 let backend = params.backend.clone();
225 let mut task_manager = params.task_manager;
226
227 let (relay_chain_interface, collator_key, _relay_chain_network, _paranode_rx) =
228 build_relay_chain_interface(
229 polkadot_config,
230 ¶chain_config,
231 telemetry_worker_handle,
232 &mut task_manager,
233 collator_options.clone(),
234 hwbench.clone(),
235 )
236 .await
237 .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
238
239 let validator = parachain_config.role.is_authority();
240 let prometheus_registry = parachain_config.prometheus_registry().cloned();
241 let transaction_pool = params.transaction_pool.clone();
242 let import_queue_service = params.import_queue.service();
243
244 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
247 build_network(BuildNetworkParams {
248 parachain_config: ¶chain_config,
249 net_config,
250 client: client.clone(),
251 transaction_pool: transaction_pool.clone(),
252 para_id,
253 spawn_handle: task_manager.spawn_handle(),
254 relay_chain_interface: relay_chain_interface.clone(),
255 import_queue: params.import_queue,
256 sybil_resistance_level: CollatorSybilResistance::Resistant,
257 metrics: sc_network::NetworkWorker::<Block, Hash>::register_notification_metrics(
258 parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
259 ),
260 })
261 .await?;
262
263 if parachain_config.offchain_worker.enabled {
265 log::info!("OFFCHAIN WORKER is Enabled!");
266 start_offchain_workers(
267 &client,
268 ¶chain_config,
269 Some(params.keystore_container.keystore()),
270 &backend,
271 Some(OffchainTransactionPoolFactory::new(transaction_pool.clone())),
272 Arc::new(network.clone()),
273 &task_manager,
274 );
275 }
276
277 let rpc_builder = {
278 let client = client.clone();
279 let transaction_pool = transaction_pool.clone();
280
281 let backend = match parachain_config.offchain_worker.enabled {
282 true => backend.offchain_storage(),
283 false => None,
284 };
285
286 Box::new(move |_| {
287 let deps = crate::rpc::FullDeps {
288 client: client.clone(),
289 pool: transaction_pool.clone(),
290 command_sink: None,
291 };
292
293 crate::rpc::create_full(deps, backend.clone()).map_err(Into::into)
294 })
295 };
296
297 sc_service::spawn_tasks(sc_service::SpawnTasksParams {
298 rpc_builder,
299 client: client.clone(),
300 transaction_pool: transaction_pool.clone(),
301 task_manager: &mut task_manager,
302 config: parachain_config,
303 keystore: params.keystore_container.keystore(),
304 backend: backend.clone(),
305 network: network.clone(),
306 sync_service: sync_service.clone(),
307 system_rpc_tx,
308 tx_handler_controller,
309 telemetry: telemetry.as_mut(),
310 })?;
311
312 if let Some(hwbench) = hwbench {
313 sc_sysinfo::print_hwbench(&hwbench);
314 let is_rc_authority = false;
315 if validator &&
319 SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, is_rc_authority).is_err()
320 {
321 log::warn!(
322 "⚠️ The hardware does not meet the minimal requirements for role 'Authority'."
323 );
324 }
325
326 if let Some(ref mut telemetry) = telemetry {
327 let telemetry_handle = telemetry.handle();
328 task_manager.spawn_handle().spawn(
329 "telemetry_hwbench",
330 None,
331 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
332 );
333 }
334 }
335
336 let announce_block = {
337 let sync_service = sync_service.clone();
338 Arc::new(move |hash, data| sync_service.announce_block(hash, data))
339 };
340
341 let relay_chain_slot_duration = Duration::from_secs(6);
342
343 let overseer_handle = relay_chain_interface
344 .overseer_handle()
345 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
346
347 start_relay_chain_tasks(StartRelayChainTasksParams {
348 client: client.clone(),
349 announce_block: announce_block.clone(),
350 para_id,
351 relay_chain_interface: relay_chain_interface.clone(),
352 task_manager: &mut task_manager,
353 da_recovery_profile: if validator {
354 DARecoveryProfile::Collator
355 } else {
356 DARecoveryProfile::FullNode
357 },
358 import_queue: import_queue_service,
359 relay_chain_slot_duration,
360 recovery_handle: Box::new(overseer_handle.clone()),
361 sync_service: sync_service.clone(),
362 prometheus_registry: prometheus_registry.as_ref(),
363 })?;
364
365 if validator {
366 start_consensus(
367 client.clone(),
368 backend.clone(),
369 block_import,
370 prometheus_registry.as_ref(),
371 telemetry.as_ref().map(|t| t.handle()),
372 &task_manager,
373 relay_chain_interface.clone(),
374 transaction_pool,
375 sync_service.clone(),
376 params.keystore_container.keystore(),
377 relay_chain_slot_duration,
378 para_id,
379 collator_key.expect("Command line arguments do not allow this. qed"),
380 overseer_handle,
381 announce_block,
382 )?;
383 }
384
385 Ok((task_manager, client))
386}
387
388#[cfg(not(feature = "frequency-no-relay"))]
390fn build_import_queue(
392 client: Arc<ParachainClient>,
393 block_import: ParachainBlockImport,
394 config: &Configuration,
395 telemetry: Option<TelemetryHandle>,
396 task_manager: &TaskManager,
397) -> sc_consensus::DefaultImportQueue<Block> {
398 cumulus_client_consensus_aura::equivocation_import_queue::fully_verifying_import_queue::<
399 sp_consensus_aura::sr25519::AuthorityPair,
400 _,
401 _,
402 _,
403 _,
404 >(
405 client,
406 block_import,
407 move |_, _| async move {
408 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
409 Ok(timestamp)
410 },
411 &task_manager.spawn_essential_handle(),
412 config.prometheus_registry(),
413 telemetry,
414 )
415}
416
417#[cfg(any(not(feature = "frequency-no-relay"), feature = "frequency-lint-check"))]
418#[allow(clippy::result_large_err)]
419fn start_consensus(
420 client: Arc<ParachainClient>,
421 backend: Arc<ParachainBackend>,
422 block_import: ParachainBlockImport,
423 prometheus_registry: Option<&Registry>,
424 telemetry: Option<TelemetryHandle>,
425 task_manager: &TaskManager,
426 relay_chain_interface: Arc<dyn RelayChainInterface>,
427 transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>>,
428 _sync_oracle: Arc<SyncingService<Block>>,
429 keystore: KeystorePtr,
430 relay_chain_slot_duration: Duration,
431 para_id: ParaId,
432 collator_key: CollatorPair,
433 overseer_handle: OverseerHandle,
434 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
435) -> Result<(), sc_service::Error> {
436 use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
437
438 let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
439 task_manager.spawn_handle(),
440 client.clone(),
441 transaction_pool,
442 prometheus_registry,
443 telemetry.clone(),
444 );
445
446 let proposer = Proposer::new(proposer_factory);
447
448 let collator_service = CollatorService::new(
449 client.clone(),
450 Arc::new(task_manager.spawn_handle()),
451 announce_block,
452 client.clone(),
453 );
454
455 let params = AuraParams {
456 create_inherent_data_providers: move |_, ()| async move { Ok(()) },
457 block_import,
458 para_client: client.clone(),
459 para_backend: backend.clone(),
460 relay_client: relay_chain_interface,
461 code_hash_provider: move |block_hash| {
462 client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
463 },
464 keystore,
465 collator_key,
466 para_id,
467 overseer_handle,
468 relay_chain_slot_duration,
469 proposer,
470 collator_service,
471 authoring_duration: Duration::from_millis(prod_or_testnet_or_local!(500, 2000, 2000)),
472 reinitialize: false,
473 max_pov_percentage: None, };
475
476 let fut = aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _>(
477 params,
478 );
479
480 task_manager.spawn_essential_handle().spawn("aura", None, fut);
481
482 Ok(())
483}