1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
//! Types for the Schema Pallet
use crate::{Config, Error};
use common_primitives::schema::{
	ModelType, PayloadLocation, SchemaId, SchemaSetting, SchemaSettings, SchemaVersion,
	SchemaVersionResponse,
};
use frame_support::{ensure, pallet_prelude::ConstU32, traits::StorageVersion, BoundedVec};
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;
use sp_runtime::DispatchError;
use sp_std::fmt::Debug;
extern crate alloc;
use alloc::string::String;
use frame_support::traits::Len;
use sp_std::{vec, vec::*};

/// Current storage version of the schemas pallet.
pub const SCHEMA_STORAGE_VERSION: StorageVersion = StorageVersion::new(4);

/// The maximum size, in bytes, of a full schema name including all of its parts
/// (namespace, separator and descriptor).
pub const SCHEMA_NAME_BYTES_MAX: u32 = 32; // Hard limit of 32 bytes
/// Raw bytes of a schema name following the structure `NAMESPACE.DESCRIPTOR`,
/// bounded by [`SCHEMA_NAME_BYTES_MAX`].
pub type SchemaNamePayload = BoundedVec<u8, ConstU32<SCHEMA_NAME_BYTES_MAX>>;
/// Schema namespace type: bytes bounded by [`NAMESPACE_MAX`].
pub type SchemaNamespace = BoundedVec<u8, ConstU32<NAMESPACE_MAX>>;
/// Schema descriptor type: bytes bounded by [`DESCRIPTOR_MAX`].
pub type SchemaDescriptor = BoundedVec<u8, ConstU32<DESCRIPTOR_MAX>>;
/// The minimum size, in bytes, of a namespace in a schema name.
pub const NAMESPACE_MIN: u32 = 3;
/// The maximum size, in bytes, of a namespace in a schema name.
/// Derived from the total limit minus the shortest descriptor and one extra byte
/// (presumably the separator): 32 - (1 + 1) = 30.
pub const NAMESPACE_MAX: u32 = SCHEMA_NAME_BYTES_MAX - (DESCRIPTOR_MIN + 1);
/// The minimum size, in bytes, of a schema descriptor.
pub const DESCRIPTOR_MIN: u32 = 1;
/// The maximum size, in bytes, of a schema descriptor.
/// Derived from the total limit minus the shortest namespace and one extra byte
/// (presumably the separator): 32 - (3 + 1) = 28.
pub const DESCRIPTOR_MAX: u32 = SCHEMA_NAME_BYTES_MAX - (NAMESPACE_MIN + 1);
/// Character separating the namespace from the descriptor in a schema name.
pub const SEPARATOR_CHAR: char = '.';
/// Maximum number of versions for a certain schema name.
/// The `- 1` avoids overflow when converting `index + 1` to `SchemaVersion` in `SchemaVersionId`.
pub const MAX_NUMBER_OF_VERSIONS: u32 = SchemaVersion::MAX as u32 - 1;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
/// Serde-(de)serializable description of a schema to be loaded at genesis.
///
/// Unlike the on-chain types in this module, the model and name are carried as
/// plain `String`s and the settings as an unbounded `Vec`.
pub struct GenesisSchema {
	/// The type of model (AvroBinary, Parquet, etc.)
	pub model_type: ModelType,
	/// The payload location
	pub payload_location: PayloadLocation,
	/// The payload model, as a string
	pub model: String,
	/// Schema full name: `{Namespace}.{Descriptor}`
	pub name: String,
	/// Settings to apply to the schema
	pub settings: Vec<SchemaSetting>,
}

#[derive(Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)]
/// A structure defining the information of a schema (excluding the payload).
pub struct SchemaInfo {
	/// The type of model (AvroBinary, Parquet, etc.)
	pub model_type: ModelType,
	/// The payload location
	pub payload_location: PayloadLocation,
	/// Additional control settings for the schema
	pub settings: SchemaSettings,
	/// Whether the schema has a name
	pub has_name: bool,
}

#[derive(Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)]
/// A structure defining the name of a schema, split into its two parts.
///
/// The descriptor may be empty: `SchemaName::try_parse` leaves it at its default
/// when called non-strictly with a payload that contains only a namespace.
pub struct SchemaName {
	/// Namespace or domain of the schema
	pub namespace: SchemaNamespace,
	/// Name or descriptor of this schema
	pub descriptor: SchemaDescriptor,
}

#[derive(Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen, Default)]
/// A structure holding the schema ids of every version of a schema, ordered by version.
pub struct SchemaVersionId {
	/// Schema ids in version order: the index of each item + 1 is its version.
	/// Ex: the schemaId located in `ids[2]` is for version number 3.
	/// Bounded by `MAX_NUMBER_OF_VERSIONS`.
	pub ids: BoundedVec<SchemaId, ConstU32<MAX_NUMBER_OF_VERSIONS>>,
}

impl SchemaName {
	/// parses and verifies the request and returns the SchemaName type if successful
	pub fn try_parse<T: Config>(
		payload: SchemaNamePayload,
		is_strict: bool,
	) -> Result<SchemaName, DispatchError> {
		// check if all ascii
		let mut str = String::from_utf8(payload.into_inner())
			.map_err(|_| Error::<T>::InvalidSchemaNameEncoding)?;
		ensure!(str.is_ascii(), Error::<T>::InvalidSchemaNameEncoding);

		// to canonical form
		str = String::from(str.to_lowercase().trim());

		// check if alphabetic or - or separator character
		ensure!(
			str.chars().all(|c| c.is_ascii_alphabetic() || c == '-' || c == SEPARATOR_CHAR),
			Error::<T>::InvalidSchemaNameCharacters
		);

		// split to namespace and descriptor
		let chunks: Vec<_> = str.split(SEPARATOR_CHAR).collect();
		ensure!(
			chunks.len() == 2 || (chunks.len() == 1 && !is_strict),
			Error::<T>::InvalidSchemaNameStructure
		);

		// check namespace
		let namespace = BoundedVec::try_from(chunks[0].as_bytes().to_vec())
			.map_err(|_| Error::<T>::InvalidSchemaNamespaceLength)?;
		ensure!(NAMESPACE_MIN <= namespace.len() as u32, Error::<T>::InvalidSchemaNamespaceLength);
		// should not start or end with -
		ensure!(
			!(namespace.starts_with(b"-") || namespace.ends_with(b"-")),
			Error::<T>::InvalidSchemaNameStructure
		);

		// check descriptor
		let descriptor = match chunks.len() == 2 {
			true => {
				let descriptor = BoundedVec::try_from(chunks[1].as_bytes().to_vec())
					.map_err(|_| Error::<T>::InvalidSchemaDescriptorLength)?;
				ensure!(
					DESCRIPTOR_MIN <= descriptor.len() as u32,
					Error::<T>::InvalidSchemaDescriptorLength
				);
				// should not start or end with -
				ensure!(
					!(descriptor.starts_with(b"-") || descriptor.ends_with(b"-")),
					Error::<T>::InvalidSchemaNameStructure
				);
				descriptor
			},
			false => BoundedVec::default(),
		};

		Ok(SchemaName { namespace, descriptor })
	}

	/// get the combined name namespace.descriptor
	pub fn get_combined_name(&self) -> Vec<u8> {
		vec![
			self.namespace.clone().into_inner(),
			vec![SEPARATOR_CHAR as u8],
			self.descriptor.clone().into_inner(),
		]
		.concat()
	}

	/// creates a new SchemaName using provided descriptor
	pub fn new_with_descriptor(&self, descriptor: SchemaDescriptor) -> Self {
		Self { namespace: self.namespace.clone(), descriptor }
	}

	/// returns true if the descriptor exists
	pub fn descriptor_exists(&self) -> bool {
		self.descriptor.len() > 0
	}
}

impl SchemaVersionId {
	/// adds a new schema id and returns the version for that schema_id
	pub fn add<T: Config>(&mut self, schema_id: SchemaId) -> Result<SchemaVersion, DispatchError> {
		let is_new = !self.ids.iter().any(|id| id == &schema_id);
		ensure!(is_new, Error::<T>::SchemaIdAlreadyExists);
		self.ids
			.try_push(schema_id)
			.map_err(|_| Error::<T>::ExceedsMaxNumberOfVersions)?;
		let version = self.ids.len() as SchemaVersion;
		Ok(version)
	}

	/// convert into a response vector
	pub fn convert_to_response(&self, schema_name: &SchemaName) -> Vec<SchemaVersionResponse> {
		self.ids
			.iter()
			.enumerate()
			.map(|(index, schema_id)| SchemaVersionResponse {
				schema_name: schema_name.get_combined_name(),
				schema_id: *schema_id,
				schema_version: (index + 1) as SchemaVersion,
			})
			.collect()
	}
}