# for licensing information, please see LICENSE.md and README.md
# copyright (c) rndusr, aka randomuser 2023

import datetime
import json
import math
import os
from types import NoneType

from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


class ToSend:
    """A reply body paired with the id of the message it responds to."""

    def __init__(self, in_response_to_id: int, body: str):
        self.in_response_to_id: int = in_response_to_id
        self.body: str = body

    def serialize(self) -> dict:
        """Return a JSON-compatible dict with keys ``resp_to_id`` and ``body``."""
        return {
            "resp_to_id": self.in_response_to_id,
            "body": self.body,
        }

    def serialize_to_string(self) -> str:
        """Return :meth:`serialize` encoded as a JSON string."""
        return json.dumps(self.serialize())

    def seriaize_to_string(self) -> str:
        """Misspelled original name of :meth:`serialize_to_string`.

        Kept so existing callers keep working; prefer the correctly spelled
        method in new code.
        """
        return self.serialize_to_string()

    @classmethod
    def from_json(cls, json_data):
        """Build an instance from a dict, or from a JSON string encoding one.

        Raises:
            KeyError: if ``resp_to_id`` or ``body`` is missing.
            json.JSONDecodeError: if a string argument is not valid JSON.
        """
        parsed = json.loads(json_data) if isinstance(json_data, str) else json_data
        return cls(
            in_response_to_id=parsed["resp_to_id"],
            body=parsed["body"],
        )


class MailMessage:
    """A single mail message: index, sender, subject, body text and state."""

    def __init__(self, index: str, sender: str, subject: str, maintext: str,
                 id=None, acted_upon: bool = False):
        self.subject = subject
        self.index = index
        self.sender = sender
        self.maintext = maintext
        self.acted_upon = acted_upon
        # A falsy id (None, 0, "") means "not assigned": derive one from the
        # current timestamp expressed in microseconds.
        if not id:
            id = math.floor(datetime.datetime.now().timestamp() * 1_000_000)
        self.id = id
        # Names of the attributes taken into account by __eq__ (id excluded).
        self.attrs = ["subject", "index", "sender", "maintext", "acted_upon"]

    def __eq__(self, other):
        """Compare two messages field by field over ``self.attrs``.

        The original implementation compared *other itself* against each of
        this message's attribute values and returned True on the first hit;
        for two messages that recursed into a cross-comparison of all their
        fields, so e.g. any two messages sharing ``acted_upon`` were "equal".

        NOTE(review): ``acted_upon`` is part of the comparison because it is
        listed in ``attrs``; confirm that is intended for ``is_seen`` checks.
        """
        if isinstance(other, MailMessage):
            return all(
                getattr(self, attr) == getattr(other, attr)
                for attr in self.attrs
            )
        # Legacy behaviour preserved for non-message operands: a plain value
        # matches when it equals any one of the compared fields.
        return any(other == getattr(self, attr) for attr in self.attrs)

    # Defining __eq__ makes instances unhashable; stated explicitly here.
    __hash__ = None

    def serialize(self) -> dict:
        """Return a JSON-compatible dict describing this message."""
        return {
            "subject": self.subject,
            "sender": self.sender,
            "index": self.index,
            "maintext": self.maintext,
            "id": self.id,
            "acted_upon": self.acted_upon,
            "date": "",  # fill this in later
        }

    def serialize_to_string(self) -> str:
        """Return :meth:`serialize` encoded as a JSON string."""
        return json.dumps(self.serialize())

    @classmethod
    def from_json(cls, json_data):
        """Build a message from a dict, or from a JSON string encoding one.

        Raises:
            KeyError: if a required key is missing.
            json.JSONDecodeError: if a string argument is not valid JSON.
        """
        parsed = json.loads(json_data) if isinstance(json_data, str) else json_data
        # Read from the decoded mapping; the original indexed the raw
        # argument, which failed whenever a JSON string was passed in.
        return cls(
            index=parsed["index"],
            sender=parsed["sender"],
            subject=parsed["subject"],
            maintext=parsed["maintext"],
            acted_upon=parsed["acted_upon"],
            id=parsed["id"],
        )


class MailProvider:
    """Credentials, a Selenium webdriver handle and a JSON cache of messages.

    Also provides small XPath-based helpers for locating, waiting on,
    typing into and clicking page elements.
    """

    # Directory (relative to the working directory) that holds cache files.
    _DATA_DIR = "data"

    def __init__(self, username: str, password: str, cachefile: NoneType | str = None):
        self.username: str = username
        self.password: str = password
        self.webdriver = None  # TODO: fill in type information for this instance
        self.cachefile = cachefile
        self.to_send = []
        # NOTE(review): the original never initialised ``responses``, so
        # add_message_response always raised AttributeError. It is assumed
        # to be the same queue as ``to_send`` -- confirm.
        self.responses = self.to_send
        self.seen_messages = []

    def _cache_path(self) -> str:
        """Return the cache file path, defaulting the name to the class name."""
        if not self.cachefile:
            self.cachefile = self.__class__.__qualname__
        return os.path.join(self._DATA_DIR, self.cachefile)

    def _get_webdriver(self):
        """Create a Chrome webdriver and store it on ``self.webdriver``."""
        self.webdriver = webdriver.Chrome()

    def _get_elem_children(self, elem):
        """Return the direct child elements of ``elem``."""
        return elem.find_elements(By.XPATH, "./child::*")

    def _to_elem(self, xpath):
        """Return the first element matching ``xpath`` in the current page."""
        return self.webdriver.find_element(By.XPATH, xpath)

    def _wait_for_elem(self, xpath) -> bool:
        """Wait up to 20 seconds for an element matching ``xpath`` to exist.

        Returns:
            True if the element appeared, False if the wait timed out. The
            timeout is deliberately not raised (best-effort wait), but is now
            reported instead of being silently discarded.
        """
        try:
            WebDriverWait(self.webdriver, 20).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            )
        except TimeoutException:
            return False
        return True

    def read_data(self):
        """Load cached messages from the cache file into ``seen_messages``.

        If the file is missing or is not valid JSON, ``seen_messages`` is
        reset to an empty list.
        """
        try:
            with open(self._cache_path(), "r", encoding="utf-8") as f:
                payload = json.load(f)["payload"]
            loaded = [MailMessage.from_json(entry) for entry in payload]
        except (FileNotFoundError, json.JSONDecodeError):
            self.seen_messages = []
        else:
            self.seen_messages.extend(loaded)

    def write_data(self):
        """Write ``seen_messages`` to the cache file as JSON."""
        path = self._cache_path()
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self._DATA_DIR, exist_ok=True)
        # Serialise before opening so a failure cannot truncate the cache.
        encoded = json.dumps(
            {"payload": [item.serialize() for item in self.seen_messages]}
        )
        with open(path, "w", encoding="utf-8") as f:
            f.write(encoded)

    def search_by_id(self, id):
        """Return the seen message whose ``id`` matches, or None."""
        for message in self.seen_messages:
            if message.id == id:
                return message
        return None

    def add_to_seen(self, msg):
        """Append ``msg`` to ``seen_messages``."""
        self.seen_messages.append(msg)

    def add_message_response(self, response):
        """Append ``response`` to ``responses``."""
        self.responses.append(response)

    def is_seen(self, msg) -> bool:
        """Return True if ``msg`` compares equal to any seen message."""
        return any(seen == msg for seen in self.seen_messages)

    def _type_in_elem(self, xpath, typed):
        """Move to the element matching ``xpath`` and send it ``typed``."""
        target_element = self._to_elem(xpath)
        ActionChains(self.webdriver) \
            .move_to_element(target_element) \
            .send_keys(typed) \
            .perform()

    def _click_elem(self, xpath):
        """Move to an element and click it.

        Args:
            xpath: an XPath string to look up, or an already-located element.
                The original used a bare ``except`` to fall back to treating
                the argument as an element; the type check makes that
                explicit and no longer hides lookup failures or interrupts.
        """
        target_element = self._to_elem(xpath) if isinstance(xpath, str) else xpath
        ActionChains(self.webdriver) \
            .move_to_element(target_element) \
            .click(target_element) \
            .perform()