add message fetching; stub email sending

This commit is contained in:
stupidcomputer 2025-01-11 15:33:07 -06:00
parent 408d87366e
commit 5ff6fc7ca2
3 changed files with 165 additions and 0 deletions

1
pcomon/__init__.py Normal file
View File

@ -0,0 +1 @@

60
pcomon/entry.py Normal file
View File

@ -0,0 +1,60 @@
from pcomon.messages import MessageDeliverer, Message
import json
import sys
read_messages = "/run/test/messages"
def get_creds(filename: str) -> dict[str, str]:
with open(filename, "r") as creds_handle:
lines = creds_handle.readlines()
creds = [line.rstrip().split("=") for line in lines]
creds = {k: v for k, v in creds}
return creds
def get_messages_read(from_file: str = read_messages) -> set[str]:
try:
with open(from_file, "r") as read_messages_handle:
read_messages: list[str] = json.loads(read_messages_handle.read())
read_messages: set[str] = set(read_messages)
except (FileNotFoundError, json.JSONDecodeError):
return set()
return read_messages
def write_read_messages(read_messages: list[str], from_file: str = read_messages) -> None:
with open(from_file, "w+") as read_messages_handle:
read_messages_handle.write(
json.dumps(
list(read_messages)
)
)
def send_email(msg: Message) -> None:
print(msg)
def entry():
creds_file = sys.argv[1]
creds = get_creds(creds_file)
deliverer = MessageDeliverer(
app_id=creds["PCO_APP_ID"],
app_secret=creds["PCO_APP_SECRET"],
person_id=creds["PCO_PERSON_ID"],
)
deliverer.fetch_messages()
read_messages = get_messages_read()
unread = deliverer.get_unread_messages(read_messages)
for message in unread:
message: Message = deliverer.id_dictionary[message]
send_email(message)
read_messages.add(message.message_id)
write_read_messages(read_messages)
if __name__ == "__main__":
entry()

104
pcomon/messages.py Normal file
View File

@ -0,0 +1,104 @@
import requests
from requests.auth import HTTPBasicAuth
from email.parser import BytesParser
from email.message import Message
from email.header import Header
from email.utils import formataddr
from email.mime.text import MIMEText
from dataclasses import dataclass
from typing import ClassVar
api_domain = "https://api.planningcenteronline.com"
@dataclass
class Message:
eml_content: str
message_id: str
from_name: str
subject: str
@property
def email(self):
parsed_email: Message = BytesParser(self.eml_content)
email_text = parsed_email.get_body(
preference_list=("plain", "html")
)
message = Message()
msg['From'] = 'pcomon@mail.beepboop.systems'
msg['From'] = formataddr((
str(Header(self.from_name + " (via pcomon)", "utf-8")), "pcomon@mail.beepboop.systems"
))
msg['To'] = 'ryan@beepboop.systems'
msg['Subject'] = self.subject
msg.add_header("Content-Type", "text")
msg.set_payload(email_text)
@dataclass
class MessageDeliverer:
app_id: str
app_secret: str
person_id: str
messages: ClassVar[list[Message]] = []
has_fetched: ClassVar[bool] = False
@property
def http_auth(self) -> HTTPBasicAuth:
return HTTPBasicAuth(self.app_id, self.app_secret)
def fetch_messages(self) -> None:
r = requests.get(
"{}/people/v2/people/{}/messages".format(api_domain, self.person_id),
auth=self.http_auth
)
messages = r.json()["data"]
for message in messages:
msg_eml_content = requests.get(message["attributes"]["file"]["url"]).text
self.messages.append(
Message(
eml_content=msg_eml_content,
message_id=message["id"],
from_name=message["attributes"]["from_name"],
subject=message["attributes"]["subject"],
)
)
self.has_fetched = True
@property
def id_dictionary(self) -> dict[str, Message]:
if not self.has_fetched:
return ValueError("you need to call self.fetch_messages() first")
return {message.message_id: message for message in self.messages}
@property
def message_ids(self) -> set[str]:
if not self.has_fetched:
raise ValueError("you need to call self.fetch_messages() first")
return set(self.id_dictionary.keys())
def get_unread_messages(self, read: set[str]) -> set[str]:
return self.message_ids.difference(read)
def main():
import os
env = os.environ
app_id = env["PCO_APP_ID"]
app_secret = env["PCO_APP_SECRET"]
person_id = env["PCO_PERSON_ID"]
deliverer = MessageDeliverer(app_id, app_secret, person_id)
deliverer.fetch_messages()
print(deliverer.messages)
if __name__ == "__main__":
main()