# Source: fymious-tg-bot/main.py (Python, 337 lines, 12 KiB)
# Last modified: 2026-01-27 16:14:48 +03:00

import asyncio
import email
import functools
import html
import logging
import os
import subprocess
import tempfile
from email import policy
from email.parser import BytesParser

import html2text
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import BadRequest
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes
# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Bot token from environment variable
TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Allowed user IDs (add your Telegram user ID here for security).
# ALLOWED_USER_IDS is a comma-separated list; each entry is stripped and blank
# entries are dropped so that values such as "123, 456" or "123," still match
# the intended IDs. An empty list means no restriction has been configured.
ALLOWED_USERS = [
    user_id.strip()
    for user_id in os.getenv('ALLOWED_USER_IDS', '').split(',')
    if user_id.strip()
]
def check_authorization(update: Update) -> bool:
    """Check if user is authorized to use admin commands.

    Args:
        update: Incoming Telegram update whose sender is checked.

    Returns:
        True when no allow-list is configured, or when the sender's ID is on
        the allow-list. False otherwise, including when the update carries
        no user at all.
    """
    if not ALLOWED_USERS or ALLOWED_USERS == ['']:
        # If no users configured, allow all (not recommended for production)
        return True

    user = update.effective_user
    if user is None:
        # An update without a sender cannot be matched against the allow-list,
        # so deny instead of failing with AttributeError.
        return False

    # Tolerate whitespace around configured entries (e.g. "123, 456").
    allowed_ids = {entry.strip() for entry in ALLOWED_USERS}
    return str(user.id) in allowed_ids
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued.

    Replies with a greeting and an inline keyboard offering the
    "Check Mail" and "Help" actions.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused here).
    """
    keyboard = [
        [InlineKeyboardButton("📬 Check Mail", callback_data="checkmail")],
        [InlineKeyboardButton("❓ Help", callback_data="help")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)

    # effective_message also covers edited commands, where update.message is
    # None and the original code raised AttributeError.
    await update.effective_message.reply_text(
        'Hi! I am Fymious Bot 🤖\n\n'
        'Use the buttons below to check your emails!',
        reply_markup=reply_markup,
    )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued.

    Works for both the /help command and the "help" inline button: a button
    press edits the message the button belongs to, a command gets a new reply.

    Args:
        update: Incoming Telegram update (command message or callback query).
        context: Handler context (unused here).
    """
    keyboard = [[InlineKeyboardButton("📬 Check Mail", callback_data="checkmail")]]
    reply_markup = InlineKeyboardMarkup(keyboard)
    help_text = (
        'Available commands:\n'
        '/start - Start the bot\n'
        '/help - Show this help message\n'
        '/checkmail - List new emails in your mailbox\n'
        '/readmail <number> - Read a specific email\n\n'
        'Or just use the buttons below!'
    )

    query = update.callback_query
    if query:
        # Acknowledge the button press before editing the message.
        await query.answer()
        await query.edit_message_text(help_text, reply_markup=reply_markup)
    else:
        # effective_message also covers edited commands, where update.message
        # is None.
        await update.effective_message.reply_text(help_text, reply_markup=reply_markup)
def _format_mail_size(filename):
    """Return a human-readable size parsed from an ``S=<bytes>`` filename field.

    The mail filenames are split on commas and the first field starting with
    ``S=`` is interpreted as a size in bytes. Returns ``"?"`` when no such
    field exists or its value is not an integer.
    """
    for field in filename.split(',')[1:]:
        if field.startswith('S='):
            try:
                return f"{int(field[2:]) / 1024:.1f} KB"
            except ValueError:
                return "?"
    return "?"


async def check_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Check for new emails in the mailserver.

    Lists the files in the mailbox's ``new`` directory via ``docker exec``,
    stores the filenames in ``context.user_data['emails']`` and shows a
    numbered list with one "read" button per email plus a refresh button.

    Args:
        update: Incoming Telegram update (command message or callback query).
        context: Handler context; ``user_data['emails']`` is overwritten with
            the current list of filenames.
    """
    query = update.callback_query
    if query:
        await query.answer()
    # effective_message/effective_user work for commands, edited commands and
    # callback queries alike.
    message = update.effective_message
    user = update.effective_user

    # Check authorization
    if not check_authorization(update):
        await message.reply_text("❌ You are not authorized to use this command.")
        logger.warning("Unauthorized access attempt by user %s", getattr(user, "id", None))
        return

    async def respond(text, reply_markup):
        """Edit the originating message for button presses, else send a reply."""
        if not query:
            await message.reply_text(text, reply_markup=reply_markup)
            return
        try:
            await query.edit_message_text(text, reply_markup=reply_markup)
        except BadRequest as exc:
            # Pressing "Refresh" when nothing changed makes Telegram reject the
            # edit as "Message is not modified"; that is not a user-facing error.
            if "not modified" not in str(exc).lower():
                raise

    try:
        # Run the blocking docker call in a worker thread so the bot's event
        # loop keeps serving other updates while waiting (up to the timeout).
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(
                subprocess.run,
                ['docker', 'exec', 'mailserver', 'ls', '/var/mail/fymio.us/me/new/'],
                capture_output=True,
                text=True,
                timeout=10,
            ),
        )

        if result.returncode != 0:
            # A backtick inside the code span would break Markdown parsing.
            stderr = result.stderr.replace('`', "'")
            await message.reply_text(
                f"❌ Error checking mail:\n`{stderr}`",
                parse_mode='Markdown',
            )
            return

        # Parse the output: one filename per line, possibly quoted by ls.
        cleaned = (line.strip().strip("'") for line in result.stdout.split('\n'))
        emails = [name for name in cleaned if name]

        # Store email list in user context (an empty list when there is no mail)
        context.user_data['emails'] = emails

        if not emails:
            keyboard = [[InlineKeyboardButton("🔄 Refresh", callback_data="checkmail")]]
            await respond("📭 No new emails!", InlineKeyboardMarkup(keyboard))
            return

        # Format the response
        listing = "".join(
            f"{i}. Email ({_format_mail_size(name)})\n"
            for i, name in enumerate(emails, 1)
        )
        response = f"📬 You have {len(emails)} new email(s):\n\n" + listing

        # Create inline keyboard with buttons for each email, in rows of 2
        buttons = [
            InlineKeyboardButton(f"📧 #{i}", callback_data=f"read_{i}")
            for i in range(1, len(emails) + 1)
        ]
        keyboard = [buttons[start:start + 2] for start in range(0, len(buttons), 2)]
        # Add refresh button
        keyboard.append([InlineKeyboardButton("🔄 Refresh", callback_data="checkmail")])

        await respond(response, InlineKeyboardMarkup(keyboard))
    except subprocess.TimeoutExpired:
        await message.reply_text("❌ Command timed out. Mail server might be slow.")
    except FileNotFoundError:
        await message.reply_text("❌ Docker command not found. Is Docker installed?")
    except Exception as e:
        logger.exception("Error checking mail: %s", e)
        await message.reply_text(f"❌ An error occurred: {str(e)}")
# Body text is cut to this many characters before sending.
_MAX_BODY_LENGTH = 3000
# Header values are cut to this many characters so that unusually long
# recipient lists cannot push the message over Telegram's length limit.
_MAX_HEADER_LENGTH = 200
_BODY_UNAVAILABLE = "Could not extract email body"


def _part_text(part):
    """Return the decoded text of a message part, or "" if it cannot be read."""
    try:
        content = part.get_content()
    except Exception:
        # Best effort: an undecodable part must not prevent showing the rest.
        logger.warning("Could not decode %s part", part.get_content_type(), exc_info=True)
        return ""
    return content if isinstance(content, str) else ""


def _extract_email_body(msg):
    """Return the email body as text, converting HTML when no plain text exists."""
    plain_text = ""
    html_text = ""

    if msg.is_multipart():
        # Use the first text/plain and the first text/html part found.
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain" and not plain_text:
                plain_text = _part_text(part)
            elif content_type == "text/html" and not html_text:
                html_text = _part_text(part)
    else:
        try:
            content = msg.get_content()
        except Exception:
            logger.warning("Could not decode email body", exc_info=True)
            content = None
        if not isinstance(content, str):
            # Undecodable or non-text (bytes) payload.
            plain_text = _BODY_UNAVAILABLE
        elif msg.get_content_type() == "text/html":
            html_text = content
        else:
            plain_text = content

    # If we have HTML but no plain text, convert HTML to text
    if html_text and not plain_text:
        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = True
        converter.ignore_emphasis = False
        plain_text = converter.handle(html_text)

    return plain_text


def _format_email_html(email_num, msg, body):
    """Render headers and body as Telegram HTML with all email text escaped."""
    def header(name, default):
        value = str(msg.get(name, default))
        if len(value) > _MAX_HEADER_LENGTH:
            value = value[:_MAX_HEADER_LENGTH] + "…"
        return html.escape(value, quote=False)

    return (
        f"📧 <b>Email #{email_num}</b>\n\n"
        f"<b>From:</b> {header('From', 'Unknown')}\n"
        f"<b>To:</b> {header('To', 'Unknown')}\n"
        f"<b>Date:</b> {header('Date', 'Unknown')}\n"
        f"<b>Subject:</b> {header('Subject', 'No Subject')}\n"
        "\n━━━━━━━━━━━━━━━━━━\n\n"
        f"{html.escape(body, quote=False)}"
    )


async def read_mail_handler(update: Update, context: ContextTypes.DEFAULT_TYPE, email_num: int = None):
    """Read a specific email by number.

    The number refers to the list stored in ``context.user_data['emails']``
    by the mail check. The file is fetched with ``docker exec ... cat``,
    parsed, and shown with its From/To/Date/Subject headers and text body.

    Args:
        update: Incoming Telegram update (command message or callback query).
        context: Handler context; ``user_data['emails']`` supplies the
            filenames and ``args`` supplies the number for the command form.
        email_num: 1-based email number. When None it is read from the first
            command argument.
    """
    query = update.callback_query
    if query:
        await query.answer()
    # effective_message/effective_user work for commands, edited commands and
    # callback queries alike.
    message = update.effective_message
    user = update.effective_user

    # Check authorization
    if not check_authorization(update):
        await message.reply_text("❌ You are not authorized to use this command.")
        logger.warning("Unauthorized access attempt by user %s", getattr(user, "id", None))
        return

    # Check if user has checked mail first
    emails = context.user_data.get('emails')
    if not emails:
        await message.reply_text("📭 Please use /checkmail first to see available emails.")
        return

    # Get email number from callback or command args
    if email_num is None:
        if not context.args:
            await message.reply_text("❌ Please specify an email number. Usage: /readmail 1")
            return
        try:
            email_num = int(context.args[0])
        except ValueError:
            await message.reply_text("❌ Invalid email number. Please provide a number.")
            return

    try:
        if not 1 <= email_num <= len(emails):
            await message.reply_text(f"❌ Invalid email number. Please choose between 1 and {len(emails)}")
            return

        # Get the email filename (array is 0-indexed)
        email_filename = emails[email_num - 1]
        email_path = f"/var/mail/fymio.us/me/new/{email_filename}"

        # Read email content using docker exec, in a worker thread so the
        # event loop is not blocked while waiting (up to the timeout).
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(
                subprocess.run,
                ['docker', 'exec', 'mailserver', 'cat', email_path],
                capture_output=True,
                timeout=10,
            ),
        )

        if result.returncode != 0:
            stderr = result.stderr.decode(errors='replace')
            await message.reply_text(
                f"❌ Error reading email:\n<code>{html.escape(stderr, quote=False)}</code>",
                parse_mode='HTML',
            )
            return

        # Parse email
        msg = BytesParser(policy=policy.default).parsebytes(result.stdout)
        body = _extract_email_body(msg)

        # Truncate body if too long (Telegram has message length limits).
        # Done before escaping so an HTML entity is never cut in half.
        if len(body) > _MAX_BODY_LENGTH:
            body = body[:_MAX_BODY_LENGTH] + "\n\n... (truncated)"

        # Email content is untrusted: sent as escaped HTML so characters such
        # as "_" or "*" in addresses and text cannot break message parsing.
        response = _format_email_html(email_num, msg, body)

        # Add back button
        keyboard = [[InlineKeyboardButton("⬅️ Back to List", callback_data="checkmail")]]
        reply_markup = InlineKeyboardMarkup(keyboard)

        if query:
            await query.edit_message_text(response, parse_mode='HTML', reply_markup=reply_markup)
        else:
            await message.reply_text(response, parse_mode='HTML', reply_markup=reply_markup)
    except subprocess.TimeoutExpired:
        await message.reply_text("❌ Command timed out while reading email.")
    except FileNotFoundError:
        await message.reply_text("❌ Docker command not found. Is Docker installed?")
    except Exception as e:
        logger.error("Error reading mail: %s", e, exc_info=True)
        await message.reply_text(f"❌ An error occurred: {str(e)}")
async def read_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the /readmail command by delegating to ``read_mail_handler``."""
    # No explicit number is supplied here, so the handler falls back to the
    # command arguments.
    await read_mail_handler(update, context, email_num=None)
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle button callbacks.

    Dispatches on the callback data: ``checkmail`` and ``help`` go to their
    handlers, ``read_<n>`` opens email number ``n``, and anything else is
    answered with "Unknown action".

    Args:
        update: Incoming Telegram update carrying the callback query.
        context: Handler context, passed through to the target handler.
    """
    query = update.callback_query
    # data may be absent on a callback query; treat that as an unknown action
    # instead of failing on None.startswith.
    data = query.data or ""

    if data == "checkmail":
        await check_mail(update, context)
    elif data == "help":
        await help_command(update, context)
    elif data.startswith("read_"):
        try:
            email_num = int(data.split("_")[1])
        except ValueError:
            # Malformed payload such as "read_" or "read_abc".
            await query.answer("Unknown action")
            return
        await read_mail_handler(update, context, email_num)
    else:
        await query.answer("Unknown action")
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Echo the user message.

    Args:
        update: Incoming Telegram update containing a text message.
        context: Handler context (unused here).
    """
    # effective_message also covers edited messages, where update.message is
    # None and the original code raised AttributeError.
    message = update.effective_message
    await message.reply_text(f"You said: {message.text}")
def main():
    """Build the Telegram application, register all handlers and start polling.

    Logs an error and returns without starting when TOKEN is not set.
    """
    if not TOKEN:
        logger.error("TELEGRAM_BOT_TOKEN environment variable not set!")
        return

    # Create the Application
    app = Application.builder().token(TOKEN).build()

    # Register handlers: slash commands first, then buttons, then plain text.
    command_callbacks = (
        ("start", start),
        ("help", help_command),
        ("checkmail", check_mail),
        ("readmail", read_mail),
    )
    for command, callback in command_callbacks:
        app.add_handler(CommandHandler(command, callback))
    app.add_handler(CallbackQueryHandler(button_callback))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

    # Start the Bot
    logger.info("Bot is starting...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()