"""Telegram bot that lists and reads new Maildir messages on a Docker mail server.

The bot shells out to ``docker exec <container> ...`` to list the mailbox's
``new/`` directory, to read a message, and to move a displayed message into
``cur/`` with the Seen flag.  An optional background job notifies one chat
when new message files appear.
"""

import asyncio
import email
import functools
import html
import logging
import os
import subprocess
import tempfile
from email import policy
from email.parser import BytesParser
from typing import Optional

import html2text
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.error import BadRequest, TelegramError
from telegram.ext import (
    Application,
    CallbackQueryHandler,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Bot token from environment variable
TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Allowed user IDs (comma separated).  Entries are stripped so that
# "1, 2" matches user 2; empty entries are dropped.
ALLOWED_USERS = [
    uid.strip()
    for uid in os.getenv('ALLOWED_USER_IDS', '').split(',')
    if uid.strip()
]

# Notification settings
CHECK_INTERVAL = int(os.getenv('EMAIL_CHECK_INTERVAL', '60'))  # seconds between checks
NOTIFY_USER_ID = os.getenv('NOTIFY_USER_ID', '').strip()  # chat to notify; empty disables

# Where the mailbox lives: Docker container name and Maildir root inside it.
MAIL_CONTAINER = 'mailserver'
MAILDIR_PATH = '/var/mail/fymio.us/me'

# Body text is cut to this many characters to stay under Telegram's message limit.
MAX_BODY_LENGTH = 3000

# Number of files in new/ at the last background check.
last_email_count = 0
# File names seen in new/ at the last background check (used to detect arrivals
# even when the total count does not grow).
_known_new_emails = set()


def _escape(value) -> str:
    """Return *value* as text that is safe inside a Telegram HTML message."""
    return html.escape(str(value), quote=False)


async def _docker_exec(*command, text=False, timeout=10):
    """Run ``docker exec MAIL_CONTAINER <command>`` without blocking the event loop.

    The blocking ``subprocess.run`` call is executed in the default executor.
    Returns the ``CompletedProcess``; ``subprocess.TimeoutExpired`` and
    ``FileNotFoundError`` (docker binary missing) propagate to the caller.
    """
    run = functools.partial(
        subprocess.run,
        ['docker', 'exec', MAIL_CONTAINER, *command],
        capture_output=True,
        text=text,
        timeout=timeout,
    )
    return await asyncio.get_running_loop().run_in_executor(None, run)


async def _list_new_emails():
    """List the files in the mailbox's new/ directory.

    Returns ``(filenames, error)``.  ``error`` is ``None`` on success and the
    command's stderr text when ``ls`` exits with a non-zero status.
    """
    result = await _docker_exec('ls', f'{MAILDIR_PATH}/new/', text=True, timeout=10)
    if result.returncode != 0:
        return [], result.stderr
    names = (line.strip().strip("'") for line in result.stdout.splitlines())
    return [name for name in names if name], None


def _format_size(filename: str) -> str:
    """Return the size from a ``,S=<bytes>`` filename field as "N.N KB", or "?"."""
    for field in filename.split(',')[1:]:
        if field.startswith('S='):
            try:
                return f"{int(field[2:]) / 1024:.1f} KB"
            except ValueError:
                break
    return "?"


def _maildir_seen_name(filename: str) -> str:
    """Return the cur/ file name for *filename* with the Seen (S) flag set.

    Maildir keeps the unique name unchanged and appends ``:2,<flags>``.
    An existing ``:2,`` suffix is kept and ``S`` is added to its flags.
    """
    base, marker, flags = filename.partition(':2,')
    if not marker:
        return f"{filename}:2,S"
    if 'S' in flags:
        return filename
    return f"{base}:2,{''.join(sorted(flags + 'S'))}"


def _safe_content(part) -> Optional[str]:
    """Return the decoded text of a message part, or ``None`` if unavailable."""
    try:
        content = part.get_content()
    except Exception:
        # Best effort: a part that cannot be decoded must not abort reading.
        logger.warning("Could not decode %s part", part.get_content_type(), exc_info=True)
        return None
    # Non-text parts yield bytes (or other objects), which cannot be shown.
    return content if isinstance(content, str) else None


def _extract_body(msg) -> str:
    """Return the text to display for *msg*.

    Uses the first text/plain part; when there is none, the first text/html
    part is converted to text with html2text.
    """
    plain = ""
    markup = ""

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain" and not plain:
                plain = _safe_content(part) or ""
            elif content_type == "text/html" and not markup:
                markup = _safe_content(part) or ""
    else:
        content = _safe_content(msg)
        if content is None:
            plain = "Could not extract email body"
        elif msg.get_content_type() == "text/html":
            markup = content
        else:
            plain = content

    if markup and not plain:
        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = True
        converter.ignore_emphasis = False
        plain = converter.handle(markup)

    return plain


async def _edit_or_reply(update: Update, text: str, **kwargs):
    """Edit the originating message for button presses, otherwise send a reply.

    Telegram rejects an edit whose content equals the current content
    ("Message is not modified"); that case is ignored because the message
    already shows what was requested.
    """
    query = update.callback_query
    if query is None:
        await update.effective_message.reply_text(text, **kwargs)
        return
    try:
        await query.edit_message_text(text, **kwargs)
    except BadRequest as exc:
        if "not modified" not in str(exc).lower():
            raise


async def _mark_as_read(filename: str) -> bool:
    """Move *filename* from new/ to cur/ with the Seen flag; return success."""
    source = f"{MAILDIR_PATH}/new/{filename}"
    target = f"{MAILDIR_PATH}/cur/{_maildir_seen_name(filename)}"
    try:
        result = await _docker_exec('mv', source, target, timeout=5)
    except subprocess.TimeoutExpired:
        logger.warning("Timed out moving email to cur/: %s", filename)
        return False
    if result.returncode != 0:
        logger.warning(
            "Failed to move email to cur/: %s",
            result.stderr.decode(errors='replace'),
        )
        return False
    logger.info("Moved email from new/ to cur/: %s", filename)
    return True


def check_authorization(update: Update) -> bool:
    """Check if user is authorized to use admin commands.

    When no user IDs are configured every user is allowed (not recommended
    for production).  Updates without a user are rejected once an allow-list
    exists.
    """
    if not any(ALLOWED_USERS):
        return True

    user = update.effective_user
    if user is None:
        return False
    return str(user.id) in ALLOWED_USERS


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued."""
    keyboard = [
        [InlineKeyboardButton("📬 Check Mail", callback_data="checkmail")],
        [InlineKeyboardButton("❓ Help", callback_data="help")]
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)

    await update.effective_message.reply_text(
        'Hi! I am Fymious Bot 🤖\n\n'
        'Use the buttons below to check your emails!',
        reply_markup=reply_markup
    )


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued or the Help button is pressed."""
    query = update.callback_query
    if query:
        await query.answer()

    keyboard = [[InlineKeyboardButton("📬 Check Mail", callback_data="checkmail")]]
    reply_markup = InlineKeyboardMarkup(keyboard)

    help_text = (
        'Available commands:\n'
        '/start - Start the bot\n'
        '/help - Show this help message\n'
        '/checkmail - List new emails in your mailbox\n'
        '/readmail - Read a specific email\n\n'
        'Or just use the buttons below!'
    )

    await _edit_or_reply(update, help_text, reply_markup=reply_markup)


async def check_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Check for new emails in the mailserver and show them as a numbered list.

    The file names are stored in ``context.user_data['emails']`` so that
    ``read_mail_handler`` can map a number back to a file.
    """
    query = update.callback_query
    if query:
        await query.answer()
    message = update.effective_message
    user = update.effective_user

    # Check authorization
    if not check_authorization(update):
        await message.reply_text("❌ You are not authorized to use this command.")
        logger.warning("Unauthorized access attempt by user %s", user.id if user else None)
        return

    try:
        emails, error = await _list_new_emails()

        if error is not None:
            detail = _escape(error.strip() or "unknown error")
            await message.reply_text(
                f"❌ Error checking mail:\n<code>{detail}</code>",
                parse_mode='HTML'
            )
            return

        # Store email list in user context
        context.user_data['emails'] = emails

        refresh_row = [InlineKeyboardButton("🔄 Refresh", callback_data="checkmail")]

        if not emails:
            await _edit_or_reply(
                update,
                "📭 No new emails!",
                reply_markup=InlineKeyboardMarkup([refresh_row])
            )
            return

        # Format the response
        listing = "".join(
            f"{number}. Email ({_format_size(name)})\n"
            for number, name in enumerate(emails, 1)
        )
        response = f"📬 You have {len(emails)} new email(s):\n\n" + listing

        # One button per email, two per row, followed by the refresh button
        buttons = [
            InlineKeyboardButton(f"📧 #{number}", callback_data=f"read_{number}")
            for number in range(1, len(emails) + 1)
        ]
        keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)]
        keyboard.append(refresh_row)

        await _edit_or_reply(update, response, reply_markup=InlineKeyboardMarkup(keyboard))

    except subprocess.TimeoutExpired:
        await message.reply_text("❌ Command timed out. Mail server might be slow.")
    except FileNotFoundError:
        await message.reply_text("❌ Docker command not found. Is Docker installed?")
    except Exception as e:
        logger.error("Error checking mail: %s", e, exc_info=True)
        await message.reply_text(f"❌ An error occurred: {str(e)}")


async def read_mail_handler(update: Update, context: ContextTypes.DEFAULT_TYPE, email_num: Optional[int] = None):
    """Read a specific email by number.

    ``email_num`` is the 1-based position in the list stored by ``check_mail``.
    When it is ``None`` the number is taken from the command arguments
    (``/readmail 1``).  After the email has been displayed it is moved from
    new/ to cur/ and removed from the stored list.
    """
    query = update.callback_query
    if query:
        await query.answer()
    message = update.effective_message
    user = update.effective_user

    # Check authorization
    if not check_authorization(update):
        await message.reply_text("❌ You are not authorized to use this command.")
        logger.warning("Unauthorized access attempt by user %s", user.id if user else None)
        return

    # Check if user has checked mail first
    if not context.user_data.get('emails'):
        await message.reply_text("📭 Please use /checkmail first to see available emails.")
        return

    # Get email number from callback or command args
    if email_num is None:
        if not context.args:
            await message.reply_text("❌ Please specify an email number. Usage: /readmail 1")
            return

        try:
            email_num = int(context.args[0])
        except ValueError:
            await message.reply_text("❌ Invalid email number. Please provide a number.")
            return

    try:
        emails = context.user_data['emails']

        if email_num < 1 or email_num > len(emails):
            await message.reply_text(f"❌ Invalid email number. Please choose between 1 and {len(emails)}")
            return

        # Get the email filename (list is 0-indexed)
        email_filename = emails[email_num - 1]

        # Read email content using docker exec
        result = await _docker_exec('cat', f"{MAILDIR_PATH}/new/{email_filename}", timeout=10)

        if result.returncode != 0:
            detail = _escape(result.stderr.decode(errors='replace').strip() or "unknown error")
            await message.reply_text(
                f"❌ Error reading email:\n<code>{detail}</code>",
                parse_mode='HTML'
            )
            return

        # Parse email
        msg = BytesParser(policy=policy.default).parsebytes(result.stdout)

        # Extract email details
        subject = msg.get('Subject', 'No Subject')
        from_addr = msg.get('From', 'Unknown')
        to_addr = msg.get('To', 'Unknown')
        date = msg.get('Date', 'Unknown')

        body = _extract_body(msg)

        # Truncate the raw text before escaping so no HTML entity is cut in half
        if len(body) > MAX_BODY_LENGTH:
            body = body[:MAX_BODY_LENGTH] + "\n\n... (truncated)"

        # Email content is untrusted: escape everything placed into the HTML message
        response = (
            f"📧 <b>Email #{email_num}</b>\n\n"
            f"<b>From:</b> {_escape(from_addr)}\n"
            f"<b>To:</b> {_escape(to_addr)}\n"
            f"<b>Date:</b> {_escape(date)}\n"
            f"<b>Subject:</b> {_escape(subject)}\n"
            "\n━━━━━━━━━━━━━━━━━━\n\n"
            f"{_escape(body)}"
        )

        # Add back button
        keyboard = [[InlineKeyboardButton("⬅️ Back to List", callback_data="checkmail")]]
        reply_markup = InlineKeyboardMarkup(keyboard)

        await _edit_or_reply(update, response, parse_mode='HTML', reply_markup=reply_markup)

        # Mark as read only after the email was shown, so a failed send
        # leaves it in new/.
        if await _mark_as_read(email_filename):
            # Remove this email from the context so it won't show up in the list anymore
            emails.pop(email_num - 1)

    except subprocess.TimeoutExpired:
        await message.reply_text("❌ Command timed out while reading email.")
    except Exception as e:
        logger.error("Error reading mail: %s", e, exc_info=True)
        await message.reply_text(f"❌ An error occurred: {str(e)}")


async def read_mail(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Read mail command handler."""
    await read_mail_handler(update, context)


async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle button callbacks by dispatching on the button's callback data."""
    query = update.callback_query
    data = query.data or ""

    if data == "checkmail":
        await check_mail(update, context)
    elif data == "help":
        await help_command(update, context)
    elif data.startswith("read_"):
        try:
            email_num = int(data.split("_", 1)[1])
        except ValueError:
            await query.answer("Unknown action")
            return
        await read_mail_handler(update, context, email_num)
    else:
        await query.answer("Unknown action")


async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Echo the user message."""
    message = update.effective_message
    await message.reply_text(f"You said: {message.text}")


async def check_new_emails(context: ContextTypes.DEFAULT_TYPE):
    """Background task to check for new emails and send notifications.

    A notification is sent to ``NOTIFY_USER_ID`` when new/ contains file
    names that were not present at the previous check.
    """
    global last_email_count, _known_new_emails

    if not NOTIFY_USER_ID:
        return

    try:
        emails, error = await _list_new_emails()

        if error is not None:
            logger.warning("Error checking mail in background: %s", error)
            return

        current = set(emails)
        current_count = len(emails)
        new_email_count = len(current - _known_new_emails)

        if new_email_count:
            # Send notification
            message = f"📬 You have {new_email_count} new email(s)!\n\n"
            message += f"Total unread: {current_count}"

            keyboard = [[InlineKeyboardButton("📬 Check Mail", callback_data="checkmail")]]
            reply_markup = InlineKeyboardMarkup(keyboard)

            try:
                await context.bot.send_message(
                    chat_id=NOTIFY_USER_ID,
                    text=message,
                    reply_markup=reply_markup
                )
                logger.info("Sent notification for %s new email(s)", new_email_count)
            except TelegramError as e:
                logger.error("Failed to send notification: %s", e)

        # Update last known state
        _known_new_emails = current
        last_email_count = current_count

    except subprocess.TimeoutExpired:
        logger.warning("Background email check timed out")
    except Exception as e:
        logger.error("Error in background email check: %s", e)


def main():
    """Start the bot."""
    if not TOKEN:
        logger.error("TELEGRAM_BOT_TOKEN environment variable not set!")
        return

    # Create the Application
    application = Application.builder().token(TOKEN).build()

    # Register handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("checkmail", check_mail))
    application.add_handler(CommandHandler("readmail", read_mail))
    application.add_handler(CallbackQueryHandler(button_callback))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

    # Set up background job to check for new emails
    if NOTIFY_USER_ID:
        job_queue = application.job_queue
        if job_queue is None:
            # The job queue is unavailable when its optional dependency is missing.
            logger.error("Job queue is not available; email notifications disabled.")
        else:
            job_queue.run_repeating(
                check_new_emails,
                interval=CHECK_INTERVAL,
                first=10  # Start checking after 10 seconds
            )
            logger.info("Email notification service started. Checking every %s seconds.", CHECK_INTERVAL)
            logger.info("Will send notifications to user ID: %s", NOTIFY_USER_ID)
    else:
        logger.info("Email notifications disabled. Set NOTIFY_USER_ID to enable.")

    # Start the Bot
    logger.info("Bot is starting...")
    application.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == '__main__':
    main()