Sample code to import SharePoint data into an Azure Storage Account:
# Databricks notebook source
import os
# Placeholder credentials for the SharePooint app registration.
# NOTE(review): assigning "" here overwrites any values already present in the
# cluster environment — populate these (or use the dbutils secrets cell below)
# before running.
os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"] = ""
os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"] = ""
os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"] = ""
# COMMAND ----------
# Optional: use dbutils secrets for sensitive values
# client_secret = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-secret")
# client_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-id")
# tenant_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-tenant-id")
# COMMAND ----------
import requests
from msal import ConfidentialClientApplication
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
import json
import time
# === SharePoint App Registration ===
import os
client_id = os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"]
client_secret = os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"]
tenant_id = os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"]
# Site/list labels kept for reference; the Graph calls later resolve real ids.
siteId = "site01/EI/"
listId = "Links"
# OAuth2 client-credentials (app-only) flow against the tenant's Entra ID authority.
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = ["https://graph.microsoft.com/.default"]
app = ConfidentialClientApplication(
client_id,
authority=authority,
client_credential=client_secret
)
def get_graph_token():
    """Acquire an app-only Microsoft Graph access token via client credentials.

    Returns:
        str: A bearer token for the Graph API.

    Raises:
        RuntimeError: if MSAL returns an error payload instead of a token.
            (The original indexed the result directly, which surfaced MSAL
            failures as an opaque ``KeyError: 'access_token'``.)
    """
    result = app.acquire_token_for_client(scopes=scope)
    if "access_token" not in result:
        # MSAL reports failures as a dict carrying 'error'/'error_description'.
        raise RuntimeError(
            f"Graph token acquisition failed: {result.get('error_description', result)}"
        )
    return result["access_token"]
# COMMAND ----------
# === Azure Storage ===
# Target storage account/container for the copied SharePoint files.
storage_account = "someaccount01"
container_name = "container01"
# DefaultAzureCredential tries managed identity / env vars / CLI login in order.
credential = DefaultAzureCredential()
blob_service = BlobServiceClient(
f"https://{storage_account}.blob.core.windows.net",
credential=credential
)
container_client = blob_service.get_container_client(container_name)
# COMMAND ----------
checkpoint_blob = container_client.get_blob_client("_checkpoints/sharepoint_copied.json")


def load_checkpoint():
    """Return the set of already-copied item ids; empty set when unavailable."""
    try:
        # Best-effort read: a missing blob or unparseable JSON means "start fresh".
        return set(json.loads(checkpoint_blob.download_blob().readall()))
    except Exception:
        return set()


def save_checkpoint(copied_ids):
    """Overwrite the checkpoint blob with the given ids as a JSON array."""
    checkpoint_blob.upload_blob(json.dumps(list(copied_ids)), overwrite=True)
# COMMAND ----------
import requests
from msal import ConfidentialClientApplication
# SharePoint host and server-relative site path taken from the site URL.
hostname = "uhgazure.sharepoint.com"
site_path = "/sites/site01/EI"  # server-relative path (no trailing path/to/lists)
# App-only MSAL token for Microsoft Graph.
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)
token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
access_token = token.get("access_token")
headers = {"Authorization": f"Bearer {access_token}"}
# Step 1: resolve the site path to its Graph site id.
site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
site_resp = requests.get(site_url, headers=headers)
site_resp.raise_for_status()
site_id = site_resp.json()["id"]
print("Site id:", site_id)
# Step 2: enumerate the site's drives (document libraries) to locate a drive id.
drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
drives_resp = requests.get(drives_url, headers=headers)
drives_resp.raise_for_status()
drives = drives_resp.json().get("value", [])
for drive in drives:
    print("Drive name:", drive["name"], "Drive id:", drive["id"])
# If you know the library name, pick it:
target_library = "Documents"  # or the library name you expect
drive_id = next((drive["id"] for drive in drives if drive["name"] == target_library), None)
print("Selected drive id:", drive_id)
# COMMAND ----------
drive_id = "pick-a-value-from-above-output"


def list_all_items():
    """Return every child item of the drive root, following Graph pagination."""
    auth_header = {"Authorization": f"Bearer {get_graph_token()}"}
    collected = []
    next_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
    # Graph pages results; @odata.nextLink is absent on the final page.
    while next_url:
        response = requests.get(next_url, headers=auth_header)
        response.raise_for_status()
        payload = response.json()
        collected.extend(payload.get("value", []))
        next_url = payload.get("@odata.nextLink")
    return collected
# COMMAND ----------
def download_file(item_id):
    """Download a driveItem's binary content from Microsoft Graph as bytes."""
    auth_header = {"Authorization": f"Bearer {get_graph_token()}"}
    content_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
    response = requests.get(content_url, headers=auth_header)
    response.raise_for_status()
    return response.content
# COMMAND ----------
# Bare lookup: fails fast with KeyError if the secret is missing (notebook sanity check).
os.environ["STORAGE_CONNECTION_STRING"]
# COMMAND ----------
connection_string = os.environ["STORAGE_CONNECTION_STRING"]
# COMMAND ----------
import os
import mimetypes
from io import BytesIO
from typing import Optional, Dict
import requests
from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError
from azure.storage.blob import (
BlobServiceClient,
BlobClient,
ContentSettings
)
# ----------------------------
# Azure Blob upload helpers
# ----------------------------
def _get_blob_service_client(
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None
) -> BlobServiceClient:
    """Build a BlobServiceClient from one auth option.

    Accepts either a full connection string, or an account URL paired with a
    SAS token (e.g. https://<acct>.blob.core.windows.net/?<sas>).

    Raises:
        ValueError: if neither auth option is fully supplied.
    """
    if connection_string:
        return BlobServiceClient.from_connection_string(connection_string)
    if account_url and sas_token:
        # Normalize the SAS token so it always carries the leading '?'.
        normalized_sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'
        return BlobServiceClient(account_url=account_url, credential=normalized_sas)
    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")
def upload_bytes_to_blob(
    data: bytes,
    *,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Upload a bytes payload to Azure Blob Storage and return the blob URL.

    Args:
        data: File content.
        container_name: Target container name.
        blob_name: Target blob name (e.g. 'reports/file.pdf').
        connection_string/account_url/sas_token: Auth options.
        content_type: MIME type; guessed from blob_name when None.
        overwrite: Replace the blob if it already exists.
        metadata: Optional metadata key/value pairs.

    Returns:
        The URL of the uploaded blob.
    """
    # Fall back to a guessed MIME type, then a generic binary type.
    if content_type is None:
        guessed, _ = mimetypes.guess_type(blob_name)
        content_type = guessed or "application/octet-stream"
    service = _get_blob_service_client(
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token
    )
    container_client = service.get_container_client(container_name)
    # Create the container if needed; tolerate it already existing (idempotent).
    try:
        container_client.create_container()
    except ResourceExistsError:
        pass
    blob_client: BlobClient = container_client.get_blob_client(blob_name)
    try:
        # Stream wrapper keeps the upload path uniform for large payloads.
        blob_client.upload_blob(
            BytesIO(data),
            overwrite=overwrite,
            metadata=metadata,
            content_settings=ContentSettings(content_type=content_type)
        )
    except ResourceExistsError:
        # Only surfaces when overwrite=False and the blob already exists.
        if not overwrite:
            raise
    except ClientAuthenticationError as e:
        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e
    except (ServiceRequestError, HttpResponseError) as e:
        raise RuntimeError(f"Blob upload failed: {e}") from e
    # Works for both connection-string and SAS clients.
    return blob_client.url
# ----------------------------
# Orchestrator
# ----------------------------
def download_and_upload_to_blob(
    *,
    item_id: str,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Copy one Microsoft Graph driveItem into Azure Blob Storage.

    Downloads the item identified by *item_id* via Graph, uploads the bytes
    with the given auth/upload options, and returns the resulting blob URL.
    """
    content = download_file(item_id)
    return upload_bytes_to_blob(
        content,
        container_name=container_name,
        blob_name=blob_name,
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token,
        content_type=content_type,
        overwrite=overwrite,
        metadata=metadata,
    )
# COMMAND ----------
# Enumerate every item in the selected drive's root folder.
items = list_all_items()
# COMMAND ----------
print(len(items))
# COMMAND ----------
print(items[0:3])
# COMMAND ----------
import json
# Pretty-print one item to inspect the Graph driveItem schema.
print(json.dumps(items[0], indent=4))
# COMMAND ----------
from urllib.parse import urlparse, parse_qs, unquote
def after_ei_from_xml_location(url: str, *, decode: bool = True) -> str:
"""
Extracts the substring after '/EI/' from the XmlLocation query parameter.
Args:
url: The full URL containing the XmlLocation query parameter.
decode: If True, URL-decodes the result (default True).
Returns:
The substring after '/EI/' from XmlLocation, or an empty string if not found.
"""
parsed = urlparse(url)
qs = parse_qs(parsed.query)
xml_loc_values = qs.get("XmlLocation")
# print(f"xml_loc_values={xml_loc_values}")
if not xml_loc_values:
return "" # XmlLocation not present
# Take the first XmlLocation value
xml_loc = xml_loc_values[0]
if decode:
xml_loc = unquote(xml_loc)
# print(f"xml_loc={xml_loc}")
marker = "/EI/"
if marker not in xml_loc:
return "" # No /EI/ in the XmlLocation value
return xml_loc.split(marker, 1)[1]
# COMMAND ----------
# Smoke test: copy the first item before looping over everything.
download_and_upload_to_blob(item_id=items[0]["id"], container_name="iris", blob_name="domestic/EI/" + after_ei_from_xml_location(url=items[0]['webUrl']), connection_string=connection_string)
# COMMAND ----------
# Quick size statistics to gauge the transfer volume.
max_size = max(entry["size"] for entry in items)
sum_size = sum(entry["size"] for entry in items)
len_items = len(items)
print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")
# COMMAND ----------
# Copy every item, logging failures without aborting the loop.
# BUG FIX: the original used double quotes inside double-quoted f-strings
# (f"{item["webUrl"]}"), which is a SyntaxError on Python < 3.12.
for item in items:
    try:
        download_and_upload_to_blob(
            item_id=item["id"],
            container_name="iris",
            blob_name="domestic/EI/" + after_ei_from_xml_location(url=item["webUrl"]),
            connection_string=connection_string,
        )
        print(f"{item['webUrl']}")
    except Exception as e:
        print(f"Error: {e}, item_id={item['id']}, item_url={item['webUrl']}")
Reference: previous article for context: https://1drv.ms/w/c/d609fb70e39b65c8/IQBV3Sd02qPlRa_y13mxnxHSAa6mrM4rmM3pnvbWPW7RpIE?e=b1nGOi
Sample code to import SharePoint data into an Azure Storage Account:
# Databricks notebook source
import os
# Placeholder credentials for the SharePoint app registration.
# NOTE(review): assigning "" here overwrites any values already present in the
# cluster environment — populate these (or use the dbutils secrets cell below)
# before running.
os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"] = ""
os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"] = ""
os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"] = ""
# COMMAND ----------
# Optional: use dbutils secrets for sensitive values
# client_secret = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-secret")
# client_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-client-id")
# tenant_id = dbutils.secrets.get(scope="my-scope", key="sharepoint-tenant-id")
# COMMAND ----------
import requests
from msal import ConfidentialClientApplication
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
import json
import time
# === SharePoint App Registration ===
import os
client_id = os.environ["PROD_DSS_SHAREPOINT_CLIENT_ID"]
client_secret = os.environ["PROD_DSS_SHAREPOINT_CLIENT_SECRET"]
tenant_id = os.environ["PROD_DSS_SHAREPOINT_TENANT_ID"]
# Site/list labels kept for reference; the Graph calls later resolve real ids.
siteId = "site01/EI/"
listId = "Links"
# OAuth2 client-credentials (app-only) flow against the tenant's Entra ID authority.
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = ["https://graph.microsoft.com/.default"]
app = ConfidentialClientApplication(
client_id,
authority=authority,
client_credential=client_secret
)
def get_graph_token():
    """Acquire an app-only Microsoft Graph access token via client credentials.

    Returns:
        str: A bearer token for the Graph API.

    Raises:
        RuntimeError: if MSAL returns an error payload instead of a token.
            (The original indexed the result directly, which surfaced MSAL
            failures as an opaque ``KeyError: 'access_token'``.)
    """
    result = app.acquire_token_for_client(scopes=scope)
    if "access_token" not in result:
        # MSAL reports failures as a dict carrying 'error'/'error_description'.
        raise RuntimeError(
            f"Graph token acquisition failed: {result.get('error_description', result)}"
        )
    return result["access_token"]
# COMMAND ----------
# === Azure Storage ===
# Target storage account/container for the copied SharePoint files.
storage_account = "someaccount01"
container_name = "container01"
# DefaultAzureCredential tries managed identity / env vars / CLI login in order.
credential = DefaultAzureCredential()
blob_service = BlobServiceClient(
f"https://{storage_account}.blob.core.windows.net",
credential=credential
)
container_client = blob_service.get_container_client(container_name)
# COMMAND ----------
checkpoint_blob = container_client.get_blob_client("_checkpoints/sharepoint_copied.json")


def load_checkpoint():
    """Return the set of already-copied item ids; empty set when unavailable."""
    try:
        # Best-effort read: a missing blob or unparseable JSON means "start fresh".
        return set(json.loads(checkpoint_blob.download_blob().readall()))
    except Exception:
        return set()


def save_checkpoint(copied_ids):
    """Overwrite the checkpoint blob with the given ids as a JSON array."""
    checkpoint_blob.upload_blob(json.dumps(list(copied_ids)), overwrite=True)
# COMMAND ----------
import requests
from msal import ConfidentialClientApplication
# SharePoint host and server-relative site path taken from the site URL.
hostname = "uhgazure.sharepoint.com"
site_path = "/sites/site01/EI"  # server-relative path (no trailing path/to/lists)
# App-only MSAL token for Microsoft Graph.
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)
token = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
access_token = token.get("access_token")
headers = {"Authorization": f"Bearer {access_token}"}
# Step 1: resolve the site path to its Graph site id.
site_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
site_resp = requests.get(site_url, headers=headers)
site_resp.raise_for_status()
site_id = site_resp.json()["id"]
print("Site id:", site_id)
# Step 2: enumerate the site's drives (document libraries) to locate a drive id.
drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
drives_resp = requests.get(drives_url, headers=headers)
drives_resp.raise_for_status()
drives = drives_resp.json().get("value", [])
for drive in drives:
    print("Drive name:", drive["name"], "Drive id:", drive["id"])
# If you know the library name, pick it:
target_library = "Documents"  # or the library name you expect
drive_id = next((drive["id"] for drive in drives if drive["name"] == target_library), None)
print("Selected drive id:", drive_id)
# COMMAND ----------
drive_id = "pick-a-value-from-above-output"


def list_all_items():
    """Return every child item of the drive root, following Graph pagination."""
    auth_header = {"Authorization": f"Bearer {get_graph_token()}"}
    collected = []
    next_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
    # Graph pages results; @odata.nextLink is absent on the final page.
    while next_url:
        response = requests.get(next_url, headers=auth_header)
        response.raise_for_status()
        payload = response.json()
        collected.extend(payload.get("value", []))
        next_url = payload.get("@odata.nextLink")
    return collected
# COMMAND ----------
def download_file(item_id):
    """Download a driveItem's binary content from Microsoft Graph as bytes."""
    auth_header = {"Authorization": f"Bearer {get_graph_token()}"}
    content_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
    response = requests.get(content_url, headers=auth_header)
    response.raise_for_status()
    return response.content
# COMMAND ----------
# Bare lookup: fails fast with KeyError if the secret is missing (notebook sanity check).
os.environ["STORAGE_CONNECTION_STRING"]
# COMMAND ----------
connection_string = os.environ["STORAGE_CONNECTION_STRING"]
# COMMAND ----------
import os
import mimetypes
from io import BytesIO
from typing import Optional, Dict
import requests
from azure.core.exceptions import ResourceExistsError, ServiceRequestError, ClientAuthenticationError, HttpResponseError
from azure.storage.blob import (
BlobServiceClient,
BlobClient,
ContentSettings
)
# ----------------------------
# Azure Blob upload helpers
# ----------------------------
def _get_blob_service_client(
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None
) -> BlobServiceClient:
    """Build a BlobServiceClient from one auth option.

    Accepts either a full connection string, or an account URL paired with a
    SAS token (e.g. https://<acct>.blob.core.windows.net/?<sas>).

    Raises:
        ValueError: if neither auth option is fully supplied.
    """
    if connection_string:
        return BlobServiceClient.from_connection_string(connection_string)
    if account_url and sas_token:
        # Normalize the SAS token so it always carries the leading '?'.
        normalized_sas = sas_token if sas_token.startswith('?') else f'?{sas_token}'
        return BlobServiceClient(account_url=account_url, credential=normalized_sas)
    raise ValueError("Provide either connection_string OR (account_url AND sas_token).")
def upload_bytes_to_blob(
    data: bytes,
    *,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Upload a bytes payload to Azure Blob Storage and return the blob URL.

    Args:
        data: File content.
        container_name: Target container name.
        blob_name: Target blob name (e.g. 'reports/file.pdf').
        connection_string/account_url/sas_token: Auth options.
        content_type: MIME type; guessed from blob_name when None.
        overwrite: Replace the blob if it already exists.
        metadata: Optional metadata key/value pairs.

    Returns:
        The URL of the uploaded blob.
    """
    # Fall back to a guessed MIME type, then a generic binary type.
    if content_type is None:
        guessed, _ = mimetypes.guess_type(blob_name)
        content_type = guessed or "application/octet-stream"
    service = _get_blob_service_client(
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token
    )
    container_client = service.get_container_client(container_name)
    # Create the container if needed; tolerate it already existing (idempotent).
    try:
        container_client.create_container()
    except ResourceExistsError:
        pass
    blob_client: BlobClient = container_client.get_blob_client(blob_name)
    try:
        # Stream wrapper keeps the upload path uniform for large payloads.
        blob_client.upload_blob(
            BytesIO(data),
            overwrite=overwrite,
            metadata=metadata,
            content_settings=ContentSettings(content_type=content_type)
        )
    except ResourceExistsError:
        # Only surfaces when overwrite=False and the blob already exists.
        if not overwrite:
            raise
    except ClientAuthenticationError as e:
        raise RuntimeError(f"Authentication failed when uploading blob: {e}") from e
    except (ServiceRequestError, HttpResponseError) as e:
        raise RuntimeError(f"Blob upload failed: {e}") from e
    # Works for both connection-string and SAS clients.
    return blob_client.url
# ----------------------------
# Orchestrator
# ----------------------------
def download_and_upload_to_blob(
    *,
    item_id: str,
    container_name: str,
    blob_name: str,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    sas_token: Optional[str] = None,
    content_type: Optional[str] = None,
    overwrite: bool = False,
    metadata: Optional[Dict[str, str]] = None
) -> str:
    """Copy one Microsoft Graph driveItem into Azure Blob Storage.

    Downloads the item identified by *item_id* via Graph, uploads the bytes
    with the given auth/upload options, and returns the resulting blob URL.
    """
    content = download_file(item_id)
    return upload_bytes_to_blob(
        content,
        container_name=container_name,
        blob_name=blob_name,
        connection_string=connection_string,
        account_url=account_url,
        sas_token=sas_token,
        content_type=content_type,
        overwrite=overwrite,
        metadata=metadata,
    )
# COMMAND ----------
# Enumerate every item in the selected drive's root folder.
items = list_all_items()
# COMMAND ----------
print(len(items))
# COMMAND ----------
print(items[0:3])
# COMMAND ----------
import json
# Pretty-print one item to inspect the Graph driveItem schema.
print(json.dumps(items[0], indent=4))
# COMMAND ----------
from urllib.parse import urlparse, parse_qs, unquote
def after_ei_from_xml_location(url: str, *, decode: bool = True) -> str:
"""
Extracts the substring after '/EI/' from the XmlLocation query parameter.
Args:
url: The full URL containing the XmlLocation query parameter.
decode: If True, URL-decodes the result (default True).
Returns:
The substring after '/EI/' from XmlLocation, or an empty string if not found.
"""
parsed = urlparse(url)
qs = parse_qs(parsed.query)
xml_loc_values = qs.get("XmlLocation")
# print(f"xml_loc_values={xml_loc_values}")
if not xml_loc_values:
return "" # XmlLocation not present
# Take the first XmlLocation value
xml_loc = xml_loc_values[0]
if decode:
xml_loc = unquote(xml_loc)
# print(f"xml_loc={xml_loc}")
marker = "/EI/"
if marker not in xml_loc:
return "" # No /EI/ in the XmlLocation value
return xml_loc.split(marker, 1)[1]
# COMMAND ----------
# Smoke test: copy the first item before looping over everything.
download_and_upload_to_blob(item_id=items[0]["id"], container_name="iris", blob_name="domestic/EI/" + after_ei_from_xml_location(url=items[0]['webUrl']), connection_string=connection_string)
# COMMAND ----------
# Quick size statistics to gauge the transfer volume.
max_size = max(entry["size"] for entry in items)
sum_size = sum(entry["size"] for entry in items)
len_items = len(items)
print(f"max_size={max_size}, sum_size={sum_size}, len_items={len_items}")
# COMMAND ----------
# Copy every item, logging failures without aborting the loop.
# BUG FIX: the original used double quotes inside double-quoted f-strings
# (f"{item["webUrl"]}"), which is a SyntaxError on Python < 3.12.
for item in items:
    try:
        download_and_upload_to_blob(
            item_id=item["id"],
            container_name="iris",
            blob_name="domestic/EI/" + after_ei_from_xml_location(url=item["webUrl"]),
            connection_string=connection_string,
        )
        print(f"{item['webUrl']}")
    except Exception as e:
        print(f"Error: {e}, item_id={item['id']}, item_url={item['webUrl']}")
No comments:
Post a Comment