每分鐘查詢郵件,如果有信,TG通知

学习笔记 liuruoyu 77℃ 0评论

mail to tg
1、從Telegram上,找到@BotFather,建立通知bot,並且記錄下這個bot的API Token。

2、和這個新建的bot對話一下,點一次/start,以便讓它能發來通知消息。

3、在VPS上,部署腳本,並且以pm2來保持持續啓動狀態。

4、腳本內容,爲每分鐘,以IMAP協議,登錄郵箱,一旦有未讀郵件,則標記爲已讀,並且把郵件標題和內容,在Telegram上發送通知。

腳本內容:

 


import imaplib
import email
from email.header import decode_header
import time
import telegram
from telegram.error import TelegramError
import asyncio
import re
import html

# IMAP 伺服器配置
IMAP_SERVER = "郵件服務器"
USERNAME = "郵箱賬戶名"
PASSWORD = "郵箱密碼"
# Telegram Bot 配置
BOT_TOKEN = "bot的API Token"
CHAT_ID = "通知用戶的ID"

# 初始化 Telegram Bot
bot = telegram.Bot(token=BOT_TOKEN)

async def send_telegram_message(message):
    """通過 Telegram Bot 發送消息"""
    try:
        await bot.send_message(chat_id=CHAT_ID, text=message, parse_mode='HTML')
    except TelegramError as e:
        print(f"發送 Telegram 消息失敗: {e}")

def decode_email_subject(subject):
    """解碼郵件主題"""
    decoded_subject = decode_header(subject)[0][0]
    if isinstance(decoded_subject, bytes):
        try:
            return decoded_subject.decode()
        except:
            return decoded_subject.decode('utf-8', errors='ignore')
    return decoded_subject

def get_email_content(msg):
    """提取郵件內容和附件信息"""
    text_content = ""
    attachments = []

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            # 提取純文本內容
            if content_type == 'text/plain':
                try:
                    text_content = part.get_payload(decode=True).decode('utf-8', errors='ignore')
                except:
                    text_content = part.get_payload(decode=True).decode('latin1', errors='ignore')
            # 檢查附件
            if part.get('Content-Disposition') is not None:
                filename = part.get_filename()
                if filename:
                    decoded_filename = decode_header(filename)[0][0]
                    if isinstance(decoded_filename, bytes):
                        try:
                            decoded_filename = decoded_filename.decode()
                        except:
                            decoded_filename = decoded_filename.decode('utf-8', errors='ignore')
                    attachments.append(decoded_filename)
    else:
        try:
            text_content = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
        except:
            text_content = msg.get_payload(decode=True).decode('latin1', errors='ignore')

    # 限制文本內容長度
    if len(text_content) > 1000:
        text_content = text_content[:1000] + "... [內容已截斷]"

    # 添加附件信息
    if attachments:
        attachment_info = "\n<b>附件:</b> " + ", ".join(attachments)
    else:
        attachment_info = ""

    return text_content + attachment_info

async def check_email():
    """檢查新郵件並發送通知"""
    try:
        # 連接到 IMAP 伺服器
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
        mail.login(USERNAME, PASSWORD)
        mail.select("INBOX")

        # 搜索未讀郵件
        _, message_numbers = mail.search(None, 'UNSEEN')
        
        if not message_numbers[0]:
            print("沒有新郵件")
            mail.logout()
            return

        for num in message_numbers[0].split():
            # 獲取郵件
            _, msg_data = mail.fetch(num, "(RFC822)")
            email_body = msg_data[0][1]
            msg = email.message_from_bytes(email_body)

            # 獲取郵件主題、發件人和收件人
            subject = decode_email_subject(msg.get("Subject", "無主題"))
            from_ = msg.get("From", "未知發件人")
            to_ = msg.get("To", "未知收件人")
            
            # 清理並轉義發件人和收件人格式
            from_clean = html.escape(re.sub(r'[\n\r]+', ' ', from_))
            to_clean = html.escape(re.sub(r'[\n\r]+', ' ', to_))
            
            # 獲取郵件內容和附件信息
            content = get_email_content(msg)

            # 格式化消息,包含收件人信息
            message = (
                f"<b>新郵件通知</b>\n"
                f"<b>主題:</b> {html.escape(subject)}\n"
                f"<b>發件人:</b> {from_clean}\n"
                f"<b>收件人:</b> {to_clean}\n"
                f"<b>內容:</b>\n{content}"
            )

            # 發送 Telegram 通知
            await send_telegram_message(message)
            print(f"已發送通知: {subject}")

            # 將郵件標記為已讀
            mail.store(num, '+FLAGS', '\\Seen')

        mail.logout()
    except Exception as e:
        print(f"檢查郵件時出錯: {e}")

async def main():
    """主循環,每分鐘檢查一次郵件"""
    while True:
        print("正在檢查新郵件...")
        await check_email()
        await asyncio.sleep(60)  # 等待 60 秒

if __name__ == "__main__":
asyncio.run(main())

转载请注明:刘太监的私藏 » 每分鐘查詢郵件,如果有信,TG通知

喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址