pallet_messages/migration/
v3.rs

1//! Migration to add schema_id to `messages` pallet storage.
2//! NOTE: Pallet activity is not suspended during this migration.
3//! All writes go to the new storage with new keys, so there is no danger
4//! of overlap with entries being migrated.
5use crate::{
6	migration::{
7		v2,
8		v2::{DoneV3Migration, K1Type, K2Type, K3Type, MessagesV2},
9	},
10	pallet::MessagesV3,
11	weights, Config, Event, Message, MessageIndex, Pallet,
12};
13#[cfg(feature = "try-runtime")]
14use alloc::vec::Vec;
15use common_primitives::schema::SchemaId;
16use core::marker::PhantomData;
17use frame_support::{
18	migrations::{MigrationId, SteppedMigration, SteppedMigrationError},
19	pallet_prelude::{StorageVersion, Zero},
20	storage::PrefixIterator,
21	traits::{Get, GetStorageVersion},
22	weights::WeightMeter,
23};
24use frame_system::pallet_prelude::BlockNumberFor;
25#[cfg(feature = "try-runtime")]
26use parity_scale_codec::Encode;
27
28const LOG_TARGET: &str = "pallet::messages::migration::v3";
29
30// Migration cursor: (storage index, cumulative records processed)
31#[allow(type_alias_bounds)]
32pub(crate) type MessagesCursor<T: Config> = (BlockNumberFor<T>, SchemaId, MessageIndex, u64);
33
34pub(crate) fn migrate_single_record<T: Config>(
35	iter: &mut PrefixIterator<(
36		(K1Type<T>, K2Type, K3Type),
37		v2::Message<T::MessagesMaxPayloadSizeBytes>,
38	)>,
39	cursor: &mut MessagesCursor<T>,
40) -> bool {
41	// If there's a next item in the iterator, perform the migration.
42	let messages_remain = if let Some(((block_number, schema_id, index), value)) = iter.next() {
43		// Migrate the inner value to the new structure
44		let new_value = Message {
45			schema_id, // schema_id is added to the struct
46			payload: value.payload,
47			provider_msa_id: value.provider_msa_id,
48			msa_id: value.msa_id,
49		};
50		// We can just insert here since the old and the new map share the same key-space.
51		MessagesV3::<T>::insert((block_number, schema_id, index), new_value);
52
53		cursor.3 += 1;
54		true
55	} else {
56		false
57	};
58
59	if !messages_remain || cursor.3 % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
60		Pallet::<T>::deposit_event(Event::<T>::MessagesMigrated {
61			from_version: 2,
62			to_version: 3,
63			cumulative_total_migrated: cursor.3,
64		});
65	}
66
67	messages_remain
68}
69
70/// Migrates the items of the [`v2::MessagesV2`] map to [`crate::MessagesV3`]
71///
72/// The `step` function will be called once per block. It is very important that this function
73/// *never* panics and never uses more weight than it got in its meter. The migrations should also
74/// try to make maximal progress per step, so that the total time it takes to migrate stays low.
75pub struct MigrateV2ToV3<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
76impl<T: Config, W: weights::WeightInfo> SteppedMigration for MigrateV2ToV3<T, W> {
77	// Cursor type for migration. Not used for migration logic itself, other than
78	// for emitting/tracking migration progress/status.
79	// ((tuple of last message processed), cumulative message count)
80	type Cursor = MessagesCursor<T>;
81	// Without the explicit length here the construction of the ID would not be infallible.
82	type Identifier = MigrationId<31>;
83
84	/// The identifier of this migration. Which should be globally unique.
85	fn id() -> Self::Identifier {
86		MigrationId {
87			pallet_id: *b"pallet::messages::migration::v3",
88			version_from: 2,
89			version_to: 3,
90		}
91	}
92
93	/// The actual logic of the migration.
94	///
95	/// This function is called repeatedly until it returns `Ok(None)`, indicating that the
96	/// migration is complete. Ideally, the migration should be designed in such a way that each
97	/// step consumes as much weight as possible.
98	fn step(
99		cursor: Option<Self::Cursor>,
100		meter: &mut WeightMeter,
101	) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
102		#[cfg(feature = "try-runtime")]
103		// Extra check for try-runtime, since pallet-migrations is not seeded with
104		// the completed migrations.
105		if StorageVersion::new(3) <= Pallet::<T>::on_chain_storage_version() {
106			return Ok(None);
107		}
108		let required = W::v2_to_v3_step();
109		// If there is not enough weight for a single step, return an error. This case can be
110		// problematic if it is the first migration that ran in this block. But there is nothing
111		// that we can do about it here.
112		if meter.remaining().any_lt(required) {
113			return Err(SteppedMigrationError::InsufficientWeight { required });
114		}
115
116		let mut step_count = 0u32;
117		let mut iter = MessagesV2::<T>::drain();
118		let mut last_cursor = cursor.unwrap_or((BlockNumberFor::<T>::zero(), 0, 0, 0));
119		let mut messages_remain = true;
120
121		// We loop here to do as much progress as possible per step.
122		while meter.try_consume(required).is_ok() {
123			// If there's a next item in the iterator, perform the migration.
124			messages_remain = migrate_single_record::<T>(&mut iter, &mut last_cursor);
125			if !messages_remain {
126				break;
127			} else {
128				step_count += 1;
129			}
130		}
131
132		if step_count > 0 {
133			log::info!(target: LOG_TARGET, "Migrated {}{} messages", step_count, if messages_remain { "" } else { " final" });
134		}
135
136		if !messages_remain {
137			meter.try_consume(T::DbWeight::get().writes(1)).map_err(|_| {
138				SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().writes(1) }
139			})?;
140			v2::DoneV3Migration::<T>::put(true);
141		}
142		Ok(messages_remain.then_some(last_cursor))
143	}
144}
145
146/// Finalize the migration of [`v2::MessagesV2`] map to [`crate::MessagesV3`]
147/// by updating the pallet storage version.
148pub struct FinalizeV3Migration<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
149impl<T: Config, W: weights::WeightInfo> SteppedMigration for FinalizeV3Migration<T, W> {
150	type Cursor = (BlockNumberFor<T>, SchemaId, MessageIndex);
151	// Without the explicit length here the construction of the ID would not be infallible.
152	type Identifier = MigrationId<40>;
153
154	/// The identifier of this migration. Which should be globally unique.
155	fn id() -> Self::Identifier {
156		MigrationId {
157			pallet_id: *b"pallet::messages::migration::v3-finalize",
158			version_from: 2,
159			version_to: 3,
160		}
161	}
162
163	/// Final migration step
164	fn step(
165		_cursor: Option<Self::Cursor>,
166		meter: &mut WeightMeter,
167	) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
168		#[cfg(feature = "try-runtime")]
169		// Extra check for try-runtime, since pallet-migrations is not seeded with
170		// the completed migrations.
171		if Pallet::<T>::on_chain_storage_version() >= StorageVersion::new(3) {
172			return Ok(None);
173		}
174		let required = W::v2_to_v3_final_step();
175		// If there is not enough weight for a single step, return an error. This case can be
176		// problematic if it is the first migration that ran in this block. But there is nothing
177		// that we can do about it here.
178		if meter.try_consume(required).is_err() {
179			return Err(SteppedMigrationError::InsufficientWeight { required });
180		}
181
182		// Make sure this migration is idempotent--don't set storage version if already at or higher then 3
183		if Pallet::<T>::on_chain_storage_version() >= StorageVersion::new(3) {
184			log::info!(target: LOG_TARGET, "Messages pallet migration finalization: storage version already set to 3");
185		} else {
186			StorageVersion::new(3).put::<Pallet<T>>();
187			log::info!(target: LOG_TARGET, "Finalized messages pallet migration: storage version set to 3");
188		}
189
190		// Clean up ephemeral migration storage
191		DoneV3Migration::<T>::kill();
192		Ok(None)
193	}
194
195	#[cfg(feature = "try-runtime")]
196	fn pre_upgrade() -> Result<Vec<u8>, frame_support::sp_runtime::TryRuntimeError> {
197		// pre-upgrade hook is really meant for single-block migrations, as the hook is called for
198		// every block. For MBMs, just return empty until the SteppedMigration is complete
199		if v2::DoneV3Migration::<T>::exists() {
200			// Return the storage version before the migration
201			Ok(Pallet::<T>::on_chain_storage_version().encode())
202		} else {
203			Ok(Vec::new())
204		}
205	}
206
207	#[cfg(feature = "try-runtime")]
208	fn post_upgrade(prev: Vec<u8>) -> Result<(), frame_support::sp_runtime::TryRuntimeError> {
209		// post-upgrade hook is really meant for single-block migrations, as the hook is called
210		// after every block. For MBMs, we'll set the pre-upgrade to generate an empty Vec<_>,
211		// so here we check for that and only perform our validation if the input is non-empty.
212		if !prev.is_empty() {
213			// Check the state of the storage after the migration.
214			// Check the len of prev and post are the same.
215			let target_version = StorageVersion::new(3);
216			let current_version = StorageVersion::get::<Pallet<T>>();
217			if current_version < target_version {
218				return Err(frame_support::sp_runtime::TryRuntimeError::Other(
219					"Migration failed: current storage version is not 3 or higher",
220				))
221			}
222
223			v2::DoneV3Migration::<T>::kill();
224		}
225
226		Ok(())
227	}
228}