Skip to content

Commit

Permalink
Merge branch 'master' of github.com:InternetHealthReport/kafka-toolbox
Browse files Browse the repository at this point in the history
  • Loading branch information
romain-fontugne committed Jan 30, 2025
2 parents 1832624 + 24014b8 commit 99f47b0
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 28 deletions.
16 changes: 8 additions & 8 deletions atlas/producers/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
arrow==1.2.3
confluent_kafka==2.0.2
msgpack_python==0.5.6
numpy==1.19.5
requests==2.27.1
requests_futures==1.0.0
ripe==0.0.4
ripe.atlas.cousteau==2.0.0
arrow
confluent_kafka
msgpack_python
numpy
requests
requests_futures
ripe
ripe.atlas.cousteau
4 changes: 2 additions & 2 deletions bgp/producers/bgpstream2.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def pushData(record_type, collector, startts, endts):
timeStart = args.startTime
else:
if recordType == 'updates':
timeStart = currentTime.replace(microsecond=0, second=0, minute=minuteStart)-timedelta(minutes=3*timeWindow)
timeStart = currentTime.replace(microsecond=0, second=0, minute=minuteStart)-timedelta(minutes=2*timeWindow)
else:
delay = 120
if 'rrc' in collector:
Expand All @@ -175,7 +175,7 @@ def pushData(record_type, collector, startts, endts):
timeEnd = args.endTime
else:
if recordType == 'updates':
timeEnd = currentTime.replace(microsecond=0, second=0, minute=minuteStart)-timedelta(minutes=2*timeWindow)
timeEnd = currentTime.replace(microsecond=0, second=0, minute=minuteStart)-timedelta(minutes=1*timeWindow)
else:
timeEnd = currentTime

Expand Down
4 changes: 2 additions & 2 deletions generic/detector/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
confluent_kafka==2.0.2
msgpack_python==0.5.6
confluent_kafka
msgpack_python
arrow
6 changes: 3 additions & 3 deletions handy/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
arrow==1.2.3
confluent_kafka==2.0.2
msgpack_python==0.5.6
arrow
confluent_kafka
msgpack_python
2 changes: 1 addition & 1 deletion ihr.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KAFKA_HOST=10.0.1.12:9092
KAFKA_HOST=kafka1.storage.iijlab.net:9092,kafka2.storage.iijlab.net:9092,kafka6.storage.iijlab.net:9092
DB_CONNECTION_STRING=host=10.0.1.155 dbname=ihr user=romain
2 changes: 2 additions & 0 deletions psql/consumers/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
KAFKA_HOST=10.0.1.176:9092
DB_CONNECTION_STRING=host=10.0.1.155 dbname=ihr user=romain
2 changes: 1 addition & 1 deletion psql/consumers/ASHegemony.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def run(self):

if msg is None:
nb_timeout += 1
if nb_timeout > 60:
if nb_timeout > 180:
logging.warning("Time out!")
break
self.commit()
Expand Down
8 changes: 4 additions & 4 deletions psql/consumers/ASHegemony_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, topic, af, start, end, host="localhost", dbname="ihr"):
# conn_string = "host='127.0.0.1' dbname='%s'" % dbname

self.conn = psycopg2.connect(DB_CONNECTION_STRING)
columns=("timebin", "prefix", "originasn_id", "asn_id", "country_id", "hege", "rpki_status", "irr_status", "delegated_prefix_status", "delegated_asn_status", "af", "descr", "visibility", "moas")
columns=("id", "timebin", "prefix", "originasn_id", "asn_id", "country_id", "hege", "rpki_status", "irr_status", "delegated_prefix_status", "delegated_asn_status", "af", "descr", "visibility", "moas")
self.cpmgr = CopyManager(self.conn, "ihr_hegemony_prefix", columns)
self.cursor = self.conn.cursor()
logging.debug("Connected to the PostgreSQL server")
Expand Down Expand Up @@ -235,7 +235,7 @@ def save(self, msg):

# Hegemony values to copy in the database
if hege!= 0:
#("timebin", "prefix", "originasn_id", "asn_id", "country_id", "hege",
#("id", "timebin", "prefix", "originasn_id", "asn_id", "country_id", "hege",
# "rpki_status", "irr_status", "delegated_prefix_status", "delegated_asn_status",
# "af", "descr", "visibility", "moas")

Expand Down Expand Up @@ -279,7 +279,7 @@ def save(self, msg):
self.cache['descr'] = descr

self.dataHege.append([
self.currenttime, prefix, int(originasn), int(asn), cc, float(hege),
0, self.currenttime, prefix, int(originasn), int(asn), cc, float(hege),
rov_check['rpki']['status'], rov_check['irr']['status'],
rov_check['delegated']['prefix']['status'], rov_check['delegated']['asn']['status'],
self.af, descr, msg['nb_peers']
Expand All @@ -298,7 +298,7 @@ def commit(self):
# set visibility in percentage and add moas
for vec in self.dataHege:
vec[-1] = 100.0*(vec[-1]/self.nb_peers)
rnode = self.rtree.search_best(vec[1])
rnode = self.rtree.search_best(vec[2])
vec.append( len(rnode.data['originasn'])>1 )

logging.warning(f"psql: start copy, ts={self.currenttime}, nb. data points={len(self.dataHege)}")
Expand Down
12 changes: 6 additions & 6 deletions psql/consumers/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ services:
metis_atlas_deployment_v6_as_path_length:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_deployment_v4_as_path_length.conf
command: anomalyDetector.py metis_atlas_deployment_v6_as_path_length.conf
env_file:
- path: .env
required: true # default

metis_atlas_deployment_v6_ip_hops:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_deployment_v4_ip_hops.conf
command: anomalyDetector.py metis_atlas_deployment_v6_ip_hops.conf
env_file:
- path: .env
required: true # default

metis_atlas_deployment_v6_rtt:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_selection_v4_rtt.conf
command: anomalyDetector.py metis_atlas_deployment_v6_rtt.conf
env_file:
- path: .env
required: true # default
Expand Down Expand Up @@ -114,23 +114,23 @@ services:
metis_atlas_selection_v6_as_path_length:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_selection_v4_as_path_length.conf
command: anomalyDetector.py metis_atlas_selection_v6_as_path_length.conf
env_file:
- path: .env
required: true # default

metis_atlas_selection_v6_ip_hops:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_selection_v4_ip_hops.conf
command: anomalyDetector.py metis_atlas_selection_v6_ip_hops.conf
env_file:
- path: .env
required: true # default

metis_atlas_selection_v6_rtt:
image: "internethealthreport/psql_consumers"
restart: unless-stopped
command: anomalyDetector.py metis_atlas_selection_v4_rtt.conf
command: anomalyDetector.py metis_atlas_selection_v6_rtt.conf
env_file:
- path: .env
required: true # default
Expand Down
1 change: 0 additions & 1 deletion psql/consumers/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ arrow==1.2.3
confluent_kafka==2.0.2
geoip2==4.6.0
iso3166==2.1.1
kafka==1.3.5
kafka_python==2.0.2
msgpack_python==0.5.6
pgcopy==1.5.0
Expand Down

0 comments on commit 99f47b0

Please sign in to comment.