
How to Perform Anomaly Detection in Time Series Data with Python

Hello everyone! Anomaly detection in time series data is a powerful technique for identifying unusual patterns that can signal significant events or issues. Today, we’re diving into this exciting field by creating a dynamic solution with Python. We’ll build a user-friendly graphical interface using Tkinter that allows us to upload data, train an LSTM Autoencoder model, and detect anomalies efficiently. For a detailed exploration of LSTM Autoencoders for anomaly detection, check out this insightful article.

In this tutorial, you’ll learn how to seamlessly integrate data visualization, machine learning, and a sleek GUI to tackle anomaly detection. We’ll guide you through every step, from loading your data to interpreting the results. So, let’s jump in and start building our anomaly detection system!

Getting Started

To get this code up and running, you’ll need to install a few libraries. Just open your terminal or command prompt and run the following commands:

$ pip install tk
$ pip install pandas
$ pip install numpy
$ pip install matplotlib
$ pip install scikit-learn
$ pip install tensorflow

Imports

import tkinter as tk
from tkinter import filedialog
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, RepeatVector, TimeDistributed
from sklearn.model_selection import train_test_split

Before we dive in, we need to gather our essential tools. Here’s a quick overview of what we’ll be using:

  • tkinter: This library will help us create a graphical user interface and open a file selection dialog for uploading the CSV file.
  • Pandas: We’ll use pandas to load, manipulate, and analyze our data.
  • Numpy: This is essential for handling numbers, arrays, and performing mathematical computations.
  • Matplotlib.pyplot: We’ll use this to plot charts and graphs based on the data.
  • MinMaxScaler: This tool will normalize our data, ensuring that all values fall within a specific range.
  • Sequential, LSTM, Dense, RepeatVector, TimeDistributed: These are used to build and train our machine learning model.
  • train_test_split: This will help us divide our data into training and testing sets.

Global Variables

# Global variables
model = None
scaler = None
X_test = None
data = None

Before we get to the individual functions, let’s look at the global variables they all share; these act as the nerve center of the program. Here are the key components:

  • model: This variable stores the trained machine learning model.
  • scaler: This holds the MinMaxScaler object, which normalizes our data.
  • X_test: This contains the testing data used to evaluate the model after training.
  • data: This variable holds the data loaded from the CSV file.

Loading the Data

This is where the adventure begins, as the load_data() function reads the data out of the CSV file:

The function starts by reading the CSV file into a pandas DataFrame, parsing the timestamp column as dates and using it as the index.

data = pd.read_csv(file_path, parse_dates=['timestamp'], index_col='timestamp')

Next, it resamples the data to an hourly frequency, taking the mean of the values within each hour. If any hour has no values, the ffill() method fills the gap with the last valid data point. (Recent pandas versions prefer the lowercase 'h' alias; 'H' still works but may raise a deprecation warning.)

data_resampled = data.resample('H').mean().ffill()
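
If resampling is new to you, here is a tiny standalone sketch (using made-up readings, not the tutorial’s dataset) of how resample('H').mean().ffill() behaves when one hour has no readings:

import pandas as pd

# Toy data: two readings in hour 00, none in hour 01, one in hour 02.
toy = pd.DataFrame(
    {"value": [10.0, 20.0, 40.0]},
    index=pd.to_datetime(["2024-01-01 00:15", "2024-01-01 00:45", "2024-01-01 02:30"]),
)

# Hour 00 becomes the mean (15.0), the empty hour 01 is forward-filled with 15.0,
# and hour 02 becomes 40.0.
print(toy.resample('H').mean().ffill())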

If an error occurs at any point, the function catches it and informs the user with a helpful message.

except Exception as e:
    print(f"Error loading data: {e}")

In short, this function loads the CSV file data into a pandas DataFrame and provides feedback if something goes wrong during the process.

def load_data(file_path):
   global data
   try:
       data = pd.read_csv(file_path, parse_dates=['timestamp'], index_col='timestamp')
       data_resampled = data.resample('H').mean().ffill()  # Resample and forward-fill NaN values
       return data_resampled
   except Exception as e:
       print(f"Error loading data: {e}")
       return None
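
One thing worth spelling out: load_data() expects the CSV to contain a timestamp column and a numeric value column. If you want to try the interface before downloading a real dataset, a sketch like the following generates a compatible file (the values are purely synthetic, and the filename sample.csv is just an example):

import numpy as np
import pandas as pd

# Generate a week of synthetic 5-minute readings with a daily cycle plus noise.
timestamps = pd.date_range("2024-01-01", periods=7 * 24 * 12, freq="5min")
values = (
    20
    + 5 * np.sin(np.arange(len(timestamps)) * 2 * np.pi / (24 * 12))
    + np.random.normal(0, 0.5, len(timestamps))
)

pd.DataFrame({"timestamp": timestamps, "value": values}).to_csv("sample.csv", index=False)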

Uploading and Visualizing Data

Of course, before we can load the data, the user has to pick a file. That’s where the upload_file() function comes in. It uses filedialog to let the user select only CSV files. Once a file is chosen, it calls the load_data() function to process the file, and if the data is loaded successfully, it then calls plot_data() to visualize the data.

def upload_file():
   file_path = filedialog.askopenfilename(filetypes=[("CSV files", "*.csv")])
   if file_path:
       data_resampled = load_data(file_path)
       if data_resampled is not None:
           plot_data(data_resampled)
           global data
           data = data_resampled

The plot_data() function does just what you’d expect: it uses plt.plot() to create a plot of the data points, with time on the x-axis and values on the y-axis. Finally, it uses plt.show() to display the plot, complete with the title “Data Visualization”.

def plot_data(data):
   plt.figure(figsize=(12, 6))
   plt.plot(data, label='Data')
   plt.title('Data Visualization')
   plt.xlabel('Timestamp')
   plt.ylabel('Value')
   plt.legend()
   plt.show()

Preprocessing Data

Now that we have our data, we need to prepare it for training and testing. This is where the preprocess_data() function steps in. Here’s what it does:

First, it initializes a MinMaxScaler to adjust the data to a range between 0 and 1. This scaling helps the model learn patterns more effectively.

scaler = MinMaxScaler()
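
If MinMaxScaler is unfamiliar, this tiny standalone sketch (toy numbers, unrelated to our dataset) shows what scaling to the 0–1 range means in practice:

import numpy as np
from sklearn.preprocessing import MinMaxScaler

# The minimum value maps to 0, the maximum to 1, everything else in between.
values = np.array([[10.0], [15.0], [20.0]])
print(MinMaxScaler().fit_transform(values))  # [[0. ] [0.5] [1. ]]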

Then the function checks for any missing values and uses the ffill() method to fill them with the previous valid value. This method ensures there are no gaps in the data.

if data.isnull().values.any():
    print("Data contains NaN values. Filling NaNs with forward-fill.")
    data = data.ffill()

After that, the data is reshaped into a single column and scaled with the scaler, so every value falls in the 0 to 1 range the LSTM model will be trained on.

data_scaled = scaler.fit_transform(data.values.reshape(-1, 1))

Next, it slices the scaled data into overlapping sequences of 48 time steps each, representing 2 days of hourly data. This windowed format is what the LSTM layers expect as input.

def create_sequences(data, time_steps=TIME_STEPS):
    sequences = []
    for i in range(len(data) - time_steps):
        sequences.append(data[i:(i + time_steps)])
    return np.array(sequences)

Lastly, the sequences are split into two sets: X_train for training the model and X_test for evaluating its performance. Passing shuffle=False keeps the split in chronological order, which is important for time series data.

X_train, X_test = train_test_split(X, test_size=0.2, random_state=42, shuffle=False)
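
To make the shapes concrete, here is a quick sketch with synthetic data (1,000 scaled points, an assumption purely for illustration) showing what create_sequences() and the split produce with these settings:

import numpy as np
from sklearn.model_selection import train_test_split

TIME_STEPS = 48
data_scaled = np.random.rand(1000, 1)  # stand-in for the scaled data

# Same windowing logic as create_sequences(): overlapping 48-step windows.
X = np.array([data_scaled[i:i + TIME_STEPS] for i in range(len(data_scaled) - TIME_STEPS)])
X_train, X_test = train_test_split(X, test_size=0.2, random_state=42, shuffle=False)

print(X.shape)        # (952, 48, 1)
print(X_train.shape)  # (761, 48, 1)
print(X_test.shape)   # (191, 48, 1)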

This preprocessing ensures that our data is well-prepared for the LSTM Autoencoder, making the training and evaluation process smoother and more effective.

Building and Training the Model

Now that we have our data loaded and preprocessed, we can finally move on to training our model. The build_and_train_model() function handles this task. It utilizes LSTM layers to enable the model to learn from the chunks of data we’ve created.

The function compiles the model with the Adam optimizer and uses Mean Squared Error (MSE) as the loss function, which measures how closely the model reconstructs its input. Note that the model is fit with X_train as both the input and the target: that is exactly what makes it an autoencoder, so it learns to reconstruct normal sequences and will reconstruct unusual ones poorly. The model is trained for 20 epochs, and after training, the function displays a plot of the training and validation loss over the epochs, allowing us to assess its learning progress.

def build_and_train_model():
   global model, X_test


   X_train, X_test = preprocess_data(data)


   model = Sequential([
       LSTM(128, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=False),
       RepeatVector(X_train.shape[1]),
       LSTM(128, return_sequences=True),
       TimeDistributed(Dense(X_train.shape[2]))
   ])


   model.compile(optimizer='adam', loss='mse')
   model.summary()


   history = model.fit(X_train, X_train, epochs=20, batch_size=64, validation_split=0.2, shuffle=False)


   # Plotting training loss
   plt.figure(figsize=(10, 5))
   plt.plot(history.history['loss'], label='Training Loss')
   plt.plot(history.history['val_loss'], label='Validation Loss')
   plt.title('Model Loss')
   plt.xlabel('Epochs')
   plt.ylabel('Loss')
   plt.legend()
   plt.show()

Detecting Anomalies

With everything set up, we’re ready to detect anomalies in the data. The detect_anomalies() function uses our trained model to reconstruct the test data and compare it with the actual values. By calculating the Mean Squared Error (MSE) for each segment of test data, we can gauge the accuracy of the model’s predictions.

Any sequence whose reconstruction error exceeds the mean MSE plus three times the standard deviation of the MSE is flagged as an anomaly. The function then visualizes these anomalies on a plot, marking them with red points.
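
The thresholding rule itself is just two NumPy calls. Here is a standalone toy illustration (made-up reconstruction errors) of how the mean-plus-three-standard-deviations rule singles out an outlier:

import numpy as np

# Twenty "normal" reconstruction errors around 0.01 and one clear outlier at 0.25.
mse = np.append(np.full(20, 0.01), 0.25)

threshold = np.mean(mse) + 3 * np.std(mse)
print(round(threshold, 3))            # about 0.175
print(np.where(mse > threshold)[0])   # [20] -> only the outlier is flagged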

def detect_anomalies():
   global model, scaler, X_test, data


   X_test_pred = model.predict(X_test)


   # Calculate the reconstruction loss (MSE)
   mse = np.mean(np.power(X_test - X_test_pred, 2), axis=(1, 2))


   # Set a threshold for anomaly detection
   threshold = np.mean(mse) + 3 * np.std(mse)


   anomalies = mse > threshold


   # Plot anomalies
   plt.figure(figsize=(12, 6))
   plt.plot(data.index[-len(X_test):], scaler.inverse_transform(X_test[:, -1, 0].reshape(-1, 1)), label='True Data')
   plt.scatter(data.index[-len(X_test):][anomalies], scaler.inverse_transform(X_test[anomalies, -1, 0].reshape(-1, 1)), color='red', label='Anomalies')
   plt.title('Anomaly Detection')
   plt.xlabel('Timestamp')
   plt.ylabel('Value')
   plt.legend()
   plt.show()

Creating the GUI

It’s time to make our application user-friendly by adding a graphical interface. We’ve designed a simple and intuitive GUI to interact with our anomaly detection system:

First, we set up the main window of our application, giving it a title and specifying its size. This ensures the interface looks good and is easily accessible.

# Create the GUI
root = tk.Tk()
root.title("Anomaly Detection in Time Series Data - The Pycodes")
root.geometry("400x100")

Next, we’ve included an “Upload CSV File” button. This allows users to select and upload their CSV files directly through the interface. Clicking this button will invoke the upload_file() function.

upload_button = tk.Button(root, text="Upload CSV File", command=upload_file)
upload_button.pack()

We also have a “Train Model” button. When pressed, it triggers the build_and_train_model() function, which handles the training of our model.

train_button = tk.Button(root, text="Train Model", command=build_and_train_model)
train_button.pack()

To complete the setup, we’ve added a “Detect Anomalies” button. This button calls the detect_anomalies() function to perform anomaly detection on the data.

anomaly_button = tk.Button(root, text="Detect Anomalies", command=detect_anomalies)
anomaly_button.pack()

Finally, to keep the application responsive and functional, we start the Tkinter main event loop.

root.mainloop()

This setup provides an easy-to-use interface for interacting with the anomaly detection system, allowing users to upload data, train the model, and identify anomalies with just a few clicks.

Example

For this example, I used the CSV file available here. Feel free to download it and follow along!

First, I uploaded the CSV file. The Data Visualization plot appeared as shown below:

Then, I clicked the “Train Model” button, and here’s the plot of the model’s training loss over time:

After that, I detected anomalies by clicking on the “Detect Anomalies” button. Here’s the resulting plot with anomalies highlighted:

Full Code

import tkinter as tk
from tkinter import filedialog
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, RepeatVector, TimeDistributed
from sklearn.model_selection import train_test_split


# Global variables
model = None
scaler = None
X_test = None
data = None


def load_data(file_path):
   global data
   try:
       data = pd.read_csv(file_path, parse_dates=['timestamp'], index_col='timestamp')
       data_resampled = data.resample('H').mean().ffill()  # Resample and forward-fill NaN values
       return data_resampled
   except Exception as e:
       print(f"Error loading data: {e}")
       return None


def upload_file():
   file_path = filedialog.askopenfilename(filetypes=[("CSV files", "*.csv")])
   if file_path:
       data_resampled = load_data(file_path)
       if data_resampled is not None:
           plot_data(data_resampled)
           global data
           data = data_resampled


def plot_data(data):
   plt.figure(figsize=(12, 6))
   plt.plot(data, label='Data')
   plt.title('Data Visualization')
   plt.xlabel('Timestamp')
   plt.ylabel('Value')
   plt.legend()
   plt.show()


def preprocess_data(data):
   global scaler


   scaler = MinMaxScaler()


   # Ensure there are no NaN values
   if data.isnull().values.any():
       print("Data contains NaN values. Filling NaNs with forward-fill.")
       data = data.ffill()


   # Reshape if necessary and scale the data
   data_scaled = scaler.fit_transform(data.values.reshape(-1, 1))


   # Prepare the data for LSTM Autoencoder
   TIME_STEPS = 48  # 2 days of hourly data


   def create_sequences(data, time_steps=TIME_STEPS):
       sequences = []
       for i in range(len(data) - time_steps):
           sequences.append(data[i:(i + time_steps)])
       return np.array(sequences)


   X = create_sequences(data_scaled)
   X_train, X_test = train_test_split(X, test_size=0.2, random_state=42, shuffle=False)


   return X_train, X_test


def build_and_train_model():
   global model, X_test


   X_train, X_test = preprocess_data(data)


   model = Sequential([
       LSTM(128, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=False),
       RepeatVector(X_train.shape[1]),
       LSTM(128, return_sequences=True),
       TimeDistributed(Dense(X_train.shape[2]))
   ])


   model.compile(optimizer='adam', loss='mse')
   model.summary()


   history = model.fit(X_train, X_train, epochs=20, batch_size=64, validation_split=0.2, shuffle=False)


   # Plotting training loss
   plt.figure(figsize=(10, 5))
   plt.plot(history.history['loss'], label='Training Loss')
   plt.plot(history.history['val_loss'], label='Validation Loss')
   plt.title('Model Loss')
   plt.xlabel('Epochs')
   plt.ylabel('Loss')
   plt.legend()
   plt.show()


def detect_anomalies():
   global model, scaler, X_test, data


   X_test_pred = model.predict(X_test)


   # Calculate the reconstruction loss (MSE)
   mse = np.mean(np.power(X_test - X_test_pred, 2), axis=(1, 2))


   # Set a threshold for anomaly detection
   threshold = np.mean(mse) + 3 * np.std(mse)


   anomalies = mse > threshold


   # Plot anomalies
   plt.figure(figsize=(12, 6))
   plt.plot(data.index[-len(X_test):], scaler.inverse_transform(X_test[:, -1, 0].reshape(-1, 1)), label='True Data')
   plt.scatter(data.index[-len(X_test):][anomalies], scaler.inverse_transform(X_test[anomalies, -1, 0].reshape(-1, 1)), color='red', label='Anomalies')
   plt.title('Anomaly Detection')
   plt.xlabel('Timestamp')
   plt.ylabel('Value')
   plt.legend()
   plt.show()


# Create the GUI
root = tk.Tk()
root.title("Anomaly Detection in Time Series Data - The Pycodes")
root.geometry("400x100")


upload_button = tk.Button(root, text="Upload CSV File", command=upload_file)
upload_button.pack()


train_button = tk.Button(root, text="Train Model", command=build_and_train_model)
train_button.pack()


anomaly_button = tk.Button(root, text="Detect Anomalies", command=detect_anomalies)
anomaly_button.pack()

root.mainloop()

Happy Coding!
