Friday, March 20, 2026

This notebook demonstrates a sample data transfer from SharePoint to ADLS (Azure Data Lake Storage):

# Databricks notebook source

# SharePoint app registration credentials (placeholders).
# NOTE(review): these should be loaded from a secret store (e.g. Databricks
# secret scopes) rather than hard-coded in the notebook.
import os

os.environ["SHAREPOINT_CLIENT_ID"] = ""

os.environ["SHAREPOINT_CLIENT_SECRET"] = ""

os.environ["SHAREPOINT_TENANT_ID"] = ""

# COMMAND ----------

import json

import time

import logging

import requests

from msal import ConfidentialClientApplication

from azure.identity import DefaultAzureCredential

from azure.storage.blob import BlobServiceClient, ContentSettings

from urllib.parse import quote

from typing import Dict, Set, List

# COMMAND ----------

# Logging
# Module-level logger shared by all transfer helpers below.
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("sp-to-adls")

# COMMAND ----------

# Read the SharePoint app registration values set in the cell above.
# A KeyError here means the environment variables were not configured.
client_id = os.environ["SHAREPOINT_CLIENT_ID"]

client_secret = os.environ["SHAREPOINT_CLIENT_SECRET"]

tenant_id = os.environ["SHAREPOINT_TENANT_ID"]

# COMMAND ----------

# Azure Storage connection string placeholder.
# NOTE(review): load from a secret scope rather than hard-coding in the notebook.
os.environ["STORAGE_CONNECTION_STRING"] = ""

# COMMAND ----------

connection_string = os.environ["STORAGE_CONNECTION_STRING"]

# COMMAND ----------

def get_graph_token():
    """Acquire an app-only Microsoft Graph access token via client credentials.

    Uses the module-level client_id / client_secret / tenant_id.

    Returns:
        str: A bearer token for https://graph.microsoft.com.

    Raises:
        RuntimeError: If token acquisition fails.
    """
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = ConfidentialClientApplication(
        client_id, authority=authority, client_credential=client_secret
    )
    token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    # MSAL reports failures as an error payload instead of raising, so surface
    # them explicitly rather than hitting an opaque KeyError below.
    if "access_token" not in token:
        raise RuntimeError(
            f"Failed to acquire Graph token: {token.get('error')}: "
            f"{token.get('error_description')}"
        )
    return token["access_token"]

# COMMAND ----------

def get_drive_name(site_id: str, drive_id: str, token: str) -> str:
    """
    Return the drive name for the given site_id and drive_id using Microsoft Graph.

    Refreshes the token once on 401/403, and falls back to listing the site's
    drives when the direct drive lookup returns 404.

    Raises:
        requests.HTTPError: For unexpected Graph error responses.
        RuntimeError: If the drive id is not found under the site (404 path).
    """
    # Use the caller's token when provided, otherwise fetch a fresh one.
    access_token = token if token else get_graph_token()
    headers = {"Authorization": f"Bearer {access_token}"}
    url_drive = f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
    # Explicit timeout: requests never times out by default and would hang
    # indefinitely on a stalled connection.
    resp = requests.get(url_drive, headers=headers, timeout=30)
    if resp.status_code == 200:
        return resp.json().get("name")
    elif resp.status_code in (401, 403):
        # Token issue or permission problem; try refreshing token once
        access_token = get_graph_token()
        headers = {"Authorization": f"Bearer {access_token}"}
        resp = requests.get(url_drive, headers=headers, timeout=30)
        resp.raise_for_status()
        return resp.json().get("name")
    elif resp.status_code == 404:
        # Fallback: list drives under the site and match by id
        url_site_drives = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
        resp2 = requests.get(url_site_drives, headers=headers, timeout=30)
        resp2.raise_for_status()
        for d in resp2.json().get("value", []):
            if d.get("id") == drive_id:
                return d.get("name")
        raise RuntimeError(f"Drive id {drive_id} not found under site {site_id}")
    else:
        # Raise for other unexpected statuses. NOTE: non-error statuses that
        # raise_for_status accepts (e.g. 3xx) fall through and return None,
        # matching the original behavior.
        resp.raise_for_status()

# COMMAND ----------

def get_drive_name_or_none(site_id: str, drive_id: str, token: str) -> str:
    """Like get_drive_name(), but returns None on any failure.

    The returned name is stripped of leading/trailing '/' so it can be used
    directly as a blob path segment. Returns None (caller skips the drive)
    when resolution fails for any reason.
    """
    try:
        drive_name = get_drive_name(site_id, drive_id, token)
        return drive_name.strip('/')
    except Exception as e:
        # Best-effort by design, but log the reason instead of swallowing it
        # silently (the original had a commented-out print here).
        logger.warning("Failed to resolve drive name for %s: %s", drive_id, e)
        return None

# COMMAND ----------

def get_site_id_drive_ids(hostname="myazure.sharepoint.com", site_path="/sites/site1/Deep/to/EI"):
    """Resolve a SharePoint site by path and list its document-library drive ids.

    Parameters:
        hostname: SharePoint tenant hostname.
        site_path: Server-relative site path (may be nested).

    Returns:
        tuple: (site_id, drive_ids) where drive_ids is a list of Graph drive ids.

    Raises:
        requests.HTTPError: If either Graph call fails.
    """
    access_token = get_graph_token()
    headers = {"Authorization": f"Bearer {access_token}"}
    # 1) Resolve site by path -> site-id
    site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
    # Explicit timeout: requests has none by default.
    r = requests.get(site_url, headers=headers, timeout=30)
    r.raise_for_status()
    site = r.json()
    site_id = site["id"]
    print("Site id:", site_id)
    # 2) List drives (document libraries) for the site -> find drive id
    drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    r = requests.get(drives_url, headers=headers, timeout=30)
    r.raise_for_status()
    drives = r.json().get("value", [])
    for d in drives:
        print("Drive name:", d["name"], "Drive id:", d["id"])
    drive_ids = [d["id"] for d in drives]
    return site_id, drive_ids

# COMMAND ----------

# Discover the target site and its document-library drives once, up front.
site_id, drive_ids = get_site_id_drive_ids()

# Single summary line (the original printed drive_ids twice).
print(f"site_id={site_id}, drive_ids={drive_ids}")

# COMMAND ----------

from typing import List, Dict

def list_children_for_drive_item(drive_id: str, item_id: str, token: str) -> List[Dict]:
    """
    List children for a drive item (folder) using Graph API with pagination.

    - drive_id: the drive id (not site id)
    - item_id: 'root' or a folder item id
    - token: Graph access token

    Returns list of item dicts (raw Graph objects).

    Raises:
        requests.HTTPError: If any page request fails.
    """
    headers = {"Authorization": f"Bearer {token}"}
    # Use the drive children endpoint; for root use /drives/{drive_id}/root/children
    if item_id == "root":
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
    else:
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/children"
    items = []
    # Follow @odata.nextLink until the server stops paginating.
    while url:
        # Explicit timeout: requests would otherwise wait forever on a stalled call.
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        payload = resp.json()
        items.extend(payload.get("value", []))
        url = payload.get("@odata.nextLink")
    return items

def enumerate_drive_recursive(site_id: str, drive_id: str, token: str) -> List[Dict]:
    """
    Walk an entire drive (document library) iteratively and return every item,
    each annotated with a 'relativePath' key giving its path from the drive root.

    - site_id: Graph site id (kept for signature parity; the drive calls do not use it)
    - drive_id: Graph drive id (document library)
    - token: Graph access token; fetched via get_graph_token() when falsy

    Returns: List of raw Graph item dicts plus the added 'relativePath' key.
    """
    if not token:
        token = get_graph_token()

    collected: List[Dict] = []
    # Depth-first traversal with an explicit stack of (item_id, path prefix).
    pending = [("root", "")]
    while pending:
        node_id, prefix = pending.pop()
        try:
            children = list_children_for_drive_item(drive_id, node_id, token)
        except requests.HTTPError as err:
            # On auth-style failures, refresh the token and retry once.
            if err.response.status_code in (401, 403):
                token = get_graph_token()
                children = list_children_for_drive_item(drive_id, node_id, token)
            else:
                raise
        for entry in children:
            entry_name = entry.get("name", "")
            # Join with '/', then strip any leading slash for root-level items.
            entry_path = f"{prefix}/{entry_name}".lstrip("/")
            annotated = dict(entry)  # shallow copy; leave the Graph payload untouched
            annotated["relativePath"] = entry_path
            collected.append(annotated)
            # Folders get pushed so their children are visited too.
            if "folder" in entry:
                pending.append((entry["id"], entry_path))
    return collected

# COMMAND ----------

# Retry settings
# MAX_RETRIES: total attempts made by with_retries() before re-raising.
MAX_RETRIES = 5

# BASE_BACKOFF: initial sleep between attempts; doubled after each failure.
BASE_BACKOFF = 2 # seconds

# COMMAND ----------

# Azure Storage destination
storage_account = "deststoraccount"  # NOTE(review): appears unused below; auth comes from the connection string

container_name = "ctr1"

# Shared clients for the whole notebook run.
blob_service = BlobServiceClient.from_connection_string(connection_string)

container_client = blob_service.get_container_client(container_name)

# Checkpoint blob path inside container
checkpoint_blob_path = "_checkpoints/sharepoint_to_adls_checkpoint.json"

# Checkpoint structure: { item_id: lastModifiedDateTime }
checkpoint_blob = container_client.get_blob_client(checkpoint_blob_path)

def load_checkpoint() -> Dict[str, str]:
    """Load the item_id -> lastModifiedDateTime checkpoint from blob storage.

    Returns an empty dict when the checkpoint blob is missing or unreadable,
    so the copy simply starts fresh.
    """
    try:
        data = checkpoint_blob.download_blob().readall()
        return json.loads(data)
    except Exception as e:
        # Broad catch is a deliberate best-effort ("no checkpoint == fresh
        # start"), but log the reason so real failures (auth, network) are
        # not silently masked as a first run.
        logger.info("No checkpoint loaded (%s); starting fresh.", e)
        return {}

def save_checkpoint(checkpoint: Dict[str, str]):
    """Persist the item_id -> lastModifiedDateTime checkpoint, replacing any existing blob."""
    checkpoint_blob.upload_blob(json.dumps(checkpoint), overwrite=True)

    logger.info("Checkpoint saved with %d entries", len(checkpoint))

# COMMAND ----------

import os

import mimetypes

from io import BytesIO

from typing import Optional, Dict

import requests

from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError

from azure.storage.blob import (

    BlobServiceClient,

    BlobClient,

    ContentSettings

)

# ----------------------------

# Azure Blob upload helpers

# ----------------------------

def _get_blob_service_client(
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None
) -> BlobServiceClient:
    """
    Build a BlobServiceClient from whichever credential set was supplied.

    Accepts either:
      - connection_string, or
      - account_url + sas_token (e.g., https://<acct>.blob.core.windows.net/?<sas>)

    Raises:
        ValueError: When neither credential combination is provided.
    """
    if connection_string:
        return BlobServiceClient.from_connection_string(connection_string)
    if account_url and sas_token:
        # The SDK expects the SAS query string to begin with '?'.
        normalized_sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'
        return BlobServiceClient(account_url=account_url, credential=normalized_sas)
    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")

def download_file(drive_id, item_id, token = None):
    """Download a drive item's content bytes from Microsoft Graph.

    Parameters:
        drive_id: Graph drive id.
        item_id: Graph item id of a file.
        token: Optional access token; fetched via get_graph_token() when falsy.

    Returns:
        bytes: The file content.

    Raises:
        requests.HTTPError: If the download request fails.
    """
    if not token:
        token = get_graph_token()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
    # Generous but explicit timeout for large files; requests has no default
    # timeout and would hang forever on a stalled connection.
    resp = requests.get(url, headers=headers, timeout=300)
    resp.raise_for_status()
    return resp.content

def upload_bytes_to_blob(
    data: bytes,
    *,
    container_name: str,
    blob_name: str,
    blob_service: Optional[BlobServiceClient] = None,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """
    Uploads a bytes object to Azure Blob Storage and returns the blob URL.

    Parameters:
        data (bytes): File content.
        container_name (str): Target container name.
        blob_name (str): Target blob name (e.g., 'reports/file.pdf').
        blob_service: Pre-built client; when None, one is built from the auth options.
        connection_string/account_url/sas_token: Auth options.
        content_type (str): MIME type; if None, guessed from blob_name.
        overwrite (bool): Replace if exists.
        metadata (dict): Optional metadata key/value pairs.

    Returns:
        str: The URL of the uploaded blob.

    Raises:
        RuntimeError: On authentication or upload failures.
        ValueError: When no usable credentials are supplied.
    """
    # Guess content type if not provided
    if content_type is None:
        guessed, _ = mimetypes.guess_type(blob_name)
        content_type = guessed or "application/octet-stream"
    # Reuse the supplied client when given; otherwise build one from the
    # provided credentials. (`is not None` per PEP 8, not `!= None`.)
    if blob_service is not None:
        bsc = blob_service
    else:
        bsc = _get_blob_service_client(
            connection_string=connection_string,
            account_url=account_url,
            sas_token=sas_token
        )
    # Local name avoids shadowing the module-level container_client.
    container = bsc.get_container_client(container_name)
    # Ensure container exists (idempotent)
    try:
        container.create_container()
    except ResourceExistsError:
        pass  # already exists
    blob_client: BlobClient = container.get_blob_client(blob_name)
    # Upload
    try:
        content_settings = ContentSettings(content_type=content_type)
        # Use a stream to be memory-friendly for large files, though we already have bytes
        stream = BytesIO(data)
        blob_client.upload_blob(
            stream,
            overwrite=overwrite,
            metadata=metadata,
            content_settings=content_settings
        )
    except ResourceExistsError:
        # Only possible when overwrite=False; surface it to the caller.
        if not overwrite:
            raise
    except ClientAuthenticationError as e:
        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e
    except (ServiceRequestError, HttpResponseError) as e:
        raise RuntimeError(f"Blob upload failed: {e}") from e
    # Construct and return URL (works for both conn string and SAS)
    return blob_client.url

def with_retries(func):
    """Wrap *func* with retry + exponential backoff.

    Makes up to MAX_RETRIES attempts, doubling the delay between attempts
    starting from BASE_BACKOFF seconds, and re-raises the last exception
    when exhausted.
    """
    from functools import wraps  # stdlib; local import keeps the cell self-contained

    @wraps(func)  # preserve func.__name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        backoff = BASE_BACKOFF
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.warning("Attempt %d/%d failed for %s: %s", attempt, MAX_RETRIES, func.__name__, e)
                if attempt == MAX_RETRIES:
                    logger.error("Max retries reached for %s", func.__name__)
                    raise
                time.sleep(backoff)
                backoff *= 2
    return wrapper

# Retrying variants used by the orchestrator below.
download_file_with_retries = with_retries(download_file)

upload_bytes_to_blob_with_retries = with_retries(upload_bytes_to_blob)

# ----------------------------

# Orchestrator

# ----------------------------

def download_and_upload_to_blob(
    *,
    drive_id: str,
    item_id: str,
    token: str,
    container_name: str,
    blob_name: str,
    blob_service: Optional[BlobServiceClient] = None,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """
    Fetch one drive item from Microsoft Graph and store it as a blob.

    Downloads the item's bytes (with retries), uploads them to the given
    container/blob (with retries), and returns the resulting blob URL.
    """
    # Step 1: pull the file content from Graph.
    payload = download_file_with_retries(drive_id, item_id, token=token)
    # Step 2: push it into Blob Storage and hand back the URL.
    return upload_bytes_to_blob_with_retries(
        payload,
        container_name=container_name,
        blob_name=blob_name,
        blob_service=blob_service,
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token,
        content_type=content_type,
        overwrite=overwrite,
        metadata=metadata,
    )

# COMMAND ----------

import base64

from azure.storage.blob import ContentSettings

def copy_site_items_to_storage_account(hostname="myazure.sharepoint.com", site_path="/sites/site1/Deep/to/EI", container_name = "ctr1", destination_folder = "domestic/EI"):
    """Copy every file from every document library of a SharePoint site to blob storage.

    Blob names are built as <destination_folder>/<drive name>/<relative path>.
    Each successful copy is checkpointed by lastModifiedDateTime, and items
    whose checkpointed timestamp is unchanged are skipped on re-runs.

    Parameters:
        hostname: SharePoint tenant hostname.
        site_path: Server-relative site path.
        container_name: Destination blob container.
        destination_folder: Prefix under which all files are placed.
    """
    site_id, drive_ids = get_site_id_drive_ids(hostname, site_path)
    checkpoint = load_checkpoint()  # item_id -> lastModifiedDateTime
    for drive_id in drive_ids:
        print(f"Processing drive_id={drive_id}")
        token = get_graph_token()
        items = enumerate_drive_recursive(site_id, drive_id, token)
        # Only files carry content; folder entries are skipped.
        items = [it for it in items if "file" in it]
        len_items = len(items)
        if len_items == 0:
            continue
        max_size = max(entry["size"] for entry in items)
        sum_size = sum(entry["size"] for entry in items)
        print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")
        drive_name = get_drive_name_or_none(site_id, drive_id, token)
        if not drive_name:
            continue
        for item in items:
            item_id = item["id"]
            last_mod = item.get("lastModifiedDateTime")
            rel_path = item.get("relativePath")
            # Items without a relative path cannot be named; skip them.
            if not rel_path:
                continue
            # Use the checkpoint the original loaded but never consulted:
            # skip items already copied with the same modification timestamp.
            if last_mod and checkpoint.get(item_id) == last_mod:
                continue
            try:
                blob_name = f"{destination_folder}/{drive_name}/{rel_path}"
                download_and_upload_to_blob(
                    drive_id=drive_id,
                    item_id=item_id,
                    blob_service=blob_service,
                    token=token,
                    container_name=container_name,
                    blob_name=blob_name,
                    connection_string=connection_string,
                )
                # Update checkpoint after successful upload
                checkpoint[item_id] = last_mod
                save_checkpoint(checkpoint)
                logger.info("Copied and checkpointed %s", rel_path)
            except Exception as e:
                # Single quotes inside the f-string: the original nested double
                # quotes, a SyntaxError on Python < 3.12.
                print(f"Error: {e}, item_id={item_id}, item_url={item.get('webUrl')}")

# COMMAND ----------

# Run the full SharePoint -> ADLS copy with the default site/container settings.
copy_site_items_to_storage_account()

References: previous article: https://1drv.ms/w/c/d609fb70e39b65c8/IQBiE_AAtirtRbK2Ur7XROmCAYCKwfMgvdfvbvFjw0j_o5Q?e=EmfLlu


No comments:

Post a Comment