# 1. 获取iframe里的m3u8地址
# 2.下载第一个m3u8,从第一个m3u8地址里获取第二个m3u8
# 3.下载视频
# 4. 解密
# 5. 合并视频
import asyncio
from base64 import b64encode
import aiofiles
import aiohttp
import requests
from bs4 import BeautifulSoup
import httpx
from Crypto.Cipher import AES
import os
def get_url_m3u8_first(url):
url_first_req = requests.get(url)
url_first_bs = BeautifulSoup(url_first_req.text, 'html.parser')
url_first_req.close()
return url_first_bs.find('iframe').get('data-play')
def get_m3u8_ts(url, layer, collection):
m3u8_first_ts_req = requests.get(url)
with open(f'../备份文件夹/协程_综合训练2/{layer}/第{collection}集.txt', mode='wb') as f:
f.write(m3u8_first_ts_req.content)
async def down_m3u8_second(url, name, session, num):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/86.0.4240.198 Safari/537.36',
'Host': 'v7.dious.cc',
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Origin': 'http://www.klz-toy.com',
'Cache-Control': 'no-cache',
'Referer': 'http://www.klz-toy.com/static/player/dp.html?url=https://v7.dious.cc/20220801/l4CNv2DR/index.m3u8'
}
try:
async with session.get(url, headers=headers) as down_m3u8_req:
async with aiofiles.open(f'../备份文件夹/协程_综合训练2/video/{num}_{name}', mode='wb') as f:
await f.write(await down_m3u8_req.content.read())
print(f'{num}._{name}下载完毕!')
except Exception as e:
print(e, '=====================================================')
await asyncio.sleep(5)
async def m3u8_second_read(main_url, layer):
tasks = []
async with aiohttp.ClientSession() as session:
async with aiofiles.open(f'../备份文件夹/协程_综合训练2/第二层/第{layer}集.txt', mode='r', encoding='utf-8') as f:
n = 1
async for line in f:
if str(line).startswith('#'):
continue
else:
line = str(line).strip()
url_m3u8_second = main_url + line
names = line.split('/')[-1]
tasks.append(asyncio.create_task(down_m3u8_second(url_m3u8_second, names, session, n)))
n += 1
await asyncio.wait(tasks)
def get_key(url):
req = requests.get(url)
return req.text
async def aio_key(key, num):
tasks = []
async with aiofiles.open(f'../备份文件夹/协程_综合训练2/第二层/第{num}集.txt', mode='r', encoding='utf-8') as m3u8:
nu = 1
async for i in m3u8:
if str(i).startswith('#'):
continue
i_ts = str(nu) + '_' + str(i).strip().split('/')[-1]
nu += 1
tasks.append(asyncio.create_task(dec_ts(i_ts, key)))
await asyncio.wait(tasks)
async def dec_ts(ts_name, key):
try:
asc = AES.new(key=key.encode('utf-8'), IV=b"0000000000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f'../备份文件夹/协程_综合训练2/video/{ts_name}', mode='rb') as f, \
aiofiles.open(f'../备份文件夹/协程_综合训练2/video2/temp_{ts_name}', mode='wb') as f2:
bs = await f.read()
await f2.write(asc.decrypt(bs))
print(f'{ts_name},完毕!!!')
except FileNotFoundError:
print('===================================解密完成')
def ts_merge(num):
lst = []
with open(f'../备份文件夹/协程_综合训练2/第二层/第{num}集.txt', mode='r', encoding='utf-8') as f:
n = 1
for i in f:
if str(i).startswith('#'):
continue
line = 'temp_'+str(n) + '_' + str(i).strip().split('/')[-1]
n += 1
lst.append(line)
ts = '+'.join(lst)
os.system(f"copy /b {ts} ss.mp4")
def main(url, collection):
# 第一层m3u8的url
url_m3u8_first = get_url_m3u8_first(url)
# 下载第一层的m3u8文件
get_m3u8_ts(url_m3u8_first, '第一层', collection)
# 获取第二层地址>>>>https://v7.dious.cc/20220801/l4CNv2DR/1500kb/hls/index.m3u8
# with open(f'../备份文件夹/协程_综合训练2/第一层/第{collection}集.txt', mode='r', encoding='utf-8') as f:
# url_m3u8_second = url_m3u8_first.split('/20220801')[0] + f.readlines()[-1].strip()
# 下载第二层的m3u8文件
# get_m3u8_ts(url_m3u8_second, '第二层', collection)
# 读取第二层m3u8文件,并且拼接域名
# asyncio.run(m3u8_second_read(url_m3u8_first.split('/20220801')[0], collection))
# 获取key地址
# with open(f'../备份文件夹/协程_综合训练2/第二层/第{collection}集.txt', mode='r', encoding='utf-8') as key_read:
# for i in key_read:
# if i[0:10] == '#EXT-X-KEY':
# key_url = url_m3u8_first.split('/20220801')[0] + i.strip().split('"')[1]
# key_value = get_key(key_url, )
# asyncio.run(aio_key(key_value, collection))
ts_merge(collection)
if __name__ == '__main__':
coll = 1
url_first = f'http://www.klz-toy.com/vodplay/yunhebianderenmen/2-{coll}.html'
main(url_first, coll)
报错信息
def ts_merge(num):
lst = []
with open(f'../备份文件夹/协程_综合训练2/第二层/第{num}集.txt', mode='r', encoding='utf-8') as f:
n = 1
for i in f:
if str(i).startswith('#'):
continue
line = 'temp_'+str(n) + '_' + str(i).strip().split('/')[-1]
n += 1
lst.append(line)
ts = '+'.join(lst)
os.system(f"copy /b {ts} ss.mp4")