Преглед изворни кода

Commiting data-stream simulator

Sara пре 5 година
родитељ
комит
8f36f8c4cb

+ 48
- 0
Programas/Data_Events/Events-data-stream.py Прегледај датотеку

@@ -0,0 +1,48 @@
1
+from threading import Timer
2
+
3
+#Once an hour has passed, this function erases the hash table of
4
+#event ids
5
+def prevent_overflow():
6
+    global hash_EventID
7
+    hash_EventID = {}
8
+    #Starts the timer once again. (First argument is time in seconds)
9
+    Timer(60*60, prevent_overflow).start()
10
+
11
+#This function recieves from an Event, its ID and Data payload;
12
+#If that id has not been received before, it calles the Function
13
+# processEventsWithoutDuplicates
14
+def processEvents(EventId, EventData):
15
+    if EventID not in hash_EventID:
16
+        processEventsWithoutDuplicates(EventID, EventData)
17
+        hash_EventID[EventID] = 1
18
+
19
+#This function does something with the events that are not duplicated
20
+def processEventsWithoutDuplicates(EventId, EventData):
21
+    pass
22
+
23
+def main():
24
+    #hash structure to record the event ids
25
+    hash_EventID = {}
26
+    #create a thread of a timer.
27
+    #once it ends, it will call the function prevent_overflow
28
+    Timer(60*60, prevent_overflow).start()
29
+
30
+    #iterator receiving the data and calling the processEvents function
31
+    for elem in data_stream:
32
+        #take the event id and data payload
33
+        data = data_object(elem)
34
+        processEvents(data.eventid, data.eventdata)
35
+
36
+
37
+
38
+
39
+processEvents 1 , data
40
+processEvents 2 , data
41
+processEvents 3 , data
42
+processEvents 1 , data
43
+processEvents 1 , data
44
+
45
+expectation
46
+processEventsWithoutDuplicates 1 , data
47
+processEventsWithoutDuplicates 2 , data
48
+processEventsWithoutDuplicates 3 , data

+ 27
- 0
Programas/Data_Events/simulating_data-stream.py Прегледај датотеку

@@ -0,0 +1,27 @@
1
+from threading import Timer
2
+import random
3
+
4
+#Objective: receive specific information, and store.
5
+#Set a timer and once it has passed empty the storing structure
6
+#and renew the timer and proceed to continue storing
7
+
8
+
9
+def bleh():
10
+    print "\nEmpyting Event Id Hash\n"
11
+    global hash
12
+    hash = {}
13
+    t1 = Timer(3, bleh).start()
14
+    t.cancel()
15
+
16
+
17
+if __name__ == '__main__':
18
+    hash = {}
19
+    t = Timer(1, bleh)
20
+    t.start()
21
+
22
+    eventidlist = [1,1,2,3,4,3,2,3,4,5,3,3,6,7,8,6,5,6,88,9,7,6]
23
+    while True: #Simulating "data streaming"
24
+        for num in eventidlist:
25
+            if num not in hash:
26
+                hash[num] = 1
27
+                print "process Event", num