pallet_messages/
lib.rs

1//! Stores messages for `IPFS` and `OnChain` Schema payload locations
2//!
3//! ## Quick Links
4//! - [Configuration: `Config`](Config)
5//! - [Extrinsics: `Call`](Call)
6//! - [Runtime API: `MessagesRuntimeApi`](../pallet_messages_runtime_api/trait.MessagesRuntimeApi.html)
7//! - [Custom RPC API: `MessagesApiServer`](../pallet_messages_rpc/trait.MessagesApiServer.html)
8//! - [Event Enum: `Event`](Event)
9//! - [Error Enum: `Error`](Error)
10#![doc = include_str!("../README.md")]
11// Substrate macros are tripping the clippy::expect_used lint.
12#![allow(clippy::expect_used)]
13// Ensure we're `no_std` when compiling for Wasm.
14#![cfg_attr(not(feature = "std"), no_std)]
15// Strong Documentation Lints
16#![deny(
17	rustdoc::broken_intra_doc_links,
18	rustdoc::missing_crate_level_docs,
19	rustdoc::invalid_codeblock_attributes,
20	missing_docs
21)]
22
23#[cfg(feature = "runtime-benchmarks")]
24mod benchmarking;
25#[cfg(test)]
26mod tests;
27
28pub mod weights;
29
30mod types;
31
32/// Storage migrations
33pub mod migration;
34
35use core::{convert::TryInto, fmt::Debug};
36use frame_support::{ensure, pallet_prelude::Weight, traits::Get, BoundedVec};
37use sp_runtime::DispatchError;
38
39extern crate alloc;
40extern crate core;
41
42use alloc::vec::Vec;
43use common_primitives::{
44	cid::*,
45	messages::*,
46	msa::{DelegatorId, GrantValidator, MessageSourceId, MsaLookup, MsaValidator, ProviderId},
47	schema::*,
48};
49use frame_support::dispatch::DispatchResult;
50use parity_scale_codec::Encode;
51
52#[cfg(feature = "runtime-benchmarks")]
53use common_primitives::benchmarks::{MsaBenchmarkHelper, SchemaBenchmarkHelper};
54
55pub use pallet::*;
56pub use types::*;
57pub use weights::*;
58
59use common_primitives::node::BlockNumber;
60use frame_system::pallet_prelude::*;
61
62#[frame_support::pallet]
63pub mod pallet {
64	use super::*;
65	use frame_support::pallet_prelude::*;
66
67	/// The current storage version.
68	pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
69
70	#[pallet::config]
71	pub trait Config: frame_system::Config {
72		/// The overarching event type.
73		#[allow(deprecated)]
74		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
75
76		/// Weight information for extrinsics in this pallet.
77		type WeightInfo: WeightInfo;
78
79		/// A type that will supply MSA related information
80		type MsaInfoProvider: MsaLookup + MsaValidator<AccountId = Self::AccountId>;
81
82		/// A type that will validate schema grants
83		type SchemaGrantValidator: GrantValidator<IntentId, BlockNumberFor<Self>>;
84
85		/// A type that will supply schema related information.
86		type SchemaProvider: SchemaProvider<SchemaId>;
87
88		/// The maximum size of a message payload bytes.
89		#[pallet::constant]
90		type MessagesMaxPayloadSizeBytes: Get<u32> + Clone + Debug + MaxEncodedLen;
91
92		/// How often to emit status events during a storage migration.
93		/// Try to make this larger than the number of message migrations that will fit
94		/// in a block by weight, as multiple of these events in a block is not really useful
95		/// or desireable.
96		type MigrateEmitEvery: Get<u32> + Clone + Debug;
97
98		#[cfg(feature = "runtime-benchmarks")]
99		/// A set of helper functions for benchmarking.
100		type MsaBenchmarkHelper: MsaBenchmarkHelper<Self::AccountId>;
101
102		#[cfg(feature = "runtime-benchmarks")]
103		/// A set of helper functions for benchmarking.
104		type SchemaBenchmarkHelper: SchemaBenchmarkHelper;
105	}
106
107	#[pallet::pallet]
108	#[pallet::storage_version(STORAGE_VERSION)]
109	pub struct Pallet<T>(_);
110
111	/// A temporary storage for getting the index for messages
112	/// At the start of the next block this storage is set to 0
113	#[pallet::storage]
114	#[pallet::whitelist_storage]
115	pub(super) type BlockMessageIndex<T: Config> = StorageValue<_, MessageIndex, ValueQuery>;
116
117	/// Storage for messages
118	#[pallet::storage]
119	pub(super) type MessagesV3<T: Config> = StorageNMap<
120		_,
121		(
122			storage::Key<Twox64Concat, BlockNumberFor<T>>,
123			storage::Key<Twox64Concat, IntentId>,
124			storage::Key<Twox64Concat, MessageIndex>,
125		),
126		Message<T::MessagesMaxPayloadSizeBytes>,
127		OptionQuery,
128	>;
129
130	#[pallet::error]
131	pub enum Error<T> {
132		/// Deprecated: Too many messages are added to existing block
133		TooManyMessagesInBlock,
134
135		/// Message payload size is too large
136		ExceedsMaxMessagePayloadSizeBytes,
137
138		/// Type Conversion Overflow
139		TypeConversionOverflow,
140
141		/// Invalid Message Source Account
142		InvalidMessageSourceAccount,
143
144		/// Invalid SchemaId or Schema not found
145		InvalidSchemaId,
146
147		/// UnAuthorizedDelegate
148		UnAuthorizedDelegate,
149
150		/// Invalid payload location
151		InvalidPayloadLocation,
152
153		/// Unsupported CID version
154		UnsupportedCidVersion,
155
156		/// Invalid CID
157		InvalidCid,
158	}
159
160	#[pallet::event]
161	#[pallet::generate_deposit(pub(super) fn deposit_event)]
162	pub enum Event<T: Config> {
163		/// Deprecated: please use [`Event::MessagesInBlock`]
164		/// Messages are stored for a specified schema id and block number
165		MessagesStored {
166			/// The schema for these messages
167			schema_id: SchemaId,
168			/// The block number for these messages
169			block_number: BlockNumberFor<T>,
170		},
171		/// Messages stored in the current block
172		MessagesInBlock,
173		/// Event emitted during storage migration to track progress
174		MessagesMigrated {
175			/// The storage version being migrated from
176			from_version: u16,
177			/// The storage version being migrated to
178			to_version: u16,
179			/// Total number of messages migrated in the current migration
180			cumulative_total_migrated: u64,
181		},
182	}
183
184	#[pallet::hooks]
185	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
186		fn on_initialize(_current: BlockNumberFor<T>) -> Weight {
187			<BlockMessageIndex<T>>::set(0u16);
188			// allocates 1 read and 1 write for any access of `MessageIndex` in every block
189			T::DbWeight::get().reads(1u64).saturating_add(T::DbWeight::get().writes(1u64))
190			// TODO: add retention policy execution GitHub Issue: #126 and #25
191		}
192	}
193
194	#[pallet::call]
195	impl<T: Config> Pallet<T> {
196		/// Adds a message for a resource hosted on IPFS. The input consists of
197		/// a Base32-encoded [CID](https://docs.ipfs.tech/concepts/content-addressing/#version-1-v1)
198		/// as well as a 32-bit content length. The stored payload will contain the
199		/// CID encoded as binary, as well as the 32-bit message content length.
200		/// The actual message content will be on IPFS.
201		///
202		/// # Events
203		/// * [`Event::MessagesInBlock`] - Messages Stored in the block
204		///
205		/// # Errors
206		/// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large.
207		/// * [`Error::InvalidSchemaId`] - Schema not found.
208		/// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location.
209		/// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA.
210		/// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full.
211		/// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0).
212		/// * [`Error::InvalidCid`] - Unable to parse provided CID.
213		///
214		#[pallet::call_index(0)]
215		#[pallet::weight(T::WeightInfo::add_ipfs_message())]
216		pub fn add_ipfs_message(
217			origin: OriginFor<T>,
218			#[pallet::compact] schema_id: SchemaId,
219			cid: Vec<u8>,
220			#[pallet::compact] payload_length: u32,
221		) -> DispatchResult {
222			let provider_key = ensure_signed(origin)?;
223			let cid_binary = Self::validate_cid(&cid)?;
224			let payload_tuple: OffchainPayloadType = (cid_binary, payload_length);
225			let bounded_payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes> = payload_tuple
226				.encode()
227				.try_into()
228				.map_err(|_| Error::<T>::ExceedsMaxMessagePayloadSizeBytes)?;
229
230			if let Some(schema) = T::SchemaProvider::get_schema_info_by_id(schema_id) {
231				ensure!(
232					schema.payload_location == PayloadLocation::IPFS,
233					Error::<T>::InvalidPayloadLocation
234				);
235
236				let provider_msa_id = Self::find_msa_id(&provider_key)?;
237				let current_block = frame_system::Pallet::<T>::block_number();
238				if Self::add_message(
239					provider_msa_id,
240					None,
241					bounded_payload,
242					schema.intent_id,
243					schema_id,
244					current_block,
245				)? {
246					Self::deposit_event(Event::MessagesInBlock);
247				}
248				Ok(())
249			} else {
250				Err(Error::<T>::InvalidSchemaId.into())
251			}
252		}
253
254		/// Add an on-chain message for a given schema id.
255		///
256		/// # Events
257		/// * [`Event::MessagesInBlock`] - In the next block
258		///
259		/// # Errors
260		/// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large.
261		/// * [`Error::InvalidSchemaId`] - Schema not found.
262		/// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location.
263		/// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA.
264		/// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA.
265		/// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full.
266		///
267		#[pallet::call_index(1)]
268		#[pallet::weight(T::WeightInfo::add_onchain_message(payload.len() as u32))]
269		pub fn add_onchain_message(
270			origin: OriginFor<T>,
271			on_behalf_of: Option<MessageSourceId>,
272			#[pallet::compact] schema_id: SchemaId,
273			payload: Vec<u8>,
274		) -> DispatchResult {
275			let provider_key = ensure_signed(origin)?;
276
277			let bounded_payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes> =
278				payload.try_into().map_err(|_| Error::<T>::ExceedsMaxMessagePayloadSizeBytes)?;
279
280			if let Some(schema) = T::SchemaProvider::get_schema_info_by_id(schema_id) {
281				ensure!(
282					schema.payload_location == PayloadLocation::OnChain,
283					Error::<T>::InvalidPayloadLocation
284				);
285
286				let provider_msa_id = Self::find_msa_id(&provider_key)?;
287				let provider_id = ProviderId(provider_msa_id);
288
289				let current_block = frame_system::Pallet::<T>::block_number();
290				// On-chain messages either are sent from the user themselves, or on behalf of another MSA Id
291				let maybe_delegator = match on_behalf_of {
292					Some(delegator_msa_id) => {
293						let delegator_id = DelegatorId(delegator_msa_id);
294						T::SchemaGrantValidator::ensure_valid_grant(
295							provider_id,
296							delegator_id,
297							schema.intent_id,
298							current_block,
299						)
300						.map_err(|_| Error::<T>::UnAuthorizedDelegate)?;
301						delegator_id
302					},
303					None => DelegatorId(provider_msa_id), // Delegate is also the Provider
304				};
305
306				if Self::add_message(
307					provider_msa_id,
308					Some(maybe_delegator.into()),
309					bounded_payload,
310					schema.intent_id,
311					schema_id,
312					current_block,
313				)? {
314					Self::deposit_event(Event::MessagesInBlock);
315				}
316
317				Ok(())
318			} else {
319				Err(Error::<T>::InvalidSchemaId.into())
320			}
321		}
322	}
323}
324
325impl<T: Config> Pallet<T> {
326	/// Stores a message for a given schema id.
327	/// returns true if it needs to emit an event
328	/// # Errors
329	/// * [`Error::TypeConversionOverflow`]
330	///
331	pub fn add_message(
332		provider_msa_id: MessageSourceId,
333		msa_id: Option<MessageSourceId>,
334		payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes>,
335		intent_id: IntentId,
336		schema_id: SchemaId,
337		current_block: BlockNumberFor<T>,
338	) -> Result<bool, DispatchError> {
339		let index = BlockMessageIndex::<T>::get();
340		let first = index == 0;
341		let msg = Message {
342			schema_id,
343			payload, // size is checked on top of extrinsic
344			provider_msa_id,
345			msa_id,
346		};
347
348		<MessagesV3<T>>::insert((current_block, intent_id, index), msg);
349		BlockMessageIndex::<T>::set(index.saturating_add(1));
350		Ok(first)
351	}
352
353	/// Resolve an MSA from an account key(key)
354	/// An MSA Id associated with the account key is returned, if one exists.
355	///
356	/// # Errors
357	/// * [`Error::InvalidMessageSourceAccount`]
358	///
359	pub fn find_msa_id(key: &T::AccountId) -> Result<MessageSourceId, DispatchError> {
360		Ok(T::MsaInfoProvider::ensure_valid_msa_key(key)
361			.map_err(|_| Error::<T>::InvalidMessageSourceAccount)?)
362	}
363
364	/// Retrieve the messages for a particular intent and block range (paginated)
365	pub fn get_messages_by_intent_id(
366		intent_id: IntentId,
367		pagination: BlockPaginationRequest,
368	) -> BlockPaginationResponse<MessageResponseV2> {
369		let mut response = BlockPaginationResponse::new();
370
371		// Request Validation
372		if !pagination.validate() {
373			return response
374		}
375
376		// Schema Fetch and Check
377		let intent = match <T>::SchemaProvider::get_intent_by_id(intent_id) {
378			Some(intent) => intent,
379			None => return response,
380		};
381
382		let mut from_index: u32 = pagination.from_index;
383
384		'block_loop: for block_number in pagination.from_block..pagination.to_block {
385			let list: Vec<MessageResponseV2> = Self::get_messages_by_intent_and_block(
386				intent_id,
387				intent.payload_location,
388				block_number.into(),
389			);
390
391			// Max messages in a block are constrained to MessageIndex (u16) by the storage,
392			// so this is a safe type coercion. Just to be safe, we'll trap in in debug builds
393			let list_size = list.len();
394			debug_assert!(
395				list_size <= MessageIndex::MAX.into(),
396				"unexpected number of messages in block"
397			);
398			let list_size: u32 = list_size as u32;
399
400			let iter = list.into_iter().skip(from_index.saturating_sub(1) as usize);
401			// all subsequent blocks in this call should start at index 0
402			from_index = 0;
403			'message_loop: for (i, m) in iter.enumerate() {
404				response.content.push(m);
405
406				if response.check_end_condition_and_set_next_pagination(
407					block_number,
408					i as u32,
409					list_size,
410					&pagination,
411				) {
412					break 'block_loop;
413				}
414
415				continue 'message_loop;
416			}
417		}
418		response
419	}
420
421	/// Gets messages for a given IntentId and block number.
422	///
423	/// Payload location is included to map to correct response (To avoid fetching the Intent in this method)
424	///
425	/// Result is a vector of [`MessageResponseV2`].
426	///
427	pub fn get_messages_by_intent_and_block(
428		intent_id: IntentId,
429		payload_location: PayloadLocation,
430		block_number: BlockNumberFor<T>,
431	) -> Vec<MessageResponseV2> {
432		let block_number_value: BlockNumber = block_number.try_into().unwrap_or_default();
433
434		match payload_location {
435			PayloadLocation::Itemized | PayloadLocation::Paginated => Vec::new(),
436			_ => {
437				let mut messages: Vec<MessageResponseV2> =
438					MessagesV3::<T>::iter_prefix((block_number, intent_id))
439						.filter_map(|(index, msg)| {
440							msg.map_to_response((block_number_value, payload_location, index))
441						})
442						.collect();
443				messages.sort_by(|a, b| a.index.cmp(&b.index));
444				messages
445			},
446		}
447	}
448
449	/// Validates a CID to conform to IPFS CIDv1 (or higher) formatting (does not validate decoded CID fields)
450	///
451	/// # Errors
452	/// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0)
453	/// * [`Error::InvalidCid`] - Unable to parse provided CID
454	///
455	pub fn validate_cid(in_cid: &[u8]) -> Result<Vec<u8>, DispatchError> {
456		Ok(validate_cid(in_cid).map_err(|e| match e {
457			CidError::UnsupportedCidVersion => Error::<T>::UnsupportedCidVersion,
458			_ => Error::<T>::InvalidCid,
459		})?)
460	}
461}