針對目標羣組,自動發佈內容的bot

学习笔记 liuruoyu 33℃ 0评论

針對目標羣組,自動發佈內容的bot

建立目標telegram羣組或頻道。

建立bot。

把bot加入到羣組或頻道,使它成爲管理員。

使用python腳本,對目標羣組發佈內容。

進階需求一:

讀取指定目錄所有zip文件。

逐一將其中每個zip文件,解壓縮到臨時目錄。

從臨時目錄中,按順序排列其中的圖片文件。

每9張圖片爲一組,將圖片發往目標telegram頻道或羣組。發送的文本則以圖片來源的zip文件名爲內容。

連續發送3組,總共27張圖片後,停止發送。

清除臨時目錄中的所有內容。

向目標羣組或者頻道,發送整個zip文件。

刪除發送後的zip文件,以同樣步驟,處理下一個文件。

每次發送動作完成,進行延時等待,避免bot使用過於頻繁,被telegram限權。

進階需求二:

支持OneDriver網盤的掛載,源zip文件,來自網盤,臨時文件夾在本地硬碟,以避免頻繁讀寫網盤的低效延誤。

進階需求三:

每次腳本啓動時,發送3組圖片以及壓縮包完成,和最終的全部完成,分別向需要通知的用戶們,發送進度通知。

進階需求四:

使用cron和pm2配合來定時啓動腳本。當定時的時間點到達,cron會調用一個sh腳本以檢測pm2中的相關任務是否在運行。如果在運行,則不作任何處理。如果未在運行,則一次性喚醒pm2中的相關任務。pm2中的任務腳本一經執行完畢,即行停止,不需要pm2反復喚醒它。也就可以避免因爲儲存待發送的zip文件的OneDrive網盤目錄中無內容,而反復發送“啓動”和“完成”的通知。


import os
import zipfile
import glob
import shutil
import asyncio
import time
from datetime import datetime
from telegram import Bot, InputMediaPhoto
import logging
from pathlib import Path
from telegram.error import TimedOut

# 配置 Telegram Bot
BOT_TOKEN = "BOT的API Token"
CHAT_ID = "-10000000000"  # 用於發送圖片預覽的群組
UPLOAD_CHAT_ID = "-10000000000"  # 用於上傳 ZIP 文件的群組
REPORT_USER_IDS = ["00000000", "00000000"]  # 通知用戶 ID

# 配置日誌
LOG_FILE = "/var/log/telegram_bot.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

async def send_photos_in_groups(bot, chat_id, photos, caption):
    """將圖片按組發送到 Telegram 群組"""
    media_group = []
    for i, photo in enumerate(photos):
        try:
            media_group.append(InputMediaPhoto(media=open(photo, "rb"), caption=caption if i == 0 else ""))
        except OSError as e:
            logger.error(f"無法打開圖片 {Path(photo).name}:{str(e)}")
            return False
    try:
        await bot.send_media_group(chat_id=chat_id, media=media_group)
        logger.info(f"成功發送一組圖片(共 {len(photos)} 張),標題:{caption}")
        return True
    except TimedOut as e:
        logger.warning(f"發送圖片組時超時:{str(e)}")
        return False

async def send_report(bot, user_ids, message):
    """向多個用戶發送報告"""
    for user_id in user_ids:
        try:
            await bot.send_message(chat_id=user_id, text=message)
            logger.info(f"向用戶 {user_id} 發送報告:{message}")
        except Exception as e:
            logger.error(f"向用戶 {user_id} 發送報告失敗:{str(e)}")

async def upload_zip_file(bot, chat_id, zip_file, caption):
    """上傳 ZIP 文件到 Telegram 群組"""
    try:
        with open(zip_file, "rb") as file:
            await bot.send_document(chat_id=chat_id, document=file, caption=caption)
        logger.info(f"成功上傳 ZIP 文件:{Path(zip_file).name} 到群組 {chat_id}")
        return True
    except Exception as e:
        logger.error(f"上傳 ZIP 文件 {Path(zip_file).name} 失敗:{str(e)}")
        return False

async def main():
    # 初始化 Bot
    bot = Bot(token=BOT_TOKEN)
    logger.info("Telegram Bot 初始化完成")
    
    # 設置工作目錄和臨時目錄
    current_dir = Path("/od/mitalk")
    temp_dir = Path("/root/mitalk/temp")
    
    # 清空臨時目錄
    try:
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
        temp_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"已清空並創建臨時目錄:{temp_dir}")
    except OSError as e:
        logger.error(f"清空或創建臨時目錄失敗:{str(e)}")
        await send_report(bot, REPORT_USER_IDS, f"錯誤:無法清空或創建臨時目錄 {temp_dir}:{str(e)}")
        return
    
    # 記錄腳本啟動時間
    start_time = time.time()
    start_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # 獲取 ZIP 文件列表
    try:
        zip_files = sorted(glob.glob(str(current_dir / "*.zip")))
    except OSError as e:
        logger.error(f"無法讀取 ZIP 文件列表:{str(e)}")
        await send_report(bot, REPORT_USER_IDS, f"錯誤:無法讀取 ZIP 文件列表:{str(e)}")
        return
    
    # 發送啟動報告
    zip_filenames = [Path(z).name for z in zip_files]
    startup_report = f"腳本於 {start_time_str} 啟動,待處理 ZIP 文件({len(zip_files)} 個):\n" + "\n".join(zip_filenames)
    await send_report(bot, REPORT_USER_IDS, startup_report)
    
    # 統計總圖片數量
    total_images = 0
    processed_zips = []
    
    # 處理每個 ZIP 文件
    for zip_file in zip_files:
        zip_name = Path(zip_file).name
        logger.info(f"\n開始處理 ZIP 文件:{zip_name}")
        zip_start_time = time.time()
        
        # 解壓 ZIP 文件
        try:
            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            logger.info(f"成功解壓 ZIP 文件:{zip_name} 到 {temp_dir}")
        except (zipfile.BadZipFile, OSError) as e:
            logger.error(f"解壓 ZIP 文件 {zip_name} 失敗:{str(e)}")
            await send_report(bot, REPORT_USER_IDS, f"錯誤:無法解壓 ZIP 文件 {zip_name}:{str(e)}")
            try:
                os.remove(zip_file)
                logger.info(f"已刪除失敗的 ZIP 文件:{zip_name}")
                shutil.rmtree(temp_dir)
                temp_dir.mkdir(parents=True, exist_ok=True)
                logger.info(f"已清空臨時目錄:{temp_dir}")
            except OSError as e:
                logger.error(f"刪除失敗的 ZIP 文件或清空臨時目錄失敗:{str(e)}")
            continue
        
        # 獲取圖片文件
        image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')
        try:
            images = sorted([f for f in glob.glob(str(temp_dir / "*")) if f.lower().endswith(image_extensions)])
        except OSError as e:
            logger.error(f"無法讀取圖片文件列表:{str(e)}")
            await send_report(bot, REPORT_USER_IDS, f"錯誤:無法讀取 {zip_name} 的圖片文件列表:{str(e)}")
            try:
                os.remove(zip_file)
                logger.info(f"已刪除失敗的 ZIP 文件:{zip_name}")
                shutil.rmtree(temp_dir)
                temp_dir.mkdir(parents=True, exist_ok=True)
                logger.info(f"已清空臨時目錄:{temp_dir}")
            except OSError as e:
                logger.error(f"刪除失敗的 ZIP 文件或清空臨時目錄失敗:{str(e)}")
            continue
        
        logger.info(f"解壓後找到 {len(images)} 張圖片:")
        for img in images:
            logger.info(f"- {Path(img).name}")
        
        total_images += len(images)
        group_count = 0
        
        # 按每組 9 張發送最多三組圖片
        group_size = 9
        max_groups = 3  # 修改為 3 組(共最多 27 張圖片)
        for i in range(0, min(len(images), group_size * max_groups), group_size):
            group = images[i:i + group_size]
            caption = f"#{zip_name}"
            logger.info(f"準備發送圖片組({len(group)} 張):{[Path(p).name for p in group]}")
            
            # 發送圖片組
            success = await send_photos_in_groups(bot, CHAT_ID, group, caption)
            group_count += 1
            
            # 刪除已發送的圖片
            for photo in group:
                try:
                    os.remove(photo)
                    logger.info(f"已刪除圖片:{Path(photo).name}")
                except OSError as e:
                    logger.warning(f"刪除圖片 {Path(photo).name} 失敗:{str(e)}")
            
            # 每組間暫停 30 秒
            # 若需調整等待延時,請修改此處的秒數(例如將 20 改為其他值)
            logger.info("開始等待 30 秒...")
            await asyncio.sleep(30)
            logger.info("30 秒等待結束")
        
        # 發送預覽完成報告
        preview_report = f"ZIP 文件 {zip_name} 已發送三組預覽圖片(共 {min(len(images), group_size * max_groups)} 張)"
        await send_report(bot, REPORT_USER_IDS, preview_report)
        
        # 清空臨時目錄
        try:
            shutil.rmtree(temp_dir)
            temp_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"已清空臨時目錄:{temp_dir}")
        except OSError as e:
            logger.error(f"清空臨時目錄失敗:{str(e)}")
            await send_report(bot, REPORT_USER_IDS, f"錯誤:無法清空臨時目錄 {temp_dir}:{str(e)}")
            continue
        
        # 上傳 ZIP 文件
        upload_caption = f"ZIP 文件:{zip_name}"
        success = await upload_zip_file(bot, UPLOAD_CHAT_ID, zip_file, upload_caption)
        if success:
            await send_report(bot, REPORT_USER_IDS, f"ZIP 文件 {zip_name} 已上傳到群組 {UPLOAD_CHAT_ID}")
        else:
            await send_report(bot, REPORT_USER_IDS, f"錯誤:無法上傳 ZIP 文件 {zip_name} 到群組 {UPLOAD_CHAT_ID}")
        
        # 每個 ZIP 文件處理後暫停 20 秒
        # 若需調整等待延時,請修改此處的秒數(例如將 20 改為其他值)
        logger.info("開始等待 20 秒...")
        await asyncio.sleep(20)
        logger.info("20 秒等待結束")
        
        # 刪除 ZIP 文件
        try:
            os.remove(zip_file)
            logger.info(f"已刪除 ZIP 文件:{zip_name}")
        except OSError as e:
            logger.error(f"刪除 ZIP 文件 {zip_name} 失敗:{str(e)}")
            await send_report(bot, REPORT_USER_IDS, f"錯誤:無法刪除 ZIP 文件 {zip_name}:{str(e)}")
        
        processed_zips.append(zip_name)
    
    # 發送總結報告
    total_duration = time.time() - start_time
    summary_report = (f"腳本處理完成:\n"
                      f"處理的 ZIP 文件({len(processed_zips)} 個):\n" + "\n".join(processed_zips) +
                      f"\n總圖片數:{total_images}\n總用時:{total_duration:.2f} 秒")
    # 總結報告前暫停 1 秒
    # 若需調整等待延時,請修改此處的秒數(例如將 1 改為其他值)
    await asyncio.sleep(1)
    await send_report(bot, REPORT_USER_IDS, summary_report)

if __name__ == "__main__":
    logger.info("腳本開始運行")
    try:
        asyncio.run(main())
        logger.info("腳本運行結束")
    except Exception as e:
        logger.error(f"腳本運行出錯:{str(e)}")
        asyncio.run(send_report(Bot(token=BOT_TOKEN), REPORT_USER_IDS, f"腳本運行出錯:{str(e)}"))

转载请注明:刘太监的私藏 » 針對目標羣組,自動發佈內容的bot

喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址