#!/usr/bin/env python3
"""Example ASAB application that registers a few metrics and bumps them on every tick.

The script sets configuration defaults for the web listener and for the
InfluxDB metrics target, creates a counter, a gauge and an EPS counter through
``asab.MetricsService``, and initializes the web part of ``asab.api.ApiService``.
"""
import asab
import asab.web
import asab.metrics
import asab.api

# Configuration defaults used by this example.
# NOTE(review): the original comment states that advertisement through
# ApiService requires two configuration sections - `web` and `zookeeper` -
# but only `web` is defined here; confirm whether `zookeeper` is needed.
asab.Config.add_defaults(
    {
        "web": {
            "listen": "0.0.0.0 8089",
        },
        "asab:metrics": {
            "target": "influxdb",
        },
        "asab:metrics:influxdb": {
            "url": "http://localhost:8086/",
            "db": "test",
            "username": "test",
            "password": "testtest",
        },
    }
)


class MyApplication(asab.Application):
    """Application exposing three demo metrics that are updated from ``on_tick``."""

    def __init__(self):
        """Load the web and metrics modules, create the metrics and hook up the tick handler."""
        required_modules = [asab.web.Module, asab.metrics.Module]
        super().__init__(modules=required_modules)

        metrics = self.get_service('asab.MetricsService')

        # Each metric receives its own tag / initial-value dictionaries.
        self.MyCounter = metrics.create_counter(
            "mycounter",
            tags={'foo': 'bar'},
            init_values={'v1': 0, 'v2': 0},
        )
        self.MyGauge = metrics.create_gauge(
            "mygauge",
            tags={'foo': 'bar'},
            init_values={'v1': 0, 'v2': 0},
        )
        self.MyEPSCounter = metrics.create_eps_counter(
            "myepscounter",
            tags={'foo': 'bar'},
            init_values={'event.in': 0},
        )

        # Run `on_tick` for every "Application.tick!" message
        # (per the original note, the timer publishes it every second).
        self.PubSub.subscribe("Application.tick!", self.on_tick)

        # Set up the API service and its web part.
        self.ApiService = asab.api.ApiService(self)
        self.ApiService.initialize_web()

    async def on_tick(self, event_type):
        """Handle one tick: print a marker and update all three metrics.

        ``event_type`` is supplied by PubSub and is not used here.
        """
        print("Tick!")
        self.MyCounter.add('v1', 1)
        self.MyGauge.set('v1', 1)
        self.MyEPSCounter.add('event.in', 1)


if __name__ == '__main__':
    MyApplication().run()