for transmission only; rebase later
This commit is contained in:
parent
f8f642cead
commit
47c60e53b7
@ -0,0 +1,134 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import ClassVar, Any, Optional
|
||||||
|
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GMUser:
|
||||||
|
name: str
|
||||||
|
userid: str
|
||||||
|
|
||||||
|
x_role: Optional[str] = None # always "trusted"
|
||||||
|
x_messages: Optional[int] = 0
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if isinstance(other, GMUser):
|
||||||
|
return self.userid == other.userid
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Comparison with non-GMUser objects not allowed")
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return self.userid
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sortkey(self):
|
||||||
|
return name
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GMChannel:
|
||||||
|
name: str
|
||||||
|
chanid: str
|
||||||
|
admins: list[GMUser]
|
||||||
|
members: list[GMUser]
|
||||||
|
|
||||||
|
x_role: Optional[str] = None # either "following" or "trusted"
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GMBot:
|
||||||
|
name: str
|
||||||
|
chanid: str
|
||||||
|
callback: Optional[str] = None
|
||||||
|
chankey: Optional[str] = None
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GroupMe:
|
||||||
|
prefix: ClassVar[str] = "https://api.groupme.com/v3"
|
||||||
|
token: str
|
||||||
|
|
||||||
|
def _construct_url(self, endpoint: str, params: dict[str,str] = {}) -> str:
|
||||||
|
params["token"] = self.token
|
||||||
|
return "{}/{}?{}".format(self.prefix, endpoint, urlencode(params))
|
||||||
|
|
||||||
|
_cons_url: ClassVar[Any] = _construct_url
|
||||||
|
|
||||||
|
def me(self):
|
||||||
|
r = requests.get(
|
||||||
|
self._cons_url("users/me")
|
||||||
|
)
|
||||||
|
data = r.json()["response"]
|
||||||
|
|
||||||
|
return GMUser(data["name"], data["id"])
|
||||||
|
|
||||||
|
def groups(self):
|
||||||
|
r = requests.get(
|
||||||
|
self._cons_url("groups", { "per_page": 200 }),
|
||||||
|
)
|
||||||
|
data = r.json()["response"]
|
||||||
|
|
||||||
|
output = []
|
||||||
|
for group in data:
|
||||||
|
members = []
|
||||||
|
admins = []
|
||||||
|
|
||||||
|
for member in group["members"]:
|
||||||
|
new_member = GMUser(member["name"], member["user_id"])
|
||||||
|
|
||||||
|
if "admin" in member["roles"]:
|
||||||
|
admins.append(new_member)
|
||||||
|
members.append(new_member)
|
||||||
|
|
||||||
|
output.append(GMChannel(
|
||||||
|
group["name"],
|
||||||
|
group["group_id"],
|
||||||
|
admins,
|
||||||
|
members
|
||||||
|
))
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
def register_bot(self, bot: GMBot):
|
||||||
|
data = {
|
||||||
|
"bot": {
|
||||||
|
"name": bot.name,
|
||||||
|
"group_id": bot.chanid,
|
||||||
|
"active": True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bot.callback:
|
||||||
|
data["bot"]["callback_url"] = bot.callback
|
||||||
|
|
||||||
|
r = requests.post(
|
||||||
|
self._cons_url("bots"),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
data = r.json()["response"]["bot"]
|
||||||
|
|
||||||
|
name = data["name"]
|
||||||
|
chanid = data["group_id"]
|
||||||
|
chankey = data["bot_id"]
|
||||||
|
callback = data["callback_url"]
|
||||||
|
|
||||||
|
return GMBot(name, chanid, callback, chankey)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
api = GroupMe(input("token? "))
|
||||||
|
me = api.me()
|
||||||
|
groups = api.groups()
|
||||||
|
|
||||||
|
for group in groups:
|
||||||
|
if me in group.admins:
|
||||||
|
name = group.name
|
||||||
|
if name == "another group":
|
||||||
|
bot = GMBot(
|
||||||
|
"tfbbot",
|
||||||
|
group.chanid,
|
||||||
|
"https://marching.beepboop.systems",
|
||||||
|
)
|
||||||
|
bot = api.register_bot(bot)
|
||||||
|
|
||||||
|
print(bot)
|
||||||
|
break
|
@ -14,3 +14,16 @@ def remove_dup_dict_in_list(l):
|
|||||||
|
|
||||||
def get_org_by_id(org_id):
|
def get_org_by_id(org_id):
|
||||||
return Organization.objects.filter(id__exact=org_id)[0]
|
return Organization.objects.filter(id__exact=org_id)[0]
|
||||||
|
|
||||||
|
def get_user_from_user_id(user_id, users):
|
||||||
|
for user in users:
|
||||||
|
if user.userid == user_id:
|
||||||
|
return user
|
||||||
|
|
||||||
|
def get_group_from_group_id(group_id, groups):
|
||||||
|
for group in groups:
|
||||||
|
if group.groupid == group_id:
|
||||||
|
return group
|
||||||
|
|
||||||
|
def org_filter(org, **kwargs):
|
||||||
|
return Organization.objects.filter(**kwargs)
|
||||||
|
@ -7,7 +7,9 @@ import pickle
|
|||||||
import qrcode
|
import qrcode
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
from .utils import to_pickle, from_pickle, remove_dup_dict_in_list, get_org_by_id
|
from itertools import chain
|
||||||
|
from .utils import *
|
||||||
|
from .groupme import GroupMe
|
||||||
|
|
||||||
bot_name = "tfbbot"
|
bot_name = "tfbbot"
|
||||||
auth_link = "https://oauth.groupme.com/oauth/authorize?client_id=KbStJZciPGivvLuTN9XX6aXf6NHVcxDSiVR0wRC0cqVDuDfG"
|
auth_link = "https://oauth.groupme.com/oauth/authorize?client_id=KbStJZciPGivvLuTN9XX6aXf6NHVcxDSiVR0wRC0cqVDuDfG"
|
||||||
@ -114,34 +116,18 @@ def add_organization_flow_select(request):
|
|||||||
return redirect(reverse("view_organization", kwargs={"org_id": org.id}))
|
return redirect(reverse("view_organization", kwargs={"org_id": org.id}))
|
||||||
|
|
||||||
def handle_oauth(request):
|
def handle_oauth(request):
|
||||||
request.session["groupme_access_token"] = request.GET.get('access_token')
|
request.session["token"] = request.GET.get('access_token')
|
||||||
|
api = GroupMe(request.session["token"])
|
||||||
|
|
||||||
r = requests.get("https://api.groupme.com/v3/users/me?token={}".format(
|
me = api.me()
|
||||||
request.session["groupme_access_token"]
|
groups = api.groups()
|
||||||
))
|
|
||||||
|
|
||||||
data = r.json()
|
request.session["groups"] = to_pickle(groups)
|
||||||
request.session["groupme_id"] = data["response"]["id"]
|
|
||||||
request.session["groupme_name"] = data["response"]["name"]
|
|
||||||
|
|
||||||
r = requests.get("https://api.groupme.com/v3/groups?token={}&per_page=200".format(
|
|
||||||
request.session["groupme_access_token"]
|
|
||||||
))
|
|
||||||
|
|
||||||
request.session["groupme_groups_pickle"] = to_pickle(r.json())
|
|
||||||
request.session["logged_in"] = "yes"
|
request.session["logged_in"] = "yes"
|
||||||
|
users = list(set(chain(
|
||||||
group_name_mapping = {}
|
[group.members for group in groups]
|
||||||
for group in r.json()["response"]:
|
)))
|
||||||
group_name_mapping[group["id"]] = group["name"]
|
request.session["users"] = to_pickle(users)
|
||||||
request.session["group_name_mapping"] = to_pickle(group_name_mapping)
|
|
||||||
|
|
||||||
user_name_mapping = {}
|
|
||||||
for group in r.json()["response"]:
|
|
||||||
for user in group["members"]:
|
|
||||||
user_name_mapping[user["id"]] = user["name"]
|
|
||||||
user_name_mapping[data["response"]["id"]] = data["response"]["name"]
|
|
||||||
request.session["user_name_mapping"] = to_pickle(user_name_mapping)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if request.session["needs_redirect"]:
|
if request.session["needs_redirect"]:
|
||||||
|
Loading…
Reference in New Issue
Block a user