Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make gh_repo_info_worker compatible with broker #323

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions workers/gh_repo_info_worker/gh_repo_info_worker/runtime.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
#############################################################
# This file is a copy of the augur_worker_github/runtime.py #
#############################################################


from flask import Flask, jsonify, request, Response
import click, os, json, requests, logging
from augur_worker_github.worker import GitHubWorker
from gh_repo_info_worker.worker import GHRepoInfoWorker
logging.basicConfig(filename='worker.log', filemode='w', level=logging.INFO)


Expand All @@ -22,7 +17,7 @@ def augwop_task():
"""
if request.method == 'POST': #will post a task to be added to the queue
logging.info("Sending to work on task: {}".format(str(request.json)))
app.gh_worker.task = request.json
app.gh_repo_info_worker.task = request.json

#set task
return Response(response=request.json,
Expand All @@ -40,12 +35,12 @@ def augwop_task():
def augwop_config():
""" Retrieve worker's config
"""
return app.gh_worker.config
return app.gh_repo_info_worker.config

@click.command()
@click.option('--augur-url', default='http://localhost:5000/', help='Augur URL')
@click.option('--host', default='localhost', help='Host')
@click.option('--port', default=51236, help='Port')
@click.option('--port', default=51237, help='Port')
def main(augur_url, host, port):
""" Declares singular worker and creates the server and flask app that it will be running on
"""
Expand All @@ -56,16 +51,16 @@ def main(augur_url, host, port):
server = read_config("Server", use_main_config=1)

config = {
"id": "com.augurlabs.core.github_worker",
"id": "com.augurlabs.core.gh_repo_info_worker",
"broker_port": server['port'],
"zombie_id": credentials["zombie_id"],
#"zombie_id": credentials["zombie_id"],
"host": credentials["host"],
"key": credentials["key"],
"password": credentials["password"],
"port": credentials["port"],
"user": credentials["user"],
"database": credentials["database"],
"table": "repo_badging",
"table": "repo_info",
"endpoint": "https://bestpractices.coreinfrastructure.org/projects.json",
"display_name": "",
"description": "",
Expand All @@ -75,13 +70,13 @@ def main(augur_url, host, port):

#create instance of the worker

app.gh_worker = GitHubWorker(config) # declares the worker that will be running on this server with specified config
app.gh_repo_info_worker = GHRepoInfoWorker(config) # declares the worker that will be running on this server with specified config

create_server(app, None)
logging.info("Starting Flask App with pid: " + str(os.getpid()) + "...")
app.run(debug=app.debug, host=host, port=port)
if app.gh_worker._child is not None:
app.gh_worker._child.terminate()
if app.gh_repo_info_worker._child is not None:
app.gh_repo_info_worker._child.terminate()
try:
requests.post('http://localhost:{}/api/unstable/workers/remove'.format(server['port']), json={"id": config['id']})
except:
Expand Down Expand Up @@ -147,3 +142,6 @@ def read_config(section, name=None, environment_variable=None, default=None, con

__config = __default_config
return(__config[section][name])

if __name__ == "__main__":
main()
100 changes: 83 additions & 17 deletions workers/gh_repo_info_worker/gh_repo_info_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import logging
import time
from datetime import datetime
import os
import sys

logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w')

Expand Down Expand Up @@ -38,6 +40,8 @@ def __init__(self, config, task=None):
self._task = task
self._child = None
self._queue = Queue()
self._maintain_queue = Queue()
self.working_on = None
self.config = config
self.db = None
self.table = None
Expand All @@ -54,7 +58,7 @@ def __init__(self, config, task=None):
self.rate_limit = int(response.headers['X-RateLimit-Remaining'])

specs = {
"id": "com.augurlabs.core.gh_repo_info",
"id": self.config['id'],
"location": "http://localhost:51237",
"qualifications": [
{
Expand All @@ -66,7 +70,7 @@ def __init__(self, config, task=None):
}

self.DB_STR = 'postgresql://{}:{}@{}:{}/{}'.format(
self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['name']
self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['database']
)

logging.info("Making database connections...")
Expand All @@ -83,7 +87,7 @@ def __init__(self, config, task=None):
helper_metadata = MetaData()

metadata.reflect(self.db, only=['repo_info'])
# helper_metadata.reflect(self.helper_db)
helper_metadata.reflect(self.helper_db)

Base = automap_base(metadata=metadata)

Expand All @@ -105,7 +109,12 @@ def __init__(self, config, task=None):
else:
self.info_id_inc = repo_info_start_id + 1

# requests.post('http://localhost:5000/api/unstable/workers', json=specs)
try:
requests.post('http://localhost:{}/api/unstable/workers'.format(
self.config['broker_port']), json=specs)
except requests.exceptions.ConnectionError:
logging.error('Cannot connect to the broker')
sys.exit('Cannot connect to the broker! Quitting...')

@property
def task(self):
Expand All @@ -120,30 +129,87 @@ def task(self):

# repos = pd.read_sql(repo_id_sql, self.db)

@task.setter
def task(self, value):
git_url = value['given']['git_url']

repo_url_SQL = s.sql.text("""
SELECT min(repo_id) AS repo_id
FROM repo
WHERE repo_git = :repo_git
""")

rs = pd.read_sql(repo_url_SQL, self.db, params={'repo_git': git_url})

try:
repo_id = int(rs.iloc[0]['repo_id'])
if value['job_type'] == 'UPDATE':
self._queue.put(CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id}))
elif value['job_type'] == 'MAINTAIN':
self._maintain_queue.put(CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id}))

if 'focused_task' in value:
if value['focused_task'] == 1:
self.finishing_task = True

except Exception as e:
logging.error("Error: {}, or that repo is not in our database: {}".format(str(e), str(value)))

self._task = CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id})
self.run()

def cancel(self):
""" Delete/cancel current task """
self._task = None

def run(self):
logging.info("Running...")
self._child = Process(target=self.collect, args=())
self._child.start()
if self._child is None:
self._child = Process(target=self.collect, args=())
self._child.start()
requests.post("http://localhost:{}/api/unstable/add_pids".format(
self.config['broker_port']), json={'pids': [self._child.pid, os.getpid()]})

def collect(self, repos=None):

if repos == None:
repo_id_sql = s.sql.text("""
SELECT repo_id, repo_git FROM repo
""")
while True:
if not self._queue.empty():
message = self._queue.get()
self.working_on = 'UPDATE'
else:
if not self._maintain_queue.empty():
message = self._queue.get()
logging.info("Popped off message: {}".format(str(message.entry_info)))
self.working_on = "MAINTAIN"
else:
break

if message.type == 'EXIT':
break

if message.type != 'TASK':
raise ValueError(f'{message.type} is not a recognized task type')

if message.type == 'TASK':
owner, repo = self.get_owner_repo(message.entry_info['git_url'])
logging.info(f'Querying: {owner}/{repo}')
self.query_repo_info(message.entry_info['repo_id'],
owner, repo)


# if repos == None:
# repo_id_sql = s.sql.text("""
# SELECT repo_id, repo_git FROM repo
# """)

repos = pd.read_sql(repo_id_sql, self.db)
# repos = pd.read_sql(repo_id_sql, self.db)

for _, row in repos.iterrows():
owner, repo = self.get_owner_repo(row['repo_git'])
print(f'Querying: {owner}/{repo}')
self.query_repo_info(row['repo_id'], owner, repo)
# for _, row in repos.iterrows():
# owner, repo = self.get_owner_repo(row['repo_git'])
# print(f'Querying: {owner}/{repo}')
# self.query_repo_info(row['repo_id'], owner, repo)

print(f'Added repo info for {self.results_counter} repos')
# print(f'Added repo info for {self.results_counter} repos')


def get_owner_repo(self, git_url):
Expand Down Expand Up @@ -200,7 +266,7 @@ def query_repo_info(self, repo_id, owner, repo):
self.update_rate_limit(r)
j = r.json()['data']['repository']
except Exception as e:
logging.error('Caught Exception:', str(e))
logging.error('Caught Exception: ' + str(e))

logging.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}')

Expand Down