From 194797b8560d2c1e93d9dcd8fceecb7aeee29421 Mon Sep 17 00:00:00 2001 From: randomuser Date: Wed, 11 Oct 2023 23:00:26 -0500 Subject: [PATCH] sync replies back to the other side of the proxy --- common.py | 52 +++++++++++++++++++++++++++++++---- driver.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- proton.py | 47 +++++++++++++++++++++++++++++-- shell.nix | 2 +- 4 files changed, 171 insertions(+), 12 deletions(-) diff --git a/common.py b/common.py index c2bc611..1508efb 100644 --- a/common.py +++ b/common.py @@ -4,20 +4,47 @@ from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException -from selenium.common.exceptions import ElementNotInteractableException import json import os +import math +import datetime from types import NoneType +class ToSend: + def __init__(self, in_response_to_id: int, body: str): + self.in_response_to_id: int = in_response_to_id + self.body: str = body + + def serialize(self): + return { + "resp_to_id": self.in_response_to_id, + "body": self.body, + } + + def seriaize_to_string(self): + return json.dumps(self.serialize()) + + @classmethod + def from_json(cls, json_data): + return cls( + in_response_to_id = json_data["resp_to_id"], + body = json_data["body"], + ) + class MailMessage: - def __init__(self, index: str, sender: str, subject: str, maintext: str, acted_upon: bool = True): + def __init__(self, index: str, sender: str, subject: str, maintext: str, id = None, acted_upon: bool = False): self.subject = subject self.index = index self.sender = sender self.maintext = maintext self.acted_upon = acted_upon + if not id: + self.id = int(math.floor(datetime.datetime.now().timestamp() * 1000000)) + else: + self.id = id + self.attrs = "subject index sender maintext acted_upon" self.attrs = self.attrs.split(" ") @@ -34,6 +61,7 @@ class MailMessage: "sender": self.sender, "index": self.index, "maintext": self.maintext, + "id": self.id, "acted_upon": self.acted_upon, "date": "", # fill this in later } @@ -54,6 +82,7 @@ class MailMessage: subject = json_data["subject"], maintext = json_data["maintext"], acted_upon = json_data["acted_upon"], + id = json_data["id"], ) class MailProvider: @@ -62,6 +91,7 @@ class MailProvider: self.password: str = password self.webdriver = None # TODO: fill in type information for this instance self.cachefile = cachefile + self.to_send = [] self.seen_messages = [] def _get_webdriver(self): @@ -77,7 +107,7 @@ class MailProvider: try: WebDriverWait(self.webdriver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) except TimeoutException: pass - def get_seen_messages(self): + def read_data(self): try: if not self.cachefile: self.cachefile = self.__class__.__qualname__ @@ -92,24 +122,34 @@ class MailProvider: ) as e: self.seen_messages = [] - def write_seen_messages(self): + def write_data(self): if not self.cachefile: - self.cachefile = self.__name__ + self.cachefile = self.__class__.__qualname__ if not os.path.exists("data/"): os.mkdir("data/") with open("data/{}".format(self.cachefile), "w") as f: - data = {"payload": [item.serialize() for item in self.seen_messages]} + data = { + "payload": [item.serialize() for item in self.seen_messages] + } f.write( json.dumps( data ) ) + + def search_by_id(self, id): + for i in self.seen_messages: + if i.id == id: + return i def add_to_seen(self, msg): self.seen_messages.append(msg) + def add_message_response(self, response): + self.responses.append(response) + def is_seen(self, msg): for i in self.seen_messages: if i == msg: diff --git a/driver.py b/driver.py index 9fc2f02..087d869 100644 --- a/driver.py +++ b/driver.py @@ -1,4 +1,82 @@ from proton import ProtonWebmail -from secrets import proton, gmail +from common import ToSend +from secrets import proton, gmail, mailserv -ProtonWebmail(proton.username, proton.password).get() \ No newline at end of file +import smtplib, ssl +import imaplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +import email +from email.header import decode_header + +port = 465 +password = mailserv.managedpassword +ctx = ssl.create_default_context() + +imap = imaplib.IMAP4_SSL(mailserv.hostname) +imap.login(mailserv.managedusername, mailserv.managedpassword) + +# Initialize an array to store the dictionaries of message details +unread_messages = [] +# Select the current mailbox +# List all available mailboxes +result, mailbox_data = imap.list() +# Iterate through the list of mailboxes +for mailbox_info in mailbox_data: + # Parse the mailbox name from the list response + mailbox_name = mailbox_info.decode().split('"')[-1].lstrip() + + # Select the current mailbox + status, _ = imap.select(mailbox_name) + if status == "NO": + continue + + # Search for all unread messages in the current mailbox + result, data = imap.search(None, "ALL") + + for imap_id in data[0].split(): + result, message_data = imap.fetch(imap_id, '(RFC822)') + raw_email = message_data[0][1] + email_message = email.message_from_bytes(raw_email) + subject, encoding = decode_header(email_message['Subject'])[0] + subject = subject.decode(encoding) if encoding else subject + body = email_message.get_payload() + + try: + in_response_to_id = int( + subject \ + .split("kb_ident_")[-1] \ + .replace(")", "") + ) + except (IndexError, ValueError) as e: + imap.store(imap_id, '+FLAGS', '\\Deleted') + continue + + + unread_messages.append(ToSend( + in_response_to_id, + email_message.get_payload(), + )) + + imap.store(imap_id, '+FLAGS', '\\Deleted') + + +imap.expunge() +imap.logout() + +proton_mail = ProtonWebmail(proton.username, proton.password) +proton_mail.to_send = unread_messages +proton_mail.get() +with smtplib.SMTP_SSL("mail." + mailserv.hostname, port, context=ctx) as server: + server.login(mailserv.managedusername, password) + for message in proton_mail.seen_messages: + if not message.acted_upon: + mime_message = MIMEMultipart("alternative") + html = MIMEText(message.maintext, "html") + mime_message["Subject"] = "{} (from {}, kb_ident_{})".format(message.subject, message.sender, str(message.id)) + mime_message["From"] = mailserv.managedusername + mime_message["To"] = mailserv.targetusername + mime_message.attach(html) + server.sendmail(mailserv.managedusername, mailserv.targetusername, mime_message.as_string()) + message.acted_upon = True + proton_mail.write_data() \ No newline at end of file diff --git a/proton.py b/proton.py index 985421d..57656ba 100644 --- a/proton.py +++ b/proton.py @@ -1,6 +1,8 @@ from selenium import webdriver from selenium.webdriver.common.by import By from common import MailProvider, MailMessage +from selenium.common.exceptions import TimeoutException +from selenium.common.exceptions import ElementNotInteractableException import json from time import sleep @@ -12,6 +14,9 @@ class ProtonWebmail(MailProvider): "messages": "/html/body/div[1]/div[3]/div/div[2]/div/div[2]/div/div/div/main/div/div/div/div/div/div[2]/div[2]", "messagebody": "/html/body/div[1]/div[3]/div/div[2]/div/div[2]/div/div/div/main/div/div/section/div/div[3]/div/div/article/div[2]", "backbutton": "//*[text()[contains(., 'Back')]]", + "reply": "/html/body/div[1]/div[3]/div/div[2]/div/div[2]/div/div/div/main/div/div/section/div/div[3]/div/div/article/div[1]/div[4]/div[2]/button[1]", + "replytextarea": "/html/body/div[1]/div[4]/div/div/div/div/section/div/div[1]/div/div/textarea", + "replysubmit": "/html/body/div[1]/div[4]/div/div/div/footer/div/div[1]/button[1]", } def transform_message_header(self, header): @@ -19,7 +24,7 @@ class ProtonWebmail(MailProvider): def get(self): self.messages_failed = 0 - self.get_seen_messages() + self.read_data() self._get_webdriver() self.webdriver.get("https://account.proton.me/login") self._wait_for_elem(self.xpaths["username_box"]) @@ -57,8 +62,44 @@ class ProtonWebmail(MailProvider): sleep(2) self._click_elem(self.xpaths["backbutton"]) sleep(2) + else: + # check if we have to reply to this email + print("checking if we have to reply") - self.write_seen_messages() + search = self.transform_message_header(i.text) + selected = None + for message in self.seen_messages: + if message.index == search: + selected = message + + if not selected: + continue + + for message_to_send in self.to_send: + print('loop iter 1') + print(message_to_send.in_response_to_id, selected.id) + if message_to_send.in_response_to_id == selected.id: + # we do have to reply + print("sending reply to message id {}".format(str(selected.id))) + try: + self._click_elem(i) + except ElementNotInteractableException: + pass + self._wait_for_elem(self.xpaths["messagebody"]) + sleep(5) + + self._click_elem(self.xpaths["reply"]) + sleep(1) + replytextarea = self._to_elem(self.xpaths["replytextarea"]) + replytextarea.clear() + replytextarea.send_keys(message_to_send.body) + sleep(1) + self._click_elem(self.xpaths["replysubmit"]) + sleep(2) + self._click_elem(self.xpaths["backbutton"]) + sleep(2) + + self.write_data() self.webdriver.quit() - return self.seen_messages \ No newline at end of file + return self \ No newline at end of file diff --git a/shell.nix b/shell.nix index 5c816db..4323514 100644 --- a/shell.nix +++ b/shell.nix @@ -1,5 +1,5 @@ { pkgs ? import {} }: pkgs.mkShell { # nativeBuildInputs is usually what you want -- tools you need to run - nativeBuildInputs = with pkgs; [ buildPackages.python311Packages.selenium buildPackages.python311Packages.beautifulsoup4 chromedriver ]; + nativeBuildInputs = with pkgs; [ buildPackages.python311Packages.selenium buildPackages.python311Packages.beautifulsoup4 buildPackages.python311Packages.imap-tools chromedriver ]; }