多线程you-get 下载b站视频/ffmpeg提取音频/视频/直接请求下载并保存视频/批量移动文件/百度翻译api翻译
多线程you-get 下载b站视频
# coding: utf-8
import json
import threading
import os
import time
# s ={"_id_":"xxx","age":"","area":"","asrSortText":[{"endTime":12,"startTime":9,"text":"啊啊啊啊。"}],"videoTime":"","videoUrl":"","viewCnt":0}
# s=json.dumps(s)
# print(s,type(s))
# item_dict = json.loads(s)
# print(item_dict,type(item_dict))
# print(item_dict['videoUrl'])
file_list = []
video_list = []


def readfilep(file):
    """Collect the first video path of every JSON record in *file*.

    The file is read as one JSON object per line. For each record the keys
    ``_id_``, ``url``, ``title`` and ``videoPath`` are read, a summary dict
    is printed, and the first ``videoPath`` entry is appended to the
    module-level ``video_list``.

    Args:
        file: Path of the JSON-lines file to read.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks one of the keys listed above.
        IndexError: If a record's ``videoPath`` is empty.
    """
    # Decode explicitly as UTF-8 so records with non-ASCII text load the same
    # way on every platform instead of depending on the locale default.
    with open(file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line is not a record; json.loads
                # would raise on it.
                continue
            item_dict = json.loads(line)
            d = dict(
                name=item_dict['_id_'],
                url=item_dict['url'],
                title=item_dict['title'],
                vedio_path=item_dict['videoPath'][0],
            )
            print(d)
            video_list.append(d['vedio_path'])
def download1080(count, url):
    """Download *url* with you-get, requesting the ``dash-flv`` format.

    Args:
        count: Label of the worker, used only in the progress message.
        url: Address handed to you-get.

    Raises:
        FileNotFoundError: If the ``you-get`` executable cannot be found.
    """
    import subprocess  # local import: this script does not import it at the top

    print("thread " + str(count) + " is running")
    # Hand the URL over as a single argv entry. Building a shell string let
    # characters such as '&' or ';' in the URL split or inject commands.
    subprocess.run(["you-get", "--format=dash-flv", url], check=False)
def download720(count, url):
    """Download *url* with you-get, requesting the ``dash-flv720`` format.

    Args:
        count: Label of the worker, used only in the progress message.
        url: Address handed to you-get.

    Raises:
        FileNotFoundError: If the ``you-get`` executable cannot be found.
    """
    import subprocess  # local import: this script does not import it at the top

    print("thread " + str(count) + " is running")
    # Hand the URL over as a single argv entry. Building a shell string let
    # characters such as '&' or ';' in the URL split or inject commands.
    subprocess.run(["you-get", "--format=dash-flv720", url], check=False)
def downloaddefault(count, url, output_dir="/Users/111/Desktop/youtube-en"):
    """Download *url* with you-get's default format into *output_dir*.

    Args:
        count: Label of the worker, used only in the progress message.
        url: Address handed to you-get.
        output_dir: Directory passed to you-get's ``-o`` option. Defaults to
            the directory that used to be hard-coded here.

    Raises:
        FileNotFoundError: If the ``you-get`` executable cannot be found.
    """
    import subprocess  # local import: this script does not import it at the top

    print("thread " + str(count) + " is running")
    # Hand the URL and directory over as separate argv entries. Building a
    # shell string let characters such as '&' or ';' in the URL split or
    # inject commands.
    subprocess.run(["you-get", url, "-o", output_dir], check=False)
def showinfo(count, url):
    """Run ``you-get -i`` on *url*.

    Args:
        count: Label of the worker, used only in the progress message.
        url: Address handed to you-get.

    Raises:
        FileNotFoundError: If the ``you-get`` executable cannot be found.
    """
    import subprocess  # local import: this script does not import it at the top

    print("thread " + str(count) + " is running")
    # Hand the URL over as a single argv entry. Building a shell string let
    # characters such as '&' or ';' in the URL split or inject commands.
    subprocess.run(["you-get", "-i", url], check=False)
def test(arg):
    """Print a start message, ping www.baidu.com, then print an end message.

    Args:
        arg: Label included in both messages.
    """
    label = f"thread {arg!s}"
    print(f"{label} is running")
    os.system("ping www.baidu.com")
    print(f"{label} finish")
if __name__ == '__main__':
    # Load the records, then show the collected paths, how many there are,
    # and how many of them are distinct.
    readfilep("/Users/xxx/xxx.txt")
    total = len(video_list)
    distinct = len(set(video_list))
    print(video_list, total, distinct)

    # --- disabled: download every collected URL with a bounded thread pool ---
    # thread_list = []
    # thread_num = 8
    # i = 1
    # for url in video_list:
    #     # create a download thread for each new URL
    #     t = threading.Thread(target=downloaddefault, args=(i, url))
    #     # add it to the pool and start it
    #     thread_list.append(t)
    #     t.start()
    #     # print(thread_list[0])
    #     # while the pool is full, wait for threads to finish
    #     while len(thread_list) > thread_num:
    #         # drop the threads that have finished
    #         thread_list = [x for x in thread_list if x.is_alive()]
    #         time.sleep(3)
    #         # print("running threads_________" + str(thread_list))
    #     i = i + 1
    #     pass

# --- disabled: alternative entry point that writes the paths to bili_path.txt ---
# if __name__ == "__main__":
#     readfilep("/Users/111/xxx.txt")
#     print(video_list)
#     with open('bili_path.txt', 'w') as f:
#         for line in video_list:
#             f.write(line + '\n')
文件格式
ffmpeg提取音频
分割音频:因为需要先分割视频,所以直接从分割后的视频片段中提取音频
import os
import subprocess
# 从视频中提取音频
def video_to_audio(video_file, audio_file):
    """Extract the audio of *video_file* into *audio_file* with ffmpeg.

    ffmpeg is asked for WAV output (``-f wav``), no video (``-vn``), a sample
    rate of 8000 (``-ar 8000``) and one channel (``-ac 1``); ``-y`` overwrites
    an existing output. The format is forced by ``-f wav`` whatever extension
    *audio_file* carries.

    Failures (ffmpeg missing, or ffmpeg exiting with a non-zero status) are
    reported on stdout and not raised.

    Args:
        video_file: Path of the input video.
        audio_file: Path of the audio file to write.
    """
    # An argument list needs no shell quoting, so file names containing
    # quotes, spaces or '$' reach ffmpeg unchanged.
    command = [
        '/usr/local/bin/ffmpeg',
        '-i', video_file,
        '-f', 'wav',
        '-vn',
        '-ar', '8000',
        '-ac', '1',
        '-y', audio_file,
    ]
    print(command)
    try:
        # check=True turns a failing ffmpeg run into an exception; os.system
        # never raised, so failures used to be reported as success.
        subprocess.run(command, check=True)
    except (OSError, subprocess.SubprocessError) as ex:
        print('音频地址:', audio_file)
        print('提取音频发生异常', ex)
    else:
        print('提取完成')
# # 从视频中截取图片,每秒钟一个图片
# def cut_image_from_video(video_file,imagedir):
# try:
# # strls = os.path.split(video_file)
# # filename = strls[1].replace('.mp4','')
# #
# # imagedir = os.path.join(strls[0],filename)
# if not os.path.exists(imagedir):
# os.mkdir(imagedir)
# filename = os.path.join(imagedir, 'test_frame_%03d.jpg')
# ffmpeg = r'D:/FFmpeg/bin/ffmpeg -i %s -r 1 -q:v 2 -f image2 -y %s' % (video_file, filename)
# # print(ffmpeg)
# os.system(ffmpeg)
# # p = subprocess.Popen(ffmpeg, shell=False)
# # p.wait(5)
# print('提取完成')
# except Exception as ex:
# print('提取音频发生异常', ex)
# 截取视频片段
def cutVideo(video_file, save_name, start_time='00:00:00', length='00:00:01'):
    """Copy a clip of *video_file* into *save_name* without re-encoding.

    Runs ffmpeg with ``-acodec copy -vcodec copy`` and prints its exit status.

    Args:
        video_file: Path of the input video.
        save_name: Path of the clip to write (overwritten if present, ``-y``).
        start_time: Value for ffmpeg's ``-ss`` option. Defaults to the
            previously hard-coded ``'00:00:00'``.
        length: Value for ffmpeg's ``-t`` option. Defaults to the previously
            hard-coded ``'00:00:01'`` (one second).

    Raises:
        FileNotFoundError: If ``/usr/local/bin/ffmpeg`` does not exist.
    """
    # subprocess.call() without a shell needs an argument list; a single
    # command string is taken as the program name on POSIX and cannot run.
    command = [
        '/usr/local/bin/ffmpeg',
        '-y',
        '-i', video_file,
        '-ss', start_time,
        '-t', length,
        '-acodec', 'copy',
        '-vcodec', 'copy',
        '-async', '1',
        save_name,
    ]
    r = subprocess.call(command)
    print(r)
def getVideoTime(path):
    """Return the duration ffprobe reports for *path*, truncated to an int.

    Args:
        path: Path of the media file to probe.

    Returns:
        ``int(float(...))`` of the ``format=duration`` value printed by
        ffprobe.

    Raises:
        FileNotFoundError: If ``ffprobe`` cannot be found.
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output is not a number.
    """
    # An argument list avoids the shell, so a path containing a single quote
    # or other shell metacharacters is passed through unchanged.
    command = [
        'ffprobe', path,
        '-show_entries', 'format=duration',
        '-of', 'compact=p=0:nk=1',
        '-v', '0',
    ]
    gettime = subprocess.check_output(command)
    return int(float(gettime.strip()))
file_list = []


def readfilep(file):
    """Append every line of *file*, without its newline, to ``file_list``.

    Args:
        file: Path of the text file to read.
    """
    # Decode explicitly as UTF-8 so non-ASCII entries are read the same way
    # on every platform instead of depending on the locale default.
    with open(file, encoding="utf-8") as f:
        file_list.extend(line.strip('\n') for line in f)
if __name__ == "__main__":
    # Extract the audio of every file in `path` next to the source file.
    path = '/Users/111/Desktop/youtube-jap/'
    file_names = os.listdir(path)
    print(len(file_names))
    for file_name in file_names:
        print(file_name)
        if file_name == '.DS_Store':
            continue
        file_url = os.path.join(path, file_name)
        # splitext removes only the final extension. split('.')[0] cut the
        # name at its first dot, so "Ep.1 intro.mp4" and "Ep.2 intro.mp4"
        # both mapped to "Ep" and overwrote each other.
        base_path = os.path.join(path, os.path.splitext(file_name)[0])
        audio_path = base_path + '.mp3'
        print(audio_path)
        print(file_url)
        video_to_audio(file_url, audio_path)
        # --- disabled: split the video into fixed-length pieces instead ---
        # audio_path = base_path+'.mp3'
        # videoPath = file_url
        # cutTime=1
        # timeT=getVideoTime(videoPath)
        # firstTime=0
        # index=1
        # while firstTime<timeT:
        #     cmdLine = 'ffmpeg -ss %s -i %s -c copy -t %s %s.mp4 -loglevel quiet -y'%(firstTime,videoPath,cutTime,'%s_%s'%(base_path,index))
        #     print(cmdLine)
        #     returnCmd = subprocess.call(cmdLine, shell=True)
        #     firstTime+=cutTime
        #     index+=1
ffmpeg分割视频
import os
import subprocess
# 从视频中提取音频
def video_to_audio(video_file, audio_file):
    """Extract the audio of *video_file* into *audio_file* with ffmpeg.

    ffmpeg is asked for WAV output (``-f wav``), no video (``-vn``), a sample
    rate of 8000 (``-ar 8000``) and one channel (``-ac 1``); ``-y`` overwrites
    an existing output. The format is forced by ``-f wav`` whatever extension
    *audio_file* carries.

    Failures (ffmpeg missing, or ffmpeg exiting with a non-zero status) are
    reported on stdout and not raised.

    Args:
        video_file: Path of the input video.
        audio_file: Path of the audio file to write.
    """
    # An argument list needs no shell quoting, so file names containing
    # quotes, spaces or '$' reach ffmpeg unchanged.
    command = [
        '/usr/local/bin/ffmpeg',
        '-i', video_file,
        '-f', 'wav',
        '-vn',
        '-ar', '8000',
        '-ac', '1',
        '-y', audio_file,
    ]
    print(command)
    try:
        # check=True turns a failing ffmpeg run into an exception; os.system
        # never raised, so failures used to be reported as success.
        subprocess.run(command, check=True)
    except (OSError, subprocess.SubprocessError) as ex:
        print('音频地址:', audio_file)
        print('提取音频发生异常', ex)
    else:
        print('提取完成')
# # 从视频中截取图片,每秒钟一个图片
# def cut_image_from_video(video_file,imagedir):
# try:
# # strls = os.path.split(video_file)
# # filename = strls[1].replace('.mp4','')
# #
# # imagedir = os.path.join(strls[0],filename)
# if not os.path.exists(imagedir):
# os.mkdir(imagedir)
# filename = os.path.join(imagedir, 'test_frame_%03d.jpg')
# ffmpeg = r'D:/FFmpeg/bin/ffmpeg -i %s -r 1 -q:v 2 -f image2 -y %s' % (video_file, filename)
# # print(ffmpeg)
# os.system(ffmpeg)
# # p = subprocess.Popen(ffmpeg, shell=False)
# # p.wait(5)
# print('提取完成')
# except Exception as ex:
# print('提取音频发生异常', ex)
# 截取视频片段
def cutVideo(video_file, save_name, start_time='00:00:00', length='00:00:01'):
    """Copy a clip of *video_file* into *save_name* without re-encoding.

    Runs ffmpeg with ``-acodec copy -vcodec copy`` and prints its exit status.

    Args:
        video_file: Path of the input video.
        save_name: Path of the clip to write (overwritten if present, ``-y``).
        start_time: Value for ffmpeg's ``-ss`` option. Defaults to the
            previously hard-coded ``'00:00:00'``.
        length: Value for ffmpeg's ``-t`` option. Defaults to the previously
            hard-coded ``'00:00:01'`` (one second).

    Raises:
        FileNotFoundError: If ``/usr/local/bin/ffmpeg`` does not exist.
    """
    # subprocess.call() without a shell needs an argument list; a single
    # command string is taken as the program name on POSIX and cannot run.
    command = [
        '/usr/local/bin/ffmpeg',
        '-y',
        '-i', video_file,
        '-ss', start_time,
        '-t', length,
        '-acodec', 'copy',
        '-vcodec', 'copy',
        '-async', '1',
        save_name,
    ]
    r = subprocess.call(command)
    print(r)
def getVideoTime(path):
    """Return the duration ffprobe reports for *path*, truncated to an int.

    Args:
        path: Path of the media file to probe.

    Returns:
        ``int(float(...))`` of the ``format=duration`` value printed by
        ffprobe.

    Raises:
        FileNotFoundError: If ``ffprobe`` cannot be found.
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output is not a number.
    """
    # An argument list avoids the shell, so a path containing a single quote
    # or other shell metacharacters is passed through unchanged.
    command = [
        'ffprobe', path,
        '-show_entries', 'format=duration',
        '-of', 'compact=p=0:nk=1',
        '-v', '0',
    ]
    gettime = subprocess.check_output(command)
    return int(float(gettime.strip()))
file_list = []


def readfilep(file):
    """Append every line of *file*, without its newline, to ``file_list``.

    Args:
        file: Path of the text file to read.
    """
    # Decode explicitly as UTF-8 so non-ASCII entries are read the same way
    # on every platform instead of depending on the locale default.
    with open(file, encoding="utf-8") as f:
        file_list.extend(line.strip('\n') for line in f)
if __name__ == "__main__":
    # Split every file in `path` into consecutive pieces of `cutTime` seconds,
    # written next to the source as "<name>_<index>.mp4".
    path = '/Users/111/Desktop/youtube-jap/'
    file_names = os.listdir(path)
    print(len(file_names))
    for file_name in file_names:
        if file_name == '.DS_Store':
            continue
        videoPath = os.path.join(path, file_name)
        # splitext removes only the final extension; split('.')[0] cut the
        # name at its first dot and made different videos share one prefix.
        base_path = os.path.join(path, os.path.splitext(file_name)[0])
        # audio_path = base_path+'.mp3'
        cutTime = 1
        timeT = getVideoTime(videoPath)
        for index, firstTime in enumerate(range(0, timeT, cutTime), start=1):
            # Argument list instead of a shell string: file names with spaces
            # or shell metacharacters no longer break the command.
            cmdLine = [
                'ffmpeg',
                '-ss', str(firstTime),
                '-i', videoPath,
                '-c', 'copy',
                '-t', str(cutTime),
                '%s_%s.mp4' % (base_path, index),
                '-loglevel', 'quiet',
                '-y',
            ]
            print(cmdLine)
            returnCmd = subprocess.call(cmdLine)
直接请求下载并保存视频
# coding: utf-8
import json
import os
import requests
file_list = []
video_list = []


def readfilep(file):
    """Collect the first video path of every JSON record in *file*.

    The file is read as one JSON object per line. For each record the keys
    ``_id_``, ``url``, ``title`` and ``videoPath`` are read, a summary dict
    is printed, and the first ``videoPath`` entry is appended to the
    module-level ``video_list``.

    Args:
        file: Path of the JSON-lines file to read.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks one of the keys listed above.
        IndexError: If a record's ``videoPath`` is empty.
    """
    # Decode explicitly as UTF-8 so records with non-ASCII text load the same
    # way on every platform instead of depending on the locale default.
    with open(file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line is not a record; json.loads
                # would raise on it.
                continue
            item_dict = json.loads(line)
            d = dict(
                name=item_dict['_id_'],
                url=item_dict['url'],
                title=item_dict['title'],
                vedio_path=item_dict['videoPath'][0],
            )
            print(d)
            video_list.append(d['vedio_path'])
def apiRequest():
    """Download every URL in ``video_list`` into the youtube-bili folder.

    Each URL is fetched with a GET request. On status 200 the response body
    is written to ``/Users/sueong/Desktop/youtube-bili/<basename of URL>``;
    any other status, or a network error, is reported on stdout and the loop
    moves on to the next URL.
    """
    headers = {'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)'}
    for i, v in enumerate(video_list, start=1):
        filename = os.path.basename(v)
        save_path = '/Users/sueong/Desktop/youtube-bili/' + filename
        try:
            # The headers were built but never sent before. The timeout keeps
            # one stalled server from hanging the whole batch.
            get_response = requests.get(v, headers=headers, timeout=60)
        except requests.RequestException as ex:
            # One unreachable URL should not abort the remaining downloads.
            print('失败!!!!', i, filename, ex)
            continue
        print(get_response.status_code)
        # The URL is a direct download address, so the file content is the
        # raw response body.
        if get_response.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(get_response.content)
            print('文件保存成功!!', i, filename)
        else:
            print('失败!!!!', i, filename)
if __name__ == '__main__':
    # Load the records, report the collected URLs (list, count, distinct
    # count), then download them.
    readfilep("/Users/sueong/Desktop/extract/asr_bilibili_1110.txt")
    total = len(video_list)
    distinct = len(set(video_list))
    print(video_list, total, distinct)
    apiRequest()
批量移动文件 shutil.move
import shutil
import os
def remove_file(old_path, new_path):
    """Move every ``.mp3`` file directly inside *old_path* into *new_path*.

    Despite its name, this function moves files; nothing is deleted.

    Args:
        old_path: Directory to scan (not recursive).
        new_path: Destination directory; created if it does not exist.
    """
    # shutil.move fails when the destination directory is missing, so make
    # sure it exists first.
    os.makedirs(new_path, exist_ok=True)
    # os.listdir returns bare names without the directory part.
    for name in os.listdir(old_path):
        src = os.path.join(old_path, name)
        # endswith('.mp3') requires a real extension; split('.')[-1] also
        # matched an extension-less file literally named "mp3". Directories
        # are skipped so the destination itself can live inside old_path.
        if not name.endswith('.mp3') or not os.path.isfile(src):
            continue
        dst = os.path.join(new_path, name)
        print('dst:', dst)
        shutil.move(src, dst)
    print('移动成功!')
if __name__ == '__main__':
    # Move the extracted audio files into their own sub-folder.
    source_dir = r"/Users/sueong/Desktop/youtube-bili/分割的视频"
    target_dir = r"/Users/sueong/Desktop/youtube-bili/分割的视频/分割的音频"
    remove_file(source_dir, target_dir)
    print('move stop!')
利用百度翻译api翻译
API 的使用方法可以参考百度翻译的官方文档和 demo。
同时将翻译结果写回原文档的每条记录中,新建字段 zh_text。
# coding: utf-8
import json
import threading
import os
import time
import translatebybaidu_jap
file_list = []
video_list = []
def readfilep(file):
    """Translate every ASR segment in *file* and append the enriched records.

    *file* is read as one JSON object per line. For each entry of a record's
    ``asrSortText`` list, the entry's ``text`` is passed to
    ``translatebybaidu_jap.translateTozh`` and the result is stored in a new
    ``zh_text`` field. The updated record is then appended as one JSON line
    to ``new1_asr_youtube_jap_1110.txt`` in the current directory.

    Args:
        file: Path of the JSON-lines input file.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks ``asrSortText``/``_id_`` or a segment
            lacks ``text``.
    """
    # Both files are opened once, explicitly as UTF-8: the records are dumped
    # with ensure_ascii=False, so the output holds raw non-ASCII text. The
    # output used to be reopened for every record under the name `file`,
    # which shadowed this function's parameter.
    with open(file, encoding='utf-8') as source:
        with open(r'new1_asr_youtube_jap_1110.txt', 'a+', encoding='utf-8') as out:
            for line in source:
                line = line.strip()
                if not line:
                    # A blank line is not a record; json.loads would raise.
                    continue
                item_dict = json.loads(line)
                # Add the new zh_text field to every asrSortText entry.
                for asr in item_dict['asrSortText']:
                    zh_text = translatebybaidu_jap.translateTozh(asr['text'])
                    print(item_dict['_id_'], zh_text)
                    asr['zh_text'] = zh_text
                    # Pause between requests - presumably to respect the
                    # translation API's rate limit; confirm against the plan
                    # in use.
                    time.sleep(1)
                new_item = json.dumps(item_dict, ensure_ascii=False)
                print('111111111111')
                print(new_item)
                out.write(new_item + '\n')
                # Flush per record so finished work survives a later failure.
                out.flush()
if __name__ == '__main__':
    # Translate the ASR records of the input file.
    input_path = "/Users/sueong/Desktop/extract/asr_youtube_jap_1110.txt"
    readfilep(input_path)
translatebybaidu_jap.py
# coding: utf-8
# Holds translated strings; only the commented-out translateTozhINfile
# further down appends to it.
trans_data = list()
# ======================= Baidu translate API ===========================
import random
from hashlib import md5
import time
import requests
# Credentials: apply for your own appid and appkey and fill them in here.
appid = '1'
appkey = '1'

# Language pair: source 'jp' (Japanese), target 'zh' (Chinese).
from_lang, to_lang = 'jp', 'zh'

# The request URL is the host followed by the translate path.
endpoint = 'http://api.fanyi.baidu.com'
path = '/api/trans/vip/translate'
url = ''.join((endpoint, path))
# # query = 'i love you man.'

# Output file for translated text, opened in append mode ('a+') when the
# module is imported.
fout = open(r'zh.txt', 'a+')
def make_md5(s, encoding='utf-8'):
    """Return the hexadecimal MD5 digest of *s* encoded with *encoding*.

    Args:
        s: Text to hash.
        encoding: Codec used to turn *s* into bytes before hashing.
    """
    hasher = md5()
    hasher.update(s.encode(encoding))
    return hasher.hexdigest()
# def translateTozhINfile(query):
# salt = random.randint(32768, 65536)
# sign = make_md5(appid + query + str(salt) + appkey)
# print(sign,len(sign))
# headers = {'Content-Type': 'applicantion/x-www-form-urlencoded'}
# payload = {'appid': appid,
# 'q': query,
# 'from': from_lang,
# 'to': to_lang,
# 'salt': salt,
# 'sign': sign}
# r = requests.post(url, params=payload, headers=headers)
# result = r.json()
# print(result)
# for res in result['trans_result']:
# print(res['dst'])
# trans_data.append(res['dst'])
# fout.write(res['dst'] + '\n')
# time.sleep(1) # 受免费用户限制,睡眠1秒
# print(trans_data)
# fout.close()
def translateTozh(query):
    """Translate *query* from ``from_lang`` to ``to_lang`` via the API at ``url``.

    The request is signed with ``make_md5(appid + query + salt + appkey)``,
    where the salt is a random integer.

    Args:
        query: Text to translate.

    Returns:
        The translated text. When the response holds several
        ``trans_result`` entries their ``dst`` values are joined with
        newlines. ``None`` if ``trans_result`` is empty.

    Raises:
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the response has no ``trans_result`` key.
    """
    salt = random.randint(32768, 65536)
    sign = make_md5(appid + query + str(salt) + appkey)
    # Fixed typo: the header value used to read 'applicantion/...'.
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    payload = {'appid': appid,
               'q': query,
               'from': from_lang,
               'to': to_lang,
               'salt': salt,
               'sign': sign}
    # The timeout keeps a stalled connection from blocking the caller forever.
    r = requests.post(url, params=payload, headers=headers, timeout=10)
    result = r.json()
    if 'trans_result' not in result:
        # Report the whole response instead of a bare KeyError so the reason
        # the service gave is visible.
        raise RuntimeError('translation failed, response: %r' % (result,))
    # Returning from inside the loop kept only the first entry; keep them all.
    translated = [res['dst'] for res in result['trans_result']]
    return '\n'.join(translated) if translated else None