tfb-groupme-bridge/tfbbridge/bridge/views.py

444 lines
15 KiB
Python
Raw Normal View History

2024-11-11 00:54:30 -06:00
import json
import pickle
import random

import qrcode
import requests
from django.http import (
    HttpResponse,
    HttpResponseBadRequest,
    HttpResponseForbidden,
    HttpResponseNotAllowed,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import NoReverseMatch, reverse

from .models import Organization
2024-11-11 00:54:30 -06:00
# Display name used for every GroupMe bot registered by the views below.
bot_name = "tfbbot"
# GroupMe OAuth authorization URL, including this application's client id.
auth_link = (
    "https://oauth.groupme.com/oauth/authorize"
    "?client_id=KbStJZciPGivvLuTN9XX6aXf6NHVcxDSiVR0wRC0cqVDuDfG"
)
2024-11-11 00:54:30 -06:00
def login(request):
    """Send the visitor to the GroupMe OAuth authorization page."""
    destination = auth_link
    return redirect(destination)
def add_organization_flow_select(request):
    """Show the "Add organization" form (GET) or create the organization (POST).

    GET renders ``create_organization.html`` with the groups the logged-in
    user administers and the de-duplicated, name-sorted members of those
    groups.

    POST reads ``name`` plus the ``g_<group id>`` / ``u_<user id>`` form keys,
    saves a new ``Organization`` owned by the current user, registers one
    GroupMe bot (with a callback URL) per selected group, stores the selected
    groups and users on the organization with the role ``"trusted"``, and
    redirects to ``view_organization``.

    Visitors without a logged-in session are redirected to ``login``; a POST
    without ``name`` gets a 400 and any other HTTP method a 405. A failed
    GroupMe API call raises ``requests.RequestException``.
    """
    session = request.session
    if "logged_in" not in session:
        # The session data read below only exists after the OAuth callback ran.
        return redirect(reverse("login"))

    our_id = session["groupme_id"]

    if request.method == "GET":
        # NOTE(security): pickle.loads runs arbitrary code on a crafted
        # payload. This is only safe while session contents cannot be forged
        # by the client; storing plain JSON would avoid the risk.
        user_group_data = pickle.loads(
            bytes.fromhex(session["groupme_groups_pickle"])
        )["response"]

        rendered_groups = []
        unique_users = {}  # (user_id, name) -> rendered entry; drops duplicates
        for group in user_group_data:
            members = group["members"]
            is_admin = any(
                member["user_id"] == our_id and "admin" in member["roles"]
                for member in members
            )
            if not is_admin:
                continue
            rendered_groups.append({
                "name": group["name"],
                "id": group["group_id"],
            })
            for member in members:
                key = (member["user_id"], member["name"])
                unique_users.setdefault(key, {
                    "name": member["name"],
                    "id": member["user_id"],
                })

        rendered_users = sorted(
            unique_users.values(),
            key=lambda entry: entry["name"].lower(),
        )
        context = {
            "request": request,
            "title": "Add organization",
            "groups": rendered_groups,
            "users": rendered_users,
        }
        return render(request, "create_organization.html", context)

    if request.method == "POST":
        name = request.POST.get("name")
        if name is None:
            return HttpResponseBadRequest("Missing organization name.")
        users = [key[2:] for key in request.POST if key.startswith("u_")]
        groups = [key[2:] for key in request.POST if key.startswith("g_")]

        # NOTE(security): see the pickle warning in the GET branch.
        group_name_mapping = pickle.loads(
            bytes.fromhex(session["group_name_mapping"])
        )
        user_name_mappings = pickle.loads(
            bytes.fromhex(session["user_name_mapping"])
        )

        org = Organization()
        org.owner = our_id
        org.name = name
        # Start from valid empty pickles so the row stays loadable even if a
        # bot registration below fails part-way through.
        empty_list_hex = pickle.dumps([]).hex()
        org.trusted_users = empty_list_hex
        org.channel_info = empty_list_hex
        org.save()  # saved first: the callback URL needs org.id

        group_data_collected = []
        for group in groups:
            payload = {
                "bot": {
                    "name": bot_name,
                    "group_id": group,
                    "active": True,
                    "callback_url": "https://marching.beepboop.systems/groupme/organization/{}/callback/{}".format(
                        org.id,
                        str(random.randint(1, 100000))
                    ),
                }
            }
            r = requests.post(
                "https://api.groupme.com/v3/bots",
                params={"token": session["groupme_access_token"]},
                json=payload,
                timeout=10,
            )
            # Surface API failures as an HTTP error instead of a confusing
            # TypeError/KeyError on the response body. The request URL is
            # deliberately not printed: it contains the access token.
            r.raise_for_status()
            group_data_collected.append({
                "gid": group,
                "bot_id": r.json()["response"]["bot"]["bot_id"],
                "name": group_name_mapping.get(group, "unknown"),
                "role": "trusted",
            })

        user_data_collected = [
            {
                "uid": user,
                "role": "trusted",
                "name": user_name_mappings.get(user, "unknown"),
                "number_messages": 0,
            }
            for user in users
        ]

        org.trusted_users = pickle.dumps(user_data_collected).hex()
        org.channel_info = pickle.dumps(group_data_collected).hex()
        org.save()
        return redirect(reverse("view_organization", kwargs={"org_id": org.id}))

    return HttpResponseNotAllowed(["GET", "POST"])
2024-11-11 00:54:30 -06:00
def handle_oauth(request):
    """OAuth callback: store the GroupMe token and cache profile and group data.

    Reads ``access_token`` from the query string, fetches the user's profile
    and group list from the GroupMe API, and stores in the session:
    ``groupme_access_token``, ``groupme_id``, ``groupme_name``,
    ``groupme_groups_pickle`` (hex-encoded pickle of the groups response),
    ``logged_in``, ``group_name_mapping`` and ``user_name_mapping``
    (hex-encoded pickles of id -> name dicts).

    Returns a redirect to the URL name stored in the session under
    ``needs_redirect`` (consumed here) when it can be reversed, otherwise to
    ``index``. A request without ``access_token`` gets a 400. A failed GroupMe
    API call raises ``requests.RequestException``.
    """
    token = request.GET.get("access_token")
    if not token:
        return HttpResponseBadRequest("Missing access_token.")

    profile_response = requests.get(
        "https://api.groupme.com/v3/users/me",
        params={"token": token},
        timeout=10,
    )
    profile_response.raise_for_status()
    profile = profile_response.json()["response"]

    groups_response = requests.get(
        "https://api.groupme.com/v3/groups",
        params={"token": token, "per_page": 200},
        timeout=10,
    )
    groups_response.raise_for_status()
    groups_payload = groups_response.json()  # parse the body once
    groups = groups_payload["response"]

    session = request.session
    session["groupme_access_token"] = token
    session["groupme_id"] = profile["id"]
    session["groupme_name"] = profile["name"]
    session["groupme_groups_pickle"] = pickle.dumps(groups_payload).hex()
    session["logged_in"] = "yes"

    group_name_mapping = {group["id"]: group["name"] for group in groups}
    session["group_name_mapping"] = pickle.dumps(group_name_mapping).hex()

    # Keyed by "user_id": that is the field the other views put into their
    # "u_<id>" form keys and later look up in this mapping.
    user_name_mapping = {}
    for group in groups:
        for member in group["members"]:
            user_name_mapping[member["user_id"]] = member["name"]
    user_name_mapping[profile["id"]] = profile["name"]
    session["user_name_mapping"] = pickle.dumps(user_name_mapping).hex()

    pending = session.pop("needs_redirect", None)
    if pending:
        try:
            return redirect(reverse(pending))
        except NoReverseMatch:
            # URL names that need arguments (e.g. an org id) cannot be
            # rebuilt from the name alone; fall back to the home page
            # instead of failing the whole login.
            pass
    return redirect(reverse("index"))
def index(request):
    """Render the home page.

    Logged-in sessions get ``index_auth.html`` listing the organizations
    owned by the current user; everyone else gets ``index_unauth.html``.
    """
    if "logged_in" not in request.session:
        return render(request, "index_unauth.html", {
            "title": "Home",
        })

    owned = Organization.objects.filter(owner=request.session["groupme_id"])
    context = {
        "title": "Home",
        "request": request,
        "organizations": owned,
    }
    return render(request, "index_auth.html", context)
def refresh_group_data(request):
    """Re-run the OAuth callback with the stored token to refresh cached data.

    Redirects to ``handle_oauth`` with the session's access token in the query
    string. Sessions without a token are sent to ``login`` instead of failing
    with a KeyError.
    """
    token = request.session.get("groupme_access_token")
    if not token:
        return redirect(reverse("login"))
    return redirect(reverse("handle_oauth") + "?access_token={}".format(token))
def logout(request):
    """Clear a logged-in session and show ``logged_out.html``.

    Sessions that are not logged in are redirected to the ``index`` view.
    """
    if "logged_in" not in request.session:
        return redirect(index)

    request.session.flush()
    return render(request, "logged_out.html")
def view_organization(request, org_id):
    """Render the management page of an organization for its owner.

    Responds with 404 when no organization has ``org_id`` and with 403 when
    the session's ``groupme_id`` is missing or is not the organization's
    owner. Otherwise renders ``view_organization.html`` with the
    organization, its unpickled channel and trusted-user lists, and the
    request's scheme and host.
    """
    org = get_object_or_404(Organization, id=org_id)

    viewer = request.session.get("groupme_id")
    if viewer is None or viewer != org.owner:
        return HttpResponseForbidden()

    # An empty field (rather than a pickled list) is treated as "no entries"
    # so a half-initialised organization still renders.
    channels = pickle.loads(bytes.fromhex(org.channel_info)) if org.channel_info else []
    users = pickle.loads(bytes.fromhex(org.trusted_users)) if org.trusted_users else []

    return render(request, "view_organization.html", {
        "request": request,
        "org": org,
        "hostname": request.get_host(),
        "scheme": "https" if request.is_secure() else "http",
        "channels": channels,
        "users": users,
    })
def qr_code(request, org_id):
    """Return a PNG QR code linking to the organization's add-channel page.

    The encoded URL is built from the request's scheme and host plus the
    reversed ``add_channel_to_organization`` route for this organization.
    Responds with 404 when no organization has ``org_id``.
    """
    org = get_object_or_404(Organization, id=org_id)

    scheme = "https" if request.is_secure() else "http"
    target_url = "{}://{}{}".format(
        scheme,
        request.get_host(),
        reverse("add_channel_to_organization", args=[org.id]),
    )

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(target_url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")

    # The response object is file-like, so the image is written straight
    # into its body.
    resp = HttpResponse(content_type="image/png")
    img.save(resp)
    return resp
def add_channel_to_organization(request, org_id, trusted=False):
    """Let a logged-in user attach GroupMe groups to an organization.

    GET renders ``add_channel_to_organization.html`` with the groups the
    current user administers. POST reads the ``g_<group id>`` form keys,
    registers one GroupMe bot per remaining group and appends the groups to
    the organization's channel list with role ``"trusted"`` (when ``trusted``
    is true; the bot then also gets a callback URL) or ``"following"``.

    Sessions without an access token are sent to ``login``. Responds with 404
    for an unknown ``org_id`` and 405 for other HTTP methods. A failed
    GroupMe API call raises ``requests.RequestException``.
    """
    session = request.session
    # .get(): a visitor who never logged in has no such key at all.
    if not session.get("groupme_access_token"):
        # NOTE(review): this URL name takes an org id (see the reverse() calls
        # with args=[org.id]), so it cannot be reversed from the name alone.
        session["needs_redirect"] = "add_channel_to_organization"
        return redirect(reverse("login"))

    org = get_object_or_404(Organization, id=org_id)
    our_id = session["groupme_id"]

    if request.method == "GET":
        # NOTE(security): pickle.loads runs arbitrary code on a crafted
        # payload; only safe while session contents cannot be forged.
        user_group_data = pickle.loads(
            bytes.fromhex(session["groupme_groups_pickle"])
        )["response"]
        groups_we_admin = [
            group
            for group in user_group_data
            if any(
                member["user_id"] == our_id and "admin" in member["roles"]
                for member in group["members"]
            )
        ]
        return render(request, "add_channel_to_organization.html", {
            "groups": groups_we_admin,
            "org": org,
            "request": request,
            "trusted": trusted,
        })

    if request.method == "POST":
        group_name_mapping = pickle.loads(
            bytes.fromhex(session["group_name_mapping"])
        )
        groups = [key[2:] for key in request.POST if key.startswith("g_")]
        org_group_info = pickle.loads(bytes.fromhex(org.channel_info))

        if not trusted:
            # Skip groups that are already attached as non-trusted channels.
            # NOTE(review): a group already attached as "trusted" is not
            # skipped and gets a second, "following" entry - confirm intent.
            for existing in org_group_info:
                if existing["gid"] in groups and existing["role"] != "trusted":
                    groups.remove(existing["gid"])
        else:
            # Promote already-attached groups in place.
            # NOTE(review): they stay in `groups`, so they also get a new bot
            # and a second entry below - confirm intent.
            for existing in org_group_info:
                if existing["gid"] in groups:
                    existing["role"] = "trusted"

        role = "trusted" if trusted else "following"
        to_add = []
        for group in groups:
            bot = {
                "name": bot_name,
                "group_id": group,
                "active": True,
            }
            if trusted:
                # Only trusted channels get a callback URL on their bot.
                bot["callback_url"] = "https://marching.beepboop.systems/groupme/organization/{}/callback/{}".format(
                    org.id,
                    str(random.randint(1, 100000))
                )
            r = requests.post(
                "https://api.groupme.com/v3/bots",
                params={"token": session["groupme_access_token"]},
                json={"bot": bot},
                timeout=10,
            )
            # Fail with a clear HTTP error rather than a KeyError/TypeError
            # on an unexpected response body.
            r.raise_for_status()
            to_add.append({
                "gid": group,
                "bot_id": r.json()["response"]["bot"]["bot_id"],
                "name": group_name_mapping.get(group, "unknown"),
                "role": role,
            })

        org_group_info += to_add
        org.channel_info = pickle.dumps(org_group_info).hex()
        org.save()

        if our_id == org.owner:
            return redirect(reverse("view_organization", args=[org.id]))
        return render(request, "group_add_succeed.html", {
            "request": request
        })

    return HttpResponseNotAllowed(["GET", "POST"])
def add_trusted_channel(request, org_id):
    """Run the add-channel flow with ``trusted=True``."""
    response = add_channel_to_organization(request, org_id, trusted=True)
    return response
def delete_channel(request, org_id, chan):
    """Remove the channel with group id ``chan`` from an organization.

    Only the first matching entry of the organization's channel list is
    removed; an unknown ``chan`` leaves the list unchanged. Responds with 404
    for an unknown ``org_id`` and 403 unless the session's ``groupme_id`` is
    the organization's owner (the same rule ``view_organization`` applies).
    Redirects to ``view_organization`` afterwards.
    """
    org = get_object_or_404(Organization, id=org_id)

    requester = request.session.get("groupme_id")
    if requester is None or requester != org.owner:
        return HttpResponseForbidden()

    channels = pickle.loads(bytes.fromhex(org.channel_info))
    for position, channel in enumerate(channels):
        if channel["gid"] == chan:
            # NOTE(review): only the stored entry is dropped; nothing here
            # contacts GroupMe about the channel's bot.
            del channels[position]
            break

    org.channel_info = pickle.dumps(channels).hex()
    org.save()
    return redirect(reverse("view_organization", args=[org.id]))
def demote_channel(request, org_id, chan):
    """Set the role of the channel with group id ``chan`` to ``"following"``.

    Only the first matching entry of the organization's channel list is
    changed; an unknown ``chan`` leaves the list unchanged. Responds with 404
    for an unknown ``org_id`` and 403 unless the session's ``groupme_id`` is
    the organization's owner (the same rule ``view_organization`` applies).
    Redirects to ``view_organization`` afterwards.
    """
    org = get_object_or_404(Organization, id=org_id)

    requester = request.session.get("groupme_id")
    if requester is None or requester != org.owner:
        return HttpResponseForbidden()

    channels = pickle.loads(bytes.fromhex(org.channel_info))
    for channel in channels:
        if channel["gid"] == chan:
            channel["role"] = "following"
            break

    org.channel_info = pickle.dumps(channels).hex()
    org.save()
    return redirect(reverse("view_organization", args=[org.id]))
def add_trusted_user(request, org_id):
    """Show the add-user form (GET) or add trusted users to an organization (POST).

    GET renders ``add_user.html`` with the de-duplicated members of every
    group the current user administers. POST reads the ``u_<user id>`` form
    keys and appends each user not already in the organization's trusted list
    with role ``"trusted"`` and a message count of 0, then redirects to
    ``view_organization``.

    Responds with 404 for an unknown ``org_id``, 403 unless the session's
    ``groupme_id`` is the organization's owner (the same rule
    ``view_organization`` applies), and 405 for other HTTP methods.
    """
    org = get_object_or_404(Organization, id=org_id)

    session = request.session
    our_id = session.get("groupme_id")
    if our_id is None or our_id != org.owner:
        return HttpResponseForbidden()

    if request.method == "GET":
        # NOTE(security): pickle.loads runs arbitrary code on a crafted
        # payload; only safe while session contents cannot be forged.
        user_group_data = pickle.loads(
            bytes.fromhex(session["groupme_groups_pickle"])
        )["response"]

        unique_users = {}  # (user_id, name) -> rendered entry; drops duplicates
        for group in user_group_data:
            members = group["members"]
            is_admin = any(
                member["user_id"] == our_id and "admin" in member["roles"]
                for member in members
            )
            if not is_admin:
                continue
            for member in members:
                key = (member["user_id"], member["name"])
                unique_users.setdefault(key, {
                    "name": member["name"],
                    "id": member["user_id"],
                })

        return render(request, "add_user.html", {
            "request": request,
            "users": list(unique_users.values()),
            "org": org,
        })

    if request.method == "POST":
        org_trusted_users = pickle.loads(bytes.fromhex(org.trusted_users))
        user_name_mappings = pickle.loads(
            bytes.fromhex(session["user_name_mapping"])
        )
        selected = [key[2:] for key in request.POST if key.startswith("u_")]

        # Submitting an already trusted user must not create a second entry.
        known_uids = {entry["uid"] for entry in org_trusted_users}
        for uid in selected:
            if uid in known_uids:
                continue
            known_uids.add(uid)
            org_trusted_users.append({
                "uid": uid,
                "role": "trusted",
                "name": user_name_mappings.get(uid, "unknown"),
                "number_messages": 0,
            })

        org.trusted_users = pickle.dumps(org_trusted_users).hex()
        org.save()
        return redirect(reverse("view_organization", args=[org.id]))

    return HttpResponseNotAllowed(["GET", "POST"])
def remove_user(request, org_id, user):
    """Remove the trusted user with id ``user`` from an organization.

    Only the first matching entry of the organization's trusted-user list is
    removed; an unknown ``user`` leaves the list unchanged. Responds with 404
    for an unknown ``org_id`` and 403 unless the session's ``groupme_id`` is
    the organization's owner (the same rule ``view_organization`` applies).
    Redirects to ``view_organization`` afterwards.
    """
    org = get_object_or_404(Organization, id=org_id)

    requester = request.session.get("groupme_id")
    if requester is None or requester != org.owner:
        return HttpResponseForbidden()

    trusted_users = pickle.loads(bytes.fromhex(org.trusted_users))
    for position, entry in enumerate(trusted_users):
        if entry["uid"] == user:
            del trusted_users[position]
            break

    org.trusted_users = pickle.dumps(trusted_users).hex()
    org.save()
    return redirect(reverse("view_organization", args=[org.id]))