import os
import sys
from typing import Callable, Optional
from urllib import request
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse

from bs4 import BeautifulSoup, Tag, ResultSet


class WebPicDownloader():
    """
    WebPicDownloader

    webpicdownloader is a simple tool able to find and download all pictures
    on a webpage. The working folder, the headers sent with every http
    request and a callback function named messenger (called at each event)
    can all be customized. A running download can be interrupted from
    another thread by calling the stop function.

    @author EndMove
    @version 1.1.1
    """
    # Variables
    path: Optional[str] = None          # Main working folder directory
    headers: Optional[dict] = None      # Header parameters
    messenger: Optional[Callable[[str], None]] = None  # Event callback function
    thread_run: Optional[bool] = None   # Indicates if the task can still run (for use in a thread)

    # Constructor
    def __init__(self, path: Optional[str] = None, headers: Optional[dict] = None,
                 messenger: Optional[Callable[[str], None]] = None) -> None:
        """
        Constructor
        => Build a downloader; every argument is optional.

        * :path: -> Folder where the tool will download the images
            (defaults to the current working directory).
        * :headers: -> Dictionary allowing to define the different parameters
            present in the header of the requests sent by WebPic
            (defaults to a browser-like User-Agent).
        * :messenger: -> Callback receiving each event message as a string
            (defaults to print).
        """
        self.path = path if path else os.getcwd()
        self.headers = headers if headers else {
            'User-Agent': "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
        }
        self.messenger = messenger if messenger else print
        self.thread_run = True

    # Internal functions
    def __get_html(self, url: str) -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the HTML content of a website.

        * :url: -> The url of the site for which we want to get the content of the HTML page.
        * RETURN -> Web page content.
        """
        req = request.Request(url, headers=self.headers)
        # The context manager closes the connection even if read() fails.
        with request.urlopen(req) as response:
            charset = response.headers.get_content_charset() or 'utf-8'
            raw_html = response.read()
        try:
            return raw_html.decode(charset)
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrongly declared charset: the markup is still
            # usable for finding <img> tags, so decode leniently.
            return raw_html.decode('utf-8', errors='replace')

    def __find_all_img(self, html: str) -> ResultSet:
        """
        Internal Function #do-not-use#
        => Allow to retrieve all images of an html page.

        * :html: -> Html code in which to search for image tags.
        * RETURN -> Iterable with all image tags.
        """
        soup = BeautifulSoup(html, 'html.parser')
        return soup.find_all('img')

    def __find_img_link(self, img: Tag) -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the link of a picture.

        The attributes src, data-src, data-srcset and data-fallback-src are
        tried in that order and the first one holding an absolute http(s)
        url wins, so a placeholder in src does not hide the real link.

        * :img: -> Image tag {@code bs4.Tag} for which to search the link of an image.
        * RETURN -> Image link.
        * RAISE -> ValueError if no attribute is set, or if none of them
            holds an absolute http(s) url.
        """
        attribute_found = False
        for attribute in ('src', 'data-src', 'data-srcset', 'data-fallback-src'):
            value = img.get(attribute)
            if not value:
                continue
            attribute_found = True
            link = str(value).strip()
            if attribute == 'data-srcset':
                # A srcset is a list of "url descriptor" candidates; keep
                # only the url of the first candidate.
                candidates = link.split()
                link = candidates[0].rstrip(',') if candidates else ''
            if link.lower().startswith(('http://', 'https://')):
                return link
        if not attribute_found:
            raise ValueError("Unable to find image url")
        raise ValueError("Bad image url")

    def __find_image_type(self, img_link: str) -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the right image type (png, jpeg...)

        Only the path component of the url is inspected, so dots located in
        the host name, the query string or the fragment are ignored.

        * :img_link: -> Link of the image for which to find the right type.
        * RETURN -> Type of image; 'jpg' is used as a default when the url
            path carries no extension.
        """
        url_path = urlparse(img_link).path
        extension = os.path.splitext(url_path)[1].lstrip('.')
        return extension if extension else 'jpg'

    def __download_img(self, url: str, filename: str) -> None:
        """
        Internal Function #do-not-use#
        => Allow to download a picture from internet

        * :url: -> Image url on the web.
        * :filename: -> Full path with name of the future image.
        """
        req = request.Request(url, headers=self.headers)
        with request.urlopen(req) as response:
            raw_img = response.read()
        with open(filename, 'wb') as img_file:
            img_file.write(raw_img)

    def __initialize_folder(self, folder_path: str) -> None:
        """
        Internal Function #do-not-use#
        => Create the folder in which the downloaded images will be placed.

        * :folder_path: -> Full path to the working folder (for the download task).
        * RAISE -> ValueError if this folder already exists.
        """
        try:
            # Create first and react to the failure: no window between an
            # existence check and the creation.
            os.mkdir(folder_path)
        except FileExistsError:
            raise ValueError("The folder already exists, it may already contain images") from None

    # Public functions
    def set_messenger_callback(self, callback) -> None:
        """
        Setter to define the callback function called when new messages arrive.

        * :callback: -> the callback function to call when a message event is emitted.
        """
        self.messenger = callback

    def stop(self) -> None:
        """
        Stop the downloading and processing of images (method for use in a thread).

        The running download notices the request before starting the next
        image. NOTE: this method then calls sys.exit(0), which raises
        SystemExit in the thread that called stop().
        """
        self.thread_run = False
        sys.exit(0)

    def download(self, url: str, folder_name: str) -> bool:
        """
        Start downloading all pictures of a website

        * :url: -> The url of the website to analyse.
        * :folder_name: -> The name of the folder in which to download the photos.
        * RETURN -> True if every image was processed, False if an error
            occurred or if the task was interrupted by stop().
        """
        # Re-arm the flag before any work so that a stop() issued while the
        # page is being fetched is not overwritten afterwards.
        self.thread_run = True
        try:
            count = 0
            stopped = False
            folder_path = os.path.join(self.path, folder_name)
            html = self.__get_html(url)
            images = self.__find_all_img(html)
            self.__initialize_folder(folder_path)
            self.messenger(f"WebPicDownloader found {len(images)} images on the website.")
            for i, img in enumerate(images):
                if not self.thread_run:
                    stopped = True
                    self.messenger("WebPicDownloader has been stopped before the end of the task.")
                    break
                try:
                    self.messenger(f"Start downloading image {i}.")
                    img_link = self.__find_img_link(img)
                    img_path = os.path.join(folder_path, f"image-{i}.{self.__find_image_type(img_link)}")
                    self.__download_img(img_link, img_path)
                    self.messenger(f"Download of image {i}, done!")
                    count += 1
                except Exception as err:
                    # Best effort: one broken image must not abort the others.
                    self.messenger(f"ERROR: Unable to process image {i} -> err[{err}].")
            self.messenger(f"WebPicDownloader has processed {count} images out of {len(images)}.")
            return not stopped
        except HTTPError as err:
            self.messenger(f"ERROR: An http error occured -> err[{err}].")
        except (ValueError, URLError) as err:
            self.messenger(f"ERROR: An error occured with the url -> err[{err}].")
        except Exception as err:
            self.messenger(f"ERROR: An unknown error occured -> err[{err}]")
        return False


if __name__ == "__main__":
    # Small interactive command line front-end.
    wpd = WebPicDownloader()
    wpd.set_messenger_callback(lambda msg: print(f"--> {msg}"))
    while True:
        url = input("Website URL ? ")
        name = input("Folder name ? ")
        wpd.download(url, name)
        if "n" == input("Do you want to continue [Y/n] ? ").strip().lower():
            break
    print("Good bye !")