Files
secretary/src/email/workers.py

285 lines
9.1 KiB
Python

import queue
from src.email.functions import connect, check_inbox
import time
from types import SimpleNamespace
from src.worker import Worker
from bs4 import BeautifulSoup
import re
import email.utils
from src.communication import Message
import json
from src.functions import dump_queue
class Obtenedor(Worker):
    """
    Worker that polls the configured IMAP inbox and queues unseen emails.

    Every email whose uid has not been reviewed yet is wrapped in a
    ``Message`` and put on the ``emails`` queue. Reviewed uids are kept in
    memory and merged into a JSON file when the worker stops.
    """

    def __init__(self, configs, params):
        """
        :param configs: configuration object, read through ``configs.get(key)``
        :param params: dict providing ``filenames['revisados']`` and
            ``queues['emails']``
        """
        super(Obtenedor, self).__init__(configs, params)
        self.name = 'Email:Obtenedor'
        self.url = configs.get('email.imap.server')
        self.port = configs.get('email.imap.port')
        # 'user' is kept (empty) so the namespace exposes the same attributes as before
        self.user = SimpleNamespace(
            user='',
            name=configs.get('email.imap.user.name'),
            password=configs.get('email.imap.user.password'),
        )
        self.ssl = configs.get('email.imap.ssl')
        self.filename = params['filenames']['revisados']
        self.revisados = []
        self.load_revisados()
        self.queue = params['queues']['emails']
        self.frec = configs.get('supervisor.wait')

    def is_revisado(self, uid):
        """Return True when ``uid`` has already been reviewed."""
        return uid in self.revisados

    def _read_revisados_file(self):
        """Return the uid list stored in ``self.filename``, or ``[]`` if the file does not exist."""
        try:
            with open(self.filename, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def load_revisados(self):
        """Replace the in-memory reviewed list with the one stored on disk."""
        self.revisados = self._read_revisados_file()

    def save_revisados(self):
        """Merge the in-memory reviewed uids into the file, keeping uids already stored there."""
        data = self._read_revisados_file()
        for uid in self.revisados:
            if uid not in data:
                data.append(uid)
        with open(self.filename, 'w') as f:
            json.dump(data, f)

    def add_revisado(self, uid):
        """Remember ``uid`` as reviewed; does nothing if it is already known."""
        if not self.is_revisado(uid):
            self.revisados.append(uid)

    def build_message(self, email_part):
        """
        Extract the readable text of an email, walking multipart messages recursively.

        :param email_part: an ``email.message.Message`` (or one of its sub-parts)
        :return: list of strings, one per text leaf part, with whitespace collapsed
        """
        if email_part.is_multipart():
            output = []
            for part in email_part.get_payload():
                output += self.build_message(part)
            return output
        # Attachments (images, PDFs, ...) are not markup; running them through
        # the HTML parser would only inject binary noise into the message text.
        if email_part.get_content_maintype() != 'text':
            return []
        html = email_part.get_payload(decode=True)
        bs = BeautifulSoup(html, 'html.parser')
        html = bs.body.get_text() if bs.body else bs.get_text()
        # Collapse runs of newlines, then runs of spaces, into single spaces
        html = re.sub(' +', ' ', re.sub("\n+", ' ', html)).strip(' ')
        return [html]

    def _to_message(self, em):
        """Build the ``Message`` that is queued for a fetched email ``em``."""
        subject = em.message['subject']
        # A missing Subject header yields None; treat it as an empty subject
        subject = '' if subject is None else str(subject)
        text = ' '.join([subject + '.'] + self.build_message(em.message))
        try:
            received = email.utils.parsedate_to_datetime(em.message['Date'])
        except (TypeError, ValueError):
            # Missing or malformed Date header: fall back to the current local
            # time so a single bad email cannot kill the worker.
            received = email.utils.localtime()
        return Message('email', text=text, original=em, sender=em.message['from'],
                       datetime=received)

    def _poll_inbox(self):
        """
        Connect once, queue every email that has not been reviewed yet.

        :return: number of emails queued, or None when ``check_inbox`` returned None
        """
        with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap:
            self.logger.log('Getting emails', type(self))
            emails = check_inbox(imap)
            if emails is None:
                self.logger.log('No emails found', type(self))
                return None
            found = 0
            for em in emails:
                if self.is_revisado(em.uid):
                    continue
                self.queue.put(self._to_message(em))
                self.add_revisado(em.uid)
                found += 1
        return found

    def run(self) -> None:
        """Poll the inbox every ``self.frec`` seconds until the stop flag is set, then persist the reviewed uids."""
        self.start_turn()
        while not self.stop.is_set():
            found = self._poll_inbox()
            if found is not None:
                self.logger.log('{0} new emails found'.format(found), type(self))
                self.diary.put({'message': 'Obtenidos {0} correos nuevos'.format(found)})
            # Always wait between polls, including when the inbox was empty;
            # otherwise the loop reconnects to the server without pause.
            time.sleep(self.frec)
        self.save_revisados()
        self.end_turn()
class Validador(Worker):
    """
    Worker that classifies incoming emails and routes them to a queue.

    Routing, in order:
    1. The sender is a boss (according to ``params['bosses']``) -> ``valid`` queue
    2. The subject is a known instruction -> ``invalid`` queue (kept for review)
    3. Anything else -> ``borrar`` queue (to be deleted)
    """

    def __init__(self, configs, params):
        """
        :param configs: configuration object, read through ``configs.get(key)``
        :param params: dict providing the ``emails``, ``valid``, ``invalid`` and
            ``borrar`` queues plus the ``bosses`` and ``instrucciones`` helpers
        """
        super(Validador, self).__init__(configs=configs, params=params)
        self.name = 'Email:Validador'
        self.emails = params['queues']['emails']
        self.validos = params['queues']['valid']
        self.invalidos = params['queues']['invalid']
        self.borrar = params['queues']['borrar']
        self.bosses = params['bosses']
        self.instrucciones = params['instrucciones']
        self.frec = configs.get('supervisor.wait')

    def validar_bosses(self, sender):
        """Return whether ``sender`` is recognised as a boss."""
        return self.bosses.is_boss(sender)

    def validar_instrucciones(self, message):
        """Return whether the subject of the original email is a known instruction."""
        return self.instrucciones.is_valid(message.original.message['subject'])

    def validar(self, message):
        """Put ``message`` on exactly one of the valid / invalid / delete queues."""
        if self.validar_bosses(message.sender):
            target = self.validos
        elif self.validar_instrucciones(message):
            target = self.invalidos
        else:
            target = self.borrar
        target.put(message)

    def run(self):
        """Classify emails as they arrive until the stop flag is set, then classify whatever is left."""
        self.start_turn()
        while not self.stop.is_set():
            try:
                em = self.emails.get(timeout=self.frec)
            except queue.Empty:
                continue
            self.validar(em)
        # Cleanup: handle whatever is still queued (a plain loop, since the
        # calls are made only for their side effects)
        for em in dump_queue(self.emails, self.frec):
            self.validar(em)
        self.end_turn()
class Consultador(Worker):
    """
    Worker that records emails not sent by bosses so they can be consulted later.

    Messages taken from the ``invalid`` queue are buffered in memory and
    appended to a JSON file once the buffer reaches ``email.max`` entries, and
    again when the worker stops.
    """

    def __init__(self, configs, params):
        """
        :param configs: configuration object, read through ``configs.get(key)``
        :param params: dict providing ``filenames['consultas']`` and
            ``queues['invalid']``
        """
        super(Consultador, self).__init__(configs=configs, params=params)
        self.name = 'Email:Consultador'
        self.filename = params['filenames']['consultas']
        self.invalidos = params['queues']['invalid']
        self.frec = configs.get('supervisor.wait')
        self.max = configs.get('email.max')
        self.mensajes = []

    def is_full(self):
        """Return True when the in-memory buffer has reached ``self.max`` messages."""
        return len(self.mensajes) >= self.max

    def save_messages(self):
        """Append the buffered messages that are not yet in the file, then clear the buffer."""
        data = []
        try:
            with open(self.filename, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            # First run: nothing stored yet
            pass
        for m in self.mensajes:
            if m not in data:
                data.append(m)
        with open(self.filename, 'w') as f:
            json.dump(data, f, indent=2)
        self.mensajes = []

    def parse_message(self, message):
        """Convert ``message`` to a JSON-serialisable dict and add it to the buffer."""
        self.mensajes.append({
            'sender': message.sender,
            'uid': message.original.uid,
            'time': message.datetime.strftime('%Y-%m-%d %H:%M:%S'),
            'message': message.text
        })

    def run(self) -> None:
        """Buffer messages until the stop flag is set, then flush everything that is left."""
        self.start_turn()
        while not self.stop.is_set():
            try:
                em = self.invalidos.get(timeout=self.frec)
            except queue.Empty:
                continue
            self.parse_message(em)
            if self.is_full():
                self.save_messages()
        # Cleanup: buffer whatever is still queued (a plain loop, since the
        # calls are made only for their side effects), then write it out
        for message in dump_queue(self.invalidos, self.frec):
            self.parse_message(message)
        self.save_messages()
        self.end_turn()
class Borrador(Worker):
    """
    Worker that deletes the emails marked for deletion.

    Items taken from the ``borrar`` queue are buffered and removed from the
    INBOX in batches of ``email.max``, and once more when the worker stops.
    """

    def __init__(self, configs, params):
        """
        :param configs: configuration object, read through ``configs.get(key)``
        :param params: dict providing ``queues['borrar']``
        """
        super(Borrador, self).__init__(configs=configs, params=params)
        self.name = 'Email:Borrador'
        self.queue = params['queues']['borrar']
        self.frec = configs.get('supervisor.wait')
        self.max = configs.get('email.max')
        self.url = configs.get('email.imap.server')
        self.port = configs.get('email.imap.port')
        # 'user' is kept (empty) so the namespace exposes the same attributes as before
        self.user = SimpleNamespace(
            user='',
            name=configs.get('email.imap.user.name'),
            password=configs.get('email.imap.user.password'),
        )
        self.ssl = configs.get('email.imap.ssl')
        self.borrar = []

    @staticmethod
    def _uid_of(item):
        """
        Return the IMAP uid for a queued item.

        The validator worker in this module puts whole message objects on the
        ``borrar`` queue, whose uid lives at ``item.original.uid``. A bare uid
        is returned unchanged, so both kinds of producers are supported.
        """
        original = getattr(item, 'original', None)
        return getattr(original, 'uid', item)

    def is_full(self):
        """Return True when the pending batch has reached ``self.max`` uids."""
        return len(self.borrar) >= self.max

    def do_borrar(self):
        """
        Flag every pending uid as deleted and expunge the INBOX.

        The pending list is kept when the INBOX cannot be selected, so the
        same uids are retried on the next call.
        """
        if not self.borrar:
            # Nothing pending: do not open a connection for no work
            return
        with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap:
            status, _ = imap.select('INBOX')
            if status != 'OK':
                return
            for uid in self.borrar:
                status, _ = imap.uid('store', uid, '+FLAGS', b'\\Deleted')
                if status != 'OK':
                    self.logger.log('Could not flag email {0} for deletion'.format(uid), type(self))
            imap.expunge()
        self.borrar = []

    def run(self) -> None:
        """Collect uids until the stop flag is set, deleting in batches, then delete whatever is left."""
        self.start_turn()
        while not self.stop.is_set():
            try:
                item = self.queue.get(timeout=self.frec)
            except queue.Empty:
                continue
            self.borrar.append(self._uid_of(item))
            if self.is_full():
                self.do_borrar()
        # Cleanup
        self.borrar.extend(self._uid_of(item) for item in dump_queue(self.queue, self.frec))
        self.do_borrar()
        self.end_turn()
class Procesador(Worker):
    """
    Worker meant to review the valid emails and process them according to the
    instructions. Processing is not implemented yet: the worker only idles
    until it is asked to stop.
    """

    def __init__(self, configs, params):
        """
        :param configs: configuration object, read through ``configs.get(key)``
        :param params: worker parameters, forwarded to ``Worker``
        """
        super(Procesador, self).__init__(configs=configs, params=params)
        self.name = 'Email:Procesador'
        # Same polling interval the other email workers use
        self.frec = configs.get('supervisor.wait')

    def run(self) -> None:
        """Idle until the stop flag is set."""
        self.start_turn()
        while not self.stop.is_set():
            # Sleep between checks instead of spinning, which would keep a
            # CPU core fully busy while doing nothing
            time.sleep(self.frec)
        self.end_turn()