Wednesday, March 18, 2026

 Sample code to import SharePoint data into an Azure Storage account:

# Databricks notebook source

# NOTE(review): hard-coding credentials (even empty placeholders) in a notebook
# is risky — prefer dbutils.secrets (see the commented examples in the next cell).
import os

# Placeholders for the SharePoint app registration; fill in before running,
# or set these from a secret scope instead.
os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"] = ""

os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"] = ""

os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"] = ""

# COMMAND ----------

# Optional: use dbutils secrets for sensitive values

# client_secret = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-secret")

# client_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-id")

# tenant_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-tenant-id")

# COMMAND ----------

# Third-party clients: Graph auth (msal), Azure identity, and Blob Storage.
import requests

from msal import ConfidentialClientApplication

from azure.identity import DefaultAzureCredential

from azure.storage.blob import BlobServiceClient

import json

import time

# === SharePoint App Registration ===

# NOTE(review): duplicate import — os was already imported in the first cell.
import os

# App-registration credentials; expected to be set in the first cell or a secret scope.
client_id = os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"]

client_secret = os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"]

tenant_id = os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"]

# NOTE(review): siteId/listId appear unused below — the site is resolved later
# by hostname + path; confirm before removing.
siteId = "site01/EI/"

listId = "Links"

# AAD authority and app-only Graph scope for the client-credentials flow.
authority = f"https://login.microsoftonline.com/{tenant_id}"

scope = ["https://graph.microsoft.com/.default"]

# MSAL confidential client; reused by get_graph_token() for every Graph call.
app = ConfidentialClientApplication(
    client_id,
    authority=authority,
    client_credential=client_secret
)

def get_graph_token():
    """Acquire an app-only Microsoft Graph access token.

    Uses the module-level MSAL ``ConfidentialClientApplication`` (``app``)
    with the client-credentials flow. MSAL caches tokens internally, so
    calling this per request is cheap.

    Returns:
        str: A bearer token for Microsoft Graph.

    Raises:
        RuntimeError: If token acquisition fails. (The original raised an
            opaque ``KeyError`` with no diagnostic detail.)
    """
    token = app.acquire_token_for_client(scopes=scope)
    if "access_token" not in token:
        # Surface AAD's error description instead of a bare KeyError.
        raise RuntimeError(
            f"Token acquisition failed: {token.get('error')} - "
            f"{token.get('error_description')}"
        )
    return token["access_token"]

# COMMAND ----------

# === Azure Storage ===

# Destination storage account/container for the copied SharePoint files.
storage_account = "someaccount01"

container_name = "container01"

# DefaultAzureCredential resolves managed identity / CLI / env credentials.
credential = DefaultAzureCredential()

blob_service = BlobServiceClient(
    f"https://{storage_account}.blob.core.windows.net",
    credential=credential
)

container_client = blob_service.get_container_client(container_name)

# COMMAND ----------

# Checkpoint blob recording which SharePoint item ids were already copied.
checkpoint_blob = container_client.get_blob_client("_checkpoints/sharepoint_copied.json")

def load_checkpoint():
    """Return the set of already-copied item ids, or an empty set.

    Any failure (blob absent on first run, auth problem, malformed JSON)
    is treated as "no checkpoint yet", so the caller re-copies everything.
    """
    try:
        payload = checkpoint_blob.download_blob().readall()
        return set(json.loads(payload))
    except Exception:
        # Best-effort: start fresh when the checkpoint cannot be read.
        return set()

def save_checkpoint(copied_ids):
    """Persist the set of copied item ids as a JSON array (overwriting)."""
    payload = json.dumps(list(copied_ids))
    checkpoint_blob.upload_blob(payload, overwrite=True)

# COMMAND ----------

# NOTE(review): duplicate imports — requests and ConfidentialClientApplication
# were already imported above; harmless in a notebook.
import requests

from msal import ConfidentialClientApplication

# The SharePoint host and site path from your URL
hostname = "uhgazure.sharepoint.com"

site_path = "/sites/site01/EI" # server-relative path (no trailing path/to/lists)

# MSAL app-only token
# NOTE(review): re-creates the MSAL app and token; get_graph_token() above
# could be reused instead.
authority = f"https://login.microsoftonline.com/{tenant_id}"

app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)

token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])

# token.get(...) is None when acquisition failed; the Graph call below would
# then fail with 401 rather than a clear error.
access_token = token.get("access_token")

headers = {"Authorization": f"Bearer {access_token}"}

# 1) Resolve site by path -> site-id
site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"

r = requests.get(site_url, headers=headers)

r.raise_for_status()

site = r.json()

site_id = site["id"]

print("Site id:", site_id)

# 2) List drives (document libraries) for the site -> find drive id
drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"

r = requests.get(drives_url, headers=headers)

r.raise_for_status()

drives = r.json().get("value", [])

for d in drives:
    print("Drive name:", d["name"], "Drive id:", d["id"])

# If you know the library name, pick it:
target_library = "Documents" # or the library name you expect

# None when no drive name matches target_library exactly.
drive_id = next((d["id"] for d in drives if d["name"] == target_library), None)

print("Selected drive id:", drive_id)

# COMMAND ----------

# TODO: replace with a real drive id printed by the previous cell — this
# placeholder overwrites the drive_id resolved above.
drive_id = "pick-a-value-from-above-output"

def list_all_items():
    """Return every driveItem in the root folder of the target drive.

    Follows ``@odata.nextLink`` pagination until the listing is exhausted.
    Note: non-recursive — only direct children of the drive root are listed.
    """
    auth_header = {"Authorization": f"Bearer {get_graph_token()}"}
    collected = []
    next_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
    while next_url:
        response = requests.get(next_url, headers=auth_header)
        response.raise_for_status()
        page = response.json()
        collected.extend(page.get("value", []))
        # Absent on the final page, which ends the loop.
        next_url = page.get("@odata.nextLink")
    return collected

# COMMAND ----------

def download_file(item_id):
    """Fetch the raw bytes of a driveItem via the Graph /content endpoint."""
    content_url = (
        f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
        f"/items/{item_id}/content"
    )
    response = requests.get(
        content_url,
        headers={"Authorization": f"Bearer {get_graph_token()}"},
    )
    response.raise_for_status()
    return response.content

# COMMAND ----------

# Fail fast (KeyError) if the storage secret is not set in the environment.
os.environ["STORAGE_CONNECTION_STRING"]

# COMMAND ----------

# Storage-account connection string used by the upload helpers below.
connection_string = os.environ["STORAGE_CONNECTION_STRING"]

# COMMAND ----------

import os

import mimetypes

from io import BytesIO

from typing import Optional, Dict

import requests

from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError

from azure.storage.blob import (

    BlobServiceClient,

    BlobClient,

    ContentSettings

)

# ----------------------------

# Azure Blob upload helpers

# ----------------------------

def _get_blob_service_client(
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None
) -> BlobServiceClient:
    """Build a BlobServiceClient from a connection string or URL + SAS token.

    Exactly one auth mechanism must be supplied:
      - ``connection_string``; or
      - ``account_url`` together with ``sas_token``
        (e.g. https://<acct>.blob.core.windows.net/?<sas>).

    Raises:
        ValueError: If neither auth mechanism is provided.
    """
    if connection_string:
        return BlobServiceClient.from_connection_string(connection_string)
    if account_url and sas_token:
        # The SDK expects the SAS credential to start with '?'.
        normalized_sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'
        return BlobServiceClient(account_url=account_url, credential=normalized_sas)
    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")

def upload_bytes_to_blob(
    data: bytes,
    *,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Upload ``data`` to ``container_name``/``blob_name``; return the blob URL.

    Auth is via ``connection_string`` or ``account_url`` + ``sas_token``
    (see ``_get_blob_service_client``). The container is created when missing.
    When ``content_type`` is omitted it is guessed from ``blob_name``, falling
    back to ``application/octet-stream``.

    Parameters:
        data: File content to upload.
        container_name: Target container name.
        blob_name: Target blob name (e.g., 'reports/file.pdf').
        content_type: MIME type override.
        overwrite: Replace an existing blob when True.
        metadata: Optional metadata key/value pairs.

    Returns:
        str: URL of the uploaded blob.

    Raises:
        ResourceExistsError: Blob already exists and ``overwrite`` is False.
        RuntimeError: Authentication or transport failure during upload.
    """
    # Default the MIME type from the blob name when none is supplied.
    if content_type is None:
        content_type = mimetypes.guess_type(blob_name)[0] or "application/octet-stream"

    service_client = _get_blob_service_client(
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token
    )
    target_container = service_client.get_container_client(container_name)

    # Container creation is idempotent: an existing container is not an error.
    try:
        target_container.create_container()
    except ResourceExistsError:
        pass

    blob_client: BlobClient = target_container.get_blob_client(blob_name)

    try:
        # Stream from memory; harmless here since we already hold the bytes.
        blob_client.upload_blob(
            BytesIO(data),
            overwrite=overwrite,
            metadata=metadata,
            content_settings=ContentSettings(content_type=content_type)
        )
    except ResourceExistsError:
        if not overwrite:
            raise
    except ClientAuthenticationError as e:
        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e
    except (ServiceRequestError, HttpResponseError) as e:
        raise RuntimeError(f"Blob upload failed: {e}") from e

    # URL form works for both connection-string and SAS auth.
    return blob_client.url

# ----------------------------

# Orchestrator

# ----------------------------

def download_and_upload_to_blob(
    *,
    item_id: str,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Copy one SharePoint drive item into Azure Blob Storage.

    Downloads the item's bytes from Microsoft Graph (``download_file``) and
    hands them to ``upload_bytes_to_blob``; every keyword argument after
    ``item_id`` is forwarded unchanged.

    Returns:
        str: URL of the uploaded blob.
    """
    graph_bytes = download_file(item_id)
    return upload_bytes_to_blob(
        graph_bytes,
        container_name=container_name,
        blob_name=blob_name,
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token,
        content_type=content_type,
        overwrite=overwrite,
        metadata=metadata,
    )

# COMMAND ----------

# Enumerate the drive's root items once; reused by the cells below.
items = list_all_items()

# COMMAND ----------

print(len(items))

# COMMAND ----------

# Peek at the first few driveItem payloads.
print(items[0:3])

# COMMAND ----------

import json

# Pretty-print one item to inspect the available fields (id, webUrl, size, ...).
print(json.dumps(items[0], indent=4))

# COMMAND ----------

from urllib.parse import urlparse, parse_qs, unquote

def after_ei_from_xml_location(url: str, *, decode: bool = True) -> str:
    """Return the path portion after '/EI/' in a URL's XmlLocation parameter.

    Args:
        url: Full URL whose query string may carry an ``XmlLocation`` value.
        decode: When True (default), URL-decode the XmlLocation value.
            Note: ``parse_qs`` already percent-decodes once, so this extra
            ``unquote`` only changes doubly-encoded values.

    Returns:
        Everything after the first '/EI/' in XmlLocation, or '' when the
        parameter is absent or contains no '/EI/' marker.
    """
    query_params = parse_qs(urlparse(url).query)
    values = query_params.get("XmlLocation")
    if not values:
        # XmlLocation not present in the query string.
        return ""
    location = unquote(values[0]) if decode else values[0]
    # partition() yields an empty separator when '/EI/' is absent.
    _, separator, tail = location.partition("/EI/")
    return tail if separator else ""

# COMMAND ----------

# Smoke test: copy the first listed item into the 'iris' container, preserving
# the path after '/EI/' from its XmlLocation query parameter.
download_and_upload_to_blob(item_id=items[0]["id"], container_name="iris", blob_name="domestic/EI/" + after_ei_from_xml_location(url=items[0]['webUrl']), connection_string=connection_string)

# COMMAND ----------

# Quick size profile of the listing: largest item, total bytes, item count.
max_size = max(entry["size"] for entry in items)

sum_size = sum(entry["size"] for entry in items)

len_items = len(items)

print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")

# COMMAND ----------

# Copy every listed item into the 'iris' container; log failures and continue.
# BUG FIX: the original f-strings used double quotes inside double-quoted
# f-strings (f"{item["webUrl"]}"), a SyntaxError before Python 3.12 (PEP 701).
for item in items:
    try:
        download_and_upload_to_blob(
            item_id=item["id"],
            container_name="iris",
            blob_name="domestic/EI/" + after_ei_from_xml_location(url=item['webUrl']),
            connection_string=connection_string,
        )
        print(f"{item['webUrl']}")
    except Exception as e:
        print(f"Error: {e}, item_id={item['id']}, item_url={item['webUrl']}")

Reference: see the previous article for context: https://1drv.ms/w/c/d609fb70e39b65c8/IQBV3Sd02qPlRa_y13mxnxHSAa6mrM4rmM3pnvbWPW7RpIE?e=b1nGOi

Sample code to import SharePoint data into an Azure Storage account:

# Databricks notebook source

import os

os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"] = ""

os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"] = ""

os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"] = ""

# COMMAND ----------

# Optional: use dbutils secrets for sensitive values

# client_secret = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-secret")

# client_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-id")

# tenant_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-tenant-id")

# COMMAND ----------

import requests

from msal import ConfidentialClientApplication

from azure.identity import DefaultAzureCredential

from azure.storage.blob import BlobServiceClient

import json

import time

# === SharePoint App Registration ===

import os

client_id = os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"]

client_secret = os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"]

tenant_id = os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"]

siteId = "site01/EI/"

listId = "Links"

authority = f"https://login.microsoftonline.com/{tenant_id}"

scope = ["https://graph.microsoft.com/.default"]

app = ConfidentialClientApplication(

    client_id,

    authority=authority,

    client_credential=client_secret

)

def get_graph_token():
    """Acquire an app-only Microsoft Graph access token.

    Uses the module-level MSAL ``ConfidentialClientApplication`` (``app``)
    with the client-credentials flow. MSAL caches tokens internally, so
    calling this per request is cheap.

    Returns:
        str: A bearer token for Microsoft Graph.

    Raises:
        RuntimeError: If token acquisition fails. (The original raised an
            opaque ``KeyError`` with no diagnostic detail.)
    """
    token = app.acquire_token_for_client(scopes=scope)
    if "access_token" not in token:
        # Surface AAD's error description instead of a bare KeyError.
        raise RuntimeError(
            f"Token acquisition failed: {token.get('error')} - "
            f"{token.get('error_description')}"
        )
    return token["access_token"]

# COMMAND ----------

# === Azure Storage ===

storage_account = "someaccount01"

container_name = "container01"

credential = DefaultAzureCredential()

blob_service = BlobServiceClient(

    f"https://{storage_account}.blob.core.windows.net",

    credential=credential

)

container_client = blob_service.get_container_client(container_name)

# COMMAND ----------

checkpoint_blob = container_client.get_blob_client("_checkpoints/sharepoint_copied.json")

def load_checkpoint():

    try:

        data = checkpoint_blob.download_blob().readall()

        return set(json.loads(data))

    except Exception:

        return set()

def save_checkpoint(copied_ids):

    checkpoint_blob.upload_blob(

        json.dumps(list(copied_ids)),

        overwrite=True

    )

# COMMAND ----------

import requests

from msal import ConfidentialClientApplication

# The SharePoint host and site path from your URL

hostname = "uhgazure.sharepoint.com"

site_path = "/sites/site01/EI" # server-relative path (no trailing path/to/lists)

# MSAL app-only token

authority = f"https://login.microsoftonline.com/{tenant_id}"

app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)

token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])

access_token = token.get("access_token")

headers = {"Authorization": f"Bearer {access_token}"}

# 1) Resolve site by path -> site-id

site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"

r = requests.get(site_url, headers=headers)

r.raise_for_status()

site = r.json()

site_id = site["id"]

print("Site id:", site_id)

# 2) List drives (document libraries) for the site -> find drive id

drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"

r = requests.get(drives_url, headers=headers)

r.raise_for_status()

drives = r.json().get("value", [])

for d in drives:

    print("Drive name:", d["name"], "Drive id:", d["id"])

# If you know the library name, pick it:

target_library = "Documents" # or the library name you expect

drive_id = next((d["id"] for d in drives if d["name"] == target_library), None)

print("Selected drive id:", drive_id)

# COMMAND ----------

drive_id = "pick-a-value-from-above-output"

def list_all_items():

    token = get_graph_token()

    headers = {"Authorization": f"Bearer {token}"}

    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"

    items = []

    while url:

        resp = requests.get(url, headers=headers)

        resp.raise_for_status()

        data = resp.json()

        items.extend(data.get("value", []))

        url = data.get("@odata.nextLink") # pagination

    return items

# COMMAND ----------

def download_file(item_id):

    token = get_graph_token()

    headers = {"Authorization": f"Bearer {token}"}

    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"

    resp = requests.get(url, headers=headers)

    resp.raise_for_status()

    return resp.content

# COMMAND ----------

os.environ["STORAGE_CONNECTION_STRING"]

# COMMAND ----------

connection_string = os.environ["STORAGE_CONNECTION_STRING"]

# COMMAND ----------

import os

import mimetypes

from io import BytesIO

from typing import Optional, Dict

import requests

from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError

from azure.storage.blob import (

    BlobServiceClient,

    BlobClient,

    ContentSettings

)

# ----------------------------

# Azure Blob upload helpers

# ----------------------------

def _get_blob_service_client(

    *,

    connection_string: Optional[str] = None,

    account_url: Optional[str] = None,

    sas_token: Optional[str] = None

) -> BlobServiceClient:

    """

    Create a BlobServiceClient from one of:

      - connection_string

      - account_url + sas_token (e.g., https://<acct>.blob.core.windows.net/?<sas>)

    """

    if connection_string:

        return BlobServiceClient.from_connection_string(connection_string)

    if account_url and sas_token:

        # Ensure sas_token starts with '?'

        sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'

        return BlobServiceClient(account_url=account_url, credential=sas)

    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")

def upload_bytes_to_blob(

    data: bytes,

    *,

    container_name: str,

    blob_name: str,

    connection_string: Optional[str] = None,

    account_url: Optional[str] = None,

    sas_token: Optional[str] = None,

    content_type: Optional[str] = None,

    overwrite: bool = False,

    metadata: Optional[Dict[str, str]] = None

) -> str:

    """

    Uploads a bytes object to Azure Blob Storage and returns the blob URL.

    Parameters:

        data (bytes): File content.

        container_name (str): Target container name.

        blob_name (str): Target blob name (e.g., 'reports/file.pdf').

        connection_string/account_url/sas_token: Auth options.

        content_type (str): MIME type; if None, guessed from blob_name.

        overwrite (bool): Replace if exists.

        metadata (dict): Optional metadata key/value pairs.

    Returns:

        str: The URL of the uploaded blob.

    """

    # Guess content type if not provided

    if content_type is None:

        guessed, _ = mimetypes.guess_type(blob_name)

        content_type = guessed or "application/octet-stream"

    # Build client

    bsc = _get_blob_service_client(

        connection_string=connection_string,

        account_url=account_url,

        sas_token=sas_token

    )

    container_client = bsc.get_container_client(container_name)

    # Ensure container exists (idempotent)

    try:

        container_client.create_container()

    except ResourceExistsError:

        pass # already exists

    blob_client: BlobClient = container_client.get_blob_client(blob_name)

    # Upload

    try:

        content_settings = ContentSettings(content_type=content_type)

        # Use a stream to be memory-friendly for large files, though we already have bytes

        stream = BytesIO(data)

        blob_client.upload_blob(

            stream,

            overwrite=overwrite,

            metadata=metadata,

            content_settings=content_settings

        )

    except ResourceExistsError:

        if not overwrite:

            raise

    except ClientAuthenticationError as e:

        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e

    except (ServiceRequestError, HttpResponseError) as e:

        raise RuntimeError(f"Blob upload failed: {e}") from e

    # Construct and return URL (works for both conn string and SAS)

    return blob_client.url

# ----------------------------

# Orchestrator

# ----------------------------

def download_and_upload_to_blob(

    *,

    item_id: str,

    container_name: str,

    blob_name: str,

    connection_string: Optional[str] = None,

    account_url: Optional[str] = None,

    sas_token: Optional[str] = None,

    content_type: Optional[str] = None,

    overwrite: bool = False,

    metadata: Optional[Dict[str, str]] = None

) -> str:

    """

    Downloads a file from Microsoft Graph using the provided item_id and uploads it to Azure Blob Storage.

    Returns the blob URL.

    """

    # 1) Download bytes from Graph

    file_bytes = download_file(item_id)

    # 2) Upload to Blob

    blob_url = upload_bytes_to_blob(

        file_bytes,

        container_name=container_name,

        blob_name=blob_name,

        connection_string=connection_string,

        account_url=account_url,

        sas_token=sas_token,

        content_type=content_type,

        overwrite=overwrite,

        metadata=metadata,

    )

    return blob_url

# COMMAND ----------

items = list_all_items()

# COMMAND ----------

print(len(items))

# COMMAND ----------

print(items[0:3])

# COMMAND ----------

import json

print(json.dumps(items[0], indent=4))

# COMMAND ----------

from urllib.parse import urlparse, parse_qs, unquote

def after_ei_from_xml_location(url: str, *, decode: bool = True) -> str:
    """Return the path portion after '/EI/' in a URL's XmlLocation parameter.

    Args:
        url: Full URL whose query string may carry an ``XmlLocation`` value.
        decode: When True (default), URL-decode the XmlLocation value.
            Note: ``parse_qs`` already percent-decodes once, so this extra
            ``unquote`` only changes doubly-encoded values.

    Returns:
        Everything after the first '/EI/' in XmlLocation, or '' when the
        parameter is absent or contains no '/EI/' marker.
    """
    query_params = parse_qs(urlparse(url).query)
    values = query_params.get("XmlLocation")
    if not values:
        # XmlLocation not present in the query string.
        return ""
    location = unquote(values[0]) if decode else values[0]
    # partition() yields an empty separator when '/EI/' is absent.
    _, separator, tail = location.partition("/EI/")
    return tail if separator else ""

# COMMAND ----------

download_and_upload_to_blob(item_id=items[0]["id"], container_name="iris", blob_name="domestic/EI/" + after_ei_from_xml_location(url=items[0]['webUrl']), connection_string=connection_string)

# COMMAND ----------

max_size = max(entry["size"] for entry in items)

sum_size = sum(entry["size"] for entry in items)

len_items = len(items)

print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")

# COMMAND ----------

# Copy every listed item into the 'iris' container; log failures and continue.
# BUG FIX: the original f-strings used double quotes inside double-quoted
# f-strings (f"{item["webUrl"]}"), a SyntaxError before Python 3.12 (PEP 701).
for item in items:
    try:
        download_and_upload_to_blob(
            item_id=item["id"],
            container_name="iris",
            blob_name="domestic/EI/" + after_ei_from_xml_location(url=item['webUrl']),
            connection_string=connection_string,
        )
        print(f"{item['webUrl']}")
    except Exception as e:
        print(f"Error: {e}, item_id={item['id']}, item_url={item['webUrl']}")


No comments:

Post a Comment