common_primitives/
messages.rs

1#[cfg(feature = "std")]
2use crate::utils;
3use crate::{msa::MessageSourceId, node::BlockNumber, schema::SchemaId};
4use parity_scale_codec::{Decode, Encode};
5use scale_info::TypeInfo;
6#[cfg(feature = "std")]
7use serde::{Deserialize, Serialize};
8use sp_runtime::traits::One;
9extern crate alloc;
10use alloc::{vec, vec::Vec};
11#[cfg(feature = "std")]
12use utils::*;
13
14/// A type for responding with a single Message in an RPC-call dependent on schema model
15/// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length }
16/// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload }
17#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
18#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)]
19pub struct MessageResponse {
20	/// Message source account id of the Provider. This may be the same id as contained in `msa_id`,
21	/// indicating that the original source MSA is acting as its own provider. An id differing from that
22	/// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on
23	/// its behalf.
24	pub provider_msa_id: MessageSourceId,
25	/// Index in block to get total order.
26	pub index: u16,
27	/// Block-number for which the message was stored.
28	pub block_number: BlockNumber,
29	///  Message source account id (the original source).
30	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))]
31	pub msa_id: Option<MessageSourceId>,
32	/// Serialized data in a the schemas.
33	#[cfg_attr(
34		feature = "std",
35		serde(with = "as_hex_option", skip_serializing_if = "Option::is_none", default)
36	)]
37	pub payload: Option<Vec<u8>>,
38	/// The content address for an IPFS payload in Base32. Will always be CIDv1.
39	#[cfg_attr(
40		feature = "std",
41		serde(with = "as_string_option", skip_serializing_if = "Option::is_none", default)
42	)]
43	pub cid: Option<Vec<u8>>,
44	///  Offchain payload length (IPFS).
45	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))]
46	pub payload_length: Option<u32>,
47}
48
49/// A type for responding with a single Message in an RPC-call dependent on schema model
50/// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length }
51/// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload }
52#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
53#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)]
54pub struct MessageResponseV2 {
55	/// Message source account id of the Provider. This may be the same id as contained in `msa_id`,
56	/// indicating that the original source MSA is acting as its own provider. An id differing from that
57	/// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on
58	/// its behalf.
59	pub provider_msa_id: MessageSourceId,
60	/// Index in block to get total order.
61	pub index: u16,
62	/// Block-number for which the message was stored.
63	pub block_number: BlockNumber,
64	///  Message source account id (the original source).
65	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))]
66	pub msa_id: Option<MessageSourceId>,
67	/// Serialized data in a the schemas.
68	#[cfg_attr(
69		feature = "std",
70		serde(with = "as_hex_option", skip_serializing_if = "Option::is_none", default)
71	)]
72	pub payload: Option<Vec<u8>>,
73	/// The content address for an IPFS payload in Base32. Will always be CIDv1.
74	#[cfg_attr(
75		feature = "std",
76		serde(with = "as_string_option", skip_serializing_if = "Option::is_none", default)
77	)]
78	pub cid: Option<Vec<u8>>,
79	///  Offchain payload length (IPFS).
80	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))]
81	pub payload_length: Option<u32>,
82	/// The SchemaId of the schema that defines the payload format
83	pub schema_id: SchemaId,
84}
85
86impl Into<MessageResponse> for MessageResponseV2 {
87	fn into(self) -> MessageResponse {
88		MessageResponse {
89			provider_msa_id: self.provider_msa_id,
90			index: self.index,
91			block_number: self.block_number,
92			msa_id: self.msa_id,
93			payload: self.payload,
94			cid: self.cid,
95			payload_length: self.payload_length,
96		}
97	}
98}
99
100/// A type for requesting paginated messages.
101#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
102#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)]
103pub struct BlockPaginationRequest {
104	/// Starting block-number (inclusive).
105	pub from_block: BlockNumber,
106	/// Current page index starting from 0.
107	pub from_index: u32,
108	/// Ending block-number (exclusive).
109	pub to_block: BlockNumber,
110	/// The number of messages in a single page.
111	pub page_size: u32,
112}
113
114impl BlockPaginationRequest {
115	/// Hard limit on the number of items per page that can be returned
116	pub const MAX_PAGE_SIZE: u32 = 10000;
117	/// Hard limit on the block range for a request (~7 days at 12 sec per block)
118	pub const MAX_BLOCK_RANGE: u32 = 50000; // ~3 days (6 sec per block)= ~7 days (12 sec per block)
119
120	/// Helper function for request validation.
121	/// * Page size should not exceed MAX_PAGE_SIZE.
122	/// * Block range [from_block:to_block) should not exceed MAX_BLOCK_RANGE.
123	pub fn validate(&self) -> bool {
124		self.page_size > 0 &&
125			self.page_size <= Self::MAX_PAGE_SIZE &&
126			self.from_block < self.to_block &&
127			self.to_block - self.from_block <= Self::MAX_BLOCK_RANGE
128	}
129}
130
131/// A type for responding with a collection of paginated messages.
132#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
133#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)]
134pub struct BlockPaginationResponse<T> {
135	/// Collection of messages for a given [`BlockPaginationRequest`].
136	pub content: Vec<T>,
137	/// Flag to indicate the end of paginated messages.
138	pub has_next: bool,
139	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none"))]
140	/// Flag to indicate the starting block number for the next page.
141	pub next_block: Option<BlockNumber>,
142	#[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none"))]
143	/// Flag to indicate the next index for the following request.
144	pub next_index: Option<u32>,
145}
146
147impl<T> BlockPaginationResponse<T> {
148	/// Generates a new empty Pagination response
149	pub const fn new() -> BlockPaginationResponse<T> {
150		BlockPaginationResponse {
151			content: vec![],
152			has_next: false,
153			next_block: None,
154			next_index: None,
155		}
156	}
157
158	/// Checks if we are at the end of the pagination
159	/// if we are, update the response with the correct next information
160	pub fn check_end_condition_and_set_next_pagination(
161		&mut self,
162		block_number: BlockNumber,
163		current_index: u32,
164		list_size: u32,
165		request: &BlockPaginationRequest,
166	) -> bool {
167		if self.content.len() as u32 == request.page_size {
168			let mut next_block = block_number;
169			let mut next_index = current_index + 1;
170
171			// checking if it's end of current list
172			if next_index == list_size {
173				next_block = block_number + BlockNumber::one();
174				next_index = 0;
175			}
176
177			if next_block < request.to_block {
178				self.has_next = true;
179				self.next_block = Some(next_block);
180				self.next_index = Some(next_index);
181			}
182			return true
183		}
184
185		false
186	}
187}
188
/// Unit tests for the serde behaviour of [`MessageResponse`] and for the pagination helpers.
#[cfg(test)]
mod tests {
	use crate::{
		messages::{BlockPaginationRequest, BlockPaginationResponse, MessageResponse},
		node::BlockNumber,
	};

	/// A single table-driven case: the request under test, the expected outcome and the
	/// message printed when the assertion fails.
	struct TestCase<T> {
		input: BlockPaginationRequest,
		expected: T,
		message: String,
	}

	#[test]
	fn as_hex_option_msg_ipfs_serialize_deserialize_test() {
		// skip deserialize if Option::none works
		let msg = MessageResponse {
			payload: None,
			msa_id: None,
			provider_msa_id: 1,
			index: 1,
			block_number: 1,
			cid: Some(
				"bafkreidgvpkjawlxz6sffxzwgooowe5yt7i6wsyg236mfoks77nywkptdq"
					.as_bytes()
					.to_vec(),
			),
			payload_length: Some(42),
		};
		let serialized = serde_json::to_string(&msg).unwrap();
		assert_eq!(serialized, "{\"provider_msa_id\":1,\"index\":1,\"block_number\":1,\"cid\":\"bafkreidgvpkjawlxz6sffxzwgooowe5yt7i6wsyg236mfoks77nywkptdq\",\"payload_length\":42}");

		let deserialized: MessageResponse = serde_json::from_str(&serialized).unwrap();
		assert_eq!(deserialized, msg);
	}

	#[test]
	fn as_hex_option_empty_payload_deserialize_as_default_value() {
		let expected_msg = MessageResponse {
			payload: None,
			msa_id: Some(1),
			provider_msa_id: 1,
			index: 1,
			block_number: 1,
			cid: None,
			payload_length: None,
		};

		// Notice Payload field is missing
		let serialized_msg_without_payload =
			"{\"provider_msa_id\":1,\"index\":1,\"block_number\":1,\"msa_id\":1}";

		let deserialized_result: MessageResponse =
			serde_json::from_str(serialized_msg_without_payload).unwrap();
		assert_eq!(deserialized_result, expected_msg);
	}

	#[test]
	fn block_pagination_request_validation_test() {
		// Every invalid case breaks exactly ONE rule, so a case cannot pass by accident because
		// some other rule rejected the request first.
		let test_cases: Vec<TestCase<bool>> = vec![
			TestCase {
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 12,
					page_size: 1,
				},
				expected: true,
				message: "Should be valid".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 12,
					page_size: 0,
				},
				expected: false,
				message: "Page with size 0 is invalid".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 8,
					page_size: 1,
				},
				expected: false,
				message: "from_block should be less than to_block".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 10,
					page_size: 1,
				},
				expected: false,
				message: "an empty block range is invalid".to_string(),
			},
			TestCase {
				// Valid block range, so only the page size can cause the rejection.
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 12,
					page_size: BlockPaginationRequest::MAX_PAGE_SIZE + 1,
				},
				expected: false,
				message: "page_size should be less than MAX_PAGE_SIZE".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 10,
					from_index: 0,
					to_block: 12,
					page_size: BlockPaginationRequest::MAX_PAGE_SIZE,
				},
				expected: true,
				message: "page_size equal to MAX_PAGE_SIZE is valid".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 1,
					from_index: 0,
					to_block: 1 + BlockPaginationRequest::MAX_BLOCK_RANGE + 1,
					page_size: 1,
				},
				expected: false,
				message: "the difference between from_block and to_block should be less than MAX_BLOCK_RANGE".to_string(),
			},
			TestCase {
				input: BlockPaginationRequest {
					from_block: 1,
					from_index: 0,
					to_block: 1 + BlockPaginationRequest::MAX_BLOCK_RANGE,
					page_size: 1,
				},
				expected: true,
				message: "a block range equal to MAX_BLOCK_RANGE is valid".to_string(),
			},
		];

		for tc in test_cases {
			assert_eq!(tc.expected, tc.input.validate(), "{}", tc.message);
		}
	}

	#[test]
	fn check_end_condition_does_not_mutate_when_at_the_end() {
		let mut resp = BlockPaginationResponse::<u32> {
			content: vec![1, 2, 3],
			has_next: false,
			next_block: None,
			next_index: None,
		};

		let total_data_length: u32 = resp.content.len() as u32;

		let request = BlockPaginationRequest {
			from_block: 1 as BlockNumber,
			from_index: 0,
			to_block: 5,
			// Page is LARGER
			page_size: total_data_length + 10,
		};
		// We are at the LAST block
		let current_block = 5;
		// Index after content
		let current_index = total_data_length - 1;
		// Critical Bit: NO more data than index
		let list_size = current_index;
		let is_full = resp.check_end_condition_and_set_next_pagination(
			current_block,
			current_index,
			list_size,
			&request,
		);
		// NOT FULL
		assert!(!is_full);
		// NOTHING MORE
		assert!(!resp.has_next);
		// None
		assert_eq!(None, resp.next_block);
		assert_eq!(None, resp.next_index);
	}

	#[test]
	fn check_end_condition_mutates_when_more_in_list_than_page() {
		let mut resp = BlockPaginationResponse::<u32> {
			content: vec![1, 2, 3],
			has_next: false,
			next_block: None,
			next_index: None,
		};

		let total_data_length: u32 = resp.content.len() as u32;

		let request = BlockPaginationRequest {
			from_block: 1 as BlockNumber,
			from_index: 0,
			to_block: 5,
			page_size: total_data_length,
		};
		// We have not completed the block yet
		let current_block = 1;
		// End of the Block
		let current_index = total_data_length - 1;
		// Critical Bit: MORE Data to go in length than page_size
		let list_size = total_data_length + 1;
		let is_full = resp.check_end_condition_and_set_next_pagination(
			current_block,
			current_index,
			list_size,
			&request,
		);
		assert!(is_full);
		assert!(resp.has_next);
		// SAME block
		assert_eq!(Some(1), resp.next_block);
		// NEXT index
		assert_eq!(Some(current_index + 1), resp.next_index);
	}

	#[test]
	fn check_end_condition_mutates_when_more_than_page_but_none_left_in_block() {
		let mut resp = BlockPaginationResponse::<u32> {
			content: vec![1, 2, 3],
			has_next: false,
			next_block: None,
			next_index: None,
		};

		let total_data_length: u32 = resp.content.len() as u32;

		let request = BlockPaginationRequest {
			from_block: 1 as BlockNumber,
			from_index: 0,
			to_block: 5,
			page_size: total_data_length,
		};
		// We have not completed the block yet
		let current_block = 1;
		// End of the Block
		let current_index = total_data_length - 1;
		// SAME in length than page_size
		let list_size = total_data_length;
		let is_full = resp.check_end_condition_and_set_next_pagination(
			current_block,
			current_index,
			list_size,
			&request,
		);
		assert!(is_full);
		assert!(resp.has_next);
		// NEXT block
		assert_eq!(Some(current_block + 1), resp.next_block);
		// ZERO index
		assert_eq!(Some(0), resp.next_index);
	}

	#[test]
	fn check_end_condition_reports_full_without_next_when_range_is_exhausted() {
		let mut resp = BlockPaginationResponse::<u32> {
			content: vec![1, 2, 3],
			has_next: false,
			next_block: None,
			next_index: None,
		};

		let total_data_length: u32 = resp.content.len() as u32;

		let request = BlockPaginationRequest {
			from_block: 1 as BlockNumber,
			from_index: 0,
			to_block: 5,
			page_size: total_data_length,
		};
		// Last block INSIDE the range (to_block is exclusive)
		let current_block = 4;
		// End of the Block
		let current_index = total_data_length - 1;
		let list_size = total_data_length;
		let is_full = resp.check_end_condition_and_set_next_pagination(
			current_block,
			current_index,
			list_size,
			&request,
		);
		// The page is full ...
		assert!(is_full);
		// ... but the next position would be block 5, which is outside the range
		assert!(!resp.has_next);
		assert_eq!(None, resp.next_block);
		assert_eq!(None, resp.next_index);
	}
}
393}