This repository has been archived on 2023-11-29. You can view files and clone it, but cannot push or open issues or pull requests.
WebPicDownloader/model/WebPicDownloader.py

259 lines
10 KiB
Python
Raw Normal View History

import os
from threading import Semaphore, Thread
from urllib import request
from urllib.error import HTTPError, URLError
from bs4 import BeautifulSoup, Tag, ResultSet
class WebPicDownloader(Thread):
"""
WebPicDownloader
Webpicdownloader is a scraping tool that allows you to browse a web page,
find the images and download them. This tool is easily usable and implementable
in an application. It has been designed to be executed in an integrated thread
in an asynchronous way as well as more classically in a synchronous way. This
tool allows to define 3 callback functions, one for events, one in case of
success and one in case of failure. It also has an integrated entry point
allowing it to be directly executed in terminal mode.
@author EndMove <contact@endmove.eu>
@version 1.2.0
"""
# Variables
__callbacks: dict = None # Callback dictionary
__settings: dict = None #
__dl_infos: dict = None #
__sem: Semaphore = None #
_exit: bool = None # When set to True quit the thread
# Constructor
def __init__(self, path: str = None, headers: dict = None, messenger = None,
success = None, failure = None) -> None:
"""
Constructor
=> It is important to initialize the WebPicDownloader object properly. The callback
functions can be initialized after the creation of the object.
* :path: -> Folder in which the tool will create the download folders and place the images.
* :headers: -> Dictionary allowing to define the different parameters present in the header
of the requests sent by WebPic.
* :asynchronous: -> True: launch the download in a thread, False: the opposite.
* :messenger: -> Callback function messenger (see setter).
* :success: -> Callback function success (see setter).
* :failure: -> Callback function failure (see setter).
"""
super().__init__(daemon=True)
self.__callbacks = {
'messenger': messenger if messenger else lambda msg: print(msg),
'success': success if success else lambda: print("Success!"),
'failure': failure if failure else lambda: print("failure!")
}
self.__settings = {
'root_path': path if path else os.getcwd(),
'headers': headers if headers else {
'User-Agent': "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
}
}
self.__dl_infos = {
'website_url': 'url',
'download_name': 'name',
'download_path': 'full_path',
'tot_image_count': 0,
'dl_image_count': 0,
'running': False
}
self.__sem = Semaphore(0)
self.__exit = False
self.start() # start deamon
# Internal functions
def __get_html(self, url: str) -> str:
"""
Internal Function #do-not-use#
=> Allow to retrieve the HTML content of a website.
* :url: -> The url of the site for which we want to get the content of the HTML page.
* RETURN -> Web page content.
"""
req = request.Request(url, headers=self.__settings.get('headers'))
response = request.urlopen(req)
return response.read().decode('utf-8')
def __find_all_img(self, html: str) -> ResultSet:
"""
Internal Function #do-not-use#
=> Allow to retrieve all images of an html page.
* :html: -> Html code in which to search for image balises.
* RETURN -> Iterable with all image balises.
"""
soup = BeautifulSoup(html, 'html.parser')
return soup.find_all('img')
def __find_img_link(self, img: Tag) -> str:
"""
Internal Function #do-not-use#
=> Allow to retrieve the link of a picture.
* :img: -> Image tag {@code bs4.Tag} for which to search the link of an image.
* RETURN -> Image link.
"""
if img.get('src'):
link = img.get('src')
elif img.get('data-src'):
link = img.get('data-src')
elif img.get('data-srcset'):
link = img.get('data-srcset')
elif img.get('data-fallback-src'):
link = img.get('data-fallback-src')
else:
raise ValueError("Unable to find image url")
if not 'http' in link:
raise ValueError("Bad image url")
return link
def __find_image_type(self, img_link: str) -> str:
"""
Internal Function #do-not-use#
=> Allow to retrieve the right image type (png, jpeg...)
* :img_link: -> Lien de l'image pourllaquel trouver le bon type.
* RETURN -> Type of image.
"""
type = img_link.split('.')[-1]
if '?' in type:
type = type.split('?')[0]
return type
def __download_img(self, url: str, filename: str) -> None:
"""
Internal Function #do-not-use#
=> Allow to download a picture from internet
* :url: -> Image url on the web.
* :filename: -> Full path with name of the future image.
"""
req = request.Request(url, headers=self.__settings.get('headers'))
raw_img = request.urlopen(req).read()
with open(filename, 'wb') as img:
img.write(raw_img)
def __initialize_folder(self, folder_path: str) -> None:
"""
Internal Function #do-not-use#
=> Checks if the folder in which to place the images to be uploaded exists and if
not chalk it up. An exception is raised if this folder already exists.
* :folder_path: -> Full path to the working folder (for the download task).
"""
if not os.path.exists(folder_path):
os.mkdir(folder_path)
else:
raise ValueError("The folder already exists, it may already contain images")
def __msg(self, message: str) -> None:
"""
Internal Function #do-not-use#
=> Use the messenger callback to send a message.
"""
self.__callbacks.get('messenger')(message)
# Public functions
def set_success_callback(self, callback) -> None:
"""
Setter to define the callback function when the download succeeded.
* :callback: -> the callback function to call when the download is a success.
"""
self.__callbacks['success'] = callback
def set_failure_callback(self, callback) -> None:
"""
Setter to define the callback function called when the download fails.
* :callback: -> the callback function to call when the download is a failure.
"""
self.__callbacks['failure'] = callback
def set_messenger_callback(self, callback) -> None:
"""
Setter to define the callback function called when new messages arrive.
* :callback: -> the callback function to call when a message event is emited.
"""
self.__callbacks['messenger'] = callback
def start_downloading(self, url: str, name: str) -> None:
"""
TODO desc
"""
if self.__dl_infos.get('running'):
print("bussy")
else:
self.__dl_infos['website_url'] = url
self.__dl_infos['download_name'] = name
self.__sem.release()
def stop_downloading(self, block=False) -> None:
"""
TODO DESC
"""
self.__exit = True
self.__sem.release()
if block:
self.join()
# Thread corp function
def run(self) -> None:
while True:
self.__sem.acquire()
if self.__exit:
return
self.__dl_infos['running'] = True # reserv run
try:
html = self.__get_html(self.__dl_infos.get('website_url')) # website html
images = self.__find_all_img(html) # find all img balises ing html
self.__dl_infos['tot_image_count'] = len(images) # count total image
self.__dl_infos['dl_image_count'] = 0 # set download count to 0
self.__dl_infos['download_path'] = f"{self.__settings.get('root_path')}/{self.__dl_infos.get('download_name')}/" # format path
self.__initialize_folder(self.__dl_infos.get('download_path')) # Init download folder
self.__msg(f"WebPicDownloader found {self.__dl_infos.get('tot_image_count')} images on the website.")
# process pictures
for i, img in enumerate(images):
try:
self.__msg(f"Start downloading image {i}.")
img_link = self.__find_img_link(img) # find image link
self.__download_img(img_link, f"{self.__dl_infos.get('download_path')}image-{i}.{self.__find_image_type(img_link)}") # download the image
self.__msg(f"Download of image {i}, done!")
self.__dl_infos['dl_image_count'] += 1 # increment download counter
except Exception as err:
self.__msg(f"ERROR: Unable to process image {i} -> err[{err}].")
self.__msg(f"WebPicDownloader has processed {self.__dl_infos.get('dl_image_count')} images out of {self.__dl_infos.get('tot_image_count')}.")
self.__callbacks.get('success')() # success, launch callback
except Exception as err:
self.__msg(f"ERROR: An error occured -> err[{err}]")
self.__callbacks.get('failure')() # error, launch callback
self.__dl_infos['running'] = False # free run
if __name__ == "__main__":
# Internal entry point for testing and consol use.
wpd = WebPicDownloader()
def lol(msg):
pass
wpd.set_messenger_callback(lol)
while True:
url = input("Website URL ? ")
name = input("Folder name ? ")
wpd.start_downloading(url, name)
if "n" == input("Do you want to continue [Y/n] ? ").lower():
wpd.stop_downloading()
break
print("Good bye !")