Banned users env, access validator, removed silent command, code quality improvements

This commit is contained in:
miloszowi 2021-10-08 15:25:47 +02:00
parent d05d0c0904
commit 431b004284
28 changed files with 268 additions and 261 deletions

@ -4,7 +4,16 @@ All notable changes to this project will be documented in this file.
## [UNRELEASED] - 07.10.2021
### Added
- Inline Query for join/leave/everyone
- Validator class for group name
- Group name validator
- Banned users env
- Access validator
- ActionNotAllowedException
### Changed
- code quality improvements
### Deleted
- `/silent` command
### Updated
- start command content

@ -15,7 +15,6 @@
* [`/leave`](#leave)
* [`/everyone`](#everyone)
* [`/groups`](#groups)
* [`/silent`](#silent)
* [`/start`](#start)
## Description
Everyone Mention Bot is simple, but useful telegram bot to gather group members attention.
@ -62,6 +61,7 @@ docker/logs <container>
- `MONGODB_PASSWORD` - MongoDB password
- `MONGODB_HOSTNAME` - MongoDB host (default `database` - container name)
- `MONGODB_PORT` - MongoDB port (default `27017` - given in docker-compose configuration)
- `BANNED_USERS` - user ids separated by comma that are not allowed to use the bot
- `database.env`
- `MONGO_INITDB_ROOT_USERNAME` - conf from `app.env`
@ -101,16 +101,6 @@ If user does not have nickname, it will first try to assign his firstname, then
Will display available groups for this chat as well with members count.
![groups command example](docs/groups.png)
### `/silent`
```
/silent <group_name>
```
Will display all every member of given group (`default` if not given) but without notyfing them.
![silent command example](docs/silent.png)
### `/start`
Start & Help message

@ -6,4 +6,6 @@ MONGODB_DATABASE=
MONGODB_USERNAME=
MONGODB_PASSWORD=
MONGODB_HOSTNAME=localhost
MONGODB_PORT=27017
MONGODB_PORT=27017
BANNED_USERS=

Binary file not shown.

Before

(image error) Size: 27 KiB

@ -1,27 +1,21 @@
from logging import Logger
import logging
from telegram.ext import Updater
from telegram.ext.dispatcher import Dispatcher
from logger import Logger
from config.credentials import BOT_TOKEN, PORT, WEBHOOK_URL
from bot.handler import (groupsHandler, joinHandler, mentionHandler, leaveHandler,
silentMentionHandler, startHandler, inlineQueryHandler)
from bot.handler import *
from bot.handler.abstractHandler import AbstractHandler
from config.credentials import BOT_TOKEN, PORT, WEBHOOK_URL
from logger import Logger
class App:
updater: Updater
dispatcher: Dispatcher
log_file: str = '/var/log/bot.log'
log_format: str = '%(levelname)s-%(asctime)s: %(message)s'
def __init__(self):
self.updater = Updater(BOT_TOKEN)
def run(self) -> None:
self.setup_logging()
Logger.register()
self.register_handlers()
self.register_webhook()
@ -29,9 +23,7 @@ class App:
def register_handlers(self) -> None:
for handler in AbstractHandler.__subclasses__():
self.updater.dispatcher.add_handler(
handler().get_bot_handler()
)
self.updater.dispatcher.add_handler(handler().bot_handler)
def register_webhook(self) -> None:
self.updater.start_webhook(
@ -41,15 +33,9 @@ class App:
webhook_url="/".join([WEBHOOK_URL, BOT_TOKEN])
)
Logger.get_logger(Logger.action_logger).info(
f'Webhook configured, listening on {WEBHOOK_URL}/<bot-token>'
)
Logger.info(f'Webhook configured, listening on {WEBHOOK_URL}/<bot-token>')
def setup_logging(self) -> None:
logger = Logger()
logger.setup()
if __name__ == "__main__":
app = App()
app.run()

@ -0,0 +1,5 @@
__all__ = [
'abstractHandler', 'everyoneHandler', 'groupsHandler',
'inlineQueryHandler', 'joinHandler', 'leaveHandler',
'startHandler'
]

@ -1,37 +1,13 @@
from abc import abstractmethod
from bot.message.messageData import MessageData
from logger import Logger
from telegram.ext import Handler
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.handler import Handler
from telegram.update import Update
from telegram.utils.helpers import mention_markdown
class AbstractHandler:
@abstractmethod
def get_bot_handler(self) -> Handler: raise Exception('get_bot_handler method is not implemented')
class AbstractHandler:
bot_handler: Handler
@abstractmethod
def handle(self, update: Update, context: CallbackContext) -> None: raise Exception('handle method is not implemented')
@abstractmethod
def log_action(self, message_data: MessageData) -> None: raise Exception('log_action method is not implemented')
def interpolate_reply(self, reply: str, message_data: MessageData):
return reply.format(
mention_markdown(message_data.user_id, message_data.username),
message_data.group_name
)
def reply_markdown(self, update: Update, message: str) -> None:
try:
update.effective_message.reply_markdown_v2(text=message)
except Exception as err:
Logger.error(str(err))
def reply_html(self, update: Update, html: str) -> None:
try:
update.effective_message.reply_html(text=html)
except Exception as err:
Logger.error(str(err))
def handle(self, update: Update, context: CallbackContext) -> None:
raise Exception('handle method is not implemented')

@ -0,0 +1,35 @@
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from bot.message.replier import Replier
from config.contents import mention_failed
from logger import Logger
from repository.userRepository import UserRepository
from utils.messageBuilder import MessageBuilder
class EveryoneHandler(AbstractHandler):
bot_handler: CommandHandler
user_repository: UserRepository
action: str = 'everyone'
def __init__(self) -> None:
self.bot_handler = CommandHandler(self.action, self.handle)
self.user_repository = UserRepository()
def handle(self, update: Update, context: CallbackContext) -> None:
try:
message_data = MessageData.create_from_arguments(update, context)
except Exception as e:
return Replier.markdown(update, str(e))
users = self.user_repository.get_all_for_chat(message_data.chat_id)
if users:
Replier.markdown(update, MessageBuilder.mention_message(users))
return Logger.action(message_data, self.action)
Replier.markdown(update, mention_failed)

@ -1,45 +1,36 @@
from typing import Iterable
import prettytable as pt
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from config.contents import no_groups
from entity.group import Group
from logger import Logger
from repository.groupRepository import GroupRepository
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from bot.message.replier import Replier
from config.contents import no_groups
from exception.notFoundException import NotFoundException
from logger import Logger
from repository.groupRepository import GroupRepository
from utils.messageBuilder import MessageBuilder
class GroupsHandler(AbstractHandler):
bot_handler: CommandHandler
group_repository: GroupRepository
action: str = 'groups'
def __init__(self) -> None:
self.bot_handler = CommandHandler('groups', self.handle)
self.bot_handler = CommandHandler(self.action, self.handle)
self.group_repository = GroupRepository()
def handle(self, update: Update, context: CallbackContext) -> None:
message_data = MessageData.create_from_arguments(update, context, False)
try:
message_data = MessageData.create_from_arguments(update, context, False)
except Exception as e:
return Replier.markdown(update, str(e))
groups = self.group_repository.get_by_chat_id(message_data.chat_id)
try:
groups = self.group_repository.get_by_chat_id(message_data.chat_id)
Replier.html(update, MessageBuilder.group_message(groups))
if groups:
self.reply_html(update, self.build_groups_message(groups))
return self.log_action(message_data)
self.reply_markdown(update, no_groups)
def get_bot_handler(self) -> CommandHandler:
return self.bot_handler
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} called /groups for {message_data.chat_id}')
def build_groups_message(self, groups: Iterable[Group]) -> str:
resultTable = pt.PrettyTable(['Name', 'Members'])
resultTable.add_rows([[record.group_name, record.users_count] for record in groups])
return f'<pre>{str(resultTable)}</pre>'
Logger.action(message_data, self.action)
except NotFoundException:
Replier.markdown(update, no_groups)

@ -1,21 +1,29 @@
from bot.handler.abstractHandler import AbstractHandler
from entity.group import Group
from telegram import InlineQueryResultArticle
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.ext.inlinequeryhandler import \
InlineQueryHandler as CoreInlineQueryHandler
from telegram.inline.inputtextmessagecontent import InputTextMessageContent
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from entity.group import Group
from exception.actionNotAllowedException import ActionNotAllowedException
from validator.accessValidator import AccessValidator
class InlineQueryHandler(AbstractHandler):
bot_handler: CommandHandler
bot_handler: CoreInlineQueryHandler
def __init__(self) -> None:
self.bot_handler = CoreInlineQueryHandler(self.handle)
def handle(self, update: Update, context: CallbackContext) -> None:
try:
AccessValidator.validate(str(update.effective_user.id))
except ActionNotAllowedException:
update.inline_query.answer([])
return
group_display = update.inline_query.query or Group.default_name
group = '' if group_display == Group.default_name else group_display
@ -41,6 +49,3 @@ class InlineQueryHandler(AbstractHandler):
]
update.inline_query.answer(results, cache_time=4800)
def get_bot_handler(self) -> CoreInlineQueryHandler:
return self.bot_handler

@ -1,46 +1,41 @@
from telegram.utils.helpers import mention_markdown
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from config.contents import joined, not_joined
from exception.invalidArgumentException import InvalidArgumentException
from exception.notFoundException import NotFoundException
from logger import Logger
from repository.userRepository import UserRepository
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from bot.message.replier import Replier
from config.contents import joined, not_joined
from exception.notFoundException import NotFoundException
from logger import Logger
from repository.userRepository import UserRepository
class JoinHandler(AbstractHandler):
bot_handler: CommandHandler
user_repository: UserRepository
action: str = 'join'
def __init__(self) -> None:
self.bot_handler = CommandHandler('join', self.handle)
self.bot_handler = CommandHandler(self.action, self.handle)
self.user_repository = UserRepository()
def handle(self, update: Update, context: CallbackContext) -> None:
try:
message_data = MessageData.create_from_arguments(update, context)
except InvalidArgumentException as e:
return self.reply_markdown(update, str(e))
except Exception as e:
return Replier.markdown(update, str(e))
try:
user = self.user_repository.get_by_id(message_data.user_id)
if user.is_in_chat(message_data.chat_id):
return self.reply_markdown(update, self.interpolate_reply(not_joined, message_data))
return Replier.markdown(update, Replier.interpolate(not_joined, message_data))
user.add_to_chat(message_data.chat_id)
self.user_repository.save(user)
except NotFoundException:
self.user_repository.save_by_message_data(message_data)
self.reply_markdown(update, self.interpolate_reply(joined, message_data))
self.log_action(message_data)
def get_bot_handler(self) -> CommandHandler:
return self.bot_handler
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} joined {message_data.chat_id}')
Replier.markdown(update, Replier.interpolate(joined, message_data))
Logger.action(message_data, self.action)

@ -1,45 +1,37 @@
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from config.contents import left, not_left
from exception.invalidArgumentException import InvalidArgumentException
from exception.notFoundException import NotFoundException
from logger import Logger
from repository.userRepository import UserRepository
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from bot.message.replier import Replier
from config.contents import left, not_left
from exception.notFoundException import NotFoundException
from logger import Logger
from repository.userRepository import UserRepository
class LeaveHandler(AbstractHandler):
bot_handler: CommandHandler
user_repository: UserRepository
action: str = 'leave'
def __init__(self) -> None:
self.bot_handler = CommandHandler('leave', self.handle)
self.bot_handler = CommandHandler(self.action, self.handle)
self.user_repository = UserRepository()
def handle(self, update: Update, context: CallbackContext) -> None:
try:
message_data = MessageData.create_from_arguments(update, context)
except InvalidArgumentException as e:
return self.reply_markdown(update, str(e))
except Exception as e:
return Replier.markdown(update, str(e))
try:
user = self.user_repository.get_by_id(message_data.user_id)
user = self.user_repository.get_by_id_and_chat_id(message_data.user_id, message_data.chat_id)
user.remove_from_chat(message_data.chat_id)
self.user_repository.save(user)
if not user.is_in_chat(message_data.chat_id):
raise NotFoundException()
Replier.markdown(update, Replier.interpolate(left, message_data))
Logger.action(message_data, self.action)
except NotFoundException:
return self.reply_markdown(update, self.interpolate_reply(not_left, message_data))
user.remove_from_chat(message_data.chat_id)
self.user_repository.save(user)
self.reply_markdown(update, self.interpolate_reply(left, message_data))
self.log_action(message_data)
def get_bot_handler(self) -> CommandHandler:
return self.bot_handler
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} left {message_data.chat_id}')
return Replier.markdown(update, Replier.interpolate(not_left, message_data))

@ -1,47 +0,0 @@
from typing import Iterable
from telegram.utils.helpers import mention_markdown
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from config.contents import mention_failed
from entity.user import User
from exception.invalidArgumentException import InvalidArgumentException
from logger import Logger
from repository.userRepository import UserRepository
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
class MentionHandler(AbstractHandler):
bot_handler: CommandHandler
user_repository: UserRepository
def __init__(self) -> None:
self.bot_handler = CommandHandler('everyone', self.handle)
self.user_repository = UserRepository()
def handle(self, update: Update, context: CallbackContext) -> None:
try:
message_data = MessageData.create_from_arguments(update, context)
except InvalidArgumentException as e:
return self.reply_markdown(update, str(e))
users = self.user_repository.get_all_for_chat(message_data.chat_id)
if users:
self.reply_markdown(update, self.build_mention_message(users))
return self.log_action(message_data)
self.reply_markdown(update, mention_failed)
def get_bot_handler(self) -> CommandHandler:
return self.bot_handler
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} called /everyone for {message_data.chat_id}')
def build_mention_message(self, users: Iterable[User]) -> str:
return ' '.join([mention_markdown(user.user_id, user.username) for user in users])

@ -1,21 +0,0 @@
from typing import Iterable
from entity.user import User
from logger import Logger
from telegram.ext.commandhandler import CommandHandler
from bot.handler.abstractHandler import AbstractHandler
from bot.handler.mentionHandler import MentionHandler
from bot.message.messageData import MessageData
class MentionHandler(MentionHandler, AbstractHandler):
def __init__(self) -> None:
super().__init__()
self.bot_handler = CommandHandler('silent', self.handle)
def build_mention_message(self, users: Iterable[User]) -> str:
return ' '.join([user.username for user in users])
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} called /silent for {message_data.chat_id}')

@ -1,25 +1,25 @@
from config.contents import start_text
from logger import Logger
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext.commandhandler import CommandHandler
from telegram.update import Update
from bot.handler.abstractHandler import AbstractHandler
from bot.message.messageData import MessageData
from bot.message.replier import Replier
from config.contents import start_text
from logger import Logger
class StartHandler(AbstractHandler):
bot_handler: CommandHandler
action: str = 'start'
def __init__(self) -> None:
self.bot_handler = CommandHandler('start', self.handle)
self.bot_handler = CommandHandler(self.action, self.handle)
def handle(self, update: Update, context: CallbackContext) -> None:
self.reply_markdown(update, start_text)
self.log_action(MessageData.create_from_arguments(update, context))
def get_bot_handler(self) -> CommandHandler:
return self.bot_handler
def log_action(self, message_data: MessageData) -> None:
Logger.info(f'User {message_data.username} called /start for {message_data.chat_id}')
try:
MessageData.create_from_arguments(update, context)
except Exception as e:
return Replier.markdown(update, str(e))
Replier.markdown(update, start_text)
Logger.action(MessageData.create_from_arguments(update, context), self.action)

@ -1,19 +1,18 @@
from __future__ import annotations
import re
from dataclasses import dataclass
import names
from entity.group import Group
from exception.invalidArgumentException import InvalidArgumentException
from telegram.ext.callbackcontext import CallbackContext
from telegram.update import Update
from entity.group import Group
from validator.accessValidator import AccessValidator
from validator.groupNameValidator import GroupNameValidator
@dataclass
class MessageData():
class MessageData:
user_id: str
chat_id: str
group_name: str
@ -21,6 +20,9 @@ class MessageData():
@staticmethod
def create_from_arguments(update: Update, context: CallbackContext, include_group: bool = True) -> MessageData:
user_id = str(update.effective_user.id)
AccessValidator.validate(user_id)
chat_id = str(update.effective_chat.id)
group_name = Group.default_name
@ -31,9 +33,7 @@ class MessageData():
if group_name is not Group.default_name:
chat_id += f'~{group_name}'
user_id = str(update.effective_user.id)
username = update.effective_user.username or update.effective_user.first_name
if not username:

@ -0,0 +1,29 @@
from telegram import Update
from telegram.utils.helpers import mention_markdown
from bot.message.messageData import MessageData
from logger import Logger
class Replier:
@staticmethod
def interpolate(content: str, message_data: MessageData):
return content.format(
mention_markdown(message_data.user_id, message_data.username),
message_data.group_name
)
@staticmethod
def markdown(update: Update, message: str) -> None:
try:
update.effective_message.reply_markdown_v2(message)
except Exception as err:
Logger.error(str(err))
@staticmethod
def html(update: Update, html: str) -> None:
try:
update.effective_message.reply_html(html)
except Exception as err:
Logger.error(str(err))

@ -5,7 +5,6 @@ not_left = '{} did not join group `{}` before'
mention_failed = 'There are no users to mention'
no_groups = 'There are no groups for this chat'
start_text = """
Hello there
I am @everyone\_mention\_bot
@ -32,4 +31,4 @@ To display all members in a group:
You can also try to tag me @everyone\_mention\_bot and then enter group name
Possible results will be displayed
"""
"""

@ -13,3 +13,5 @@ MONGODB_USERNAME = os.environ['MONGODB_USERNAME']
MONGODB_PASSWORD = os.environ['MONGODB_PASSWORD']
MONGODB_HOSTNAME = os.environ['MONGODB_HOSTNAME']
MONGODB_PORT = os.environ['MONGODB_PORT']
BANNED_USERS = os.environ['BANNED_USERS'].split(',') or []

@ -1,13 +1,14 @@
from urllib.parse import quote_plus
from pymongo import MongoClient
from pymongo.database import Database
from config.credentials import (MONGODB_DATABASE, MONGODB_HOSTNAME,
MONGODB_PASSWORD, MONGODB_PORT,
MONGODB_USERNAME)
from pymongo import MongoClient
from pymongo.database import Database
class Client():
class Client:
mongo_client: MongoClient
database: Database
@ -32,7 +33,7 @@ class Client():
def update_one(self, collection: str, filter: dict, data: dict) -> None:
self.database.get_collection(collection).update_one(
filter,
{ "$set" : data }
{"$set": data}
)
def aggregate(self, collection, pipeline: list):

@ -4,7 +4,7 @@ from dataclasses import dataclass
@dataclass
class Group():
class Group:
chat_id: str
group_name: str
users_count: int

@ -1,14 +1,14 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable
from typing import List
@dataclass
class User():
class User:
user_id: str
username: str
chats: Iterable[str]
chats: List[str]
collection: str = 'users'
id_index: str = '_id'

@ -0,0 +1,2 @@
class ActionNotAllowedException(Exception):
pass

@ -3,7 +3,10 @@ from __future__ import annotations
import logging
import os
from bot.message.messageData import MessageData
# noinspection SpellCheckingInspection
class Logger:
action_logger: str = 'action-logger'
action_logger_file: str = '/var/log/bot/action.log'
@ -11,9 +14,10 @@ class Logger:
main_logger: str = 'main-logger'
main_logger_file: str = '/var/log/bot/app.log'
formatter: str = logging.Formatter('%(asctime)s[%(levelname)s]: %(message)s')
# noinspection SpellCheckingInspection
formatter: logging.Formatter = logging.Formatter('%(asctime)s[%(levelname)s]: %(message)s')
def setup(self) -> None:
def __init__(self):
self.configure(self.action_logger, self.action_logger_file, logging.INFO)
self.configure(self.main_logger, self.main_logger_file, logging.ERROR)
@ -32,12 +36,22 @@ class Logger:
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
@staticmethod
def register() -> None:
Logger()
@staticmethod
def get_logger(logger_name) -> logging.Logger:
return logging.getLogger(logger_name)
@staticmethod
def info(message: str) -> None:
Logger.get_logger(Logger.action_logger).info(message)
@staticmethod
def error(message: str) -> None:
Logger.get_logger(Logger.main_logger).error(message)
Logger.get_logger(Logger.main_logger).error(message)
@staticmethod
def action(message_data: MessageData, action: str) -> None:
Logger.info(f'User {message_data.username}({message_data.user_id}) called {action.upper()} for {message_data.chat_id}')

@ -1,13 +1,13 @@
import itertools
import re
from typing import Iterable
from database.client import Client
from entity.group import Group
from entity.user import User
from exception.notFoundException import NotFoundException
class GroupRepository():
class GroupRepository:
client: Client
count: str = 'count'
@ -19,22 +19,22 @@ class GroupRepository():
groups = self.client.aggregate(
User.collection,
[
{ "$unwind": f'${User.chats_index}' },
{"$unwind": f'${User.chats_index}'},
{
"$match": {
User.chats_index: { "$regex": re.compile(f'^{chat_id}.*$') },
User.chats_index: {"$regex": re.compile(f'^{chat_id}.*$')},
},
},
{
"$group": {
"_id": {
"$last": { "$split": [f'${User.chats_index}', "~"] },
"$last": {"$split": [f'${User.chats_index}', "~"]},
},
self.count: { "$count": {} },
self.count: {"$count": {}},
},
},
{
"$sort": { '_id': 1 }
"$sort": {'_id': 1}
}
]
)
@ -50,4 +50,7 @@ class GroupRepository():
Group(chat_id, group_name, group[self.count])
)
if not groups:
raise NotFoundException
return result

@ -1,4 +1,4 @@
from typing import Iterable, Optional
from typing import Iterable
from bot.message.messageData import MessageData
from database.client import Client
@ -6,57 +6,65 @@ from entity.user import User
from exception.notFoundException import NotFoundException
class UserRepository():
class UserRepository:
client: Client
def __init__(self) -> None:
self.client = Client()
def get_by_id(self, id: str) -> User:
def get_by_id(self, user_id: str) -> User:
user = self.client.find_one(
User.collection,
{
User.id_index: id
User.id_index: user_id
}
)
if not user:
raise NotFoundException(f'Could not find user with "{id}" id')
raise NotFoundException(f'Could not find user with "{user_id}" id')
return User(
user[User.id_index],
user[User.username_index],
user[User.chats_index]
)
def get_by_id_and_chat_id(self, user_id: str, chat_id: str) -> User:
user = self.get_by_id(user_id)
if not user.is_in_chat(chat_id):
raise NotFoundException
return user
def save(self, user: User) -> None:
self.client.update_one(
User.collection,
{ User.id_index: user.user_id },
{User.id_index: user.user_id},
user.to_mongo_document()
)
def save_by_message_data(self, data: MessageData) -> None:
self.client.insert_one(
User.collection,
User.collection,
{
User.id_index: data.user_id,
User.username_index: data.username,
User.chats_index: [data.chat_id]
}
)
def get_all_for_chat(self, chat_id: str) -> Iterable[User]:
result = []
users = self.client.find_many(
User.collection,
{
User.chats_index: {
"$in" : [chat_id]
"$in": [chat_id]
}
}
)
for record in users:
result.append(User.from_mongo_document(record))

@ -0,0 +1,21 @@
from typing import Iterable
from prettytable import prettytable
from telegram.utils.helpers import mention_markdown
from entity.group import Group
from entity.user import User
class MessageBuilder:
@staticmethod
def group_message(groups: Iterable[Group]) -> str:
table = prettytable.PrettyTable(['Name', 'Members'])
table.add_rows([[record.group_name, record.users_count] for record in groups])
return f'<pre>{str(table)}</pre>'
@staticmethod
def mention_message(users: Iterable[User]) -> str:
return ' '.join([mention_markdown(user.user_id, user.username) for user in users])

@ -0,0 +1,10 @@
from config.credentials import BANNED_USERS
from exception.actionNotAllowedException import ActionNotAllowedException
class AccessValidator:
@staticmethod
def validate(user_id: str) -> None:
if user_id in BANNED_USERS:
raise ActionNotAllowedException('You are banned')