References - Python 3 TAXII v1.1 Client Example
Overview
This page provides a Python 3 example of connecting to the ThreatConnect TAXII Server as a TAXII client to pull Indicators. This is primarily useful when creating an Integrations - External TAXII v1.1 Download (Poll) integration.
This article applies to using the built-in ThreatConnect TAXII Server for TAXII v1.1. This article does not apply to the ThreatConnect TAXII Server Service for TAXII v2.1.
Example
In order to use this script, you must define the following environment variables:
TAXII_USER - Your TAXII username.
TAXII_PASS - Your TAXII password.
This example will hit the ThreatConnect Sandbox. If you wish to use this against another instance, modify this script appropriately.
This script will pull the last 24 hours' worth of supported Indicators from the first collection returned by the Collection Management call.
"""TAXII Client Example"""
import os
from urllib.parse import urlparse
import datetime
import io
import sys
import libtaxii
from stix.core import STIXPackage
def perform_discovery(
    taxii_client,
    discovery_host='sandbox.threatconnect.com',
    discovery_path='/api/taxii/discovery',
):
    """Perform discovery against the TAXII endpoint.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        discovery_host (str): Host name of the TAXII server to query. Defaults to the
            ThreatConnect Sandbox.
        discovery_path (str): Path of the Discovery service on that host.

    Returns:
        (cmurl, pollurl) - A tuple containing the Collection Management and Poll URLs.
        Either element is None when the server did not advertise that service.
    """
    # Build and execute the Discovery request so that we can find all our service URLs
    discovery_request = libtaxii.messages_11.DiscoveryRequest(
        libtaxii.messages_11.generate_message_id()
    )
    discovery_http_request = taxii_client.call_taxii_service2(
        discovery_host,
        discovery_path,
        libtaxii.VID_TAXII_XML_11,
        discovery_request.to_xml(),
    )
    discovery_http_response = libtaxii.get_message_from_http_response(
        discovery_http_request, discovery_request.message_id
    )

    # Assign our service URLs based on the services we receive back (and print).
    # If a service type is advertised more than once, the last one wins.
    cmurl = None
    pollurl = None
    for service in discovery_http_response.to_dict().get('service_instances', []):
        service_type = service.get('service_type')
        service_address = service.get('service_address')
        print(f"Service: {service_type}, Endpoint: {service_address}")
        if service_type == "COLLECTION_MANAGEMENT":
            cmurl = service_address
        elif service_type == "POLL":
            pollurl = service_address
    return (cmurl, pollurl)
def get_target_collection(taxii_client, cmurl):
    """Ask the Collection Management service for its collections and pick the first.

    Every collection name returned by the server is printed; the first usable
    name is the one selected for polling.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        cmurl (str): The full URL (as returned by a Discovery Request) for Collection Management.

    Returns:
        (str): The name of the first Collection returned in the Collection request.
        A falsy value is returned (after an error is printed to stderr) when the
        server returned no usable collection name.
    """
    # Split the service URL once; the client wants host and path separately
    endpoint = urlparse(cmurl)

    # Build and send the Collection Information request
    request = libtaxii.messages_11.CollectionInformationRequest(
        libtaxii.messages_11.generate_message_id()
    )
    raw_response = taxii_client.call_taxii_service2(
        endpoint.netloc,
        endpoint.path,
        libtaxii.VID_TAXII_XML_11,
        request.to_xml(),
    )
    response = libtaxii.get_message_from_http_response(raw_response, request.message_id)

    # List every collection, remembering the first one that has a name
    chosen = None
    for info in response.to_dict().get('collection_informations', []):
        name = info.get('collection_name')
        print(f"Collection: {name}")
        if chosen:
            continue
        chosen = name

    if chosen:
        return chosen

    print("ERROR: No collections were returned by the remote server.", file=sys.stderr)
    return chosen
def get_content_blocks(taxii_client, pollurl, target_collection):
    """Get all of the content blocks from our server for the given target_collection.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        pollurl (str): The URL for polling.
        target_collection (str): The name of the Collection that should be polled.

    Returns:
        (list): A list of the raw content blocks to be parsed, or None when the
        response does not carry a "content_blocks" entry.
    """
    # Build and execute the Poll request so that we can get all of the indicators
    # from our selected collection

    # We must supply a beginning and end date. For ThreatConnect, the default time
    # delta between these two is 24 hours.
    # datetime.utcnow() is deprecated; ask for an aware UTC timestamp directly.
    end_date = datetime.datetime.now(datetime.timezone.utc)
    # Subtracting a timedelta keeps the tzinfo, so begin_date is UTC-aware as well.
    begin_date = end_date - datetime.timedelta(days=1)

    poll_request = libtaxii.messages_11.PollRequest(
        libtaxii.messages_11.generate_message_id(),
        collection_name=target_collection,
        poll_parameters=libtaxii.messages_11.PollParameters(),
        exclusive_begin_timestamp_label=begin_date,
        inclusive_end_timestamp_label=end_date,
    )

    # The client takes host and path separately, so split the service URL once
    poll_endpoint = urlparse(pollurl)
    poll_http_request = taxii_client.call_taxii_service2(
        poll_endpoint.netloc,
        poll_endpoint.path,
        libtaxii.VID_TAXII_XML_11,
        poll_request.to_xml(),
    )
    poll_http_response = libtaxii.get_message_from_http_response(
        poll_http_request, poll_request.message_id
    )
    return poll_http_response.to_dict().get("content_blocks", None)
def main():
    """Main method to perform all of the TAXII calls in this example.

    Reads credentials from the TAXII_USER and TAXII_PASS environment variables,
    runs discovery, selects the first collection, polls it, and prints the
    description of every Indicator found in the returned STIX packages.

    Returns:
        (int): 0 on success, 1 when credentials are missing or any step of the
        TAXII exchange yields nothing to work with.
    """
    # Check our environment variables
    taxii_user = os.environ.get("TAXII_USER")
    taxii_pass = os.environ.get("TAXII_PASS")
    if not taxii_user or not taxii_pass:
        print(
            "ERROR: You must define the TAXII_USER and "
            "TAXII_PASS environment variables for your TAXII credentials.",
            file=sys.stderr,
        )
        return 1

    # Build a TAXII client
    taxii_client = libtaxii.clients.HttpClient(
        use_https=True,
        auth_type=libtaxii.clients.HttpClient.AUTH_BASIC,
        auth_credentials={'username': taxii_user, 'password': taxii_pass},
    )

    # Call discovery
    (cmurl, pollurl) = perform_discovery(taxii_client)

    # Did we actually get a Collections URL?
    if not cmurl:
        print(
            "ERROR: No collection management service was found on the remote server.",
            file=sys.stderr,
        )
        return 1

    # Did we actually get a Poll URL?
    if not pollurl:
        print("ERROR: No poll service was found on the remote server.", file=sys.stderr)
        return 1

    # Find a target collection
    target_collection = get_target_collection(taxii_client, cmurl)
    if not target_collection:
        # get_target_collection already reported the problem on stderr; polling
        # without a collection name would be meaningless, so stop here.
        return 1

    # Get our content blocks
    content_blocks = get_content_blocks(taxii_client, pollurl, target_collection)

    # Did we get any content blocks?
    if not content_blocks:
        print("ERROR: No content was returned by the remote server.", file=sys.stderr)
        return 1

    # Parse every non-empty content block into a STIX package
    stix_packages = []
    for content_block in content_blocks:
        content = content_block.get("content", None)
        if not content:
            continue
        # io.BytesIO only accepts bytes-like input; encode defensively in case the
        # content arrives as text (assumes UTF-8 - TODO confirm against server output).
        if isinstance(content, str):
            content = content.encode("utf-8")
        stix_packages.append(STIXPackage.from_xml(xml_file=io.BytesIO(content)))

    # Print the indicators
    for stix_package in stix_packages:
        for indicator in stix_package.indicators:
            print(f"Indicator: {indicator.description}")
    return 0
# Script entry point: run the example and use main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())