Saturday, May 24, 2025

agentic retrieval

 In continuation of previous post, you can either upgrade your LLM from text-to-text, multimodal or reasoning models or you can employ specific agents in an agentic retrieval to get better analysis for your drone imagesThe agentic retrieval is easier to diversify and scale but the model is harder to upskill. The flip side is that the more agents use LLMs, the number and cost for tokens used increases proportionally while the sample data can be enhanced to help fine-tune or improve chain of thought for the reasoning model with zero cost or change to the infrastructure. The following section walks us through a sample of using an agent retrieval. 

#! /usr/bin/python 

""" 
requirements.txt: 
azure-identity 
openai 
aiohttp 
ipykernel 
dotenv 
requests 
azure-search-documents==11.6.0b12 
""" 
from azure.ai.agents.models import FunctionTool, ToolSet, ListSortOrder 
 
from azure.search.documents.agent import KnowledgeAgentRetrievalClient 
from azure.search.documents.agent.models import KnowledgeAgentRetrievalRequest, KnowledgeAgentMessage, KnowledgeAgentMessageTextContent, KnowledgeAgentIndexParams 
 
agent_client = KnowledgeAgentRetrievalClient(endpoint=endpoint, agent_name=agent_name, credential=credential) 
 
thread = project_client.agents.threads.create() 
retrieval_results = {} 
 
def agentic_retrieval() -> str: 
    """ 
        Searches the drone images and other curated metadata and facts. 
        The returned string is in a JSON format that contains the reference id. 
        Using the same format as in agent's response 
        References are cited by zero-based id number 
    """ 
    # Take the last 5 messages in the conversation 
    messages = project_client.agents.messages.list(thread.id, limit=5, order=ListSortOrder.DESCENDING) 
    # Reverse the order so the most recent message is last 
    messages = list(messages) 
    messages.reverse() 
    retrieval_result = agent_client.retrieve( 
        retrieval_request=KnowledgeAgentRetrievalRequest( 
            messages=[KnowledgeAgentMessage(role=msg["role"], content=[KnowledgeAgentMessageTextContent(text=msg.content[0].text)]) for msg in messages if msg["role"] != "system"], 
            target_index_params=[KnowledgeAgentIndexParams(index_name=index_name, reranker_threshold=2.5)] 
        ) 
    ) 
 
    # Associate the retrieval results with the last message in the conversation 
    last_message = messages[-1] 
    retrieval_results[last_message.id] = retrieval_result 
 
    # Return the grounding response to the agent 
    return retrieval_result.response[0].content[0].text 
 
# https://learn.microsoft.com/en-us/azure/ai-services/agents/how-to/tools/function-calling 
functions = FunctionTool({ agentic_retrieval }) 
toolset = ToolSet() 
toolset.add(functions) 
project_client.agents.enable_auto_function_calls(toolset) 

 

# start a chat 

from azure.ai.agents.models import AgentsNamedToolChoice, AgentsNamedToolChoiceType, FunctionName 
 
message = project_client.agents.messages.create( 
    thread_id=thread.id, 
    role="user", 
    content=""" 
        Which landmarks are responsible for intersections that are more prone to vehicle and pedestrian traffic conflicts or activities? Which hours of the day are quiet for pedestrian traffic at those intersections? 
    """ 
) 
 
run = project_client.agents.runs.create_and_process( 
    thread_id=thread.id, 
    agent_id=agent.id, 
    tool_choice=AgentsNamedToolChoice(type=AgentsNamedToolChoiceType.FUNCTION, function=FunctionName(name="agentic_retrieval")), 
    toolset=toolset) 
if run.status == "failed": 
    raise RuntimeError(f"Run failed: {run.last_error}") 
output = project_client.agents.messages.get_last_message_text_by_role(thread_id=thread.id, role="assistant").text.value 
 
print("Agent response:", output.replace(".", "\n")) 

 

Friday, May 23, 2025

 Building a reasoning model from an LLM can be a great return-on-investment and involves just reinforcement learning. The fun part is that it can work for both text-to-text LLMs such as GPT3.5/4 and Multi-modal LLMs such as GPT4o. Reasoning models have a dedicated chain-of-thought phase to decompose and logically work through problems before responding. Examples of reasoning models are o4, and Phi but let us review how to do that with drone imagery models for drone sensing applications where an agent can send a drone image to a fine-tuned model to get observations and then send the observations to a reasoning model to get a better analysis. In this example of building a reasoning model, we use Group Relative Policy Optimization aka GRPO technique and use Azure AI Machine Learning platform because it helps with debugging, logging, profiling and metrics. GRPO is a reinforcement learning technique that compares multiple answers within a group, rewards the best-performing outputs, penalizes poor ones and applies careful updates to avoid sudden changes.

#! /usr/bin/python

# requisites:

#! pip install azure-core azure-ai-ml rich huggingface_hub

#! curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash

#! export HF_TOKEN="hf_xxxxxxxxx"

from aml_setup import setup

ml_clent, drone_mcqa_data, model, compute, environment = setup()

"""

# Sample multiple choice

Which direction has more population density?

A. North

B. East

C. West

D. South

The Ideal reasoning model response:

<think>

Start by rotating the current image to align with the true North relative to the drone camera. Find people in any of the four quadrants. Select the direction from the quadrant that improves the probability the most. If there are no winners among the quadrants, select the direction based on the direction of travel.

</think>

The presence of habitats such as building or outgoing traffic indicates the direction where population density is most.

Final Answer: C.

"""

trainer = GRPOTrainer(

     model=current_policy,

     reward_funcs=reward_function,

     train_dataset=dataset[script_args.dataset_train_split],

     args=training_args,

     peft_config=get_peft_config(model_args),

     processing_class=tokenizer,

     eval_dataset=(

       dataset[script_args.dataset_test_split]

       if training_args.eval_strategy != "no"

       else None

     ),

     callbacks=[save_mlflow_callback],

)

trainer.train()

def format_reward(completions, **kwargs):

    """

    This function determines whether the predicted answer is in the correct format.

    It checks if the reasoning process is enclosed within <think> and </think> tags,

    while the final answer is enclosed within <answer> and </answer> tags.

    Args:

        completions (list): List of model predictions.

    Returns:

        list: List of rewards (1.0 for correct format, 0.0 for incorrect format)

    """

    pattern = r"^<think>\n.*?\n</think>\n<answer>\n.*?\n</answer>$"

    completion_contents = [completion[0]["content"] for completion in completions]

    matches = [

       re.match(pattern, content, re.DOTALL | re.MULTILINE)

       for content in completion_contents

    ]

    return [1.0 if match else 0.0 for match in matches]

"""

# The following are configurations to

# run vllm to generate samples

# there are two ways to do this: server and collocate

# collocate mode runs sampler and trainer on same GPU.

use_vllm: True

vllm_mode: "collocate"

vllm_gpu_memory_utilization: 0.25

vllm_tensor_parallel_size: 4

reward_funcs:

- accuracy

- format

# reward = 0.8*accuracy + 0.2*format

"""

from azure.ai.ml import command, Input, Output

from azure.ai.ml.entities import (

    ManagedOnlineEndpoint,

    ManagedOnlineDeployment,

    Model

)

from azure.ai.ml.constants import AssetTypes

# the following is a command job that takes grpo config, deepspeed config, dataset and the mdoel parameters and kicks off a distributed job.

command_str = """python train.py \

    --config grpo_trainer_config.yaml \

    --model_name_or_path ${{inputs.model_dir}} \

    --dataset_name ${{inputs.dataset}} \

    --output_dir ${{oututs.checkpoint_folder}} \

    --final_model_save_path ${{outputs.mlflow_model_folder}} \

    --deepspeeed deepspeed_stage3_zero_config.json \

    --mlflow_task_type "chat-completion" """

command_str += f'--base_model_name "model.name"'

job_input = {

     "model_dir": Input(

          path=model.path,

          type=AssetTypes.CUSTOM_MODEL,

     ),

     "mlflow_model_folder": Output(

          type=AssetTypes.CUSTOM_MODEL,

          mode="rw_mount",

     ),

     "checkpoint_folder": Output(

          type=AssetTypes.URI_FOLDER,

          mode="rw_mount",

     )

} # notice checkpoints are saved in a separate folder.

job = command(

    code="./src"

    inputs=job_input,

    command=command_str,

    environment=environment,

    compute=compute.name,

    instance_count=2,

    outputs=job_output,

    distribution={

       "type": "PyTorch",

        "process_count_per_instance": 8,

    },

    experiment_name = "drone-images-reasoning-training-jobs",

    display_name = "drone-images-reasoning-train-batchsize-16,

    properties = {

        "_azureml.LogTrainingMetricsToAzMon": "true"

    },

    environment variables = {

        "KINETO_USE_DAEMON": "1",

        "ENABLE_AZUREML_TRAINING_PROFILER": "true",

        "AZUREML_PROFILER_WAIT_DURATION_SECOND": "2",

        "AZUREML_PROFILER_RUN_DURATION_MILLISECOND": "500",

        "AZUREML_COMMON_RUNTIME_USE_APPINSIGHTS_CAPABILITY": "true",

    }

}

# the following is for training

train_job = ml_client.jobs.create_or_update(job)

train_job

# the following registers the model which is required to deploy it to an endpoint.

model_output_path = f"azureml://jobs/{train_job.name}/outputs/mlflow_model_folder"

run_model = Model(

    path=model_output_path,

    name="grpo-reasoning-model",

    description=f"Model created from run {train_job.name}.",

    type=AssetTypes.MLFLOW_MODEL

)

ft_model = ml_client.models.create_or_update(run_model)

deployment = ManagedOnlineDeployment(

    name="grpo-rft-model-deployment",

    endpoint_name=online_endpoint_name,

    model=ft_model,

    instance_type="Standard_ND96amsr_A100_v4",

    instance_count=1

)

ml_client.begin_create_or_update(deployment)


Thursday, May 22, 2025

 A previous post talked about writing SQL queries to create embedding and perform vector search over the shredded description in JSON format from drone image analysis output along with associated vectors and then using the built-in operators to query the objects associated with the vectors. This article talks about creating an agent in Azure AI search with consolidated vector search from local vectors and those in the SQL database and where the agent acts as a wrapper for an LLM deployed to Azure Open AI. The LLM is used to send queries to an agentic retrieval pipeline.

from azure.search.documents.indexes import SearchIndexClient

from azure.search.documents.indexes.models import (

    KnowledgeAgent,

    KnowledgeAgentAzureOpenAIModel,

    KnowledgeAgentRequestLimits,

    KnowledgeAgentTargetIndex

)

agent=KnowledgeAgent(

    name=agent_name,

    target_indexes=[

        KnowledgeAgentTargetIndex(

            index_name=index_name, default_include_reference_source_data=True,

default_reranker_threshold=2.5

        )

    ],

    models=[

        KnowledgeAgentAzureOpenAIModel(

            azure_open_ai_parameters=AzureOpenAIVectorizerParameters(

                resource_url=azure_openai_endpoint,

                deployment_name=azure_openai_gpt_deployment,

                model_name=azure_openai_gpt_model,

            )

        )

    ],

    request_limits=KnowledgeAgentRequestLimits(

        max_output_size=agent_max_output_tokens

    )

)

index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

index_client.create_or_update_agent(agent)

And with constants such as

AZURE_OPENAI_ENDPOINT=https://<openai-resource-name>.openai.azure.com

AZURE_OPENAI_GPT_DEPLOYMENT=gpt-4o-mini

AZURE_SEARCH_ENDPOINT=https://<search-resource-name>.search.windows.net

AZURE_SEARCH_INDEX_NAME=agentic-retrieval-drone-images

And its usage as follows:

from azure.search.documents.agent import KnowledgeAgentRetrievalClient

from azure.search.documents.agent.models import KnowledgeAgentRetrievalClient, KnowledgeAgentMessage

agent_client = KnowledgeAgentRetrievalClient(

  endpoint=AZURE_SEARCH_ENDPOINT, agent_name=AZURE_SEARCH_AGENT, credential=azure_credential

 )

messages.append({

  “role”: “user”,

  “content”:

“““

How do the landmarks detailed in the object detection output compare in proximity to those found near high population density?

”””

})

retrieval_result = agent_client.knowledge_retrieval.retrieve(

   messages[KnowledgeAgentMessage(

  role=msgp[“role”],

            content=[KnowledgeAgentMessageTextContent(text=msg[“content”])])

        for msg in messages if msg[“role”] != “system”],

   Target_index_params=[KnowedgeAgentIndexParams(index_name=index_name, reranker_threshold=3, include_reference_source_data=True)],

   )

)

messages.append({

   “role”: “assistant”,

   “content”: retrieval_result.response[0].content[0].text

})