Files
fymious-tg-bot/main.py
2026-01-27 16:02:24 +03:00

240 lines
9.0 KiB
Python

import os
import logging
import subprocess
import email
from email import policy
from email.parser import BytesParser
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Bot token from environment variable
TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
# Allowed user IDs (add your Telegram user ID here for security)
ALLOWED_USERS = os.getenv('ALLOWED_USER_IDS', '').split(',')
def check_authorization(update: Update) -> bool:
"""Check if user is authorized to use admin commands."""
if not ALLOWED_USERS or ALLOWED_USERS == ['']:
# If no users configured, allow all (not recommended for production)
return True
user_id = str(update.effective_user.id)
return user_id in ALLOWED_USERS
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Send a message when the command /start is issued."""
await update.message.reply_text(
'Hi! I am Fymious Bot. Send me any message and I will echo it back!\n\n'
'Available commands:\n'
'/start - Start the bot\n'
'/help - Show help message\n'
'/checkmail - List new emails\n'
'/readmail <number> - Read a specific email'
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Send a message when the command /help is issued."""
await update.message.reply_text(
'Available commands:\n'
'/start - Start the bot\n'
'/help - Show this help message\n'
'/checkmail - List new emails in your mailbox\n'
'/readmail <number> - Read a specific email (e.g., /readmail 1)\n'
'\nJust send me any text and I will echo it back!'
)
async def check_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Check for new emails in the mailserver."""
# Check authorization
if not check_authorization(update):
await update.message.reply_text("❌ You are not authorized to use this command.")
logger.warning(f"Unauthorized access attempt by user {update.effective_user.id}")
return
try:
# Execute docker command to list new emails
result = subprocess.run(
['docker', 'exec', 'mailserver', 'ls', '/var/mail/fymio.us/me/new/'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
await update.message.reply_text(
f"❌ Error checking mail:\n`{result.stderr}`",
parse_mode='Markdown'
)
return
# Parse the output
emails = result.stdout.strip().split('\n')
emails = [e.strip().strip("'") for e in emails if e.strip()]
if not emails or emails == ['']:
await update.message.reply_text("📭 No new emails!")
# Clear stored emails
context.user_data['emails'] = []
return
# Store email list in user context for readmail command
context.user_data['emails'] = emails
# Format the response
response = f"📬 You have {len(emails)} new email(s):\n\n"
for i, email_file in enumerate(emails, 1):
# Parse email filename to extract info
parts = email_file.split(',')
timestamp_part = parts[0] if parts else email_file
size_part = parts[1] if len(parts) > 1 else ""
# Extract size in bytes
size_bytes = ""
if size_part.startswith('S='):
size_bytes = size_part[2:]
try:
size_kb = int(size_bytes) / 1024
size_str = f"{size_kb:.1f} KB"
except ValueError:
size_str = size_bytes + " bytes"
else:
size_str = "unknown size"
response += f"{i}. {timestamp_part} ({size_str})\n"
response += f"\n💡 Use /readmail <number> to read an email"
await update.message.reply_text(response)
except subprocess.TimeoutExpired:
await update.message.reply_text("❌ Command timed out. Mail server might be slow.")
except FileNotFoundError:
await update.message.reply_text("❌ Docker command not found. Is Docker installed?")
except Exception as e:
logger.error(f"Error checking mail: {e}")
await update.message.reply_text(f"❌ An error occurred: {str(e)}")
async def read_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Read a specific email by number."""
# Check authorization
if not check_authorization(update):
await update.message.reply_text("❌ You are not authorized to use this command.")
logger.warning(f"Unauthorized access attempt by user {update.effective_user.id}")
return
# Check if user has checked mail first
if 'emails' not in context.user_data or not context.user_data['emails']:
await update.message.reply_text("📭 Please use /checkmail first to see available emails.")
return
# Parse email number from command
if not context.args:
await update.message.reply_text("❌ Please specify an email number. Usage: /readmail 1")
return
try:
email_num = int(context.args[0])
emails = context.user_data['emails']
if email_num < 1 or email_num > len(emails):
await update.message.reply_text(f"❌ Invalid email number. Please choose between 1 and {len(emails)}")
return
# Get the email filename (array is 0-indexed)
email_filename = emails[email_num - 1]
email_path = f"/var/mail/fymio.us/me/new/{email_filename}"
# Read email content using docker exec
result = subprocess.run(
['docker', 'exec', 'mailserver', 'cat', email_path],
capture_output=True,
timeout=10
)
if result.returncode != 0:
await update.message.reply_text(f"❌ Error reading email:\n`{result.stderr.decode()}`", parse_mode='Markdown')
return
# Parse email
msg = BytesParser(policy=policy.default).parsebytes(result.stdout)
# Extract email details
subject = msg.get('Subject', 'No Subject')
from_addr = msg.get('From', 'Unknown')
to_addr = msg.get('To', 'Unknown')
date = msg.get('Date', 'Unknown')
# Get email body
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
body = part.get_content()
break
except:
pass
else:
try:
body = msg.get_content()
except:
body = "Could not extract email body"
# Truncate body if too long (Telegram has message length limits)
max_body_length = 3000
if len(body) > max_body_length:
body = body[:max_body_length] + "\n\n... (truncated)"
# Format response
response = f"📧 **Email #{email_num}**\n\n"
response += f"**From:** {from_addr}\n"
response += f"**To:** {to_addr}\n"
response += f"**Date:** {date}\n"
response += f"**Subject:** {subject}\n"
response += f"\n━━━━━━━━━━━━━━━━━━\n\n"
response += body
await update.message.reply_text(response, parse_mode='Markdown')
except ValueError:
await update.message.reply_text("❌ Invalid email number. Please provide a number.")
except subprocess.TimeoutExpired:
await update.message.reply_text("❌ Command timed out while reading email.")
except Exception as e:
logger.error(f"Error reading mail: {e}")
await update.message.reply_text(f"❌ An error occurred: {str(e)}")
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Echo the user message."""
await update.message.reply_text(f"You said: {update.message.text}")
def main():
"""Start the bot."""
if not TOKEN:
logger.error("TELEGRAM_BOT_TOKEN environment variable not set!")
return
# Create the Application
application = Application.builder().token(TOKEN).build()
# Register handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("checkmail", check_mail))
application.add_handler(CommandHandler("readmail", read_mail))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
# Start the Bot
logger.info("Bot is starting...")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()