Home » Tutorials » How to Extract Frames from Video using OpenCV in Python

How to Extract Frames from Video using OpenCV in Python

OpenCV, short for Open Source Computer Vision Library, is an incredibly versatile and widely-used tool in the realm of computer vision and image processing. Whether you’re delving into object detection, face recognition, or video processing, OpenCV has a rich set of features to help you achieve your goals. One particularly cool thing you can do with OpenCV is extract frames from videos. This means you can capture specific moments from a video and analyze them frame by frame. It’s a handy skill for anyone working in video editing, scientific research, or even machine learning.

Today, you’ll learn how to extract frames from a video using OpenCV in Python. In this tutorial, we’ll create a user-friendly graphical interface (GUI) that lets you pick a video and specify how many frames you want to extract. Let’s get started and see how Python, tkinter, and OpenCV come together to make this happen!

Table of Contents

Necessary Libraries

Let’s get everything set up before we dive into the code part, so make sure to install these libraries for the code to work as intended:

$ pip install tk 
$ pip install opencv-python
$ pip install install futures

Imports

As programmers, we want our program to be user-friendly. We create a graphical user interface using the tkinter library. From this library, we import filedialog to select directories and messagebox to use message boxes. We also import cv2 from the OpenCV library to process images and videos. Additionally, we import the os module to interact with the operating system.

We use the timedelta class from the datetime module to represent durations of time. To allow concurrent execution of tasks, we import the ThreadPoolExecutor class from the concurrent.futures module. We also use the threading module to work with threads, ensuring the main window remains responsive. Finally, we import the logging module to provide a flexible framework for emitting log messages from our program.

import tkinter as tk
from tkinter import filedialog, messagebox
import cv2
import os
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
import threading
import logging

Setting Up Logging

We set up our logging system to the default settings using logging.basicConfig(). We log only messages of INFO level or higher. The log messages display the message, timestamp, and severity level using %(message)s, %(asctime)s, and %(levelname)s respectively.

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

Functions for Extracting Frames from Video

Now, let’s define our functions:

format_timedelta Function

The first function takes a timedelta object and uses divmod() to calculate the total number of seconds. It then splits the result into two parts: the integer part (seconds) and the fractional part (milliseconds). After converting the fractional part to milliseconds, the function formats the result into hours, minutes, and seconds, separated by colons. Finally, it replaces the colons with hyphens.

def format_timedelta(td):
   """Formats timedelta to a string in HH-MM-SS.MS format."""
   result, ms = divmod(td.total_seconds(), 1)
   ms = int(ms * 1000)
   return f"{str(timedelta(seconds=int(result)))}.{ms:03}".replace(":", "-")

save_frame Function

This one takes a frame from the video and saves it as a JPEG image file with specific parameters. The parameters include frame (representing the frame image data), frame_index (the index of the frame in the video), fps (frames per second), output_dir (the directory where the frames will be saved), and quality (set to 95 for a good balance between image quality and file size).

First, the function uses frame_index and fps to calculate the timestamp of the frame with timedelta. It then constructs the filename for the frame image by combining the output directory, the prefix “frame-“, and the timestamp, with the “.jpg” extension. Next, it uses cv2.imwrite() to write the frame image to the specified file path with the specified JPEG quality. Finally, once the frame has been successfully saved to the output directory, a log message indicating this will be displayed.

def save_frame(frame, frame_index, fps, output_dir, quality=95):
   """Save a single frame with specified JPEG quality."""
   timestamp = format_timedelta(timedelta(seconds=frame_index / fps))
   frame_filename = os.path.join(output_dir, f"frame-{timestamp}.jpg")
   cv2.imwrite(frame_filename, frame, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
   logging.info(f"Saved {frame_filename}")

extract_frames Function

As the name suggests, this function is responsible for extracting frames from the video. It has four parameters: video_path, the number of frames (num_frames), output_dir, and progress_callback, which is a function that updates the progress of the frame extraction in the main window. Now that we have defined its parameters, let’s discuss how it works:

  • First, it takes the video_path parameter and opens the specified video file using cv2.VideoCapture(). It then checks whether the video file was opened successfully using if not cap.isOpened(). If the video file is not opened, an error message is displayed. If it is opened, the function retrieves the total number of frames in the video and the frames per second (fps).
  • Next, it calculates the interval between the frames to ensure even spacing. It then creates four worker threads using ThreadPoolExecutor to iterate through each frame of the video with the calculated interval. For each frame, it sets the frame to the desired index using cap.set() and checks if the frame was read successfully.
  • In a separate thread, it submits a task to save the frame using executor.submit() and the save_frame() function. It then updates the progress of the frame extraction by calling the progress_callback() function. At the end, it ensures the progress is set to 100% using progress_callback(1) if all frames were read; otherwise, it logs a warning message.
  • Finally, it releases the video capture object using cap.release() to free up resources.
def extract_frames(video_path, num_frames, output_dir, progress_callback):
   """Extracts a specified number of frames evenly spaced throughout the video."""
   cap = cv2.VideoCapture(video_path)
   if not cap.isOpened():
       messagebox.showerror("Error", "Cannot open video file.")
       return


   total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
   fps = cap.get(cv2.CAP_PROP_FPS)
   interval = total_frames // num_frames


   with ThreadPoolExecutor(max_workers=4) as executor:
       for i in range(0, total_frames, interval):
           cap.set(cv2.CAP_PROP_POS_FRAMES, i)
           ret, frame = cap.read()
           if ret:
               executor.submit(save_frame, frame, i, fps, output_dir)
               progress_callback(i / total_frames)
           else:
               logging.warning(f"Skipping frame at index {i}: Cannot read frame.")


   cap.release()
   progress_callback(1)  # Ensure the progress is set to 100% at the end

load_file Function

This one allows the user to choose a video file and display its path in the entry_widget. It opens a file dialog using filedialog.askopenfilename(), which lets the user select a video file and stores the path in file_path. Once a video is selected and file_path is not empty, the function deletes any existing content from the entry_widget and inserts the path of the selected video file into it.

def load_file(entry_widget):
   file_path = filedialog.askopenfilename()
   if file_path:
       entry_widget.delete(0, tk.END)
       entry_widget.insert(0, file_path)

select_output_directory Function

The role of this one is to allow the user to select the directory where they want to save the extracted frames and displays the directory path in the entry_widget. It works similarly to the previous function, following the same steps but using a different path and entry_widget.

def select_output_directory(entry_widget):
   directory = filedialog.askdirectory()
   if directory:
       entry_widget.delete(0, tk.END)
       entry_widget.insert(0, directory)

start_extraction_thread Function

This function, start_extraction_thread, is defined with four parameters: video_path, num_frames, output_dir, and progress_label. It begins by checking if the user has specified the video file, the number of frames to extract, and the directory to save them in. If any of these three inputs is left blank, a warning message is displayed, and the function returns.

Next, it checks if the output directory exists. If it does not, the directory is created using os.makedirs().

Then, a new thread is started with threading.Thread, targeting the extract_frames function. The thread is passed the video_path, num_frames, and output_dir as arguments. Additionally, a lambda function is used to update the progress_label text to show the extraction progress as a percentage. The thread is set as a daemon by specifying daemon=True, meaning it will run in the background and automatically close when the main program exits.

def start_extraction_thread(video_path, num_frames, output_dir, progress_label):
   if not video_path or not num_frames or not output_dir:
       messagebox.showwarning("Warning", "Please specify a video file, total frames, and output directory.")
       return


   if not os.path.exists(output_dir):
       os.makedirs(output_dir)


   threading.Thread(target=extract_frames, args=(video_path, num_frames, output_dir, lambda progress: progress_label.config(text=f"Progress: {int(progress * 100)}%")), daemon=True).start()

start_extraction Function

The last one is defined by four parameters:

  • path_entry, which is the entry widget for the video file.
  • output_dir_entry, which is the entry widget for the output directory.
  • frame_count_entry, which is the entry widget for specifying the number of frames the user wants to extract.
  • progress_label, which is the label widget that displays the progress of the extraction.

The function works as follows: it starts by retrieving the inputs from the entry widgets. Specifically, it gets the video path from path_entry, converts the input from frame_count_entry to an integer to get the number of frames, and gets the output directory from output_dir_entry. After retrieving these inputs, it triggers the start_extraction_thread() function with the provided parameters.

def start_extraction(path_entry, frame_count_entry, output_dir_entry, progress_label):
   video_path = path_entry.get()
   num_frames = int(frame_count_entry.get())
   output_dir = output_dir_entry.get()
   start_extraction_thread(video_path, num_frames, output_dir, progress_label)

Main Functionality

The first thing this part does is ensure that the script is run directly and not imported as a module. Then, it creates the main window, sets its title to “Frame Extractor Tool – The Pycodes“, and defines its geometry as 600×300 pixels.

After this, it begins creating the GUI elements:

  • It creates the path_entry widget to display the video file path and places it in the grid at row 0, column 1.
  • Next to it, it places a “Browse” button that triggers the load_file() function using the path_entry widget, at row 0, column 2.
  • It creates the frame_count_entry widget for the number of frames, places it at row 1, column 1, and sets a default value of ’10’.
  • It also creates a label that says “Total Frames:” and places it at row 1, column 0.
  • The output_dir_entry widget is created to display the output directory path and is placed at row 2, column 1.
  • Next to it, it places a “Select Output Directory” button that triggers the select_output_directory() function using the output_dir_entry widget, at row 2, column 2.
  • It creates the “Start Extraction” button that triggers the start_extraction() function using the path_entry, frame_count_entry, output_dir_entry, and progress_label widgets, and places it at row 3, column 1.
  • Finally, it creates the progress_label widget to display the extraction progress and places it at row 4, column 1.

The script then starts the main event loop with root.mainloop(), ensuring that the main window remains open and responsive until the user exits.

if __name__ == "__main__":
   root = tk.Tk()
   root.title("Frame Extractor Tool - The Pycodes")
   root.geometry("600x300")


   # Widgets
   path_entry = tk.Entry(root, width=50)
   path_entry.grid(row=0, column=1, padx=10, pady=10)


   browse_button = tk.Button(root, text="Browse", command=lambda: load_file(path_entry))
   browse_button.grid(row=0, column=2, padx=10, pady=10)


   frame_count_entry = tk.Entry(root, width=10)
   frame_count_entry.grid(row=1, column=1, padx=10, pady=10)
   frame_count_entry.insert(0, "10")  # Default number of frames


   frame_count_label = tk.Label(root, text="Total Frames:")
   frame_count_label.grid(row=1, column=0, padx=10, pady=10)


   output_dir_entry = tk.Entry(root, width=50)
   output_dir_entry.grid(row=2, column=1, padx=10, pady=10)


   output_dir_button = tk.Button(root, text="Select Output Directory", command=lambda: select_output_directory(output_dir_entry))
   output_dir_button.grid(row=2, column=2, padx=10, pady=10)


   start_button = tk.Button(root, text="Start Extraction", command=lambda: start_extraction(path_entry, frame_count_entry, output_dir_entry, progress_label))
   start_button.grid(row=3, column=1, padx=10, pady=10)


   progress_label = tk.Label(root, text="Progress: 0%")
   progress_label.grid(row=4, column=1, padx=10, pady=10)


   root.mainloop()

Example

I ran this code on Windows as shown below:

Also on a Linux system:

Full Code

import tkinter as tk
from tkinter import filedialog, messagebox
import cv2
import os
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
import threading
import logging


# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def format_timedelta(td):
   """Formats timedelta to a string in HH-MM-SS.MS format."""
   result, ms = divmod(td.total_seconds(), 1)
   ms = int(ms * 1000)
   return f"{str(timedelta(seconds=int(result)))}.{ms:03}".replace(":", "-")


def save_frame(frame, frame_index, fps, output_dir, quality=95):
   """Save a single frame with specified JPEG quality."""
   timestamp = format_timedelta(timedelta(seconds=frame_index / fps))
   frame_filename = os.path.join(output_dir, f"frame-{timestamp}.jpg")
   cv2.imwrite(frame_filename, frame, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
   logging.info(f"Saved {frame_filename}")


def extract_frames(video_path, num_frames, output_dir, progress_callback):
   """Extracts a specified number of frames evenly spaced throughout the video."""
   cap = cv2.VideoCapture(video_path)
   if not cap.isOpened():
       messagebox.showerror("Error", "Cannot open video file.")
       return


   total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
   fps = cap.get(cv2.CAP_PROP_FPS)
   interval = total_frames // num_frames


   with ThreadPoolExecutor(max_workers=4) as executor:
       for i in range(0, total_frames, interval):
           cap.set(cv2.CAP_PROP_POS_FRAMES, i)
           ret, frame = cap.read()
           if ret:
               executor.submit(save_frame, frame, i, fps, output_dir)
               progress_callback(i / total_frames)
           else:
               logging.warning(f"Skipping frame at index {i}: Cannot read frame.")


   cap.release()
   progress_callback(1)  # Ensure the progress is set to 100% at the end


def load_file(entry_widget):
   file_path = filedialog.askopenfilename()
   if file_path:
       entry_widget.delete(0, tk.END)
       entry_widget.insert(0, file_path)


def select_output_directory(entry_widget):
   directory = filedialog.askdirectory()
   if directory:
       entry_widget.delete(0, tk.END)
       entry_widget.insert(0, directory)


def start_extraction_thread(video_path, num_frames, output_dir, progress_label):
   if not video_path or not num_frames or not output_dir:
       messagebox.showwarning("Warning", "Please specify a video file, total frames, and output directory.")
       return


   if not os.path.exists(output_dir):
       os.makedirs(output_dir)


   threading.Thread(target=extract_frames, args=(video_path, num_frames, output_dir, lambda progress: progress_label.config(text=f"Progress: {int(progress * 100)}%")), daemon=True).start()


def start_extraction(path_entry, frame_count_entry, output_dir_entry, progress_label):
   video_path = path_entry.get()
   num_frames = int(frame_count_entry.get())
   output_dir = output_dir_entry.get()
   start_extraction_thread(video_path, num_frames, output_dir, progress_label)


if __name__ == "__main__":
   root = tk.Tk()
   root.title("Frame Extractor Tool - The Pycodes")
   root.geometry("600x300")


   # Widgets
   path_entry = tk.Entry(root, width=50)
   path_entry.grid(row=0, column=1, padx=10, pady=10)


   browse_button = tk.Button(root, text="Browse", command=lambda: load_file(path_entry))
   browse_button.grid(row=0, column=2, padx=10, pady=10)


   frame_count_entry = tk.Entry(root, width=10)
   frame_count_entry.grid(row=1, column=1, padx=10, pady=10)
   frame_count_entry.insert(0, "10")  # Default number of frames


   frame_count_label = tk.Label(root, text="Total Frames:")
   frame_count_label.grid(row=1, column=0, padx=10, pady=10)


   output_dir_entry = tk.Entry(root, width=50)
   output_dir_entry.grid(row=2, column=1, padx=10, pady=10)


   output_dir_button = tk.Button(root, text="Select Output Directory", command=lambda: select_output_directory(output_dir_entry))
   output_dir_button.grid(row=2, column=2, padx=10, pady=10)


   start_button = tk.Button(root, text="Start Extraction", command=lambda: start_extraction(path_entry, frame_count_entry, output_dir_entry, progress_label))
   start_button.grid(row=3, column=1, padx=10, pady=10)


   progress_label = tk.Label(root, text="Progress: 0%")
   progress_label.grid(row=4, column=1, padx=10, pady=10)


   root.mainloop()

Happy Coding!

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top