pallet_messages/migration/
v3.rs1use crate::{
6 migration::{
7 v2,
8 v2::{DoneV3Migration, K1Type, K2Type, K3Type, MessagesV2},
9 },
10 pallet::MessagesV3,
11 weights, Config, Event, Message, MessageIndex, Pallet,
12};
13#[cfg(feature = "try-runtime")]
14use alloc::vec::Vec;
15use common_primitives::schema::SchemaId;
16use core::marker::PhantomData;
17use frame_support::{
18 migrations::{MigrationId, SteppedMigration, SteppedMigrationError},
19 pallet_prelude::{StorageVersion, Zero},
20 storage::PrefixIterator,
21 traits::{Get, GetStorageVersion},
22 weights::WeightMeter,
23};
24use frame_system::pallet_prelude::BlockNumberFor;
25#[cfg(feature = "try-runtime")]
26use parity_scale_codec::Encode;
27
/// Log target passed to every `log::info!` call in this migration module.
const LOG_TARGET: &str = "pallet::messages::migration::v3";
29
30#[allow(type_alias_bounds)]
32pub(crate) type MessagesCursor<T: Config> = (BlockNumberFor<T>, SchemaId, MessageIndex, u64);
33
34pub(crate) fn migrate_single_record<T: Config>(
35 iter: &mut PrefixIterator<(
36 (K1Type<T>, K2Type, K3Type),
37 v2::Message<T::MessagesMaxPayloadSizeBytes>,
38 )>,
39 cursor: &mut MessagesCursor<T>,
40) -> bool {
41 let messages_remain = if let Some(((block_number, schema_id, index), value)) = iter.next() {
43 let new_value = Message {
45 schema_id, payload: value.payload,
47 provider_msa_id: value.provider_msa_id,
48 msa_id: value.msa_id,
49 };
50 MessagesV3::<T>::insert((block_number, schema_id, index), new_value);
52
53 cursor.3 += 1;
54 true
55 } else {
56 false
57 };
58
59 if !messages_remain || cursor.3 % <u64>::from(T::MigrateEmitEvery::get()) == 0 {
60 Pallet::<T>::deposit_event(Event::<T>::MessagesMigrated {
61 from_version: 2,
62 to_version: 3,
63 cumulative_total_migrated: cursor.3,
64 });
65 }
66
67 messages_remain
68}
69
70pub struct MigrateV2ToV3<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
76impl<T: Config, W: weights::WeightInfo> SteppedMigration for MigrateV2ToV3<T, W> {
77 type Cursor = MessagesCursor<T>;
81 type Identifier = MigrationId<31>;
83
84 fn id() -> Self::Identifier {
86 MigrationId {
87 pallet_id: *b"pallet::messages::migration::v3",
88 version_from: 2,
89 version_to: 3,
90 }
91 }
92
93 fn step(
99 cursor: Option<Self::Cursor>,
100 meter: &mut WeightMeter,
101 ) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
102 #[cfg(feature = "try-runtime")]
103 if StorageVersion::new(3) <= Pallet::<T>::on_chain_storage_version() {
106 return Ok(None);
107 }
108 let required = W::v2_to_v3_step();
109 if meter.remaining().any_lt(required) {
113 return Err(SteppedMigrationError::InsufficientWeight { required });
114 }
115
116 let mut step_count = 0u32;
117 let mut iter = MessagesV2::<T>::drain();
118 let mut last_cursor = cursor.unwrap_or((BlockNumberFor::<T>::zero(), 0, 0, 0));
119 let mut messages_remain = true;
120
121 while meter.try_consume(required).is_ok() {
123 messages_remain = migrate_single_record::<T>(&mut iter, &mut last_cursor);
125 if !messages_remain {
126 break;
127 } else {
128 step_count += 1;
129 }
130 }
131
132 if step_count > 0 {
133 log::info!(target: LOG_TARGET, "Migrated {}{} messages", step_count, if messages_remain { "" } else { " final" });
134 }
135
136 if !messages_remain {
137 meter.try_consume(T::DbWeight::get().writes(1)).map_err(|_| {
138 SteppedMigrationError::InsufficientWeight { required: T::DbWeight::get().writes(1) }
139 })?;
140 v2::DoneV3Migration::<T>::put(true);
141 }
142 Ok(messages_remain.then_some(last_cursor))
143 }
144}
145
146pub struct FinalizeV3Migration<T: Config, W: weights::WeightInfo>(PhantomData<(T, W)>);
149impl<T: Config, W: weights::WeightInfo> SteppedMigration for FinalizeV3Migration<T, W> {
150 type Cursor = (BlockNumberFor<T>, SchemaId, MessageIndex);
151 type Identifier = MigrationId<40>;
153
154 fn id() -> Self::Identifier {
156 MigrationId {
157 pallet_id: *b"pallet::messages::migration::v3-finalize",
158 version_from: 2,
159 version_to: 3,
160 }
161 }
162
163 fn step(
165 _cursor: Option<Self::Cursor>,
166 meter: &mut WeightMeter,
167 ) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
168 #[cfg(feature = "try-runtime")]
169 if Pallet::<T>::on_chain_storage_version() >= StorageVersion::new(3) {
172 return Ok(None);
173 }
174 let required = W::v2_to_v3_final_step();
175 if meter.try_consume(required).is_err() {
179 return Err(SteppedMigrationError::InsufficientWeight { required });
180 }
181
182 if Pallet::<T>::on_chain_storage_version() >= StorageVersion::new(3) {
184 log::info!(target: LOG_TARGET, "Messages pallet migration finalization: storage version already set to 3");
185 } else {
186 StorageVersion::new(3).put::<Pallet<T>>();
187 log::info!(target: LOG_TARGET, "Finalized messages pallet migration: storage version set to 3");
188 }
189
190 DoneV3Migration::<T>::kill();
192 Ok(None)
193 }
194
195 #[cfg(feature = "try-runtime")]
196 fn pre_upgrade() -> Result<Vec<u8>, frame_support::sp_runtime::TryRuntimeError> {
197 if v2::DoneV3Migration::<T>::exists() {
200 Ok(Pallet::<T>::on_chain_storage_version().encode())
202 } else {
203 Ok(Vec::new())
204 }
205 }
206
207 #[cfg(feature = "try-runtime")]
208 fn post_upgrade(prev: Vec<u8>) -> Result<(), frame_support::sp_runtime::TryRuntimeError> {
209 if !prev.is_empty() {
213 let target_version = StorageVersion::new(3);
216 let current_version = StorageVersion::get::<Pallet<T>>();
217 if current_version < target_version {
218 return Err(frame_support::sp_runtime::TryRuntimeError::Other(
219 "Migration failed: current storage version is not 3 or higher",
220 ))
221 }
222
223 v2::DoneV3Migration::<T>::kill();
224 }
225
226 Ok(())
227 }
228}