Thursday, June 8, 2023

 

Sample Spark code for a Databricks notebook:

Read a binary file (a pickled Keras tokenizer) from ADLS:

%pip install Keras-Preprocessing

import pickle

# abfss path to the pickled tokenizer in ADLS Gen2
file_location = "abfss://container@storageaccount.dfs.core.windows.net/path/to/file.bin"

df = spark.read.format("binaryFile").option("pathGlobFilter", "*.bin").load(file_location)

# binaryFile gives one row per file; pull the first file's raw bytes to the driver
raw_bytes = df.rdd.map(lambda row: bytes(row.content)).first()

print(type(raw_bytes))

# deserialize the pickled Keras tokenizer
tokenizer = pickle.loads(raw_bytes)

print(repr(tokenizer))

print(type(tokenizer))

print(tokenizer.word_index)

'''

<class 'bytes'>

<keras_preprocessing.text.Tokenizer object at 0x7f9d34bbb340>

<class 'keras_preprocessing.text.Tokenizer'>

{'key': value,

'''
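
Once unpickled, the object works like any in-memory Keras tokenizer. A minimal sketch of using it, assuming it was fitted on a corpus before being pickled (the sample sentence is made up):

# Sketch: encode new text with the restored tokenizer (assumes it was fitted before pickling)
sequences = tokenizer.texts_to_sequences(["some sample text to encode"])
print(sequences)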

Write CSV:

mount_root = "/mnt/ContainerName/DirectoryName"

df.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save("dbfs:" + mount_root + "/")
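
Note that coalesce(1) still writes a Spark output directory containing a single part-*.csv file, not a file named after the directory. A quick sketch for locating that part file with dbutils.fs:

# Sketch: find the single part file that coalesce(1) produced in the output directory
output_files = dbutils.fs.ls("dbfs:" + mount_root + "/")
part_file = [f.path for f in output_files if f.name.startswith("part-")][0]
print(part_file)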

 

Sample Spark code using a SAS URL for a large CSV in external storage:

import requests

 

CHUNK_SIZE=4096

filename = "filename1.csv"

# stream the download in 4 KB chunks so the whole file is never held in memory
with requests.get("<sas-url>", stream=True) as resp:

  if resp.ok:

    with open("/dbfs/" + filename, "wb") as f:

      for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):

        f.write(chunk)

display(spark.read.csv("dbfs:/" + filename, header=True, inferSchema=True))       
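
For a really large CSV, inferSchema=True costs an extra pass over the data. A minimal sketch of supplying an explicit schema instead; the column names and types here are made up for illustration:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Sketch: an explicit schema avoids the extra full scan that inferSchema=True triggers
# (the column names and types below are illustrative, not taken from the real file)
schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
])
display(spark.read.csv("dbfs:/" + filename, header=True, schema=schema))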

 

 

 

For extra-large files, download to DBFS and work with Python utilities:

import requests

import os

CHUNK_SIZE=4096

filename = "filename2"

if not os.path.isfile("/dbfs/" + filename):

  print("downloading file...")

  with requests.get("<sas-url>", stream=True) as resp:

    if resp.ok:

      with open("/dbfs/" + filename, "wb") as f:

        for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):

          f.write(chunk)

print("file found...")

file found…
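
"Work with Python utilities" here just means treating /dbfs/<filename> as an ordinary local path on the driver. A minimal sketch that streams through the downloaded file without loading it all into memory; the SHA-256 checksum is only an illustration:

import hashlib

# Sketch: process the downloaded file in chunks with plain Python (hashing is just an example)
sha = hashlib.sha256()
with open("/dbfs/" + filename, "rb") as f:
  for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
    sha.update(chunk)
print(sha.hexdigest())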

 
