pallet_stateful_storage/migration/v2.rs

use crate::{
	migration::v1,
	stateful_child_tree::{MultipartKey, StatefulChildTree},
	types::{ItemHeader, ITEMIZED_STORAGE_PREFIX, PAGINATED_STORAGE_PREFIX, PALLET_STORAGE_PREFIX},
	weights, Config, Event, Pallet,
};
use alloc::vec::Vec;
use common_primitives::{
	msa::{MessageSourceId, MsaLookup},
	schema::PayloadLocation,
};
use core::marker::PhantomData;
use frame_support::{
	migrations::{MigrationId, SteppedMigration, SteppedMigrationError},
	pallet_prelude::{ConstU32, Get, GetStorageVersion, RuntimeDebug, StorageVersion},
	weights::WeightMeter,
	BoundedVec,
};
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use sp_core::storage::ChildInfo;
#[cfg(feature = "try-runtime")]
use sp_runtime::TryRuntimeError;

const LOG_TARGET: &str = "pallet::stateful-storage::migration::v2";

/// The length of a PaginatedKey (twox_128, twox_128, u16, u16)
pub type PaginatedKeyLength = ConstU32<68>;
/// The length of an ItemizedKey (twox_128, u16)
pub type ItemizedKeyLength = ConstU32<34>;

/// Cursor struct for tracking multi-block migration progress across child tries.
/// The last processed child key is held in a `BoundedVec` because the cursor must implement
/// `MaxEncodedLen`, which `Vec<u8>` does not provide.
#[derive(Encode, Decode, Clone, PartialEq, Eq, MaxEncodedLen, RuntimeDebug)]
pub struct ChildCursor<N: Get<u32>> {
	/// MSA ID whose data is currently being migrated
	pub id: MessageSourceId,
	/// Last inner key processed; an empty key means start from the first key
	pub last_key: BoundedVec<u8, N>,
	/// Cumulative count of migrated pages
	pub cumulative_pages: u64,
}

impl<N: Get<u32>> Default for ChildCursor<N> {
	fn default() -> Self {
		Self { id: 1, last_key: BoundedVec::<u8, N>::default(), cumulative_pages: u64::default() }
	}
}
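
// Encoding sketch (illustrative; `PaginatedKeyLength` is used here only as an example bound):
// the multi-block migration executive persists this cursor SCALE-encoded between blocks, which is
// why every field must be `MaxEncodedLen`. The encoded size always stays within the derived bound:
//   let cur = ChildCursor::<PaginatedKeyLength>::default();
//   assert!(cur.encode().len() <= ChildCursor::<PaginatedKeyLength>::max_encoded_len());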

fn next_key<N: Get<u32>>(
	child: &ChildInfo,
	after: &BoundedVec<u8, N>,
) -> Result<Option<BoundedVec<u8, N>>, SteppedMigrationError> {
	match sp_io::default_child_storage::next_key(child.storage_key(), after.as_slice()) {
		Some(k) => Ok(Some(k.try_into().map_err(|_| SteppedMigrationError::Failed)?)),
		None => Ok(None),
	}
}
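
// Usage sketch (illustrative, mirroring how the process_* functions below drive this one key at a
// time): starting from an empty `last_key`, feeding each returned key back in walks every key of
// the child trie in lexicographic order:
//   let mut last: BoundedVec<u8, PaginatedKeyLength> = BoundedVec::default();
//   while let Some(k) = next_key(&child, &last)? {
//       // handle key `k` ...
//       last = k;
//   }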

/// Migrates a single PaginatedPage
pub fn process_paginated_page<T: Config, N: Get<u32>>(
	child: &ChildInfo,
	cur: &mut ChildCursor<N>,
) -> Result<bool, SteppedMigrationError> {
	let Some(k) = next_key(child, &cur.last_key).unwrap_or(None) else {
		if cur.id % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
			Pallet::<T>::deposit_event(Event::<T>::StatefulPagesMigrated {
				last_trie: (cur.id, PayloadLocation::Paginated),
				total_page_count: cur.cumulative_pages,
			});
		}
		// Finished this child → next id
		cur.last_key = BoundedVec::default();
		cur.id += 1;
		return Ok(false);
	};

	if let Some(old) =
		StatefulChildTree::<T::KeyHasher>::try_read_raw::<v1::PaginatedPage<T>>(child, &k)
			.map_err(|_| SteppedMigrationError::Failed)?
	{
		let (schema_id, _page_index) =
			<v1::PaginatedKey as MultipartKey<T::KeyHasher>>::decode(&k[..])
				.map_err(|_| SteppedMigrationError::Failed)?;
		let page_parts = (Some(schema_id), old);
		let mut new_page: crate::PaginatedPage<T> = page_parts.into();
		new_page.nonce = new_page.nonce.wrapping_add(1);

		StatefulChildTree::<T::KeyHasher>::write_raw(child, &k, new_page);
	}

	cur.last_key = k;
	cur.cumulative_pages += 1;

	if cur.cumulative_pages % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
		Pallet::<T>::deposit_event(Event::<T>::StatefulPagesMigrated {
			last_trie: (cur.id, PayloadLocation::Paginated),
			total_page_count: cur.cumulative_pages,
		});
	}
	Ok(true)
}
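
// Event cadence note (derived from the checks above): with, say, `MigrateEmitEvery = 1_000`, a
// `StatefulPagesMigrated` event is deposited after every 1_000th migrated page, and additionally
// when a fully drained child trie belongs to an MSA id that is a multiple of 1_000.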

/// Migrates a single ItemizedPage
pub fn process_itemized_page<T: Config, N: Get<u32>>(
	child: &ChildInfo,
	cur: &mut ChildCursor<N>,
) -> Result<bool, SteppedMigrationError> {
	let Some(k) = next_key(child, &cur.last_key).unwrap_or(None) else {
		if cur.id % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
			Pallet::<T>::deposit_event(Event::<T>::StatefulPagesMigrated {
				last_trie: (cur.id, PayloadLocation::Itemized),
				total_page_count: cur.cumulative_pages,
			});
		}
		// Finished this child → next id
		cur.last_key = BoundedVec::default();
		cur.id += 1;
		return Ok(false);
	};

	if let Some(old) =
		StatefulChildTree::<T::KeyHasher>::try_read_raw::<v1::ItemizedPage<T>>(child, &k)
			.map_err(|_| SteppedMigrationError::Failed)?
	{
		let (schema_id,) = <v1::ItemizedKey as MultipartKey<T::KeyHasher>>::decode(&k[..])
			.map_err(|_| SteppedMigrationError::Failed)?;

		// Parse old page into old items
		let parsed_page = v1::ItemizedOperations::<T>::try_parse(&old)
			.map_err(|_| SteppedMigrationError::Failed)?;
		let min_expected_size = parsed_page.items.len() *
			(crate::types::ItemHeader::max_encoded_len() - v1::ItemHeader::max_encoded_len()) +
			parsed_page.page_size;

		// Migrate each old item to the new format and add to a new page buffer
		let mut updated_page_buffer = Vec::with_capacity(min_expected_size);
		parsed_page.items.into_iter().for_each(|(_item_index, parsed_item)| {
			let header = ItemHeader::V2 { schema_id, payload_len: parsed_item.header.payload_len };
			let mut encoded_item = header.encode();
			encoded_item.extend_from_slice(&parsed_item.data);
			updated_page_buffer.extend_from_slice(&encoded_item);
		});

		let bounded_page_buffer: BoundedVec<u8, T::MaxItemizedPageSizeBytes> =
			updated_page_buffer.try_into().map_err(|_| SteppedMigrationError::Failed)?;
		let mut new_page: crate::ItemizedPage<T> = bounded_page_buffer.clone().into();
		new_page.schema_id = None;
		new_page.nonce = old.nonce.wrapping_add(1);

		StatefulChildTree::<T::KeyHasher>::write_raw(child, &k, new_page);
	}

	cur.last_key = k;
	cur.cumulative_pages += 1;

	if cur.cumulative_pages % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
		Pallet::<T>::deposit_event(Event::<T>::StatefulPagesMigrated {
			last_trie: (cur.id, PayloadLocation::Itemized),
			total_page_count: cur.cumulative_pages,
		});
	}
	Ok(true)
}
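
// Layout sketch of a migrated itemized page buffer (derived from the re-encoding above, not a
// separate on-chain format definition): each item is its `ItemHeader::V2` encoding followed
// immediately by `payload_len` bytes of payload, with all items concatenated back to back:
//   [ header_0 | payload_0 | header_1 | payload_1 | ... ]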

/// The `step` function will be called once per block. It is very important that this function
/// *never* panics and never uses more weight than it got in its meter. The migrations should also
/// try to make maximal progress per step, so that the total time it takes to migrate stays low.
pub struct MigratePaginatedV1ToV2<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
impl<T: Config, W: weights::WeightInfo> SteppedMigration for MigratePaginatedV1ToV2<T, W> {
	type Cursor = ChildCursor<PaginatedKeyLength>;
	// Without the explicit length here the construction of the ID would not be infallible.
	type Identifier = MigrationId<50>;
	/// The identifier of this migration, which should be globally unique.
	fn id() -> Self::Identifier {
		MigrationId {
			pallet_id: *b"pallet::stateful-storage::migration::paginated::v2",
			version_from: 1,
			version_to: 2,
		}
	}

	/// The actual logic of the migration.
	///
	/// This function is called repeatedly until it returns `Ok(None)`, indicating that the
	/// migration is complete. Ideally, the migration should be designed in such a way that each
	/// step consumes as much weight as possible.
	fn step(
		cursor: Option<Self::Cursor>,
		meter: &mut WeightMeter,
	) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
		meter.try_consume(T::DbWeight::get().reads(2)).map_err(|_| {
			SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().reads(2) }
		})?;
		if StorageVersion::new(2) <= Pallet::<T>::on_chain_storage_version() {
			log::info!(target: LOG_TARGET, "Skipping migrating paginated storage: storage version already set to 2");
			return Ok(None);
		}
		let max_id = <T::MsaInfoProvider>::get_max_msa_id();
		let mut cur = cursor.unwrap_or_else(|| {
			log::info!(target: LOG_TARGET, "Starting migrating paginated storage, max MSA: {max_id}");
			Self::Cursor::default()
		});
		let hit_weight = W::paginated_v1_to_v2_hit();
		let miss_weight = W::paginated_v1_to_v2_miss();

		if meter.remaining().any_lt(hit_weight) {
			return Err(SteppedMigrationError::InsufficientWeight { required: hit_weight });
		}

		let mut page_count = 0u32;

		while cur.id <= max_id {
			let child = StatefulChildTree::<T::KeyHasher>::get_child_tree_for_storage(
				cur.id,
				PALLET_STORAGE_PREFIX,
				PAGINATED_STORAGE_PREFIX,
			);

			'inner: loop {
				if !meter.can_consume(hit_weight) {
					log::info!(target: LOG_TARGET, "Migrated {page_count} pages; current MSA {}", cur.id);
					return Ok(Some(cur));
				}
				if !process_paginated_page::<T, PaginatedKeyLength>(&child, &mut cur)? {
					meter.consume(miss_weight);
					break 'inner;
				} else {
					meter.consume(hit_weight);
					page_count += 1;
				}
			}
		}

		meter.try_consume(T::DbWeight::get().writes(1)).map_err(|_| {
			SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().writes(1) }
		})?;
		v1::DonePaginated::<T>::put(true);
		log::info!(target: LOG_TARGET, "Finished migrating paginated storage; migrated {} total pages", cur.cumulative_pages);
		Ok(None) // done
	}
}
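
// Resume-pattern sketch (simplified and assumed; in production the multi-block migration executive
// drives `step`, not hand-written code like this, and `per_block_limit` is a placeholder). Each
// block gets a fresh `WeightMeter`, and the cursor returned by one call is fed back into the next
// until the migration reports completion:
//   let mut cursor = None;
//   loop {
//       let mut meter = WeightMeter::with_limit(per_block_limit);
//       match MigratePaginatedV1ToV2::<T, W>::step(cursor, &mut meter)? {
//           Some(next) => cursor = Some(next), // paused; continue in the next block
//           None => break,                     // migration complete
//       }
//   }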

/// The `step` function will be called once per block. It is very important that this function
/// *never* panics and never uses more weight than it got in its meter. The migrations should also
/// try to make maximal progress per step, so that the total time it takes to migrate stays low.
pub struct MigrateItemizedV1ToV2<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
impl<T: Config, W: weights::WeightInfo> SteppedMigration for MigrateItemizedV1ToV2<T, W> {
	type Cursor = ChildCursor<ItemizedKeyLength>;
	// Without the explicit length here the construction of the ID would not be infallible.
	type Identifier = MigrationId<49>;

	/// The identifier of this migration, which should be globally unique.
	fn id() -> Self::Identifier {
		MigrationId {
			pallet_id: *b"pallet::stateful-storage::migration::itemized::v2",
			version_from: 1,
			version_to: 2,
		}
	}

	/// The actual logic of the migration.
	///
	/// This function is called repeatedly until it returns `Ok(None)`, indicating that the
	/// migration is complete. Ideally, the migration should be designed in such a way that each
	/// step consumes as much weight as possible.
	fn step(
		cursor: Option<Self::Cursor>,
		meter: &mut WeightMeter,
	) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
		meter.try_consume(T::DbWeight::get().reads(2)).map_err(|_| {
			SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().reads(2) }
		})?;
		if StorageVersion::new(2) <= Pallet::<T>::on_chain_storage_version() {
			log::info!(target: LOG_TARGET, "Skipping migrating itemized storage: storage version already set to 2");
			return Ok(None);
		}
		let max_id = <T::MsaInfoProvider>::get_max_msa_id();
		let mut cur = cursor.unwrap_or_else(|| {
			log::info!(target: LOG_TARGET, "Starting migrating itemized storage, max MSA: {max_id}");
			Self::Cursor::default()
		});
		let hit_weight = W::itemized_v1_to_v2_hit();
		let miss_weight = W::itemized_v1_to_v2_miss();

		if meter.remaining().any_lt(hit_weight) {
			return Err(SteppedMigrationError::InsufficientWeight { required: hit_weight });
		}

		let mut page_count = 0u32;

		while cur.id <= max_id {
			let child = StatefulChildTree::<T::KeyHasher>::get_child_tree_for_storage(
				cur.id,
				PALLET_STORAGE_PREFIX,
				ITEMIZED_STORAGE_PREFIX,
			);

			'inner: loop {
				if !meter.can_consume(hit_weight) {
					log::info!(target: LOG_TARGET, "Migrated {page_count} pages; current MSA {}", cur.id);
					return Ok(Some(cur));
				}
				if !process_itemized_page::<T, ItemizedKeyLength>(&child, &mut cur)? {
					meter.consume(miss_weight);
					break 'inner;
				} else {
					meter.consume(hit_weight);
					page_count += 1;
				}
			}
		}

		meter.try_consume(T::DbWeight::get().writes(1)).map_err(|_| {
			SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().writes(1) }
		})?;
		v1::DoneItemized::<T>::put(true);
		log::info!(target: LOG_TARGET, "Finished migrating itemized storage; migrated {} total pages", cur.cumulative_pages);
		Ok(None) // done
	}
}

/// Finalize the migration by updating the pallet storage version.
pub struct FinalizeV2Migration<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
impl<T: Config, W: weights::WeightInfo> SteppedMigration for FinalizeV2Migration<T, W> {
	type Cursor = ();
	// Without the explicit length here the construction of the ID would not be infallible.
	type Identifier = MigrationId<48>;

	/// The identifier of this migration, which should be globally unique.
	fn id() -> Self::Identifier {
		MigrationId {
			pallet_id: *b"pallet::stateful-storage::migration::v3-finalize",
			version_from: 1,
			version_to: 2,
		}
	}

	/// Final migration step
	fn step(
		_cursor: Option<Self::Cursor>,
		meter: &mut WeightMeter,
	) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
		// If there is not enough weight for a single step, return an error. This case can be
		// problematic if it is the first migration that ran in this block. But there is nothing
		// that we can do about it here.
		let required = T::DbWeight::get().reads(1).saturating_add(T::DbWeight::get().writes(3));
		if meter.try_consume(required).is_err() {
			return Err(SteppedMigrationError::InsufficientWeight { required });
		}
		if StorageVersion::new(2) <= Pallet::<T>::on_chain_storage_version() {
			log::info!(target: LOG_TARGET, "Skipping finalization of stateful-storage pallet migration: storage version already set to 2 or higher");
			return Ok(None);
		}
		StorageVersion::new(2).put::<Pallet<T>>();
		v1::DonePaginated::<T>::kill();
		v1::DoneItemized::<T>::kill();

		log::info!(target: LOG_TARGET, "Finalized stateful-storage pallet migration: storage version set to 2");
		Ok(None)
	}

	#[cfg(feature = "try-runtime")]
	fn pre_upgrade() -> Result<Vec<u8>, frame_support::sp_runtime::TryRuntimeError> {
		// pre-upgrade hook is really meant for single-block migrations, as the hook is called for
		// every block. For MBMs, just return empty until the SteppedMigration is complete
		if v1::DonePaginated::<T>::exists() && v1::DoneItemized::<T>::exists() {
			// Return the storage version before the migration
			Ok(Pallet::<T>::on_chain_storage_version().encode())
		} else {
			Ok(Vec::new())
		}
	}

	#[cfg(feature = "try-runtime")]
	fn post_upgrade(prev: Vec<u8>) -> Result<(), frame_support::sp_runtime::TryRuntimeError> {
		// post-upgrade hook is really meant for single-block migrations, as the hook is called
		// after every block. For MBMs, we'll set the pre-upgrade to generate an empty Vec<_>,
		// so here we check for that and only perform our validation if the input is non-empty.
		if !prev.is_empty() {
			// Verify that the on-chain storage version has reached the target version.
			let cur_version = StorageVersion::get::<Pallet<T>>();
			let target_version = StorageVersion::new(2);
			if cur_version < target_version {
				return Err(TryRuntimeError::Other(
					"Migration failed: current storage version is not 2 or higher",
				));
			} else {
				v1::DonePaginated::<T>::kill();
				v1::DoneItemized::<T>::kill();
			}
		}

		Ok(())
	}
}
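
// Runtime-wiring sketch (illustrative; `Runtime` and the concrete `SubstrateWeight` type are
// assumptions about the embedding runtime, not items defined in this module). The three
// migrations are intended to run in order, with the finalize step last, since it is the one that
// bumps the on-chain storage version the earlier steps check before doing any work:
//   impl pallet_migrations::Config for Runtime {
//       type Migrations = (
//           MigratePaginatedV1ToV2<Runtime, weights::SubstrateWeight<Runtime>>,
//           MigrateItemizedV1ToV2<Runtime, weights::SubstrateWeight<Runtime>>,
//           FinalizeV2Migration<Runtime, weights::SubstrateWeight<Runtime>>,
//       );
//       // ... remaining associated types elided
//   }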