frequency_service/
common.rs

1#![allow(missing_docs)]
2use crate::service::ParachainClient;
3use common_primitives::{node::Block, offchain::OcwCustomExt};
4use sc_client_api::Backend;
5use sc_offchain::NetworkProvider;
6use sc_service::{config::RpcEndpoint, TFullBackend, TaskManager};
7use sc_transaction_pool_api::OffchainTransactionPoolFactory;
8use sp_keystore::KeystorePtr;
9use std::{net::SocketAddr, sync::Arc};
10
11const HTTP_PREFIX: &str = "http://";
12
13/// Normalize and convert SocketAddr to string
14pub fn listen_addrs_to_normalized_strings(addr: &Option<Vec<RpcEndpoint>>) -> Option<Vec<Vec<u8>>> {
15	match addr {
16		None => None,
17		Some(rpc_endpoints) => {
18			let endpoints = rpc_endpoints
19				.iter()
20				.map(|endpoint| {
21					let socket_addr = endpoint.listen_addr;
22					let mut address = match socket_addr {
23						SocketAddr::V4(v4) => v4.to_string(),
24						SocketAddr::V6(v6) => v6.to_string(),
25					};
26					if !address.starts_with(HTTP_PREFIX) {
27						address = format!("{HTTP_PREFIX}{address}");
28					}
29					address.into_bytes()
30				})
31				.filter(|addr| addr.len() > 0)
32				.collect();
33			Some(endpoints)
34		},
35	}
36}
37type ParachainBackend = TFullBackend<Block>;
38#[allow(clippy::expect_used)]
39pub fn start_offchain_workers(
40	client: &Arc<ParachainClient>,
41	parachain_config: &sc_service::Configuration,
42	keystore: Option<KeystorePtr>,
43	backend: &Arc<ParachainBackend>,
44	transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
45	network_provider: Arc<dyn NetworkProvider + Send + Sync>,
46	task_manager: &TaskManager,
47) {
48	use futures::FutureExt;
49	let rpc_addresses = listen_addrs_to_normalized_strings(&parachain_config.rpc.addr)
50		.expect("config.rpc.addr has invalid input");
51	let rpc_address = rpc_addresses.get(0).expect("no rpc addresses in config").clone();
52	let is_validator = parachain_config.role.is_authority();
53	let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
54		runtime_api_provider: client.clone(),
55		is_validator,
56		keystore,
57		offchain_db: backend.offchain_storage(),
58		transaction_pool,
59		network_provider,
60		enable_http_requests: true,
61		custom_extensions: move |_hash| vec![Box::new(OcwCustomExt(rpc_address.clone())) as Box<_>],
62	})
63	.expect("Could not create Offchain Worker");
64
65	// Spawn a task to handle off-chain notifications.
66	// This task is responsible for processing off-chain events or data for the blockchain.
67	task_manager.spawn_handle().spawn(
68		"offchain-workers-runner",
69		"offchain-work",
70		offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
71	);
72}