def woocommerce_api_products():
wcapi = API(
url=url,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
wp_api=True,
version=version,
query_string_auth=True
)
customized_size = 'No' # Customized size или or no
response_attributes = wcapi.get("products/attributes").json()
number_id_attribute = ''
slug_attribute = ''
attribute_term_id = ''
for response_attribute in response_attributes:
if response_attribute['name'] == name_attribute:
number_id_attribute = response_attribute['id']
slug_attribute = response_attribute['slug']
attributes_term = wcapi.get("products/attributes/{}/terms".format(number_id_attribute)).json()
for attribute_term in attributes_term:
if attribute_term['name'] == client_product_type:
attribute_term_id = attribute_term['id']
response_first_page = wcapi.get("products",
params={"attribute": slug_attribute, "attribute_term": attribute_term_id,
"status": wordpress_status})
response_first_page_remove = wcapi.get("products",
params={"attribute": slug_attribute, "attribute_term": attribute_term_id,
"status": wordpress_status_remove})
total_pages = int(response_first_page.headers['X-WP-TotalPages'])
total_pages_remove = int(response_first_page_remove.headers['X-WP-TotalPages'])
page = 1
page_remove = 1
while page <= total_pages:
all_products = wcapi.get("products",
params={"page": page, "attribute": slug_attribute, "status": wordpress_status,
"attribute_term": attribute_term_id}).json()
for product in all_products:
All_products.append(product)
page = page + 1
while page_remove <= total_pages_remove:
all_products_remove = wcapi.get("products", params={"page": page_remove, "attribute": slug_attribute,
"status": wordpress_status_remove,
"attribute_term": attribute_term_id}).json()
for product_remove in all_products_remove:
All_products_remove.append(product_remove)
page_remove = page_remove + 1
print('Getting woocommerce information...')
woocommerce_api_products()
def api_prepare_import():
def get_client_token():
url_access_token = '{}/oauth/token'.format(host)
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
payload = {
"username": username,
"password": password,
"client_secret": client_secret,
"client_id": client_id,
"grant_type": grant_type
}
files = []
get_token_info = requests.request("POST", url_access_token, headers=headers, data=json.dumps(payload),
files=files)
token_info = get_token_info.content.decode('utf8').replace("'", '"')
json_token_info = json.loads(token_info)
access_token = json_token_info['token_type'] + ' ' + json_token_info['access_token']
return access_token
def get_product_type():
url_get_product_type = '{}/api/v2/import/product-types'.format(host)
product_code = ''
headers = {
"Accept": "application/json",
"Authorization": get_client_token()
}
get_product_types = requests.get(url_get_product_type, headers=headers)
product_types = get_product_types.content.decode('utf8').replace("'", '"')
json_product_types = json.loads(product_types)
for each_product_type in json_product_types['data']:
if client_product_type in each_product_type['attributes']['name']:
product_code = each_product_type['attributes']['code']
return product_code
def get_data_mapping_id():
url_data_mapping = '{}/api/customer/data-mappings?product_type={}'.format(host, get_product_type())
headers = {
"Accept": "application/json",
"Authorization": get_client_token()
}
get_data_mapping = requests.get(url_data_mapping, headers=headers)
data_mapping = get_data_mapping.content.decode('utf8').replace("'", '"')
json_data_mapping = json.loads(data_mapping)
for each_data_mapping in json_data_mapping['data']:
id_data_mapping = each_data_mapping['id']
return id_data_mapping
def get_default_csv():
url_default_csv = '{}/api/customer/data-mappings/default/{}'.format(host, get_product_type())
headers = {
"Accept": "text/csv",
"Authorization": get_client_token()
}
get_default_csv = requests.get(url_default_csv, headers=headers)
default_csv = get_default_csv.content.decode("utf-8")
csv_columns_murals = default_csv.split(',')
return csv_columns_murals
get_client_token()
get_data_mapping_id()
get_default_csv()
return get_client_token(), get_data_mapping_id(), get_default_csv(), get_product_type()
print('Getting API information...')
token, id_mapping, template_csv, product_type = api_prepare_import()
def create_directory():
path_to_basic_directory = path + '/{}-{}'.format(client_name + '-' + date_of_get_json, client_product_type)
try:
os.mkdir(path_to_basic_directory)
except OSError:
pass
return path_to_basic_directory
csv_file = "{}/{}-{}.csv".format(create_directory(), date_of_get_json, client_product_type)
def create_image_directory():
try:
path_images_directory = create_directory() + '/{}-Images'.format(client_product_type)
os.mkdir(path_images_directory)
except OSError:
pass
return path_images_directory
Data = []
def get_exists_product():
headers = {
'Accept': 'application/json',
'Authorization': token
}
page_num = 1
url_update = '{}/api/customer/articles?page=1&locale=en'.format(host)
get_uploaded_products = requests.get(url_update, headers=headers, timeout=None)
uploaded_products = get_uploaded_products.text
json_uploaded_products = json.loads(uploaded_products)
data_uploaded_products = json_uploaded_products
total_num_page = data_uploaded_products['meta']['last_page']
for page_num in range(1, total_num_page + 1):
url_get_exist_product = '{}/api/customer/articles?page={}&locale=en'.format(host, page_num)
get_exist_product = requests.get(url_get_exist_product, headers=headers, timeout=None)
all_uploaded_products = get_exist_product.text
json_uploaded_products = json.loads(all_uploaded_products)
data_uploaded_products = json_uploaded_products['data']
Data.append(data_uploaded_products)
page_num = page_num + 1
# return Data
print('Getting exist products from PIM...')
get_exists_product()
Images_link = []
def wallpaper_wordpress_to_csv(csv_file_folder):
csv_file_wallpaper = csv_file_folder
flag_length = 'attributes'
flag_width = 'attributes'
try:
with open(csv_file_wallpaper, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=template_csv, extrasaction='ignore')
writer.writeheader()
for products in All_products:
try:
products['Article_name'] = products['name']
except:
pass
products['Article_Link'] = products['permalink']
products['Article_description'] = products['description']
products['Unique_SKU_ID'] = products['sku']
products['Article_regular_price'] = products['price']
products['Product_Width'] = 1
products['Repeat_Width'] = 1
if products['dimensions']['length'] == '':
flag_length = 'attributes'
pass
else:
products['Wallpaper_roll_length'] = products['dimensions']['length']
flag_length = 'dimensions'
if products['dimensions']['width'] == '':
flag_width = 'attributes'
pass
else:
products['Product_Width'] = products['dimensions']['width']
products['Repeat_Width'] = products['dimensions']['width']
flag_width = 'dimensions'
for images in products['images']:
src = images['src']
image_name = images['name']
Images_link.append(src)
Images_link.append(image_name)
products['Article_image'] = src
for product_attributes in products['attributes']:
if product_attributes['name'] == 'Brand':
for brand_name in product_attributes['options']:
products['Brand_name'] = brand_name
if product_attributes['name'] == 'Collection':
for collection_name in product_attributes['options']:
products['Collection_name'] = collection_name
if flag_width == 'attributes':
if product_attributes['name'] == 'Width':
for width in product_attributes['options']:
products['Product_Width'] = width.replace('N/A', '1')
products['Repeat_Width'] = width.replace('N/A', '1')
if flag_length == 'attributes':
if product_attributes['name'] == 'Length':
for length in product_attributes['options']:
products['Wallpaper_roll_length'] = length
writer.writerow(products)
csvfile.close()
except IOError:
print("I/O error")
def mural_wordpress_to_csv(csv_file_folder):
csv_file_mural = csv_file_folder
flag_length = 'attributes'
flag_width = 'attributes'
try:
with open(csv_file_mural, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=template_csv, extrasaction='ignore')
writer.writeheader()
for products in All_products:
products['product_name'] = products['name']
products['product_link'] = products['permalink']
products['product_description'] = products['description']
products['unique_sku_id'] = products['sku']
products['product_regular_price'] = products['price']
products['customized_size'] = 'Yes'
products['product_width'] = 1
products['pattern_width'] = 1
products['product_length'] = 1
products['pattern_length'] = 1
if products['dimensions']['length'] == '':
flag_length = 'attributes'
pass
else:
products['product_length'] = products['dimensions']['length']
products['pattern_length'] = products['dimensions']['length']
flag_length = 'dimensions'
if products['dimensions']['width'] == '':
flag_width = 'attributes'
pass
else:
products['product_width'] = products['dimensions']['width']
products['pattern_width'] = products['dimensions']['width']
flag_width = 'dimensions'
for images in products['images']:
src = images['src']
image_name = images['name']
Images_link.append(src)
Images_link.append(image_name)
products['product_image'] = src
for product_attributes in products['attributes']:
if product_attributes['name'] == 'Brand':
for brand_name in product_attributes['options']:
products['brand_name'] = brand_name
if product_attributes['name'] == 'Collection':
for collection_name in product_attributes['options']:
products['collection_name'] = collection_name
if flag_width == 'attributes':
if product_attributes['name'] == 'Width':
for width in product_attributes['options']:
if str(width) != '' or width != None or str(width) != 'N/A':
products['customized_size'] = customized_size
products['product_width'] = width
products['pattern_width'] = width
if flag_length == 'attributes':
if product_attributes['name'] == 'Length':
for length in product_attributes['options']:
if str(length) != '' or length != None or str(length) != 'N/A':
products['product_length'] = length
products['pattern_length'] = length
writer.writerow(products)
csvfile.close()
except IOError:
print("I/O error")
if client_product_type == 'Wallpaper':
print('Creating wallpaper csv...')
wallpaper_wordpress_to_csv(csv_file)
elif client_product_type == 'Mural':
print('Creating mural csv...')
mural_wordpress_to_csv(csv_file)
Images_links_to_pim = []
Update = {}
Update_collection = {}
Sku_uuid = []
Client_list = []
Collection_uuid_update = []
Client_remove_sku = []
Remove = []
Collection_uuid_remove = []
def to_csv_import_get_remove():
for product in Data:
for each_exist_product in product:
try:
Sku_uuid.append(each_exist_product['sku'])
Sku_uuid.append(each_exist_product['uuid'])
Sku_uuid.append(each_exist_product['collection']['uuid'])
Update[each_exist_product['sku']] = each_exist_product['uuid']
Update_collection[each_exist_product['sku']] = each_exist_product['collection']['uuid']
except:
pass
df = pd.read_csv(csv_file, index_col=0)
for products_to_remove in All_products_remove:
client_remove_sku = products_to_remove['sku']
Client_remove_sku.append(str(client_remove_sku))
if client_product_type == 'Wallpaper':
df = df.astype({'Unique_SKU_ID': 'object'})
for strings in df['Unique_SKU_ID']:
Client_list.append(str(strings))
else:
df = df.astype({'unique_sku_id': 'object'})
for strings in df['unique_sku_id']:
Client_list.append(str(strings))
add_diff_client_from_pim = list(
set([str(item) for item in Client_list]) - set([str(item) for item in Sku_uuid[0::3]])) # add
match_products = list(
set([str(item) for item in Client_remove_sku]) & set([str(item) for item in Sku_uuid[0::3]])) # update
zip_sku_uuid = zip(Sku_uuid[0::3], Sku_uuid[1::3])
zip_collection_uuid = zip(Sku_uuid[0::3], Sku_uuid[2::3])
sku_uuid_dictionary = dict(zip_sku_uuid)
sku_collection_uuid_dictionary = dict(zip_collection_uuid)
dataframe_end_for_import = pd.read_csv(csv_file, index_col=0)
if client_product_type == 'Wallpaper':
dataframe_end_for_import = dataframe_end_for_import.astype({'Unique_SKU_ID': str})
check_if_exsist_in_csv = dataframe_end_for_import.Unique_SKU_ID.isin(add_diff_client_from_pim)
import_filtered_csv = dataframe_end_for_import[check_if_exsist_in_csv]
for images in import_filtered_csv['Article_image']:
Images_links_to_pim.append(images)
import_filtered_csv['Article_image'] = import_filtered_csv['Article_image'].str.split(pat="/").str[-1]
df_for_update = dataframe_end_for_import.Unique_SKU_ID.isin(Sku_uuid[0::3])
update_filtered_csv = dataframe_end_for_import[df_for_update]
for diff_pim_from_client in match_products:
if diff_pim_from_client in sku_uuid_dictionary.keys():
Remove.append(sku_uuid_dictionary[diff_pim_from_client])
for collection_uuid in match_products:
if collection_uuid in sku_collection_uuid_dictionary.keys():
Collection_uuid_remove.append(sku_collection_uuid_dictionary[collection_uuid])
else:
dataframe_end_for_import = dataframe_end_for_import.astype({'unique_sku_id': str})
check_if_exsist_in_csv = dataframe_end_for_import.unique_sku_id.isin(add_diff_client_from_pim)
import_filtered_csv = dataframe_end_for_import[check_if_exsist_in_csv]
for images in import_filtered_csv['product_image']:
Images_links_to_pim.append(images)
import_filtered_csv['product_image'] = import_filtered_csv['product_image'].str.split(pat="/").str[-1]
df_for_update = dataframe_end_for_import.unique_sku_id.isin(Sku_uuid[0::3])
update_filtered_csv = dataframe_end_for_import[df_for_update]
for diff_pim_from_client in match_products:
if diff_pim_from_client in sku_uuid_dictionary.keys():
Remove.append(sku_uuid_dictionary[diff_pim_from_client])
for collection_uuid in match_products:
if collection_uuid in sku_collection_uuid_dictionary.keys():
Collection_uuid_remove.append(sku_collection_uuid_dictionary[collection_uuid])
import_filtered_csv.to_csv(csv_file)
return update_filtered_csv
print('Checking products for remove and add...')
update_dataframe = to_csv_import_get_remove()
update_dataframe.fillna('', inplace=True)
if client_product_type == 'Wallpaper':
update_dataframe = update_dataframe.sort_values(by=['Unique_SKU_ID'])
else:
update_dataframe = update_dataframe.sort_values(by=['unique_sku_id'])
def update_product():
type_import = "update"
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(formatter)
file_handler = logging.FileHandler('logging_history-{}.log'.format(type_import))
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stdout_handler)
Sorted_uuid_list = []
for key in sorted(Update.keys()):
Sorted_uuid_list.append(Update[key])
Sorted_uuid_list_collection = []
for key_col in sorted(Update_collection.keys()):
Sorted_uuid_list_collection.append(Update_collection[key_col])
if client_product_type == 'Wallpaper':
Width = list(update_dataframe['Product_Width'])
Length = list(update_dataframe['Wallpaper_roll_length'])
Description = list(update_dataframe['Article_description'])
Link = list(update_dataframe['Article_Link'])
Name = list(update_dataframe['Article_name'])
Availability = list(update_dataframe['Article_availability'])
else:
Width = list(update_dataframe['product_width'])
Length = list(update_dataframe['product_length'])
Description = list(update_dataframe['product_description'])
Link = list(update_dataframe['product_link'])
Name = list(update_dataframe['product_name'])
Availability = list(update_dataframe['product_availability'])
headers = {
'Accept': 'application/json',
'Content-Type': 'applicaton/json',
'Authorization': token
}
Url_update = []
for collection_uuid, uuid in zip(Sorted_uuid_list_collection, Sorted_uuid_list):
url_update_product_data = '{}/api/v2/collection/{}/article/{}'.format(host, collection_uuid, uuid)
Url_update.append(url_update_product_data)
for width, length, description, link, name, availability, url_pim_product in zip(Width, Length, Description, Link,
Name, Availability, Url_update):
payload = json.dumps({
"description": description,
"link": [link],
"locale": "en",
"name": name,
"product_length": length,
"product_width": width,
"width": width,
"length": length,
"availability": availability
})
response = requests.request("PATCH", url_pim_product, headers=headers, data=payload)
return response.text
def download_images():
for link_image_download in Images_links_to_pim:
try:
urllib.request.urlretrieve(link_image_download,
create_image_directory() + '/' + link_image_download.split('/')[-1])
except:
print(link_image_download + " can't download.")
pass
print('Downoliading images...')
download_images()
def make_zip_archive():
all_images = os.listdir(create_image_directory())
try:
with ZipFile(create_directory() + '/Images-{}.zip'.format(client_name), 'w') as zip_wallsauce:
for image in all_images:
zip_wallsauce.write(create_image_directory() + '/' + image, arcname=image)
except:
pass
print('Making zip file...')
make_zip_archive()
def check_zip_file_size():
root_directory = Path(create_image_directory())
size_of_dir = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file())
return size_of_dir
def delete_product():
global delete_request
for uuid_delete, collection_uuid_delete in zip(Remove, Collection_uuid_remove):
url_delete = "{}/api/customer/articles".format(host)
payload = "{\n \"ids\": [\n \"%s\"\n ]\n}" % (uuid_delete)
headers = {
'Accept': 'application/json',
'Content-Type': 'applicaton/json',
'Authorization': token
}
delete_request = requests.request("DELETE", url_delete, headers=headers, data=payload)
return delete_request.status_code
def check_status_importing():
url_check = '{}/api/customer/imports/last'.format(host)
headers = {
'Accept': 'application/json',
'Authorization': token
}
payload = {}
check_importing_request = requests.request("GET", url_check, headers=headers, data=payload)
check_importing = check_importing_request.content.decode('utf8').replace("'", '"')
json_check_importing = json.loads(check_importing)
return check_importing_request.text, json_check_importing
def import_splitting_products():
print('Starting splitting...')
try:
path_splitting_directory = create_directory() + '/Splitting csv'
os.mkdir(path_splitting_directory)
except OSError:
pass
try:
path_splitting_images_directory = create_directory() + '/Splitting images'
os.mkdir(path_splitting_images_directory)
except OSError:
pass
print('Creating splitting directories...')
print('Splitting general csv file...')
for i, split_csv in enumerate(pd.read_csv(csv_file, chunksize=500)):
split_csv.to_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i), index=False)
for new_split_csv in pd.read_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i)):
csv_file = pd.read_csv('{}/split-murals - {}.csv'.format(path_splitting_directory, i))
image_name = csv_file['product_image']
for one_image_name in image_name:
full_path_image = create_image_directory() + '/' + one_image_name
try:
os.mkdir(path_splitting_images_directory + '/Images for ' + 'split-murals - {}'.format(i))
except OSError:
pass
try:
shutil.copy(full_path_image,
path_splitting_images_directory + '/Images for ' + 'split-murals - {}'.format(
i) + '/' + one_image_name)
except OSError:
pass
print('Making splittings zip files...')
Path_to_images = os.listdir(path_splitting_images_directory)
Full_path_of_images = []
for path_image in Path_to_images:
full_path = os.path.join(path_splitting_images_directory, path_image)
Full_path_of_images.append(full_path)
number = 1
Full_path_of_images.sort()
for image_directory in Full_path_of_images:
try:
shutil.make_archive(path_splitting_images_directory + '/{}'.format(image_directory.split('/')[-1]), 'zip',
image_directory)
number = number + 1
except:
pass
url_import = '{}/api/customer/data-mappings/{}/imports'.format(host, id_mapping)
path_splitting_csv = path_splitting_directory
path_splitting_images = path_splitting_images_directory
Full_path_zip = []
full_path_splitting_csv = os.listdir(path_splitting_csv)
full_path_splitting_images = os.listdir(path_splitting_images)
for path_zip in full_path_splitting_images:
if '.zip' in path_zip:
Full_path_zip.append(path_zip)
Path_to_csv = []
Path_to_zip = []
for zip_file in Full_path_zip:
full_path = os.path.join(path_splitting_images, zip_file)
Path_to_zip.append(full_path)
for csv_file in full_path_splitting_csv:
full_path_csv = os.path.join(path_splitting_csv, csv_file)
Path_to_csv.append(full_path_csv)
Path_to_csv.sort()
Path_to_zip.sort()
for path_splitting_csv, path_splitting_zip in zip(Path_to_csv, Path_to_zip):
print('Importing...')
print('Now is importing ' + path_splitting_csv.split('/')[-1])
multipart_encoder = MultipartEncoder(
fields={
'data': (path_splitting_csv.split('/')[-1], open(path_splitting_csv, 'rb'), 'text/csv'),
'archive': (path_splitting_zip.split('/')[-1], open(path_splitting_zip, 'rb'), 'application/zip')
}
)
headers = {
'Accept': 'application/json',
'Content-Type': multipart_encoder.content_type,
'Authorization': token
}
r = requests.post(url_import, data=multipart_encoder,
headers=headers)
print(r.status_code, r.text)
time.sleep(90)
def import_products():
print('Starting usual import...')
url_import = '{}/api/customer/data-mappings/{}/imports'.format(host, id_mapping)
name_csv_file = csv_file.split('/')[-1]
path_to_csv = csv_file
name_zip_file = create_directory() + '/Images-{}.zip'.format(client_name)
multipart_encoder = MultipartEncoder(
fields={
'data': (name_csv_file, open(path_to_csv, 'rb'), 'text/csv'),
'archive': (name_zip_file.split('/')[-1], open(name_zip_file, 'rb'), 'application/zip')
}
)
headers = {
'Accept': 'application/json',
'Content-Type': multipart_encoder.content_type,
'Authorization': token
}
r = requests.post(url_import, data=multipart_encoder,
headers=headers)
print(r.status_code, r.text)
print('Starting delete process...')
try:
delete_status_code = delete_product()
print('There are some products for deleting.')
except:
delete_status_code = 202
print('No product for deleting.')
delete_status_code = 202
if delete_status_code == 202 or delete_status_code == 200:
try:
print('Updating products...')
update_product()
except:
print('No Updating.')
print('Starting importing process...')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(formatter)
file_handler = logging.FileHandler('logging_history-{}.log'.format(client_product_type))
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stdout_handler)
if check_zip_file_size() > int(target_size):
import_splitting_products()
json_status_importing = None
starttime = time.time()
while json_status_importing == None:
check_importing_request, json_check_importing = check_status_importing()
json_status_importing = json_check_importing['data']['result']
logger.info(str(json_status_importing) + str(json_check_importing))
time.sleep(3 - ((time.time() - starttime) % 3))
else:
import_products()
json_status_importing = None
starttime = time.time()
while json_status_importing == None:
check_importing_request, json_check_importing = check_status_importing()
json_status_importing = json_check_importing['data']['result']
logger.info(str(json_status_importing) + str(json_check_importing))
time.sleep(3 - ((time.time() - starttime) % 3))
logger.info(check_status_importing())
print('Updating products...')
update_product() |