#!/usr/bin/env python3
# coding=utf-8
"""Telegram channel bot that posts photos with a "like" button.

Authorized users send a photo to the bot in a private chat; the bot re-posts
it to ``config.channel_id`` with an inline heart button.  Every click on the
button toggles the clicking user's vote and refreshes the counter shown on
the button.  Forwarding a channel post back to the bot returns the list of
users who voted on it.

The SQLite database is expected to contain two tables (as used by the
statements below):

* ``message_ids(callback, message_id)`` -- maps a button's callback id to the
  channel message it is attached to.
* ``poll(callback, user_id, name)`` -- one row per active vote.
"""
import html
import logging
import sqlite3

from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CallbackQueryHandler, Filters, MessageHandler, Updater

import config

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO)

logger = logging.getLogger(__name__)


def _like_keyboard(callback_id, count=None):
    """Build the single-button inline keyboard attached to a channel post.

    :param callback_id: identifier sent back by Telegram when the button is
        pressed; converted to ``str`` because callback data is a string field.
    :param count: number of votes to show next to the heart, or ``None`` to
        show the heart alone (used for a freshly created post).
    """
    label = "❤️" if count is None else f"❤️ {count}"
    button = InlineKeyboardButton(text=label, callback_data=str(callback_id))
    return InlineKeyboardMarkup([[button]])


def get_callback_id(db):
    """Return the next unused callback id.

    The id is one more than the largest ``callback`` stored in
    ``message_ids``.  ``max()`` yields NULL on an empty table, in which case
    numbering starts at 1 instead of failing on ``int(None)``.
    """
    row = db.execute('SELECT max(callback) FROM message_ids').fetchone()
    latest = row[0] if row else None
    if latest is None:
        return 1
    return int(latest) + 1


def add_message_id(db, message_id, callback):
    """Record which channel message carries the button with id *callback*."""
    query = "INSERT INTO message_ids(callback, message_id) values(?,?)"
    # The connection context manager commits on success and rolls back if
    # the statement raises.
    with db:
        db.execute(query, (callback, message_id))


def add_vote(db, callback_id, user=None):
    """Store a vote by *user* for the post identified by *callback_id*.

    :param user: object exposing ``id``, ``first_name`` and ``last_name``.
        The ``None`` default is kept for signature compatibility only.
    :raises ValueError: if *user* is ``None``.
    """
    if user is None:
        raise ValueError("add_vote() requires a user")
    name = str(user.first_name)
    if user.last_name:
        name += " " + str(user.last_name)
    query = "INSERT INTO poll(callback, user_id, name) values(?,?,?)"
    with db:
        db.execute(query, (callback_id, user.id, name))


def delete_vote(db, callback_id, user):
    """Remove *user*'s vote for the post identified by *callback_id*."""
    query = "DELETE FROM poll WHERE callback=? AND user_id=?"
    # Run the statement on the connection that was passed in rather than
    # relying on a module-level cursor being defined elsewhere.
    with db:
        db.execute(query, (callback_id, user.id))


def check_vote(db, callback_id, user):
    """Return ``True`` if *user* currently has a vote on *callback_id*."""
    query = "SELECT * FROM poll WHERE callback=? AND user_id=?"
    return db.execute(query, (callback_id, user.id)).fetchone() is not None


def update_message(bot, db, callback_id):
    """Refresh the vote counter on the channel post for *callback_id*.

    Looks up the channel message registered for the callback id, counts the
    votes in ``poll`` and replaces the message's inline keyboard.  If no
    message is registered for the id, a warning is logged and nothing is
    edited.
    """
    row = db.execute(
        "SELECT message_id FROM message_ids WHERE callback = ?",
        (callback_id,)).fetchone()
    if row is None:
        logger.warning("No channel message registered for callback %r",
                       callback_id)
        return
    message_id = row[0]
    count = db.execute(
        "SELECT count(user_id) FROM poll WHERE callback = ?",
        (callback_id,)).fetchone()[0]
    bot.edit_message_reply_markup(chat_id=config.channel_id,
                                  message_id=message_id,
                                  reply_markup=_like_keyboard(callback_id,
                                                              count))


def send_stats(bot, update, db, tag):
    """Reply with the list of voters for a forwarded channel post.

    :param update: update whose message is a forward of the channel post;
        ``forward_from_message_id`` selects the post.
    :param tag: when true, each voter is rendered as an inline mention
        (``tg://user?id=...`` link); otherwise only the name is shown.
        NOTE(review): the original had two identical branches here, so the
        mention markup is a reconstruction of the apparent intent -- confirm.
    """
    query = ("SELECT user_id, name FROM poll NATURAL JOIN message_ids WHERE "
             "message_id = ?")
    voters = db.execute(
        query, (update.message.forward_from_message_id,)).fetchall()
    chat_id = update.message.chat_id
    if not voters:
        bot.send_message(chat_id=chat_id, text="No Votes")
        return

    lines = ["Voters"]
    for user_id, name in voters:
        # The message is parsed as HTML, so names must not contain raw
        # '<', '>' or '&'.
        safe_name = html.escape(str(name), quote=False)
        if tag:
            lines.append(f'<a href="tg://user?id={user_id}">{safe_name}</a>')
        else:
            lines.append(safe_name)
    bot.send_message(chat_id=chat_id, text="\n".join(lines) + "\n",
                     parse_mode="HTML")


def post(bot, update, db):
    """Handle an incoming photo message.

    * A forward of a channel post sent by an allowed user is answered with
      the voter list (voters are tagged only when the chat id equals the
      sender's id).
    * Other messages from chats with a negative id are ignored.
    * A photo from an allowed chat is re-posted to the channel with a like
      button and registered in ``message_ids``.
    * Anyone else receives "Not Authorized".
    """
    message = update.message
    chat_id = message.chat_id
    user_id = message.from_user.id

    is_channel_forward = (
        message.forward_from_chat
        and message.forward_from_chat.id == config.channel_id)
    if is_channel_forward and user_id in config.allowed_users:
        send_stats(bot, update, db, tag=(chat_id == user_id))
        return

    if chat_id < 0:
        return

    if chat_id not in config.allowed_users:
        bot.send_message(chat_id=chat_id, text="Not Authorized")
        return

    callback_id = get_callback_id(db)
    file_id = message.photo[0].file_id
    reply = bot.send_photo(chat_id=config.channel_id, photo=file_id,
                           reply_markup=_like_keyboard(callback_id))
    add_message_id(db, reply.message_id, callback_id)


def click(bot, update, db):
    """Toggle the clicking user's vote and refresh the button counter."""
    callback_query = update.callback_query
    user = callback_query.from_user
    callback_id = callback_query.data
    if check_vote(db, callback_id, user):
        delete_vote(db, callback_id, user)
    else:
        add_vote(db, callback_id, user)
    update_message(bot, db, callback_id)
    bot.answer_callback_query(callback_query.id)


updater = Updater(token=config.api_key)
dispatcher = updater.dispatcher
# check_same_thread=False lets the connection be used from threads other
# than the one that opened it (presumably the library's handler threads).
db = sqlite3.connect("chanbot.sqlite", check_same_thread=False)
# Kept as a module-level name for backward compatibility; the functions
# above no longer depend on it.
cursor = db.cursor()

dispatcher.add_handler(
    MessageHandler(Filters.photo, lambda bot, update: post(bot, update, db)))
dispatcher.add_handler(
    CallbackQueryHandler(lambda bot, update: click(bot, update, db)))
updater.start_polling()