# Module-level accumulators shared by the pipeline functions below.

# Not referenced by any function visible in this file.
Brands = []
# One entry per PIM listing page: the page's 'data' payload, appended by
# get_exists_product() and consumed by get_uuids().
Data = []
# Flat list filled by get_uuids(): article uuid, collection uuid, article
# uuid, collection uuid, ... (main() splits it with [0::2] / [1::2]).
Products = []
# Parsed JSON responses of the per-article requests made by get_all_products().
Products_data = []
# Directory containing this file; download() stores images below it.
BASE_DIR = Path(__file__).resolve().parent
# Flat list of triples appended by from_pim_to_store(): brand name,
# collection name, image URL (download() splits it with [0::3] etc.).
download_images = []
def get_client_token():
    """Request an OAuth token from the PIM and return an Authorization value.

    POSTs the module-level credentials (``username``, ``password``,
    ``client_secret``, ``client_id``, ``grant_type``) as JSON to
    ``{host}/oauth/token``.

    Returns:
        ``"<token_type> <access_token>"`` built from the response, or the
        module-level ``bearer_token`` when the response body cannot be
        decoded/parsed or lacks the expected keys.

    Network errors raised by ``requests`` are not handled here and propagate
    to the caller.
    """
    url_access_token = f'{host}/oauth/token'
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    payload = {
        "username": username,
        "password": password,
        "client_secret": client_secret,
        "client_id": client_id,
        "grant_type": grant_type
    }
    response = requests.request("POST", url_access_token, headers=headers,
                                data=json.dumps(payload))
    try:
        raw_body = response.content.decode('utf8')
        try:
            # Parse the body as-is first: blindly swapping quotes corrupts
            # valid JSON whose strings contain an apostrophe.
            token_info = json.loads(raw_body)
        except ValueError:
            # Fallback for bodies that use single quotes instead of JSON's
            # double quotes (the only form the previous code attempted).
            token_info = json.loads(raw_body.replace("'", '"'))
        access_token = token_info['token_type'] + ' ' + token_info['access_token']
    except (TypeError, ValueError, KeyError):
        # Undecodable / non-JSON body, a non-dict payload, or an error
        # response without token fields: use the preconfigured token.
        access_token = bearer_token
    return access_token
def get_exists_product(token):
    """Fetch every page of ``{host}/api/articles`` and collect them in ``Data``.

    A first request reads ``meta.last_page`` to learn how many pages exist;
    each page is then requested in turn and its ``data`` payload is appended
    to the module-level ``Data`` list.

    Args:
        token: Value sent in the ``Authorization`` header.

    A page that cannot be fetched or parsed is reported on stdout and
    skipped, so one bad page does not abort the whole listing. Failures of
    the initial request propagate to the caller.
    """
    headers = {
        'Accept': 'application/json',
        'Authorization': token
    }
    url_update = '{}/api/articles'.format(host)
    first_response = requests.get(url_update, headers=headers, timeout=None)
    total_num_page = json.loads(first_response.text)['meta']['last_page']
    print(total_num_page)
    for page_num in range(1, total_num_page + 1):
        url_get_exist_product = f'{host}/api/articles?page={page_num}'
        try:
            page_response = requests.get(url_get_exist_product, headers=headers, timeout=None)
            page_items = json.loads(page_response.text)['data']
        except (requests.RequestException, ValueError, KeyError, TypeError) as exc:
            # Best effort per page, but say what was skipped instead of
            # silently swallowing everything (including Ctrl-C).
            print(f'Skipping page {page_num}: {exc!r}')
            continue
        Data.append(page_items)
        # Pause between successful page requests, as before.
        time.sleep(2)
def get_uuids():
    """Flatten the collected listing pages into the ``Products`` id list.

    For every article in every page stored in ``Data``, appends the
    article's ``uuid`` followed by its collection's ``uuid`` to the
    module-level ``Products`` list, so the list alternates article id /
    collection id.
    """
    every_article = (article for page in Data for article in page)
    for article in every_article:
        Products.append(article['uuid'])
        Products.append(article['collection']['uuid'])
def get_all_products(token, Product_uuid, Collection_uuid):
    """Fetch each article's detail record and store it in ``Products_data``.

    Args:
        token: Value sent in the ``Authorization`` header.
        Product_uuid: Article uuids.
        Collection_uuid: Collection uuids, paired positionally with
            ``Product_uuid`` (extra items in the longer one are ignored).

    Each pair is requested from
    ``{host}/api/v2/collection/<collection>/article/<article>`` and the
    parsed JSON body is appended to the module-level ``Products_data`` list.
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': token
    }
    for article_id, collection_id in zip(Product_uuid, Collection_uuid):
        endpoint = f"{host}/api/v2/collection/{collection_id}/article/{article_id}"
        reply = requests.request("GET", endpoint, headers=request_headers, data={})
        Products_data.append(json.loads(reply.text))
# (url, destination) pairs already fetched by download() in this process.
_downloaded_images = set()


def download():
    """Download every image queued in ``download_images`` that is still missing.

    ``download_images`` is read as consecutive triples of brand name,
    collection name and image URL. Each image is saved as
    ``BASE_DIR/<brand>/<collection>/<file name>``, where the file name is
    the last URL path segment with any query string removed.

    The queue is never emptied and this function is called once per imported
    product, so completed downloads are remembered for the lifetime of the
    process and skipped on later calls; a remembered file that has since
    disappeared from disk is fetched again.
    """
    brand_names = download_images[0::3]
    collection_names = download_images[1::3]
    article_images = download_images[2::3]
    for brand_name, collection_name, article_image in zip(brand_names, collection_names, article_images):
        article_name = article_image.split('/')[-1].split('?')[0]
        collection_dir = BASE_DIR / brand_name / collection_name
        dest_filepath = collection_dir / article_name
        job = (article_image, str(dest_filepath))
        if job in _downloaded_images and dest_filepath.exists():
            continue
        collection_dir.mkdir(parents=True, exist_ok=True)
        urllib.request.urlretrieve(article_image, dest_filepath)
        # Record only after a successful transfer so failures are retried.
        _downloaded_images.add(job)
def update_products(products_info_body, vendor_code, sku, name, prices):
    """Update the title, SKU and price of Shopify products that already exist.

    Args:
        products_info_body: Product edges from the GraphQL lookup; each item
            provides ``node.id`` and ``node.variants.edges[*].node.id``.
        vendor_code: Accepted for call compatibility; not used here.
        sku: SKU written to every listed variant.
        name: Title written to the product.
        prices: Indexable container whose first element becomes the variant
            price. When no first element can be read, the price is left
            unchanged.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    print('Updating...')
    for product_edge in products_info_body:
        uuid_product = product_edge['node']['id']
        for variant_edge in product_edge['node']['variants']['edges']:
            uuid_variant = variant_edge['node']['id']
            # Ids arrive as GraphQL gids ("gid://.../<id>"); the REST
            # resources are looked up by the trailing path segment.
            new_product = shopify.Product.find(uuid_product.split('/')[-1])
            new_product.title = name
            variant = shopify.Variant.find(uuid_variant.split('/')[-1])
            try:
                variant.price = prices[0]
            except (TypeError, ValueError, LookupError):
                # Missing price (None, empty list, dict without key 0, ...):
                # keep the current one, matching import_products(), which
                # also tolerates an unreadable price.
                pass
            variant.sku = sku
            new_product.variants = [variant]
            new_product.save()
def import_products(products_data_import):
    """Create a new Shopify product, with one variant and one image.

    Args:
        products_data_import: Article record from the PIM. The keys read
            are ``name``, ``sku``, ``regular_price``, ``image_path``,
            ``brand.name`` and ``collection.name``.

    Calls ``download()`` first so the image file is present under
    ``BASE_DIR/<brand>/<collection>/`` before it is attached. The variant
    price is the first element of ``regular_price``; when that cannot be
    read the price is simply not set.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    print('Importing...')
    name = products_data_import['name']
    description = ''
    sku = products_data_import['sku']
    price = products_data_import['regular_price']
    image_name = products_data_import['image_path'].split('/')[-1]
    product_type = ""
    vendor = products_data_import['brand']['name']
    tags = ""
    download()

    variant = shopify.Variant()
    variant.sku = sku
    try:
        variant.price = price[0]
    except (TypeError, ValueError, LookupError):
        # No usable first price (None, empty list, ...): leave it unset.
        # Only the errors indexing can raise are swallowed.
        pass
    variant.requires_shipping = True
    variant.taxable = True

    new_product = shopify.Product()
    new_product.title = name
    new_product.body_html = description
    new_product.vendor = vendor
    new_product.product_type = product_type
    new_product.variants = [variant]
    new_product.tags = tags
    new_product.save()

    image = shopify.Image()
    image.product_id = new_product.id
    collection_dir = BASE_DIR / products_data_import['brand']['name'] / products_data_import['collection']['name']
    dest_filepath = collection_dir / image_name
    with open(dest_filepath, "rb") as f:
        image.attach_image(f.read(), filename=image_name)
    image.save()
def from_pim_to_store():
    """Import or update in Shopify every article held in ``Products_data``.

    For each article a GraphQL product search is run. When it returns no
    product, the article's image is queued in ``download_images`` (brand
    name, collection name, image URL) and the article is created through
    ``import_products()``; otherwise the matches are passed to
    ``update_products()``.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    # The search term is passed as a GraphQL variable instead of being
    # spliced into the query text, so a quote or backslash in the value can
    # no longer break the query.
    products_query = '''
        query findProduct($search: String!) {
            products(first: 1, query: $search) {
                pageInfo {
                    hasNextPage
                    hasPreviousPage
                }
                edges {
                    cursor
                    node {
                        id
                        variants(first: 1) {
                            edges {
                                node {
                                    sku
                                    id
                                }
                            }
                        }
                    }
                }
            }
        }
    '''
    for products_data in Products_data:
        vendor_code = products_data['data']['vendor_code']
        sku = products_data['data']['sku']
        name = products_data['data']['name']
        prices = products_data['data']['regular_price']
        # NOTE(review): the search filters on "sku:" using vendor_code, while
        # import/update store the PIM 'sku' on the variant. Confirm the two
        # values are meant to coincide; otherwise existing products will
        # not be found.
        products_query_ids = shopify.GraphQL().execute(
            products_query,
            variables={'search': f'sku:{vendor_code}'},
        )
        products_info_body = json.loads(products_query_ids)['data']['products']['edges']
        if not products_info_body:
            print(products_data['data']['name'], products_data['data']['sku'])
            download_images.append(products_data['data']['brand']['name'])
            download_images.append(products_data['data']['collection']['name'])
            download_images.append(f"{images_pim_url}/{products_data['data']['image_path']}?resize={resize_images}&type={type_images}")
            import_products(products_data['data'])
        else:
            update_products(products_info_body, vendor_code, sku, name, prices)
def main():
    """Run the full PIM-to-Shopify sync.

    Steps: obtain a PIM token, list all articles, extract their article and
    collection uuids, fetch each article's details, then import or update
    the corresponding Shopify products.
    """
    token = get_client_token()
    # The token is a credential: report that it was obtained without
    # echoing its value to stdout/logs.
    print('Token from PIM received')
    print('Getting exist products from PIM...')
    get_exists_product(token)
    print('Getting uuids...')
    get_uuids()
    print('Getting products...')
    # Products alternates article uuid / collection uuid.
    get_all_products(token, Products[0::2], Products[1::2])
    print('Importing or updating...')
    from_pim_to_store()
# Run the sync only when executed as a script, not when imported.
if __name__ == '__main__':
    main()