Source code for vent.core.file_drop.file_drop

import json
import os
import sys
import time
import uuid

import pika
from redis import Redis
from redis import StrictRedis
from rq import Queue
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer


[docs]class GZHandler(PatternMatchingEventHandler): """ Handles when an event on the directory being watched happens that matches the values in patterns """ patterns = ['*'] # want to ignore certain pcap files from splitter as they contain junk ignore_patterns = ['*-miscellaneous*'] # don't want to process files in on_modified for files that have already # been created and processed created_files = set() try: # let jobs run for up to one day q = Queue(connection=Redis(host='redis'), default_timeout=86400) r = StrictRedis(host='redis', port=6379, db=0) except Exception as e: # pragma: no cover print('Unable to connect to redis:', str(e))
[docs] def process(self, event): """ event.event_type 'modified' | 'created' | 'moved' | 'deleted' event.is_directory True | False event.src_path path/to/observed/file """ uid = str(uuid.uuid4()) hostname = os.environ.get('VENT_HOST') if not hostname: hostname = '' try: # TODO should directories be treated as bulk paths to send to a # plugin? if not event.is_directory: spath = event.src_path # wait for long copies to finish historicalSize = -1 while (historicalSize != os.path.getsize(spath)): historicalSize = os.path.getsize(spath) time.sleep(0.1) if os.path.getsize(spath) == 0: spath = str(spath) print('file drop ignoring empty file: {0}'.format(spath)) if spath.startswith('/files/trace_'): key = spath.split('_')[1] # Rabbit settings exchange = 'topic-poseidon-internal' exchange_type = 'topic' routing_key = 'poseidon.algos.decider' message = {} message[key] = {'valid': False, 'source': 'file_drop'} message = json.dumps(message) # Send Rabbit message try: connection = pika.BlockingConnection( pika.ConnectionParameters(host='rabbit') ) channel = connection.channel() channel.exchange_declare( exchange=exchange, exchange_type=exchange_type ) channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message) connection.close() except Exception as e: print('failed to send rabbit message because: ' + str(e)) return # check if the file was already queued and ignore exists = False print(uid + ' started ' + spath) jobs = self.r.keys(pattern='rq:job*') for job in jobs: print(uid + ' ***') description = self.r.hget( job, 'description').decode('utf-8') print(uid + ' ' + description) if description.startswith("watch.file_queue('"): print(uid + ' ' + description.split("watch.file_queue('" + hostname + '_')[1][:-2]) print(uid + ' ' + spath) if description.split("watch.file_queue('" + hostname + '_')[1][:-2] == spath: print(uid + ' true') exists = True elif description.startswith("watch.gpu_queue('"): print(uid + ' ' + description.split('"file": "')[1].split('"')[0]) print(uid + ' ' + spath) if description.split('"file": "')[1].split('"')[0] == spath: print(uid + ' true') exists = True print(uid + ' ***') if not exists: # !! TODO this should be a configuration option in the # vent.template print(uid + " let's queue it " + spath) # let jobs be queued for up to 30 days self.q.enqueue('watch.file_queue', hostname + '_' + spath, ttl=2592000) print(uid + ' end ' + spath) except Exception as e: # pragma: no cover print('file drop error: ' + str(e))
[docs] def on_created(self, event): self.created_files.add(event.src_path) self.process(event)
[docs] def on_modified(self, event): # don't perform any action if file was already created or file is # deleted if (event.src_path not in self.created_files and os.path.exists(event.src_path)): # add to created files because the file was moved into directory, # which is what should be creating it, but some OS's treat it as a # modification with docker mounts self.created_files.add(event.src_path) self.process(event)
if __name__ == '__main__': # pragma: no cover args = None if len(sys.argv) > 1: args = sys.argv[1:] observer = Observer() observer.schedule(GZHandler(), path=args[0] if args else '/files', recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: # pragma: no cover observer.stop() observer.join()