123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- from threading import Timer
-
def prevent_overflow():
    """Empty the table of seen event ids and re-arm the hourly reset.

    Rebinds the module-level ``hash_EventID`` to a fresh empty dict so the
    table cannot grow without bound, then schedules this function to run
    again in one hour. Ids recorded before a reset are forgotten, so an id
    that repeats after the reset is treated as new.
    """
    global hash_EventID
    hash_EventID = {}
    # Timer's first argument is the delay in seconds. The timer thread is
    # marked as a daemon so this self-rescheduling chain does not keep the
    # interpreter alive once the main thread has finished.
    timer = Timer(60 * 60, prevent_overflow)
    timer.daemon = True
    timer.start()
-
def processEvents(EventId, EventData):
    """Forward an event for processing unless its id was already seen.

    Looks ``EventId`` up in the module-level ``hash_EventID`` table. When
    the id is absent, calls ``processEventsWithoutDuplicates`` with the id
    and payload and then records the id; when it is present, the event is
    dropped without further action.

    Args:
        EventId: Identifier of the event; used as a dict key, so it must
            be hashable.
        EventData: Payload passed through unchanged to the handler.
    """
    if EventId not in hash_EventID:
        processEventsWithoutDuplicates(EventId, EventData)
        # Recorded only after the handler returns, so an event whose
        # handler raised is not marked as seen.
        hash_EventID[EventId] = 1
-
def processEventsWithoutDuplicates(EventId, EventData):
    """Handle a single event whose id has not been seen before.

    Placeholder: performs no work and returns ``None``.
    """
    return None
-
def main():
    """Initialise the seen-id table, arm the reset timer, and drain the stream.

    Creates the module-level ``hash_EventID`` table, starts a one-hour
    timer that calls ``prevent_overflow``, then iterates over
    ``data_stream``, converting each element with ``data_object`` and
    passing its ``eventid`` and ``eventdata`` attributes to
    ``processEvents``.

    NOTE(review): ``data_stream`` and ``data_object`` are not defined in the
    visible part of this file; they are assumed to be provided at module
    level - confirm.
    """
    # Declared global so the assignment creates the table that
    # processEvents and prevent_overflow read, not a function-local dict.
    global hash_EventID
    hash_EventID = {}

    # First argument is the delay in seconds. Daemon, so the pending timer
    # does not keep the process alive after the stream is exhausted.
    timer = Timer(60 * 60, prevent_overflow)
    timer.daemon = True
    timer.start()

    for elem in data_stream:
        # Extract the event id and data payload from the raw element.
        data = data_object(elem)
        processEvents(data.eventid, data.eventdata)
-
-
-
-
# Example input sequence:
#   processEvents(1, data)
#   processEvents(2, data)
#   processEvents(3, data)
#   processEvents(1, data)
#   processEvents(1, data)
#
# Expected calls (the repeated id 1 is handled only once):
#   processEventsWithoutDuplicates(1, data)
#   processEventsWithoutDuplicates(2, data)
#   processEventsWithoutDuplicates(3, data)
|