1#![doc = include_str!("../README.md")]
11#![allow(clippy::expect_used)]
13#![cfg_attr(not(feature = "std"), no_std)]
15#![deny(
17 rustdoc::broken_intra_doc_links,
18 rustdoc::missing_crate_level_docs,
19 rustdoc::invalid_codeblock_attributes,
20 missing_docs
21)]
22
23#[cfg(feature = "runtime-benchmarks")]
24mod benchmarking;
25#[cfg(test)]
26mod tests;
27
28pub mod weights;
29
30mod types;
31
32pub mod migration;
34
35use core::{convert::TryInto, fmt::Debug};
36use frame_support::{ensure, pallet_prelude::Weight, traits::Get, BoundedVec};
37use sp_runtime::DispatchError;
38
39extern crate alloc;
40extern crate core;
41
42use alloc::vec::Vec;
43use common_primitives::{
44 cid::*,
45 messages::*,
46 msa::{DelegatorId, GrantValidator, MessageSourceId, MsaLookup, MsaValidator, ProviderId},
47 schema::*,
48};
49use frame_support::dispatch::DispatchResult;
50use parity_scale_codec::Encode;
51
52#[cfg(feature = "runtime-benchmarks")]
53use common_primitives::benchmarks::{MsaBenchmarkHelper, SchemaBenchmarkHelper};
54
55pub use pallet::*;
56pub use types::*;
57pub use weights::*;
58
59use common_primitives::node::BlockNumber;
60use frame_system::pallet_prelude::*;
61
62#[frame_support::pallet]
63pub mod pallet {
64 use super::*;
65 use frame_support::pallet_prelude::*;
66
    /// The in-code storage version of this pallet, attached to the pallet struct through
    /// `#[pallet::storage_version]`.
    pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
69
    /// Configuration of the messages pallet: the types and constants the runtime must supply.
    #[pallet::config]
    pub trait Config: frame_system::Config {
        /// The overarching runtime event type that this pallet's `Event` converts into.
        #[allow(deprecated)]
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Weight functions used in the `#[pallet::weight]` annotations of the dispatchables.
        type WeightInfo: WeightInfo;

        /// Resolves a signing account key to its Message Source Account id
        /// (used by `find_msa_id` through `ensure_valid_msa_key`).
        type MsaInfoProvider: MsaLookup + MsaValidator<AccountId = Self::AccountId>;

        /// Checks that a provider holds a valid grant from a delegator for an intent at a
        /// given block (used by `add_onchain_message` through `ensure_valid_grant`).
        type SchemaGrantValidator: GrantValidator<IntentId, BlockNumberFor<Self>>;

        /// Looks up schema and intent information by id.
        type SchemaProvider: SchemaProvider<SchemaId>;

        /// Upper bound, in bytes, of the payload stored with a message; it is the bound of the
        /// `BoundedVec` that holds every stored payload.
        #[pallet::constant]
        type MessagesMaxPayloadSizeBytes: Get<u32> + Clone + Debug + MaxEncodedLen;

        /// NOTE(review): not read anywhere in this file; presumably controls how often the
        /// storage migration emits a progress event — confirm against the `migration` module.
        type MigrateEmitEvery: Get<u32> + Clone + Debug;

        /// Helper used by the benchmarks to set up Message Source Accounts.
        #[cfg(feature = "runtime-benchmarks")]
        type MsaBenchmarkHelper: MsaBenchmarkHelper<Self::AccountId>;

        /// Helper used by the benchmarks to set up schemas.
        #[cfg(feature = "runtime-benchmarks")]
        type SchemaBenchmarkHelper: SchemaBenchmarkHelper;
    }
106
    /// The pallet type on which the hooks, dispatchables and helper functions of this file are
    /// implemented. Its storage version is `STORAGE_VERSION`.
    #[pallet::pallet]
    #[pallet::storage_version(STORAGE_VERSION)]
    pub struct Pallet<T>(_);
110
    /// The index that the next message stored in the current block will be keyed under.
    ///
    /// Reset to zero in `on_initialize` and advanced by `add_message` after each insert.
    #[pallet::storage]
    #[pallet::whitelist_storage]
    pub(super) type BlockMessageIndex<T: Config> = StorageValue<_, MessageIndex, ValueQuery>;
116
    /// Stored messages, keyed by block number, then intent id, then the message's index
    /// within that block.
    #[pallet::storage]
    pub(super) type MessagesV3<T: Config> = StorageNMap<
        _,
        (
            storage::Key<Twox64Concat, BlockNumberFor<T>>,
            storage::Key<Twox64Concat, IntentId>,
            storage::Key<Twox64Concat, MessageIndex>,
        ),
        Message<T::MessagesMaxPayloadSizeBytes>,
        OptionQuery,
    >;
129
    /// Errors returned by this pallet.
    #[pallet::error]
    pub enum Error<T> {
        /// Too many messages were added to the current block.
        TooManyMessagesInBlock,

        /// The payload does not fit into `MessagesMaxPayloadSizeBytes`.
        ExceedsMaxMessagePayloadSizeBytes,

        /// A value could not be converted to the target type without overflowing.
        TypeConversionOverflow,

        /// The signing key is not a valid key of a Message Source Account.
        InvalidMessageSourceAccount,

        /// No schema is registered under the given schema id.
        InvalidSchemaId,

        /// The provider has no valid grant to post on behalf of the given delegator.
        UnAuthorizedDelegate,

        /// The schema's payload location does not match the extrinsic that was called.
        InvalidPayloadLocation,

        /// The CID uses a version that is not supported.
        UnsupportedCidVersion,

        /// The CID could not be validated.
        InvalidCid,
    }
159
    /// Events deposited by this pallet.
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// NOTE(review): not deposited anywhere in this file; presumably kept so that
        /// existing event decoding keeps working — confirm before relying on it.
        MessagesStored {
            /// The schema the event refers to.
            schema_id: SchemaId,
            /// The block the event refers to.
            block_number: BlockNumberFor<T>,
        },
        /// Deposited by the `add_*_message` dispatchables when the message they stored was the
        /// first one of the current block.
        MessagesInBlock,
        /// NOTE(review): not deposited in this file; presumably a progress report of the
        /// storage migration — confirm against the `migration` module.
        MessagesMigrated {
            /// Storage version migrated from.
            from_version: u16,
            /// Storage version migrated to.
            to_version: u16,
            /// Running total of migrated messages (presumably across all migration steps).
            cumulative_total_migrated: u64,
        },
    }
183
184 #[pallet::hooks]
185 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
186 fn on_initialize(_current: BlockNumberFor<T>) -> Weight {
187 <BlockMessageIndex<T>>::set(0u16);
188 T::DbWeight::get().reads(1u64).saturating_add(T::DbWeight::get().writes(1u64))
190 }
192 }
193
194 #[pallet::call]
195 impl<T: Config> Pallet<T> {
196 #[pallet::call_index(0)]
215 #[pallet::weight(T::WeightInfo::add_ipfs_message())]
216 pub fn add_ipfs_message(
217 origin: OriginFor<T>,
218 #[pallet::compact] schema_id: SchemaId,
219 cid: Vec<u8>,
220 #[pallet::compact] payload_length: u32,
221 ) -> DispatchResult {
222 let provider_key = ensure_signed(origin)?;
223 let cid_binary = Self::validate_cid(&cid)?;
224 let payload_tuple: OffchainPayloadType = (cid_binary, payload_length);
225 let bounded_payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes> = payload_tuple
226 .encode()
227 .try_into()
228 .map_err(|_| Error::<T>::ExceedsMaxMessagePayloadSizeBytes)?;
229
230 if let Some(schema) = T::SchemaProvider::get_schema_info_by_id(schema_id) {
231 ensure!(
232 schema.payload_location == PayloadLocation::IPFS,
233 Error::<T>::InvalidPayloadLocation
234 );
235
236 let provider_msa_id = Self::find_msa_id(&provider_key)?;
237 let current_block = frame_system::Pallet::<T>::block_number();
238 if Self::add_message(
239 provider_msa_id,
240 None,
241 bounded_payload,
242 schema.intent_id,
243 schema_id,
244 current_block,
245 )? {
246 Self::deposit_event(Event::MessagesInBlock);
247 }
248 Ok(())
249 } else {
250 Err(Error::<T>::InvalidSchemaId.into())
251 }
252 }
253
254 #[pallet::call_index(1)]
268 #[pallet::weight(T::WeightInfo::add_onchain_message(payload.len() as u32))]
269 pub fn add_onchain_message(
270 origin: OriginFor<T>,
271 on_behalf_of: Option<MessageSourceId>,
272 #[pallet::compact] schema_id: SchemaId,
273 payload: Vec<u8>,
274 ) -> DispatchResult {
275 let provider_key = ensure_signed(origin)?;
276
277 let bounded_payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes> =
278 payload.try_into().map_err(|_| Error::<T>::ExceedsMaxMessagePayloadSizeBytes)?;
279
280 if let Some(schema) = T::SchemaProvider::get_schema_info_by_id(schema_id) {
281 ensure!(
282 schema.payload_location == PayloadLocation::OnChain,
283 Error::<T>::InvalidPayloadLocation
284 );
285
286 let provider_msa_id = Self::find_msa_id(&provider_key)?;
287 let provider_id = ProviderId(provider_msa_id);
288
289 let current_block = frame_system::Pallet::<T>::block_number();
290 let maybe_delegator = match on_behalf_of {
292 Some(delegator_msa_id) => {
293 let delegator_id = DelegatorId(delegator_msa_id);
294 T::SchemaGrantValidator::ensure_valid_grant(
295 provider_id,
296 delegator_id,
297 schema.intent_id,
298 current_block,
299 )
300 .map_err(|_| Error::<T>::UnAuthorizedDelegate)?;
301 delegator_id
302 },
303 None => DelegatorId(provider_msa_id), };
305
306 if Self::add_message(
307 provider_msa_id,
308 Some(maybe_delegator.into()),
309 bounded_payload,
310 schema.intent_id,
311 schema_id,
312 current_block,
313 )? {
314 Self::deposit_event(Event::MessagesInBlock);
315 }
316
317 Ok(())
318 } else {
319 Err(Error::<T>::InvalidSchemaId.into())
320 }
321 }
322 }
323}
324
325impl<T: Config> Pallet<T> {
326 pub fn add_message(
332 provider_msa_id: MessageSourceId,
333 msa_id: Option<MessageSourceId>,
334 payload: BoundedVec<u8, T::MessagesMaxPayloadSizeBytes>,
335 intent_id: IntentId,
336 schema_id: SchemaId,
337 current_block: BlockNumberFor<T>,
338 ) -> Result<bool, DispatchError> {
339 let index = BlockMessageIndex::<T>::get();
340 let first = index == 0;
341 let msg = Message {
342 schema_id,
343 payload, provider_msa_id,
345 msa_id,
346 };
347
348 <MessagesV3<T>>::insert((current_block, intent_id, index), msg);
349 BlockMessageIndex::<T>::set(index.saturating_add(1));
350 Ok(first)
351 }
352
353 pub fn find_msa_id(key: &T::AccountId) -> Result<MessageSourceId, DispatchError> {
360 Ok(T::MsaInfoProvider::ensure_valid_msa_key(key)
361 .map_err(|_| Error::<T>::InvalidMessageSourceAccount)?)
362 }
363
364 pub fn get_messages_by_intent_id(
366 intent_id: IntentId,
367 pagination: BlockPaginationRequest,
368 ) -> BlockPaginationResponse<MessageResponseV2> {
369 let mut response = BlockPaginationResponse::new();
370
371 if !pagination.validate() {
373 return response
374 }
375
376 let intent = match <T>::SchemaProvider::get_intent_by_id(intent_id) {
378 Some(intent) => intent,
379 None => return response,
380 };
381
382 let mut from_index: u32 = pagination.from_index;
383
384 'block_loop: for block_number in pagination.from_block..pagination.to_block {
385 let list: Vec<MessageResponseV2> = Self::get_messages_by_intent_and_block(
386 intent_id,
387 intent.payload_location,
388 block_number.into(),
389 );
390
391 let list_size = list.len();
394 debug_assert!(
395 list_size <= MessageIndex::MAX.into(),
396 "unexpected number of messages in block"
397 );
398 let list_size: u32 = list_size as u32;
399
400 let iter = list.into_iter().skip(from_index.saturating_sub(1) as usize);
401 from_index = 0;
403 'message_loop: for (i, m) in iter.enumerate() {
404 response.content.push(m);
405
406 if response.check_end_condition_and_set_next_pagination(
407 block_number,
408 i as u32,
409 list_size,
410 &pagination,
411 ) {
412 break 'block_loop;
413 }
414
415 continue 'message_loop;
416 }
417 }
418 response
419 }
420
421 pub fn get_messages_by_intent_and_block(
428 intent_id: IntentId,
429 payload_location: PayloadLocation,
430 block_number: BlockNumberFor<T>,
431 ) -> Vec<MessageResponseV2> {
432 let block_number_value: BlockNumber = block_number.try_into().unwrap_or_default();
433
434 match payload_location {
435 PayloadLocation::Itemized | PayloadLocation::Paginated => Vec::new(),
436 _ => {
437 let mut messages: Vec<MessageResponseV2> =
438 MessagesV3::<T>::iter_prefix((block_number, intent_id))
439 .filter_map(|(index, msg)| {
440 msg.map_to_response((block_number_value, payload_location, index))
441 })
442 .collect();
443 messages.sort_by(|a, b| a.index.cmp(&b.index));
444 messages
445 },
446 }
447 }
448
449 pub fn validate_cid(in_cid: &[u8]) -> Result<Vec<u8>, DispatchError> {
456 Ok(validate_cid(in_cid).map_err(|e| match e {
457 CidError::UnsupportedCidVersion => Error::<T>::UnsupportedCidVersion,
458 _ => Error::<T>::InvalidCid,
459 })?)
460 }
461}