Azure Flow Log Analysis
Azure flow logs don't have the same instance ID that AWS flow logs do. So how do you figure out which VM the logs came from?
Intro
Disclaimer: I currently work at Snowflake and use the product on a daily basis for log analysis and threat detection. At the time of this writing, that probably adds bias to my article.
At Snowflake, we're a multi-cloud environment. As part of the threat detection team, it's my job to ensure that we're monitoring each of those environments. One of my tasks was to monitor our Azure flow logs for unusual behavior. Information on Azure Network Security Group (NSG) flow logs can be found at this link. When I started exploring this data source, I was frustrated to learn that, unlike AWS VPC flow logs, there was no field tying the flow log to the instance ID, or in the case of Azure, the Virtual Machine. The recommended solution from Microsoft was to use the IP address. However, the architecture I was working in had thousands of worker nodes created and destroyed frequently, meaning that we would often see the same IP address reused by different machines within a short period of time due to NATing. If I couldn't automatically tie the IP to a VM, the logs would be useless, as our IR team needed that information in order to conduct automated response in case of a malicious event.
Plotting
Fortunately, I had SQL to help. To conduct my analysis, I used Snowflake, which is (obviously) where we store our logs; however, any SQL tool should offer the same capabilities shown below. The approach relies on a trick taught to me by one of the data scientists at Snowflake. First, in Azure, IPs aren't associated directly with VMs: they're associated with NICs, and a NIC is associated with a VM. Many organizations run tools and scripts that periodically collect information about current assets, but because Snowflake creates and destroys VMs and NICs so rapidly, that alone isn't sufficient. We also needed to look at the Azure operation logs in order to be confident we were seeing every NIC. Let's take a look at some code.
select
parse_json(properties:responseBody) as response_body,
response_body:etag::string as etag,
response_body:name::string as nic_name,
response_body:location::string as nic_location,
response_body:properties as nic_properties,
response_body:tags as tags,
response_body:type::string as nic_type,
value:properties.privateIPAddress::string as nic_ip
from
azure_operation_logs_v, lateral flatten(parse_json(properties:responseBody):properties.ipConfigurations)
where
operation_name = 'MICROSOFT.NETWORK/NETWORKINTERFACES/WRITE'
You can review the documentation on the structure here: Network Interfaces - Create Or Update.
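To make those path expressions concrete, here is a minimal sketch that parses a made-up, heavily abbreviated responseBody (the real payload has many more fields; see the documentation linked above) and walks it the same way the query does:
-- A made-up, abbreviated responseBody for a NETWORKINTERFACES/WRITE operation,
-- used only to illustrate the path expressions above.
with sample as (
  select parse_json($$
  {
    "name": "worker-nic-01",
    "etag": "abc123",
    "location": "westus2",
    "type": "Microsoft.Network/networkInterfaces",
    "properties": {
      "ipConfigurations": [
        {"properties": {"privateIPAddress": "10.0.0.4"}}
      ]
    }
  }
  $$) as response_body
)
select
  response_body:name::string as nic_name,
  response_body:location::string as nic_location,
  value:properties.privateIPAddress::string as nic_ip
from sample, lateral flatten(response_body:properties.ipConfigurations);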
This gets us information on NICs from our operation logs. We also have scripts running that collect information on existing NICs, which is useful for longer-running VMs.
with nic_collect_logs as (
select
id::string as nic_id,
etag::string as etag,
nic_name::string as nic_name,
nic_location::string as nic_location,
nic_properties,
tags,
nic_type::string as nic_type,
value:properties.privateIPAddress::string as nic_ip,
'collect_script' as source,
min(recorded_at) as earliest,
max(recorded_at) as latest
from
AZURE_COLLECT_NETWORK_INTERFACES_V, lateral flatten(properties:ipConfigurations)
group by
1,2,3,4,5,6,7,8,9
)
select * from nic_collect_logs;
This is slightly different. We wanted to remove duplicate NICs by using group by and also note the first time we saw the NIC and the most recent time we’ve seen the NIC. Let’s apply the same to the NICs collected by the operation logs.
with nic_create_op_logs_raw as (
select
parse_json(properties:responseBody) as response_body,
response_body:etag::string as etag,
response_body:name::string as nic_name,
response_body:location::string as nic_location,
response_body:properties as nic_properties,
response_body:tags as tags,
response_body:type::string as nic_type,
value:properties.privateIPAddress::string as nic_ip,
value:properties.entity::string as nic_id,
'operation_logs' as source,
event_time
from
azure_operation_logs_v, lateral flatten(parse_json(properties:responseBody):properties.ipConfigurations)
where
operation_name = 'MICROSOFT.NETWORK/NETWORKINTERFACES/WRITE'
),
nic_create_op_logs as (
select
nic_id,
etag,
nic_name,
nic_location,
nic_properties,
tags,
nic_type,
nic_ip,
source,
min(event_time) as earliest,
max(event_time) as latest
from nic_create_op_logs_raw
group by
1,2,3,4,5,6,7,8,9
)
select * from nic_create_op_logs;
So now we have NICs identified from the operation logs and NICs identified from our collection scripts. Let's combine them using union. Since the same NIC can show up in both sources, we'll also compute true_earliest, the earliest time we've seen a given NIC and IP pair across both.
with all_nics as (
select *,
min(earliest) over (partition by nic_id, nic_ip) as true_earliest
from (
select * from nic_create_op_logs
UNION
select * from nic_collect_logs
)
)
select * from all_nics;
Now, the issue comes up where we want to know the window of time that a NIC had a particular IP address. We can do this using the lead window function to get the start and end of the window for a NIC's association with an IP address. In this case, we use true_earliest (computed above) as the window start. To find window_end, we'll use lead.
select *,
true_earliest as window_start,
lead(true_earliest) over (
partition by nic_ip
order by true_earliest asc
) as window_end
from all_nics
;
The Code
Putting all the code together, we have:
with nic_collect_logs as (
select
id::string as nic_id,
etag::string as etag,
nic_name::string as nic_name,
nic_location::string as nic_location,
nic_properties,
tags,
nic_type::string as nic_type,
value:properties.privateIPAddress::string as nic_ip,
'collect_script' as source,
min(recorded_at) as earliest,
max(recorded_at) as latest
from
AZURE_COLLECT_NETWORK_INTERFACES_V, lateral flatten(properties:ipConfigurations)
group by
1,2,3,4,5,6,7,8,9
),
nic_create_op_logs_raw as (
select
parse_json(properties:responseBody) as response_body,
response_body:etag::string as etag,
response_body:name::string as nic_name,
response_body:location::string as nic_location,
response_body:properties as nic_properties,
response_body:tags as tags,
response_body:type::string as nic_type,
value:properties.privateIPAddress::string as nic_ip,
value:properties.entity::string as nic_id,
'operation_logs' as source,
event_time
from
azure_operation_logs_v, lateral flatten(parse_json(properties:responseBody):properties.ipConfigurations)
where
operation_name = 'MICROSOFT.NETWORK/NETWORKINTERFACES/WRITE'
),
nic_create_op_logs as (
select
nic_id,
etag,
nic_name,
nic_location,
nic_properties,
tags,
nic_type,
nic_ip,
source,
min(event_time) as earliest,
max(event_time) as latest
from nic_create_op_logs_raw
group by
1,2,3,4,5,6,7,8,9
),
all_nics as (
select *,
min(earliest) over (partition by nic_id, nic_ip) as true_earliest
from (
select * from nic_create_op_logs
UNION
select * from nic_collect_logs
)
)
select *,
true_earliest as window_start,
lead(true_earliest) over (
partition by nic_ip
order by true_earliest asc
) as window_end
from all_nics;
And now we have NICs and the IPs associated with them and the window of time when that IP was associated with that NIC.
Our next step is to join this on VM data. That’s fairly easy.
For our Azure VMs, we did a similar exercise of joining data from the collection scripts and operation logs. We'll call this view AZURE_VMS_V. We'll call our Azure NICs view AZURE_NICS_V.
select * from AZURE_NICS_V a
join AZURE_VMS_V b on lower(a.id) = lower(b.properties['networkProfile']['networkInterfaces'][0]['id'])
A few notes - I used lower since there were some instances where the ID had varying cases and other instances where it didn't. I'm attributing this to inconsistencies between the APIs behind the collection scripts vs. the operational logs. You may also note that I used [0] instead of lateral flatten for accessing the network interfaces of the VMs. In our environment, we do not have VMs with multiple NICs. If you do, you can use lateral flatten again.
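If your VMs do have multiple NICs, a minimal sketch of that variant (assuming the same AZURE_NICS_V and AZURE_VMS_V views as above) could look like this:
-- Sketch for multi-NIC VMs: flatten the VM's networkInterfaces array instead of
-- indexing [0], so every NIC attached to the VM gets its own row to join against.
select a.*, b.*
from AZURE_VMS_V b,
  lateral flatten(b.properties['networkProfile']['networkInterfaces']) vm_nics,
  AZURE_NICS_V a
where lower(a.id) = lower(vm_nics.value['id']::string)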
The final step is to join our flow logs on our NICs. In this way we'll have joined Flow Log <> NIC <> VM. Fortunately, most of the hard work is done.
select * from AZURE_FLOW_LOGS_V
left outer join AZURE_NICS_V on src_addr = nic_ip and event_time >= window_start and (event_time <= window_end or window_end is NULL)
left outer join AZURE_VMS_V on lower(AZURE_NICS_V.id) = lower(AZURE_VMS_V.properties['networkProfile']['networkInterfaces'][0]['id])
For joining the flow logs on the NIC, we want to ensure that the Flow Log event time occurred after the window start for the NIC and before the end time of the NIC. If the NIC still exists, the window_end would be NULL.
If you seek higher fidelity, you can also look at including tenant ID and subscription ID into your joins.
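For example, if your flow log and NIC views both carry subscription and tenant identifiers (the column names below are hypothetical and will depend on how you ingest the logs), the join could be tightened to something like:
-- Hypothetical sketch: require the subscription and tenant to match in addition to
-- the IP address and the time window. Column names are illustrative.
select *
from AZURE_FLOW_LOGS_V f
left outer join AZURE_NICS_V n
  on f.src_addr = n.nic_ip
  and f.subscription_id = n.subscription_id
  and f.tenant_id = n.tenant_id
  and f.event_time >= n.window_start
  and (f.event_time <= n.window_end or n.window_end is NULL)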
The obvious question is “how accurate is this?”
Let's check. I'll call our above logic AZURE_FLOW_LOGS_JOIN_VMS_V. We'll look for null VM IDs (where we couldn't join on a VM) where the flow was recorded in the past hour. (flow_recorded_at is a column we add during ingestion that records the time the flow was ingested into Snowflake.)
select count(*) from AZURE_FLOW_LOGS_JOIN_VMS_V
where vm_id is NULL
and flow_recorded_at >= dateadd(hour, -1, current_timestamp())
A similar query for non-null VM IDs showed me that 99.75% of flow logs could be matched to their VM.
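If you prefer to get the rate from a single query rather than two counts, a quick sketch against the same view:
-- Percentage of the past hour's flow logs that could be matched to a VM.
select
  count_if(vm_id is not NULL) / nullif(count(*), 0) * 100 as pct_matched_to_vm
from AZURE_FLOW_LOGS_JOIN_VMS_V
where flow_recorded_at >= dateadd(hour, -1, current_timestamp());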
Conclusion
This was a lot of work for something that AWS offers natively. I hope that as Azure matures its product, it will offer similar levels of visibility.
You may be wondering why I used IPs instead of MAC addresses. When inspecting our logs, a vast number of NICs had IPs but no MAC addresses, and I do not have a clear answer from Microsoft as to why that is at the time of this publication.
I hope this work helps those working in security and Azure clouds. If you have other ideas or ways to improve the success rate, please let me know!