TG-forward-videos

import asyncio

import os

import random

import json

from datetime import datetime, timezone

from telethon import TelegramClient, events

from telethon.tl.types import (

    MessageMediaDocument,

    DocumentAttributeFilename,

    DocumentAttributeVideo

)

from telethon.errors import FloodWaitError

from dotenv import load_dotenv

# ==================== 配置读取 ====================

load_dotenv()

api_id = int(os.getenv(“API_ID”))

api_hash = os.getenv(“API_HASH”)

PHONE_NUMBER = os.getenv(“PHONE_NUMBER”)

TWO_STEP_PASSWORD = os.getenv(“TWO_STEP_PASSWORD”)

SOURCE_CHANNELS = [ch.strip() for ch in os.getenv(“SOURCE_CHANNELS”, “”).split(“,”) if ch.strip()]

TARGET_CHANNEL = os.getenv(“TARGET_CHANNEL”)

MIN_FILE_SIZE = int(os.getenv(“MIN_FILE_SIZE”, 200 * 1024 * 1024))

MAX_FORWARD_COUNT = int(os.getenv(“MAX_FORWARD_COUNT”, 0))

SCAN_LIMIT = int(os.getenv(“SCAN_LIMIT”, 50))   # 0 = 全量历史

MIN_DURATION = int(os.getenv(“MIN_DURATION”, 0))

MAX_DURATION = int(os.getenv(“MAX_DURATION”, 0))

MAX_CAPTION_LENGTH = int(os.getenv(“MAX_CAPTION_LENGTH”, 1024))

START_DATE = os.getenv(“START_DATE”)

END_DATE = os.getenv(“END_DATE”)

start_date = datetime.strptime(START_DATE, “%Y-%m-%d”).replace(tzinfo=timezone.utc) if START_DATE else None

end_date = datetime.strptime(END_DATE, “%Y-%m-%d”).replace(tzinfo=timezone.utc) if END_DATE else None

client = TelegramClient(“user_session”, api_id, api_hash)

VIDEO_KEYS_FILE = “video_keys.json”

PROGRESS_FILE = “channel_progress.json”

# ==================== JSON 持久化 ====================

def load_json(path, default):

    if os.path.exists(path):

        try:

            with open(path, “r”, encoding=”utf-8″) as f:

                return json.load(f)

        except Exception:

            return default

    return default

def save_json(path, data):

    try:

        with open(path, “w”, encoding=”utf-8″) as f:

            json.dump(data, f, ensure_ascii=False, indent=2)

    except Exception as e:

        print(f”❌ 保存 {path} 失败: {e}”)

existing_video_keys = load_json(VIDEO_KEYS_FILE, {})

channel_progress = load_json(PROGRESS_FILE, {})

# ==================== 工具函数 ====================

def get_duration_from_doc(doc):

    for attr in doc.attributes:

        if isinstance(attr, DocumentAttributeVideo):

            return attr.duration

    return 0

def get_video_key(message):

    doc = message.media.document

    filename = next(

        (a.file_name for a in doc.attributes if isinstance(a, DocumentAttributeFilename)),

        “video.mp4”

    )

    return f”{filename}_{doc.size}_{doc.id}”

def is_video_eligible(message):

    if not message.media or not isinstance(message.media, MessageMediaDocument):

        return False

    doc = message.media.document

    if not doc.mime_type or not doc.mime_type.startswith(“video”):

        return False

    if doc.size < MIN_FILE_SIZE:

        return False

    duration = get_duration_from_doc(doc)

    if MIN_DURATION > 0 and duration < MIN_DURATION:

        return False

    if MAX_DURATION > 0 and duration > MAX_DURATION:

        return False

    if start_date and message.date < start_date:

        return False

    if end_date and message.date > end_date:

        return False

    return True

# ==================== 转发函数 ====================

async def forward_message(message, video_key):

    caption = message.message or “”

    if len(caption) > MAX_CAPTION_LENGTH:

        caption = caption[:MAX_CAPTION_LENGTH] + “…”

    try:

        await client.send_file(

            TARGET_CHANNEL,

            file=message.media,

            caption=caption,

            silent=True

        )

        print(f”✅ 转发成功 message_id={message.id}”)

        existing_video_keys[video_key] = message.id

        save_json(VIDEO_KEYS_FILE, existing_video_keys)

        await asyncio.sleep(random.uniform(1, 4))

    except FloodWaitError as e:

        print(f”⏳ FloodWait {e.seconds}s”)

        await asyncio.sleep(e.seconds)

        await forward_message(message, video_key)

# ==================== 历史批量转发 ====================

async def batch_forward_latest_videos():

    for source_channel in SOURCE_CHANNELS:

        source = await client.get_entity(source_channel)

        channel_id = str(source.id)

        last_id = channel_progress.get(channel_id, 0)

        limit = SCAN_LIMIT if SCAN_LIMIT > 0 else None

        mode = “全量历史” if limit is None else f”最近 {limit} 条”

        print(f”\n📦 扫描 {source_channel}({mode}),last_message_id={last_id}”)

        count = 0

        async for message in client.iter_messages(

            source,

            min_id=last_id,

            limit=limit,

            reverse=True    # 🔑 旧 → 新

        ):

            if not is_video_eligible(message):

                continue

            video_key = get_video_key(message)

            if video_key in existing_video_keys:

                continue

            try:

                await forward_message(message, video_key)

                # ✅ 成功后推进断点

                channel_progress[channel_id] = message.id

                save_json(PROGRESS_FILE, channel_progress)

                count += 1

                if MAX_FORWARD_COUNT > 0 and count >= MAX_FORWARD_COUNT:

                    print(f”⏹️ 达到最大转发数量 {MAX_FORWARD_COUNT}”)

                    break

            except Exception as e:

                print(f”❌ message_id={message.id} 失败,中断本频道: {e}”)

                break

        print(f”📁 {source_channel} 本轮完成,转发 {count} 个视频”)

# ==================== 实时监听 ====================

@client.on(events.NewMessage(chats=SOURCE_CHANNELS))

async def handler(event):

    source = await event.get_chat()

    channel_id = str(source.id)

    message = event.message

    if not is_video_eligible(message):

        return

    video_key = get_video_key(message)

    if video_key in existing_video_keys:

        return

    await forward_message(message, video_key)

    # ✅ 实时消息同样推进断点

    channel_progress[channel_id] = message.id

    save_json(PROGRESS_FILE, channel_progress)

# ==================== 主程序 ====================

async def main():

    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)

    print(“✅ Telegram 登录成功”)

    await batch_forward_latest_videos()

    print(“⏳ 开始实时监听新视频…”)

    await client.run_until_disconnected()

if __name__ == “__main__”:

    with client:

        client.loop.run_until_complete(main())

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注