Cambios varios

This commit is contained in:
2020-11-12 18:56:12 -03:00
parent 9adafbae13
commit 3eae0194f7
15 changed files with 282 additions and 220 deletions

View File

@ -0,0 +1 @@
from .message import Message

View File

@ -1,4 +1,3 @@
from .email_interpreter import EmailInterpreter
from .revisor_worker import RevisorEmailWorker
from .obtenedor import Obtenedor
from .validador import Validador
from .workers import Obtenedor, Validador, Confirmador

19
src/email/definitions.py Normal file
View File

@ -0,0 +1,19 @@
import email
class Email:
def __init__(self, uid):
self.uid = uid
self.message = ''
def get(self, imap):
status, raw_data = imap.uid('fetch', self.uid, '(RFC822)')
if status != 'OK':
raise Exception('Could not recover message {0}'.format(self.uid))
self.message = email.message_from_bytes(raw_data[0][1])
def delete(self, imap):
status, result = imap.uid('STORE', self.uid, '+FLAGS', '(\\Deleted)')
if status != 'OK':
raise Exception('Could not flag message {0}'.format(self.uid))

39
src/email/functions.py Normal file
View File

@ -0,0 +1,39 @@
import keyboard
import imaplib
from src.email.definitions import Email
def exit_thread(stop, logger):
logger.log('Starting exit thread', caller='exit_thread')
keyboard.wait('Esc')
logger.log('Escape pressed', caller='exit_thread')
stop.set()
logger.log('Exit signal sent', caller='exit_thread')
def connect(imap_url, port, username, password, ssl=False):
if ssl:
server = imaplib.IMAP4_SSL(imap_url, port=port)
else:
server = imaplib.IMAP4(imap_url, port=port)
server.login(username, password)
return server
def check_inbox(server):
status, msg = server.select('INBOX')
if status != 'OK':
return None
status, ids = server.uid('search', None, 'All')
if status != 'OK':
return None
ids = ids[0].decode().split()
emails = []
for mid in ids:
em = Email(mid)
em.get(server)
emails.append(em)
return emails

View File

@ -1,112 +1,10 @@
from multiprocessing import Process
from queue import Queue
from threading import Thread, Event, Lock
import argparse
import os
from common.helper.logging import Logging
from setup.config import load_config
import pytz
from src.bosses import Bosses
import keyboard
import time
class Email(Process):
def __init__(self, configs, params, setup):
super(Email, self).__init__()
self.configs = configs
self.params = params
self.registry = {}
self.workers = []
self.working = []
self.worker_status = []
self.add_event('stop')
def setup(self, data):
for (m, n) in data['workers']:
self.register_worker(m, n)
for q in data['queues']:
self.add_queue(q)
for e in data['events']:
self.add_event(e)
for l in data['locks']:
self.add_lock(l)
def register_worker(self, module, name):
if module not in self.registry:
self.registry[module] = []
self.registry[module].append(name)
def add_worker(self, worker):
self.workers.append(worker)
def start_worker(self, module, name):
worker = getattr(module, name)
self.add_worker(worker)
self.working.append((module, name))
worker.start()
self.worker_status.append(True)
def start_workers(self):
for module, workers in self.registry:
for name in workers:
self.start_worker(module, name)
def check_workers(self):
stopped = 0
for (k, w) in enumerate(self.workers):
if not self.worker_status[k]:
stopped += 1
continue
if not w.is_alive():
self.params['logging'].log('Worker {0} stopped'.format(type(w)))
self.params['queues']['log'].put({'not': True, 'action': type(w)})
stopped += 1
self.worker_status[k] = False
(m, n) = self.working[k]
# Restart worker
self.start_worker(m, n)
if stopped == len(self.workers):
return False
return True
def join_workers(self):
[w.join(self.configs.get('supervisor.wait')) for w in self.workers if w.is_alive()]
def add_queue(self, name):
if 'queues' not in self.params or self.params['queues'] is None:
self.params['queues'] = {}
self.params['queues'][name] = Queue()
def add_event(self, name):
if 'events' not in self.params or self.params['events'] is None:
self.params['events'] = {}
self.params['events'][name] = Event()
def add_lock(self, name):
if 'locks' not in self.params or self.params['locks'] is None:
self.params['locks'] = {}
self.params['locks'][name] = Lock()
def run(self) -> None:
self.start_workers()
self.add_worker(Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging'])))
while not self.params['events']['stop'].is_set():
if not self.check_workers():
break
time.sleep(self.configs.get('supervisor.wait'))
self.join_workers()
def exit_thread(stop, logger):
logger.log('Starting exit thread', caller='exit_thread')
keyboard.wait('Esc')
logger.log('Escape pressed', caller='exit_thread')
stop.set()
logger.log('Exit signal sent', caller='exit_thread')
from src.email.supervisor import Email
def main(args):
@ -119,14 +17,18 @@ def main(args):
'data': args.data_folder
},
'bosses': Bosses(args.data_folder),
'logging': Logging(configs.get('timezone'), args.log_folder)
'logging': Logging(configs.get('timezone'), args.log_folder, 'email'),
'logger': {
'name': 'email'
}
}
setup = {
'workers': [
('common.helper.logger', 'Worker'),
('src.email', 'Obtenedor'),
('src.email', 'Validador')
('src.email', 'Validador'),
('src.email', 'Confirmador')
],
'queues': ['log', 'emails', 'valid', 'invalid'],
'events': [],

View File

@ -10,7 +10,7 @@ from src.communication.message import Message
class RevisorEmailWorker(Thread):
def __init__(self, configs, params):
super().__init__()
self._url = configs.get('email.imap_server')
self._url = configs.get('email.server')
self._port = configs.get('email.port')
self._username = configs.get('email.username')
self._password = configs.get('email.password')

102
src/email/supervisor.py Normal file
View File

@ -0,0 +1,102 @@
from threading import Thread, Event, Lock
from queue import Queue
import importlib
from src.functions import exit_thread
import time
class Email(Thread):
def __init__(self, configs, params, setup):
super(Email, self).__init__()
self.configs = configs
self.params = params
self.registry = {}
self.workers = []
self.working = []
self.worker_status = []
self.add_event('stop')
self.setup(setup)
def setup(self, data):
for (m, n) in data['workers']:
self.register_worker(m, n)
for q in data['queues']:
self.add_queue(q)
for e in data['events']:
self.add_event(e)
for l in data['locks']:
self.add_lock(l)
def register_worker(self, module, name):
if module not in self.registry:
self.registry[module] = []
self.registry[module].append(name)
def add_worker(self, worker):
self.workers.append(worker)
def start_worker(self, module, name):
worker = getattr(module, name)
worker = worker(configs=self.configs, params=self.params)
self.add_worker(worker)
self.working.append((module, name))
worker.start()
self.worker_status.append(True)
def start_workers(self):
for module_name, workers in self.registry.items():
module = importlib.import_module(module_name)
for name in workers:
self.start_worker(module, name)
def check_workers(self):
stopped = 0
for (k, w) in enumerate(self.workers):
if not self.worker_status[k]:
stopped += 1
continue
if not w.is_alive():
self.params['logging'].log('Worker {0} stopped'.format(type(w)))
self.params['queues']['log'].put({'not': True, 'action': type(w)})
stopped += 1
self.worker_status[k] = False
(m, n) = self.working[k]
# Restart worker
self.start_worker(m, n)
if stopped == len(self.workers):
return False
return True
def join_workers(self):
[w.join(self.configs.get('supervisor.wait')) for w in self.workers if w.is_alive()]
def add_queue(self, name):
if 'queues' not in self.params or self.params['queues'] is None:
self.params['queues'] = {}
self.params['queues'][name] = Queue()
def add_event(self, name):
if 'events' not in self.params or self.params['events'] is None:
self.params['events'] = {}
self.params['events'][name] = Event()
def add_lock(self, name):
if 'locks' not in self.params or self.params['locks'] is None:
self.params['locks'] = {}
self.params['locks'][name] = Lock()
def run(self) -> None:
self.start_workers()
worker = Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging']))
self.add_worker(worker)
worker.start()
self.worker_status.append(True)
while not self.params['events']['stop'].is_set():
if not self.check_workers():
break
time.sleep(self.configs.get('supervisor.wait'))
self.join_workers()

View File

@ -1,25 +0,0 @@
from src.worker import Worker
class Validador(Worker):
def __init__(self, configs, params):
super(Validador, self).__init__(configs, params)
self.emails = params['queues']['emails']
self.validos = params['queues']['valid']
self.invalidos = params['queues']['invalid']
self.bosses = params['bosses']
self.frec = configs.get('supervisor.wait')
def run(self):
self.logger.log('Starting', type(self))
self.diary.put({'action': 'Inicio de jornada de Validador'})
while not self.stop.is_set():
em = self.emails.get(timeout=self.frec)
if not self.bosses.is_boss(em.sender):
self.invalidos.put(em)
continue
self.validos.put(em)
self.logger.log('Exiting', type(self))
self.diary.put({'action': 'Terminando la jornada de Validador'})

View File

@ -1,11 +1,12 @@
from src.worker import Worker
from types import SimpleNamespace
from entry.email.inbox import connect, check_inbox
import re
from bs4 import BeautifulSoup
import email.utils
from src.communication.message import Message
import queue
from src.email.functions import connect, check_inbox
import time
from types import SimpleNamespace
from src.worker import Worker
from bs4 import BeautifulSoup
import re
import email.utils
from src.communication import Message
class Obtenedor(Worker):
@ -15,13 +16,13 @@ class Obtenedor(Worker):
def __init__(self, configs, params):
super(Obtenedor, self).__init__(configs, params)
self.url = configs.get('email.server')
self.port = configs.get('email.port')
self.url = configs.get('email.imap.server')
self.port = configs.get('email.imap.port')
user = {'user': '', 'password': ''}
self.user = SimpleNamespace(**user)
self.user.name = configs.get('email.user.name')
self.user.password = configs.get('email.user.password')
self.ssl = configs.get('email.ssl')
self.user.name = configs.get('email.imap.user.name')
self.user.password = configs.get('email.imap.user.password')
self.ssl = configs.get('email.imap.ssl')
self.revisados = []
@ -40,7 +41,7 @@ class Obtenedor(Worker):
output = []
if email_part.is_multipart():
for part in email_part.get_payload():
output.append(self.build_message(part))
output += self.build_message(part)
else:
html = email_part.get_payload(decode=True)
bs = BeautifulSoup(html, 'html.parser')
@ -54,13 +55,14 @@ class Obtenedor(Worker):
def run(self) -> None:
self.logger.log('Starting', type(self))
self.diary.put({'action': 'Inicio de jornada de Obtenedor'})
self.diary.put({'action': 'Inicio de turno de Obtenedor'})
while not self.stop.is_set():
e = 0
with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap:
self.logger.log('Getting emails', type(self))
emails = check_inbox(imap)
if emails is None:
self.logger.log('No emails found', type(self))
continue
for em in emails:
if self.is_revisado(em.uid):
@ -72,7 +74,56 @@ class Obtenedor(Worker):
self.queue.put(msg)
self.add_revisado(em.uid)
e += 1
self.diary.put({'action': 'Obtenidos {0} correos nuevos'.format(e)})
self.logger.log('{0} new emails found'.format(e), type(self))
self.diary.put({'action': 'Obtenidos{0} correos nuevos'.format(e)})
time.sleep(self.frec)
self.logger.log('Exiting', type(self))
self.diary.put({'action': 'Terminando el turno de Obtenedor'})
class Validador(Worker):
def __init__(self, configs, params):
super(Validador, self).__init__(configs, params)
self.emails = params['queues']['emails']
self.validos = params['queues']['valid']
self.invalidos = params['queues']['invalid']
self.bosses = params['bosses']
self.frec = configs.get('supervisor.wait')
def run(self):
self.logger.log('Starting', type(self))
self.diary.put({'action': 'Inicio de turno de Validador'})
while not self.stop.is_set():
try:
em = self.emails.get(timeout=self.frec)
except queue.Empty:
continue
if not self.bosses.is_boss(em.sender):
self.invalidos.put(em)
continue
self.validos.put(em)
self.logger.log('Exiting', type(self))
self.diary.put({'action': 'Terminando el turno de Validador'})
class Confirmador(Worker):
def __init__(self, configs, params):
super(Confirmador, self).__init__(configs=configs, params=params)
self.invalidos = params['queues']['invalid']
self.frec = configs.get('supervisor.wait')
def crear_mega_mensaje(self):
pass
def run(self) -> None:
self.logger.log('Starting', type(self))
while not self.stop.is_set():
try:
em = self.invalidos.get(self.frec)
except queue.Empty:
pass
self.logger.log('Exiting', type(self))

9
src/functions.py Normal file
View File

@ -0,0 +1,9 @@
import keyboard
def exit_thread(stop, logger):
logger.log('Starting exit thread', caller='exit_thread')
keyboard.wait('Esc')
logger.log('Escape pressed', caller='exit_thread')
stop.set()
logger.log('Exit signal sent', caller='exit_thread')