模型训练是一个耗时费力的活,除了现有的业务数据还需要采集一些其他资源作为补充语料。该脚本主要抓取“多人运动罗志祥”形象代言视频网站X8X8,目的是提取无语意的音频。

什么是无语意?无语意是指发出的声音没有明确的含义,比如一些“嗯”,“啊”,“哦”之类的声音,大家应该明白了什么是无语意了吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#!/usr/bin/python
# _*_ encoding: utf8 _*_
# @author: joyven
# @date: 2021-05-27 22:58:37
# pip install ffmpy3 -i https://mirrors.aliyun.com/pypi/simple/

import requests
import sys
import time
from bs4 import BeautifulSoup
import csv
import os
import you_get
import ssl
from retrying import retry
import random


class X8x8(object):
    """
    Scraper for the X8X8 video site.

    Workflow (selected via the ``type`` argument of :meth:`main`):
      step 1: crawl the listing pages and append (cover, link, title) rows
              to a CSV file -- ``main(csv_file, outputs_path, type=1)``
      step 2: read that CSV, resolve every video's real download URL and
              download it -- ``main(csv_file, outputs_path, type=2)``.
              Each download is retried up to 5 times, with a random
              4-15 second sleep between downloads.
    """

    def __init__(self):
        # The target site serves an invalid certificate; disable
        # certificate verification globally for HTTPS connections.
        ssl._create_default_https_context = ssl._create_unverified_context

    def spider(self, path, retries=0):
        """
        Fetch one page of the site and return its HTML.

        :param path: URL path relative to the site domain
        :param retries: number of extra attempts on a non-200 response,
                        with a 5 second pause between attempts
        :return: the page HTML, or ``None`` if every attempt failed
        """
        # TIPS: the real site address is distributed out of band
        # (follow the WeChat account mentioned in the original notes).
        domain = "变量1:网站地址"
        url = domain + path
        headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'sec-ch-ua-mobile': '?0',
            'sec-fetch-dest': 'image',
            'sec-fetch-mode': 'no-cors',
            'sec-fetch-site': 'same-origin',
            'cache-control': 'no-cache',
            'cookie': '_ga=GA1.2.1941385504.1622125602; _gid=GA1.2.1628234473.162212560',
            'sec-ch-ua': '"Google Chrome";v="89", "Chromium";v="89", ";Not A Brand";v="99"',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36'
        }

        # BUG FIX: a timeout is required so a stalled connection cannot
        # hang the whole crawl forever.
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 200:
            print('success')
            return response.text

        attempt = 1
        while attempt <= retries:
            time.sleep(5)
            attempt += 1
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 200:
                return response.text

        # BUG FIX: the original referenced an undefined name ``page`` here,
        # raising NameError on the failure path; report the path instead.
        print("======================第{0}页抓取失败:{1}==================".format(path, url))
        return None

    def acquire_total_page(self, html=""):
        """
        Return the total number of listing pages to crawl.

        The value is currently hard-coded; parsing it out of ``html``
        is left for the future.
        """
        return 2233

    def parse_html(self, html=''):
        """
        Parse a listing page and extract the video entries.

        :param html: listing-page HTML
        :return: list of ``[cover_url, page_link, title]`` rows, or
                 ``None`` when ``html`` is empty
        """
        if not html:
            return None
        bs = BeautifulSoup(html, 'html.parser')
        rows = []
        for item in bs.select("main#main > div.lm_lb > ul > li"):
            cover = item.select_one('img[alt="title"]')
            link = item.select_one('div.lm_lbimg > a[href^="/html/"]')
            title = item.get_text()
            if cover is None or link is None:
                # entries without a cover or a detail link are ads
                print("=================AD=================")
                continue
            rows.append([cover.attrs['src'], link.attrs['href'], title])
        return rows

    def parser_video_path(self, html):
        """
        Extract the video path, page URL and download URL from a detail page.

        :param html: detail-page HTML
        :return: tuple ``(vpath, purl, downloadurl)``, or ``None`` when
                 ``html`` is empty or any of the three fields is missing
        """
        if not html:
            return None
        bs = BeautifulSoup(html, 'html.parser')
        main = bs.find(name="main", attrs={"id": "main"})
        sp = main.select_one('div.s_p')
        vpath = sp.select_one('span#vpath')
        purl = sp.select_one('span#purl')
        downloadurl1 = sp.select_one('span#downloadurl1')
        if vpath and purl and downloadurl1:
            return (vpath.get_text(), purl.get_text(), downloadurl1.get_text())
        return None

    def save_to_csv(self, file_name='', list=None):
        """
        Append rows to a CSV file.

        :param file_name: target CSV file name (must be non-empty)
        :param list: iterable of rows to append; ``None`` writes nothing
                     (``spider``/``parse_html`` may return ``None``)
        """
        if not file_name:
            exit("文件名不能为空")
        # BUG FIX: the default used to be the mutable literal ``[]``, and a
        # ``None`` argument (failed crawl) raised TypeError in the old loop.
        rows = list if list is not None else []
        with open(file_name, 'a', errors='ignore', newline='') as f:
            csv.writer(f).writerows(rows)

    @retry(stop_max_attempt_number=5, wait_fixed=5)
    def download_video(self, dirs, video_url):
        """
        Download one video with you-get, retrying up to 5 times.

        NOTE(review): ``wait_fixed`` is in milliseconds, so the pause
        between retries is only 5 ms -- confirm whether 5000 was intended.

        :param dirs: output directory for the downloaded file
        :param video_url: direct URL of the video file
        """
        file = os.path.join(dirs, os.path.basename(video_url))
        if os.path.exists(file):
            print("2.视频文件已存在:{}".format(file))
        else:
            # you-get only exposes a CLI entry point, so fake argv
            sys.argv = ['you-get', '-o', dirs, video_url]
            you_get.main()

    def batch_download_videos(self, csv_file, outputs_path):
        """
        Walk the crawled CSV and download every video not yet on disk.

        :param csv_file: CSV produced by step 1 (cover, link, title rows)
        :param outputs_path: root directory for the downloaded videos
        """
        # TIPS: the real video host is distributed out of band
        # (follow the WeChat account mentioned in the original notes).
        domain = "变量2:实际视频地址"
        if not outputs_path:
            outputs_path = "./"
        if not outputs_path.endswith("/"):
            outputs_path = outputs_path + '/'
        if not os.path.exists(outputs_path):
            os.makedirs(outputs_path)

        # BUG FIX: the CSV handle used to be left open; use ``with``.
        with open(csv_file) as f:
            for row in csv.reader(f):
                cover, path, title = row[0], row[1], row[2]

                dirs = outputs_path + title
                existing = os.listdir(dirs) if os.path.exists(dirs) else []
                if len(existing) == 1 and existing[0].endswith(".mp4"):
                    print("1.视频文件已存在:{0}".format(title))
                    continue
                try:
                    html = self.spider(path, 3)

                    # t = (vpath, purl, downloadurl)
                    t = self.parser_video_path(html)
                    if t and len(t) == 3:
                        self.download_video(dirs, t[2])
                        # random pause so the crawl looks less like a bot
                        time.sleep(random.randint(4, 15))
                    else:
                        # BUG FIX: the original passed the arguments to
                        # print() instead of str.format(), so the
                        # placeholders were never filled in.
                        print("下载失败的视频:页面:{0}, titile:{1}".format(path, title))
                except Exception as e:
                    # BUG FIX: include the exception so failures are
                    # diagnosable instead of being silently summarized.
                    print("下载失败的视频为:{0}({1})".format(title, e))

    def main(self, csv_file, outputs_path=None, type=1):
        """
        Entry point.

        :param csv_file: CSV file holding/receiving the crawled video links
        :param outputs_path: download directory (step 2 only)
        :param type: 1 (or None) = crawl listing pages into the CSV;
                     anything else = download the videos listed in the CSV
        """
        if type == 1 or type is None:
            total_page = self.acquire_total_page("")
            for page in range(total_page):
                path = "/html/category/video/page_{}.html".format(page + 1)
                html = self.spider(path, retries=3)
                rows = self.parse_html(html)
                self.save_to_csv(csv_file, rows)
        else:
            if not os.path.exists(csv_file):
                print("已抓取的视频地址文件不存在:{}".format(csv_file))
            self.batch_download_videos(csv_file, outputs_path)


if __name__ == '__main__':
    # Step 2: read the previously crawled CSV and download each video.
    target_csv = "x8x8.csv"
    download_dir = "/模型训练资料/x8x8/"

    X8x8().main(target_csv, download_dir, type=2)