diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..42aa1dd --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +secrets.py +ProtonWebmail \ No newline at end of file diff --git a/common.py b/common.py new file mode 100644 index 0000000..97952a6 --- /dev/null +++ b/common.py @@ -0,0 +1,115 @@ +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.action_chains import ActionChains +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import TimeoutException +from selenium.common.exceptions import ElementNotInteractableException + +import json +from types import NoneType + +class MailMessage: + def __init__(self, subject: str, sender: str, maintext: str): + self.subject = subject + self.sender = sender + self.maintext = maintext + + def __eq__(self, other): + return other == self.subject or other == self.sender or other == self.maintext + + def serialize(self): + return { + "subject": self.subject, + "sender": self.sender, + "maintext": self.maintext, + "date": "", # fill this in later + } + + def serialize_to_string(self) -> str: + return json.dumps(self.serialize()) + + @classmethod + def from_json(cls, json_data): + if isinstance(json_data, str): + parsed = json.loads(json_data) + else: + parsed = json_data + + return cls(json_data["subject"], json_data["sender"], json_data["maintext"]) + +class MailProvider: + def __init__(self, username: str, password: str, cachefile: NoneType | str = None): + self.username: str = username + self.password: str = password + self.sessionmessages = [] + self.webdriver = None # TODO: fill in type information for this instance + self.cachefile = cachefile + + def _get_webdriver(self): + self.webdriver = webdriver.Chrome() + + def _get_elem_children(self, elem): + return elem.find_elements(By.XPATH, "./child::*") + + def _to_elem(self, xpath): + return self.webdriver.find_element(By.XPATH, xpath) + + def _wait_for_elem(self, xpath): + try: WebDriverWait(self.webdriver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) + except TimeoutException: pass + + def get_seen_messages(self): + try: + if not self.cachefile: + self.cachefile = self.__class__.__qualname__ + + with open(self.cachefile, "r") as f: + loaded = json.load(f)["payload"] + for entry in loaded: + self.seen_messages.append(MailMessage.from_json(entry)) + except ( + FileNotFoundError, + json.decoder.JSONDecodeError + ) as e: + self.seen_messages = [] + + def write_seen_messages(self): + if not self.cachefile: + self.cachefile = self.__name__ + + with open(self.cachefile, "w") as f: + data = {"payload": [item.serialize() for item in self.seen_messages]} + print(data) + f.write( + json.dumps( + data + ) + ) + + def add_to_seen(self, msg): + self.seen_messages.append(msg) + + def is_seen(self, msg): + for i in self.seen_messages: + if i == msg: + return True + return False + + def _type_in_elem(self, xpath, typed): + target_element = self._to_elem(xpath) + ActionChains(self.webdriver) \ + .move_to_element(target_element) \ + .send_keys(typed) \ + .perform() + + def _click_elem(self, xpath): + try: + target_element = self._to_elem(xpath) + except: + target_element = xpath + + ActionChains(self.webdriver) \ + .move_to_element(target_element) \ + .click(target_element) \ + .perform() \ No newline at end of file diff --git a/driver.py b/driver.py new file mode 100644 index 0000000..0fe47db --- /dev/null +++ b/driver.py @@ -0,0 +1,4 @@ +from proton import ProtonWebmail +from secrets import proton, gmail + +print(ProtonWebmail(proton.username, proton.password).get()) \ No newline at end of file diff --git a/proton.py b/proton.py new file mode 100644 index 0000000..885b59e --- /dev/null +++ b/proton.py @@ -0,0 +1,59 @@ +from selenium import webdriver +from selenium.webdriver.common.by import By +from common import MailProvider, MailMessage +import json +from time import sleep + +class ProtonWebmail(MailProvider): + def transform_message_header(self, header): + return header.replace('\n', ' - ').replace('Unread email - ', '') + + def get(self): + self.messages_failed = 0 + xpaths = { + "username_box": "/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/div[2]/div[1]/div/div/input", + "password_box": "/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/div[3]/div[1]/div/div[1]/input", + "sign_in": "/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/button", + "messages": "/html/body/div[1]/div[3]/div/div[2]/div/div[2]/div/div/div/main/div/div/div/div/div/div[2]/div[2]", + "messagebody": "/html/body/div[1]/div[3]/div/div[2]/div/div[2]/div/div/div/main/div/div/section/div/div[3]/div/div/article/div[2]", + "backbutton": "//*[text()[contains(., 'Back')]]", + } + self.get_seen_messages() + self._get_webdriver() + self.webdriver.get("https://account.proton.me/login") + self._wait_for_elem(xpaths["username_box"]) + sleep(0.5) + self._type_in_elem(xpaths["username_box"], self.username) + self._click_elem(xpaths["password_box"]) + self._type_in_elem(xpaths["password_box"], self.password) + self._click_elem(xpaths["sign_in"]) + self._wait_for_elem(xpaths["messages"]) + count = 0 + for i in self._get_elem_children(self._to_elem(xpaths["messages"])): + + if not self.is_seen(i.text): + count += 1 + if count == 3: + break + text = i.text + # we can interact with it, just selenium doesn't like it + try: + self._click_elem(i) + except ElementNotInteractableException: + pass + self._wait_for_elem(xpaths["messagebody"]) + sleep(5) + self.webdriver.switch_to.frame(self._to_elem(xpaths["messagebody"]).find_elements(By.XPATH, "//iframe")[0]) + message = MailMessage( + self.transform_message_header(text), + "", + self.webdriver.page_source, + ) + self.add_to_seen(message) + self.webdriver.switch_to.default_content() + sleep(2) + self._click_elem(xpaths["backbutton"]) + sleep(2) + + self.write_seen_messages() + self.webdriver.quit() \ No newline at end of file diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..5c816db --- /dev/null +++ b/shell.nix @@ -0,0 +1,5 @@ +{ pkgs ? import {} }: + pkgs.mkShell { + # nativeBuildInputs is usually what you want -- tools you need to run + nativeBuildInputs = with pkgs; [ buildPackages.python311Packages.selenium buildPackages.python311Packages.beautifulsoup4 chromedriver ]; +}