pallet_msa/
offchain_storage.rs

1use crate::{pallet::OffchainIndexEventCount, Config, Event, Pallet, PublicKeyToMsaId};
2pub use common_primitives::msa::MessageSourceId;
3/// Offchain Storage for MSA
4use common_primitives::offchain::{
5	self as offchain_common, get_msa_account_lock_name, get_msa_account_storage_key_name,
6	LockStatus, MSA_ACCOUNT_LOCK_TIMEOUT_EXPIRATION_MS,
7};
8use frame_support::{RuntimeDebugNoBound, Twox128};
9use frame_system::pallet_prelude::BlockNumberFor;
10use parity_scale_codec::{Decode, Encode};
11use sp_core::serde::{Deserialize, Serialize};
12extern crate alloc;
13use alloc::{collections::btree_map::BTreeMap, string::String, vec, vec::Vec};
14use core::fmt::Debug;
15use frame_support::{
16	pallet_prelude::{DecodeWithMemTracking, TypeInfo},
17	StorageHasher,
18};
19use sp_io::offchain_index;
20use sp_runtime::{
21	offchain::{
22		storage::StorageValueRef,
23		storage_lock::{BlockAndTime, StorageLock, Time},
24		Duration,
25	},
26	traits::One,
27	Saturating,
28};
29
30/// Block event storage prefix
31const BLOCK_EVENT_KEY: &[u8] = b"frequency::block_event::msa::";
32/// Block event storage prefix for fork-aware events
33const BLOCK_EVENT_FORK_AWARE_KEY: &[u8] = b"frequency::block_event_fork::msa::";
34/// number of buckets to map the events for fork-aware storage
35const MAX_FORK_AWARE_BUCKET: u32 = 1000;
36/// max number of events to check from storage
37const MAX_NUMBER_OF_STORAGE_CHECKS: u16 = 1000;
38/// Lock expiration timeout in in milli-seconds for initial data import msa pallet
39const MSA_INITIAL_LOCK_TIMEOUT_EXPIRATION_MS: u64 = 6000;
40
41/// Lock expiration block for initial data import msa pallet
42const MSA_INITIAL_LOCK_BLOCK_EXPIRATION_BLOCKS: u32 = 120;
43
44/// Lock name for initial data index for msa pallet
45const MSA_INITIAL_LOCK_NAME: &[u8; 28] = b"Msa::ofw::initial-index-lock";
46
47/// storage name for initial data import storage
48pub const MSA_INITIAL_INDEXED_STORAGE_NAME: &[u8; 25] = b"Msa::ofw::initial-indexed";
49
50/// Lock name for last processed block number events
51const LAST_PROCESSED_BLOCK_LOCK_NAME: &[u8; 35] = b"Msa::ofw::last-processed-block-lock";
52
53/// lst processed block storage name
54pub const LAST_PROCESSED_BLOCK_STORAGE_NAME: &[u8; 30] = b"Msa::ofw::last-processed-block";
55
56/// Lock expiration timeout in milliseconds for last processed block
57const LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS: u64 = 5000;
58
59/// Lock expiration for last processed block
60const LAST_PROCESSED_BLOCK_LOCK_BLOCK_EXPIRATION_BLOCKS: u32 = 20;
61
62/// number of previous blocks to check to mitigate offchain worker skips processing any block
63const NUMBER_OF_PREVIOUS_BLOCKS_TO_CHECK: u32 = 5u32;
64
65/// number of blocks to explore when trying to find the block number from block hash
66const NUMBER_OF_BLOCKS_TO_EXPLORE: u32 = 1000;
67
68/// HTTP request deadline in milliseconds
69pub const HTTP_REQUEST_DEADLINE_MS: u64 = 2000;
70
71/// LOCAL RPC URL and port
72/// warning: this should be updated if rpc port is set to anything different from 9944
73pub const RPC_FINALIZED_BLOCK_REQUEST_URL: &str = "http://localhost:9944";
74/// request body for getting last finalized block from rpc
75pub const RPC_FINALIZED_BLOCK_REQUEST_BODY: &[u8; 78] =
76	b"{\"id\": 10, \"jsonrpc\": \"2.0\", \"method\": \"chain_getFinalizedHead\", \"params\": []}";
77
78/// The overarching Offchain replay type that can allow replay of different events across different pallets
79#[derive(
80	TypeInfo, RuntimeDebugNoBound, Clone, Decode, DecodeWithMemTracking, Encode, PartialEq, Eq,
81)]
82#[scale_info(skip_type_params(T))]
83pub enum OffchainReplayEvent<T: Config> {
84	/// Msa pallet related replay event
85	MsaPallet(MsaOffchainReplayEvent<T>),
86}
87/// The Offchain replay type for Msa Pallet that can allow replay of different events
88#[derive(
89	TypeInfo, RuntimeDebugNoBound, Clone, Decode, DecodeWithMemTracking, Encode, PartialEq, Eq,
90)]
91#[scale_info(skip_type_params(T))]
92pub enum MsaOffchainReplayEvent<T: Config> {
93	/// Key re-indexing event
94	KeyReIndex {
95		/// Message Source Id that we like to reindex
96		msa_id: MessageSourceId,
97		/// optional key to index
98		index_key: Option<T::AccountId>,
99	},
100}
101
102/// offchain worker main execution function
103pub fn do_offchain_worker<T: Config>(block_number: BlockNumberFor<T>) {
104	if let Some(finalized_block_number) = get_finalized_block_number::<T>(block_number) {
105		match offchain_index_initial_state::<T>(finalized_block_number) {
106			LockStatus::Locked => {
107				log::info!("initiating-index is still locked in {block_number:?}");
108			},
109			LockStatus::Released => {
110				apply_offchain_events::<T>(finalized_block_number);
111			},
112		}
113	};
114}
115/// stores the event into offchain DB using offchain indexing
116pub fn offchain_index_event<T: Config>(event: Option<&Event<T>>, msa_id: MessageSourceId) {
117	if let Some(event) = IndexedEvent::map(event, msa_id) {
118		let block_number: u32 =
119			<frame_system::Pallet<T>>::block_number().try_into().unwrap_or_default();
120		let current_event_count: u16 = <OffchainIndexEventCount<T>>::get().saturating_add(1);
121		<OffchainIndexEventCount<T>>::put(current_event_count);
122		let event_key = get_indexed_event_key(block_number, current_event_count);
123		// set the event in offchain storage
124		set_offchain_index(&event_key, event.clone());
125
126		// to ensure we can handle the issues due to forking and overriding stored events we double
127		// index an event, and we choose to use or discard it on offchain worker side
128		let fork_aware_key = get_fork_aware_event_key(block_number, get_bucket_number(&event));
129
130		set_offchain_index(&fork_aware_key, event);
131	}
132}
133
134/// Offchain indexes all existing data in chain state
135/// returns the LockStatus
136fn offchain_index_initial_state<T: Config>(block_number: BlockNumberFor<T>) -> LockStatus {
137	let mut lock = StorageLock::<BlockAndTime<Pallet<T>>>::with_block_and_time_deadline(
138		MSA_INITIAL_LOCK_NAME,
139		MSA_INITIAL_LOCK_BLOCK_EXPIRATION_BLOCKS,
140		Duration::from_millis(MSA_INITIAL_LOCK_TIMEOUT_EXPIRATION_MS),
141	);
142	if let Ok(mut guard) = lock.try_lock() {
143		let processed_storage = StorageValueRef::persistent(MSA_INITIAL_INDEXED_STORAGE_NAME);
144		let is_initial_indexed = processed_storage.get::<bool>().unwrap_or(None);
145
146		if !is_initial_indexed.unwrap_or_default() {
147			log::info!("Msa::ofw::initial-indexed is {is_initial_indexed:?}");
148
149			// setting last processed block so we can start indexing from that block after
150			// initial index is done
151			init_last_processed_block::<T>(block_number);
152
153			let mut counter = 0u64;
154			for (account_id, msa_id) in PublicKeyToMsaId::<T>::iter() {
155				process_offchain_events::<T>(
156					msa_id,
157					vec![IndexedEvent::IndexedPublicKeyAdded { key: account_id, msa_id }],
158				);
159
160				// extend the initial index lock
161				counter += 1;
162				if counter % 1000 == 0 {
163					log::info!("Added {counter} more keys!");
164					if guard.extend_lock().is_err() {
165						log::warn!("lock is expired in block {block_number:?}");
166						return LockStatus::Released;
167					}
168				}
169			}
170
171			processed_storage.set(&true);
172			log::info!("Finished adding {counter} keys!");
173		}
174	} else {
175		return LockStatus::Locked;
176	};
177	LockStatus::Released
178}
179
180/// apply offchain event into offchain DB
181fn apply_offchain_events<T: Config>(block_number: BlockNumberFor<T>) {
182	let mut lock = StorageLock::<BlockAndTime<Pallet<T>>>::with_block_and_time_deadline(
183		LAST_PROCESSED_BLOCK_LOCK_NAME,
184		LAST_PROCESSED_BLOCK_LOCK_BLOCK_EXPIRATION_BLOCKS,
185		Duration::from_millis(LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS),
186	);
187
188	if let Ok(mut guard) = lock.try_lock() {
189		log::info!("processing events in {block_number:?}");
190
191		let last_processed_block_storage =
192			StorageValueRef::persistent(LAST_PROCESSED_BLOCK_STORAGE_NAME);
193		let default_starting_block_number = block_number
194			.saturating_sub(BlockNumberFor::<T>::from(NUMBER_OF_PREVIOUS_BLOCKS_TO_CHECK));
195		let mut start_block_number = last_processed_block_storage
196			.get::<BlockNumberFor<T>>()
197			.unwrap_or(Some(default_starting_block_number))
198			.unwrap_or(default_starting_block_number);
199
200		// since this is the last processed block number we already processed it and starting from the next one
201		start_block_number += BlockNumberFor::<T>::one();
202		while start_block_number <= block_number {
203			if reverse_map_msa_keys::<T>(start_block_number) && guard.extend_lock().is_err() {
204				log::warn!("last processed block lock is expired in block {block_number:?}");
205				break;
206			}
207			last_processed_block_storage.set(&start_block_number);
208			start_block_number += BlockNumberFor::<T>::one();
209		}
210	} else {
211		log::info!("skip processing events on {block_number:?} due to existing lock!");
212	};
213}
214
215/// Set offchain index value, used to store MSA Events to be process by offchain worker
216fn set_offchain_index<V>(key: &[u8], value: V)
217where
218	V: Encode + Clone + Decode + Eq + Debug,
219{
220	offchain_index::set(key, value.encode().as_slice());
221}
222
223/// Get offchain index value, used to store MSA Events to be process by offchain worker
224fn get_offchain_index<V>(key: &[u8]) -> Option<V>
225where
226	V: Encode + Clone + Decode + Eq + Debug,
227{
228	let value = offchain_common::get_index_value::<V>(key);
229	value.unwrap_or_else(|e| {
230		log::error!("Error getting offchain index value: {e:?}");
231		None
232	})
233}
234
235/// Offchain indexed compatible Event type
236#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebugNoBound)]
237pub enum IndexedEvent<T: Config> {
238	/// A new Message Service Account was created with a new MessageSourceId
239	IndexedMsaCreated {
240		/// The MSA for the Event
241		msa_id: MessageSourceId,
242
243		/// The key added to the MSA
244		key: T::AccountId,
245	},
246	/// An AccountId has been associated with a MessageSourceId
247	IndexedPublicKeyAdded {
248		/// The MSA for the Event
249		msa_id: MessageSourceId,
250
251		/// The key added to the MSA
252		key: T::AccountId,
253	},
254	/// An AccountId had all permissions revoked from its MessageSourceId
255	IndexedPublicKeyDeleted {
256		/// The MSA for the Event
257		msa_id: MessageSourceId,
258		/// The key no longer approved for the associated MSA
259		key: T::AccountId,
260	},
261	/// The offchain MSA->PubKey index has been marked invalid for the indicated MessageSourceId.
262	MsaIndexInvalidated {
263		/// The MSA for the Event
264		msa_id: MessageSourceId,
265	},
266}
267
268impl<T: Config> IndexedEvent<T> {
269	/// maps a pallet event to indexed event type
270	pub fn map(event: Option<&Event<T>>, event_msa_id: MessageSourceId) -> Option<Self> {
271		match event {
272			Some(Event::MsaCreated { msa_id, key }) =>
273				Some(Self::IndexedMsaCreated { msa_id: *msa_id, key: key.clone() }),
274			Some(Event::PublicKeyAdded { msa_id, key }) =>
275				Some(Self::IndexedPublicKeyAdded { msa_id: *msa_id, key: key.clone() }),
276			Some(Event::PublicKeyDeleted { key }) =>
277				Some(Self::IndexedPublicKeyDeleted { msa_id: event_msa_id, key: key.clone() }),
278			None => Some(Self::MsaIndexInvalidated { msa_id: event_msa_id }),
279			_ => None,
280		}
281	}
282}
283
284/// Initializes the last_process_block value in offchain DB
285fn init_last_processed_block<T: Config>(current_block_number: BlockNumberFor<T>) {
286	let mut last_processed_block_lock = StorageLock::<'_, Time>::with_deadline(
287		LAST_PROCESSED_BLOCK_LOCK_NAME,
288		Duration::from_millis(LAST_PROCESSED_BLOCK_LOCK_TIMEOUT_EXPIRATION_MS),
289	);
290	let _ = last_processed_block_lock.lock();
291	let last_processed_block_storage =
292		StorageValueRef::persistent(LAST_PROCESSED_BLOCK_STORAGE_NAME);
293
294	// setting current_block-1 as the last processed so that we start indexing from current_block
295	let target_block: BlockNumberFor<T> =
296		current_block_number.saturating_sub(BlockNumberFor::<T>::one());
297	last_processed_block_storage.set(&target_block);
298}
299
300fn read_offchain_events<T: Config>(
301	block_number: BlockNumberFor<T>,
302) -> Vec<(IndexedEvent<T>, Vec<u8>)> {
303	let current_block: u32 = block_number.try_into().unwrap_or_default();
304	let mut events = vec![];
305
306	for i in 1..=MAX_NUMBER_OF_STORAGE_CHECKS {
307		let key = get_indexed_event_key(current_block, i);
308		match get_offchain_index::<IndexedEvent<T>>(&key) {
309			Some(decoded_event) => {
310				events.push((decoded_event, key));
311			},
312			None => {
313				// no more events for this block
314				break;
315			},
316		}
317	}
318
319	for i in 1u16..=MAX_FORK_AWARE_BUCKET.try_into().unwrap_or_default() {
320		let key = get_fork_aware_event_key(current_block, i);
321		if let Some(decoded_event) = get_offchain_index::<IndexedEvent<T>>(&key) {
322			events.push((decoded_event, key));
323		}
324	}
325	events
326}
327
328/// cleans the events from offchain storage
329fn clean_offchain_events(storage_keys: &Vec<Vec<u8>>) {
330	for key in storage_keys {
331		offchain_index::clear(key);
332	}
333}
334
335/// offchain worker callback for indexing msa keys
336/// return true if there are events and false if not
337fn reverse_map_msa_keys<T: Config>(block_number: BlockNumberFor<T>) -> bool {
338	// read the events indexed for the current block
339	let events_to_process: Vec<(IndexedEvent<T>, Vec<u8>)> = read_offchain_events(block_number);
340	let events_exists = !events_to_process.is_empty();
341	if events_exists {
342		log::info!(
343			"found {} double indexed events for block {:?}",
344			events_to_process.len(),
345			block_number
346		);
347	}
348
349	// collect a replay of all events by MSA id
350	let mut events_by_msa_id: BTreeMap<MessageSourceId, Vec<IndexedEvent<T>>> = BTreeMap::new();
351
352	// collect relevant events
353	for (event, _) in events_to_process.iter() {
354		match event {
355			IndexedEvent::IndexedPublicKeyAdded { msa_id, .. } |
356			IndexedEvent::IndexedMsaCreated { msa_id, .. } |
357			IndexedEvent::IndexedPublicKeyDeleted { msa_id, .. } |
358			IndexedEvent::MsaIndexInvalidated { msa_id } => {
359				let events = events_by_msa_id.entry(*msa_id).or_default();
360				events.push(event.clone());
361			},
362		}
363	}
364
365	// process and save to offchain db
366	for (msa_id, events) in events_by_msa_id {
367		if !events.is_empty() {
368			process_offchain_events(msa_id, events);
369		}
370	}
371
372	if events_exists {
373		let storage_keys = events_to_process.iter().map(|(_, key)| key.clone()).collect();
374		clean_offchain_events(&storage_keys);
375	}
376
377	events_exists
378}
379
380fn process_offchain_events<T: Config>(msa_id: MessageSourceId, events: Vec<IndexedEvent<T>>) {
381	// Lock will specifically prevent multiple offchain workers from
382	// processing the same msa events at the same time
383	let msa_lock_name = get_msa_account_lock_name(msa_id);
384	let mut msa_lock = StorageLock::<'_, Time>::with_deadline(
385		&msa_lock_name,
386		Duration::from_millis(MSA_ACCOUNT_LOCK_TIMEOUT_EXPIRATION_MS),
387	);
388	let _lock = msa_lock.lock();
389	let msa_storage_name = get_msa_account_storage_key_name(msa_id);
390	let mut msa_storage = StorageValueRef::persistent(&msa_storage_name);
391
392	let mut msa_keys = msa_storage.get::<Vec<T::AccountId>>().unwrap_or(None).unwrap_or_default();
393	let mut old_msa_keys = msa_keys.clone();
394	let mut changed = false;
395
396	for event in events {
397		match &event {
398			IndexedEvent::IndexedPublicKeyAdded { key, .. } |
399			IndexedEvent::IndexedMsaCreated { key, .. } => {
400				if let Some(on_chain_msa_id) = PublicKeyToMsaId::<T>::get(key) {
401					if on_chain_msa_id != msa_id {
402						log::warn!(
403							"{key:?} forked onchain-MsaId={on_chain_msa_id:?}, forked-MsaId=={msa_id:?}",
404						);
405					} else if !msa_keys.contains(key) {
406						msa_keys.push(key.clone());
407						changed = true;
408					}
409				}
410			},
411			IndexedEvent::IndexedPublicKeyDeleted { key, .. } => {
412				if PublicKeyToMsaId::<T>::get(key).is_none() && msa_keys.contains(key) {
413					msa_keys.retain(|k| k != key);
414					old_msa_keys.retain(|k| k != key);
415					changed = true;
416				}
417			},
418			IndexedEvent::MsaIndexInvalidated { .. } => {
419				// nothing to do since we take care of removing extra keys for all events anyway
420			},
421		}
422	}
423
424	// check old keys to ensure they are valid
425	for old_key in &old_msa_keys {
426		match PublicKeyToMsaId::<T>::get(old_key) {
427			Some(on_chain_msa_id) if on_chain_msa_id == msa_id => {
428				// everything is as expected. Do nothing
429			},
430			_ => {
431				msa_keys.retain(|k| k != old_key);
432				changed = true;
433			},
434		}
435	}
436
437	if changed {
438		if msa_keys.len() > 0 {
439			msa_storage.set(&msa_keys);
440		} else {
441			msa_storage.clear();
442		}
443	}
444}
445/// Response type of rpc to get finalized block
446#[derive(Serialize, Deserialize, Encode, Decode, Default, Debug)]
447pub struct FinalizedBlockResponse {
448	/// Hex encoded hash of last finalized block
449	pub result: String,
450}
451
452/// fetches finalized block hash from rpc
453fn fetch_finalized_block_hash<T: Config>() -> Result<T::Hash, sp_runtime::offchain::http::Error> {
454	// we are not able to use the custom extension in benchmarks due to feature conflict
455	// Build rpc_address bytes (Vec<u8>) either from benchmarks constant or via custom extension
456	let rpc_address_bytes: Vec<u8> = if cfg!(feature = "runtime-benchmarks") {
457		RPC_FINALIZED_BLOCK_REQUEST_URL.into()
458	} else {
459		// To prevent breaking change we call legacy function
460		// TODO remove this logic once all collators are upgraded
461		let legacy_val = common_primitives::offchain::custom::get_val();
462		if legacy_val.is_some() {
463			legacy_val.unwrap_or(RPC_FINALIZED_BLOCK_REQUEST_URL.into())
464		} else {
465			// call the runtime-interface function that fills our fixed buffer
466			let mut buffer = vec![0u8; 256];
467			let len = common_primitives::offchain::custom::get_val_buffered(&mut buffer);
468			if len == 0 {
469				RPC_FINALIZED_BLOCK_REQUEST_URL.into()
470			} else {
471				match Vec::<u8>::decode(&mut &buffer[..len as usize]) {
472					Ok(v) if !v.is_empty() => v,
473					_ => RPC_FINALIZED_BLOCK_REQUEST_URL.into(),
474				}
475			}
476		}
477	};
478	let url = core::str::from_utf8(&rpc_address_bytes)
479		.map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
480	// We want to keep the offchain worker execution time reasonable, so we set a hard-coded
481	// deadline to 2s to complete the external call.
482	// You can also wait indefinitely for the response, however you may still get a timeout
483	// coming from the host machine.
484	let deadline =
485		sp_io::offchain::timestamp().add(Duration::from_millis(HTTP_REQUEST_DEADLINE_MS));
486	let body = vec![RPC_FINALIZED_BLOCK_REQUEST_BODY];
487	let request = sp_runtime::offchain::http::Request::post(url, body);
488	let pending = request
489		.add_header("Content-Type", "application/json")
490		.deadline(deadline)
491		.send()
492		.map_err(|_| sp_runtime::offchain::http::Error::IoError)?;
493
494	let response = pending
495		.try_wait(deadline)
496		.map_err(|_| sp_runtime::offchain::http::Error::DeadlineReached)??;
497	// Let's check the status code before we proceed to reading the response.
498	if response.code != 200 {
499		log::warn!("Unexpected status code: {}", response.code);
500		return Err(sp_runtime::offchain::http::Error::Unknown);
501	}
502
503	// Next we want to fully read the response body and collect it to a vector of bytes.
504	// Note that the return object allows you to read the body in chunks as well
505	// with a way to control the deadline.
506	let body = response.body().collect::<Vec<u8>>();
507
508	// Create a str slice from the body.
509	let body_str = core::str::from_utf8(&body).map_err(|_| {
510		log::warn!("No UTF8 body");
511		sp_runtime::offchain::http::Error::Unknown
512	})?;
513
514	log::debug!("{body_str}");
515	let finalized_block_response: FinalizedBlockResponse =
516		serde_json::from_str(body_str).map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
517
518	// skipping 0x on front
519	let decoded_from_hex = hex::decode(&finalized_block_response.result[2..])
520		.map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
521
522	let val = T::Hash::decode(&mut &decoded_from_hex[..])
523		.map_err(|_| sp_runtime::offchain::http::Error::Unknown)?;
524	Ok(val)
525}
526
527/// fetch finalized block hash and convert it to block number
528fn get_finalized_block_number<T: Config>(
529	current_block: BlockNumberFor<T>,
530) -> Option<BlockNumberFor<T>> {
531	let mut finalized_block_number = None;
532	let last_finalized_hash = match fetch_finalized_block_hash::<T>() {
533		Ok(hash) => hash,
534		Err(e) => {
535			log::error!("failure to get the finalized hash {e:?}");
536			return finalized_block_number;
537		},
538	};
539
540	// iterates on imported blocks to find the block_number from block_hash
541	let mut current_block_number = current_block;
542	let last_block_number =
543		current_block.saturating_sub(BlockNumberFor::<T>::from(NUMBER_OF_BLOCKS_TO_EXPLORE));
544	while current_block_number > last_block_number {
545		if last_finalized_hash == frame_system::Pallet::<T>::block_hash(current_block_number) {
546			finalized_block_number = Some(current_block_number);
547			break;
548		}
549		current_block_number.saturating_dec();
550	}
551
552	match finalized_block_number {
553		None => {
554			log::error!(
555				"Not able to find any imported block with {last_finalized_hash:?} hash and {current_block:?} block",
556			);
557		},
558		Some(inner) => {
559			log::info!("last finalized block number {inner:?} and hash {last_finalized_hash:?}",);
560		},
561	}
562	finalized_block_number
563}
564
565/// converts an event to a number between [1, `MAX_FORK_AWARE_BUCKET`]
566#[allow(clippy::precedence)]
567pub fn get_bucket_number<T: Config>(event: &IndexedEvent<T>) -> u16 {
568	let hashed = Twox128::hash(&event.encode());
569	// Directly combine the first 4 bytes into a u32 using shifts and bitwise OR
570	let num = (hashed[0] as u32) << 24 |
571		(hashed[1] as u32) << 16 |
572		(hashed[2] as u32) << 8 |
573		(hashed[3] as u32);
574
575	((num % MAX_FORK_AWARE_BUCKET) + 1u32) as u16
576}
577
578fn get_fork_aware_event_key(block_number: u32, event_index: u16) -> Vec<u8> {
579	[BLOCK_EVENT_FORK_AWARE_KEY, block_number.encode().as_slice(), event_index.encode().as_slice()]
580		.concat()
581}
582
583fn get_indexed_event_key(block_number: u32, event_index: u16) -> Vec<u8> {
584	[BLOCK_EVENT_KEY, block_number.encode().as_slice(), event_index.encode().as_slice()].concat()
585}