1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
import requests import sys import time from bs4 import BeautifulSoup import csv import os import you_get import ssl from retrying import retry import random
class X8x8(object):
    """Scrape the X8X8 site for video pages, then download the videos.

    The two-step workflow is selected by the ``type`` argument of
    :meth:`main`:
      step 1 (type=1): crawl the listing pages and append
        [cover, link, title] rows to a CSV file --
        main(csv_file, outputs_path, type=1)
      step 2 (type=2): iterate the CSV, resolve each video's download
        URL and download it -- main(csv_file, outputs_path, type=2);
        a failed download is retried 5 times, and a random 4-15 s
        sleep separates downloads.

    :time 2021-05-31 05:17:51
    """
def __init__(self): ssl._create_default_https_context = ssl._create_unverified_context
def spider(self, path, retries = 0): """ :desc 爬取页面内容 :param page 页数 :param retries 重试次数 : TIPS:网站地址关注公众号”融媒体技术社“,回复”变量“获取 :return html页面内容 """ domain = "变量1:网站地址" url = domain + path headers = { 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'accept-encoding': 'gzip, deflate, br', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8', 'sec-ch-ua-mobile': '?0', 'sec-fetch-dest': 'image', 'sec-fetch-mode': 'no-cors', 'sec-fetch-site': 'same-origin', 'cache-control': 'no-cache', 'cookie': '_ga=GA1.2.1941385504.1622125602; _gid=GA1.2.1628234473.162212560', 'sec-ch-ua': '"Google Chrome";v="89", "Chromium";v="89", ";Not A Brand";v="99"', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36' }
reset = 1 response = requests.get(url, headers = headers) if (response.status_code == 200): print('success') else: while retries != 0 and reset <= retries: time.sleep(5) reset = reset + 1 response = requests.get(url, headers = headers) if (response.status_code == 200): break if (response.status_code != 200): print("======================第{0}页抓取失败:{1}==================".format(page, url)) return None return response.text
def acquire_total_page(self, html=""): """ :return 返回总页数 """ return 2233
def parse_html(self, html=''): ''' :param html: html :desc: 解析html ''' if not html: return None bs = BeautifulSoup(html, 'html.parser') results = bs.select("main#main > div.lm_lb > ul > li") list = [] for r in results: cover = r.select_one('img[alt="title"]') link = r.select_one('div.lm_lbimg > a[href^="/html/"]') title = r.get_text() if cover is not None and link is not None: cover = cover.attrs['src'] link = link.attrs['href'] list.append([cover, link, title]) else: print("=================AD=================") return list
def parser_video_path(self, html): if not html: return None bs = BeautifulSoup(html, 'html.parser') main = bs.find(name="main", attrs={"id": "main"}) sp = main.select_one('div.s_p') vpath = sp.select_one('span#vpath') purl = sp.select_one('span#purl') downloadurl1 = sp.select_one('span#downloadurl1') if vpath and downloadurl1 and purl: vapth = vpath.get_text() purl = purl.get_text() downloadurl = downloadurl1.get_text() return (vapth, purl, downloadurl) return None
def save_to_csv(self, file_name='', list=[]): ''' :param file_name 文件名 :param list ''' if not file_name: exit("文件名不能为空")
with open(file_name, 'a', errors = 'ignore', newline = '') as f: writer = csv.writer(f) for row in list: writer.writerows([row])
@retry(stop_max_attempt_number=5, wait_fixed=5) def download_video(self, dirs, video_url): ''' :param dir: 输入的文件传入字典格式{文件:操作} :param outputs_path: 输出的文件传入字典格式{文件:操作} :return: ''' paths = os.path.split(video_url) file = os.path.join(dirs, paths[1])
if os.path.exists(file): print("2.视频文件已存在:{}".format(file)) else: sys.argv = ['you-get', '-o', dirs, video_url] you_get.main()
def batch_download_videos(self, csv_file, outputs_path): ''' :param csv_file 存放视频链接的CSV文件 :param outputs_path 下载视频输出的路径 : TIPS:实际视频地址关注公众号”融媒体技术社“,回复”变量“获取 ''' domain = "变量2:实际视频地址" if not outputs_path: outputs_path = "./" if not outputs_path.endswith("/"): outputs_path = outputs_path + '/' if not os.path.exists(outputs_path): os.makedirs(outputs_path)
csv_reader = csv.reader(open(csv_file)) for row in csv_reader: cover = row[0] path = row[1] title = row[2]
dirs = outputs_path + title list = os.listdir(dirs) if os.path.exists(dirs) else [] if len(list) == 1 and list[0].endswith(".mp4"): print("1.视频文件已存在:{0}".format(title)) else: try: html = self.spider(path, 3)
t = self.parser_video_path(html) if t and len(t) == 3: self.download_video(dirs, t[2]) time.sleep(random.randint(4, 15)) else: print("下载失败的视频:页面:{0}, titile:{1}", path, title) except Exception as e: print("下载失败的视频为:{0}".format(title))
def main(self, csv_file, outputs_path=None, type=1): if type == 1 or type is None: total_page = self.acquire_total_page("") for page in range(0, total_page): path = "/html/category/video/page_{}.html".format(page + 1) html = self.spider(path, retries=3) list = self.parse_html(html) self.save_to_csv(csv_file, list) else: if not os.path.exists(csv_file): print("已抓取的视频地址文件不存在:{}".format(csv_file)) self.batch_download_videos(csv_file, outputs_path)
if __name__ == '__main__':
    # Step 2: read the already-crawled link file and download the videos.
    csv_file = "x8x8.csv"
    outputs_path = "/模型训练资料/x8x8/"
    downloader = X8x8()
    downloader.main(csv_file, outputs_path, type=2)
|