Initial commit

This commit is contained in:
Miłosz Guglas
2021-09-18 15:30:56 +02:00
committed by --local
commit 9c15227cbf
27 changed files with 523 additions and 0 deletions

34
src/app.py Normal file
View File

@@ -0,0 +1,34 @@
from .config.credentials import bot_token, app_url, port
from .config.handlers import handlers
from .handlers.handlerInterface import HandlerInterface
from telegram.ext.dispatcher import Dispatcher
from telegram.ext import Updater
class App:
updater: Updater
dispatcher: Dispatcher
def __init__(self):
self.updater = Updater(bot_token)
def run(self) -> None:
self.registerHandlers()
self.registerWebhook()
self.updater.idle()
def registerHandlers(self) -> None:
for handler in handlers:
if not isinstance(handler, HandlerInterface):
raise Exception('Invalid list of handlers provided. Handler must implement HandlerInterface')
self.updater.dispatcher.add_handler(handler.getBotHandler())
def registerWebhook(self) -> None:
self.updater.start_webhook(
listen="0.0.0.0",
port=int(port),
url_path=bot_token,
webhook_url=f'{app_url}/{bot_token}'
)

8
src/config/contents.py Normal file
View File

@@ -0,0 +1,8 @@
import re
# These are MarkdownV2 python-telegram-bot specific
opted_in_successfully = re.escape('You have opted-in for everyone-mentions.')
opted_in_failed = re.escape('You already opted-in for everyone-mentions.')
opted_off_successfully = re.escape('You have opted-off for everyone-mentions.')
opted_off_failed = re.escape('You need to opt-in first before processing this command.')
mention_failed = re.escape('There are no users to mention.')

16
src/config/credentials.py Normal file
View File

@@ -0,0 +1,16 @@
import os
from dotenv import load_dotenv
load_dotenv()
bot_token = os.environ['bot_token']
app_url = os.environ['app_url']
port = os.environ['PORT']
firebaseConfig = {
"apiKey": os.environ['firebase_apiKey'],
"authDomain": os.environ['firebase_authDomain'],
"databaseURL": os.environ['firebase_databaseURL'],
"projectId": os.environ['firebase_projectId'],
"storageBucket": os.environ['firebase_storageBucket'],
}

9
src/config/handlers.py Normal file
View File

@@ -0,0 +1,9 @@
from ..handlers.inHandler import InHandler
from ..handlers.outHandler import OutHandler
from ..handlers.mentionHandler import MentionHandler
handlers = [
InHandler(),
OutHandler(),
MentionHandler()
]

34
src/firebaseProxy.py Normal file
View File

@@ -0,0 +1,34 @@
import pyrebase
from pyrebase.pyrebase import Database as FirebaseDB
from .config.credentials import firebaseConfig
class FirebaseProxy():
db: FirebaseDB
# Group specific values
group_index: str = 'groups'
# User specific values
id_index: str = 'id'
name_index: str = 'name'
def __init__(self) -> None:
firebase = pyrebase.pyrebase.initialize_app(firebaseConfig)
self.db = firebase.database()
def getChilds(self, *childs: str) -> FirebaseDB:
current = self.db
for child_index in childs:
current = current.child(child_index)
return current
@staticmethod
def getGroupPath(groupId: int) -> str:
return f'{FirebaseProxy.group_index}/{groupId}'
@staticmethod
def getUserPath(userId: int, groupId: int) -> str:
return f'{groupId}_{userId}'

View File

@@ -0,0 +1,18 @@
from abc import abstractmethod
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.handler import Handler
from telegram.update import Update
class HandlerInterface:
def __init__(self) -> None:
pass
@abstractmethod
def getBotHandler(self) -> Handler: raise Exception('getBotHandler method is not implemented')
@abstractmethod
def handle(self, update: Update, context: CallbackContext) -> None: raise Exception('handle method is not implemented')
@abstractmethod
def getCommandName(self) -> str: raise Exception('getCommandName method is not implemented')

39
src/handlers/inHandler.py Normal file
View File

@@ -0,0 +1,39 @@
from ..config.contents import opted_in_successfully, opted_in_failed
from ..repositories.userRepository import UserRepository
from ..firebaseProxy import FirebaseProxy
from .handlerInterface import HandlerInterface
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
class InHandler(HandlerInterface):
botHandler: CommandHandler
commandName: str = 'in'
def __init__(self) -> None:
self.botHandler = CommandHandler(
self.getCommandName(),
self.handle
)
def handle(self, update: Update, context: CallbackContext) -> None:
groupId = update.effective_chat.id
userData = {
FirebaseProxy.id_index: update.effective_user.id,
FirebaseProxy.name_index: update.effective_user.username
}
userRepository = UserRepository()
if userRepository.isPresentInGroup(userData.get(FirebaseProxy.id_index), groupId):
update.message.reply_markdown_v2(text=opted_in_failed)
return
userRepository.addForGroup(userData, groupId)
update.message.reply_markdown_v2(text=opted_in_successfully)
def getBotHandler(self) -> CommandHandler:
return self.botHandler
def getCommandName(self) -> str:
return self.commandName

View File

@@ -0,0 +1,42 @@
from ..config.contents import mention_failed
from ..firebaseProxy import FirebaseProxy
from ..repositories.groupRepository import GroupRepository
from .handlerInterface import HandlerInterface
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
class MentionHandler(HandlerInterface):
botHandler: CommandHandler
commandName: str = 'everyone'
def __init__(self) -> None:
self.botHandler = CommandHandler(
self.getCommandName(),
self.handle
)
def handle(self, update: Update, context: CallbackContext) -> None:
groupId = update.effective_chat.id
groupRepository = GroupRepository()
mentionMessage = self.buildMentionMessage(groupRepository.get(id=groupId))
update.message.reply_markdown_v2(text=mentionMessage)
def getBotHandler(self) -> CommandHandler:
return self.botHandler
def getCommandName(self) -> str:
return self.commandName
def buildMentionMessage(self, usersData: dict) -> str:
result = ''
for userData in usersData:
userId = str(userData.get(FirebaseProxy.id_index))
username = userData.get(FirebaseProxy.name_index) or userId
result += "*[%s](tg://user?id=%s)* " % (username, userId)
return result or mention_failed

View File

@@ -0,0 +1,39 @@
from ..config.contents import opted_off_successfully, opted_off_failed
from ..repositories.userRepository import UserRepository
from .handlerInterface import HandlerInterface
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
class OutHandler(HandlerInterface):
botHandler: CommandHandler
commandName: str = 'out'
def __init__(self) -> None:
self.botHandler = CommandHandler(
self.getCommandName(),
self.handle
)
def handle(self, update: Update, context: CallbackContext) -> None:
groupId = update.effective_chat.id
userData = {
'id': update.effective_user.id,
'name': update.effective_user.username
}
userRepository = UserRepository()
if not userRepository.isPresentInGroup(userData.get('id'), groupId):
update.message.reply_markdown_v2(text=opted_off_failed)
return
userRepository.removeForGroup(userId=userData.get('id'), groupId=groupId)
update.message.reply_markdown_v2(text=opted_off_successfully)
def getBotHandler(self) -> CommandHandler:
return self.botHandler
def getCommandName(self) -> str:
return self.commandName

View File

@@ -0,0 +1,18 @@
from ..firebaseProxy import FirebaseProxy
class GroupRepository():
firebase: FirebaseProxy
def __init__(self) -> None:
self.firebase = FirebaseProxy()
def get(self, id: int) -> dict:
result = []
groupData = self.firebase.getChilds(FirebaseProxy.group_index, id).get()
if groupData.each():
for user_root in groupData.each():
result.append(user_root.val())
return result

View File

@@ -0,0 +1,30 @@
from ..firebaseProxy import FirebaseProxy
class UserRepository():
firebaseProxy: FirebaseProxy
def __init__(self) -> None:
self.firebaseProxy = FirebaseProxy()
def addForGroup(self, userData: dict, groupId: int) -> None:
self.firebaseProxy.getChilds(FirebaseProxy.getGroupPath(groupId)).update({
f'{groupId}_{userData.get("id")}': {
FirebaseProxy.id_index: userData.get("id"),
FirebaseProxy.name_index: userData.get("name")
}
})
def removeForGroup(self, userId: int, groupId: int) -> None:
self.firebaseProxy.getChilds(FirebaseProxy.getGroupPath(groupId)).update({
FirebaseProxy.getUserPath(userId, groupId): {}
})
def isPresentInGroup(self, userId: int, groupId: int) -> bool:
user = self.firebaseProxy.getChilds(
FirebaseProxy.getGroupPath(groupId),
FirebaseProxy.getUserPath(userId, groupId)
).get().val()
return bool(user)