How to Detect Network Activity with Scapy in Python

In the face of growing digital threats, understanding and strengthening your cybersecurity has never been more important. Whether you keep the network safe at work or pursue cybersecurity in your free time, this guide is for you.

In today’s tutorial, we dive into the essentials of packet sniffing with Python’s Scapy library. You will learn how to capture and inspect your network traffic as a first step toward monitoring it.

Let’s get started!

Disclaimer

Please note: this code is for educational purposes only. Do not use it illegally or on networks you do not have permission to monitor.

Necessary Libraries

For the code to function properly, make sure to install the Scapy library via the terminal or your command prompt:

$ pip install scapy

Note: packet sniffing means capturing and analyzing the data sent over a network. Capturing packets usually requires root or administrator privileges.

Imports

Since we’re going to sniff packets, we import everything from scapy.all. Scapy is a powerful Python library for packet manipulation.

from scapy.all import *

We will also need time-related functions, so we import the time module.

import time

Class Definition PacketSniffer

After importing the necessary modules, it’s time to define our classes.

__init__ Method

This method sets the initial values of each object created from this class. It takes three parameters:

The network interface we want to sniff (e.g., eth0, Wi-Fi), a packet_callback that tells the sniffer what to do with each captured packet, and a BPF filter that tells the sniffer which packets to capture (e.g., tcp, udp, ip).

class PacketSniffer:
   def __init__(self, interface, packet_callback, filter="ip"):
       self.interface = interface
       self.filter = filter
       self.packet_callback = packet_callback

start_sniffing Method

This method takes a count, the number of packets to capture. If count is 0, capturing continues indefinitely until it is stopped manually (Ctrl+C). The sniffer listens on the configured interface with the configured filter (ip by default) and calls packet_callback for every captured packet. Passing store=0 tells Scapy not to keep the captured packets in memory.

   def start_sniffing(self, count=0):
       print(f"[*] Sniffing started on interface {self.interface}")
       try:
           sniff(iface=self.interface, filter=self.filter, prn=self.packet_callback, store=0, count=count)
       except KeyboardInterrupt:
           print("[*] Sniffing stopped.")

Class Definition NetworkMonitor

__init__ Method

This class takes only one parameter: the network interface that the user wants to monitor.

class NetworkMonitor:
   def __init__(self, interface):
       self.interface = interface

monitor_network Method

This method is meant to monitor traffic on the interface passed to __init__ for a fixed duration. As written, though, the loop only waits until the duration has elapsed; it does not capture or inspect any packets itself. Once the duration is over, the method returns and the program continues.

   def monitor_network(self, duration):
       print(f"[*] Monitoring network traffic on interface '{self.interface}' for {duration} seconds...")
       start_time = time.time()
       try:
           while (time.time() - start_time) < duration:
               time.sleep(0.1)  # sleep briefly instead of spinning the CPU
       except KeyboardInterrupt:
           print("[*] Monitoring stopped.")
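
To make the monitoring step do more than wait, one option is to capture packets for the duration and tally them per source. The sketch below is an assumption about how that could look, not part of the original script: TrafficCounter and monitor are hypothetical names, and the Scapy import is done lazily inside monitor so the counting logic can be tried without a capture. It relies on the timeout argument of Scapy's sniff() to stop after the given number of seconds.

```python
from collections import Counter


class TrafficCounter:
    """Hypothetical helper: tally how many packets each source IP sent."""

    def __init__(self):
        self.by_source = Counter()

    def record(self, src_ip):
        self.by_source[src_ip] += 1

    def top(self, n=3):
        """Return the n busiest sources as (ip, packet_count) pairs."""
        return self.by_source.most_common(n)


def monitor(interface, duration):
    """Capture on `interface` for `duration` seconds and return the tallies."""
    # Imported here so the counter above can be used without Scapy installed.
    from scapy.all import sniff, IP

    counter = TrafficCounter()

    def on_packet(packet):
        if IP in packet:
            counter.record(packet[IP].src)

    # timeout makes sniff() return on its own after `duration` seconds.
    sniff(iface=interface, prn=on_packet, store=False, timeout=duration)
    return counter
```

Calling monitor("eth0", 10).top() would then list the busiest sources seen in ten seconds, assuming the interface name is valid and the script has capture privileges.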

Defining packet_callback Function

Now, let’s define our functions, the heart of our script.

The packet_callback() function is called for each sniffed packet. If the packet has an IP layer, it extracts the source and destination IP addresses and prints them.

Then it checks the transport layer and prints a warning for TCP or UDP traffic. Note that this check is deliberately crude: the mask covers all six standard TCP flags, so almost every TCP packet matches, and every UDP packet is reported. Treat it as a placeholder for your own detection rules.

def packet_callback(packet):
   if IP in packet:
       ip_src = packet[IP].src
       ip_dst = packet[IP].dst
       print(f"Source IP: {ip_src} --> Destination IP: {ip_dst}")


       # Check for suspicious TCP activity
       if TCP in packet:
           if packet[TCP].flags & (0x01 | 0x02 | 0x04 | 0x08 | 0x10 | 0x20) != 0:
               print("Suspicious TCP activity detected!")
       # Check for suspicious UDP activity
       elif UDP in packet:
           print("Suspicious UDP activity detected!")
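
As one possible refinement of the TCP check, the sketch below labels only flag combinations that are commonly associated with port scans (NULL, FIN, Xmas, and SYN+FIN packets). The function name classify_tcp_flags and the labels are hypothetical, and the rules are a simple heuristic rather than a complete detector. It works on the flags as a plain integer, which in the callback should be obtainable with int(packet[TCP].flags).

```python
# Standard TCP flag bits (the same six bits the tutorial's mask combines).
FIN, SYN, RST, PSH, ACK, URG = 0x01, 0x02, 0x04, 0x08, 0x10, 0x20


def classify_tcp_flags(flags):
    """Return a label for flag combinations often tied to scans, else None.

    `flags` is the TCP flags field as an integer.
    """
    flags &= 0x3F  # keep only the six classic flags
    if flags == 0:
        return "null scan"      # no flags set at all
    if flags == FIN | PSH | URG:
        return "xmas scan"      # FIN, PSH and URG set together
    if flags & SYN and flags & FIN:
        return "syn+fin"        # contradictory open/close request
    if flags == FIN:
        return "fin scan"       # lone FIN with no ACK
    return None


if __name__ == "__main__":
    for value in (0x00, 0x29, 0x03, 0x01, 0x02, 0x18):
        print(f"{value:#04x} -> {classify_tcp_flags(value)}")
```

An ordinary SYN (0x02) or PSH+ACK (0x18) packet returns None here, so normal connections would no longer be reported.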

Main Function

The main function asks the user which interface to sniff, which BPF filter to apply, how many packets to capture, and how long to monitor. It then creates a PacketSniffer object and starts sniffing.

After sniffing finishes, it creates a NetworkMonitor object and starts monitoring for the duration specified by the user. In simple terms, this part of the code is the engine of the script.

def main():
   interface = input("Enter the interface to sniff (e.g., eth0, Wi-Fi): ")
   filter = input("Enter the BPF filter (e.g., 'ip') to apply (press Enter for default 'ip' filter): ").strip() or "ip"  # empty input falls back to "ip"
   count = int(input("Enter the number of packets to capture (0 for unlimited): "))
   monitor_duration = int(input("Enter the duration of network monitoring in seconds: "))


   packet_sniffer = PacketSniffer(interface, packet_callback, filter)
   packet_sniffer.start_sniffing(count)


   network_monitor = NetworkMonitor(interface)
   network_monitor.monitor_network(monitor_duration)
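
The prompts above pass raw input straight to int(), which raises an error if the user just presses Enter. A minimal sketch of more forgiving parsing is shown below; parse_int and parse_filter are hypothetical helpers, not part of the original script, and the defaults mirror the ones the prompts describe.

```python
def parse_int(raw, default=0, minimum=0):
    """Convert user input to an int, using `default` when the input is blank.

    Raises ValueError if the text is not a whole number or is below `minimum`.
    """
    text = raw.strip()
    if not text:
        return default
    value = int(text)  # raises ValueError for input like "ten"
    if value < minimum:
        raise ValueError(f"expected a value >= {minimum}, got {value}")
    return value


def parse_filter(raw, default="ip"):
    """Return the stripped BPF filter, or `default` when the input is blank."""
    return raw.strip() or default
```

In main(), the count prompt could then be wrapped as parse_int(input(...)), so that a blank answer means "unlimited" instead of crashing.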

Execution

Lastly, this part calls the main function, but only when the script is run directly; if the file is imported as a module, main() is not executed.

if __name__ == "__main__":
   main()

How is this Code Useful

  • We can ensure a smooth operation of our network infrastructure by monitoring our network traffic in real-time.
  • By analyzing network packets we can detect suspicious TCP or UDP activities, unauthorized access attempts, or port scanning.
  • This code is also flexible and it can be customized to perform deeper analysis with the aid of other tools.
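
To illustrate the port-scanning point, here is a rough sketch of one common heuristic: flag a source IP once it has contacted many distinct destination ports. PortScanTracker and its threshold are hypothetical and not part of the script in this tutorial; the sketch also ignores time windows, so a long-running capture would need to expire old entries.

```python
from collections import defaultdict


class PortScanTracker:
    """Flag a source IP once it has touched many distinct destination ports."""

    def __init__(self, threshold=20):
        self.threshold = threshold  # hypothetical cutoff; tune for your network
        self.ports_by_source = defaultdict(set)

    def observe(self, src_ip, dst_port):
        """Record one packet; return True if src_ip now looks like a scanner."""
        ports = self.ports_by_source[src_ip]
        ports.add(dst_port)
        return len(ports) >= self.threshold
```

Inside packet_callback, this could be fed with tracker.observe(packet[IP].src, packet[TCP].dport) for each TCP packet, printing a warning only when it returns True.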

Full Code

from scapy.all import *
import time


class PacketSniffer:
   def __init__(self, interface, packet_callback, filter="ip"):
       self.interface = interface
       self.filter = filter
       self.packet_callback = packet_callback


   def start_sniffing(self, count=0):
       print(f"[*] Sniffing started on interface {self.interface}")
       try:
           sniff(iface=self.interface, filter=self.filter, prn=self.packet_callback, store=0, count=count)
       except KeyboardInterrupt:
           print("[*] Sniffing stopped.")


class NetworkMonitor:
   def __init__(self, interface):
       self.interface = interface


   def monitor_network(self, duration):
       print(f"[*] Monitoring network traffic on interface '{self.interface}' for {duration} seconds...")
       start_time = time.time()
       try:
           while (time.time() - start_time) < duration:
               time.sleep(0.1)  # sleep briefly instead of spinning the CPU
       except KeyboardInterrupt:
           print("[*] Monitoring stopped.")


def packet_callback(packet):
   if IP in packet:
       ip_src = packet[IP].src
       ip_dst = packet[IP].dst
       print(f"Source IP: {ip_src} --> Destination IP: {ip_dst}")


       # Check for suspicious TCP activity
       if TCP in packet:
           if packet[TCP].flags & (0x01 | 0x02 | 0x04 | 0x08 | 0x10 | 0x20) != 0:
               print("Suspicious TCP activity detected!")
       # Check for suspicious UDP activity
       elif UDP in packet:
           print("Suspicious UDP activity detected!")


def main():
   interface = input("Enter the interface to sniff (e.g., eth0, Wi-Fi): ")
   filter = input("Enter the BPF filter (e.g., 'ip') to apply (press Enter for default 'ip' filter): ").strip() or "ip"  # empty input falls back to "ip"
   count = int(input("Enter the number of packets to capture (0 for unlimited): "))
   monitor_duration = int(input("Enter the duration of network monitoring in seconds: "))


   packet_sniffer = PacketSniffer(interface, packet_callback, filter)
   packet_sniffer.start_sniffing(count)


   network_monitor = NetworkMonitor(interface)
   network_monitor.monitor_network(monitor_duration)


if __name__ == "__main__":
   main()

Happy Coding!
