2023-10-09 21:07:55 -05:00
|
|
|
from selenium import webdriver
|
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from selenium.common.exceptions import TimeoutException
|
|
|
|
from selenium.common.exceptions import ElementNotInteractableException
|
|
|
|
|
|
|
|
import json
|
2023-10-10 18:38:30 -05:00
|
|
|
import os
|
2023-10-09 21:07:55 -05:00
|
|
|
from types import NoneType
|
|
|
|
|
|
|
|
class MailMessage:
|
2023-10-10 18:38:30 -05:00
|
|
|
def __init__(self, index: str, sender: str, subject: str, maintext: str, acted_upon: bool = True):
|
2023-10-09 21:07:55 -05:00
|
|
|
self.subject = subject
|
2023-10-10 18:38:30 -05:00
|
|
|
self.index = index
|
2023-10-09 21:07:55 -05:00
|
|
|
self.sender = sender
|
|
|
|
self.maintext = maintext
|
2023-10-10 18:38:30 -05:00
|
|
|
self.acted_upon = acted_upon
|
|
|
|
|
|
|
|
self.attrs = "subject index sender maintext acted_upon"
|
|
|
|
self.attrs = self.attrs.split(" ")
|
2023-10-09 21:07:55 -05:00
|
|
|
|
|
|
|
def __eq__(self, other):
|
2023-10-10 18:38:30 -05:00
|
|
|
for attr in self.attrs:
|
|
|
|
if other == getattr(self, attr):
|
|
|
|
return True
|
|
|
|
|
|
|
|
return False
|
2023-10-09 21:07:55 -05:00
|
|
|
|
|
|
|
def serialize(self):
|
|
|
|
return {
|
|
|
|
"subject": self.subject,
|
|
|
|
"sender": self.sender,
|
2023-10-10 18:38:30 -05:00
|
|
|
"index": self.index,
|
2023-10-09 21:07:55 -05:00
|
|
|
"maintext": self.maintext,
|
2023-10-10 18:38:30 -05:00
|
|
|
"acted_upon": self.acted_upon,
|
2023-10-09 21:07:55 -05:00
|
|
|
"date": "", # fill this in later
|
|
|
|
}
|
|
|
|
|
|
|
|
def serialize_to_string(self) -> str:
|
|
|
|
return json.dumps(self.serialize())
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_json(cls, json_data):
|
|
|
|
if isinstance(json_data, str):
|
|
|
|
parsed = json.loads(json_data)
|
|
|
|
else:
|
|
|
|
parsed = json_data
|
|
|
|
|
2023-10-10 18:38:30 -05:00
|
|
|
return cls(
|
|
|
|
index = json_data["index"],
|
|
|
|
sender = json_data["sender"],
|
|
|
|
subject = json_data["subject"],
|
|
|
|
maintext = json_data["maintext"],
|
|
|
|
acted_upon = json_data["acted_upon"],
|
|
|
|
)
|
2023-10-09 21:07:55 -05:00
|
|
|
|
|
|
|
class MailProvider:
|
|
|
|
def __init__(self, username: str, password: str, cachefile: NoneType | str = None):
|
|
|
|
self.username: str = username
|
|
|
|
self.password: str = password
|
|
|
|
self.webdriver = None # TODO: fill in type information for this instance
|
|
|
|
self.cachefile = cachefile
|
2023-10-10 18:38:30 -05:00
|
|
|
self.seen_messages = []
|
2023-10-09 21:07:55 -05:00
|
|
|
|
|
|
|
def _get_webdriver(self):
|
|
|
|
self.webdriver = webdriver.Chrome()
|
|
|
|
|
|
|
|
def _get_elem_children(self, elem):
|
|
|
|
return elem.find_elements(By.XPATH, "./child::*")
|
|
|
|
|
|
|
|
def _to_elem(self, xpath):
|
|
|
|
return self.webdriver.find_element(By.XPATH, xpath)
|
|
|
|
|
|
|
|
def _wait_for_elem(self, xpath):
|
|
|
|
try: WebDriverWait(self.webdriver, 20).until(EC.presence_of_element_located((By.XPATH, xpath)))
|
|
|
|
except TimeoutException: pass
|
|
|
|
|
|
|
|
def get_seen_messages(self):
|
|
|
|
try:
|
|
|
|
if not self.cachefile:
|
|
|
|
self.cachefile = self.__class__.__qualname__
|
|
|
|
|
2023-10-10 18:38:30 -05:00
|
|
|
with open("data/{}".format(self.cachefile), "r") as f:
|
2023-10-09 21:07:55 -05:00
|
|
|
loaded = json.load(f)["payload"]
|
|
|
|
for entry in loaded:
|
|
|
|
self.seen_messages.append(MailMessage.from_json(entry))
|
|
|
|
except (
|
|
|
|
FileNotFoundError,
|
|
|
|
json.decoder.JSONDecodeError
|
|
|
|
) as e:
|
|
|
|
self.seen_messages = []
|
|
|
|
|
|
|
|
def write_seen_messages(self):
|
|
|
|
if not self.cachefile:
|
|
|
|
self.cachefile = self.__name__
|
|
|
|
|
2023-10-10 18:38:30 -05:00
|
|
|
if not os.path.exists("data/"):
|
|
|
|
os.mkdir("data/")
|
|
|
|
|
|
|
|
with open("data/{}".format(self.cachefile), "w") as f:
|
2023-10-09 21:07:55 -05:00
|
|
|
data = {"payload": [item.serialize() for item in self.seen_messages]}
|
|
|
|
f.write(
|
|
|
|
json.dumps(
|
|
|
|
data
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
def add_to_seen(self, msg):
|
|
|
|
self.seen_messages.append(msg)
|
|
|
|
|
|
|
|
def is_seen(self, msg):
|
|
|
|
for i in self.seen_messages:
|
|
|
|
if i == msg:
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def _type_in_elem(self, xpath, typed):
|
|
|
|
target_element = self._to_elem(xpath)
|
|
|
|
ActionChains(self.webdriver) \
|
|
|
|
.move_to_element(target_element) \
|
|
|
|
.send_keys(typed) \
|
|
|
|
.perform()
|
|
|
|
|
|
|
|
def _click_elem(self, xpath):
|
|
|
|
try:
|
|
|
|
target_element = self._to_elem(xpath)
|
|
|
|
except:
|
|
|
|
target_element = xpath
|
|
|
|
|
|
|
|
ActionChains(self.webdriver) \
|
|
|
|
.move_to_element(target_element) \
|
|
|
|
.click(target_element) \
|
|
|
|
.perform()
|