Wazuh Scripts
For the Cybersecurity Project module at Nanyang Polytechnic, we were tasked with identifying a cyberattack scenario and designing a security architecture to mitigate and prevent those attacks.
Scenario
The chosen scenario is the “Malicious Memes” attack (see: https://www.trendmicro.com/en_us/research/18/l/cybercriminals-use-malicious-memes-that-communicate-with-malware.html).
In summary:
- Attackers socially engineered employees (vishing) by impersonating the help desk.
- Victims were directed to a phishing site that captured their credentials.
- The attackers escalated access, identified accounts with administrative tools, and used further social engineering.
- Compromised admin tools were used to hijack high-profile accounts and post scam content.
- Sensitive user data (DMs, search history, contact numbers, emails) was exfiltrated via platform features.
- Malicious images circulated on the platform contained embedded commands.
- A two-stage payload downloads those images, extracts commands, and executes them on victims’ machines.
- Exfiltrated data is sent back to the attacker’s server.
Proposed Security Architecture

Responsibilities
I was responsible for deploying and configuring Wazuh, a host-based detection and response platform, to detect threats, monitor vulnerabilities, manage security events, and help meet compliance requirements.
To automate response actions, I developed custom Python scripts that integrate with Wazuh’s active response mechanism. When Wazuh detects suspicious activity (for example, an interactive shell, repeated failed logins, or a malicious file), it can trigger these scripts to mitigate the threat automatically.
To run the Python scripts on Wazuh, they’re to be converted to an executable first.
Backup / Recovery
This script provides an interactive utility to back up or recover Wazuh configuration and data. Backups can be created locally and optionally uploaded to Google Drive; restores can be performed from Google Drive or from a local archive.
#!/usr/bin/python3
"""
author: DnG-00
"""
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
import shutil
import os
import re
from datetime import datetime
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)
while True:
print("Backup Menu")
print("1. Backup Wazuh Server Files")
print("2. Recover Wazuh Server Files from Google Drive")
print("3. Recover Wazuh Server Files from local archive")
print("9. Exit")
option = input("Option: ")
if option == "1":
dt_string = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
wazuh_dir = "/var/ossec"
print("Backing up files")
backup_zip = shutil.make_archive(f"Wazuh Backup {dt_string}", 'zip', wazuh_dir)
if os.path.exists("/var/backups/Wazuh"):
shutil.move(backup_zip,f"/var/backups/Wazuh/Wazuh Backup {dt_string}")
print("Backup created in /var/backups/Wazuh")
else:
print("Creating directory Wazuh in /var/backups")
os.mkdir("/var/backups/Wazuh")
shutil.move(backup_zip,f"/var/backups/Wazuh/Wazuh Backup {dt_string}")
print("Backup created in /var/backups/Wazuh")
upload_file_list = [f"/var/backups/Wazuh/Wazuh Backup {dt_string}"]
for upload_file in upload_file_list:
gfile = drive.CreateFile({'parents': [{'id': 'ID_OF_FOLDER_IN_GOOGLE_DRIVE'}]}) # Create GoogleDriveFile instance with title 'Hello.txt'.
# Read file and set it as the content of this instance.
gfile.SetContentFile(upload_file)
gfile['title'] = f"Wazuh Backup {dt_string}"
gfile.Upload() # Upload the file.
print("Backup uploaded to Google Drive")
elif option == "2":
file_list = drive.ListFile({'q': "'{}' in parents and trashed=false".format('ID_OF_FOLDER_IN_GOOGLE_DRIVE')}).GetList()
choosefilelist = []
index = 1
while True:
print("Backup files found:")
for file1 in file_list:
choosefilelist.append(file1['id'])
print(f"{index}. Title: {file1['title']}, {file1['id']}")
index += 1
try:
backupfile = int(input("Choose Backup File: "))
if backupfile > len(choosefilelist):
print("Please select a valid file")
else:
break
except:
print("Please select a valid file")
index = 1
choosefilelist = []
try:
downloadfile = drive.CreateFile({'id': choosefilelist[backupfile-1]})
except:
print("File not found in Google Drive")
try:
downloadfile.GetContentFile(downloadfile['title'])
except:
print("Error occurred while downloading archive")
archive = f"{os.getcwd()}/{downloadfile['title']}"
print("Downloaded archive")
print("Stopping Wazuh services")
try:
os.system(f'/var/ossec/bin/wazuh-control stop')
except:
print("Unable to stop Wazuh services")
print("Unpacking archive")
try:
shutil.unpack_archive(archive, '/var/ossec', "zip")
print("Archive unpacked, files restored")
try:
print("Starting Wazuh services")
os.system(f'/var/ossec/bin/wazuh-control start')
print("Wazuh Services started")
except:
print("Unable to start Wazuh services, please start manually")
except:
print("Error occurred while unpacking archive, please ensure Wazuh services was stopped")
elif option == "3":
while True:
archives = os.listdir("/var/backups/Wazuh")
backuparchives = []
archiveindex = 1
for archive in archives:
print(f"{archiveindex}. {archive}")
backuparchives.append(archive)
archiveindex +=1
try:
selectarchive = int(input("Choose Backup File: "))
except:
print("Please select a valid file")
if selectarchive > len(backuparchives):
print("Please select a valid file")
else:
break
archiveindex = 1
backuparchives = []
print("Stopping Wazuh services")
try:
os.system(f'/var/ossec/bin/wazuh-control stop')
except:
print("Unable to stop Wazuh services")
print("Unpacking archive")
try:
shutil.unpack_archive(f"/var/backups/Wazuh/{backuparchives[selectarchive-1]}", '/var/ossec', "zip")
print("Archive unpacked, files restored")
except:
print("Error occurred while unpacking archive")
try:
print("Starting Wazuh services")
os.system(f'/var/ossec/bin/wazuh-control start')
print("Wazuh Services started")
except:
print("Unable to start Wazuh services, please start manually")
elif option == "9":
break
else:
continue
Blockshell
If an interactive shell is spawned on a monitored host, Wazuh can trigger the following active-response script to block the connection automatically by adding a firewall rule.
#!/usr/bin/python3
"""
author: DnG-00
"""
import os
import sys
import json
import datetime
import subprocess
import socket
if os.name == 'nt':
LOG_FILE = "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
else:
LOG_FILE = "/var/ossec/logs/active-responses.log"
ADD_COMMAND = 0
DELETE_COMMAND = 1
CONTINUE_COMMAND = 2
ABORT_COMMAND = 3
OS_SUCCESS = 0
OS_INVALID = -1
class message:
def __init__(self):
self.alert = ""
self.command = 0
def write_debug_file(ar_name, msg):
with open(LOG_FILE, mode="a") as log_file:
log_file.write(str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " active-response/bin/blockshell" + ": " + msg +"\n")
def setup_and_check_message(argv):
# get alert from stdin
input_str = ""
for line in sys.stdin:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
message.command = OS_INVALID
return message
message.alert = data
command = data.get("command")
if command == "add":
message.command = ADD_COMMAND
elif command == "delete":
message.command = DELETE_COMMAND
else:
message.command = OS_INVALID
write_debug_file(argv[0], 'Not valid command: ' + command)
return message
def send_keys_and_check_message(argv, keys):
# build and send message with keys
keys_msg = json.dumps({"version": 1,"origin":{"name": argv[0],"module":"active-response"},"command":"check_keys","parameters":{"keys":keys}})
write_debug_file(argv[0], keys_msg)
print(keys_msg)
sys.stdout.flush()
# read the response of previous message
input_str = ""
while True:
line = sys.stdin.readline()
if line:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
return message
action = data.get("command")
if "continue" == action:
ret = CONTINUE_COMMAND
elif "abort" == action:
ret = ABORT_COMMAND
else:
ret = OS_INVALID
write_debug_file(argv[0], "Invalid value of 'command'")
return ret
def main(argv):
write_debug_file(argv[0], "Started")
# validate json and get command
msg = setup_and_check_message(argv)
if msg.command < 0:
sys.exit(OS_INVALID)
if msg.command == ADD_COMMAND:
""" Start Custom Key
At this point, it is necessary to select the keys from the alert and add them into the keys array.
"""
alert = msg.alert["parameters"]["alert"]
keys = [alert["rule"]["id"]]
""" End Custom Key """
action = send_keys_and_check_message(argv, keys)
# if necessary, abort execution
if action != CONTINUE_COMMAND:
if action == ABORT_COMMAND:
write_debug_file(argv[0], "Aborted")
sys.exit(OS_SUCCESS)
else:
write_debug_file(argv[0], "Invalid command")
sys.exit(OS_INVALID)
""" Start Custom Action Add """
write_debug_file(argv[0], f"Adding firewall rule")
hostname = socket.gethostname()
hostip = socket.gethostbyname(hostname)
destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["destinationIp"]
destport = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["destinationPort"]
if destport == "53" or "443":
write_debug_file(argv[0], "Connection established for HTTPS / DNS, quitting program")
sys.exit()
else:
whitelist = []
with open('C:\\Program Files (x86)\\ossec-agent\\active-response\\bin\\whitelist.txt', 'r') as reader:
for line in reader.readlines():
whitelist.append(line)
if destip in whitelist:
write_debug_file(argv[0], f"{destip} is whitelisted, exiting")
write_debug_file(argv[0], "Ended")
sys.exit()
else:
command = f'netsh advfirewall firewall add rule name="Wazuh Active Response - {destip}" dir=out action=block protocol=any remoteip={destip}'
os.system(f'cmd /c {command}')
write_debug_file(argv[0], f"Created firewall rule - {destip}")
""" End Custom Action Add """
elif msg.command == DELETE_COMMAND:
""" Start Custom Action Delete """
write_debug_file(argv[0], f"Deleting firewall rule")
hostname = socket.gethostname()
hostip = socket.gethostbyname(hostname)
destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["destinationIp"]
command = f'netsh advfirewall firewall delete rule name="Wazuh Active Response - {destip}"'
os.system(f'cmd /c {command}')
write_debug_file(argv[0], f"Deleted firewall rule - {destip}")
""" End Custom Action Delete """
else:
write_debug_file(argv[0], "Invalid command")
write_debug_file(argv[0], "Ended")
sys.exit(OS_SUCCESS)
if __name__ == "__main__":
main(sys.argv)
Automatic firewall rule creation
When multiple failed RDP login attempts are detected for a host, Wazuh can invoke the following script to add a firewall rule that blocks the attacker’s IP address.
#!/usr/bin/python3
"""
author: DnG-00
"""
import os
import sys
import json
import datetime
import subprocess
import socket
if os.name == 'nt':
LOG_FILE = "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
else:
LOG_FILE = "/var/ossec/logs/active-responses.log"
ADD_COMMAND = 0
DELETE_COMMAND = 1
CONTINUE_COMMAND = 2
ABORT_COMMAND = 3
OS_SUCCESS = 0
OS_INVALID = -1
class message:
def __init__(self):
self.alert = ""
self.command = 0
def write_debug_file(ar_name, msg):
with open(LOG_FILE, mode="a") as log_file:
log_file.write(str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " active-response/bin/firewall" + ": " + msg +"\n")
def setup_and_check_message(argv):
# get alert from stdin
input_str = ""
for line in sys.stdin:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
message.command = OS_INVALID
return message
message.alert = data
command = data.get("command")
if command == "add":
message.command = ADD_COMMAND
elif command == "delete":
message.command = DELETE_COMMAND
else:
message.command = OS_INVALID
write_debug_file(argv[0], 'Not valid command: ' + command)
return message
def send_keys_and_check_message(argv, keys):
# build and send message with keys
keys_msg = json.dumps({"version": 1,"origin":{"name": argv[0],"module":"active-response"},"command":"check_keys","parameters":{"keys":keys}})
write_debug_file(argv[0], keys_msg)
print(keys_msg)
sys.stdout.flush()
# read the response of previous message
input_str = ""
while True:
line = sys.stdin.readline()
if line:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
return message
action = data.get("command")
if "continue" == action:
ret = CONTINUE_COMMAND
elif "abort" == action:
ret = ABORT_COMMAND
else:
ret = OS_INVALID
write_debug_file(argv[0], "Invalid value of 'command'")
return ret
def main(argv):
write_debug_file(argv[0], "Started")
# validate json and get command
msg = setup_and_check_message(argv)
if msg.command < 0:
sys.exit(OS_INVALID)
if msg.command == ADD_COMMAND:
""" Start Custom Key
At this point, it is necessary to select the keys from the alert and add them into the keys array.
"""
alert = msg.alert["parameters"]["alert"]
keys = [alert["rule"]["id"]]
""" End Custom Key """
action = send_keys_and_check_message(argv, keys)
# if necessary, abort execution
if action != CONTINUE_COMMAND:
if action == ABORT_COMMAND:
write_debug_file(argv[0], "Aborted")
sys.exit(OS_SUCCESS)
else:
write_debug_file(argv[0], "Invalid command")
sys.exit(OS_INVALID)
""" Start Custom Action Add """
write_debug_file(argv[0], f"Adding firewall rule")
hostname = socket.gethostname()
hostip = socket.gethostbyname(hostname)
destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["ipAddress"]
command = f'netsh advfirewall firewall add rule name="Wazuh Active Response - {destip}" dir=in action=block protocol=any remoteip={destip}'
os.system(f'cmd /c {command}')
write_debug_file(argv[0], f"Created firewall rule - {destip}")
""" End Custom Action Add """
elif msg.command == DELETE_COMMAND:
""" Start Custom Action Delete """
write_debug_file(argv[0], f"Deleting firewall rule")
hostname = socket.gethostname()
hostip = socket.gethostbyname(hostname)
destip = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["ipAddress"]
command = f'netsh advfirewall firewall delete rule name="Wazuh Active Response - {destip}"'
os.system(f'cmd /c {command}')
write_debug_file(argv[0], f"Deleted firewall rule - {destip}")
""" End Custom Action Delete """
else:
write_debug_file(argv[0], "Invalid command")
write_debug_file(argv[0], "Ended")
sys.exit(OS_SUCCESS)
if __name__ == "__main__":
main(sys.argv)
Automatic process termination
If Wazuh detects a suspicious process (for example, abnormal file I/O activity), it can run the script below to terminate the offending process automatically.
#!/usr/bin/python3
"""
author: DnG-00
"""
import os
import signal
import sys
import json
import datetime
if os.name == 'nt':
LOG_FILE = "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
else:
LOG_FILE = "/var/ossec/logs/active-responses.log"
ADD_COMMAND = 0
DELETE_COMMAND = 1
CONTINUE_COMMAND = 2
ABORT_COMMAND = 3
OS_SUCCESS = 0
OS_INVALID = -1
class message:
def __init__(self):
self.alert = ""
self.command = 0
def write_debug_file(ar_name, msg):
with open(LOG_FILE, mode="a") as log_file:
log_file.write(str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " active-response/bin/killprocess" + ": " + msg +"\n")
def setup_and_check_message(argv):
# get alert from stdin
input_str = ""
for line in sys.stdin:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
message.command = OS_INVALID
return message
message.alert = data
command = data.get("command")
if command == "add":
message.command = ADD_COMMAND
elif command == "delete":
message.command = DELETE_COMMAND
else:
message.command = OS_INVALID
write_debug_file(argv[0], 'Not valid command: ' + command)
return message
def send_keys_and_check_message(argv, keys):
# build and send message with keys
keys_msg = json.dumps({"version": 1,"origin":{"name": argv[0],"module":"active-response"},"command":"check_keys","parameters":{"keys":keys}})
write_debug_file(argv[0], keys_msg)
print(keys_msg)
sys.stdout.flush()
# read the response of previous message
input_str = ""
while True:
line = sys.stdin.readline()
if line:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
return message
action = data.get("command")
if "continue" == action:
ret = CONTINUE_COMMAND
elif "abort" == action:
ret = ABORT_COMMAND
else:
ret = OS_INVALID
write_debug_file(argv[0], "Invalid value of 'command'")
return ret
def main(argv):
write_debug_file(argv[0], "Started")
# validate json and get command
msg = setup_and_check_message(argv)
if msg.command < 0:
sys.exit(OS_INVALID)
if msg.command == ADD_COMMAND:
""" Start Custom Key
At this point, it is necessary to select the keys from the alert and add them into the keys array.
"""
alert = msg.alert["parameters"]["alert"]
keys = [alert["rule"]["id"]]
""" End Custom Key """
action = send_keys_and_check_message(argv, keys)
# if necessary, abort execution
if action != CONTINUE_COMMAND:
if action == ABORT_COMMAND:
write_debug_file(argv[0], "Aborted")
sys.exit(OS_SUCCESS)
else:
write_debug_file(argv[0], "Invalid command")
sys.exit(OS_INVALID)
""" Start Custom Action Add """
pname = msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["image"]
pid = int(msg.alert["parameters"]["alert"]["data"]["win"]["eventdata"]["processId"])
os.kill(pid, signal.SIGTERM)
write_debug_file(argv[0], f"Killed process - {pname}")
""" End Custom Action Add """
elif msg.command == DELETE_COMMAND:
""" Start Custom Action Delete """
# os.remove("ar-test-result.txt")
""" End Custom Action Delete """
else:
write_debug_file(argv[0], "Invalid command")
write_debug_file(argv[0], "Ended")
sys.exit(OS_SUCCESS)
if __name__ == "__main__":
main(sys.argv)
Automatic threat removal
When a downloaded file is flagged as malicious (for example, via a VirusTotal lookup integrated into the workflow), Wazuh can execute the following script to remove the file immediately.
#!/usr/bin/python3
"""
author: DnG-00
"""
import os
import sys
import json
import datetime
if os.name == 'nt':
LOG_FILE = "C:\\Program Files (x86)\\ossec-agent\\active-response\\active-responses.log"
else:
LOG_FILE = "/var/ossec/logs/active-responses.log"
ADD_COMMAND = 0
DELETE_COMMAND = 1
CONTINUE_COMMAND = 2
ABORT_COMMAND = 3
OS_SUCCESS = 0
OS_INVALID = -1
class message:
def __init__(self):
self.alert = ""
self.command = 0
def write_debug_file(ar_name, msg):
with open(LOG_FILE, mode="a") as log_file:
log_file.write(str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " active-response/bin/remove-threat" + ": " + msg +"\n")
def setup_and_check_message(argv):
# get alert from stdin
input_str = ""
for line in sys.stdin:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
message.command = OS_INVALID
return message
message.alert = data
command = data.get("command")
if command == "add":
message.command = ADD_COMMAND
elif command == "delete":
message.command = DELETE_COMMAND
else:
message.command = OS_INVALID
write_debug_file(argv[0], 'Not valid command: ' + command)
return message
def send_keys_and_check_message(argv, keys):
# build and send message with keys
keys_msg = json.dumps({"version": 1,"origin":{"name": argv[0],"module":"active-response"},"command":"check_keys","parameters":{"keys":keys}})
write_debug_file(argv[0], keys_msg)
print(keys_msg)
sys.stdout.flush()
# read the response of previous message
input_str = ""
while True:
line = sys.stdin.readline()
if line:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
return message
action = data.get("command")
if "continue" == action:
ret = CONTINUE_COMMAND
elif "abort" == action:
ret = ABORT_COMMAND
else:
ret = OS_INVALID
write_debug_file(argv[0], "Invalid value of 'command'")
return ret
def main(argv):
write_debug_file(argv[0], "Started")
# validate json and get command
msg = setup_and_check_message(argv)
if msg.command < 0:
sys.exit(OS_INVALID)
if msg.command == ADD_COMMAND:
""" Start Custom Key
At this point, it is necessary to select the keys from the alert and add them into the keys array.
"""
alert = msg.alert["parameters"]["alert"]
keys = [alert["rule"]["id"]]
""" End Custom Key """
action = send_keys_and_check_message(argv, keys)
# if necessary, abort execution
if action != CONTINUE_COMMAND:
if action == ABORT_COMMAND:
write_debug_file(argv[0], "Aborted")
sys.exit(OS_SUCCESS)
else:
write_debug_file(argv[0], "Invalid command")
sys.exit(OS_INVALID)
""" Start Custom Action Add """
rmfile = msg.alert["parameters"]["alert"]["data"]["virustotal"]["source"]["file"]
os.remove(rmfile)
if not os.path.exists(rmfile):
write_debug_file(argv[0], f"Successfully removed threat located in {rmfile}")
else:
write_debug_file(argv[0], f"Error occurred when removing positive threat located in {rmfile}")
""" End Custom Action Add """
elif msg.command == DELETE_COMMAND:
""" Start Custom Action Delete """
# os.remove("ar-test-result.txt")
""" End Custom Action Delete """
else:
write_debug_file(argv[0], "Invalid command")
write_debug_file(argv[0], "Ended")
sys.exit(OS_SUCCESS)
if __name__ == "__main__":
main(sys.argv)