mail-sync/common.py

175 lines
5.1 KiB
Python

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import json
import os
import math
import datetime
from types import NoneType
class ToSend:
    """An outgoing reply: a body plus the id of the message it responds to."""

    def __init__(self, in_response_to_id: int, body: str):
        """Store the id of the message being answered and the reply text."""
        self.in_response_to_id: int = in_response_to_id
        self.body: str = body

    def serialize(self) -> dict:
        """Return the reply as a JSON-compatible dict."""
        return {
            "resp_to_id": self.in_response_to_id,
            "body": self.body,
        }

    def serialize_to_string(self) -> str:
        """Return the reply encoded as a JSON string."""
        return json.dumps(self.serialize())

    # The method was originally published under this misspelled name; keep it
    # as an alias so existing callers continue to work.
    seriaize_to_string = serialize_to_string

    @classmethod
    def from_json(cls, json_data):
        """Build a ToSend from a dict or from a JSON string encoding one.

        Raises:
            KeyError: if "resp_to_id" or "body" is missing.
            json.JSONDecodeError: if a string argument is not valid JSON.
        """
        # Accept both forms, matching MailMessage.from_json.
        if isinstance(json_data, str):
            parsed = json.loads(json_data)
        else:
            parsed = json_data
        return cls(
            in_response_to_id=parsed["resp_to_id"],
            body=parsed["body"],
        )
class MailMessage:
    """A mail message record with a local id and an "acted upon" flag.

    Two messages compare equal when every field named in ``attrs`` matches.
    The ``id`` is not part of the comparison: it is generated from the clock
    when the caller does not supply one, so it differs between otherwise
    identical messages.
    """

    def __init__(self, index: str, sender: str, subject: str, maintext: str, id=None, acted_upon: bool = False):
        """Store the message fields, generating an id when none is given.

        Args:
            index: provider-specific index of the message.
            sender: sender of the message.
            subject: subject line.
            maintext: body text.
            id: existing id to reuse; any falsy value (None, 0, "") is
                replaced by the current time in microseconds.
            acted_upon: whether the message has already been handled.
        """
        self.subject = subject
        self.index = index
        self.sender = sender
        self.maintext = maintext
        self.acted_upon = acted_upon
        if not id:
            # Microsecond timestamp as a locally unique id.
            id = int(math.floor(datetime.datetime.now().timestamp() * 1000000))
        self.id = id
        # Names of the fields that take part in equality.
        self.attrs = ["subject", "index", "sender", "maintext", "acted_upon"]

    def __eq__(self, other):
        """Return True when *other* is a MailMessage with identical fields."""
        if not isinstance(other, MailMessage):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        return all(
            getattr(self, attr) == getattr(other, attr)
            for attr in self.attrs
        )

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None

    def serialize(self) -> dict:
        """Return the message as a JSON-compatible dict."""
        return {
            "subject": self.subject,
            "sender": self.sender,
            "index": self.index,
            "maintext": self.maintext,
            "id": self.id,
            "acted_upon": self.acted_upon,
            "date": "",  # fill this in later
        }

    def serialize_to_string(self) -> str:
        """Return the message encoded as a JSON string."""
        return json.dumps(self.serialize())

    @classmethod
    def from_json(cls, json_data):
        """Build a MailMessage from a dict or from a JSON string encoding one.

        Raises:
            KeyError: if any of the required keys is missing.
            json.JSONDecodeError: if a string argument is not valid JSON.
        """
        if isinstance(json_data, str):
            parsed = json.loads(json_data)
        else:
            parsed = json_data
        # Read from the parsed mapping, not the raw argument, so that string
        # input actually works.
        return cls(
            index=parsed["index"],
            sender=parsed["sender"],
            subject=parsed["subject"],
            maintext=parsed["maintext"],
            acted_upon=parsed["acted_upon"],
            id=parsed["id"],
        )
class MailProvider:
def __init__(self, username: str, password: str, cachefile: NoneType | str = None):
self.username: str = username
self.password: str = password
self.webdriver = None # TODO: fill in type information for this instance
self.cachefile = cachefile
self.to_send = []
self.seen_messages = []
def _get_webdriver(self):
self.webdriver = webdriver.Chrome()
def _get_elem_children(self, elem):
return elem.find_elements(By.XPATH, "./child::*")
def _to_elem(self, xpath):
return self.webdriver.find_element(By.XPATH, xpath)
def _wait_for_elem(self, xpath):
try: WebDriverWait(self.webdriver, 20).until(EC.presence_of_element_located((By.XPATH, xpath)))
except TimeoutException: pass
def read_data(self):
try:
if not self.cachefile:
self.cachefile = self.__class__.__qualname__
with open("data/{}".format(self.cachefile), "r") as f:
loaded = json.load(f)["payload"]
for entry in loaded:
self.seen_messages.append(MailMessage.from_json(entry))
except (
FileNotFoundError,
json.decoder.JSONDecodeError
) as e:
self.seen_messages = []
def write_data(self):
if not self.cachefile:
self.cachefile = self.__class__.__qualname__
if not os.path.exists("data/"):
os.mkdir("data/")
with open("data/{}".format(self.cachefile), "w") as f:
data = {
"payload": [item.serialize() for item in self.seen_messages]
}
f.write(
json.dumps(
data
)
)
def search_by_id(self, id):
for i in self.seen_messages:
if i.id == id:
return i
def add_to_seen(self, msg):
self.seen_messages.append(msg)
def add_message_response(self, response):
self.responses.append(response)
def is_seen(self, msg):
for i in self.seen_messages:
if i == msg:
return True
return False
def _type_in_elem(self, xpath, typed):
target_element = self._to_elem(xpath)
ActionChains(self.webdriver) \
.move_to_element(target_element) \
.send_keys(typed) \
.perform()
def _click_elem(self, xpath):
try:
target_element = self._to_elem(xpath)
except:
target_element = xpath
ActionChains(self.webdriver) \
.move_to_element(target_element) \
.click(target_element) \
.perform()