123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import os
- import csv
- import base64
- from gvm.connections import TLSConnection
- from gvm.protocols.gmp import Gmp
- from gvm.transforms import EtreeTransform
-
- # Set up data directory
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
- DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "data"))
- os.makedirs(DATA_DIR, exist_ok=True)
-
- REPORT_ID_CSV = os.path.join(DATA_DIR, "report_id.csv")
- OPENVAS_SCAN_CSV = os.path.join(DATA_DIR, "openvasscan.csv")
-
- OPENVAS_HOST = "localhost"
- OPENVAS_PORT = 9390
- USERNAME = "admin"
- PASSWORD = "admin"
-
- CSV_FORMAT_ID = "c1645568-627a-11e3-a660-406186ea4fc5"
-
- def get_report_csv_data(gmp, report_id):
- report = gmp.get_report(report_id=report_id, report_format_id=CSV_FORMAT_ID)
- base64_data = report.find(".//report_format").tail
- if base64_data:
- try:
- decoded_data = base64.b64decode(base64_data).decode("utf-8")
- return decoded_data
- except Exception as e:
- print(f"[✗] Error decoding report {report_id}: {e}")
- return None
- else:
- print(f"[!] No data found in report {report_id}")
- return None
-
- def save_individual_report(report_id, content):
- report_path = os.path.join(DATA_DIR, f"report_{report_id}.csv")
- with open(report_path, "w", encoding="utf-8") as f:
- f.write(content)
- print(f"[✓] Saved individual report to {report_path}")
-
- def append_to_aggregate(content, is_first=False):
- mode = "w" if is_first else "a"
- with open(OPENVAS_SCAN_CSV, mode, encoding="utf-8") as f:
- if not is_first:
- # Skip header row to not duplicate rows
- content = "\n".join(content.splitlines()[1:])
- f.write(content + "\n")
-
- def main():
- connection = TLSConnection(hostname=OPENVAS_HOST, port=OPENVAS_PORT)
- with Gmp(connection=connection, transform=EtreeTransform()) as gmp:
- gmp.authenticate(username=USERNAME, password=PASSWORD)
- print("Authenticated with OpenVAS")
-
- first = True
- try:
- with open(REPORT_ID_CSV, newline="") as f:
- reader = csv.DictReader(f)
- for row in reader:
- report_id = row["Report ID"]
- print(f"Fetching report: {report_id}")
- content = get_report_csv_data(gmp, report_id)
- if content:
- #save_individual_report(report_id, content)
- append_to_aggregate(content, is_first=first)
- first = False
- except FileNotFoundError:
- print(f"[✗] Missing file: {REPORT_ID_CSV}")
- except KeyError:
- print(f"[✗] 'Report ID' column not found in {REPORT_ID_CSV}")
-
- if __name__ == "__main__":
- main()
|