/
Example of the Shopify Connector (one-way)

Example of the Shopify Connector (one-way)

This documentation assumes that you already have a token and a secret key. You can obtain them by following this instruction.

Connector from Shopify - Wizart PIM (one-way)

Attention! The third-party Python library 'shopify' (installed from the ShopifyAPI package, not part of the Python standard library) is used here to make working with Shopify easier.

First, let's import the libraries that we will need in the code below:

import json
import os
import time
import urllib.request
from pathlib import Path

import chardet
import numpy as np
import pandas as pd
import requests
import shopify

Please note that this is only a subset of the libraries that may be required. Depending on your case, additional libraries may be needed.

Next, we will initialize the variables that we will need (this is only an example; you need to replace the placeholders with your real credentials):

# --- Shopify credentials ---------------------------------------------------
api_key = "{{your_api_key}}"
api_secret = "{{your_api_secret}}"
shop_url = "yourstoreurl.myshopify.com"
# Please check the official documentation to find the newest version of the API
api_version = "{{api_version}}"
access_token = "{{your_access_token}}"

# --- Wizart PIM endpoints and credentials ----------------------------------
vendor_code_check = "https://pim-client.wizart.ai/api/articles/available-vendor-codes"
host = "https://pim-client.wizart.ai"
client_id = "{{your_client_id}}"  # You can get it from support@wizart.ai
client_secret = "{{your_client_secret}}"  # You can get it from support@wizart.ai
grant_type = "{{your_grant_type}}"  # You can get it from support@wizart.ai
password = "{{your_pim_password}}"
# Previously reused the password placeholder; the PIM login name goes here.
username = "{{your_pim_username}}"

# --- Image download settings -----------------------------------------------
# These three names are read when the image download URL is built
# (f"{images_pim_url}/<image_path>?resize={resize_images}&type={type_images}")
# but were not defined anywhere in the example.
# NOTE(review): placeholder values only -- confirm the real base URL and the
# accepted `resize` / `type` values with Wizart support.
images_pim_url = "{{your_images_pim_url}}"
resize_images = "{{resize_images}}"
type_images = "{{type_images}}"

And the rest of the code:

# Module-level accumulators shared by the pipeline steps below.
Brands = []
Data = []  # one entry per fetched PIM page: the page's 'data' payload
Products = []  # flat, interleaved: [article uuid, collection uuid, ...]
Products_data = []  # decoded responses of the per-article PIM endpoint
BASE_DIR = Path(__file__).resolve().parent
download_images = []  # flat triples: brand name, collection name, image URL


def get_client_token():
    """Request an OAuth token from the PIM and return it as a header value.

    Posts the configured credentials to ``{host}/oauth/token`` and joins the
    ``token_type`` and ``access_token`` fields of the JSON response with a
    single space, ready to be used as an ``Authorization`` header.

    Raises:
        RuntimeError: if the response JSON lacks ``token_type`` or
            ``access_token`` (for example, when the credentials are rejected).
    """
    url_access_token = f'{host}/oauth/token'
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    payload = {
        "username": username,
        "password": password,
        "client_secret": client_secret,
        "client_id": client_id,
        "grant_type": grant_type,
    }
    get_token_info = requests.post(
        url_access_token, headers=headers, data=json.dumps(payload)
    )
    # Parse the body as JSON directly; rewriting quotes first would corrupt
    # any value that contains an apostrophe.
    json_token_info = get_token_info.json()
    try:
        return json_token_info['token_type'] + ' ' + json_token_info['access_token']
    except (KeyError, TypeError) as err:
        # The response body is deliberately not echoed: it may contain secrets.
        raise RuntimeError(
            'PIM token response does not contain token_type/access_token'
        ) from err


def get_exists_product(token):
    """Fetch every page of ``{host}/api/articles`` and append it to ``Data``.

    The first request is only used to read ``meta.last_page``; afterwards each
    page from 1 to that number is requested and its ``data`` entry appended to
    the module-level ``Data`` list. A page that fails to download or decode is
    reported and skipped so the remaining pages are still collected.

    Args:
        token: value sent as the ``Authorization`` header.
    """
    headers = {
        'Accept': 'application/json',
        'Authorization': token,
    }
    url_update = '{}/api/articles'.format(host)
    first_page = requests.get(url_update, headers=headers, timeout=None)
    total_num_page = json.loads(first_page.text)['meta']['last_page']
    print(total_num_page)

    for page_num in range(1, total_num_page + 1):
        url_get_exist_product = f'{host}/api/articles?page={page_num}'
        try:
            page = requests.get(url_get_exist_product, headers=headers, timeout=None)
            Data.append(json.loads(page.text)['data'])
        except (requests.RequestException, ValueError, KeyError, TypeError) as err:
            # Best effort: keep going, but do not hide the failure.
            print(f'Skipping page {page_num}: {err}')
            continue
        # Pause between successful page requests.
        time.sleep(2)


def get_uuids():
    """Flatten ``Data`` into ``Products`` as alternating article/collection uuids."""
    for datas in Data:
        for data in datas:
            Products.append(data['uuid'])
            Products.append(data['collection']['uuid'])


def get_all_products(token, Product_uuid, Collection_uuid):
    """Fetch each article by collection/article uuid and store the response.

    Args:
        token: value sent as the ``Authorization`` header.
        Product_uuid: iterable of article uuids.
        Collection_uuid: iterable of collection uuids, paired positionally
            with ``Product_uuid``.

    Each decoded JSON response is appended to ``Products_data``.
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': token,
    }
    for product_uuid, collection_uuid in zip(Product_uuid, Collection_uuid):
        url = f"{host}/api/v2/collection/{collection_uuid}/article/{product_uuid}"
        response = requests.get(url, headers=headers)
        Products_data.append(json.loads(response.text))


def download():
    """Download every queued image into ``BASE_DIR/<brand>/<collection>/``.

    ``download_images`` is read as flat (brand, collection, url) triples. The
    file name is the last path segment of the URL without its query string.
    Files that already exist are skipped: this function is called once per
    imported product while the queue keeps growing, so without the check every
    earlier image would be fetched again on each call.
    """
    brand_names = download_images[0::3]
    collection_names = download_images[1::3]
    article_images = download_images[2::3]
    for brand_name, collection_name, article_image in zip(
        brand_names, collection_names, article_images
    ):
        article_name = article_image.split('/')[-1].split('?')[0]
        collection_dir = BASE_DIR / brand_name / collection_name
        dest_filepath = collection_dir / article_name
        if dest_filepath.exists():
            continue
        collection_dir.mkdir(parents=True, exist_ok=True)
        urllib.request.urlretrieve(article_image, dest_filepath)


def update_products(products_info_body, vendor_code, sku, name, prices):
    """Update title, SKU and price of Shopify products that already exist.

    Args:
        products_info_body: ``edges`` list from the product lookup query; each
            edge carries the product id and its variant edges.
        vendor_code: accepted for interface compatibility; not used here.
        sku: value written to each variant's ``sku``.
        name: value written to the product ``title``.
        prices: indexable value whose element ``0`` becomes the variant price;
            if it cannot be indexed the price is left untouched.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    print('Updating...')
    for uuids_products in products_info_body:
        uuid_product = uuids_products['node']['id']
        for uuids_variants in uuids_products['node']['variants']['edges']:
            uuid_variant = uuids_variants['node']['id']
            # The ids are slash-separated; only the last segment is passed
            # to find().
            new_product = shopify.Product.find(uuid_product.split('/')[-1])
            new_product.title = name
            variant = shopify.Variant.find(uuid_variant.split('/')[-1])
            try:
                variant.price = prices[0]
            except (TypeError, ValueError, IndexError, KeyError):
                # No usable price supplied: keep the current one.
                pass
            variant.sku = sku
            new_product.variants = [variant]
            new_product.save()


def import_products(products_data_import):
    """Create a new Shopify product (one variant, one image) from PIM data.

    Args:
        products_data_import: article dict read for the keys ``name``, ``sku``,
            ``regular_price``, ``image_path``, ``brand.name`` and
            ``collection.name``.

    The image is expected on disk under ``BASE_DIR/<brand>/<collection>/``;
    ``download()`` is called first to fetch whatever is queued in
    ``download_images``.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    print('Importing...')

    name = products_data_import['name']
    description = ''
    sku = products_data_import['sku']
    price = products_data_import['regular_price']
    image_name = products_data_import['image_path'].split('/')[-1]
    product_type = ""
    vendor = products_data_import['brand']['name']
    tags = ""

    download()

    variant = shopify.Variant()
    variant.sku = sku
    try:
        variant.price = price[0]
    except (TypeError, IndexError, KeyError):
        # No usable price supplied: create the variant without one.
        pass
    variant.requires_shipping = True
    variant.taxable = True

    new_product = shopify.Product()
    new_product.title = name
    new_product.body_html = description
    new_product.vendor = vendor
    new_product.product_type = product_type
    new_product.variants = [variant]
    new_product.tags = tags
    new_product.save()

    image = shopify.Image()
    image.product_id = new_product.id
    collection_dir = (
        BASE_DIR
        / products_data_import['brand']['name']
        / products_data_import['collection']['name']
    )
    dest_filepath = collection_dir / image_name
    with open(dest_filepath, "rb") as f:
        image.attach_image(f.read(), filename=image_name)
    image.save()


def from_pim_to_store():
    """Import or update every article in ``Products_data`` on the Shopify store.

    For each article a GraphQL product search is executed. When it returns no
    edges the article's image is queued and the article is imported as a new
    product; otherwise the matching product is updated.
    """
    session = shopify.Session(shop_url, api_version, access_token)
    shopify.ShopifyResource.activate_session(session)
    for products_data in Products_data:
        article = products_data['data']
        vendor_code = article['vendor_code']
        sku = article['sku']
        name = article['name']
        prices = article['regular_price']
        # NOTE(review): the search filters on `sku:` using vendor_code, while
        # imported variants get `sku` -- confirm the two values are meant to
        # be the same, otherwise existing products will never be matched.
        products_query_id = '''
        query {
          products(first:1, query:"sku:%s") {
            pageInfo { hasNextPage hasPreviousPage }
            edges {
              cursor
              node {
                id
                variants(first: 1) {
                  edges { node { sku id } }
                }
              }
            }
          }
        }''' % vendor_code
        products_query_ids = shopify.GraphQL().execute(products_query_id)
        products_info_body = json.loads(products_query_ids)['data']['products']['edges']
        if not products_info_body:
            print(article['name'], article['sku'])
            download_images.append(article['brand']['name'])
            download_images.append(article['collection']['name'])
            download_images.append(
                f"{images_pim_url}/{article['image_path']}?resize={resize_images}&type={type_images}"
            )
            import_products(article)
        else:
            update_products(products_info_body, vendor_code, sku, name, prices)


def main():
    """Run the one-way sync: read articles from the PIM, then push to Shopify."""
    token = get_client_token()
    # NOTE(review): this prints the auth token to stdout; consider removing
    # it outside of a demo.
    print(f'Token from PIM: {token}')
    print('Getting exist products from PIM...')
    get_exists_product(token)
    print('Getting uuids...')
    get_uuids()
    print('Getting products...')
    # Products is interleaved: even slots are article uuids, odd slots are
    # the matching collection uuids.
    get_all_products(token, Products[0::2], Products[1::2])
    print('Importing or updating...')
    from_pim_to_store()


if __name__ == '__main__':
    main()

Please remember that this is just an example. It illustrates the logic and is not intended to be used in production as-is.

Related content