common_helpers/avro.rs
1use crate::types::*;
2use apache_avro::{from_avro_datum, schema::Schema, to_avro_datum, types::Record, Codec, Writer};
3use std::{collections::HashMap, io::Cursor, str};
4
/// Represents error types returned by the `avro` module.
///
/// The `Io` and `Avro` variants carry `#[from]`, so the underlying errors
/// convert into `AvroError` automatically when propagated with `?`.
#[derive(thiserror::Error, Debug)]
pub enum AvroError {
    /// Wraps a `std::io::Error`. Not constructed directly by the functions
    /// visible in this module; only reachable through the `From` conversion.
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// Wraps an error reported by the `apache_avro` crate (schema parsing,
    /// datum encoding or datum decoding in this module).
    #[error("Avro error")]
    Avro(#[from] apache_avro::Error),
    /// The schema could not be used; the payload is a human-readable reason
    /// (for example a UTF-8 decoding message or a fixed description).
    #[error("Invalid avro schema: {0}")]
    InvalidSchema(String),
    /// Decoded data was not a record value (see `get_schema_data_map`).
    #[error("Invalid avro records")]
    InvalidRecords(),
}
17
18/// Function to convert a raw schema into serialized Avro schema.
19/// If schema is malformed or invalid, returns an error.
20///
21/// # Examples
22/// ```
23/// use common_helpers::avro;
24/// use common_helpers::types::*;
25/// let raw_schema = r#"{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}, {"name": "favorite_number", "type": "int"}]}"#;
26/// let schema_result = avro::fingerprint_raw_schema(raw_schema);
27/// assert!(schema_result.is_ok());
28/// let serialized_schema = schema_result.unwrap().1;
29/// ```
30pub fn fingerprint_raw_schema(raw_schema: &str) -> Result<(Schema, Vec<u8>), AvroError> {
31 let schema_result = Schema::parse_str(raw_schema)?;
32 let schema_canonical_form = schema_result.canonical_form();
33 Ok((schema_result, schema_canonical_form.as_bytes().to_vec()))
34}
35
36///Function to convert a serialized Avro schema into Avro Schema type.
37/// If schema is malformed or invalid, returns an error.
38///
39/// # Examples
40/// ```
41/// use common_helpers::avro;
42/// use common_helpers::types::*;
43/// let raw_schema = r#"{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}, {"name": "favorite_number", "type": "int"}]}"#;
44/// let serialized_schema = avro::fingerprint_raw_schema(raw_schema);
45/// assert!(serialized_schema.is_ok());
46/// let schema = serialized_schema.unwrap().1;
47/// let translated_schema = avro::translate_schema(schema);
48/// assert!(translated_schema.is_ok());
49/// ```
50pub fn translate_schema(serialized_schema: Vec<u8>) -> Result<Schema, AvroError> {
51 match str::from_utf8(&serialized_schema) {
52 Ok(schema_str) => {
53 let schema = Schema::parse_str(schema_str)?;
54 Ok(schema)
55 },
56 Err(error) => Err(AvroError::InvalidSchema(error.to_string())),
57 }
58}
59
60/// Function to get the schema writer with default container as `Vec<u8>`
61///
62/// # Examples
63/// ```
64/// use common_helpers::avro;
65/// use common_helpers::types::*;
66/// let raw_schema = r#"{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}, {"name": "favorite_number", "type": "int"}]}"#;
67/// let schema_result = avro::fingerprint_raw_schema(raw_schema);
68/// assert!(schema_result.is_ok());
69/// let avro_schema = schema_result.unwrap().0;
70/// let schema_writer = avro::get_schema_data_writer(&avro_schema);
71/// ```
72pub fn get_schema_data_writer(schema: &Schema) -> Writer<Vec<u8>> {
73 Writer::with_codec(schema, Vec::new(), Codec::Snappy)
74}
75
76/// Function to populate a given schema with data and return serialized record.
77///
78/// # Remarks
79/// * `record` is the record to be written as HashMap<String, SchemaValue> where SchemaValue is common types
80///
81/// # Examples
82/// ```
83/// use common_helpers::avro;
84/// use common_helpers::types::*;
85/// use std::{collections::HashMap};
86/// let raw_schema = r#"
87/// {
88/// "type": "record",
89/// "name": "test",
90/// "fields": [
91/// {"name": "a", "type": "long", "default": 42},
92/// {"name": "b", "type": "string"}
93/// ]
94/// }
95/// "#;
96/// let schema_fingerprint = avro::fingerprint_raw_schema(raw_schema);
97/// let mut hashmap_data = HashMap::new();
98/// let mut name = "name".to_string();
99/// let mut name_field = SchemaValue::String("John".to_string());
100/// hashmap_data.insert("a".to_string(), SchemaValue::Long(27i64));
101/// hashmap_data.insert("b".to_string(), SchemaValue::String("foo".to_string()));
102/// assert!(schema_fingerprint.is_ok());
103/// let data_schema = schema_fingerprint.unwrap().0;
104/// let serialized_record = avro::populate_schema_and_serialize(&data_schema, &hashmap_data);
105/// assert!(serialized_record.is_ok());
106/// ```
107pub fn populate_schema_and_serialize(
108 schema: &Schema,
109 records: &HashMap<String, SchemaValue>,
110) -> Result<Vec<u8>, AvroError> {
111 let writer = get_schema_data_writer(schema);
112 match Record::new(writer.schema()) {
113 None =>
114 Err(AvroError::InvalidSchema("Could not create record from this schema".to_string())),
115 Some(mut record_list) => {
116 for (field_name, field_value) in records.iter() {
117 record_list.put(field_name, field_value.clone());
118 }
119 let datum_res = to_avro_datum(schema, record_list)?;
120 Ok(datum_res)
121 },
122 }
123}
124
125/// Function to get serialized datum data for a given schema into hashmap.
126///
127/// # Examples
128/// ```
129/// use common_helpers::avro;
130/// use common_helpers::types::*;
131/// use std::{collections::HashMap};
132/// let raw_schema = r#"
133/// {
134/// "type": "record",
135/// "name": "test",
136/// "fields": [
137/// {"name": "a", "type": "long", "default": 42},
138/// {"name": "b", "type": "string"}
139/// ]
140/// }
141/// "#;
142/// let schema_fingerprint = avro::fingerprint_raw_schema(raw_schema);
143/// let mut hashmap_data = HashMap::new();
144/// let mut name = "name".to_string();
145/// let mut name_field = SchemaValue::String("John".to_string());
146/// hashmap_data.insert("a".to_string(), SchemaValue::Long(27i64));
147/// hashmap_data.insert("b".to_string(), SchemaValue::String("foo".to_string()));
148/// assert!(schema_fingerprint.is_ok());
149/// let data_schema = schema_fingerprint.unwrap().0;
150/// let serialized_record = avro::populate_schema_and_serialize(&data_schema, &hashmap_data);
151/// assert!(serialized_record.is_ok());
152/// let serialized_data = serialized_record.unwrap();
153/// let deserialized_data = avro::get_schema_data_map(&serialized_data, &data_schema);
154/// assert!(deserialized_data.is_ok());
155/// ```
156pub fn get_schema_data_map<'a>(
157 serialized_data: &'a Vec<u8>,
158 schema: &'a Schema,
159) -> Result<HashMap<String, SchemaValue>, AvroError> {
160 let from_data_datum = from_avro_datum(schema, &mut Cursor::new(serialized_data), None)?;
161 let mut result_record = HashMap::<String, SchemaValue>::new();
162
163 match from_data_datum {
164 SchemaValue::Record(record) =>
165 for (field_name, field_value) in record.iter() {
166 result_record.insert(field_name.clone(), field_value.clone());
167 },
168 _ => return Err(AvroError::InvalidRecords()),
169 }
170
171 Ok(result_record)
172}
173
174/// Function to validate incoming json serialized schema against avro schema.
175///
176/// # Examples
177/// ```
178/// use common_helpers::avro;
179/// use common_helpers::types::*;
180/// let raw_schema = r#"
181/// {
182/// "type": "record",
183/// "name": "test",
184/// "fields": [
185/// {"name": "a", "type": "long", "default": 42},
186/// {"name": "b", "type": "string"}
187/// ]
188/// }
189/// "#;
190/// let schema_fingerprint = avro::fingerprint_raw_schema(raw_schema);
191/// assert!(schema_fingerprint.is_ok());
192/// ```
193pub fn validate_raw_avro_schema(json_schema: &[u8]) -> Result<(), AvroError> {
194 match String::from_utf8(json_schema.to_owned()) {
195 Err(_e) => Err(AvroError::InvalidSchema("Invalid schema".to_string())),
196 Ok(avro_schema) => {
197 let schema_fingerprint = fingerprint_raw_schema(&avro_schema);
198 if schema_fingerprint.is_err() {
199 return Err(AvroError::InvalidSchema("Invalid schema".to_string()));
200 }
201 Ok(())
202 },
203 }
204}