Example of the Wordpress Connector (one-way)

This documentation assumes that you already have a token and secret key. You can get them by this instruction.

Connector from Shopify - Wizart PIM (one-way)

First, let's initialize the libraries that we will need during the writing process:

import csv import datetime import json import logging import os import shutil import ssl import sys import time import urllib.request from pathlib import Path from zipfile import ZipFile import pandas as pd import requests from dotenv import load_dotenv from requests_toolbelt import MultipartEncoder from woocommerce import API

Please be careful, as there may be additional libraries required. This is just a subset of them. In your case, additional libraries may be needed.

Next, we will initialize the variables that we will need (it’s example, you need add to the code real credentials):

ssl._create_default_https_context = ssl._create_unverified_context load_dotenv() client_name = os.getenv('client_name') host = os.getenv('host') client_id = os.getenv('client_id') client_secret = os.getenv('client_secret') grant_type = os.getenv('grant_type') password = os.getenv('password') username = os.getenv('username') target_size = os.getenv('target_size') client_product_type = os.getenv('client_product_type') # you need to add name_attribute = os.getenv('name_attribute') # you need to add number_id_attribute = os.getenv('number_id_attribute') slug_attribute = os.getenv('slug_attribute') attribute_term_id = os.getenv('attribute_term_id') wordpress_status = os.getenv('wordpress_status') wordpress_status_remove = os.getenv('wordpress_status_remove') today = datetime.date.today() date_of_get_json = today.strftime("%b-%d-%Y") path = os.getcwd() All_products = [] All_products_remove = [] customized_size = os.getenv('customized_size') url = os.getenv('url') consumer_key = os.getenv('consumer_key') consumer_secret = os.getenv('consumer_secret') version = os.getenv('version')

And the rest of the code:

def woocommerce_api_products(): wcapi = API( url=url, consumer_key=consumer_key, consumer_secret=consumer_secret, wp_api=True, version=version, query_string_auth=True ) customized_size = 'No' # Customized size or no response_attributes = wcapi.get("products/attributes").json() number_id_attribute = '' slug_attribute = '' attribute_term_id = '' for response_attribute in response_attributes: if response_attribute['name'] == name_attribute: number_id_attribute = response_attribute['id'] slug_attribute = response_attribute['slug'] attributes_term = wcapi.get("products/attributes/{}/terms".format(number_id_attribute)).json() for attribute_term in attributes_term: if attribute_term['name'] == client_product_type: attribute_term_id = attribute_term['id'] response_first_page = wcapi.get("products", params={"attribute": slug_attribute, "attribute_term": attribute_term_id, "status": wordpress_status}) response_first_page_remove = wcapi.get("products", params={"attribute": slug_attribute, "attribute_term": attribute_term_id, "status": wordpress_status_remove}) total_pages = int(response_first_page.headers['X-WP-TotalPages']) total_pages_remove = int(response_first_page_remove.headers['X-WP-TotalPages']) page = 1 page_remove = 1 while page <= total_pages: all_products = wcapi.get("products", params={"page": page, "attribute": slug_attribute, "status": wordpress_status, "attribute_term": attribute_term_id}).json() for product in all_products: All_products.append(product) page = page + 1 while page_remove <= total_pages_remove: all_products_remove = wcapi.get("products", params={"page": page_remove, "attribute": slug_attribute, "status": wordpress_status_remove, "attribute_term": attribute_term_id}).json() for product_remove in all_products_remove: All_products_remove.append(product_remove) page_remove = page_remove + 1 print('Getting woocommerce information...') woocommerce_api_products() def api_prepare_import(): def get_client_token(): url_access_token = '{}/oauth/token'.format(host) headers = { "Accept": "application/json", "Content-Type": "application/json" } payload = { "username": username, "password": password, "client_secret": client_secret, "client_id": client_id, "grant_type": grant_type } files = [] get_token_info = requests.request("POST", url_access_token, headers=headers, data=json.dumps(payload), files=files) token_info = get_token_info.content.decode('utf8').replace("'", '"') json_token_info = json.loads(token_info) access_token = json_token_info['token_type'] + ' ' + json_token_info['access_token'] return access_token def get_product_type(): url_get_product_type = '{}/api/v2/import/product-types'.format(host) product_code = '' headers = { "Accept": "application/json", "Authorization": get_client_token() } get_product_types = requests.get(url_get_product_type, headers=headers) product_types = get_product_types.content.decode('utf8').replace("'", '"') json_product_types = json.loads(product_types) for each_product_type in json_product_types['data']: if client_product_type in each_product_type['attributes']['name']: product_code = each_product_type['attributes']['code'] return product_code def get_data_mapping_id(): url_data_mapping = '{}/api/customer/data-mappings?product_type={}'.format(host, get_product_type()) headers = { "Accept": "application/json", "Authorization": get_client_token() } get_data_mapping = requests.get(url_data_mapping, headers=headers) data_mapping = get_data_mapping.content.decode('utf8').replace("'", '"') json_data_mapping = json.loads(data_mapping) for each_data_mapping in json_data_mapping['data']: id_data_mapping = each_data_mapping['id'] return id_data_mapping def get_default_csv(): url_default_csv = '{}/api/customer/data-mappings/default/{}'.format(host, get_product_type()) headers = { "Accept": "text/csv", "Authorization": get_client_token() } get_default_csv = requests.get(url_default_csv, headers=headers) default_csv = get_default_csv.content.decode("utf-8") csv_columns_murals = default_csv.split(',') return csv_columns_murals get_client_token() get_data_mapping_id() get_default_csv() return get_client_token(), get_data_mapping_id(), get_default_csv(), get_product_type() print('Getting API information...') token, id_mapping, template_csv, product_type = api_prepare_import() def create_directory(): path_to_basic_directory = path + '/{}-{}'.format(client_name + '-' + date_of_get_json, client_product_type) try: os.mkdir(path_to_basic_directory) except OSError: pass return path_to_basic_directory csv_file = "{}/{}-{}.csv".format(create_directory(), date_of_get_json, client_product_type) def create_image_directory(): try: path_images_directory = create_directory() + '/{}-Images'.format(client_product_type) os.mkdir(path_images_directory) except OSError: pass return path_images_directory Data = [] def get_exists_product(): headers = { 'Accept': 'application/json', 'Authorization': token } page_num = 1 url_update = '{}/api/customer/articles?page=1&locale=en'.format(host) get_uploaded_products = requests.get(url_update, headers=headers, timeout=None) uploaded_products = get_uploaded_products.text json_uploaded_products = json.loads(uploaded_products) data_uploaded_products = json_uploaded_products total_num_page = data_uploaded_products['meta']['last_page'] for page_num in range(1, total_num_page + 1): url_get_exist_product = '{}/api/customer/articles?page={}&locale=en'.format(host, page_num) get_exist_product = requests.get(url_get_exist_product, headers=headers, timeout=None) all_uploaded_products = get_exist_product.text json_uploaded_products = json.loads(all_uploaded_products) data_uploaded_products = json_uploaded_products['data'] Data.append(data_uploaded_products) page_num = page_num + 1 # return Data print('Getting exist products from PIM...') get_exists_product() Images_link = [] def wallpaper_wordpress_to_csv(csv_file_folder): csv_file_wallpaper = csv_file_folder flag_length = 'attributes' flag_width = 'attributes' try: with open(csv_file_wallpaper, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=template_csv, extrasaction='ignore') writer.writeheader() for products in All_products: try: products['Article_name'] = products['name'] except: pass products['Article_Link'] = products['permalink'] products['Article_description'] = products['description'] products['Unique_SKU_ID'] = products['sku'] products['Article_regular_price'] = products['price'] products['Product_Width'] = 1 products['Repeat_Width'] = 1 if products['dimensions']['length'] == '': flag_length = 'attributes' pass else: products['Wallpaper_roll_length'] = products['dimensions']['length'] flag_length = 'dimensions' if products['dimensions']['width'] == '': flag_width = 'attributes' pass else: products['Product_Width'] = products['dimensions']['width'] products['Repeat_Width'] = products['dimensions']['width'] flag_width = 'dimensions' for images in products['images']: src = images['src'] image_name = images['name'] Images_link.append(src) Images_link.append(image_name) products['Article_image'] = src for product_attributes in products['attributes']: if product_attributes['name'] == 'Brand': for brand_name in product_attributes['options']: products['Brand_name'] = brand_name if product_attributes['name'] == 'Collection': for collection_name in product_attributes['options']: products['Collection_name'] = collection_name if flag_width == 'attributes': if product_attributes['name'] == 'Width': for width in product_attributes['options']: products['Product_Width'] = width.replace('N/A', '1') products['Repeat_Width'] = width.replace('N/A', '1') if flag_length == 'attributes': if product_attributes['name'] == 'Length': for length in product_attributes['options']: products['Wallpaper_roll_length'] = length writer.writerow(products) csvfile.close() except IOError: print("I/O error") def mural_wordpress_to_csv(csv_file_folder): csv_file_mural = csv_file_folder flag_length = 'attributes' flag_width = 'attributes' try: with open(csv_file_mural, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=template_csv, extrasaction='ignore') writer.writeheader() for products in All_products: products['product_name'] = products['name'] products['product_link'] = products['permalink'] products['product_description'] = products['description'] products['unique_sku_id'] = products['sku'] products['product_regular_price'] = products['price'] products['customized_size'] = 'Yes' products['product_width'] = 1 products['pattern_width'] = 1 products['product_length'] = 1 products['pattern_length'] = 1 if products['dimensions']['length'] == '': flag_length = 'attributes' pass else: products['product_length'] = products['dimensions']['length'] products['pattern_length'] = products['dimensions']['length'] flag_length = 'dimensions' if products['dimensions']['width'] == '': flag_width = 'attributes' pass else: products['product_width'] = products['dimensions']['width'] products['pattern_width'] = products['dimensions']['width'] flag_width = 'dimensions' for images in products['images']: src = images['src'] image_name = images['name'] Images_link.append(src) Images_link.append(image_name) products['product_image'] = src for product_attributes in products['attributes']: if product_attributes['name'] == 'Brand': for brand_name in product_attributes['options']: products['brand_name'] = brand_name if product_attributes['name'] == 'Collection': for collection_name in product_attributes['options']: products['collection_name'] = collection_name if flag_width == 'attributes': if product_attributes['name'] == 'Width': for width in product_attributes['options']: if str(width) != '' or width != None or str(width) != 'N/A': products['customized_size'] = customized_size products['product_width'] = width products['pattern_width'] = width if flag_length == 'attributes': if product_attributes['name'] == 'Length': for length in product_attributes['options']: if str(length) != '' or length != None or str(length) != 'N/A': products['product_length'] = length products['pattern_length'] = length writer.writerow(products) csvfile.close() except IOError: print("I/O error") if client_product_type == 'Wallpaper': print('Creating wallpaper csv...') wallpaper_wordpress_to_csv(csv_file) elif client_product_type == 'Mural': print('Creating mural csv...') mural_wordpress_to_csv(csv_file) Images_links_to_pim = [] Update = {} Update_collection = {} Sku_uuid = [] Client_list = [] Collection_uuid_update = [] Client_remove_sku = [] Remove = [] Collection_uuid_remove = [] def to_csv_import_get_remove(): for product in Data: for each_exist_product in product: try: Sku_uuid.append(each_exist_product['sku']) Sku_uuid.append(each_exist_product['uuid']) Sku_uuid.append(each_exist_product['collection']['uuid']) Update[each_exist_product['sku']] = each_exist_product['uuid'] Update_collection[each_exist_product['sku']] = each_exist_product['collection']['uuid'] except: pass df = pd.read_csv(csv_file, index_col=0) for products_to_remove in All_products_remove: client_remove_sku = products_to_remove['sku'] Client_remove_sku.append(str(client_remove_sku)) if client_product_type == 'Wallpaper': df = df.astype({'Unique_SKU_ID': 'object'}) for strings in df['Unique_SKU_ID']: Client_list.append(str(strings)) else: df = df.astype({'unique_sku_id': 'object'}) for strings in df['unique_sku_id']: Client_list.append(str(strings)) add_diff_client_from_pim = list( set([str(item) for item in Client_list]) - set([str(item) for item in Sku_uuid[0::3]])) # add match_products = list( set([str(item) for item in Client_remove_sku]) & set([str(item) for item in Sku_uuid[0::3]])) # update zip_sku_uuid = zip(Sku_uuid[0::3], Sku_uuid[1::3]) zip_collection_uuid = zip(Sku_uuid[0::3], Sku_uuid[2::3]) sku_uuid_dictionary = dict(zip_sku_uuid) sku_collection_uuid_dictionary = dict(zip_collection_uuid) dataframe_end_for_import = pd.read_csv(csv_file, index_col=0) if client_product_type == 'Wallpaper': dataframe_end_for_import = dataframe_end_for_import.astype({'Unique_SKU_ID': str}) check_if_exsist_in_csv = dataframe_end_for_import.Unique_SKU_ID.isin(add_diff_client_from_pim) import_filtered_csv = dataframe_end_for_import[check_if_exsist_in_csv] for images in import_filtered_csv['Article_image']: Images_links_to_pim.append(images) import_filtered_csv['Article_image'] = import_filtered_csv['Article_image'].str.split(pat="/").str[-1] df_for_update = dataframe_end_for_import.Unique_SKU_ID.isin(Sku_uuid[0::3]) update_filtered_csv = dataframe_end_for_import[df_for_update] for diff_pim_from_client in match_products: if diff_pim_from_client in sku_uuid_dictionary.keys(): Remove.append(sku_uuid_dictionary[diff_pim_from_client]) for collection_uuid in match_products: if collection_uuid in sku_collection_uuid_dictionary.keys(): Collection_uuid_remove.append(sku_collection_uuid_dictionary[collection_uuid]) else: dataframe_end_for_import = dataframe_end_for_import.astype({'unique_sku_id': str}) check_if_exsist_in_csv = dataframe_end_for_import.unique_sku_id.isin(add_diff_client_from_pim) import_filtered_csv = dataframe_end_for_import[check_if_exsist_in_csv] for images in import_filtered_csv['product_image']: Images_links_to_pim.append(images) import_filtered_csv['product_image'] = import_filtered_csv['product_image'].str.split(pat="/").str[-1] df_for_update = dataframe_end_for_import.unique_sku_id.isin(Sku_uuid[0::3]) update_filtered_csv = dataframe_end_for_import[df_for_update] for diff_pim_from_client in match_products: if diff_pim_from_client in sku_uuid_dictionary.keys(): Remove.append(sku_uuid_dictionary[diff_pim_from_client]) for collection_uuid in match_products: if collection_uuid in sku_collection_uuid_dictionary.keys(): Collection_uuid_remove.append(sku_collection_uuid_dictionary[collection_uuid]) import_filtered_csv.to_csv(csv_file) return update_filtered_csv print('Checking products for remove and add...') update_dataframe = to_csv_import_get_remove() update_dataframe.fillna('', inplace=True) if client_product_type == 'Wallpaper': update_dataframe = update_dataframe.sort_values(by=['Unique_SKU_ID']) else: update_dataframe = update_dataframe.sort_values(by=['unique_sku_id']) def update_product(): type_import = "update" logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s') stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.DEBUG) stdout_handler.setFormatter(formatter) file_handler = logging.FileHandler('logging_history-{}.log'.format(type_import)) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stdout_handler) Sorted_uuid_list = [] for key in sorted(Update.keys()): Sorted_uuid_list.append(Update[key]) Sorted_uuid_list_collection = [] for key_col in sorted(Update_collection.keys()): Sorted_uuid_list_collection.append(Update_collection[key_col]) if client_product_type == 'Wallpaper': Width = list(update_dataframe['Product_Width']) Length = list(update_dataframe['Wallpaper_roll_length']) Description = list(update_dataframe['Article_description']) Link = list(update_dataframe['Article_Link']) Name = list(update_dataframe['Article_name']) Availability = list(update_dataframe['Article_availability']) else: Width = list(update_dataframe['product_width']) Length = list(update_dataframe['product_length']) Description = list(update_dataframe['product_description']) Link = list(update_dataframe['product_link']) Name = list(update_dataframe['product_name']) Availability = list(update_dataframe['product_availability']) headers = { 'Accept': 'application/json', 'Content-Type': 'applicaton/json', 'Authorization': token } Url_update = [] for collection_uuid, uuid in zip(Sorted_uuid_list_collection, Sorted_uuid_list): url_update_product_data = '{}/api/v2/collection/{}/article/{}'.format(host, collection_uuid, uuid) Url_update.append(url_update_product_data) for width, length, description, link, name, availability, url_pim_product in zip(Width, Length, Description, Link, Name, Availability, Url_update): payload = json.dumps({ "description": description, "link": [link], "locale": "en", "name": name, "product_length": length, "product_width": width, "width": width, "length": length, "availability": availability }) response = requests.request("PATCH", url_pim_product, headers=headers, data=payload) return response.text def download_images(): for link_image_download in Images_links_to_pim: try: urllib.request.urlretrieve(link_image_download, create_image_directory() + '/' + link_image_download.split('/')[-1]) except: print(link_image_download + " can't download.") pass print('Downoliading images...') download_images() def make_zip_archive(): all_images = os.listdir(create_image_directory()) try: with ZipFile(create_directory() + '/Images-{}.zip'.format(client_name), 'w') as zip_wallsauce: for image in all_images: zip_wallsauce.write(create_image_directory() + '/' + image, arcname=image) except: pass print('Making zip file...') make_zip_archive() def check_zip_file_size(): root_directory = Path(create_image_directory()) size_of_dir = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file()) return size_of_dir def delete_product(): global delete_request for uuid_delete, collection_uuid_delete in zip(Remove, Collection_uuid_remove): url_delete = "{}/api/customer/articles".format(host) payload = "{\n \"ids\": [\n \"%s\"\n ]\n}" % (uuid_delete) headers = { 'Accept': 'application/json', 'Content-Type': 'applicaton/json', 'Authorization': token } delete_request = requests.request("DELETE", url_delete, headers=headers, data=payload) return delete_request.status_code def check_status_importing(): url_check = '{}/api/customer/imports/last'.format(host) headers = { 'Accept': 'application/json', 'Authorization': token } payload = {} check_importing_request = requests.request("GET", url_check, headers=headers, data=payload) check_importing = check_importing_request.content.decode('utf8').replace("'", '"') json_check_importing = json.loads(check_importing) return check_importing_request.text, json_check_importing def import_splitting_products(): print('Starting splitting...') try: path_splitting_directory = create_directory() + '/Splitting csv' os.mkdir(path_splitting_directory) except OSError: pass try: path_splitting_images_directory = create_directory() + '/Splitting images' os.mkdir(path_splitting_images_directory) except OSError: pass print('Creating splitting directories...') print('Splitting general csv file...') for i, split_csv in enumerate(pd.read_csv(csv_file, chunksize=500)): split_csv.to_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i), index=False) for new_split_csv in pd.read_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i)): csv_file = pd.read_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i)) image_name = csv_file['product_image'] for one_image_name in image_name: full_path_image = create_image_directory() + '/' + one_image_name try: os.mkdir(path_splitting_images_directory + '/Images for ' + 'split-murals - {}'.format(i)) except OSError: pass try: shutil.copy(full_path_image, path_splitting_images_directory + '/Images for ' + 'split-murals - {}'.format( i) + '/' + one_image_name) except OSError: pass print('Making splittings zip files...') Path_to_images = os.listdir(path_splitting_images_directory) Full_path_of_images = [] for path_image in Path_to_images: full_path = os.path.join(path_splitting_images_directory, path_image) Full_path_of_images.append(full_path) number = 1 Full_path_of_images.sort() for image_directory in Full_path_of_images: try: shutil.make_archive(path_splitting_images_directory + '/{}'.format(image_directory.split('/')[-1]), 'zip', image_directory) number = number + 1 except: pass url_import = '{}/api/customer/data-mappings/{}/imports'.format(host, id_mapping) path_splitting_csv = path_splitting_directory path_splitting_images = path_splitting_images_directory Full_path_zip = [] full_path_splitting_csv = os.listdir(path_splitting_csv) full_path_splitting_images = os.listdir(path_splitting_images) for path_zip in full_path_splitting_images: if '.zip' in path_zip: Full_path_zip.append(path_zip) Path_to_csv = [] Path_to_zip = [] for zip_file in Full_path_zip: full_path = os.path.join(path_splitting_images, zip_file) Path_to_zip.append(full_path) for csv_file in full_path_splitting_csv: full_path_csv = os.path.join(path_splitting_csv, csv_file) Path_to_csv.append(full_path_csv) Path_to_csv.sort() Path_to_zip.sort() for path_splitting_csv, path_splitting_zip in zip(Path_to_csv, Path_to_zip): print('Importing...') print('Now is importing ' + path_splitting_csv.split('/')[-1]) multipart_encoder = MultipartEncoder( fields={ 'data': (path_splitting_csv.split('/')[-1], open(path_splitting_csv, 'rb'), 'text/csv'), 'archive': (path_splitting_zip.split('/')[-1], open(path_splitting_zip, 'rb'), 'application/zip') } ) headers = { 'Accept': 'application/json', 'Content-Type': multipart_encoder.content_type, 'Authorization': token } r = requests.post(url_import, data=multipart_encoder, headers=headers) print(r.status_code, r.text) time.sleep(90) def import_products(): print('Starting usual import...') url_import = '{}/api/customer/data-mappings/{}/imports'.format(host, id_mapping) name_csv_file = csv_file.split('/')[-1] path_to_csv = csv_file name_zip_file = create_directory() + '/Images-{}.zip'.format(client_name) multipart_encoder = MultipartEncoder( fields={ 'data': (name_csv_file, open(path_to_csv, 'rb'), 'text/csv'), 'archive': (name_zip_file.split('/')[-1], open(name_zip_file, 'rb'), 'application/zip') } ) headers = { 'Accept': 'application/json', 'Content-Type': multipart_encoder.content_type, 'Authorization': token } r = requests.post(url_import, data=multipart_encoder, headers=headers) print(r.status_code, r.text) print('Starting delete process...') try: delete_status_code = delete_product() print('There are some products for deleting.') except: delete_status_code = 202 print('No product for deleting.') delete_status_code = 202 if delete_status_code == 202 or delete_status_code == 200: try: print('Updating products...') update_product() except: print('No Updating.') print('Starting importing process...') logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s') stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.DEBUG) stdout_handler.setFormatter(formatter) file_handler = logging.FileHandler('logging_history-{}.log'.format(client_product_type)) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stdout_handler) if check_zip_file_size() > int(target_size): import_splitting_products() json_status_importing = None starttime = time.time() while json_status_importing == None: check_importing_request, json_check_importing = check_status_importing() json_status_importing = json_check_importing['data']['result'] logger.info(str(json_status_importing) + str(json_check_importing)) time.sleep(3 - ((time.time() - starttime) % 3)) else: import_products() json_status_importing = None starttime = time.time() while json_status_importing == None: check_importing_request, json_check_importing = check_status_importing() json_status_importing = json_check_importing['data']['result'] logger.info(str(json_status_importing) + str(json_check_importing)) time.sleep(3 - ((time.time() - starttime) % 3)) logger.info(check_status_importing()) print('Updating products...') update_product()