Apache NiFi has a powerful web-based interface which provides a seamless experience between design, control, feedback, and monitoring. Sometimes, however, you want to automate tasks instead of doing them manually in the UI. Automation not only lets you perform tasks a lot quicker, it also makes them more reproducible and allows you to incorporate them in, for example, a CI/CD system without requiring human intervention. A NiFi feature that helps you automate tasks is its powerful API. To use this API more easily from Python, NiPyAPI is available. In this blog post I'll describe some things you can do with NiPyAPI, some challenges I encountered, and how I fixed them. You can find my sample code here.
Getting started with NiPyAPI
TLS
Before you can do anything, you need to specify several properties which are used to connect to NiFi. These include certificate information (when using TLS) and user authentication details. This can be challenging and require some work. See for example here. You will need to create keys and keystores, register them with NiFi, and make them available to your Python code. This is also a barrier for other developers who want to start experimenting with the NiFi API.
You can also decide it is safe enough to skip explicit validation of the NiFi certificate by using the following in your Python code:
nipyapi.config.global_ssl_verify = False
nipyapi.config.nifi_config.verify_ssl = False
Your connection will still be encrypted, but a man-in-the-middle attack could fool you since the certificate and hostname are not verified. However, since I regularly check the certificate by opening the NiFi web interface in a browser (it uses the same certificate as the API) and the hostname is fixed in my script, I consider that risk negligible. I prefer an easy way to use the API and like to have my environment up and running quickly.
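If you would rather not rely on a manual browser check, the same idea can be scripted. The following is a minimal sketch, not part of my sample code, that compares the SHA-256 fingerprint of the certificate the server presents with a value you recorded earlier. The function names and the `expected_fingerprint` parameter are made up for illustration; only the Python standard library is used.

```python
import hashlib
import ssl


def pem_fingerprint(pem_cert: str) -> str:
    """Return the SHA-256 fingerprint (lowercase hex) of a PEM-encoded certificate."""
    der = ssl.PEM_cert_to_DER_cert(pem_cert)
    return hashlib.sha256(der).hexdigest()


def server_cert_matches(host: str, port: int, expected_fingerprint: str) -> bool:
    """Fetch the server certificate (without validating it) and compare fingerprints."""
    pem = ssl.get_server_certificate((host, port))
    # Accept fingerprints written as "AB:CD:..." as well as plain hex
    normalized = expected_fingerprint.replace(":", "").lower()
    return pem_fingerprint(pem) == normalized
```

You could call `server_cert_matches('NIFIHOST', 443, '<fingerprint you noted down>')` at the start of a script and abort when it returns False. This is only a pin on one certificate, so it has to be updated whenever the certificate is renewed.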
Logging
In order to obtain some useful logging, I've added the following lines to my Python script:
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
logging.getLogger('nipyapi.utils').setLevel(logging.INFO)
logging.getLogger('nipyapi.security').setLevel(logging.INFO)
logging.getLogger('nipyapi.versioning').setLevel(logging.INFO)
Next, you can use log.info, for example, to log messages.
Automate tasks
Authenticating to the API
I was having some difficulties logging into NiFi at a customer. Sometimes the login did not succeed on the first try, which can be devastating for automated processes. I built some retries into my Python code to make it more robust.
import time
import nipyapi

nipyapi.config.nifi_config.host = 'https://NIFIHOST/nifi-api'
nipyapi.config.default_nifi_username = 'NIFIUSER'
nipyapi.config.default_nifi_password = 'NIFIPASSWORD'

# Login procedure which tries 3 times with a delay between attempts
def nifi_login():
    loggedin = False
    i = 0
    attempts = 3
    delay = 10  # seconds
    while i < attempts and not loggedin:
        log.info("Logging in. Attempt: " + str(i + 1))
        loggedin = nipyapi.security.service_login(bool_response=True)
        if not loggedin:
            time.sleep(delay)
        i = i + 1
    return loggedin
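The same retry pattern is useful for other flaky calls too. Below is a minimal sketch of a generic version; `retry_until_true` and its parameters are names I made up for illustration, and the `sleep` argument exists only so the helper can be tried out without really waiting.

```python
import time
from typing import Callable


def retry_until_true(action: Callable[[], bool], attempts: int = 3,
                     delay: float = 10.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call action until it returns True or the attempts run out."""
    for attempt in range(1, attempts + 1):
        if action():
            return True
        if attempt < attempts:
            sleep(delay)  # no point in waiting after the final failure
    return False
```

With NiPyAPI this could be used as `retry_until_true(lambda: nipyapi.security.service_login(bool_response=True))`.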
Stopping and starting controller services and process groups
When updating a process group to a new version, processors need to be stopped, queues need to be empty, and controller services need to be disabled. If you do this manually, it takes quite some time, especially for the queues and controller services.
The procedure below stops the process groups and disables their controller services.
# name_to_id maps process group names to IDs; app is the name of the application's group
app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
# Stop each process group, then disable its controller services
for app_pg_group in app_pg_groups:
    log.info("Disabling process group: " + str(app_pg_group.component.name))
    nipyapi.canvas.schedule_process_group(app_pg_group.id, False)
    log.info("Disabling controllers for: " + str(app_pg_group.component.name))
    acse = nipyapi.nifi.ActivateControllerServicesEntity()
    acse.state = "DISABLED"
    acse.id = app_pg_group.id
    nipyapi.nifi.FlowApi().activate_controller_services(id=app_pg_group.id, body=acse)
Why did I use the low-level SDK here (nipyapi.nifi) and not the high-level SDK (for example nipyapi.canvas)? I got permission denied errors when using the high-level SDK. Apparently I did not have sufficient permissions. The more fine-grained low-level SDK makes many more things possible. As a general recommendation, first try to do things with the easy-to-use high-level SDK and, if you encounter issues, try the low-level SDK. The low-level SDK is harder to use but sometimes requires fewer permissions since it is more specific.
Empty queues
In order to empty a queue, you require the 'modify the data' permission. Suppose you do not have this permission and still want to automate emptying queues, can you? Yes, you can!
The trick is to set the flowfile expiration on the connection to 1 second and wait until the messages are cleared. Then you can restore the original setting on the connection and the queue will be empty. You can automate this. The sample below can be made more robust by, for example, checking every 3 seconds whether the queue is empty and waiting a bit longer if it is not.
app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
# Empty queues
for app_pg_group in app_pg_groups:
    log.info("Remove all messages on connection queues of: " + str(app_pg_group.component.name))
    for con in nipyapi.canvas.list_all_connections(app_pg_group.id, True):
        # queued_count is a string, so compare by value rather than identity
        if con.status.aggregate_snapshot.queued_count != '0':
            log.info("Empty queue: " + str(con.id))
            mycon = nipyapi.nifi.ConnectionsApi().get_connection(con.id)
            oldexp = mycon.component.flow_file_expiration
            mycon.component.flow_file_expiration = '1 sec'
            # Keep the returned entity so the next update carries the current revision
            mycon = nipyapi.nifi.ConnectionsApi().update_connection(con.id, mycon)
            time.sleep(10)
            mycon.component.flow_file_expiration = oldexp
            nipyapi.nifi.ConnectionsApi().update_connection(con.id, mycon)
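The fixed 10 second sleep can be replaced by polling. The following is a minimal sketch of such a loop. `wait_until_empty` and `get_queued_count` are hypothetical names: the callable is assumed to return the current number of queued flowfiles as an integer, for example by fetching the connection again and reading its status snapshot.

```python
import time
from typing import Callable


def wait_until_empty(get_queued_count: Callable[[], int], interval: float = 3.0,
                     timeout: float = 60.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until the queue reports zero flowfiles; return False on timeout."""
    waited = 0.0
    while True:
        if get_queued_count() == 0:
            return True
        if waited >= timeout:
            return False
        sleep(interval)
        waited += interval
```

Returning False instead of raising lets the caller decide whether to restore the original expiration anyway or to stop the deployment.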
Cleaning up
When developing, your NiFi environment can become polluted with, for example, old parameters or controller services you do not use anymore. Wouldn't it be nice if you could easily find them in order to evaluate whether you can remove them?
Unused controller services
You can find unused controller services with a snippet like the one below.
app_pg = nipyapi.canvas.get_process_group(identifier_type='id', identifier=name_to_id[app])
# List the controller services in this group and its descendants
cs = nipyapi.nifi.FlowApi().get_controller_services_from_group(id=app_pg.id, include_ancestor_groups=False, include_descendant_groups=True).controller_services
for service in cs:
    if len(service.component.referencing_components) == 0:
        print("Controller service " + str(service.component.name) + " has no referencing components. Parent pg: " + id_to_name[service.component.parent_group_id])
After identifying them, you should manually check if they can truly be removed and then you can clean them up.
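To make the check easier to reuse and to try out without a NiFi instance, the filter can be pulled into a small function. This is a sketch under assumptions: `unused_service_names` is a made-up name, and the stand-in objects only mimic the `component.name` and `component.referencing_components` attributes used above. It also treats a missing (None) reference list as unused.

```python
from types import SimpleNamespace
from typing import Iterable, List


def unused_service_names(services: Iterable) -> List[str]:
    """Return the names of services without any referencing components."""
    unused = []
    for service in services:
        refs = service.component.referencing_components
        if not refs:  # treats both None and an empty list as unused
            unused.append(service.component.name)
    return unused


# Stand-in objects mimicking the attributes read from the API response
demo = [
    SimpleNamespace(component=SimpleNamespace(name="DBCP pool", referencing_components=[object()])),
    SimpleNamespace(component=SimpleNamespace(name="Old SSL context", referencing_components=[])),
    SimpleNamespace(component=SimpleNamespace(name="Orphan reader", referencing_components=None)),
]
print(unused_service_names(demo))  # prints ['Old SSL context', 'Orphan reader']
```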
Finding unused parameters
Finally
By using NiPyAPI in combination with some Python scripting, you can quite easily make your NiFi experience a lot better! Not only is this an enabler for automating deployments but it can also help reduce the effort required for cleaning up an environment of components which are no longer being used.