爬取思路与小结
- 在查找过程中查看网页源代码可以发现:bv号可以转化为av号,ss号可以转化为ep号,即它们之间可以相互转换。
如图,图中同一个视频就同时有ep号、av号、bv号和cv号;代码中正是利用了"bv号可以转化为av号、ss号可以转化为ep号"这一点。
- 只能对网页里已有的链接进行爬取,无法爬取大会员视频。
- 打包Python
- pip install pyinstaller
- cd 到bilbili_down.py文件所在位置
- 在cmd终端直接使用 pyinstaller bilbili_down.py
- 这是我已经打包好的程序,感兴趣的老铁可以试一下功能(第一次打包时不小心把我自己的快捷方式也打包进去了😅,老铁们要打开真正的exe文件,否则可能无法保存视频😯):https://nmydt.lanzous.com/iMkpUlufosd
代码
import json,requests,os,re,shutil,ssl,time
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
## Request headers and query parameters, set to reduce the chance of being blocked by anti-crawler checks
# Base HTTP headers passed as `headers=` to the requests calls in this script.
headers = {
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'
}
# Query-string parameters sent with the title-page request in batch_download.
# NOTE(review): 'seid' looks like a captured search-session id -- confirm it is still needed.
params = {
'from': 'search',
'seid': '9698329271136034665'
}
def re_video_info(text, pattern):
    """Extract the first capture group of ``pattern`` from ``text`` and decode it as JSON.

    Args:
        text: Text to search (the callers pass a page's HTML source).
        pattern: Regular expression whose group 1 captures a JSON document.

    Returns:
        The object produced by ``json.loads`` on the captured text.

    Raises:
        ValueError: If ``pattern`` does not match ``text``, or if the captured
            text is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    match = re.search(pattern, text)
    if match is None:
        # Without this guard the failure surfaced as an opaque
        # "'NoneType' object has no attribute 'group'".
        raise ValueError('pattern %r not found in text' % (pattern,))
    return json.loads(match.group(1))
def create_folder(aid):
    """Make the directory ``aid`` unless something already exists at that path.

    Args:
        aid: Path of the directory to create (a single level; the parent
            must already exist because ``os.mkdir`` is used).
    """
    already_there = os.path.exists(aid)
    if already_there:
        return
    os.mkdir(aid)
def remove_move_file(aid):
    """Tidy the current working directory after a download run.

    Every entry of ``./`` is inspected once:

    * names ending in ``_video.mp4`` or ``_audio.mp4`` (the temporary
      per-track files) are deleted;
    * any other name ending in ``.mp4`` (the merged result) is moved into
      the directory ``aid``, first removing a same-named file already there.

    Args:
        aid: Destination directory for the finished ``.mp4`` files.
    """
    temp_suffixes = ('_video.mp4', '_audio.mp4')
    for entry in os.listdir('./'):
        # Temporary track files are no longer needed once merged.
        if entry.endswith(temp_suffixes):
            os.remove(entry)
            continue
        if not entry.endswith('.mp4'):
            continue
        # Keep the final video, replacing any previous copy in the target.
        previous_copy = aid + '/' + entry
        if os.path.exists(previous_copy):
            os.remove(previous_copy)
        shutil.move(entry, aid)
def BV_move_av(url):
    """Resolve a BV-style video page URL to the numeric av id.

    The page is fetched and the ``content`` attribute of its
    ``<meta itemprop="url">`` head element is read; the first run of digits
    in that value is returned.

    Args:
        url: Address of the video page to fetch.

    Returns:
        The first digit run of the meta URL, as a string.

    Raises:
        IndexError: If the page has no matching ``<meta itemprop="url">``.
        AttributeError: If the meta URL contains no digits.
    """
    r = requests.get(url)
    html = etree.HTML(r.text)
    av_url = html.xpath('/html/head/meta[@itemprop="url"]/@content')[0]
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    aid = re.search(r'\d+', av_url).group(0)
    return aid
def ss_move_ep(url):
    """Turn a season (``ss``) page URL into an episode (``ep``) play URL.

    The page at ``url`` is fetched, the JSON array that follows ``"epList":``
    in its source is decoded, and the ``id`` of the first entry is appended
    to the ``ep`` URL prefix.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``https://www.bilibili.com/bangumi/play/ep<id>`` for the first entry
        of the page's ``epList``.
    """
    page = requests.get(url)
    ep_list_json = re.search('"epList\":(.*?),\"epI', page.text).group(1)
    first_episode = json.loads(ep_list_json)[0]
    return "https://www.bilibili.com/bangumi/play/ep" + str(first_episode['id'])
def download_video_batch(referer_url, video_url, audio_url, video_name, index):
    """Download the video and audio tracks of one item of a batch.

    The tracks are written to ``<video_name>_video.mp4`` and
    ``<video_name>_audio.mp4``.

    Args:
        referer_url: Value sent as the ``Referer`` header.
        video_url: URL of the video track.
        audio_url: URL of the audio track.
        video_name: Output path prefix; its third ``/``-separated component
            is used as the display name (callers pass ``./video/<name>``).
        index: 1-based position of the item, used only in progress output.

    Returns:
        The tuple ``(video_name, index)``, consumed by the merge callback.
    """
    # Work on a private copy: this function runs concurrently in a thread
    # pool, and writing Referer/Range into the shared module-level dict
    # would let one task's headers leak into another's requests.
    request_headers = dict(headers)
    request_headers['Referer'] = referer_url
    # Same open-ended range the download requests have always asked for.
    request_headers['Range'] = 'bytes=0-'
    short_name = video_name.split('/')[2]
    print("%d.\t视频下载开始:%s" % (index, short_name))
    tracks = (
        ('video', '视频', video_url),
        ('audio', '音频', audio_url),
    )
    for suffix, label, track_url in tracks:
        # stream=True exposes Content-Length without fetching the body
        # first, so each track is downloaded once rather than twice.
        with requests.get(track_url, headers=request_headers, stream=True) as response:
            size_mb = round(int(response.headers.get('content-length', 0)) / 1024 / 1024, 2)
            print('%d.\t%s\t%s大小:' % (index, short_name, label), size_mb, '\tMB')
            # 'wb' rather than 'ab': a leftover file from an earlier run
            # must be replaced, not appended to.
            with open('%s_%s.mp4' % (video_name, suffix), 'wb') as output:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    output.write(chunk)
    return video_name, index
def download_video_single(referer_url, video_url, audio_url, video_name):
    """Download one video's tracks, then merge them.

    The tracks are written to ``<video_name>_video.mp4`` and
    ``<video_name>_audio.mp4``; ``video_audio_merge_single`` is then called
    with ``video_name``.

    Args:
        referer_url: Value sent as the ``Referer`` header.
        video_url: URL of the video track.
        audio_url: URL of the audio track.
        video_name: Output path prefix, also shown in progress output.
    """
    # Private header copy: multiple_download runs this in a thread pool, so
    # mutating the shared module-level dict would mix up concurrent tasks.
    request_headers = dict(headers)
    request_headers['Referer'] = referer_url
    # Same open-ended range the download requests have always asked for.
    request_headers['Range'] = 'bytes=0-'
    print("视频下载开始:%s" % video_name)
    tracks = (
        ('video', '视频', video_url),
        ('audio', '音频', audio_url),
    )
    for suffix, label, track_url in tracks:
        # stream=True exposes Content-Length without fetching the body
        # first, so each track is downloaded once rather than twice.
        with requests.get(track_url, headers=request_headers, stream=True) as response:
            size_mb = round(int(response.headers.get('content-length', 0)) / 1024 / 1024, 2)
            print('%s\t%s大小:' % (video_name, label), size_mb, '\tMB')
            # 'wb' rather than 'ab': never append onto a stale partial file.
            with open('%s_%s.mp4' % (video_name, suffix), 'wb') as output:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    output.write(chunk)
    print("视频下载结束:%s" % video_name)
    video_audio_merge_single(video_name)
def video_audio_merge_batch(result):
    """Future done-callback: merge one item's video and audio with ffmpeg.

    Args:
        result: Completed future whose ``result()`` is the
            ``(video_name, index)`` pair returned by the download task.
            ``<video_name>_video.mp4`` and ``<video_name>_audio.mp4`` are
            stream-copied into ``<video_final>.mp4``, where ``video_final``
            is ``video_name`` with its first ``video`` replaced by
            ``video_final``.

    Raises:
        Exception: Whatever the download task raised, re-raised by
            ``result.result()``.
    """
    import subprocess

    video_name, index = result.result()
    # count=1: only the leading "video" directory component may change; a
    # title that itself contains "video" must be left untouched.
    video_final = video_name.replace('video', 'video_final', 1)
    # Argument list instead of a shell string: names come from web page
    # titles and must not be interpreted by a shell.
    command = [
        'ffmpeg', '-y', '-loglevel', 'quiet',
        '-i', '%s_video.mp4' % video_name,
        '-i', '%s_audio.mp4' % video_name,
        '-c', 'copy',
        '%s.mp4' % video_final,
    ]
    try:
        # run() waits for ffmpeg, so the message below is printed only once
        # the merged file has actually been written.
        subprocess.run(command)
    except OSError as error:
        # Typically ffmpeg is not installed or not on PATH.
        print('ffmpeg 启动失败:%s' % error)
    print("%d.\t视频下载结束:%s" % (index, video_name.split('/')[2]))
def video_audio_merge_single(video_name):
    """Merge one video's downloaded tracks into ``<video_name>.mp4`` with ffmpeg.

    Args:
        video_name: Path prefix shared by the inputs
            (``<video_name>_video.mp4``, ``<video_name>_audio.mp4``) and the
            output (``<video_name>.mp4``).
    """
    import subprocess

    print("视频合成开始:%s" % video_name)
    # Argument list instead of a shell string: the name is a web page title
    # and must not be interpreted by a shell.
    command = [
        'ffmpeg', '-y', '-loglevel', 'quiet',
        '-i', '%s_video.mp4' % video_name,
        '-i', '%s_audio.mp4' % video_name,
        '-c', 'copy',
        '%s.mp4' % video_name,
    ]
    try:
        # run() blocks until ffmpeg exits, so temp files are not cleaned up
        # (and "finished" is not printed) while the merge is still running.
        subprocess.run(command)
    except OSError as error:
        # Typically ffmpeg is not installed or not on PATH.
        print('ffmpeg 启动失败:%s' % error)
    print("视频合成结束:%s" % video_name)
def batch_download():
    """Interactively download every part of a multi-part video.

    Prompts for a video id (digits = av id; anything else is treated as the
    tail of a BV id and resolved through ``BV_move_av``) and a quality
    choice. It then fetches the part list from the pagelist API and reads
    each part's ``__playinfo__`` JSON from its page. Downloads run in a
    3-worker thread pool, with ``video_audio_merge_batch`` as the
    done-callback. Finally ``video_final`` is renamed to the (sanitised)
    series title and the temporary ``video`` directory is removed.
    """
    # Characters that cannot appear in a file or directory name on common
    # filesystems; web titles routinely contain some of them.
    invalid_chars = r'[\\/:*?"<>|]'
    ## Ask for the id of the series to download
    aid = input(
        "请输入要下载的视频id(举例:链接https://www.bilibili.com/video/BV1Ke411W71L?p=1中id为1Ke411W71L\nhttps://www.bilibili.com/video/av91748877?p=1中id为91748877,默认为91748877)")
    if aid:
        # Raw string: '\D' in a plain literal is an invalid escape sequence.
        if re.search(r'\D', aid):
            aid = BV_move_av('https://www.bilibili.com/video/BV' + aid)
    else:
        aid = '91748877'
    ## Ask for the quality; anything other than 2 or 3 means 1
    quality = input('请选择清晰度(1代表高清,2代表清晰,3代表流畅),默认高清\t')
    if quality not in ('2', '3'):
        quality = '1'
    acc_quality = int(quality) - 1
    # NOTE(review): this swaps the stdlib default HTTPS context factory for
    # an unverified one; confirm it is still needed, since the requests
    # calls below may not consult it.
    ssl._create_default_https_context = ssl._create_unverified_context
    ## Fetch the series title
    url = 'https://www.bilibili.com/video/av{}?p=1'.format(aid)
    html = etree.HTML(requests.get(url, params=params, headers=headers).text)
    title = html.xpath('//*[@id="viewbox_report"]/h1/span/text()')[0]
    print('您即将下载的视频系列是:', title)
    # The title becomes a directory name below, so strip illegal characters.
    folder_name = re.sub(invalid_chars, '_', title)
    ## Create the working directories
    create_folder('video')
    create_folder('video_final')
    ## Get the part list from the API
    res_json = requests.get('https://api.bilibili.com/x/player/pagelist?aid={}'.format(aid)).json()
    video_name_list = res_json['data']
    print('共下载视频{}个'.format(len(video_name_list)))
    ## Thread pool of size 3; leaving the with-block waits for all tasks
    with ThreadPoolExecutor(3) as pool:
        for i, video_content in enumerate(video_name_list):
            part_name = re.sub(invalid_chars, '_', video_content['part'])
            video_name = ('./video/' + part_name).replace(" ", "-")
            origin_video_url = 'https://www.bilibili.com/video/av{}'.format(aid) + '?p=%d' % (i + 1)
            ## Request the part's page and parse its playinfo JSON
            res = requests.get(origin_video_url, headers=headers)
            play_data = re_video_info(res.text, '__playinfo__=(.*?)</script><script>')['data']
            descriptions = play_data['accept_description']
            dash = play_data['dash']
            # Clamp the index: a part may offer fewer streams than the
            # quality the user asked for.
            quality_desc = descriptions[min(acc_quality, len(descriptions) - 1)]
            video_url = dash['video'][min(acc_quality, len(dash['video']) - 1)]['baseUrl']
            audio_url = dash['audio'][min(acc_quality, len(dash['audio']) - 1)]['baseUrl']
            ## Duration in minutes and seconds
            video_time = int(dash['duration'])
            video_minute = video_time // 60
            video_second = video_time % 60
            print('{}.\t当前视频清晰度为{},时长{}分{}秒'.format(i + 1, quality_desc, video_minute, video_second))
            ## Queue the download; merge video and audio when it completes
            pool.submit(download_video_batch, origin_video_url, video_url, audio_url, video_name, i + 1).add_done_callback(
                video_audio_merge_batch)
    # Grace period for merges that were started without being waited on.
    time.sleep(5)
    ## Publish the results under the series title
    if os.path.exists(folder_name):
        shutil.rmtree(folder_name)
    os.rename('video_final', folder_name)
    try:
        shutil.rmtree('video')
    except OSError:
        # One retry, as before; a second failure is allowed to propagate.
        shutil.rmtree('video')
def batch_down_fanju():
    """Interactively download the episodes of a bangumi (series) page.

    Prompts for an ``ep`` or ``ss`` URL (``ss`` URLs are resolved through
    ``ss_move_ep``) and reads the episode catalogue from the page's
    ``__INITIAL_STATE__`` JSON. Episodes are downloaded in a 3-worker thread
    pool, with ``video_audio_merge_batch`` as the done-callback. Finally
    ``video_final`` is renamed to the (sanitised) series title and the
    temporary ``video`` directory is removed.

    Episode URLs are built from each ``epList`` entry's ``id`` so that every
    URL lines up with the entry its file name is taken from. Episodes whose
    page carries no parsable ``__playinfo__`` are reported and skipped.
    """
    # Characters that cannot appear in a file or directory name on common
    # filesystems.
    invalid_chars = r'[\\/:*?"<>|]'
    ## Ask for the page URL
    url = input('请输入下载的番剧链接:(支持ep,ss加数字类型,如https://www.bilibili.com/bangumi/play/ep21434,\nhttps://www.bilibili.com/bangumi/play/ss28763/)')
    if not url:
        url = 'https://www.bilibili.com/bangumi/play/ep21434'
    if 'ep' not in url:
        url = ss_move_ep(url)
    r = requests.get(url)
    r.close()
    html = etree.HTML(r.text)
    title = html.xpath('//*[@id="media_module"]/div/a')[0].text
    # Raw string with the trailing "()" escaped: the old pattern contained an
    # invalid '\;' escape and treated "()" as an empty group.
    catalog = json.loads(re.search(r'__INITIAL_STATE__=(.*?);\(function\(\)', r.text).group(1))
    episodes = catalog['epList']
    # Everything before the first digit run of the URL, e.g. ".../play/ep".
    url_half = re.search(r'\D+', url).group(0)
    # Real episode ids from the catalogue; ids are not assumed consecutive.
    urls = [url_half + str(episode['id']) for episode in episodes]
    quality = input('请选择清晰度(1代表高清,2代表清晰,3代表流畅),默认高清\t')
    if quality not in ('2', '3'):
        quality = '1'
    acc_quality = int(quality) - 1
    ## Create the working directories
    create_folder('video')
    create_folder('video_final')
    with ThreadPoolExecutor(3) as pool:
        for i, ul in enumerate(urls):
            r = requests.get(ul)
            r.close()
            try:
                data = json.loads(re.search('__playinfo__=(.*?)</script>', r.text).group(1))
            except (AttributeError, ValueError):
                # No match (AttributeError on None) or undecodable JSON.
                print('{}.\t未能解析视频信息,已跳过:{}'.format(i + 1, ul))
                continue
            episode = episodes[i]
            name = re.sub(invalid_chars, '_', episode['titleFormat'] + ' ' + episode['longTitle'])
            video_name = ('./video/' + name).replace(" ", "-")
            play_data = data['data']
            dash = play_data['dash']
            formats = play_data['support_formats']
            duration = dash['duration']
            # Clamp the index: an episode may offer fewer streams than the
            # quality the user asked for.
            quality_desc = formats[min(acc_quality, len(formats) - 1)]['display_desc']
            video_url = dash['video'][min(acc_quality, len(dash['video']) - 1)]['backupUrl'][0]
            audio_url = dash['audio'][min(acc_quality, len(dash['audio']) - 1)]['backupUrl'][0]
            video_minute = duration // 60
            video_second = duration % 60
            print('{}.\t当前视频清晰度为{},时长{}分{}秒'.format(i + 1, quality_desc, video_minute, video_second))
            # Referer is the episode's own page, matching batch_download.
            pool.submit(download_video_batch, ul, video_url, audio_url, video_name, i + 1).add_done_callback(
                video_audio_merge_batch)
    # Grace period for merges that were started without being waited on.
    time.sleep(5)
    ## Publish the results under the series title
    folder_name = re.sub(invalid_chars, '_', title)
    if os.path.exists(folder_name):
        shutil.rmtree(folder_name)
    os.rename('video_final', folder_name)
    try:
        shutil.rmtree('video')
    except OSError:
        # One retry, as before; a second failure is allowed to propagate.
        shutil.rmtree('video')
def multiple_download():
    """Interactively download several unrelated videos in parallel.

    Prompts for whitespace-separated av ids and a quality choice, then
    creates a directory named after the raw id string. ``single_download``
    runs for each id in a 3-worker thread pool. Afterwards
    ``remove_move_file`` deletes the temporary track files and moves the
    merged videos into that directory.
    """
    ## Ask for all ids
    aid_str = input(
        '请输入要下载的所有视频id,id之间用空格分开\n举例:有5个链接https://www.bilibili.com/video/av89592082、https://www.bilibili.com/video/av68716174、https://www.bilibili.com/video/av87216317、\nhttps://www.bilibili.com/video/av83200644和https://www.bilibili.com/video/av88252843,则输入89592082 68716174 87216317 83200644 88252843\n默认为89592082 68716174 87216317 83200644 88252843\t')
    if not aid_str:
        aid_str = '89592082 68716174 87216317 83200644 88252843'
    if os.path.exists(aid_str):
        shutil.rmtree(aid_str)
    # split() without an argument ignores repeated or surrounding whitespace;
    # split(' ') yielded empty ids that each triggered a bogus request.
    aids = aid_str.split()
    ## Ask for the quality; anything other than 2 or 3 means 1
    quality = input('请选择清晰度(1代表高清,2代表清晰,3代表流畅),默认高清\t')
    if quality not in ('2', '3'):
        quality = '1'
    acc_quality = int(quality) - 1
    ## Create the output directory
    create_folder(aid_str)
    ## Run the downloads in a thread pool
    pool = ThreadPoolExecutor(3)
    tasks = [(aid, pool.submit(single_download, aid, acc_quality)) for aid in aids]
    pool.shutdown(wait=True)
    # An exception inside a worker is stored on its future and otherwise
    # vanishes silently; report each failure to the user.
    for aid, task in tasks:
        error = task.exception()
        if error is not None:
            print('av%s 下载失败:%s' % (aid, error))
    # Grace period for merges that were started without being waited on.
    time.sleep(5)
    ## Delete temp files and move the finished videos
    remove_move_file(aid_str)
def single_download(aid, acc_quality):
    """Download one video identified by its av id.

    Fetches the video page and reads the title and the ``__playinfo__``
    JSON. Video and audio URLs are picked at the requested quality index and
    handed to ``download_video_single``.

    Args:
        aid: Numeric av id as a string (appended to the ``/video/av`` URL).
        acc_quality: Zero-based index into the page's quality and stream
            lists; clamped to the last available entry.
    """
    ## Request the video page
    origin_video_url = 'https://www.bilibili.com/video/av' + aid
    res = requests.get(origin_video_url, headers=headers)
    html = etree.HTML(res.text)
    title = html.xpath('//*[@id="viewbox_report"]/h1/span/text()')[0]
    print('您当前正在下载:', title)
    play_data = re_video_info(res.text, '__playinfo__=(.*?)</script><script>')['data']
    descriptions = play_data['accept_description']
    dash = play_data['dash']
    # Clamp the index: a video may offer fewer streams than the quality
    # the user asked for.
    quality = descriptions[min(acc_quality, len(descriptions) - 1)]
    video_url = dash['video'][min(acc_quality, len(dash['video']) - 1)]['baseUrl']
    audio_url = dash['audio'][min(acc_quality, len(dash['audio']) - 1)]['baseUrl']
    ## Duration in minutes and seconds
    video_time = int(dash['duration'])
    video_minute = video_time // 60
    video_second = video_time % 60
    print('当前视频清晰度为{},时长{}分{}秒'.format(quality, video_minute, video_second))
    # The title is used verbatim as a file name downstream; replace the
    # characters that are illegal in file names (e.g. '/' or '?').
    safe_title = re.sub(r'[\\/:*?"<>|]', '_', title)
    ## Download and save the video
    download_video_single(origin_video_url, video_url, audio_url, safe_title)
def single_input():
    """Collect the parameters for a single-video download and start it.

    Prompts for an av id (empty input selects ``89592082``) and a quality
    choice (``2`` or ``3``; anything else selects ``1``), then calls
    ``single_download`` with the id and the zero-based quality index.
    """
    ## Video id, with a built-in default
    aid = input('请输入要下载的视频id(举例:链接https://www.bilibili.com/video/av89592082中id为89592082),默认为89592082\t') or '89592082'
    ## Quality choice mapped straight to its zero-based index
    choice = input('请选择清晰度(1代表高清,2代表清晰,3代表流畅),默认高清\t')
    acc_quality = {'2': 1, '3': 2}.get(choice, 0)
    ## Run the download
    single_download(aid, acc_quality)
def main():
    """Entry point: ask which download mode to run and dispatch to it.

    ``3`` runs ``multiple_download``. ``2`` asks a follow-up question and
    runs ``batch_download`` (answer ``1``) or ``batch_down_fanju`` (any
    other answer). Every other input runs ``single_input``.
    """
    separator = "*"*30
    download_choice = input('请输入您需要下载的类型:\n1代表下载单个视频,2代表批量下载系列视频,3代表批量下载多个不同视频,默认下载单个视频\t')
    print(separator)
    ## Several independent videos
    if download_choice == '3':
        multiple_download()
        return
    ## Single video (the default)
    if download_choice != '2':
        single_input()
        return
    ## Series: multi-part video or bangumi
    down_choic = input('请输入您需要下载的类型:\n1代表下载系列视频,2代表批量下载番剧')
    print(separator)
    series_handler = batch_download if down_choic == '1' else batch_down_fanju
    series_handler()
if __name__ == '__main__':
    # Print the notice banner, then start the interactive menu.
    banner_title = "*"*14+"声明"+"*"*14
    banner_body = "本程序单个视频下载只对av加数字有效,\n批量下载视频对av,bv,ss,ep加数字有效,\n批量下载独立视频只对av加数字有效\nBy 雾进\t\t 博客地址:https://blog.csdn.net/a12355556/"
    for banner_line in (banner_title, banner_body, "*"*30):
        print(banner_line)
    main()
本项目在实现过程中可能参考了其他大佬的实现思路,如有侵犯他人权益,请联系我更改或删除。