Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Overview

This page provides a Python 3 example of connecting to the ThreatConnect TAXII Server as a TAXII client to pull Indicators. This is primarily useful when creating an Integrations - External TAXII Download (Poll) integration.

Example

"""TAXII Client Example"""
import os
from urllib.parse import urlparse
import datetime
import io
import sys
import libtaxii
from stix.core import STIXPackage


def perform_discovery(
    taxii_client,
    host='sandbox.threatconnect.com',
    path='/api/taxii/discovery',
):
    """Perform discovery against the TAXII endpoint.

    Sends a TAXII 1.1 Discovery request and prints every service the server
    advertises.

    Args:
        taxii_client: The libtaxii HTTP client used to send the request.
        host: Host name of the TAXII server to query.
        path: Path of the discovery service on that host.

    Returns:
        A ``(cmurl, pollurl)`` tuple holding the addresses of the
        COLLECTION_MANAGEMENT and POLL services. Either element is ``None``
        when the server did not advertise that service type. If a type is
        advertised more than once, the last address seen is returned.
    """
    # Build and execute the Discovery request so that we can find all our service URLs
    discovery_request = libtaxii.messages_11.DiscoveryRequest(
        libtaxii.messages_11.generate_message_id()
    )
    discovery_http_request = taxii_client.call_taxii_service2(
        host,
        path,
        libtaxii.VID_TAXII_XML_11,
        discovery_request.to_xml(),
    )
    discovery_http_response = libtaxii.get_message_from_http_response(
        discovery_http_request, discovery_request.message_id
    )

    # The response dictionary may have no 'service_instances' entry; treat that
    # as an empty list instead of trying to iterate over None.
    services = discovery_http_response.to_dict().get('service_instances') or []

    # Assign our service URLs based on the services we receive back (and print)
    cmurl = None
    pollurl = None
    for service in services:
        print(f"Service: {service['service_type']}, Endpoint: {service['service_address']}")
        if service['service_type'] == "COLLECTION_MANAGEMENT":
            cmurl = service['service_address']
        elif service['service_type'] == "POLL":
            pollurl = service['service_address']

    return (cmurl, pollurl)


def get_target_collection(taxii_client, cmurl):
    """Find the first collection from our server.

    Sends a TAXII 1.1 Collection Information request to ``cmurl`` and prints
    the name of every collection returned.

    Args:
        taxii_client: The libtaxii HTTP client used to send the request.
        cmurl: Full URL of the collection management service.

    Returns:
        The name of the first collection returned by the server, or ``None``
        (after printing an error to stderr) when no collection was returned.
    """
    # Build and execute the Collections request
    collections_request = libtaxii.messages_11.CollectionInformationRequest(
        libtaxii.messages_11.generate_message_id()
    )
    # Parse the service URL once and reuse both parts.
    service_url = urlparse(cmurl)
    collections_http_request = taxii_client.call_taxii_service2(
        service_url.netloc,
        service_url.path,
        libtaxii.VID_TAXII_XML_11,
        collections_request.to_xml(),
    )
    collections_http_response = libtaxii.get_message_from_http_response(
        collections_http_request, collections_request.message_id
    )

    # The response dictionary may have no 'collection_informations' entry;
    # fall back to an empty list so the "no collections" error below is reached
    # instead of a TypeError from iterating over None.
    collections = collections_http_response.to_dict().get('collection_informations') or []

    # Find the first collection (and print all the collections for our edification)
    target_collection = None
    for collection in collections:
        print(f"Collection: {collection['collection_name']}")
        if not target_collection:
            # We'll plan to poll the first collection returned to us
            target_collection = collection['collection_name']

    # Did we get a target_collection?
    if not target_collection:
        print("ERROR: No collections were returned by the remote server.", file=sys.stderr)

    return target_collection


def get_content_blocks(taxii_client, pollurl, target_collection):
    """Get all of the content blocks from our server for the given target_collection.

    Sends a TAXII 1.1 Poll request covering the 24 hours up to now.

    Args:
        taxii_client: The libtaxii HTTP client used to send the request.
        pollurl: Full URL of the poll service.
        target_collection: Name of the collection to poll.

    Returns:
        The value of the ``"content_blocks"`` entry of the response
        dictionary, or ``None`` when the response has no such entry.
    """
    # Build and execute the Poll request so that we can get all of the indicators
    # from our selected collection
    # We must supply a beginning and end date. For ThreatConnect, the default time
    # delta between these two is 24 hours.
    # datetime.now(timezone.utc) yields an aware UTC timestamp directly; the
    # deprecated utcnow() returned a naive value that had to be patched afterwards.
    end_date = datetime.datetime.now(datetime.timezone.utc)
    # Subtracting a timedelta keeps the tzinfo, so begin_date is aware as well.
    begin_date = end_date - datetime.timedelta(days=1)

    poll_request = libtaxii.messages_11.PollRequest(
        libtaxii.messages_11.generate_message_id(),
        collection_name=target_collection,
        poll_parameters=libtaxii.messages_11.PollParameters(),
        exclusive_begin_timestamp_label=begin_date,
        inclusive_end_timestamp_label=end_date,
    )
    # Parse the service URL once and reuse both parts.
    service_url = urlparse(pollurl)
    poll_http_request = taxii_client.call_taxii_service2(
        service_url.netloc,
        service_url.path,
        libtaxii.VID_TAXII_XML_11,
        poll_request.to_xml(),
    )
    poll_http_response = libtaxii.get_message_from_http_response(
        poll_http_request, poll_request.message_id
    )

    return poll_http_response.to_dict().get("content_blocks", None)


def main():
    """Main method to perform all of the TAXII calls in this example.

    Reads the TAXII credentials from the ``TAXII_USER`` and ``TAXII_PASS``
    environment variables, discovers the server's services, polls the first
    collection and prints the description of every indicator received.

    Returns:
        ``0`` on success, ``1`` when any step fails (missing credentials,
        missing service, no collection, or no content).
    """
    # Check our environment variables
    taxii_user = os.environ.get("TAXII_USER")
    taxii_pass = os.environ.get("TAXII_PASS")

    if not taxii_user or not taxii_pass:
        print(
            "ERROR: You must define the TAXII_USER and "
            "TAXII_PASS environment variables for your TAXII credentials.",
            file=sys.stderr,
        )
        return 1

    # Build a TAXII client
    taxii_client = libtaxii.clients.HttpClient(
        use_https=True,
        auth_type=libtaxii.clients.HttpClient.AUTH_BASIC,
        auth_credentials={'username': taxii_user, 'password': taxii_pass},
    )

    # Call discovery
    (cmurl, pollurl) = perform_discovery(taxii_client)

    # Did we actually get a Collections URL?
    if not cmurl:
        print(
            "ERROR: No collection management service was found on the remote server.",
            file=sys.stderr,
        )
        return 1

    # Did we actually get a Poll URL?
    if not pollurl:
        print("ERROR: No poll service was found on the remote server.", file=sys.stderr)
        return 1

    # Find a target collection
    target_collection = get_target_collection(taxii_client, cmurl)

    # Without a collection there is nothing to poll. get_target_collection has
    # already reported the problem on stderr, so just stop here instead of
    # sending a poll request with no collection name.
    if not target_collection:
        return 1

    # Get our content blocks
    content_blocks = get_content_blocks(taxii_client, pollurl, target_collection)

    # Did we get any content blocks?
    if not content_blocks:
        print("ERROR: No content was returned by the remote server.", file=sys.stderr)
        return 1

    # Parse every content block that actually carries content into a STIX package
    stix_packages = [
        STIXPackage.from_xml(xml_file=io.BytesIO(content_block['content']))
        for content_block in content_blocks
        if content_block.get("content", None)
    ]

    # Print the indicators
    for stix_package in stix_packages:
        for indicator in stix_package.indicators:
            print(f"Indicator: {indicator.description}")

    return 0


# Script entry point: run the example and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
  • No labels