Creating a Predictive System using PyTorch and Rust

October 25, 2024 - Neirth

I hadn’t published anything here for a while. Many adventures that I will surely have to document soon.

This time, I wanted to practice with a project that I had been meaning to explore for some time. Setting up my first AI model beyond a classroom and integrating it with an application (which could have been made in Rust, Golang, Flutter, or any other technology, to be honest).

As a curious fact, this project is inspired by the same technological foundations as the ‘Bachellor’s Thesis’ that I will present in a few months at the Universitat Politècnica de València thus ending a stage that has been a true rollercoaster of emotions.

Understanding the Underlying Problem

Previously, at the Universitat Politècnica de València, I had worked with TensorFlow and Scikit Learn, creating many small Classification and Regression models. However, I was worried about having such a high degree of dependency on TensorFlow and not exploring other frameworks like PyTorch.

Additionally, I didn’t quite understand how to design a Deep Learning model that, using a context of previous values and their respective timestamps, could predict the next value of a series. I’m not talking about a series of features inputted as different inputs to return an output; I’m referring to a matrix as a feature whose values are time-dependent.

So, I decided to take a chance and venture into researching this issue. To do this, I had to investigate the topic I was going to address to solve this problem. In this case, I was inspired by models predicting electrical loads, which are quite relevant for understanding the demand that an electrical grid might receive and, consequently, for adapting to it.

Honestly, before diving deep into this, I particularly like the concept of decoupling elements. This generally helps avoid introducing unnecessary runtime dependencies, thereby minimizing the overall size of the application and introducing a good degree of modularity to the project.

What is ONNX, and Why is it Important for Portability?

To understand ONNX thoroughly, it’s essential to clarify that Machine Learning and Deep Learning models are developed in two fundamental stages: the Training stage and the Inference stage. These phases do not necessarily have to run on the same device or under the same frameworks.

Based on this, it becomes crucial to have a method for serializing the model after training, so it can then be used in the inference phase. For embedded systems, for example, TensorFlow developed the .tflite format, which I have previously tested with very good results on such devices. Additionally, the inference library from Sonos, Tract, provides excellent support for this format in Rust. However, I intended to explore options beyond the TensorFlow ecosystem.

Once this is explained, this is where ONNX (Open Neural Network Exchange) plays a fundamental role, standing out as a neutral format independent of any specific framework, allowing the serialization of models that can be used with different tools and environments. In fact, ONNX has established itself as the de facto standard for facilitating the flexible deployment of models, even in cases where TensorFlow Lite presents limitations.

To further understand the power of this format, we should consider that when we define a model in PyTorch, for example, this graph can ultimately be represented within the ONNX model.

Let’s illustrate this with a good example; here we have the model we will use in this project, which we will explain how it works internally later on:

import torch
import torch.nn as nn

INPUT_SIZE = 3  # Number of features in the input
HIDDEN_SIZE = 64  # Size of hidden units in the LSTM

class GridLSTMModel(nn.Module):
    def __init__(self):
        super(GridLSTMModel, self).__init__()

        # LSTM with 4 layers
        self.lstm = nn.LSTM(INPUT_SIZE, HIDDEN_SIZE, 4, batch_first=True, bidirectional=False)

        # Using mean pooling to reduce the sequence
        self.fc = nn.Linear(HIDDEN_SIZE, 1)  # Final fully-connected layer

    def forward(self, x):
        # x shape: (batch_size, seq_length, input_size)
        lstm_out, _ = self.lstm(x)  # lstm_out: (batch_size, seq_length, hidden_size)

        # Compute the mean across the sequence dimension
        mean_out = torch.mean(lstm_out, dim=1)  # (batch_size, hidden_size)

        # Pass the reduced context through the fully-connected layer
        output = self.fc(mean_out)  # (batch_size, 1)

        return output

And this is how the graph would look when visualized with tools like Netron or similar:

Modelo de Red Neuronal Recurrente en ONNX

The ability to perform an efficient conversion of models without relying on specific frameworks like TensorFlow or PyTorch for inference is what gives ONNX its great utility. This format allows for easy and effective model exporting, facilitating implementation in production environments. This is exactly what we will need to quickly debug this project, as it means we can make changes to the model without impacting the code we write in Rust.

How Do We Collect (and Preprocess) Training Data?

I have to be honest here; the ENTSOE Transparency Portal has been very helpful for retrieving the CSV dataset I used for this project. Additionally, its public API has allowed me to collect all the data related to real-time electricity demand.

START_TIME STOP_TIME FORECAST_LOAD REAL_LOAD
01/01/15 0:00 01/01/15 0:15 6794 6168
01/01/15 0:15 01/01/15 0:30 6757 6088
01/01/15 0:30 01/01/15 0:45 6791 6060
01/01/15 0:45 01/01/15 1:00 6750 5958
01/01/15 1:00 01/01/15 1:15 6737 6017
01/01/15 1:15 01/01/15 1:30 6692 5967
01/01/15 1:30 01/01/15 1:45 6722 5936
01/01/15 1:45 01/01/15 2:00 6690 5934
01/01/15 2:00 01/01/15 2:15 6633 5751
01/01/15 2:15 01/01/15 2:30 6573 5778
01/01/15 2:30 01/01/15 2:45 6602 5746

This is roughly what the dataset I’ve been using looks like, which is essentially a near-pure extract of what I received from the historical series. I say “near-pure extract” because I’ve renamed the columns for convenience. If we take a look, we can see that we have a column describing the expected load according to the model that the folks at ENTSOE have in production. With this, we could calculate the deviation of our model compared to their production model.

If we want the model to learn based on this data, we will need to use the concept of sliding time windows so that it can predict the next value of the series conditioned by the hour and the day.

How Do We Design (and Export) the Prediction Model?

The model I have decided to use for this project is a Recurrent Neural Network (RNN), specifically an LSTM model. This type of model is very useful for working with time series, as it can remember information from previous steps and use it to predict the next value.

In this case, the model I designed is an LSTM model that takes a sequence of values and returns a single value. To incorporate time information into the model, I decided to use a sinusoidal representation, which involves representing values such as the day of the year or the hour of the day as a sine wave.

In simple terms, to understand what the LSTM operator does: an LSTM is a type of recurrent neural network that can remember information from previous steps and use it to predict the next value. In this case, the model I designed is an LSTM model that takes a sequence of values and returns a single value.

To do this, I had to tackle the challenge of creating a function that takes the entire DataFrame and from there can create the sliding time window. To achieve this, I created a function that takes the DataFrame and the window size as input and returns a sequence of tensors with the time information encoded in sinusoidal form. Below is the code for the function:

def create_daily_sliding_windows(df: DataFrame, window_size: int) -> DataFrame:
    df = normalize_by_year(df)

    # Convert TIMESTAMP to datetime for easier grouping
    df['TIMESTAMP'] = to_datetime(df['TIMESTAMP'], unit='ns')

    # Create the sinusoidal components
    df['day_of_year'] = df['TIMESTAMP'].dt.dayofyear
    df['minutes_of_day'] = df['TIMESTAMP'].dt.hour * 60 + df['TIMESTAMP'].dt.minute

    # Sinusoidal component of the day in the year
    df['day_sin'] = np.sin(2 * np.pi * df['day_of_year'] / 365)

    # Sinusoidal component of the minutes in the day
    df['minute_sin'] = np.sin(2 * np.pi * df['minutes_of_day'] / 1440)  # 1440 = 60 * 24 (minutes in a day)

    # Group by day
    daily_groups = df.groupby(df['TIMESTAMP'].dt.date)

    windows = []

    for date, group in daily_groups:
        # Check if the group has enough data to create windows
        if len(group) > window_size:
            # Create sliding windows for each daily group
            for i in range(len(group) - window_size):
                window_values = group['GLOBAL_LOAD_TOTAL'].iloc[i:i + window_size].values
                next_value = group['GLOBAL_LOAD_TOTAL'].iloc[i + window_size]

                # Add the sinusoidal components to the window
                window_day_sin = group['day_sin'].iloc[i:i + window_size].values
                window_minute_sin = group['minute_sin'].iloc[i:i + window_size].values

                if len(window_values) == window_size:
                    windows.append((window_values, window_day_sin, window_minute_sin, next_value))
        else:
            print(f"[!] The group for {date} has fewer records than the window size ({len(group)} < {window_size})")

    # Check if any windows were generated
    if len(windows) == 0:
        print("[!] No sliding windows have been generated, check the size of your groups and the window.")

    # Convert the list of windows to a DataFrame
    window_df = DataFrame(windows, columns=['window_values', 'day_sin', 'minute_sin', 'next_value'])

    print("[*] Data generated from the raw dataset:")
    print(window_df.tail())

    print("[*] Preprocessed dataset to a two-column matrix...")

    return window_df

There are likely other, more effective ways to represent time in the model, but I chose to use this representation for its simplicity and effectiveness. It’s important to note that to incorporate the time information into the model, I had to scale the network’s point load values between 0 and 1 so that the model could learn more efficiently, we take later a more consideration about this. To do this, I normalized the network load values using the following function:

def normalize_by_year(df: DataFrame) -> DataFrame:
    # Extract the year from the TIMESTAMP for grouping
    df['TIMESTAMP'] = to_datetime(df['START_TIME'])
    df['year'] = df['TIMESTAMP'].dt.year

    # Initialize the scaler
    scaler = MinMaxScaler()

    # Create a new DataFrame with the normalized values
    normalized_df = df.copy()

    # Group by year and normalize GLOBAL_LOAD_TOTAL
    for year, group in df.groupby('year'):
        # Normalize the values for each year
        values = group['GLOBAL_LOAD_TOTAL'].values.reshape(-1, 1)
        normalized_values = scaler.fit_transform(values)

        # Replace the values in the original DataFrame
        normalized_df.loc[df['year'] == year, 'GLOBAL_LOAD_TOTAL'] = normalized_values

    # Convert the TIMESTAMP to an integer for easier processing
    normalized_df['TIMESTAMP'] = df['TIMESTAMP'].astype(int)

    # Drop the year column as it's no longer needed
    normalized_df.drop(columns=['year', 'START_TIME'], inplace=True)

    print("[*] Normalized GLOBAL_LOAD_TOTAL by year.")
    print(normalized_df.tail())

    return normalized_df

In this way, the remaining model can take all this information into account to predict the next value of the time series. One not-so-obvious consideration, but which is very important for our AI models to learn efficiently, is data normalization. In this case, after normalizing the data, the model can learn effectively and make accurate predictions. For this normalization, I used the MinMaxScaler function from the Scikit Learn library. Which formula is as follows:

X_norm = X - X _ min X _ max - X _ min

Where:

  • (X_norm) is the normalized value.
  • (X) is the original value.
  • (X_min) is the minimum value in the dataset, in this case, the minimum in the year.
  • (X_max) is the maximum value in the dataset, in this case, the maximum in the year.

I have noticed that without this normalization, the model is unable to learn efficiently and, consequently, cannot make accurate predictions. Therefore, it is essential to normalize the data before inputting it into the model so that the model can learn effectively and make precise predictions.

To assess the performance of the model, I have decided to use the R² statistic (coefficient of determination) to evaluate the model’s ability to predict the electric grid load. I will also use this statistic to calculate the deviation of the model compared to the production model from ENTSOE.

The formula for calculating the R² statistic is as follows:

R ^ 2 = 1 - SS _ res SS _ tot

Where:

  • (SS_res) is the residual sum of squares.
  • (SS_tot) is the total sum of squares.

With this, we can calculate the R² statistic and evaluate the model’s performance in predicting the electric grid load. This will allow us to compare the model’s performance with the production model from ENTSOE and determine the deviation of the model compared to the production model.

Finally, to export the model to ONNX, I used the ONNX library from PyTorch, which allows for easy export of PyTorch models to the ONNX format. To do this, I used the following function:

def export_model(model, device, input_size, output_path):
    """
    Exports a PyTorch model to ONNX format and simplifies the ONNX model.

    :param model: PyTorch model to be exported.
    :param device: PyTorch device being used for training.
    :param input_size: Input size of the model (e.g., (batch_size, channels, height, width)).
    :param output_path: Path where the ONNX model will be saved.
    """
    # Set the model to evaluation mode
    model.eval()

    # Create a dummy input tensor
    dummy_input = torch.randn(*input_size).to(device)

    # Check and create the export directory if it doesn't exist
    output_dir = os.path.dirname(output_path)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f'[*] Directory created: {output_dir}')

    # Export the model to ONNX format
    torch.onnx.export(
        model, dummy_input, output_path,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,  # Constant optimization
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},  # Dynamic axis for batch size
            'output': {0: 'batch_size'}
        }
    )

    # Load the ONNX model for simplification
    model_onnx = onnx.load(output_path)

    # Save the simplified model
    onnx.save(model_onnx, output_path)

    print(f'[*] Model exported and simplified to: {output_path}')

With this, we now have the model exported to ONNX and ready to be loaded in Rust for making predictions. Next, it’s time to deploy the model in the application and enable it to make real-time predictions.

Deploying the Model in the Application

We already understand how the model works, and we have clarified how we preprocessed the information to make the learning effective. Now it’s time to make the model deployable in an application that can be consumed from production.

This project, just like I would like it to be in my “Bachellor’s Thesis”, operates with a core in Rust that handles loading the ONNX model and making predictions. For this, I used the Tract library, which, as I mentioned earlier, is a neural network inference library in Rust that supports the ONNX format.

The great thing about Tract is that it is written in Rust, and its low-level operations are implemented in Assembly, making it very efficient and fast. Additionally, Tract is very easy to use and has a straightforward API that allows you to load an ONNX model and make predictions with it in just a few lines of code.

To load the ONNX model in Tract, I used the following function:

impl Default for PredictLoadService {
    fn default() -> PredictLoadService {
        let model_path = PathBuf::from("assets/grid_predictor.onnx");
        let model = tract_onnx::onnx()
            .model_for_path(model_path)
            .expect("OS: Failed to read model file")
            .with_input_fact(0, InferenceFact::dt_shape(f32::datum_type(), tvec!(1, 19, 3)))
            .expect("OS: Failed to set input shape")
            .into_optimized()
            .expect("OS: Failed to optimize model")
            .into_runnable()
            .expect("OS: Failed to create runnable model");

        PredictLoadService { model }
    }
}

And to effectively utilize the model, I created a function that takes a time window and returns the model’s prediction:

impl PredictLoadService {
    pub fn predict_load(&self, input_data: Vec<(i64, f32)>) -> Result<f32, String> {
        if input_data.len() != 19 {
            return Err("BUG: Input data must have 19 elements".to_string());
        }

        let window_values: Vec<f32> = input_data.iter().flat_map(
            |(timestamp, load)| {
                let (day_sin, minute_sin) = generate_sin_components(*timestamp)
                    .map_err(|e| format!("BUG: Error generating sin components -> {}", e)).unwrap();

                vec![day_sin, minute_sin, *load]
            }
        ).collect();

        // Convert the values to a Tensor with the appropriate shape [1, 19, 3]
        let input_tensor = Tensor::from_shape(&[1, 19, 3], &*window_values)
            .map_err(|e| format!("BUG: Error creating input tensor -> {}", e))?;

        // Run the model
        let result = self.model.run(
            tvec!(input_tensor.into())).map_err(|e| format!("BUG: Error running model -> {}", e)
        )?;

        // Extract the output value
        let output = result[0].to_scalar::<f32>().map_err(|e|
            format!("BUG: Error extracting output -> {}", e)
        )?;

        Ok(*output)
    }
}

As we can see, the code remains clean and easy to understand. Additionally, Tract allows us to load the model very efficiently and make predictions with it in just a few lines of code. This is what I enjoyed about working with Tract, and it has enabled me to effectively integrate the model into the application.

To retrieve production data, I used the ENTSOE API, which allowed me to obtain real-time production data and make predictions with the model. In the end, it all comes down to making API calls, and I think explaining how it’s done would be a bit redundant. So, dear reader, I invite you to explore the project’s source code to see how I retrieved the production data and made predictions with the model.

Out of curiosity, I wanted to extract the R² statistic to compare the deviation of my model against the ENTSOE production model. Using the Scikit Learn library, I was able to calculate it and obtained the following values after finishing the model training:

[neirth@beast-dragon electrical_grid_model] $ python3 src/main.py
[*] Training module for the "Electrical_Grid" model
[*] The device to be used will be "mps"
...
[*] Training completed in 1026.80 seconds.
[*] R^2 of the model with the evaluation set: 0.9973
[*] R^2 of the production forecast: 0.8754

As we can see, the model achieved an R² value of 0.9973, indicating that it can predict the electrical grid load with an accuracy of 99.73%. On the other hand, the ENTSOE production model obtained an R² value of 0.8754, suggesting that the ENTSOE production model can predict the electrical grid load with an accuracy of 87.54%.

This means that the model I developed is capable of predicting the electrical grid load with much higher accuracy than the ENTSOE production model. This is a very positive result, as it far exceeds the objective I had set at the beginning of the project.

Conclusions

It has been a very interesting and enriching project, allowing me to explore new technologies and learn a lot about developing AI models and deploying them in production applications. Additionally, it has enabled me to explore the use of ONNX and Tract, which are fascinating technologies with great potential for the development of AI applications in production.

This project is published in the GitHub repository if you want to take a look at the source code and see how I developed the model and deployed it in the application. In the repository, you will also find everything I used to calculate the results and make predictions with the model, making it all replicable.

Credits

The header image of this post is made using Midjourney AI.