Lightweight Vulnerability Scanner for Resourced-constrained Organizations

taskmaker.py 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import csv
  2. from gvm.connections import TLSConnection
  3. from gvm.protocols.gmp import Gmp
  4. from gvm.transforms import EtreeTransform
  5. import os
  6. # OpenVAS Configuration
  7. OPENVAS_HOST = "localhost"
  8. OPENVAS_PORT = 9390
  9. USERNAME = "admin"
  10. PASSWORD = "admin"
  11. # Set up data directory
  12. BASE_DIR = os.path.dirname(os.path.abspath(__file__))
  13. DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "data"))
  14. os.makedirs(DATA_DIR, exist_ok=True)
  15. # CSV files
  16. TARGET_ID_CSV = os.path.join(DATA_DIR, "target_id.csv")
  17. TASK_ID_CSV = os.path.join(DATA_DIR, "task_id.csv")
  18. def get_scan_config_id(gmp):
  19. """
  20. Retrieve the ID of the 'Full and Fast' scan configuration.
  21. """
  22. scan_configs = gmp.get_scan_configs()
  23. for config in scan_configs.findall("config"):
  24. if config.find("name").text == "Full and fast":
  25. config_id = config.get("id")
  26. print(f"'Full and Fast' scan configuration found with ID: {config_id}")
  27. return config_id
  28. print("Failed to find 'Full and Fast' scan configuration.")
  29. return None
  30. def get_scanner_id(gmp):
  31. """
  32. Retrieve the ID of the default scanner.
  33. """
  34. scanners = gmp.get_scanners()
  35. for scanner in scanners.findall("scanner"):
  36. if "OpenVAS Default" in scanner.find("name").text:
  37. scanner_id = scanner.get("id")
  38. print(f"'OpenVAS Default' scanner found with ID: {scanner_id}")
  39. return scanner_id
  40. print("Failed to find 'OpenVAS Default' scanner.")
  41. return None
  42. def read_csv_to_target_list(csv_file):
  43. """
  44. Read the CSV file and extract target IDs into a list.
  45. """
  46. target_list = []
  47. with open(csv_file, newline="") as csvfile:
  48. reader = csv.DictReader(csvfile)
  49. for row in reader:
  50. target_list.append(row["Target ID"])
  51. return target_list
  52. def save_task_id_to_csv(target_id, task_id):
  53. """
  54. Save the task ID to the task_id.csv file.
  55. """
  56. with open(TASK_ID_CSV, "a", newline="") as csvfile:
  57. writer = csv.writer(csvfile)
  58. writer.writerow([target_id, task_id])
  59. print(f"Saved task ID {task_id} for target ID {target_id} to {TASK_ID_CSV}")
  60. def create_task(gmp, task_name, target_id, scan_config_id, scanner_id):
  61. """
  62. Create a task in OpenVAS.
  63. """
  64. response = gmp.create_task(
  65. name=task_name,
  66. config_id=scan_config_id,
  67. target_id=target_id,
  68. scanner_id=scanner_id
  69. )
  70. task_id = response.get("id")
  71. if task_id:
  72. print(f"Created task '{task_name}' for target ID '{target_id}' with ID: {task_id}")
  73. else:
  74. print(f"Failed to create task for target ID {target_id}. Check logs.")
  75. return task_id
  76. def main():
  77. connection = TLSConnection(hostname=OPENVAS_HOST, port=OPENVAS_PORT)
  78. with Gmp(connection=connection, transform=EtreeTransform()) as gmp:
  79. # Authenticate with OpenVAS
  80. gmp.authenticate(username=USERNAME, password=PASSWORD)
  81. print("Authenticated with OpenVAS")
  82. # Get the scan configuration ID
  83. scan_config_id = get_scan_config_id(gmp)
  84. if not scan_config_id:
  85. print("Failed to retrieve a valid scan configuration. Exiting.")
  86. return
  87. # Get the scanner ID
  88. scanner_id = get_scanner_id(gmp)
  89. if not scanner_id:
  90. print("Failed to retrieve a valid scanner. Exiting.")
  91. return
  92. # Read target IDs from the target_id.csv file
  93. target_list = read_csv_to_target_list(TARGET_ID_CSV)
  94. if not target_list:
  95. print("No target IDs found in the CSV file. Exiting.")
  96. return
  97. print(f"Found {len(target_list)} targets to create tasks for.")
  98. # Process each target
  99. for target_id in target_list:
  100. print(f"Processing target ID: {target_id}")
  101. # Create task for the target
  102. task_name = f"Task for Target {target_id}"
  103. task_id = create_task(gmp, task_name, target_id, scan_config_id, scanner_id)
  104. if task_id:
  105. save_task_id_to_csv(target_id, task_id)
  106. else:
  107. print(f"Failed to create task for target ID {target_id}. Continuing.")
  108. if __name__ == "__main__":
  109. # Ensure the task_id.csv file exists with headers
  110. try:
  111. with open(TASK_ID_CSV, "x", newline="") as csvfile:
  112. writer = csv.writer(csvfile)
  113. writer.writerow(["Target ID", "Task ID"]) # Write headers if the file doesn't exist
  114. except FileExistsError:
  115. pass # File already exists, no need to create
  116. main()