1use crate::{pallet::OffchainIndexEventCount, Config, Event, Pallet, PublicKeyToMsaId};
2pub use common_primitives::msa::MessageSourceId;
3use common_primitives::offchain::{
5 self as offchain_common, get_msa_account_lock_name, get_msa_account_storage_key_name,
6 LockStatus, MSA_ACCOUNT_LOCK_TIMEOUT_EXPIRATION_MS,
7};
8use frame_support::{RuntimeDebugNoBound, Twox128};
9use frame_system::pallet_prelude::BlockNumberFor;
10use parity_scale_codec::{Decode, Encode};
11use sp_core::serde::{Deserialize, Serialize};
12extern crate alloc;
13use alloc::{collections::btree_map::BTreeMap, string::String, vec, vec::Vec};
14use core::fmt::Debug;
15use frame_support::{
16 pallet_prelude::{DecodeWithMemTracking, TypeInfo},
17 StorageHasher,
18};
19use sp_io::offchain_index;
20use sp_runtime::{
21 offchain::{
22 storage::StorageValueRef,
23 storage_lock::{BlockAndTime, StorageLock, Time},
24 Duration,
25 },
26 traits::One,
27 Saturating,
28};
29
/// Key prefix of the sequentially indexed events; see `get_indexed_event_key`, which appends the
/// SCALE-encoded block number and event index.
const BLOCK_EVENT_KEY: &[u8] = b"frequency::block_event::msa::";
/// Key prefix of the bucketed ("fork aware") events; see `get_fork_aware_event_key`.
const BLOCK_EVENT_FORK_AWARE_KEY: &[u8] = b"frequency::block_event_fork::msa::";
/// Number of buckets used for the fork-aware keys: `get_bucket_number` yields values in
/// `1..=MAX_FORK_AWARE_BUCKET` and `read_offchain_events` probes every one of them.
const MAX_FORK_AWARE_BUCKET: u32 = 1000;
/// Upper bound on the sequential event indices probed per block by `read_offchain_events`.
const MAX_NUMBER_OF_STORAGE_CHECKS: u16 = 1000;
/// Time deadline (milliseconds) of the initial-indexing lock.
const MSA_INITIAL_LOCK_TIMEOUT_EXPIRATION_MS: u64 = 6000;

/// Block deadline (number of blocks) of the initial-indexing lock.
const MSA_INITIAL_LOCK_BLOCK_EXPIRATION_BLOCKS: u32 = 120;

/// Name of the lock taken by `offchain_index_initial_state`.
const MSA_INITIAL_LOCK_NAME: &[u8; 28] = b"Msa::ofw::initial-index-lock";

/// Persistent offchain storage key of the `bool` flag that is set to `true` once the initial
/// indexing has completed.
pub const MSA_INITIAL_INDEXED_STORAGE_NAME: &[u8; 25] = b"Msa::ofw::initial-indexed";

/// Name of the lock guarding the last-processed-block value.
const LAST_PROCESSED_BLOCK_LOCK_NAME: &[u8; 35] = b"Msa::ofw::last-processed-block-lock";

/// Persistent offchain storage key holding the number of the last block whose events were applied.
pub const LAST_PROCESSED_BLOCK_STORAGE_NAME: &[u8; 30] = b"Msa::ofw::last-processed-block";

/// Time deadline (milliseconds) of the last-processed-block lock.
const LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS: u64 = 5000;

/// Block deadline (number of blocks) of the last-processed-block lock.
const LAST_PROCESSED_BLOCK_LOCK_BLOCK_EXPIRATION_BLOCKS: u32 = 20;

/// How many blocks behind the finalized block `apply_offchain_events` starts from when no
/// last-processed block can be read from storage.
const NUMBER_OF_PREVIOUS_BLOCKS_TO_CHECK: u32 = 5u32;

/// How many blocks, counting back from the current one, `get_finalized_block_number` inspects
/// while looking for the finalized hash.
const NUMBER_OF_BLOCKS_TO_EXPLORE: u32 = 1000;

/// Deadline (milliseconds) applied to the finalized-head HTTP request.
pub const HTTP_REQUEST_DEADLINE_MS: u64 = 2000;

/// Default RPC endpoint queried for the finalized head when no other address is available.
pub const RPC_FINALIZED_BLOCK_REQUEST_URL: &str = "http://localhost:9944";
/// JSON-RPC request body sent to obtain the finalized head hash.
pub const RPC_FINALIZED_BLOCK_REQUEST_BODY: &[u8; 78] =
    b"{\"id\": 10, \"jsonrpc\": \"2.0\", \"method\": \"chain_getFinalizedHead\", \"params\": []}";
77
/// Wrapper enum for offchain replay events; its only variant carries an
/// [`MsaOffchainReplayEvent`].
///
/// NOTE(review): nothing in this part of the file constructs or consumes this type; confirm its
/// producers and consumers before changing its shape, since it derives `Encode`/`Decode`.
#[derive(
    TypeInfo, RuntimeDebugNoBound, Clone, Decode, DecodeWithMemTracking, Encode, PartialEq, Eq,
)]
#[scale_info(skip_type_params(T))]
pub enum OffchainReplayEvent<T: Config> {
    /// A replay event addressed to the MSA pallet (presumably — inferred from the name).
    MsaPallet(MsaOffchainReplayEvent<T>),
}
/// Replay events carried by [`OffchainReplayEvent::MsaPallet`].
#[derive(
    TypeInfo, RuntimeDebugNoBound, Clone, Decode, DecodeWithMemTracking, Encode, PartialEq, Eq,
)]
#[scale_info(skip_type_params(T))]
pub enum MsaOffchainReplayEvent<T: Config> {
    /// Presumably a request to re-index the keys of an MSA — TODO confirm with the consumer of
    /// this event, which is not visible here.
    KeyReIndex {
        /// Id of the MSA the event refers to.
        msa_id: MessageSourceId,
        /// Optional account key attached to the request.
        /// NOTE(review): the meaning of `None` is not visible in this file — confirm.
        index_key: Option<T::AccountId>,
    },
}
101
102pub fn do_offchain_worker<T: Config>(block_number: BlockNumberFor<T>) {
104 if let Some(finalized_block_number) = get_finalized_block_number::<T>(block_number) {
105 match offchain_index_initial_state::<T>(finalized_block_number) {
106 LockStatus::Locked => {
107 log::info!("initiating-index is still locked in {block_number:?}");
108 },
109 LockStatus::Released => {
110 apply_offchain_events::<T>(finalized_block_number);
111 },
112 }
113 };
114}
115pub fn offchain_index_event<T: Config>(event: Option<&Event<T>>, msa_id: MessageSourceId) {
117 if let Some(event) = IndexedEvent::map(event, msa_id) {
118 let block_number: u32 =
119 <frame_system::Pallet<T>>::block_number().try_into().unwrap_or_default();
120 let current_event_count: u16 = <OffchainIndexEventCount<T>>::get().saturating_add(1);
121 <OffchainIndexEventCount<T>>::put(current_event_count);
122 let event_key = get_indexed_event_key(block_number, current_event_count);
123 set_offchain_index(&event_key, event.clone());
125
126 let fork_aware_key = get_fork_aware_event_key(block_number, get_bucket_number(&event));
129
130 set_offchain_index(&fork_aware_key, event);
131 }
132}
133
134fn offchain_index_initial_state<T: Config>(block_number: BlockNumberFor<T>) -> LockStatus {
137 let mut lock = StorageLock::<BlockAndTime<Pallet<T>>>::with_block_and_time_deadline(
138 MSA_INITIAL_LOCK_NAME,
139 MSA_INITIAL_LOCK_BLOCK_EXPIRATION_BLOCKS,
140 Duration::from_millis(MSA_INITIAL_LOCK_TIMEOUT_EXPIRATION_MS),
141 );
142 if let Ok(mut guard) = lock.try_lock() {
143 let processed_storage = StorageValueRef::persistent(MSA_INITIAL_INDEXED_STORAGE_NAME);
144 let is_initial_indexed = processed_storage.get::<bool>().unwrap_or(None);
145
146 if !is_initial_indexed.unwrap_or_default() {
147 log::info!("Msa::ofw::initial-indexed is {is_initial_indexed:?}");
148
149 init_last_processed_block::<T>(block_number);
152
153 let mut counter = 0u64;
154 for (account_id, msa_id) in PublicKeyToMsaId::<T>::iter() {
155 process_offchain_events::<T>(
156 msa_id,
157 vec![IndexedEvent::IndexedPublicKeyAdded { key: account_id, msa_id }],
158 );
159
160 counter += 1;
162 if counter % 1000 == 0 {
163 log::info!("Added {counter} more keys!");
164 if guard.extend_lock().is_err() {
165 log::warn!("lock is expired in block {block_number:?}");
166 return LockStatus::Released;
167 }
168 }
169 }
170
171 processed_storage.set(&true);
172 log::info!("Finished adding {counter} keys!");
173 }
174 } else {
175 return LockStatus::Locked;
176 };
177 LockStatus::Released
178}
179
180fn apply_offchain_events<T: Config>(block_number: BlockNumberFor<T>) {
182 let mut lock = StorageLock::<BlockAndTime<Pallet<T>>>::with_block_and_time_deadline(
183 LAST_PROCESSED_BLOCK_LOCK_NAME,
184 LAST_PROCESSED_BLOCK_LOCK_BLOCK_EXPIRATION_BLOCKS,
185 Duration::from_millis(LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS),
186 );
187
188 if let Ok(mut guard) = lock.try_lock() {
189 log::info!("processing events in {block_number:?}");
190
191 let last_processed_block_storage =
192 StorageValueRef::persistent(LAST_PROCESSED_BLOCK_STORAGE_NAME);
193 let default_starting_block_number = block_number
194 .saturating_sub(BlockNumberFor::<T>::from(NUMBER_OF_PREVIOUS_BLOCKS_TO_CHECK));
195 let mut start_block_number = last_processed_block_storage
196 .get::<BlockNumberFor<T>>()
197 .unwrap_or(Some(default_starting_block_number))
198 .unwrap_or(default_starting_block_number);
199
200 start_block_number += BlockNumberFor::<T>::one();
202 while start_block_number <= block_number {
203 if reverse_map_msa_keys::<T>(start_block_number) && guard.extend_lock().is_err() {
204 log::warn!("last processed block lock is expired in block {block_number:?}");
205 break;
206 }
207 last_processed_block_storage.set(&start_block_number);
208 start_block_number += BlockNumberFor::<T>::one();
209 }
210 } else {
211 log::info!("skip processing events on {block_number:?} due to existing lock!");
212 };
213}
214
215fn set_offchain_index<V>(key: &[u8], value: V)
217where
218 V: Encode + Clone + Decode + Eq + Debug,
219{
220 offchain_index::set(key, value.encode().as_slice());
221}
222
223fn get_offchain_index<V>(key: &[u8]) -> Option<V>
225where
226 V: Encode + Clone + Decode + Eq + Debug,
227{
228 let value = offchain_common::get_index_value::<V>(key);
229 value.unwrap_or_else(|e| {
230 log::error!("Error getting offchain index value: {e:?}");
231 None
232 })
233}
234
/// Events written to the offchain index by `offchain_index_event` and later replayed by
/// `process_offchain_events` to maintain the per-MSA list of keys in offchain storage.
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebugNoBound)]
pub enum IndexedEvent<T: Config> {
    /// Indexed form of `Event::MsaCreated`.
    IndexedMsaCreated {
        /// Id of the MSA the event refers to.
        msa_id: MessageSourceId,

        /// Account key carried by the originating event.
        key: T::AccountId,
    },
    /// Indexed form of `Event::PublicKeyAdded`.
    IndexedPublicKeyAdded {
        /// Id of the MSA the event refers to.
        msa_id: MessageSourceId,

        /// Account key carried by the originating event.
        key: T::AccountId,
    },
    /// Indexed form of `Event::PublicKeyDeleted`.
    IndexedPublicKeyDeleted {
        /// MSA id supplied by the caller of `IndexedEvent::map` (the originating event carries
        /// only the key).
        msa_id: MessageSourceId,
        /// Account key carried by the originating event.
        key: T::AccountId,
    },
    /// Produced when `IndexedEvent::map` is called without an event. It changes no key by itself;
    /// `process_offchain_events` still re-checks the stored keys against on-chain state.
    MsaIndexInvalidated {
        /// Id of the MSA whose index should be re-checked.
        msa_id: MessageSourceId,
    },
}
267
268impl<T: Config> IndexedEvent<T> {
269 pub fn map(event: Option<&Event<T>>, event_msa_id: MessageSourceId) -> Option<Self> {
271 match event {
272 Some(Event::MsaCreated { msa_id, key }) =>
273 Some(Self::IndexedMsaCreated { msa_id: *msa_id, key: key.clone() }),
274 Some(Event::PublicKeyAdded { msa_id, key }) =>
275 Some(Self::IndexedPublicKeyAdded { msa_id: *msa_id, key: key.clone() }),
276 Some(Event::PublicKeyDeleted { key }) =>
277 Some(Self::IndexedPublicKeyDeleted { msa_id: event_msa_id, key: key.clone() }),
278 None => Some(Self::MsaIndexInvalidated { msa_id: event_msa_id }),
279 _ => None,
280 }
281 }
282}
283
284fn init_last_processed_block<T: Config>(current_block_number: BlockNumberFor<T>) {
286 let mut last_processed_block_lock = StorageLock::<'_, Time>::with_deadline(
287 LAST_PROCESSED_BLOCK_LOCK_NAME,
288 Duration::from_millis(LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS),
289 );
290 let _ = last_processed_block_lock.lock();
291 let last_processed_block_storage =
292 StorageValueRef::persistent(LAST_PROCESSED_BLOCK_STORAGE_NAME);
293
294 let target_block: BlockNumberFor<T> =
296 current_block_number.saturating_sub(BlockNumberFor::<T>::one());
297 last_processed_block_storage.set(&target_block);
298}
299
300fn read_offchain_events<T: Config>(
301 block_number: BlockNumberFor<T>,
302) -> Vec<(IndexedEvent<T>, Vec<u8>)> {
303 let current_block: u32 = block_number.try_into().unwrap_or_default();
304 let mut events = vec![];
305
306 for i in 1..=MAX_NUMBER_OF_STORAGE_CHECKS {
307 let key = get_indexed_event_key(current_block, i);
308 match get_offchain_index::<IndexedEvent<T>>(&key) {
309 Some(decoded_event) => {
310 events.push((decoded_event, key));
311 },
312 None => {
313 break;
315 },
316 }
317 }
318
319 for i in 1u16..=MAX_FORK_AWARE_BUCKET.try_into().unwrap_or_default() {
320 let key = get_fork_aware_event_key(current_block, i);
321 if let Some(decoded_event) = get_offchain_index::<IndexedEvent<T>>(&key) {
322 events.push((decoded_event, key));
323 }
324 }
325 events
326}
327
328fn clean_offchain_events(storage_keys: &Vec<Vec<u8>>) {
330 for key in storage_keys {
331 offchain_index::clear(key);
332 }
333}
334
335fn reverse_map_msa_keys<T: Config>(block_number: BlockNumberFor<T>) -> bool {
338 let events_to_process: Vec<(IndexedEvent<T>, Vec<u8>)> = read_offchain_events(block_number);
340 let events_exists = !events_to_process.is_empty();
341 if events_exists {
342 log::info!(
343 "found {} double indexed events for block {:?}",
344 events_to_process.len(),
345 block_number
346 );
347 }
348
349 let mut events_by_msa_id: BTreeMap<MessageSourceId, Vec<IndexedEvent<T>>> = BTreeMap::new();
351
352 for (event, _) in events_to_process.iter() {
354 match event {
355 IndexedEvent::IndexedPublicKeyAdded { msa_id, .. } |
356 IndexedEvent::IndexedMsaCreated { msa_id, .. } |
357 IndexedEvent::IndexedPublicKeyDeleted { msa_id, .. } |
358 IndexedEvent::MsaIndexInvalidated { msa_id } => {
359 let events = events_by_msa_id.entry(*msa_id).or_default();
360 events.push(event.clone());
361 },
362 }
363 }
364
365 for (msa_id, events) in events_by_msa_id {
367 if !events.is_empty() {
368 process_offchain_events(msa_id, events);
369 }
370 }
371
372 if events_exists {
373 let storage_keys = events_to_process.iter().map(|(_, key)| key.clone()).collect();
374 clean_offchain_events(&storage_keys);
375 }
376
377 events_exists
378}
379
380fn process_offchain_events<T: Config>(msa_id: MessageSourceId, events: Vec<IndexedEvent<T>>) {
381 let msa_lock_name = get_msa_account_lock_name(msa_id);
384 let mut msa_lock = StorageLock::<'_, Time>::with_deadline(
385 &msa_lock_name,
386 Duration::from_millis(MSA_ACCOUNT_LOCK_TIMEOUT_EXPIRATION_MS),
387 );
388 let _lock = msa_lock.lock();
389 let msa_storage_name = get_msa_account_storage_key_name(msa_id);
390 let mut msa_storage = StorageValueRef::persistent(&msa_storage_name);
391
392 let mut msa_keys = msa_storage.get::<Vec<T::AccountId>>().unwrap_or(None).unwrap_or_default();
393 let mut old_msa_keys = msa_keys.clone();
394 let mut changed = false;
395
396 for event in events {
397 match &event {
398 IndexedEvent::IndexedPublicKeyAdded { key, .. } |
399 IndexedEvent::IndexedMsaCreated { key, .. } => {
400 if let Some(on_chain_msa_id) = PublicKeyToMsaId::<T>::get(key) {
401 if on_chain_msa_id != msa_id {
402 log::warn!(
403 "{key:?} forked onchain-MsaId={on_chain_msa_id:?}, forked-MsaId=={msa_id:?}",
404 );
405 } else if !msa_keys.contains(key) {
406 msa_keys.push(key.clone());
407 changed = true;
408 }
409 }
410 },
411 IndexedEvent::IndexedPublicKeyDeleted { key, .. } => {
412 if PublicKeyToMsaId::<T>::get(key).is_none() && msa_keys.contains(key) {
413 msa_keys.retain(|k| k != key);
414 old_msa_keys.retain(|k| k != key);
415 changed = true;
416 }
417 },
418 IndexedEvent::MsaIndexInvalidated { .. } => {
419 },
421 }
422 }
423
424 for old_key in &old_msa_keys {
426 match PublicKeyToMsaId::<T>::get(old_key) {
427 Some(on_chain_msa_id) if on_chain_msa_id == msa_id => {
428 },
430 _ => {
431 msa_keys.retain(|k| k != old_key);
432 changed = true;
433 },
434 }
435 }
436
437 if changed {
438 if msa_keys.len() > 0 {
439 msa_storage.set(&msa_keys);
440 } else {
441 msa_storage.clear();
442 }
443 }
444}
/// Subset of the JSON-RPC response to the finalized-head request that
/// `fetch_finalized_block_hash` deserializes.
#[derive(Serialize, Deserialize, Encode, Decode, Default, Debug)]
pub struct FinalizedBlockResponse {
    /// The `result` field of the response; `fetch_finalized_block_hash` skips its first two
    /// characters (presumably a `0x` prefix) and hex-decodes the rest into a block hash.
    pub result: String,
}
451
452fn fetch_finalized_block_hash<T: Config>() -> Result<T::Hash, sp_runtime::offchain::http::Error> {
454 let rpc_address_bytes: Vec<u8> = if cfg!(feature = "runtime-benchmarks") {
457 RPC_FINALIZED_BLOCK_REQUEST_URL.into()
458 } else {
459 let legacy_val = common_primitives::offchain::custom::get_val();
462 if legacy_val.is_some() {
463 legacy_val.unwrap_or(RPC_FINALIZED_BLOCK_REQUEST_URL.into())
464 } else {
465 let mut buffer = vec![0u8; 256];
467 let len = common_primitives::offchain::custom::get_val_buffered(&mut buffer);
468 if len == 0 {
469 RPC_FINALIZED_BLOCK_REQUEST_URL.into()
470 } else {
471 match Vec::<u8>::decode(&mut &buffer[..len as usize]) {
472 Ok(v) if !v.is_empty() => v,
473 _ => RPC_FINALIZED_BLOCK_REQUEST_URL.into(),
474 }
475 }
476 }
477 };
478 let url = core::str::from_utf8(&rpc_address_bytes)
479 .map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
480 let deadline =
485 sp_io::offchain::timestamp().add(Duration::from_millis(HTTP_REQUEST_DEADLINE_MS));
486 let body = vec![RPC_FINALIZED_BLOCK_REQUEST_BODY];
487 let request = sp_runtime::offchain::http::Request::post(url, body);
488 let pending = request
489 .add_header("Content-Type", "application/json")
490 .deadline(deadline)
491 .send()
492 .map_err(|_| sp_runtime::offchain::http::Error::IoError)?;
493
494 let response = pending
495 .try_wait(deadline)
496 .map_err(|_| sp_runtime::offchain::http::Error::DeadlineReached)??;
497 if response.code != 200 {
499 log::warn!("Unexpected status code: {}", response.code);
500 return Err(sp_runtime::offchain::http::Error::Unknown);
501 }
502
503 let body = response.body().collect::<Vec<u8>>();
507
508 let body_str = core::str::from_utf8(&body).map_err(|_| {
510 log::warn!("No UTF8 body");
511 sp_runtime::offchain::http::Error::Unknown
512 })?;
513
514 log::debug!("{body_str}");
515 let finalized_block_response: FinalizedBlockResponse =
516 serde_json::from_str(body_str).map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
517
518 let decoded_from_hex = hex::decode(&finalized_block_response.result[2..])
520 .map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
521
522 let val = T::Hash::decode(&mut &decoded_from_hex[..])
523 .map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
524 Ok(val)
525}
526
527fn get_finalized_block_number<T: Config>(
529 current_block: BlockNumberFor<T>,
530) -> Option<BlockNumberFor<T>> {
531 let mut finalized_block_number = None;
532 let last_finalized_hash = match fetch_finalized_block_hash::<T>() {
533 Ok(hash) => hash,
534 Err(e) => {
535 log::error!("failure to get the finalized hash {e:?}");
536 return finalized_block_number;
537 },
538 };
539
540 let mut current_block_number = current_block;
542 let last_block_number =
543 current_block.saturating_sub(BlockNumberFor::<T>::from(NUMBER_OF_BLOCKS_TO_EXPLORE));
544 while current_block_number > last_block_number {
545 if last_finalized_hash == frame_system::Pallet::<T>::block_hash(current_block_number) {
546 finalized_block_number = Some(current_block_number);
547 break;
548 }
549 current_block_number.saturating_dec();
550 }
551
552 match finalized_block_number {
553 None => {
554 log::error!(
555 "Not able to find any imported block with {last_finalized_hash:?} hash and {current_block:?} block",
556 );
557 },
558 Some(inner) => {
559 log::info!("last finalized block number {inner:?} and hash {last_finalized_hash:?}",);
560 },
561 }
562 finalized_block_number
563}
564
565#[allow(clippy::precedence)]
567pub fn get_bucket_number<T: Config>(event: &IndexedEvent<T>) -> u16 {
568 let hashed = Twox128::hash(&event.encode());
569 let num = (hashed[0] as u32) << 24 |
571 (hashed[1] as u32) << 16 |
572 (hashed[2] as u32) << 8 |
573 (hashed[3] as u32);
574
575 ((num % MAX_FORK_AWARE_BUCKET) + 1u32) as u16
576}
577
578fn get_fork_aware_event_key(block_number: u32, event_index: u16) -> Vec<u8> {
579 [BLOCK_EVENT_FORK_AWARE_KEY, block_number.encode().as_slice(), event_index.encode().as_slice()]
580 .concat()
581}
582
583fn get_indexed_event_key(block_number: u32, event_index: u16) -> Vec<u8> {
584 [BLOCK_EVENT_KEY, block_number.encode().as_slice(), event_index.encode().as_slice()].concat()
585}