References - Python 3 TAXII v1.1 Client Example

Overview

This page provides a Python 3 example of connecting to the ThreatConnect TAXII Server as a TAXII client to pull Indicators. This is primarily useful when creating a Integrations - External TAXII v1.1 Download (Poll) integration.

This article applies to using the built-in ThreatConnect TAXII Server for TAXII v1.1. This article does not apply to the ThreatConnect TAXII Server Service for TAXII v2.1.

Example

In order to use this script, you must define the following environment variables:

  • TAXII_USER - Your TAXII username.

  • TAXII_PASS - Your TAXII password.

This example will hit the ThreatConnect Sandbox. If you wish to use this against another instance, modify this script appropriately.

This script will pull the last 24 hours worth of supported Indicators from the first collection that is returned from the Collection Management call.

"""TAXII Client Example""" import os from urllib.parse import urlparse import datetime import io import sys import libtaxii from stix.core import STIXPackage def perform_discovery(taxii_client): """Perform discovery against the TAXII endpoint. Arguments: taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance. Returns: (cmurl, pollurl) - A tuple containing the Collection Management and Poll URLs. """ # Build and execute the Discovery request so that we can find all our service URLs discovery_request = libtaxii.messages_11.DiscoveryRequest( libtaxii.messages_11.generate_message_id() ) discovery_request_xml = discovery_request.to_xml() discovery_http_request = taxii_client.call_taxii_service2( 'sandbox.threatconnect.com', '/api/taxii/discovery', libtaxii.VID_TAXII_XML_11, discovery_request_xml, ) discovery_http_response = libtaxii.get_message_from_http_response( discovery_http_request, discovery_request.message_id ) # Assign our service URLs based on the services we receive back (and print) cmurl = None pollurl = None for service in discovery_http_response.to_dict().get('service_instances', []): print(f"Service: {service.get('service_type')}, Endpoint: {service.get('service_address')}") if service.get('service_type') == "COLLECTION_MANAGEMENT": cmurl = service.get('service_address') elif service.get('service_type') == "POLL": pollurl = service.get('service_address') return (cmurl, pollurl) def get_target_collection(taxii_client, cmurl): """Find the first collection from our server. Arguments: taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance. cmurl (str): The full URL (as returned by a Discovery Request) for Collection Management. Returns: (str): The name of the first Collection returned in the Collection request. """ # Build and execute the Collections request collections_request = libtaxii.messages_11.CollectionInformationRequest( libtaxii.messages_11.generate_message_id() ) collections_request_xml = collections_request.to_xml() collections_http_request = taxii_client.call_taxii_service2( urlparse(cmurl).netloc, urlparse(cmurl).path, libtaxii.VID_TAXII_XML_11, collections_request_xml, ) collections_http_response = libtaxii.get_message_from_http_response( collections_http_request, collections_request.message_id ) # Find the first collection (and print all the collections for our edification) target_collection = None for collection in collections_http_response.to_dict().get('collection_informations', []): print(f"Collection: {collection.get('collection_name')}") if not target_collection: # We'll plan to poll the first collection returned to us target_collection = collection.get('collection_name') # Did we get a target_collection? if not target_collection: print("ERROR: No collections were returned by the remote server.", file=sys.stderr) return target_collection def get_content_blocks(taxii_client, pollurl, target_collection): """Get all of the content blocks from our server for the given target_collection. Arguments: taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance. pollurl (str): The URL for polling. target_collection (str): The name of the Collection that should be polled. Returns: (list): A list of the raw content blocks to be parsed. """ # Build and execute the Poll request so that we can get all of the indicators # from our selected collection # We must supply a beginning and end date. For ThreatConnect, the default time # delta between these two is 24 hours. end_date = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) begin_date = (end_date - datetime.timedelta(days=1)).replace(tzinfo=datetime.timezone.utc) poll_request = libtaxii.messages_11.PollRequest( libtaxii.messages_11.generate_message_id(), collection_name=target_collection, poll_parameters=libtaxii.messages_11.PollParameters(), exclusive_begin_timestamp_label=begin_date, inclusive_end_timestamp_label=end_date, ) poll_request_xml = poll_request.to_xml() poll_http_request = taxii_client.call_taxii_service2( urlparse(pollurl).netloc, urlparse(pollurl).path, libtaxii.VID_TAXII_XML_11, poll_request_xml, ) poll_http_response = libtaxii.get_message_from_http_response( poll_http_request, poll_request.message_id ) content_blocks = poll_http_response.to_dict().get("content_blocks", None) return content_blocks def main(): """Main method to perform all of the TAXII calls in this example.""" # Check our environment variables taxii_user = os.environ.get("TAXII_USER") taxii_pass = os.environ.get("TAXII_PASS") if not taxii_user or not taxii_pass: print( "ERROR: You must define the TAXII_USER and " "TAXII_PASS environment variables for your TAXII credentials.", file=sys.stderr, ) return 1 # Build a TAXII client taxii_client = libtaxii.clients.HttpClient( use_https=True, auth_type=libtaxii.clients.HttpClient.AUTH_BASIC, auth_credentials={'username': taxii_user, 'password': taxii_pass}, ) # Call discovery (cmurl, pollurl) = perform_discovery(taxii_client) # Did we actually get a Collections URL? if not cmurl: print( "ERROR: No collection management service was " "found on the remote server.", file=sys.stderr, ) return 1 # Did we actually get a Poll URL? if not pollurl: print("ERROR: No poll service was found on the remote server.", file=sys.stderr) return 1 # Find a target collection target_collection = get_target_collection(taxii_client, cmurl) # Get our content blocks content_blocks = get_content_blocks(taxii_client, pollurl, target_collection) # Did we get any content blocks? if not content_blocks: print("ERROR: No content was returned by the remote server.", file=sys.stderr) return 1 stix_packages = [] for content_block in content_blocks: if content_block.get("content", None): stix_packages.append( STIXPackage.from_xml(xml_file=io.BytesIO(content_block.get('content'))) ) # Print the indicators for stix_package in stix_packages: for indicator in stix_package.indicators: print(f"Indicator: {indicator.description}") return 0 if __name__ == "__main__": sys.exit(main())