"""TAXII Client Example"""
import os
from urllib.parse import urlparse
import datetime
import io
import sys
import libtaxii
from stix.core import STIXPackage
def perform_discovery(
    taxii_client,
    discovery_host='sandbox.threatconnect.com',
    discovery_path='/api/taxii/discovery',
):
    """Perform discovery against the TAXII endpoint.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        discovery_host (str): Host of the TAXII Discovery service. Defaults to the
            host this example was written against.
        discovery_path (str): URL path of the TAXII Discovery service.

    Returns:
        (cmurl, pollurl) - A tuple containing the Collection Management and Poll URLs.
        Either element is None when no service of that type was listed in the response.
    """
    # Build and execute the Discovery request so that we can find all our service URLs
    discovery_request = libtaxii.messages_11.DiscoveryRequest(
        libtaxii.messages_11.generate_message_id()
    )
    discovery_http_request = taxii_client.call_taxii_service2(
        discovery_host,
        discovery_path,
        libtaxii.VID_TAXII_XML_11,
        discovery_request.to_xml(),
    )
    discovery_http_response = libtaxii.get_message_from_http_response(
        discovery_http_request, discovery_request.message_id
    )

    # Assign our service URLs based on the services we receive back (and print).
    # `or []` also covers a response whose 'service_instances' key is present
    # but set to None, which would otherwise fail on iteration.
    cmurl = None
    pollurl = None
    services = discovery_http_response.to_dict().get('service_instances') or []
    for service in services:
        service_type = service.get('service_type')
        service_address = service.get('service_address')
        print(f"Service: {service_type}, Endpoint: {service_address}")
        # If a service type is listed more than once, the last entry wins.
        if service_type == "COLLECTION_MANAGEMENT":
            cmurl = service_address
        elif service_type == "POLL":
            pollurl = service_address
    return (cmurl, pollurl)
def get_target_collection(taxii_client, cmurl):
    """Find the first collection from our server.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        cmurl (str): The full URL (as returned by a Discovery Request) for Collection Management.

    Returns:
        (str): The name of the first Collection returned in the Collection request,
        or None (after printing an error to stderr) when no collection was returned.
    """
    # Build and execute the Collections request
    collections_request = libtaxii.messages_11.CollectionInformationRequest(
        libtaxii.messages_11.generate_message_id()
    )
    # Parse the URL once and reuse both components.
    # NOTE(review): netloc keeps any explicit ":port" from the URL; confirm that
    # call_taxii_service2 accepts that form in its host argument.
    parsed_cmurl = urlparse(cmurl)
    collections_http_request = taxii_client.call_taxii_service2(
        parsed_cmurl.netloc,
        parsed_cmurl.path,
        libtaxii.VID_TAXII_XML_11,
        collections_request.to_xml(),
    )
    collections_http_response = libtaxii.get_message_from_http_response(
        collections_http_request, collections_request.message_id
    )

    # Find the first collection (and print all the collections for our edification).
    # `or []` also covers a 'collection_informations' key that is present but None.
    target_collection = None
    collections = collections_http_response.to_dict().get('collection_informations') or []
    for collection in collections:
        collection_name = collection.get('collection_name')
        print(f"Collection: {collection_name}")
        if not target_collection:
            # We'll plan to poll the first collection returned to us
            target_collection = collection_name

    # Did we get a target_collection?
    if not target_collection:
        print("ERROR: No collections were returned by the remote server.", file=sys.stderr)
    return target_collection
def get_content_blocks(taxii_client, pollurl, target_collection):
    """Get all of the content blocks from our server for the given target_collection.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        pollurl (str): The URL for polling.
        target_collection (str): The name of the Collection that should be polled.

    Returns:
        (list): A list of the raw content blocks to be parsed. Empty when the
        response carries no content blocks.
    """
    # Build and execute the Poll request so that we can get all of the indicators
    # from our selected collection.
    # We must supply a beginning and end date. For ThreatConnect, the default time
    # delta between these two is 24 hours.
    # datetime.now(timezone.utc) yields an aware UTC timestamp directly;
    # subtracting a timedelta keeps the tzinfo, so no further replace() is needed.
    end_date = datetime.datetime.now(datetime.timezone.utc)
    begin_date = end_date - datetime.timedelta(days=1)
    poll_request = libtaxii.messages_11.PollRequest(
        libtaxii.messages_11.generate_message_id(),
        collection_name=target_collection,
        poll_parameters=libtaxii.messages_11.PollParameters(),
        exclusive_begin_timestamp_label=begin_date,
        inclusive_end_timestamp_label=end_date,
    )
    # Parse the URL once and reuse both components.
    # NOTE(review): netloc keeps any explicit ":port" from the URL; confirm that
    # call_taxii_service2 accepts that form in its host argument.
    parsed_pollurl = urlparse(pollurl)
    poll_http_request = taxii_client.call_taxii_service2(
        parsed_pollurl.netloc,
        parsed_pollurl.path,
        libtaxii.VID_TAXII_XML_11,
        poll_request.to_xml(),
    )
    poll_http_response = libtaxii.get_message_from_http_response(
        poll_http_request, poll_request.message_id
    )
    # Always hand back a list, as documented, even when the key is absent or None.
    return poll_http_response.to_dict().get("content_blocks") or []
def main():
    """Main method to perform all of the TAXII calls in this example.

    Reads credentials from the TAXII_USER and TAXII_PASS environment variables,
    runs discovery, picks the first collection, polls it, and prints the
    description of every indicator found in the returned STIX packages.

    Returns:
        (int): 0 on success, 1 when credentials are missing or any step yields
        nothing to continue with. Intended to be used as the process exit status.
    """
    # Check our environment variables
    taxii_user = os.environ.get("TAXII_USER")
    taxii_pass = os.environ.get("TAXII_PASS")
    if not taxii_user or not taxii_pass:
        print(
            "ERROR: You must define the TAXII_USER and "
            "TAXII_PASS environment variables for your TAXII credentials.",
            file=sys.stderr,
        )
        return 1

    # Build a TAXII client
    taxii_client = libtaxii.clients.HttpClient(
        use_https=True,
        auth_type=libtaxii.clients.HttpClient.AUTH_BASIC,
        auth_credentials={'username': taxii_user, 'password': taxii_pass},
    )

    # Call discovery
    (cmurl, pollurl) = perform_discovery(taxii_client)

    # Did we actually get a Collections URL?
    if not cmurl:
        print(
            "ERROR: No collection management service was found on the remote server.",
            file=sys.stderr,
        )
        return 1

    # Did we actually get a Poll URL?
    if not pollurl:
        print("ERROR: No poll service was found on the remote server.", file=sys.stderr)
        return 1

    # Find a target collection
    target_collection = get_target_collection(taxii_client, cmurl)
    if not target_collection:
        # get_target_collection() has already reported the problem on stderr;
        # polling without a collection name cannot succeed, so stop here.
        return 1

    # Get our content blocks
    content_blocks = get_content_blocks(taxii_client, pollurl, target_collection)

    # Did we get any content blocks?
    if not content_blocks:
        print("ERROR: No content was returned by the remote server.", file=sys.stderr)
        return 1

    stix_packages = []
    for content_block in content_blocks:
        content = content_block.get("content")
        if not content:
            continue
        # io.BytesIO only accepts bytes-like data.
        # NOTE(review): assumes text content is UTF-8 XML - confirm against the
        # type libtaxii actually returns for 'content'.
        if isinstance(content, str):
            content = content.encode("utf-8")
        stix_packages.append(STIXPackage.from_xml(xml_file=io.BytesIO(content)))

    # Print the indicators
    for stix_package in stix_packages:
        # Tolerate a package whose indicators attribute is None.
        for indicator in stix_package.indicators or []:
            print(f"Indicator: {indicator.description}")
    return 0
# Script entry point: run the example only when executed directly and use
# main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|