This repository has been archived on 2023-11-29. You can view files and clone it, but cannot push or open issues or pull requests.
WebPicDownloader/model/webpic.py
2022-08-31 23:53:24 +02:00

132 lines
4.8 KiB
Python

import os
from urllib import request
from urllib.error import HTTPError, URLError
from bs4 import BeautifulSoup, Tag, ResultSet
class WebPicDownloader():
"""
WebPicDownloader
webpicdownloader is a simple tool able to
find and download all pictures on a webpage.
@author EndMove <contact@endmove.eu>
@version 1.1.0
"""
# Variables
path: str = None
messenger = None
headers: dict = None
# Constructor
def __init__(self, path: str = os.getcwd()) -> None:
"""Constructor"""
self.path = path
self.messenger = lambda message: print(message)
self.headers = {'User-Agent': "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"}
# Internal functions
def __get_html(self, url: str) -> str:
"""Allow to retrieve the HTML content of a website"""
req = request.Request(url, headers=self.headers)
response = request.urlopen(req)
return response.read().decode('utf-8')
def __find_all_img(self, html: str) -> ResultSet:
"""Allow to retrieve all images of an html page"""
soup = BeautifulSoup(html, 'html.parser')
return soup.find_all('img')
def __find_img_link(self, img: Tag) -> str:
"""Allow to retrieve the link of a picture"""
if img.get('src'):
link = img.get('src')
elif img.get('data-src'):
link = img.get('data-src')
elif img.get('data-srcset'):
link = img.get('data-srcset')
elif img.get('data-fallback-src'):
link = img.get('data-fallback-src')
else:
raise ValueError("Unable to find image url")
if not 'http' in link:
raise ValueError("Bad image url")
return link
def __find_image_type(self, img_link: str) -> str:
"""Allow to retrieve the right image type"""
type = img_link.split('.')[-1]
if '?' in type:
type = type.split('?')[0]
return type
def __download_img(self, url: str, filename: str) -> None:
"""Allow to download a picture from internet"""
req = request.Request(url, headers=self.headers)
raw_img = request.urlopen(req).read()
with open(filename, 'wb') as img:
img.write(raw_img)
def __initialize_folder(self, folder_path: str) -> None:
"""Init the folder on which put downloaded images"""
if not os.path.exists(folder_path):
os.mkdir(folder_path)
else:
raise ValueError("the folder already exists, it may already contain images")
# Public functions
def set_messenger_callback(self, callback) -> None:
"""
Setter to define the callback function called when new messages arrive.
:callback: -> the callback function to call when a message event is emited.
"""
self.messenger = callback
def download(self, url: str, folder_name: str) -> bool:
"""
Start downloading all pictures of a website
:url: -> The url of the website to annalyse.\n
:folder_name: -> The name of the folder in which to upload the photos.
"""
try:
count = 0
folder_path = f"{self.path}/{folder_name}/"
html = self.__get_html(url)
images = self.__find_all_img(html)
self.__initialize_folder(folder_path)
self.messenger(f"WebPicDownloader found {len(images)} images on the website.")
for i, img in enumerate(images):
try:
self.messenger(f"Start downloading image {i}.")
img_link = self.__find_img_link(img)
self.__download_img(img_link, f"{folder_path}image-{i}.{self.__find_image_type(img_link)}")
self.messenger(f"Download of image {i}, done!")
count += 1
except Exception as err:
self.messenger(f"ERROR: Unable to process image {i} -> err[{err}].")
self.messenger(f"WebPicDownloader has processed {count} images out of {len(images)}.")
return True
except HTTPError as err:
self.messenger(f"ERROR: An http error occured -> err[{err}].")
except (ValueError, URLError) as err:
self.messenger(f"ERROT: An error occured with the url -> err[{err}].")
except Exception as err:
self.messenger(f"ERROR: An unknown error occured -> err[{err}]")
return False
if __name__ == "__main__":
wpd = WebPicDownloader()
wpd.set_messenger_callback(lambda msg: print(f"--> {msg}"))
while True:
url = input("Website URL ? ")
name = input("Folder name ? ")
wpd.download(url, name)
if "n" == input("Do you want to continue [Y/n] ? ").lower():
break
print("Good bye !")