Overview
This page provides a Python 3 example of connecting to the ThreatConnect TAXII Server as a TAXII client to pull Indicators. This is primarily useful when creating a Integrations - External TAXII Download (Poll) integration.
Example
"""TAXII Client Example""" import os from urllib.parse import urlparse import datetime import io import sys import libtaxii from stix.core import STIXPackage def perform_discovery(taxii_client): """Perform discovery against the TAXII endpoint.""" # Build and execute the Discovery request so that we can find all our service URLs discovery_request = libtaxii.messages_11.DiscoveryRequest( libtaxii.messages_11.generate_message_id() ) discovery_request_xml = discovery_request.to_xml() discovery_http_request = taxii_client.call_taxii_service2( 'sandbox.threatconnect.com', '/api/taxii/discovery', libtaxii.VID_TAXII_XML_11, discovery_request_xml, ) discovery_http_response = libtaxii.get_message_from_http_response( discovery_http_request, discovery_request.message_id ) # Assign our service URLs based on the services we receive back (and print) cmurl = None pollurl = None for service in discovery_http_response.to_dict().get('service_instances', None): print(f"Service: {service['service_type']}, Endpoint: {service['service_address']}") if service['service_type'] == "COLLECTION_MANAGEMENT": cmurl = service['service_address'] elif service['service_type'] == "POLL": pollurl = service['service_address'] return (cmurl, pollurl) def get_target_collection(taxii_client, cmurl): """Find the first collection from our server.""" # Build and execute the Collections request collections_request = libtaxii.messages_11.CollectionInformationRequest( libtaxii.messages_11.generate_message_id() ) collections_request_xml = collections_request.to_xml() collections_http_request = taxii_client.call_taxii_service2( urlparse(cmurl).netloc, urlparse(cmurl).path, libtaxii.VID_TAXII_XML_11, collections_request_xml, ) collections_http_response = libtaxii.get_message_from_http_response( collections_http_request, collections_request.message_id ) # Find the first collection (and print all the collections for our edification) target_collection = None for collection in collections_http_response.to_dict().get('collection_informations', None): print(f"Collection: {collection['collection_name']}") if not target_collection: # We'll plan to poll the first collection returned to us target_collection = collection['collection_name'] # Did we get a target_collection? if not target_collection: print("ERROR: No collections were returned by the remote server.", file=sys.stderr) return target_collection def get_content_blocks(taxii_client, pollurl, target_collection): """Get all of the content blocks from our server for the given target_collection.""" # Build and execute the Poll request so that we can get all of the indicators # from our selected collection # We must supply a beginning and end date. For ThreatConnect, the default time # delta between these two is 24 hours. end_date = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) begin_date = (end_date - datetime.timedelta(days=1)).replace(tzinfo=datetime.timezone.utc) poll_request = libtaxii.messages_11.PollRequest( libtaxii.messages_11.generate_message_id(), collection_name=target_collection, poll_parameters=libtaxii.messages_11.PollParameters(), exclusive_begin_timestamp_label=begin_date, inclusive_end_timestamp_label=end_date, ) poll_request_xml = poll_request.to_xml() poll_http_request = taxii_client.call_taxii_service2( urlparse(pollurl).netloc, urlparse(pollurl).path, libtaxii.VID_TAXII_XML_11, poll_request_xml, ) poll_http_response = libtaxii.get_message_from_http_response( poll_http_request, poll_request.message_id ) content_blocks = poll_http_response.to_dict().get("content_blocks", None) return content_blocks def main(): """Main method to perform all of the TAXII calls in this example.""" # Check our environment variables taxii_user = os.environ.get("TAXII_USER") taxii_pass = os.environ.get("TAXII_PASS") if not taxii_user or not taxii_pass: print( "ERROR: You must define the TAXII_USER and " "TAXII_PASS environment variables for your TAXII credentials.", file=sys.stderr, ) return 1 # Build a TAXII client taxii_client = libtaxii.clients.HttpClient( use_https=True, auth_type=libtaxii.clients.HttpClient.AUTH_BASIC, auth_credentials={'username': taxii_user, 'password': taxii_pass}, ) # Call discovery (cmurl, pollurl) = perform_discovery(taxii_client) # Did we actually get a Collections URL? if not cmurl: print( "ERROR: No collection management service was " "found on the remote server.", file=sys.stderr, ) return 1 # Did we actually get a Poll URL? if not pollurl: print("ERROR: No poll service was found on the remote server.", file=sys.stderr) return 1 # Find a target collection target_collection = get_target_collection(taxii_client, cmurl) # Get our content blocks content_blocks = get_content_blocks(taxii_client, pollurl, target_collection) # Did we get any content blocks? if not content_blocks: print("ERROR: No content was returned by the remote server.", file=sys.stderr) return 1 stix_packages = [] for content_block in content_blocks: if content_block.get("content", None): stix_packages.append( STIXPackage.from_xml(xml_file=io.BytesIO(content_block['content'])) ) # Print the indicators for stix_package in stix_packages: for indicator in stix_package.indicators: print(f"Indicator: {indicator.description}") return 0 if __name__ == "__main__": sys.exit(main())