Skip to content

Commit

Permalink
Merge pull request #323 from parthsharma2/dev
Browse files Browse the repository at this point in the history
Make gh_repo_info_worker compatible with broker
  • Loading branch information
gabe-heim authored Jul 11, 2019
2 parents 781a5d0 + 77c0ca9 commit 55235df
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 32 deletions.
28 changes: 13 additions & 15 deletions workers/gh_repo_info_worker/gh_repo_info_worker/runtime.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
#############################################################
# This file is a copy of the augur_worker_github/runtime.py #
#############################################################


from flask import Flask, jsonify, request, Response
import click, os, json, requests, logging
from augur_worker_github.worker import GitHubWorker
from gh_repo_info_worker.worker import GHRepoInfoWorker
logging.basicConfig(filename='worker.log', filemode='w', level=logging.INFO)


Expand All @@ -22,7 +17,7 @@ def augwop_task():
"""
if request.method == 'POST': #will post a task to be added to the queue
logging.info("Sending to work on task: {}".format(str(request.json)))
app.gh_worker.task = request.json
app.gh_repo_info_worker.task = request.json

#set task
return Response(response=request.json,
Expand All @@ -40,12 +35,12 @@ def augwop_task():
def augwop_config():
""" Retrieve worker's config
"""
return app.gh_worker.config
return app.gh_repo_info_worker.config

@click.command()
@click.option('--augur-url', default='http://localhost:5000/', help='Augur URL')
@click.option('--host', default='localhost', help='Host')
@click.option('--port', default=51236, help='Port')
@click.option('--port', default=51237, help='Port')
def main(augur_url, host, port):
""" Declares singular worker and creates the server and flask app that it will be running on
"""
Expand All @@ -56,16 +51,16 @@ def main(augur_url, host, port):
server = read_config("Server", use_main_config=1)

config = {
"id": "com.augurlabs.core.github_worker",
"id": "com.augurlabs.core.gh_repo_info_worker",
"broker_port": server['port'],
"zombie_id": credentials["zombie_id"],
#"zombie_id": credentials["zombie_id"],
"host": credentials["host"],
"key": credentials["key"],
"password": credentials["password"],
"port": credentials["port"],
"user": credentials["user"],
"database": credentials["database"],
"table": "repo_badging",
"table": "repo_info",
"endpoint": "https://bestpractices.coreinfrastructure.org/projects.json",
"display_name": "",
"description": "",
Expand All @@ -75,13 +70,13 @@ def main(augur_url, host, port):

#create instance of the worker

app.gh_worker = GitHubWorker(config) # declares the worker that will be running on this server with specified config
app.gh_repo_info_worker = GHRepoInfoWorker(config) # declares the worker that will be running on this server with specified config

create_server(app, None)
logging.info("Starting Flask App with pid: " + str(os.getpid()) + "...")
app.run(debug=app.debug, host=host, port=port)
if app.gh_worker._child is not None:
app.gh_worker._child.terminate()
if app.gh_repo_info_worker._child is not None:
app.gh_repo_info_worker._child.terminate()
try:
requests.post('http://localhost:{}/api/unstable/workers/remove'.format(server['port']), json={"id": config['id']})
except:
Expand Down Expand Up @@ -147,3 +142,6 @@ def read_config(section, name=None, environment_variable=None, default=None, con

__config = __default_config
return(__config[section][name])

# Run the click command-line entry point only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
100 changes: 83 additions & 17 deletions workers/gh_repo_info_worker/gh_repo_info_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import logging
import time
from datetime import datetime
import os
import sys

logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w')

Expand Down Expand Up @@ -38,6 +40,8 @@ def __init__(self, config, task=None):
self._task = task
self._child = None
self._queue = Queue()
self._maintain_queue = Queue()
self.working_on = None
self.config = config
self.db = None
self.table = None
Expand All @@ -54,7 +58,7 @@ def __init__(self, config, task=None):
self.rate_limit = int(response.headers['X-RateLimit-Remaining'])

specs = {
"id": "com.augurlabs.core.gh_repo_info",
"id": self.config['id'],
"location": "http://localhost:51237",
"qualifications": [
{
Expand All @@ -66,7 +70,7 @@ def __init__(self, config, task=None):
}

self.DB_STR = 'postgresql://{}:{}@{}:{}/{}'.format(
self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['name']
self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['database']
)

logging.info("Making database connections...")
Expand All @@ -83,7 +87,7 @@ def __init__(self, config, task=None):
helper_metadata = MetaData()

metadata.reflect(self.db, only=['repo_info'])
# helper_metadata.reflect(self.helper_db)
helper_metadata.reflect(self.helper_db)

Base = automap_base(metadata=metadata)

Expand All @@ -105,7 +109,12 @@ def __init__(self, config, task=None):
else:
self.info_id_inc = repo_info_start_id + 1

# requests.post('http://localhost:5000/api/unstable/workers', json=specs)
try:
requests.post('http://localhost:{}/api/unstable/workers'.format(
self.config['broker_port']), json=specs)
except requests.exceptions.ConnectionError:
logging.error('Cannot connect to the broker')
sys.exit('Cannot connect to the broker! Quitting...')

@property
def task(self):
Expand All @@ -120,30 +129,87 @@ def task(self):

# repos = pd.read_sql(repo_id_sql, self.db)

@task.setter
def task(self, value):
    """Queue a collection task received from the broker and start the worker.

    Looks up the lowest ``repo_id`` in the ``repo`` table whose ``repo_git``
    matches the task's git URL, wraps it in a ``CollectorTask`` and places it
    on the update or maintain queue depending on ``job_type``.

    :param value: task dict; must contain ``value['given']['git_url']`` and is
        expected to carry ``job_type`` ('UPDATE' or 'MAINTAIN') and,
        optionally, ``focused_task``.
    :raises KeyError: if ``value['given']['git_url']`` is missing.
    """
    git_url = value['given']['git_url']

    repo_url_SQL = s.sql.text("""
        SELECT min(repo_id) AS repo_id
        FROM repo
        WHERE repo_git = :repo_git
    """)

    rs = pd.read_sql(repo_url_SQL, self.db, params={'repo_git': git_url})

    try:
        repo_id = int(rs.iloc[0]['repo_id'])
    except Exception as e:
        # min() over zero matching rows yields NULL, which cannot be converted
        # to int. Without a repo_id there is nothing to collect, so stop here
        # instead of falling through and referencing an unbound name.
        logging.error("Error: {}, or that repo is not in our database: {}".format(str(e), str(value)))
        return

    collector_task = CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id})

    job_type = value.get('job_type')
    if job_type == 'UPDATE':
        self._queue.put(collector_task)
    elif job_type == 'MAINTAIN':
        self._maintain_queue.put(collector_task)
    else:
        logging.error("Unrecognized job_type, task was not queued: {}".format(str(value)))

    if value.get('focused_task') == 1:
        self.finishing_task = True

    self._task = collector_task
    self.run()

def cancel(self):
    """Delete/cancel the current task.

    Only clears the stored task reference (``self._task``); it does not touch
    the queues or the child process.
    """
    self._task = None

def run(self):
    """Start the background collection process unless one is already running.

    Spawns a child process executing ``self.collect`` and reports the child's
    PID and this process's PID to the broker's ``add_pids`` endpoint.

    ``collect`` returns as soon as both queues are empty, so a child that was
    started earlier may already have exited; in that case a fresh one is
    started so that tasks queued afterwards are still processed.
    """
    logging.info("Running...")
    if self._child is not None and self._child.is_alive():
        # A collector is still running; presumably it shares the queues with
        # this process and will pick up the newly queued task itself.
        return

    self._child = Process(target=self.collect, args=())
    self._child.start()
    try:
        requests.post("http://localhost:{}/api/unstable/add_pids".format(
            self.config['broker_port']), json={'pids': [self._child.pid, os.getpid()]})
    except requests.exceptions.ConnectionError:
        # Collection has already started; failing to report the PIDs should
        # not abort it, so log the problem and carry on.
        logging.error('Cannot connect to the broker to register worker pids')

def collect(self, repos=None):
    """Drain the task queues, collecting repo info for every queued repository.

    Messages on ``self._queue`` (UPDATE jobs) are handled before messages on
    ``self._maintain_queue`` (MAINTAIN jobs). ``self.working_on`` records which
    kind of job is being processed. The loop ends when both queues are empty
    or when a message of type ``EXIT`` is received.

    :param repos: unused; kept so callers that pass it keep working.
    :raises ValueError: if a message's type is neither ``EXIT`` nor ``TASK``.
    """
    while True:
        if not self._queue.empty():
            message = self._queue.get()
            self.working_on = 'UPDATE'
        elif not self._maintain_queue.empty():
            # Read from the maintain queue itself: calling get() on the update
            # queue here would block, because it was just found to be empty.
            message = self._maintain_queue.get()
            logging.info("Popped off message: {}".format(str(message.entry_info)))
            self.working_on = "MAINTAIN"
        else:
            break

        if message.type == 'EXIT':
            break

        if message.type != 'TASK':
            raise ValueError(f'{message.type} is not a recognized task type')

        owner, repo = self.get_owner_repo(message.entry_info['git_url'])
        logging.info(f'Querying: {owner}/{repo}')
        self.query_repo_info(message.entry_info['repo_id'], owner, repo)


def get_owner_repo(self, git_url):
Expand Down Expand Up @@ -200,7 +266,7 @@ def query_repo_info(self, repo_id, owner, repo):
self.update_rate_limit(r)
j = r.json()['data']['repository']
except Exception as e:
logging.error('Caught Exception:', str(e))
logging.error('Caught Exception: ' + str(e))

logging.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}')

Expand Down

0 comments on commit 55235df

Please sign in to comment.