# This demonstrates a sample data transfer from SharePoint to ADLS.
# Databricks notebook source
import os
# Azure AD app registration credentials used for app-only Microsoft Graph access.
# SECURITY NOTE(review): left blank on purpose — populate from a secret store
# (e.g. a Databricks secret scope) before running; never commit real values.
os.environ["SHAREPOINT_CLIENT_ID"] = ""
os.environ["SHAREPOINT_CLIENT_SECRET"] = ""
os.environ["SHAREPOINT_TENANT_ID"] = ""
# COMMAND ----------
import functools
import json
import logging
import time
from typing import Dict, List, Optional, Set
from urllib.parse import quote

import requests
from msal import ConfidentialClientApplication
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, ContentSettings
# COMMAND ----------
# Logging
# Root logger at INFO; this named logger is used throughout the notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sp-to-adls")
# COMMAND ----------
# Read the Graph app credentials back from the environment set above.
client_id = os.environ["SHAREPOINT_CLIENT_ID"]
client_secret = os.environ["SHAREPOINT_CLIENT_SECRET"]
tenant_id = os.environ["SHAREPOINT_TENANT_ID"]
# COMMAND ----------
# SECURITY NOTE(review): destination storage connection string placeholder —
# populate from a secret store; never hard-code a real connection string.
os.environ["STORAGE_CONNECTION_STRING"] = ""
# COMMAND ----------
connection_string = os.environ["STORAGE_CONNECTION_STRING"]
# COMMAND ----------
def get_graph_token():
    """
    Acquire an app-only Microsoft Graph access token via the MSAL
    client-credentials flow, using the module-level client_id /
    client_secret / tenant_id.

    Returns:
        str: A bearer access token for https://graph.microsoft.com/.default.

    Raises:
        RuntimeError: if MSAL could not issue a token. MSAL signals failure by
            returning an error dict instead of raising, so without this check
            the caller would only see an opaque KeyError on "access_token".
    """
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)
    scope = "https://graph.microsoft.com/.default"
    token = app.acquire_token_for_client(scopes=[scope])
    if "access_token" not in token:
        raise RuntimeError(
            f"Failed to acquire Graph token: {token.get('error')} - {token.get('error_description')}"
        )
    return token["access_token"]
# COMMAND ----------
def get_drive_name(site_id: str, drive_id: str, token: str) -> str:
    """
    Resolve the display name of a document library (drive) via Microsoft Graph.

    Uses the supplied token when given, otherwise acquires one. On 401/403 the
    token is refreshed once and the lookup retried; on 404 the site's drives
    are enumerated and matched by id as a fallback.

    Raises:
        RuntimeError: drive_id not found among the site's drives (404 path).
        requests.HTTPError: any other non-success Graph response.
    """
    access_token = token if token else get_graph_token()
    headers = {"Authorization": f"Bearer {access_token}"}
    drive_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
    response = requests.get(drive_url, headers=headers)

    if response.status_code == 200:
        return response.json().get("name")

    if response.status_code in (401, 403):
        # Token issue or permission problem: refresh the token once and retry.
        headers = {"Authorization": f"Bearer {get_graph_token()}"}
        response = requests.get(drive_url, headers=headers)
        response.raise_for_status()
        return response.json().get("name")

    if response.status_code == 404:
        # Direct lookup failed; list the site's drives and match on id.
        site_drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
        listing = requests.get(site_drives_url, headers=headers)
        listing.raise_for_status()
        for drive in listing.json().get("value", []):
            if drive.get("id") == drive_id:
                return drive.get("name")
        raise RuntimeError(f"Drive id {drive_id} not found under site {site_id}")

    # Any other unexpected status: surface it as an HTTPError.
    response.raise_for_status()
# COMMAND ----------
def get_drive_name_or_none(site_id: str, drive_id: str, token: str) -> Optional[str]:
    """
    Best-effort wrapper around get_drive_name.

    Returns:
        The drive name with surrounding '/' stripped, or None on any failure —
        callers use None as the signal to skip drives they cannot resolve.
    """
    try:
        drive_name = get_drive_name(site_id, drive_id, token)
        return drive_name.strip('/')
    except Exception as e:
        # Broad catch is deliberate (best-effort lookup), but record the
        # reason instead of failing silently.
        logging.getLogger("sp-to-adls").warning(
            "Failed to resolve drive name for drive %s: %s", drive_id, e
        )
        return None
# COMMAND ----------
def get_site_id_drive_ids(hostname="myazure.sharepoint.com", site_path="/sites/site1/Deep/to/EI"):
    """
    Resolve a SharePoint site by server-relative path and list its document libraries.

    Parameters:
        hostname: SharePoint tenant hostname.
        site_path: Server-relative site path (may contain spaces/special chars).

    Returns:
        tuple: (site_id, [drive_id, ...]).

    Raises:
        requests.HTTPError: on any non-success Graph response.
    """
    access_token = get_graph_token()
    headers = {"Authorization": f"Bearer {access_token}"}
    # 1) Resolve site by path -> site-id.
    # URL-encode the path so spaces/special characters survive interpolation
    # (quote was imported for exactly this but previously unused).
    site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{quote(site_path)}"
    r = requests.get(site_url, headers=headers)
    r.raise_for_status()
    site = r.json()
    site_id = site["id"]
    print("Site id:", site_id)
    # 2) List drives (document libraries) for the site -> collect drive ids
    drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    r = requests.get(drives_url, headers=headers)
    r.raise_for_status()
    drives = r.json().get("value", [])
    drive_ids = []
    for d in drives:
        print("Drive name:", d["name"], "Drive id:", d["id"])
        drive_ids.append(d["id"])
    return site_id, drive_ids
# COMMAND ----------
# Discover the target site and its document libraries once, up front.
site_id, drive_ids = get_site_id_drive_ids()
# Single summary line (previously drive_ids was printed twice).
print(f"site_id={site_id}, drive_ids={drive_ids}")
# COMMAND ----------
from typing import List, Dict
def list_children_for_drive_item(drive_id: str, item_id: str, token: str) -> List[Dict]:
    """
    Return every child of a folder in a drive, following @odata.nextLink pages.

    - drive_id: the drive id (not site id)
    - item_id: 'root' or a folder item id
    - token: Graph access token
    Returns the raw Graph item dicts.
    """
    headers = {"Authorization": f"Bearer {token}"}
    # Root uses a dedicated endpoint; everything else goes via /items/{id}.
    base = f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
    next_url = f"{base}/root/children" if item_id == "root" else f"{base}/items/{item_id}/children"
    collected = []
    while next_url:
        response = requests.get(next_url, headers=headers)
        response.raise_for_status()
        body = response.json()
        collected.extend(body.get("value", []))
        next_url = body.get("@odata.nextLink")
    return collected
def enumerate_drive_recursive(site_id: str, drive_id: str, token: str) -> List[Dict]:
    """
    Enumerate all items in the drive (depth-first) and return them with an
    added 'relativePath' key on each item.

    - site_id: Graph site id (kept for signature parity; not used in drive calls)
    - drive_id: Graph drive id (document library)
    - token: Graph access token; acquired on demand when falsy
    Returns: list of item dicts (id, name, relativePath, file/folder, ...).

    Raises:
        requests.HTTPError: non-auth Graph failures are re-raised.
    """
    if not token:
        token = get_graph_token()
    results = []
    stack = [("root", "")]  # (item_id, relative_path)
    while stack:
        current_id, current_rel = stack.pop()
        try:
            children = list_children_for_drive_item(drive_id, current_id, token)
        except requests.HTTPError as e:
            # e.response can be None for transport-level failures; only
            # refresh-and-retry on genuine auth responses (401/403), once.
            if e.response is not None and e.response.status_code in (401, 403):
                token = get_graph_token()
                children = list_children_for_drive_item(drive_id, current_id, token)
            else:
                raise
        for child in children:
            name = child.get("name", "")
            # Build relative path: root children have no leading slash.
            child_rel_path = f"{current_rel}/{name}".lstrip("/")
            item_with_path = dict(child)  # shallow copy; don't mutate Graph payload
            item_with_path["relativePath"] = child_rel_path
            results.append(item_with_path)
            # Folders are pushed for later expansion.
            if "folder" in child:
                stack.append((child["id"], child_rel_path))
    return results
# COMMAND ----------
# Retry settings
# Used by the with_retries decorator: up to MAX_RETRIES attempts with
# exponential backoff starting at BASE_BACKOFF seconds (2, 4, 8, ...).
MAX_RETRIES = 5
BASE_BACKOFF = 2 # seconds
# COMMAND ----------
# Azure Storage destination
# NOTE(review): storage_account is not referenced anywhere below — presumably
# informational; the connection string alone identifies the account.
storage_account = "deststoraccount"
container_name = "ctr1"
blob_service = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service.get_container_client(container_name)
# Checkpoint blob path inside container
checkpoint_blob_path = "_checkpoints/sharepoint_to_adls_checkpoint.json"
# Checkpoint structure: { item_id: lastModifiedDateTime }
checkpoint_blob = container_client.get_blob_client(checkpoint_blob_path)
def load_checkpoint() -> Dict[str, str]:
    """
    Load the { item_id: lastModifiedDateTime } checkpoint from blob storage.

    Returns:
        The checkpoint dict, or {} when the blob is missing or unreadable.
    """
    try:
        data = checkpoint_blob.download_blob().readall()
        return json.loads(data)
    except Exception as e:
        # Deliberate best-effort: any failure (missing blob, auth error,
        # corrupt JSON) means "start fresh" — but record why, since a
        # silent auth failure would otherwise look like a clean first run.
        logger.info("No checkpoint loaded (%s); starting fresh.", e)
        return {}
def save_checkpoint(checkpoint: Dict[str, str]):
    """Persist the { item_id: lastModifiedDateTime } map to the checkpoint blob."""
    serialized = json.dumps(checkpoint)
    checkpoint_blob.upload_blob(serialized, overwrite=True)
    logger.info("Checkpoint saved with %d entries", len(checkpoint))
# COMMAND ----------
import os
import mimetypes
from io import BytesIO
from typing import Optional, Dict
import requests
from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError
from azure.storage.blob import (
BlobServiceClient,
BlobClient,
ContentSettings
)
# ----------------------------
# Azure Blob upload helpers
# ----------------------------
def _get_blob_service_client(
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None
) -> BlobServiceClient:
    """
    Build a BlobServiceClient from either a connection string or an account
    URL paired with a SAS token (e.g. https://<acct>.blob.core.windows.net).

    Raises:
        ValueError: when neither credential combination is supplied.
    """
    if connection_string:
        return BlobServiceClient.from_connection_string(connection_string)
    if account_url and sas_token:
        # Normalize the SAS so it always carries the leading '?'.
        normalized_sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'
        return BlobServiceClient(account_url=account_url, credential=normalized_sas)
    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")
def download_file(drive_id, item_id, token = None, timeout = 300):
    """
    Download a drive item's content bytes from Microsoft Graph.

    Parameters:
        drive_id: Graph drive id.
        item_id: Graph item id of a file.
        token: optional pre-acquired Graph token; fetched on demand when falsy.
        timeout: seconds before the HTTP request is abandoned. Without a
            timeout, requests can block forever on a stalled connection.

    Returns:
        bytes: the raw file content.

    Raises:
        requests.HTTPError: on a non-success Graph response.
        requests.Timeout: if the download exceeds `timeout` seconds.
    """
    if not token:
        token = get_graph_token()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp.content
def upload_bytes_to_blob(
    data: bytes,
    *,
    container_name: str,
    blob_name: str,
    blob_service: Optional[BlobServiceClient] = None,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """
    Uploads a bytes object to Azure Blob Storage and returns the blob URL.

    Parameters:
        data (bytes): File content.
        container_name (str): Target container name.
        blob_name (str): Target blob name (e.g., 'reports/file.pdf').
        blob_service: Pre-built client; takes precedence over the credential
            parameters below when provided.
        connection_string/account_url/sas_token: Auth options used only when
            blob_service is not supplied.
        content_type (str): MIME type; if None, guessed from blob_name.
        overwrite (bool): Replace if exists.
        metadata (dict): Optional metadata key/value pairs.

    Returns:
        str: The URL of the uploaded blob.

    Raises:
        ValueError: no usable credentials (from _get_blob_service_client).
        ResourceExistsError: blob already exists and overwrite is False.
        RuntimeError: authentication or transport failure during upload.
    """
    # Guess content type if not provided
    if content_type is None:
        guessed, _ = mimetypes.guess_type(blob_name)
        content_type = guessed or "application/octet-stream"
    # Prefer the injected client; otherwise build one from the credentials.
    # (`is None` replaces the non-idiomatic `!= None` comparison.)
    bsc = blob_service
    if bsc is None:
        bsc = _get_blob_service_client(
            connection_string=connection_string,
            account_url=account_url,
            sas_token=sas_token
        )
    container_client = bsc.get_container_client(container_name)
    # Ensure container exists (idempotent)
    try:
        container_client.create_container()
    except ResourceExistsError:
        pass  # already exists
    blob_client: BlobClient = container_client.get_blob_client(blob_name)
    # Upload
    try:
        content_settings = ContentSettings(content_type=content_type)
        # Use a stream to be memory-friendly for large files, though we already have bytes
        stream = BytesIO(data)
        blob_client.upload_blob(
            stream,
            overwrite=overwrite,
            metadata=metadata,
            content_settings=content_settings
        )
    except ResourceExistsError:
        # With overwrite=True the SDK does not raise this; re-raise otherwise.
        if not overwrite:
            raise
    except ClientAuthenticationError as e:
        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e
    except (ServiceRequestError, HttpResponseError) as e:
        raise RuntimeError(f"Blob upload failed: {e}") from e
    # Construct and return URL (works for both conn string and SAS)
    return blob_client.url
def with_retries(func):
    """
    Decorator: retry `func` up to MAX_RETRIES times with exponential backoff
    starting at BASE_BACKOFF seconds, re-raising the final failure.

    Each failed attempt is logged at WARNING; the terminal failure at ERROR.
    """
    @functools.wraps(func)  # preserve __name__/__doc__ on the wrapped callable
    def wrapper(*args, **kwargs):
        backoff = BASE_BACKOFF
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.warning("Attempt %d/%d failed for %s: %s", attempt, MAX_RETRIES, func.__name__, e)
                if attempt == MAX_RETRIES:
                    logger.error("Max retries reached for %s", func.__name__)
                    raise
                time.sleep(backoff)
                backoff *= 2
    return wrapper
# Retry-wrapped variants of the Graph download and blob upload helpers.
download_file_with_retries = with_retries(download_file)
upload_bytes_to_blob_with_retries = with_retries(upload_bytes_to_blob)
# ----------------------------
# Orchestrator
# ----------------------------
def download_and_upload_to_blob(
    *,
    drive_id: str,
    item_id: str,
    token: str,
    container_name: str,
    blob_name: str,
    blob_service: Optional[BlobServiceClient] = None,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """
    Fetch one file from Microsoft Graph (by item_id) and store it in Azure
    Blob Storage, with both steps going through their retry-wrapped variants.

    Returns:
        str: the URL of the uploaded blob.
    """
    # Step 1: pull the file bytes down from Graph.
    graph_bytes = download_file_with_retries(drive_id, item_id, token=token)
    # Step 2: push them into blob storage and hand back the resulting URL.
    return upload_bytes_to_blob_with_retries(
        graph_bytes,
        container_name=container_name,
        blob_name=blob_name,
        blob_service=blob_service,
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token,
        content_type=content_type,
        overwrite=overwrite,
        metadata=metadata,
    )
# COMMAND ----------
import base64
from azure.storage.blob import ContentSettings
def copy_site_items_to_storage_account(hostname="myazure.sharepoint.com", site_path="/sites/site1/Deep/to/EI", container_name = "ctr1", destination_folder = "domestic/EI"):
    """
    Copy every file in every document library of a SharePoint site into
    Azure Blob Storage under destination_folder/<drive name>/<relative path>.

    Incremental: the checkpoint maps item_id -> lastModifiedDateTime, so files
    unchanged since the last successful run are skipped. (Previously the
    checkpoint was loaded and written but never consulted.)

    Parameters:
        hostname: SharePoint tenant hostname.
        site_path: Server-relative site path.
        container_name: Destination blob container.
        destination_folder: Prefix inside the container.
    """
    site_id, drive_ids = get_site_id_drive_ids(hostname, site_path)
    checkpoint = load_checkpoint()  # item_id -> lastModifiedDateTime
    for drive_id in drive_ids:
        print(f"Processing drive_id={drive_id}")
        token = get_graph_token()
        items = enumerate_drive_recursive(site_id, drive_id, token)
        # Keep only files; folders carry no content to copy.
        items = [it for it in items if "file" in it]
        len_items = len(items)
        if len_items == 0:
            continue
        max_size = max(entry["size"] for entry in items)
        sum_size = sum(entry["size"] for entry in items)
        print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")
        drive_name = get_drive_name_or_none(site_id, drive_id, token)
        if not drive_name:
            # Can't build a stable destination path without the library name.
            continue
        for item in items:
            item_id = item["id"]
            last_mod = item.get("lastModifiedDateTime")
            rel_path = item.get("relativePath")
            if not rel_path:
                continue
            # Incremental skip: unchanged since the last checkpointed copy.
            if last_mod and checkpoint.get(item_id) == last_mod:
                continue
            try:
                blob_name = f"{destination_folder}/{drive_name}/{rel_path}"
                download_and_upload_to_blob(drive_id = drive_id, item_id=item_id, blob_service = blob_service, token = token, container_name=container_name, blob_name=blob_name, connection_string=connection_string)
                # Update checkpoint after each successful upload so a crash
                # mid-run loses at most the in-flight file.
                checkpoint[item_id] = last_mod
                save_checkpoint(checkpoint)
                logger.info("Copied and checkpointed %s", rel_path)
            except Exception as e:
                # item.get avoids a secondary KeyError inside the handler, and
                # single-quoted keys keep the f-string valid on Python < 3.12
                # (the original double-quoted keys were a syntax error there).
                print(f"Error: {e}, item_id={item_id}, item_url={item.get('webUrl')}")
# COMMAND ----------
# Entry point: run the full SharePoint -> ADLS copy with the default site/container.
copy_site_items_to_storage_account()
# References: previous article: https://1drv.ms/w/c/d609fb70e39b65c8/IQBiE_AAtirtRbK2Ur7XROmCAYCKwfMgvdfvbvFjw0j_o5Q?e=EmfLlu