frequency_service/
common.rs1#![allow(missing_docs)]
2use crate::service::ParachainClient;
3use common_primitives::{node::Block, offchain::OcwCustomExt};
4use sc_client_api::Backend;
5use sc_offchain::NetworkProvider;
6use sc_service::{config::RpcEndpoint, TFullBackend, TaskManager};
7use sc_transaction_pool_api::OffchainTransactionPoolFactory;
8use sp_keystore::KeystorePtr;
9use std::{net::SocketAddr, sync::Arc};
10
11const HTTP_PREFIX: &str = "http://";
12
13pub fn listen_addrs_to_normalized_strings(addr: &Option<Vec<RpcEndpoint>>) -> Option<Vec<Vec<u8>>> {
15 match addr {
16 None => None,
17 Some(rpc_endpoints) => {
18 let endpoints = rpc_endpoints
19 .iter()
20 .map(|endpoint| {
21 let socket_addr = endpoint.listen_addr;
22 let mut address = match socket_addr {
23 SocketAddr::V4(v4) => v4.to_string(),
24 SocketAddr::V6(v6) => v6.to_string(),
25 };
26 if !address.starts_with(HTTP_PREFIX) {
27 address = format!("{HTTP_PREFIX}{address}");
28 }
29 address.into_bytes()
30 })
31 .filter(|addr| addr.len() > 0)
32 .collect();
33 Some(endpoints)
34 },
35 }
36}
37type ParachainBackend = TFullBackend<Block>;
38#[allow(clippy::expect_used)]
39pub fn start_offchain_workers(
40 client: &Arc<ParachainClient>,
41 parachain_config: &sc_service::Configuration,
42 keystore: Option<KeystorePtr>,
43 backend: &Arc<ParachainBackend>,
44 transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
45 network_provider: Arc<dyn NetworkProvider + Send + Sync>,
46 task_manager: &TaskManager,
47) {
48 use futures::FutureExt;
49 let rpc_addresses = listen_addrs_to_normalized_strings(¶chain_config.rpc.addr)
50 .expect("config.rpc.addr has invalid input");
51 let rpc_address = rpc_addresses.get(0).expect("no rpc addresses in config").clone();
52 let is_validator = parachain_config.role.is_authority();
53 let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
54 runtime_api_provider: client.clone(),
55 is_validator,
56 keystore,
57 offchain_db: backend.offchain_storage(),
58 transaction_pool,
59 network_provider,
60 enable_http_requests: true,
61 custom_extensions: move |_hash| vec![Box::new(OcwCustomExt(rpc_address.clone())) as Box<_>],
62 })
63 .expect("Could not create Offchain Worker");
64
65 task_manager.spawn_handle().spawn(
68 "offchain-workers-runner",
69 "offchain-work",
70 offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
71 );
72}