Wazuh Scripts

Schoolwork

For the Cybersecurity Project module at Nanyang Polytechnic, we were tasked with identifying a cyberattack scenario and designing a security architecture to mitigate and prevent those attacks.

Scenario

The chosen scenario is the “Malicious Memes” attack (see: https://www.trendmicro.com/en_us/research/18/l/cybercriminals-use-malicious-memes-that-communicate-with-malware.html).

In summary:

  • Attackers socially engineered employees (vishing) by impersonating the help desk.
  • Victims were directed to a phishing site that captured their credentials.
  • The attackers escalated access, identified accounts with administrative tools, and used further social engineering.
  • Compromised admin tools were used to hijack high-profile accounts and post scam content.
  • Sensitive user data (DMs, search history, contact numbers, emails) was exfiltrated via platform features.
  • Malicious images circulated on the platform contained embedded commands.
  • A two-stage payload downloads those images, extracts commands, and executes them on victims’ machines.
  • Exfiltrated data is sent back to the attacker’s server.

Proposed Security Architecture

Image of proposed security architecture

Responsibilities

I was responsible for deploying and configuring Wazuh, a host-based detection and response platform, to detect threats, monitor vulnerabilities, manage security events, and help meet compliance requirements.

To automate response actions, I developed custom Python scripts that integrate with Wazuh’s active response mechanism. When Wazuh detects suspicious activity (for example, an interactive shell, repeated failed logins, or a malicious file), it can trigger these scripts to mitigate the threat automatically.

To run these Python scripts as Wazuh active responses, they must first be converted into executables.

Backup / Recovery

This script provides an interactive utility to back up or recover Wazuh configuration and data. Backups can be created locally and optionally uploaded to Google Drive; restores can be performed from Google Drive or from a local archive.

#!/usr/bin/python3
"""
author: DnG-00
"""

from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
import shutil
import os
import re
from datetime import datetime

# Wazuh installation that is archived on backup and overwritten on restore.
WAZUH_DIR = "/var/ossec"
# Local directory in which backup archives are kept.
BACKUP_DIR = "/var/backups/Wazuh"
# Placeholder: replace with the id of the Google Drive folder holding backups.
DRIVE_FOLDER_ID = 'ID_OF_FOLDER_IN_GOOGLE_DRIVE'


def _choose_entry(labels, header=None):
    """Show a numbered list and return the zero-based index the user picks.

    The list is re-printed and the prompt repeated until the input is a whole
    number between 1 and len(labels). Zero and negative numbers are rejected
    so they cannot wrap around to the end of the list.
    """
    while True:
        if header:
            print(header)
        for number, label in enumerate(labels, start=1):
            print(f"{number}. {label}")
        try:
            choice = int(input("Choose Backup File: "))
        except ValueError:
            print("Please select a valid file")
            continue
        if 1 <= choice <= len(labels):
            return choice - 1
        print("Please select a valid file")


def _wazuh_control(action):
    """Run `wazuh-control <action>` and return True when it exits with 0."""
    # os.system never raises for a failing command; the exit status is the
    # only failure signal.
    return os.system(f'/var/ossec/bin/wazuh-control {action}') == 0


def _restore_archive(archive_path):
    """Stop Wazuh, unpack the zip archive over WAZUH_DIR, then start Wazuh."""
    print("Stopping Wazuh services")
    if not _wazuh_control("stop"):
        print("Unable to stop Wazuh services")

    print("Unpacking archive")
    try:
        # The format is given explicitly because backups are stored without
        # a ".zip" extension.
        shutil.unpack_archive(archive_path, WAZUH_DIR, "zip")
    except Exception as exc:  # top-level boundary: report and carry on
        print(f"Error occurred while unpacking archive: {exc}")
    else:
        print("Archive unpacked, files restored")

    # Always try to bring the services back up so the host is not left
    # unmonitored after a failed restore.
    print("Starting Wazuh services")
    if _wazuh_control("start"):
        print("Wazuh Services started")
    else:
        print("Unable to start Wazuh services, please start manually")


# Authenticate against Google Drive once, before the menu is shown.
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)

while True:
    print("Backup Menu")
    print("1. Backup Wazuh Server Files")
    print("2. Recover Wazuh Server Files from Google Drive")
    print("3. Recover Wazuh Server Files from local archive")
    print("9. Exit")
    option = input("Option: ")

    if option == "1":
        dt_string = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        backup_name = f"Wazuh Backup {dt_string}"
        backup_path = f"{BACKUP_DIR}/{backup_name}"

        print("Backing up files")
        # make_archive writes "<backup_name>.zip" into the current directory
        # and returns its path; it is then moved into BACKUP_DIR.
        backup_zip = shutil.make_archive(backup_name, 'zip', WAZUH_DIR)
        if not os.path.exists(BACKUP_DIR):
            print("Creating directory Wazuh in /var/backups")
            os.makedirs(BACKUP_DIR)
        shutil.move(backup_zip, backup_path)
        print("Backup created in /var/backups/Wazuh")

        # The upload is optional: a Drive failure must not discard the fact
        # that the local backup already succeeded.
        try:
            gfile = drive.CreateFile({'parents': [{'id': DRIVE_FOLDER_ID}]})
            gfile.SetContentFile(backup_path)
            gfile['title'] = backup_name
            gfile.Upload()
        except Exception as exc:
            print(f"Backup kept locally, upload to Google Drive failed: {exc}")
        else:
            print("Backup uploaded to Google Drive")

    elif option == "2":
        try:
            file_list = drive.ListFile(
                {'q': "'{}' in parents and trashed=false".format(DRIVE_FOLDER_ID)}
            ).GetList()
        except Exception as exc:
            print(f"Unable to list backups in Google Drive: {exc}")
            continue
        if not file_list:
            print("No backup files found in Google Drive")
            continue

        labels = [f"Title: {entry['title']}, {entry['id']}" for entry in file_list]
        selected = file_list[_choose_entry(labels, "Backup files found:")]

        archive = f"{os.getcwd()}/{selected['title']}"
        try:
            downloadfile = drive.CreateFile({'id': selected['id']})
            downloadfile.GetContentFile(archive)
        except Exception as exc:
            # Without a downloaded archive there is nothing to restore, so
            # the services are left untouched.
            print(f"Error occurred while downloading archive: {exc}")
            continue

        print("Downloaded archive")
        _restore_archive(archive)

    elif option == "3":
        try:
            archives = sorted(os.listdir(BACKUP_DIR))
        except OSError as exc:
            print(f"Unable to read {BACKUP_DIR}: {exc}")
            continue
        if not archives:
            print(f"No backup files found in {BACKUP_DIR}")
            continue

        chosen = archives[_choose_entry(archives)]
        _restore_archive(f"{BACKUP_DIR}/{chosen}")

    elif option == "9":
        break

    else:
        continue

Blockshell

If an interactive shell is spawned on a monitored host, Wazuh can trigger the following active-response script to block the connection automatically by adding a firewall rule.

#!/usr/bin/python3
"""
author: DnG-00
"""

import os
import sys
import json
import datetime
import subprocess
import socket

# Active-response log location: the Windows agent path on 'nt', otherwise the
# default path under /var/ossec.
LOG_FILE = (
    "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
    if os.name == 'nt'
    else "/var/ossec/logs/active-responses.log"
)

# Command codes used internally by this script.
ADD_COMMAND, DELETE_COMMAND, CONTINUE_COMMAND, ABORT_COMMAND = range(4)

# Process exit / status codes.
OS_SUCCESS, OS_INVALID = 0, -1

class message:
    """Holder for the parsed alert and the command code derived from it."""

    def __init__(self):
        # 0 is the numeric default for the command code.
        self.command = 0
        # Empty string until an alert has been stored.
        self.alert = ""


def write_debug_file(ar_name, msg):
    """Append one timestamped line containing msg to LOG_FILE.

    ar_name is accepted but not used; the tag written to the log is fixed.
    """
    with open(LOG_FILE, mode="a") as log_file:
        stamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        pieces = [stamp, " active-response/bin/blockshell", ": ", msg, "\n"]
        log_file.write("".join(pieces))


def setup_and_check_message(argv):
    """Read the alert from stdin and translate its command into a code.

    The first line of stdin is logged and parsed as JSON. The parsed object is
    stored in ``message.alert`` and ``message.command`` is set to ADD_COMMAND,
    DELETE_COMMAND or OS_INVALID.

    Note: the state is stored on the ``message`` class itself (not on an
    instance), and the class is what gets returned.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.

    Returns:
        The ``message`` class with ``command`` (and, on success, ``alert``) set.
    """
    # Only the first line of stdin is consumed.
    input_str = ""
    for line in sys.stdin:
        input_str = line
        break

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    # Valid JSON that is not an object (a list, a string, ...) has no
    # "command" key to read, so treat it as invalid instead of crashing.
    if not isinstance(data, dict):
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    message.alert = data

    command = data.get("command")

    if command == "add":
        message.command = ADD_COMMAND
    elif command == "delete":
        message.command = DELETE_COMMAND
    else:
        message.command = OS_INVALID
        # Formatted rather than concatenated: command is None when the key is
        # missing, and str + None would raise TypeError.
        write_debug_file(argv[0], f'Not valid command: {command}')

    return message


def send_keys_and_check_message(argv, keys):
    """Send a check_keys message on stdout and interpret the reply from stdin.

    Args:
        argv: Command-line arguments; argv[0] is used as the origin name and
            passed to the logger.
        keys: List of keys placed in the message's "parameters".

    Returns:
        CONTINUE_COMMAND or ABORT_COMMAND according to the reply's "command"
        value, or OS_INVALID when the reply is missing, is not valid JSON, or
        carries any other command.
    """
    # build and send message with keys
    keys_msg = json.dumps({
        "version": 1,
        "origin": {"name": argv[0], "module": "active-response"},
        "command": "check_keys",
        "parameters": {"keys": keys},
    })

    write_debug_file(argv[0], keys_msg)

    print(keys_msg)
    sys.stdout.flush()

    # readline() returns "" only at end of input. Reading once (instead of
    # retrying while the line is empty) avoids spinning forever when stdin is
    # closed without a reply; the empty string then fails JSON decoding below.
    input_str = sys.stdin.readline()

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        # Return an int status like every other path of this function.
        return OS_INVALID

    action = data.get("command") if isinstance(data, dict) else None

    if action == "continue":
        return CONTINUE_COMMAND
    if action == "abort":
        return ABORT_COMMAND

    write_debug_file(argv[0], "Invalid value of 'command'")
    return OS_INVALID


def _is_ip_address(value):
    """Return True when value parses as an IPv4 or IPv6 address."""
    import ipaddress  # local import: not part of this script's import block

    try:
        ipaddress.ip_address(str(value))
    except ValueError:
        return False
    return True


def _read_whitelist(argv, path):
    """Return the set of non-empty, stripped lines of the whitelist file.

    A whitelist that cannot be opened is logged and treated as empty.
    """
    try:
        with open(path, 'r') as reader:
            # Strip the trailing newline so entries can match an address.
            return {line.strip() for line in reader if line.strip()}
    except OSError as exc:
        write_debug_file(argv[0], f"Unable to read whitelist {path}: {exc}")
        return set()


def main(argv):
    """Add or delete an outbound Windows firewall block rule for an alert.

    For "add", the rule id is sent as the check_keys key; when the reply is
    "continue", a netsh rule blocking outbound traffic to the alert's
    destinationIp is created, unless the destination port is 53 or 443 or the
    address is listed in whitelist.txt. For "delete", the rule with the same
    name is removed. The process always terminates through sys.exit.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.
    """
    write_debug_file(argv[0], "Started")

    # validate json and get command
    msg = setup_and_check_message(argv)

    if msg.command < 0:
        sys.exit(OS_INVALID)

    if msg.command == ADD_COMMAND:

        # Start Custom Key: select the keys from the alert.
        alert = msg.alert["parameters"]["alert"]
        keys = [alert["rule"]["id"]]
        # End Custom Key

        action = send_keys_and_check_message(argv, keys)

        # if necessary, abort execution
        if action != CONTINUE_COMMAND:

            if action == ABORT_COMMAND:
                write_debug_file(argv[0], "Aborted")
                sys.exit(OS_SUCCESS)
            else:
                write_debug_file(argv[0], "Invalid command")
                sys.exit(OS_INVALID)

        # Start Custom Action Add
        write_debug_file(argv[0], "Adding firewall rule")

        eventdata = alert["data"]["win"]["eventdata"]
        destip = eventdata["destinationIp"]
        destport = eventdata["destinationPort"]

        # Membership test: `destport == "53" or "443"` is always true because
        # the non-empty string "443" is truthy.
        if str(destport) in ("53", "443"):
            write_debug_file(argv[0], "Connection established for HTTPS / DNS, quitting program")
            sys.exit(OS_SUCCESS)

        whitelist = _read_whitelist(
            argv,
            'C:\\Program Files (x86)\\ossec-agent\\active-response\\bin\\whitelist.txt',
        )

        if str(destip).strip() in whitelist:
            write_debug_file(argv[0], f"{destip} is whitelisted, exiting")
            write_debug_file(argv[0], "Ended")
            sys.exit(OS_SUCCESS)

        # The address comes from alert data and is interpolated into a shell
        # command, so anything that is not a plain IP address is refused.
        if not _is_ip_address(destip):
            write_debug_file(argv[0], f"Refusing to add firewall rule, not an IP address: {destip!r}")
            write_debug_file(argv[0], "Ended")
            sys.exit(OS_INVALID)

        command = f'netsh advfirewall firewall add rule name="Wazuh Active Response - {destip}" dir=out action=block protocol=any remoteip={destip}'

        status = os.system(f'cmd /c {command}')
        if status == 0:
            write_debug_file(argv[0], f"Created firewall rule - {destip}")
        else:
            write_debug_file(argv[0], f"Failed to create firewall rule - {destip} (exit status {status})")
        # End Custom Action Add

    elif msg.command == DELETE_COMMAND:

        # Start Custom Action Delete
        write_debug_file(argv[0], "Deleting firewall rule")

        destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["destinationIp"]

        if not _is_ip_address(destip):
            write_debug_file(argv[0], f"Refusing to delete firewall rule, not an IP address: {destip!r}")
            write_debug_file(argv[0], "Ended")
            sys.exit(OS_INVALID)

        command = f'netsh advfirewall firewall delete rule name="Wazuh Active Response - {destip}"'

        status = os.system(f'cmd /c {command}')
        if status == 0:
            write_debug_file(argv[0], f"Deleted firewall rule - {destip}")
        else:
            write_debug_file(argv[0], f"Failed to delete firewall rule - {destip} (exit status {status})")
        # End Custom Action Delete

    else:
        write_debug_file(argv[0], "Invalid command")

    write_debug_file(argv[0], "Ended")

    sys.exit(OS_SUCCESS)


# Entry point: run the active response only when executed as a script,
# passing the full argument vector to main().
if __name__ == "__main__":
    main(sys.argv)

Automatic firewall rule creation

When multiple failed RDP login attempts are detected for a host, Wazuh can invoke the following script to add a firewall rule that blocks the attacker’s IP address.

#!/usr/bin/python3
"""
author: DnG-00
"""

import os
import sys
import json
import datetime
import subprocess
import socket

# Active-response log location: the Windows agent path on 'nt', otherwise the
# default path under /var/ossec.
LOG_FILE = (
    "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
    if os.name == 'nt'
    else "/var/ossec/logs/active-responses.log"
)

# Command codes used internally by this script.
ADD_COMMAND, DELETE_COMMAND, CONTINUE_COMMAND, ABORT_COMMAND = range(4)

# Process exit / status codes.
OS_SUCCESS, OS_INVALID = 0, -1

class message:
    """Holder for the parsed alert and the command code derived from it."""

    def __init__(self):
        # 0 is the numeric default for the command code.
        self.command = 0
        # Empty string until an alert has been stored.
        self.alert = ""


def write_debug_file(ar_name, msg):
    """Append one timestamped line containing msg to LOG_FILE.

    ar_name is accepted but not used; the tag written to the log is fixed.
    """
    with open(LOG_FILE, mode="a") as log_file:
        stamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        pieces = [stamp, " active-response/bin/firewall", ": ", msg, "\n"]
        log_file.write("".join(pieces))


def setup_and_check_message(argv):
    """Read the alert from stdin and translate its command into a code.

    The first line of stdin is logged and parsed as JSON. The parsed object is
    stored in ``message.alert`` and ``message.command`` is set to ADD_COMMAND,
    DELETE_COMMAND or OS_INVALID.

    Note: the state is stored on the ``message`` class itself (not on an
    instance), and the class is what gets returned.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.

    Returns:
        The ``message`` class with ``command`` (and, on success, ``alert``) set.
    """
    # Only the first line of stdin is consumed.
    input_str = ""
    for line in sys.stdin:
        input_str = line
        break

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    # Valid JSON that is not an object (a list, a string, ...) has no
    # "command" key to read, so treat it as invalid instead of crashing.
    if not isinstance(data, dict):
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    message.alert = data

    command = data.get("command")

    if command == "add":
        message.command = ADD_COMMAND
    elif command == "delete":
        message.command = DELETE_COMMAND
    else:
        message.command = OS_INVALID
        # Formatted rather than concatenated: command is None when the key is
        # missing, and str + None would raise TypeError.
        write_debug_file(argv[0], f'Not valid command: {command}')

    return message


def send_keys_and_check_message(argv, keys):
    """Send a check_keys message on stdout and interpret the reply from stdin.

    Args:
        argv: Command-line arguments; argv[0] is used as the origin name and
            passed to the logger.
        keys: List of keys placed in the message's "parameters".

    Returns:
        CONTINUE_COMMAND or ABORT_COMMAND according to the reply's "command"
        value, or OS_INVALID when the reply is missing, is not valid JSON, or
        carries any other command.
    """
    # build and send message with keys
    keys_msg = json.dumps({
        "version": 1,
        "origin": {"name": argv[0], "module": "active-response"},
        "command": "check_keys",
        "parameters": {"keys": keys},
    })

    write_debug_file(argv[0], keys_msg)

    print(keys_msg)
    sys.stdout.flush()

    # readline() returns "" only at end of input. Reading once (instead of
    # retrying while the line is empty) avoids spinning forever when stdin is
    # closed without a reply; the empty string then fails JSON decoding below.
    input_str = sys.stdin.readline()

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        # Return an int status like every other path of this function.
        return OS_INVALID

    action = data.get("command") if isinstance(data, dict) else None

    if action == "continue":
        return CONTINUE_COMMAND
    if action == "abort":
        return ABORT_COMMAND

    write_debug_file(argv[0], "Invalid value of 'command'")
    return OS_INVALID


def _is_ip_address(value):
    """Return True when value parses as an IPv4 or IPv6 address."""
    import ipaddress  # local import: not part of this script's import block

    try:
        ipaddress.ip_address(str(value))
    except ValueError:
        return False
    return True


def _run_netsh(argv, command, destip, done, failed):
    """Run a netsh command through cmd and log the outcome for destip."""
    status = os.system(f'cmd /c {command}')
    if status == 0:
        write_debug_file(argv[0], f"{done} - {destip}")
    else:
        write_debug_file(argv[0], f"{failed} - {destip} (exit status {status})")


def main(argv):
    """Add or delete an inbound Windows firewall block rule for an alert.

    For "add", the rule id is sent as the check_keys key; when the reply is
    "continue", a netsh rule blocking inbound traffic from the alert's
    ipAddress is created. For "delete", the rule with the same name is
    removed. The process always terminates through sys.exit.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.
    """
    write_debug_file(argv[0], "Started")

    # validate json and get command
    msg = setup_and_check_message(argv)

    if msg.command < 0:
        sys.exit(OS_INVALID)

    if msg.command == ADD_COMMAND:

        # Start Custom Key: select the keys from the alert.
        alert = msg.alert["parameters"]["alert"]
        keys = [alert["rule"]["id"]]
        # End Custom Key

        action = send_keys_and_check_message(argv, keys)

        # if necessary, abort execution
        if action != CONTINUE_COMMAND:

            if action == ABORT_COMMAND:
                write_debug_file(argv[0], "Aborted")
                sys.exit(OS_SUCCESS)
            else:
                write_debug_file(argv[0], "Invalid command")
                sys.exit(OS_INVALID)

        # Start Custom Action Add
        write_debug_file(argv[0], "Adding firewall rule")

        destip = alert["data"]["win"]["eventdata"]["ipAddress"]

        # The address comes from alert data and is interpolated into a shell
        # command, so anything that is not a plain IP address is refused.
        if not _is_ip_address(destip):
            write_debug_file(argv[0], f"Refusing to add firewall rule, not an IP address: {destip!r}")
            write_debug_file(argv[0], "Ended")
            sys.exit(OS_INVALID)

        command = f'netsh advfirewall firewall add rule name="Wazuh Active Response - {destip}" dir=in action=block protocol=any remoteip={destip}'
        _run_netsh(argv, command, destip, "Created firewall rule", "Failed to create firewall rule")
        # End Custom Action Add

    elif msg.command == DELETE_COMMAND:

        # Start Custom Action Delete
        write_debug_file(argv[0], "Deleting firewall rule")

        destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["ipAddress"]

        if not _is_ip_address(destip):
            write_debug_file(argv[0], f"Refusing to delete firewall rule, not an IP address: {destip!r}")
            write_debug_file(argv[0], "Ended")
            sys.exit(OS_INVALID)

        command = f'netsh advfirewall firewall delete rule name="Wazuh Active Response - {destip}"'
        _run_netsh(argv, command, destip, "Deleted firewall rule", "Failed to delete firewall rule")
        # End Custom Action Delete

    else:
        write_debug_file(argv[0], "Invalid command")

    write_debug_file(argv[0], "Ended")

    sys.exit(OS_SUCCESS)


# Entry point: run the active response only when executed as a script,
# passing the full argument vector to main().
if __name__ == "__main__":
    main(sys.argv)

Automatic process termination

If Wazuh detects a suspicious process (for example, abnormal file I/O activity), it can run the script below to terminate the offending process automatically.

#!/usr/bin/python3
"""
author: DnG-00
"""

import os
import signal
import sys
import json
import datetime

# Active-response log location: the Windows agent path on 'nt', otherwise the
# default path under /var/ossec.
LOG_FILE = (
    "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
    if os.name == 'nt'
    else "/var/ossec/logs/active-responses.log"
)

# Command codes used internally by this script.
ADD_COMMAND, DELETE_COMMAND, CONTINUE_COMMAND, ABORT_COMMAND = range(4)

# Process exit / status codes.
OS_SUCCESS, OS_INVALID = 0, -1

class message:
    """Holder for the parsed alert and the command code derived from it."""

    def __init__(self):
        # 0 is the numeric default for the command code.
        self.command = 0
        # Empty string until an alert has been stored.
        self.alert = ""


def write_debug_file(ar_name, msg):
    """Append one timestamped line containing msg to LOG_FILE.

    ar_name is accepted but not used; the tag written to the log is fixed.
    """
    with open(LOG_FILE, mode="a") as log_file:
        stamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        pieces = [stamp, " active-response/bin/killprocess", ": ", msg, "\n"]
        log_file.write("".join(pieces))


def setup_and_check_message(argv):
    """Read the alert from stdin and translate its command into a code.

    The first line of stdin is logged and parsed as JSON. The parsed object is
    stored in ``message.alert`` and ``message.command`` is set to ADD_COMMAND,
    DELETE_COMMAND or OS_INVALID.

    Note: the state is stored on the ``message`` class itself (not on an
    instance), and the class is what gets returned.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.

    Returns:
        The ``message`` class with ``command`` (and, on success, ``alert``) set.
    """
    # Only the first line of stdin is consumed.
    input_str = ""
    for line in sys.stdin:
        input_str = line
        break

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    # Valid JSON that is not an object (a list, a string, ...) has no
    # "command" key to read, so treat it as invalid instead of crashing.
    if not isinstance(data, dict):
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    message.alert = data

    command = data.get("command")

    if command == "add":
        message.command = ADD_COMMAND
    elif command == "delete":
        message.command = DELETE_COMMAND
    else:
        message.command = OS_INVALID
        # Formatted rather than concatenated: command is None when the key is
        # missing, and str + None would raise TypeError.
        write_debug_file(argv[0], f'Not valid command: {command}')

    return message


def send_keys_and_check_message(argv, keys):
    """Send a check_keys message on stdout and interpret the reply from stdin.

    Args:
        argv: Command-line arguments; argv[0] is used as the origin name and
            passed to the logger.
        keys: List of keys placed in the message's "parameters".

    Returns:
        CONTINUE_COMMAND or ABORT_COMMAND according to the reply's "command"
        value, or OS_INVALID when the reply is missing, is not valid JSON, or
        carries any other command.
    """
    # build and send message with keys
    keys_msg = json.dumps({
        "version": 1,
        "origin": {"name": argv[0], "module": "active-response"},
        "command": "check_keys",
        "parameters": {"keys": keys},
    })

    write_debug_file(argv[0], keys_msg)

    print(keys_msg)
    sys.stdout.flush()

    # readline() returns "" only at end of input. Reading once (instead of
    # retrying while the line is empty) avoids spinning forever when stdin is
    # closed without a reply; the empty string then fails JSON decoding below.
    input_str = sys.stdin.readline()

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        # Return an int status like every other path of this function.
        return OS_INVALID

    action = data.get("command") if isinstance(data, dict) else None

    if action == "continue":
        return CONTINUE_COMMAND
    if action == "abort":
        return ABORT_COMMAND

    write_debug_file(argv[0], "Invalid value of 'command'")
    return OS_INVALID


def main(argv):
    """Terminate the process named in the alert.

    For "add", the rule id is sent as the check_keys key; when the reply is
    "continue", the alert's processId is sent SIGTERM via os.kill and the
    outcome is logged. "delete" has nothing to undo. The process always
    terminates through sys.exit.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.
    """
    write_debug_file(argv[0], "Started")

    # validate json and get command
    msg = setup_and_check_message(argv)

    if msg.command < 0:
        sys.exit(OS_INVALID)

    if msg.command == ADD_COMMAND:

        # Start Custom Key: select the keys from the alert.
        alert = msg.alert["parameters"]["alert"]
        keys = [alert["rule"]["id"]]
        # End Custom Key

        action = send_keys_and_check_message(argv, keys)

        # if necessary, abort execution
        if action != CONTINUE_COMMAND:

            if action == ABORT_COMMAND:
                write_debug_file(argv[0], "Aborted")
                sys.exit(OS_SUCCESS)
            else:
                write_debug_file(argv[0], "Invalid command")
                sys.exit(OS_INVALID)

        # Start Custom Action Add
        eventdata = alert["data"]["win"]["eventdata"]
        pname = eventdata["image"]
        raw_pid = eventdata["processId"]

        try:
            pid = int(raw_pid)
        except (TypeError, ValueError):
            pid = None

        if pid is None or pid <= 0:
            # Non-numeric or non-positive ids never name a single target
            # process, so nothing is signalled.
            write_debug_file(argv[0], f"Invalid process id {raw_pid!r} for {pname}, nothing killed")
        else:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as exc:
                # Typical causes: the process already exited or access was
                # denied. Log it instead of dying with a traceback.
                write_debug_file(argv[0], f"Unable to kill process - {pname} (pid {pid}): {exc}")
            else:
                write_debug_file(argv[0], f"Killed process - {pname}")
        # End Custom Action Add

    elif msg.command == DELETE_COMMAND:

        # Custom Action Delete: a terminated process cannot be restored, so
        # there is nothing to undo.
        pass

    else:
        write_debug_file(argv[0], "Invalid command")

    write_debug_file(argv[0], "Ended")

    sys.exit(OS_SUCCESS)


# Entry point: run the active response only when executed as a script,
# passing the full argument vector to main().
if __name__ == "__main__":
    main(sys.argv)

Automatic threat removal

When a downloaded file is flagged as malicious (for example, via a VirusTotal lookup integrated into the workflow), Wazuh can execute the following script to remove the file immediately.

#!/usr/bin/python3
"""
author: DnG-00
"""


import os
import sys
import json
import datetime

# Active-response log location: the Windows agent path on 'nt', otherwise the
# default path under /var/ossec.
LOG_FILE = (
    "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
    if os.name == 'nt'
    else "/var/ossec/logs/active-responses.log"
)

# Command codes used internally by this script.
ADD_COMMAND, DELETE_COMMAND, CONTINUE_COMMAND, ABORT_COMMAND = range(4)

# Process exit / status codes.
OS_SUCCESS, OS_INVALID = 0, -1

class message:
    """Holder for the parsed alert and the command code derived from it."""

    def __init__(self):
        # 0 is the numeric default for the command code.
        self.command = 0
        # Empty string until an alert has been stored.
        self.alert = ""


def write_debug_file(ar_name, msg):
    """Append one timestamped line containing msg to LOG_FILE.

    ar_name is accepted but not used; the tag written to the log is fixed.
    """
    with open(LOG_FILE, mode="a") as log_file:
        stamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        pieces = [stamp, " active-response/bin/remove-threat", ": ", msg, "\n"]
        log_file.write("".join(pieces))


def setup_and_check_message(argv):
    """Read the alert from stdin and translate its command into a code.

    The first line of stdin is logged and parsed as JSON. The parsed object is
    stored in ``message.alert`` and ``message.command`` is set to ADD_COMMAND,
    DELETE_COMMAND or OS_INVALID.

    Note: the state is stored on the ``message`` class itself (not on an
    instance), and the class is what gets returned.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.

    Returns:
        The ``message`` class with ``command`` (and, on success, ``alert``) set.
    """
    # Only the first line of stdin is consumed.
    input_str = ""
    for line in sys.stdin:
        input_str = line
        break

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    # Valid JSON that is not an object (a list, a string, ...) has no
    # "command" key to read, so treat it as invalid instead of crashing.
    if not isinstance(data, dict):
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        message.command = OS_INVALID
        return message

    message.alert = data

    command = data.get("command")

    if command == "add":
        message.command = ADD_COMMAND
    elif command == "delete":
        message.command = DELETE_COMMAND
    else:
        message.command = OS_INVALID
        # Formatted rather than concatenated: command is None when the key is
        # missing, and str + None would raise TypeError.
        write_debug_file(argv[0], f'Not valid command: {command}')

    return message


def send_keys_and_check_message(argv, keys):
    """Send a check_keys message on stdout and interpret the reply from stdin.

    Args:
        argv: Command-line arguments; argv[0] is used as the origin name and
            passed to the logger.
        keys: List of keys placed in the message's "parameters".

    Returns:
        CONTINUE_COMMAND or ABORT_COMMAND according to the reply's "command"
        value, or OS_INVALID when the reply is missing, is not valid JSON, or
        carries any other command.
    """
    # build and send message with keys
    keys_msg = json.dumps({
        "version": 1,
        "origin": {"name": argv[0], "module": "active-response"},
        "command": "check_keys",
        "parameters": {"keys": keys},
    })

    write_debug_file(argv[0], keys_msg)

    print(keys_msg)
    sys.stdout.flush()

    # readline() returns "" only at end of input. Reading once (instead of
    # retrying while the line is empty) avoids spinning forever when stdin is
    # closed without a reply; the empty string then fails JSON decoding below.
    input_str = sys.stdin.readline()

    write_debug_file(argv[0], input_str)

    try:
        data = json.loads(input_str)
    except ValueError:
        write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
        # Return an int status like every other path of this function.
        return OS_INVALID

    action = data.get("command") if isinstance(data, dict) else None

    if action == "continue":
        return CONTINUE_COMMAND
    if action == "abort":
        return ABORT_COMMAND

    write_debug_file(argv[0], "Invalid value of 'command'")
    return OS_INVALID


def main(argv):
    """Delete the file that the alert reports as a threat.

    For "add", the rule id is sent as the check_keys key; when the reply is
    "continue", the path found under data.virustotal.source.file in the alert
    is removed and the outcome is logged. "delete" has nothing to undo. The
    process always terminates through sys.exit.

    Args:
        argv: Command-line arguments; argv[0] is passed to the logger.
    """
    write_debug_file(argv[0], "Started")

    # validate json and get command
    msg = setup_and_check_message(argv)

    if msg.command < 0:
        sys.exit(OS_INVALID)

    if msg.command == ADD_COMMAND:

        # Start Custom Key: select the keys from the alert.
        alert = msg.alert["parameters"]["alert"]
        keys = [alert["rule"]["id"]]
        # End Custom Key

        action = send_keys_and_check_message(argv, keys)

        # if necessary, abort execution
        if action != CONTINUE_COMMAND:

            if action == ABORT_COMMAND:
                write_debug_file(argv[0], "Aborted")
                sys.exit(OS_SUCCESS)
            else:
                write_debug_file(argv[0], "Invalid command")
                sys.exit(OS_INVALID)

        # Start Custom Action Add
        rmfile = alert["data"]["virustotal"]["source"]["file"]

        # os.remove signals failure by raising (missing file, permission
        # denied, path is a directory), so the error is caught and logged
        # here rather than detected with an exists() check afterwards.
        try:
            os.remove(rmfile)
        except OSError as exc:
            write_debug_file(argv[0], f"Error occurred when removing positive threat located in {rmfile}: {exc}")
        else:
            write_debug_file(argv[0], f"Successfully removed threat located in {rmfile}")
        # End Custom Action Add

    elif msg.command == DELETE_COMMAND:

        # Custom Action Delete: a removed file cannot be restored, so there
        # is nothing to undo.
        pass

    else:
        write_debug_file(argv[0], "Invalid command")

    write_debug_file(argv[0], "Ended")

    sys.exit(OS_SUCCESS)


# Entry point: run the active response only when executed as a script,
# passing the full argument vector to main().
if __name__ == "__main__":
    main(sys.argv)