import os import logging import subprocess from telegram import Update from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes # Enable logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Bot token from environment variable TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') # Allowed user IDs (add your Telegram user ID here for security) ALLOWED_USERS = os.getenv('ALLOWED_USER_IDS', '').split(',') def check_authorization(update: Update) -> bool: """Check if user is authorized to use admin commands.""" if not ALLOWED_USERS or ALLOWED_USERS == ['']: # If no users configured, allow all (not recommended for production) return True user_id = str(update.effective_user.id) return user_id in ALLOWED_USERS async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Send a message when the command /start is issued.""" await update.message.reply_text( 'Hi! I am Fymious Bot. Send me any message and I will echo it back!\n\n' 'Available commands:\n' '/start - Start the bot\n' '/help - Show help message\n' '/checkmail - Check new emails' ) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Send a message when the command /help is issued.""" await update.message.reply_text( 'Available commands:\n' '/start - Start the bot\n' '/help - Show this help message\n' '/checkmail - Check new emails in your mailbox\n' '\nJust send me any text and I will echo it back!' ) async def check_mail(update: Update, context: ContextTypes.DEFAULT_TYPE): """Check for new emails in the mailserver.""" # Check authorization if not check_authorization(update): await update.message.reply_text("❌ You are not authorized to use this command.") logger.warning(f"Unauthorized access attempt by user {update.effective_user.id}") return try: # Execute docker command to list new emails result = subprocess.run( ['docker', 'exec', 'mailserver', 'ls', '/var/mail/fymio.us/me/new/'], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: await update.message.reply_text( f"❌ Error checking mail:\n`{result.stderr}`", parse_mode='Markdown' ) return # Parse the output emails = result.stdout.strip().split('\n') emails = [e.strip().strip("'") for e in emails if e.strip()] if not emails or emails == ['']: await update.message.reply_text("📭 No new emails!") return # Format the response response = f"📬 You have {len(emails)} new email(s):\n\n" for i, email in enumerate(emails, 1): # Parse email filename to extract info # Format: timestamp.M###P###.mail,S=size,W=size parts = email.split(',') timestamp_part = parts[0] if parts else email size_part = parts[1] if len(parts) > 1 else "" # Extract size in bytes size_bytes = "" if size_part.startswith('S='): size_bytes = size_part[2:] try: size_kb = int(size_bytes) / 1024 size_str = f"{size_kb:.1f} KB" except ValueError: size_str = size_bytes + " bytes" else: size_str = "unknown size" response += f"{i}. {timestamp_part} ({size_str})\n" await update.message.reply_text(response) except subprocess.TimeoutExpired: await update.message.reply_text("❌ Command timed out. Mail server might be slow.") except FileNotFoundError: await update.message.reply_text("❌ Docker command not found. Is Docker installed?") except Exception as e: logger.error(f"Error checking mail: {e}") await update.message.reply_text(f"❌ An error occurred: {str(e)}") async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Echo the user message.""" await update.message.reply_text(f"You said: {update.message.text}") def main(): """Start the bot.""" if not TOKEN: logger.error("TELEGRAM_BOT_TOKEN environment variable not set!") return # Create the Application application = Application.builder().token(TOKEN).build() # Register handlers application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("checkmail", check_mail)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo)) # Start the Bot logger.info("Bot is starting...") application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == '__main__': main()