266 lines
10 KiB
Python
266 lines
10 KiB
Python
import os
import threading
from urllib import request
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin, urlsplit

from bs4 import BeautifulSoup, Tag, ResultSet
|
|
|
|
|
|
class WebPicDownloader(threading.Thread):
    """
    WebPicDownloader

    WebPicDownloader is a scraping tool that browses a web page, finds the
    images it references and downloads them. It can be embedded in an
    application and run either synchronously (in the calling thread) or
    asynchronously (in its own thread). Three callback functions can be
    defined: one for message events, one called on success and one called on
    failure. The module also has an integrated entry point allowing it to be
    directly executed in terminal mode.

    @author EndMove <contact@endmove.eu>
    @version 1.2.0
    """
    # Constants
    DEFAULT_IMAGE_TYPE: str = "jpg"     # Extension used when the image url does not expose a usable one
    IMAGE_LINK_ATTRIBUTES: tuple = (    # <img> attributes inspected for a link, by priority
        'src', 'data-src', 'data-srcset', 'data-fallback-src'
    )

    # Variables
    path: str = None                    # Main working folder directory
    headers: dict = None                # Header parameters
    dl_setting: dict = None             # Parameter dictionary ('url' and 'name' of the current task)

    callbacks: dict = None              # Callback dictionary ('messenger', 'success', 'failure')

    use_thread: bool = None             # Indicates if the script must launch the download in a separate thread
    async_run: bool = None              # Indicates if the task can still run (for use in a thread)

    # Constructor
    def __init__(self, path: str = None, headers: dict = None, asynchrone: bool = False,
                 messenger=None, success=None, failure=None) -> None:
        """
        Constructor
        => It is important to initialize the WebPicDownloader object properly. The callback
        functions can also be set after the creation of the object (see the setters).

        * :path: -> Folder in which the tool will create the download folders and place the
            images. Defaults to the current working directory.
        * :headers: -> Dictionary defining the header fields sent with every request.
            Defaults to a browser-like 'User-Agent'.
        * :asynchrone: -> True: launch the download in a thread, False: the opposite.
        * :messenger: -> Callback function messenger, called with one message string.
        * :success: -> Callback function success, called without argument.
        * :failure: -> Callback function failure, called without argument.
        """
        super().__init__()
        self.path = path if path else os.getcwd()
        self.headers = headers if headers else {
            'User-Agent': "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
        }
        self.dl_setting = {
            'url': None,
            'name': None
        }

        self.callbacks = {
            'messenger': messenger if messenger else lambda msg: print(msg),
            'success': success if success else lambda: print("Success!"),
            'failure': failure if failure else lambda: print("failure!")
        }

        self.use_thread = asynchrone
        self.async_run = True

    # Internal functions
    def __get_html(self, url: str) -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the HTML content of a website.

        * :url: -> The url of the site for which we want to get the content of the HTML page.
        * RETURN -> Web page content.
        """
        req = request.Request(url, headers=self.headers)
        # The context manager closes the connection even when reading fails.
        with request.urlopen(req) as response:
            charset = response.headers.get_content_charset() or 'utf-8'
            raw_html = response.read()
        try:
            # Undecodable bytes are replaced: one bad character must not abort the whole task.
            return raw_html.decode(charset, errors='replace')
        except LookupError:
            # The server announced a charset Python does not know.
            return raw_html.decode('utf-8', errors='replace')

    def __find_all_img(self, html: str) -> "ResultSet":
        """
        Internal Function #do-not-use#
        => Allow to retrieve all images of an html page.

        * :html: -> Html code in which to search for image tags.
        * RETURN -> Iterable with all image tags.
        """
        soup = BeautifulSoup(html, 'html.parser')
        return soup.find_all('img')

    def __find_img_link(self, img: "Tag") -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the link of a picture.
        The attributes listed in IMAGE_LINK_ATTRIBUTES are tried in order and the first one
        that yields an http(s) link wins. Relative links are resolved against the url of
        the page being processed (dl_setting['url']) when it is set.

        * :img: -> Image tag {@code bs4.Tag} for which to search the link of an image.
        * RETURN -> Absolute http(s) image link.
        * RAISE -> ValueError if no link attribute is set, or if none gives an http(s) link.
        """
        page_url = (self.dl_setting or {}).get('url')
        attribute_found = False
        for attribute in self.IMAGE_LINK_ATTRIBUTES:
            value = img.get(attribute)
            if not value:
                continue
            attribute_found = True
            link = value.strip()
            if attribute.endswith('srcset'):
                # A srcset is a list "url descriptor, url descriptor": keep the first url only.
                candidates = link.split()
                link = candidates[0].rstrip(',') if candidates else ''
            if not link:
                continue
            if page_url:
                link = urljoin(page_url, link)
            # Skips inline placeholders (data:...) so that a lazy-loading attribute can be used.
            if urlsplit(link).scheme in ('http', 'https'):
                return link
        if attribute_found:
            raise ValueError("Bad image url")
        raise ValueError("Unable to find image url")

    def __find_image_type(self, img_link: str) -> str:
        """
        Internal Function #do-not-use#
        => Allow to retrieve the right image type (png, jpeg...)
        Only the path of the url is inspected, the query string and the fragment are ignored.

        * :img_link: -> Link of the image for which to find the right type.
        * RETURN -> Type of image, DEFAULT_IMAGE_TYPE if the link has no usable extension.
        """
        extension = os.path.splitext(urlsplit(img_link).path)[1][1:]
        # isalnum() is False for '' and for anything unsafe in a file name ('/', ':'...).
        return extension if extension.isalnum() else self.DEFAULT_IMAGE_TYPE

    def __download_img(self, url: str, filename: str) -> None:
        """
        Internal Function #do-not-use#
        => Allow to download a picture from internet

        * :url: -> Image url on the web.
        * :filename: -> Full path with name of the future image.
        """
        req = request.Request(url, headers=self.headers)
        # Read everything before opening the file so a failed transfer leaves no partial file.
        with request.urlopen(req) as response:
            raw_img = response.read()
        with open(filename, 'wb') as img:
            img.write(raw_img)

    def __initialize_folder(self, folder_path: str) -> None:
        """
        Internal Function #do-not-use#
        => Creates the folder in which to place the downloaded images.

        * :folder_path: -> Full path to the working folder (for the download task).
        * RAISE -> ValueError if this folder already exists.
        """
        try:
            os.mkdir(folder_path)
        except FileExistsError as err:
            raise ValueError("The folder already exists, it may already contain images") from err

    def __msg(self, message: str) -> None:
        """
        Internal Function #do-not-use#
        => Use the messenger callback to send a message.

        * :message: -> Message to send.
        """
        self.callbacks.get('messenger')(message)

    # Public functions
    def start_downloading(self, url: str, folder_name: str) -> None:
        """
        Start downloading all pictures of a website.
        Depending on the configuration the download will either be started in the
        main process or in a separate thread.

        <!> In asynchronous mode an instance can be started only once: a thread object
        cannot be restarted and a second call raises RuntimeError. <!>

        * :url: -> The url of the website to analyse.
        * :folder_name: -> The name of the folder in which to download the photos.
        """
        # Updating settings
        self.dl_setting['url'] = url
        self.dl_setting['name'] = folder_name

        # Start processing
        if self.use_thread:
            self.async_run = True
            self.start()
        else:
            self.run()

    def stop_downloading(self) -> None:
        """
        Stops the download after the current item is processed and exit the downloading thread.
        When a download is stopped this way neither the success nor the failure callback is called.

        <!> this function is available only if asynchronous option is set to True in constructor. <!>

        * RAISE -> Exception if the downloader is not configured to run in a thread.
        """
        if self.use_thread:
            self.async_run = False
        else:
            raise Exception("Can not stop a no multi thread execution")

    def run(self) -> None:
        """
        <!>
        This function must not be used! It is an internal function called automatically
        when the download task starts.
        <!>

        Fetches the page, creates the download folder, then downloads every image found.
        An error on one image is reported through the messenger and does not stop the task.
        The success callback is called at the end of the task, the failure callback when
        the page or the folder could not be prepared.
        """
        try:
            # init variables
            download_count = 0
            download_folder = f"{self.path}/{self.dl_setting.get('name')}/"
            website_html = self.__get_html(self.dl_setting.get('url'))
            images = self.__find_all_img(website_html)

            self.__initialize_folder(download_folder)
            self.__msg(f"WebPicDownloader found {len(images)} images on the website.")

            # process pictures
            for i, img in enumerate(images):
                if self.use_thread and not self.async_run:
                    return  # stop requested: leave without calling any callback
                try:
                    self.__msg(f"Start downloading image {i}.")
                    img_link = self.__find_img_link(img)
                    img_type = self.__find_image_type(img_link)
                    self.__download_img(img_link, f"{download_folder}image-{i}.{img_type}")
                    self.__msg(f"Download of image {i}, done!")
                    download_count += 1
                except Exception as err:
                    self.__msg(f"ERROR: Unable to process image {i} -> err[{err}].")
            self.__msg(f"WebPicDownloader has processed {download_count} images out of {len(images)}.")
            self.callbacks.get('success')()  # success, launch callback and return
            return

        except HTTPError as err:  # must stay before URLError, its parent class
            self.__msg(f"ERROR: An http error occured -> err[{err}].")
        except (ValueError, URLError) as err:
            # ValueError is also what an already existing download folder raises.
            self.__msg(f"ERROR: An error occured with the url -> err[{err}].")
        except Exception as err:
            self.__msg(f"ERROR: An unknown error occured -> err[{err}]")
        self.callbacks.get('failure')()  # failure, launch callback and exit

    def set_success_callback(self, callback) -> None:
        """
        Setter to define the callback function when the download succeeded.

        * :callback: -> the callback function to call when the download is a success.
        """
        self.callbacks['success'] = callback

    def set_failure_callback(self, callback) -> None:
        """
        Setter to define the callback function called when the download fails.

        * :callback: -> the callback function to call when the download is a failure.
        """
        self.callbacks['failure'] = callback

    def set_messenger_callback(self, callback) -> None:
        """
        Setter to define the callback function called when new messages arrive.

        * :callback: -> the callback function to call when a message event is emitted.
        """
        self.callbacks['messenger'] = callback
|
|
|
|
if __name__ == "__main__":
    # Console entry point: repeatedly asks for a page url and a folder name,
    # runs a synchronous download, and stops as soon as the user answers "n".
    downloader = WebPicDownloader()
    keep_going = True
    while keep_going:
        page_url = input("Website URL ? ")
        folder = input("Folder name ? ")
        downloader.start_downloading(page_url, folder)
        answer = input("Do you want to continue [Y/n] ? ")
        # Any answer other than "n"/"N" (including an empty one) starts a new round.
        keep_going = answer.lower() != "n"
    print("Good bye !")