Browse Source

Adding new versions of the Multiprocessing implementation

SaraBeatriz 5 years ago
parent
commit
75634778fd

+ 52
- 26
Programas/Last_work/Multiprocess/bruteforce_sip-dip_threehash.py View File

@@ -1,28 +1,43 @@
1
-# Guarda lista de puertos de cada dip por cada sip
1
+####################################################
2
+# Researcher: Sara Schwarz
3
+# Advisor: Dr. Jose Ortiz
4
+#
5
+# Program Objective:
6
+#
7
+#In this work we seek to develop algorithms (brute-force approach, TRW, etc) to detect network and port
8
+#scanners on large-scale networks traffic such as those of Internet Service Providers,
9
+#Big Data research centers, and Science DMZ networks implemented in research institutions using network
10
+#flows. This specific program will follow a brute-force approach algorithm, of reading the network flows
11
+#and recording for each source ip the destination ips and the different destination ports it connected to.
12
+#Later, the number of connected ports will be compared with a threshold to classify the source ips as
13
+#either suspicious scanners or not.
14
+###############################################################
2 15
 
3 16
 from silk import *
4 17
 
5 18
 
6
-startDate = "2018/09/1"
7
-endDate = "2018/09/30"
8
-#Para filtrar por puertos. Pero no queremos todavia
9
-#minPort = 20
10
-#maxPort = 5000
11 19
 
20
+#input => list of files where each file has a list of network flows
21
+#output => threeway-hash table of the form {sip:{dip:{dport:total # of each dport}}}
22
+def verify_type(filename):
23
+    #print "Files", len(filename)
12 24
 
13
-def verify_type():
14
-    x = 0
15
-    dportHash = {} #contains amount of dport per each sip
16
-    for filename in FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/"):
17
-        for rec in silkfile_open(filename, READ):#reading the flow file
25
+    count_flow = 0
26
+    dportHash = {} #will contain amount of dport per dip per each sip
27
+    #iterates through each file that has a list of network flows
28
+    for file in filename:
29
+
30
+        #iterates through each network flow, and reads it using silkfile_open, SiLK's Flow Repository function
31
+        for rec in silkfile_open(file, READ):
32
+            count_flow +=1
18 33
             sip = str(rec.sip)
19 34
             dip = str(rec.dip)
20 35
             dport = rec.dport
21
-            if (':' in sip): #Si en el paso anterior se vio que no
22
-                # print "heeloo", x
23
-                # x+=1
24
-                #                                             #tiene el length de puertos requerido, se ignora
36
+            #For our current research, we are limiting to only verifying IPv4 network flows
37
+            if (':' in sip):
25 38
                 continue
39
+            #using the hash table structure, keeps record of all the times each
40
+            #destination port for each destination ip was connected to by the same source ip
26 41
             else:
27 42
                 if sip in dportHash:
28 43
                     if dip in dportHash[sip]:
@@ -34,29 +49,40 @@ def verify_type():
34 49
                         dportHash[sip][dip] = {dport : 1}
35 50
                 else:
36 51
                     dportHash[sip] = { dip: {dport: 1} }
52
+
53
+
54
+    #print "flows", count_flow
37 55
     return dportHash
38 56
 
39 57
 
40 58
 #MAIN
41
-otherHash = {}
59
+#for accessing the network flows during a specific interval of time
60
+startDate = "2018/08/1"
61
+endDate = "2018/10/15"
62
+otherHash = {}#hash table for suspicious source ips with destination ips
42 63
 counter = 0
43
-files = FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/")
44
-files = [x for x in files]
45
-print "Flow", len(files)
64
+threshold = 100
65
+#object for retrieving files from Silk data store. Specified the interval of time, the silk configuration file location, and the silk data location
66
+files1 = FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/")
67
+files = [x for x in files1]
68
+#****************************************
69
+#  Verifying the source ips
70
+#****************************************
46 71
 flowHash = verify_type()
47
-print "After flow", len(flowHash)
48
-for sips in flowHash: #se itera por todos los dip y sus counters o puertos
72
+#Will iterate through the hash of sips and dports
73
+#Verify whether the number of different dports per destination ip, connected to by the same source ip
74
+#is greater than our threshold. If it is, the source ip will be added to the hash suspicious sips.
75
+
76
+for sips in flowHash:
49 77
     for dips, dports in flowHash[sips].items():
50
-        if len(dports) >= 100: #si la cantidad de puertos es mayor o igual a 100, nos interesan
51
-                                #y por lo tanto se guardan en un hash
52
-            print "DIP", dips, len(dports)
78
+        if len(dports) >= threshold:
53 79
             if sips in otherHash:
54 80
                 otherHash[sips][dips] = dports
55 81
             else:
56 82
                 otherHash[sips] = {dips: dports}
57 83
 
58 84
 for dips, dports in otherHash.items():
59
-    counter +=1 #para contar los elementos del hash
85
+    counter +=1
86
+    #prints the total number of suspicious source ips.
60 87
 
61 88
 print counter
62
-#print otherHash

+ 47
- 21
Programas/Last_work/Multiprocess/map_bruteforce_three.py View File

@@ -1,5 +1,21 @@
1
-# Guarda lista de puertos de cada dip por cada sip
2
-#ftp remote edit
1
+####################################################
2
+# Researcher: Sara Schwarz
3
+# Advisor: Dr. Jose Ortiz
4
+# VERSION #1
5
+# Program Objective:
6
+#
7
+#In this work we seek to develop algorithms (brute-force approach, TRW, etc) to detect network and port
8
+#scanners on large-scale networks traffc such as those of Internet Service Providers,
9
+#Big Data research centers, and Science DMZ networks implemented in research institutions using network
10
+#flows. This specific program will follow a brute-force approach algorithm, of reading the network flows
11
+#and recording for each source ip the destination ips and the different destination ports it connected to.
12
+#Later, the number of connected ports will be compared with a threshold to classify the source ips as
13
+#either suspicious scanners or not.
14
+#To run this algorithm we will be using high-performance methods for computing such as Map and Reduce,
15
+#specifically Python's Pool Class Library.
16
+###############################################################
17
+
18
+
3 19
 from silk import *
4 20
 import multiprocessing as mp
5 21
 
@@ -63,36 +79,46 @@ def join_hash(list):
63 79
 
64 80
 
65 81
 def main():
66
-    startDate = "2018/09/1"
67
-    endDate = "2018/09/30"
68
-    otherHash = {}
82
+    #for accessing the network flows during a specific interval of time
83
+    startDate = "2018/08/1"
84
+    endDate = "2018/10/15"
85
+    otherHash = {} #hash table for suspicious source ips with destination ips
69 86
     counter = 0
70
-    process_num = 8
71
-    pool = mp.Pool(processes=process_num)
87
+    threshold = 100
88
+    #object for retrieving files from Silk data store. Specified the interval of time, the silk configuration file location, and the silk data location
72 89
     files = FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/")
73
-
74
-    files = [x for x in files]
75
-    print len(files)
76
-    fileHash = pool.map(verify_type, files) # FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/"))
90
+#****************************************
91
+# Using Python's Pool Class for multiprocessing
92
+#****************************************
93
+    process_num = 2
94
+    pool = mp.Pool(processes=process_num)
95
+    #change files1 from FGlob object type to a list of silk data files
96
+    files1 = [x for x in files]
97
+    #Using map from Pool, returns a list of a hash per process, being the hash the output of verify_type
98
+    fileHash = pool.map(verify_type, files1)
99
+    #returns the hash of all hashes merged.
77 100
     flowHash = join_hash(fileHash)
78
-    print "FLOW", len(flowHash)
79
-    for sips in flowHash: #se itera por todos los dip y sus counters o puertos
80
-        #print sips
101
+    #print len(flowHash)
102
+    #****************************************
103
+#****************************************
104
+#  Verifying the source ips
105
+#****************************************
106
+    #Will iterate through the hash of sips and dports
107
+    #Verify whether the number of different dports per desitnation ip, connected to by the same source ip
108
+    #is greater than our threshold. If it is, the source ip will be added to the hash suspicious sips.
109
+    for sips in flowHash:
81 110
         for dips, dports in flowHash[sips].items():
82
-            #print "Dip", dips, dports
83
-            if len(dports) >= 100: #si la cantidad de puertos es mayor o igual a 100, nos interesan
84
-                                #y por lo tanto se guardan en un hash
85
-                print "DIP", dips, len(dports)
111
+            if len(dports) >= threshold:
86 112
                 if sips in otherHash:
87 113
                     otherHash[sips][dips] = dports
88 114
                 else:
89 115
                     otherHash[sips] = {dips: dports}
90 116
 
91 117
     for dips, dports in otherHash.items():
92
-        counter +=1 #para contar los elementos del hash
93
-
118
+        counter +=1
119
+    #prints the total number of suspicious source ips.
94 120
     print counter
95
-#print otherHash
121
+
96 122
 
97 123
 if __name__== "__main__":
98 124
   main()

+ 169
- 0
Programas/Last_work/Multiprocess/map_bruteforce_three_2.py View File

@@ -0,0 +1,169 @@
1
+####################################################
2
+# Researcher: Sara Schwarz
3
+# Advisor: Dr. Jose Ortiz
4
+# VERSION #2
5
+# Program Objective:
6
+#
7
+#In this work we seek to develop algorithms (brute-force approach, TRW, etc) to detect network and port
8
+#scanners on large-scale networks traffic such as those of Internet Service Providers,
9
+#Big Data research centers, and Science DMZ networks implemented in research institutions using network
10
+#flows. This specific program will follow a brute-force approach algorithm, of reading the network flows
11
+#and recording for each source ip the destination ips and the different destination ports it connected to.
12
+#Later, the number of connected ports will be compared with a threshold to classify the source ips as
13
+#either suspicious scanners or not.
14
+#To run this algorithm we will be using high-performance methods for computing such as Map and Reduce,
15
+#specifically Python's Pool Class Library.
16
+###############################################################
17
+
18
+
19
+from silk import *
20
+import multiprocessing as mp
21
+
22
+
23
+#input => list of files where each file has a list of network flows
24
+#output => threeway-hash table of the form {sip:{dip:{dport:total # of each dport}}}
25
+def verify_type(filename):
26
+    #print "Files", len(filename)
27
+
28
+    count_flow = 0
29
+    dportHash = {} #will contain amount of dport per dip per each sip
30
+    #iterates through each file that has a list of network flows
31
+    for file in filename:
32
+
33
+        #iterates through each network flow, and reads it using silkfile_open, SiLK's Flow Repository function
34
+        for rec in silkfile_open(file, READ):
35
+            count_flow +=1
36
+            sip = str(rec.sip)
37
+            dip = str(rec.dip)
38
+            dport = rec.dport
39
+            #For our current research, we are limiting to only verifying IPv4 network flows
40
+            if (':' in sip):
41
+                continue
42
+            #using the hash table structure, keeps record of all the times each
43
+            #destination port for each destination ip was connected to by the same source ip
44
+            else:
45
+                if sip in dportHash:
46
+                    if dip in dportHash[sip]:
47
+                        if dport in dportHash[sip][dip]:
48
+                            dportHash[sip][dip][dport] += 1
49
+                        else:
50
+                            dportHash[sip][dip][dport] = 1
51
+                    else:
52
+                        dportHash[sip][dip] = {dport : 1}
53
+                else:
54
+                    dportHash[sip] = { dip: {dport: 1} }
55
+
56
+
57
+    #print "flows", count_flow
58
+    return dportHash
59
+
60
+#input => list of hash tables of the form {sip:{dip:{dport:total # of each dport}}}
61
+#output => hash table of the form {sip:{dip:{dport:total # of each dport}}}
62
+#Method => iterates through each hash
63
+def join_hash(list):
64
+    complete_hash ={} #will contain the hash table of all the hash tables once formed
65
+
66
+    for i in list: #will iterate through each hash of the list
67
+        #will iterate through each source ip key as sip with the hash table of the form {dip:{dport:#}} as the value
68
+        for sip, hash in i.items():
69
+            #verify if that source ip is already in the merged hash.
70
+            if sip in complete_hash:
71
+                #will iterate through each destination ip key as dip with the hash table of the form {dport:#} as the value
72
+                for dip, dports in i[sip].items():
73
+                    #verify if that destination ip is already in the hash of the sip's value in the merged hash
74
+                    if dip in complete_hash[sip]:
75
+                        #will iterate through each destination port key as number with the total # of times it was connected as the value
76
+                        for number, value in dports.items():
77
+                            #verify if that destination port is already in the hash of the dip's value in the hash of the sips'value in the merged hash
78
+                            if number in complete_hash[sip]:
79
+                                #Add the value to the dport's value in the merged hash
80
+                                complete_hash[sip][dip][number] += value
81
+
82
+                            else:
83
+                                #Add the destination port with its value to the merged hash
84
+                                complete_hash[sip][dip][number]= value
85
+                    else:
86
+                        #Add the destination ip with its hash table of dports to the merged hash
87
+                        complete_hash[sip][dip]= dports
88
+            else:
89
+                #Add the source ip with its hash table of destination ip and dports to the merged hash
90
+                complete_hash[sip]= hash
91
+    return complete_hash
92
+
93
+
94
+def main():
95
+    #for accessing the network flows during a specific interval of time
96
+    startDate = "2018/08/1"
97
+    endDate = "2018/10/15"
98
+    otherHash = {} #hash table for suspicious source ips with destination ips
99
+    counter = 0
100
+    threshold = 100
101
+    #object for retrieving files from Silk data store. Specified the interval of time, the silk configuration file location, and the silk data location
102
+    files = FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/")
103
+#****************************************
104
+# Using Python's Pool Class for multiprocessing
105
+#****************************************
106
+    process_num = 2
107
+    pool = mp.Pool(processes=process_num)
108
+    #change files1 from FGlob object type to a list of silk data files
109
+    files1 = [x for x in files]
110
+    files_list = []
111
+    #Send each process an equal amount of files
112
+    blocksize = len(files1) / process_num
113
+    for x in range(process_num):
114
+        files_list.append(files1[0:blocksize])
115
+        files1 = files1[blocksize:]
116
+    #in the case that the number of files is not divisible by the number of processes
117
+    for i in files1:
118
+        files_list[files1.index(i)].append(i)
119
+    #Using map from Pool, returns a list of a hash per process, being the hash the output of verify_type
120
+    fileHash = pool.map(verify_type, files_list)
121
+    #returns the hash of all hashes merged.
122
+    flowHash = join_hash(fileHash)
123
+    #print len(flowHash)
124
+#****************************************
125
+
126
+#****************************************
127
+#  Verifying the source ips
128
+#****************************************
129
+    #Will iterate through the hash of sips and dports
130
+    #Verify whether the number of different dports per desitnation ip, connected to by the same source ip
131
+    #is greater than our threshold. If it is, the source ip will be added to the hash suspicious sips.
132
+    for sips in flowHash:
133
+        for dips, dports in flowHash[sips].items():
134
+            if len(dports) >= threshold:
135
+                if sips in otherHash:
136
+                    otherHash[sips][dips] = dports
137
+                else:
138
+                    otherHash[sips] = {dips: dports}
139
+
140
+    for dips, dports in otherHash.items():
141
+        counter +=1
142
+    #prints the total number of suspicious source ips.
143
+    print counter
144
+
145
+
146
+if __name__== "__main__":
147
+  main()
148
+
149
+
150
+
151
+
152
+  # def Filter(fd, sip, dip, sport, dport, dportHash):
153
+  #
154
+  # 	if dport > 1024 and (sport <= 1024 or (sport >= 8000 and sport < 9000)):
155
+  # 		return
156
+  # 	if sip in dportHash:
157
+  # #       	if dip in dportHash[sip]["dips"]:
158
+  # #        	        dportHash[sip]["dips"][dip] += 1
159
+  # #		else:
160
+  # #			dportHash[sip]["dips"][dip] = 1
161
+  # 		if dport in dportHash[sip]["dports"]:
162
+  #                	        dportHash[sip]["dports"][dport] += 1
163
+  # 			#return
164
+  #                	else:
165
+  #                		dportHash[sip]["dports"][dport] = 1
166
+  #      	else:
167
+  # 		dportHash[sip] = {"dports": {}}
168
+  # 		dportHash[sip]["dips"] = {}
169
+  # 	#fd.write("%s:%s:%s:%s\n" % (sip, dip, sport, dport))

+ 24
- 0
Programas/Last_work/Multiprocess/trial_map.py View File

@@ -0,0 +1,24 @@
1
+#Trial program to practice using Pool class
2
+
3
+import multiprocessing as mp
4
+
5
+
6
+process_num = 8
7
+pool = mp.Pool(processes=process_num)
8
+files1 = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26]
9
+
10
+files_list = []
11
+quenepas_blocksize = len(files1) / process_num
12
+for x in range(process_num):
13
+    files_list.append(files1[0:quenepas_blocksize])
14
+    files1 = files1[quenepas_blocksize:]
15
+    print files1
16
+print len(files1)
17
+print files_list
18
+for i in files1:
19
+    print files1.index(i)
20
+    print i
21
+    files_list[files1.index(i)].append(i)
22
+
23
+print files1
24
+print files_list

+ 70
- 33
Programas/Last_work/Multiprocess/trw.py View File

@@ -1,32 +1,47 @@
1
-##################################
2
-#   TRW sin reduccion           #
3
-#      Para Data Set de la Uni    #
4
-################################
1
+####################################################
2
+# Researcher: Sara Schwarz
3
+# Advisor: Dr. Jose Ortiz
4
+# VERSION #2
5
+# Program Objective:
6
+#
7
+#In this work we seek to develop algorithms (brute-force approach, TRW, etc) to detect network and port
8
+#scanners on large-scale networks traffic such as those of Internet Service Providers,
9
+#Big Data research centers, and Science DMZ networks implemented in research institutions using network
10
+#flows. This specific program will follow the threshold random walk algorithm, of reading the network flows
11
+#and recording for each source ip the amount of successful connections and failed connections.
12
+#            Successful connections => It means to have completed the three way handshake, to have received the
13
+#                                       Acknowledgment from the destination node.
14
+#            Failed connections => It means to not have completed the three way handshake, to have not received
15
+#                                   any answer, meaning it only has the Synchronization.
16
+#Later, the ratio between failed and successful connections will be compared with a threshold to classify the source ips as
17
+#either suspicious scanners or not.
18
+###############################################################
5 19
 
6 20
 
7 21
 
8 22
 from silk import *
9
-startDate = "2018/08/14"
10
-endDate = "2018/08/15"
11
-p = 2
23
+
12 24
 
13 25
 def Analisis():
14 26
     counter = 0 # borrar luego
15
-    sampleHash={} #hash para contener los dip con el numero de conecciones y failed coneccciones
27
+    counter_files=0
28
+    sampleHash={} #hash contains each sip with their dip and failed and succesful connections
16 29
     flow_counter = 0
17 30
     for filename in FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/"):
31
+        counter_files +=1
18 32
         for rec in silkfile_open(filename, READ):#reading the flow file
19 33
             flow_counter += 1
20 34
             if (':' in str(rec.sip)):
21 35
 				continue
22 36
 	    else:
23
-            	connection = [0] * 2 #Lista para contener los valores de conecciones failed y conecciones buenas
24
-            	sip = str(rec.sip) #Devuelve el ip en notacion punto-decimal
37
+            	connection = [0] * 2 #array to contain the amount of failed connections and amount of succesful conenctions
38
+            	sip = str(rec.sip)
25 39
             	flags = str(rec.tcpflags)
26 40
             #print sip, flags
27 41
 	    #counter +=1
28
-            	if 'A' in flags: ####arreglar
29
-                	connection[1]=1 #good conections
42
+                #verify if the network flow contains the Acknowledgment flag, which will imply a succesful connection
43
+            	if 'A' in flags:
44
+                	connection[1]=1 #succesful conections
30 45
                 else:
31 46
                 	connection [0] =1 #failed conections
32 47
             	if sip in sampleHash:
@@ -35,27 +50,49 @@ def Analisis():
35 50
             	else:
36 51
                 	sampleHash[sip] = [connection[0], connection[1]]
37 52
 	    #print sampleHash
38
-    #print flow_counter
53
+    # print "flows", flow_counter
54
+    # print counter_files
39 55
     return sampleHash
40 56
 
41
-sip_connections_list = Analisis()
57
+def main():
58
+    startDate = "2018/06/1"
59
+    endDate = "2018/06/30"
60
+    p_ratio = 2 #threshold of the ratio. This ratio is chosen.
61
+    p_counter = 100 #if there are only failed connections, rather than the ratio,
62
+                    #the total number of failed connection is compared to this threshold.
63
+    sip_connections_list = Analisis()
64
+    #Receives the list hash of sip with the number of connections per dip
65
+    #print len(sip_connections_list)
42 66
 #print sip_connections_list
43
-sipList = {"sipList":[]}
44
-for sip in sip_connections_list:
45
-          if (sip_connections_list[sip][1] != 0) and ((sip_connections_list[sip][0] / sip_connections_list[sip][1])  < 1) : #si la cantidad de succesful
46
-                                #b                #g                    #connections es mas que failed connections
47
-                      #not scanner, ignore
48
-                      continue
49
-          elif (sip_connections_list[sip][1] != 0) and ((sip_connections_list[sip][0] / sip_connections_list[sip][1])  < p): #mas failed que succesful, pero no llega al threshold
50
-                      #not scanner, ignore
51
-                      continue                          #se debe tener en cuenta que es suspicious pero no taaanto
52
-          elif (sip_connections_list[sip][1] == 0 and sip_connections_list[sip][0] > 10): #el ratio de failed a succesful llega al threshold pautado
53
-                            #scanner, oh oh
54
-                    hash = {sip:sip_connections_list[sip]}
55
-                    sipList["sipList"].append(hash)
56
-          else:
57
-                            #scanner, oh oh
58
-                    hash = {sip:sip_connections_list[sip]}
59
-                    sipList["sipList"].append(hash)
60
-
61
-print sipList
67
+    counter = 1
68
+    sipList = {"sipList":[]}
69
+    for sip in sip_connections_list:
70
+        #print counter, sip, "sip0", sip_connections_list[sip][0], "sip1", sip_connections_list[sip] [1]
71
+        counter +=1
72
+
73
+        #compares the ratio of succesful connections to failed connections with a given threshold
74
+        #If the amount is larger than the threshold it is added to a list of suspicious sips.
75
+        if (sip_connections_list[sip][1] != 0) and ((sip_connections_list[sip][0] / sip_connections_list[sip][1])  > p_ratio):
76
+            hash = {sip:sip_connections_list[sip]}
77
+            #print hash
78
+            sipList["sipList"].append(hash)
79
+            continue
80
+
81
+        elif (sip_connections_list[sip][1] == 0 and sip_connections_list[sip][0] > p_counter):
82
+
83
+            hash = {sip:sip_connections_list[sip]}
84
+            #print hash
85
+            sipList["sipList"].append(hash)
86
+            continue
87
+        #If it does not reach the threshold, the network flow is ignored.
88
+        else:
89
+            continue
90
+
91
+    counter = 0
92
+    for i in sipList["sipList"]:
93
+        counter +=1
94
+    print counter
95
+
96
+
97
+if __name__== "__main__":
98
+  main()

+ 137
- 0
Programas/Last_work/Multiprocess/trw_map.py View File

@@ -0,0 +1,137 @@
1
+####################################################
2
+# Researcher: Sara Schwarz
3
+# Advisor: Dr. Jose Ortiz
4
+# VERSION #2
5
+# Program Objective:
6
+#
7
+#In this work we seek to develop algorithms (brute-force approach, TRW, etc) to detect network and port
8
+#scanners on large-scale networks traffc such as those of Internet Service Providers,
9
+#Big Data research centers, and Science DMZ networks implemented in research institutions using network
10
+#flows. This specific program will follow the threshold random walk algorithm, of reading the network flows
11
+#and recording for each source ip amount of succesful connections and failed connections.
12
+#            Succesful connections => It means to have completed the three way handshake, to have received the
13
+#                                       Acknowledgment from the desitnaiton node.
14
+#            Failed connections => It means to not have completed the three way handshake, to have not received
15
+#                                   any answer, meaning it only has the Syncronization.
16
+#Later, the ratio between failed and succesful connections will be compared with a threshold to classify the source ips as
17
+#either suspicious scanners or not.
18
+#To run this algorithm we will be using high-performance methods for computing such as Map and Reduce,
19
+#specifically Python's Pool Class Library.
20
+###############################################################
21
+
22
+from silk import *
23
+import multiprocessing as mp
24
+
25
+
26
+def Analisis():
27
+    counter = 0 # borrar luego
28
+    counter_files=0
29
+    sampleHash={} #hash contains each sip with their dip and failed and succesful connections
30
+    flow_counter = 0
31
+    for filename in FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/"):
32
+        counter_files +=1
33
+        for rec in silkfile_open(filename, READ):#reading the flow file
34
+            flow_counter += 1
35
+            if (':' in str(rec.sip)):
36
+				continue
37
+	    else:
38
+            	connection = [0] * 2 #array to contain the amount of failed connections and amount of succesful conenctions
39
+            	sip = str(rec.sip)
40
+            	flags = str(rec.tcpflags)
41
+            #print sip, flags
42
+	    #counter +=1
43
+                #verify if the network flow contains the Acknowledgment flag, which will imply a succesful connection
44
+            	if 'A' in flags:
45
+                	connection[1]=1 #succesful conections
46
+                else:
47
+                	connection [0] =1 #failed conections
48
+            	if sip in sampleHash:
49
+	                sampleHash[sip][0]+= connection[0]
50
+	                sampleHash[sip][1]+= connection[1]
51
+            	else:
52
+                	sampleHash[sip] = [connection[0], connection[1]]
53
+	    #print sampleHash
54
+    # print "flows", flow_counter
55
+    # print counter_files
56
+    return sampleHash
57
+
58
+
59
+
60
+def merge_list(list_hash):
61
+    sampleHash = {}
62
+    for sip_hash in list_hash:
63
+        #print sip_hash
64
+        for sip, arr in sip_hash.items():
65
+            if sip in sampleHash:
66
+                sampleHash[sip][0]+= arr[0]
67
+                sampleHash[sip][1]+= arr[1]
68
+            else:
69
+                sampleHash[sip] = [arr[0], arr[1]]
70
+    return sampleHash
71
+
72
+
73
+
74
+def main():
75
+    startDate = "2018/06/1"
76
+    endDate = "2018/06/30"
77
+    p_ratio = 2 #threshold of the ratio. This ratio is chosen.
78
+    p_counter = 100 #if there are only failed connections, rather than the ratio,
79
+                    #the total number of failed connection is compared to this threshold.
80
+    process_num = 2
81
+    pool = mp.Pool(processes=process_num)
82
+    files1 = FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/")
83
+#****************************************
84
+
85
+#****************************************
86
+    files1 = [x for x in files1] #change files1 from FGlob object to a list
87
+        #print len(files1)
88
+    files_list = []
89
+    quenepas_blocksize = len(files1) / process_num
90
+    for x in range(process_num):
91
+        files_list.append(files1[0:quenepas_blocksize])
92
+        files1 = files1[quenepas_blocksize:]
93
+
94
+    for i in files1:
95
+        files_list[files1.index(i)].append(i)
96
+
97
+#****************************************
98
+
99
+#****************************************
100
+
101
+    fileHash = pool.map(Analisis, files_list) # FGlob(classname="all", type="all", start_date=startDate, end_date=endDate, site_config_file="/etc/silk/conf-v9/silk.conf", data_rootdir="/home/scratch/flow/rwflowpack/"))
102
+    #print fileHash[0]['136.145.231.48']
103
+    sip_connections_list = merge_list(fileHash)
104
+    #print sip_connections_list['136.145.231.48']
105
+#print sip_connections_list
106
+counter = 1
107
+sipList = {"sipList":[]}
108
+for sip in sip_connections_list:
109
+    #print counter, sip, "sip0", sip_connections_list[sip][0], "sip1", sip_connections_list[sip] [1]
110
+    counter +=1
111
+
112
+    #compares the ratio of successful connections to failed connections with a given threshold
113
+    #If the amount is larger than the threshold it is added to a list of suspicious sips.
114
+    if (sip_connections_list[sip][1] != 0) and ((sip_connections_list[sip][0] / sip_connections_list[sip][1])  > p_ratio):
115
+        hash = {sip:sip_connections_list[sip]}
116
+        #print hash
117
+        sipList["sipList"].append(hash)
118
+        continue
119
+
120
+    elif (sip_connections_list[sip][1] == 0 and sip_connections_list[sip][0] > p_counter):
121
+
122
+        hash = {sip:sip_connections_list[sip]}
123
+        #print hash
124
+        sipList["sipList"].append(hash)
125
+        continue
126
+    #If it does not reach the threshold, the network flow is ignored.
127
+    else:
128
+        continue
129
+
130
+counter = 0
131
+for i in sipList["sipList"]:
132
+    counter +=1
133
+print counter
134
+
135
+
136
+if __name__== "__main__":
137
+main()