diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..8a616341 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.git +.gitignore +LICENSE.md +README.md +venv/ diff --git a/.github/workflows/new_stable_version_manual.yml b/.github/workflows/new_stable_version_manual.yml new file mode 100644 index 00000000..edb205d8 --- /dev/null +++ b/.github/workflows/new_stable_version_manual.yml @@ -0,0 +1,21 @@ +name: Create New Stable Version (Manual) + +on: + workflow_dispatch: + +jobs: + create_pull_request: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Create Pull Request + uses: peter-evans/create-pull-request@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + branch: stable + base: main + title: Automated New Stable Version + body: This pull request was automatically created by the workflow and contains the latest stable version of the repository. diff --git a/.github/workflows/new_stable_version_push.yml b/.github/workflows/new_stable_version_push.yml new file mode 100644 index 00000000..dcd93311 --- /dev/null +++ b/.github/workflows/new_stable_version_push.yml @@ -0,0 +1,27 @@ +name: Create New Stable Version (Push) + +on: + push: + branches: + - main + +jobs: + create_pull_request: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Check commit message for [stable] + id: check_commit_message + run: echo ::set-output name=contains_stable::$(if grep -q "\[stable\]" <<< "$(git log --format=%B -n 1)"; then echo true; else echo false; fi) + + - name: Create Pull Request + if: steps.check_commit_message.outputs.contains_stable == 'true' + uses: peter-evans/create-pull-request@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + branch: stable + base: main + title: Automated New Stable Version + body: This pull request was automatically created by the workflow and contains the latest stable version of the repository. 
diff --git a/.gitignore b/.gitignore index 12efe911..5d5db640 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ /.pytest_cache/ /models/ + +postgres/__pycache__ +flask_app/__pycache__ +1_:memory: \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 00000000..1b79b4b5 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,116 @@ +[MASTER] + +init-hook='import os, sys; sys.path.append(os.path.abspath(os.path.curdir))' +ignore=tests +load-plugins= +jobs=4 +unsafe-load-any-extension=no +extension-pkg-whitelist= + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED +confidence= + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +#enable= + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once).You can also use "--disable=all" to +# disable everything first and then reenable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". 
If you want to run only the classes checker, but have +# no Warning level messages displayed, use"--disable=all --enable=classes +# --disable=W" +disable=locally-disabled,too-few-public-methods,too-many-ancestors,useless-object-inheritance,useless-return,unnecessary-pass + + +[REPORTS] +output-format=text +reports=yes +evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) +#msg-template= + +[LOGGING] +logging-modules=logging + +[SIMILARITIES] +min-similarity-lines=8 +ignore-comments=yes +ignore-imports=no + +[FORMAT] +max-line-length=100 +ignore-long-lines=^\s*(# )??$ +single-line-if-stmt=no + +max-module-lines=1000 +indent-string='\t' + +[SPELLING] +spelling-dict= +spelling-ignore-words= +spelling-private-dict-file= +spelling-store-unknown-words=no + + +[VARIABLES] +init-import=no +dummy-variables-rgx=(_[a-zA-Z0-9_]*?$) +additional-builtins= +callbacks=cb_,_cb,handle_,get,post,put,patch,delete,options +redefining-builtins-modules=six.moves,future.builtins + + +[TYPECHECK] +ignore-mixin-members=yes +ignored-modules=flask_sqlalchemy,app.extensions.flask_sqlalchemy +ignored-classes=optparse.Values,thread._local,_thread._local +generated-members=fget,query,begin,add,merge,delete,commit,rollback +contextmanager-decorators=contextlib.contextmanager + + +[MISCELLANEOUS] +notes=FIXME,XXX,TODO + + +[BASIC] +good-names=i,j,k,ex,Run,_,log,api +bad-names=foo,bar,baz,toto,tutu,tata + + +[ELIF] +max-nested-blocks=5 + + +[DESIGN] +max-args=5 +ignored-argument-names=_.* +max-bool-expr=5 + + +[IMPORTS] +deprecated-modules=optparse +import-graph= +ext-import-graph= +int-import-graph= +known-standard-library= +known-third-party=flask_restplus_patched +analyse-fallback-blocks=no + + +[CLASSES] +defining-attr-methods=__init__,__new__,setUp +valid-classmethod-first-arg=cls +valid-metaclass-classmethod-first-arg=mcs +exclude-protected=_asdict,_fields,_replace,_source,_make + + +[EXCEPTIONS] +overgeneral-exceptions=builtins.Exception \ No newline at 
end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..cca7bcaa --- /dev/null +++ b/Dockerfile @@ -0,0 +1,65 @@ +FROM python:3.9-slim-buster as build + +ENV PYTHONUNBUFFERED 1 +ENV PYTHONDONTWRITEBYTECODE 1 + +RUN apt-get update \ + # dependencies for building Python packages + && apt-get install -y build-essential \ + # psycopg2 dependencies + && apt-get install -y libpq-dev \ + # Additional dependencies + && apt-get install -y telnet netcat \ + # cleaning up unused files + && apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \ + && rm -rf /var/lib/apt/lists/* + +RUN mkdir /home/wannadb +WORKDIR /home/wannadb + +# update Pip +RUN pip install --upgrade pip + +# install torch +RUN pip install --use-pep517 torch==1.10.0 + +# Install dependencies +COPY core-requirements.txt core-requirements.txt +RUN pip install --use-pep517 -r core-requirements.txt +COPY backend-requirements.txt backend-requirements.txt +RUN pip install --use-pep517 -r backend-requirements.txt +################################## +## do not change above ## +## changes above cause ## +## long loading times ## +################################## + +# Run tests +RUN pip install --use-pep517 pytest +#RUN pytest + +FROM build as worker + +FROM build as worker-prod + +#copy the rest +COPY . . 
+ + +FROM build as dev + +#CMD [ "python", "app.py" ] + +CMD ["mypy","--install-types", "--non-interactive"] + +CMD ["flask", "--app", "app", "--debug", "run","--host","0.0.0.0", "--port", "8000" ] + + +FROM worker-prod as prod + + +RUN chmod +x wannadb_web/entrypoint.sh + +# Define the entrypoint.sh +CMD ["sh","./wannadb_web/entrypoint.sh"] + diff --git a/README.md b/README.md index e9df914c..78401b5d 100644 --- a/README.md +++ b/README.md @@ -114,25 +114,23 @@ series = {SIGMOD '22} WannaDB is dually licensed under both AGPLv3 for the free usage by end users or the embedding in Open Source projects, and a commercial license for the integration in industrial projects and closed-source tool chains. More details can be found in [our licence agreement](LICENSE.md). - ## Availability of Code & Datasets We publish the source code four our system as discussed in the papers here. Additionally, we publish code to reproduce our experiments in a separate repository (coming soon). Unfortunately, we cannot publish the datasets online due to copyright issues. We will send them via email on request to everyone interested and hope they can be of benefit for other research, too. - ## Implementation details -The core of WannaDB (extraction and matching) was previously developed by us under the name [ASET (Ad-hoc Structured Exploration of Text Collections)](https://link.tuda.systems/aset). To better reflect the whole application cycle vision we present with this paper, we switchted the name to WannaDB. +The core of WannaDB (extraction and matching) was previously developed by us under the name [ASET (Ad-hoc Structured Exploration of Text Collections)](https://link.tuda.systems/aset). To better reflect the whole application cycle vision we present with this paper, we switchted the name to WannaDB. ### Repository structure This repository is structured as follows: -* `wannadb`, `wannadb_parsql`, and `wannadb_ui` contain the implementation of ASET and the GUI. 
-* `scripts` contains helpers, like a stand-alone preprocessing script. -* `tests` contains pytest tests. +- `wannadb`, `wannadb_parsql`, and `wannadb_ui` contain the implementation of ASET and the GUI. +- `scripts` contains helpers, like a stand-alone preprocessing script. +- `tests` contains pytest tests. ### Architecture: Core @@ -140,7 +138,7 @@ The core implementation of WannaDB is in the `wannadb` package and implemented a **Data model** -`data` contains WannaDB's data model. The entities are `InformationNugget`s, `Attribute`s, `Document`s, and the `DocumentBase`. +`data` contains WannaDB's data model. The entities are `InformationNugget`s, `Attribute`s, `Document`s, and the `DocumentBase`. A nugget is an information piece obtained from a document. An attribute is a table column that gets populated with information from the documents. A document is a textual document, and the document base is a collection of documents and provides facilities for `BSON` serialization, consistency checks, and data access. @@ -175,3 +173,33 @@ The `Statistics` object allows you to easily record information during runtime. ### Architecture: GUI The GUI implementation can be found in the `wannadb_ui` package. `wannadb_api.py` provides an asynchronous API for the `wannadb` library using PyQt's slots and signals mechanism. `main_window.py`, `document_base.py`, and `interactive_window.py` contain different parts of the user interface, and `common.py` contains base classes for some recurring user interface elements. + +--- + +# Start the Web-Backend docker build + +to build/start the production + +``` +docker compose -f "docker-compose-prod.yaml" build +docker compose -f "docker-compose-prod.yaml" up +``` + +for developers use + +``` +docker compose build +docker compose up +``` + +the flask and other services start automaticly. 
+for more information click [here](https://github.com/lw86ruwo/wannadbBackend/blob/main/WEBSERVER_STRUCTURE.md) + +so see all the routes and the structure of the webserver click [here](https://github.com/lw86ruwo/wannadbBackend/blob/main/ROUTES.md) + +you can use `code` to attach the container and then work in docker + +git only works when you install gh and make gh auth +then you can work as usual + +a docker rebuild is only necessary if dependencies have changed diff --git a/ROUTES.md b/ROUTES.md new file mode 100644 index 00000000..0af8b907 --- /dev/null +++ b/ROUTES.md @@ -0,0 +1,681 @@ +# Routes _wannadbBackend_ + +The Flask app is running by default on port 8000. Here we assume that the app is running on localhost. + +--- + + +- [User Routes](#User-Routes) +- [File Routes](#File-Routes) +- [Core Routes](#Core-routes) + + +--- +## User Routes + +- [Register a new user](#register-a-new-user) +- [Login as a user](#login-as-a-user) +- [Delete a user](#delete-a-user) +- [Create an organization](#create-an-organization) +- [Leave an organization](#leave-an-organization) +- [Get organizations for a user](#get-organizations-for-a-user) +- [Get organization name by ID](#get-organization-name-by-id) +- [Get organization names for a user](#get-organization-names-for-a-user) +- [Add a user to an organization](#add-a-user-to-an-organization) +- [Get members of an organization](#get-members-of-an-organization) +- [Get user name suggestions](#get-user-name-suggestions) + +### Register a new user. + +``` +http://localhost:8000/register +``` + +- Body + ```json + { + "username": "username", + "password": "password" + } + ``` +- 422 : User register **failed**: + ```json + { + "message": "User register failed" + } + ``` +- 201: User register **success**: + ```json + { + "message": "User registered successfully", + "token": "eyJhbGciOiJIUI1NiIsIn5cCI6IkpXVCJ9.ey1c2VyIjocGhpbEiLCJpZCIM30.v_lKLd0X-PABkRFXHZa..." + } + ``` + +--- + +### Login as a user. 
+ +``` +http://localhost:8000/login +``` +- Body + ```json + { + "username": "username", + "password": "password" + } + ``` +- 401: User login **failed**: + ```json + { + "message": "Wrong Password" + } + ``` +- 200: User login **success**: + ```json + { + "message": "Log in successfully", + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1..." + } + ``` + +--- + +### Delete a user. + +``` +http://localhost:8000/deleteUser/ +``` +- Body + ```json + { + "username": "username", + "password": "password" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 401: User not authorized. +- 401: Wrong Password. +- 204: User deleted successfully. +- 409: User deletion failed. + +--- + +### Create an organization. +``` +http://localhost:8000/createOrganisation +``` +- Body + ```json + { + "organisationName": "organisation_name" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Organization created successfully. +- 409: Organization creation failed. + +--- + +### Leave an organization. + +``` +http://localhost:8000/leaveOrganisation +``` +- Body + ```json + { + "organisationId": "organisation_id" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: User left the organization successfully. +- 500: Error leaving organization. + +--- + +### Get organizations for a user. +``` +http://localhost:8000/getOrganisations +``` +- Body + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Retrieved user's organizations successfully. + ```json + { + "organisation_ids": ["number"] + } + ``` +- 404: User is not in any organization. +- 409: Error retrieving organizations. + +--- + +### Get organization name by ID. 
+``` +http://localhost:8000/getOrganisationName/<_id> +``` +- URL + ```json + { + "_id": "organisation_id" + } + ``` +- Body + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Retrieved organization name successfully. + ```json + { + "organisation_name": ["string"] + } + ``` +- 404: Organization not found. +- 409: Error retrieving organization name. + +--- + +### Get organization names for a user. + +``` +http://localhost:8000/getOrganisationNames +``` + +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Retrieved user's organization names successfully. + ```json + { + "organisations": ["number"] + } + ``` +- 404: User is not in any organization. +- 409: Error retrieving organization names. + +--- + +### Add a user to an organization. + +``` +http://localhost:8000/addUserToOrganisation +``` +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- Body + ```json + { + "organisationId": "organisation_id", + "newUser": "new_user" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: User added to the organization successfully. + ```json + { + "organisation_id": "number" + } + ``` +- 409: Error adding user to organization. + +--- + +### Get members of an organization. +``` +http://localhost:8000/getOrganisationMembers/<_id> +``` +- URL + ```json + { + "_id": "organisation_id" + } + ``` +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Retrieved organization members successfully. + ```json + { + "members": ["string"] + } + ``` +- 404: Organization not found. +- 409: Error retrieving organization members. + +--- + +### Get user name suggestions. 
+``` +http://localhost:8000/get/user/suggestion/<_prefix> +``` + +- URL + ```json + { + "_prefix": "organisation_id" + } + ``` +- 401: No authorization provided. +- 400: Invalid authorization token. +- 200: Retrieved username suggestions successfully. + ```json + { + "usernames": ["string"] + } + ``` + +--- +## File Routes + +- [Upload File](#upload-file) +- [Get Files](#get-files) +- [Get document base for an organization](#get-document-base-for-an-organization) +- [Update file content](#update-file-content) +- [Delete a file](#delete-a-file) +- [Get a file](#get-a-file) +--- + +### Upload File + +``` +http://localhost:8000/data/upload/file +``` + +- Form + - `file`: The file to upload. + - `organisationId`: ID of the organization. +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 200: File uploaded successfully. + ```json + { + "document_ids": ["number"] + } + ``` +- 400: Invalid file type. + ```json + { + "document_ids": ["string"] + } + ``` +- 207: Multiple files uploaded, some with errors. + ```json + { + "document_ids": ["number|string"] + } + ``` + +--- + +### Get Files + +``` +http://localhost:8000/data/organization/get/files/<_id> +``` + +- URL + ```json + { + "_id": "organisation_id" + } + ``` +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 200: Retrieved organization files successfully. + ```json + { + "documents": "id" + } + ``` + +--- + +### Get document base for an organization. + +``` +http://localhost:8000/data/organization/get/documentbase/<_id> +``` + +- URL + ```json + { + "_id": "organisation_id" + } + ``` +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 200: Retrieved document base successfully. + ```json + { + "document_base": "document_base" + } + ``` + +--- + +### Update file content. 
+ +``` +http://localhost:8000/data/update/file/content +``` + +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- Body + ```json + { + "documentId": "document_id", + "newContent": "new_content" + } + ``` +- 401: No authorization provided. +- 200: File content updated successfully. + ```json + { + "status": "bool" + } + ``` + +--- + +### Delete a file. + +``` +http://localhost:8000/data/file/delete +``` + +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- Body + ```json + { + "documentId": "document_id" + } + ``` +- 401: No authorization provided. +- 200: File deleted successfully. + ```json + { + "status": "bool" + } + ``` + +--- + +### Get a file. + +``` +http://localhost:8000/data/get/file/<_id> +``` + + +- URL + ```json + { + "_id": "document_id" + } + ``` +- Header + ```json + { + "authorization": "---authorization---jwt---" + } + ``` +- 401: No authorization provided. +- 200: Retrieved file successfully. + ```json + { + "document_ids": ["list", "string", "bytes"] + } + ``` +- 404: File not found. + ```json + { + "document_ids": [] + } + ``` +- 206: Partial content retrieved. + ```json + { + "document_ids": ["list", "string", "bytes"] + } + ``` + +-- + +## Core Routes + +This module defines Flask routes for the 'core' functionality of the Wannadb UI. 
+ +- [Create a document base](#create-a-document-base) +- [Load a document base](#load-a-document-base) +- [Interactive document population](#interactive-document-population) +- [Add attributes to a document base](#add-attributes-to-a-document-base) +- [Update the attributes of a document base](#update-the-attributes-of-a-document-base) +- [Sort nuggets](#sort-nuggets) +- [Confirm a custom nugget](#confirm-a-custom-nugget) +- [Confirm a match nugget](#confirm-a-match-nugget) +- [Get document base for an organization](#get-document-base-for-an-organization) + + +--- + +### Create a document base + +``` +http://localhost:8000/core/create_document_base +``` + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. + - `document_ids`: Comma-separated list of document IDs. + - `attributes`: Comma-separated list of attributes. +- 401: No authorization provided. +- 200: Document base created successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Load a document base. + +``` +http://localhost:8000/core/document_base/load +``` + + +- Form +- `authorization`: Your authorization token. +- `organisationId`: Your organization ID. +- `baseName`: Your document base name. +- 401: No authorization provided. +- 200: Document base loaded successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Interactive document population. + + +``` +http://localhost:8000/core/document_base/interactive +``` + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. +- 401: No authorization provided. +- 200: Document base populated interactively. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Add attributes to a document base. + +``` +http://localhost:8000/core/document_base/attributes/add +``` + + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. 
+ - `baseName`: Your document base name. + - `attributes`: Comma-separated list of attributes. +- 401: No authorization provided. +- 200: Attributes added to document base successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Update the attributes of a document base. + +``` +http://localhost:8000/core/document_base/attributes/update +``` + + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. + - `attributes`: Comma-separated list of attributes. +- 401: No authorization provided. +- 200: Attributes updated successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Sort nuggets. + +``` +http://localhost:8000/core/document_base/order/nugget +``` + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. + - `documentName`: Your document name. + - `documentContent`: Your document content. +- 401: No authorization provided. +- 200: Nuggets sorted successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Confirm a custom nugget. + +``` +http://localhost:8000/core/document_base/confirm/nugget/custom +``` + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. + - `documentName`: Your document name. + - `documentContent`: Your document content. + - `nuggetText`: Nugget as text. + - `startIndex`: Start index of the nugget. + - `endIndex`: End index of the nugget. + - `interactiveCallTaskId`: Interactive call task ID. +- 401: No authorization provided. +- 200: Nugget confirmed successfully. + ```json + {"task_id": "task_id"} + ``` + +--- + +### Confirm a match nugget. + +``` +http://localhost:8000/core/document_base/confirm/nugget/match +``` + +- Form + - `authorization`: Your authorization token. + - `organisationId`: Your organization ID. + - `baseName`: Your document base name. 
+ - `documentName`: Your document name. + - `documentContent`: Your document content. + - `nuggetText`: Nugget as text. + - `startIndex`: Start index of the nugget. + - `endIndex`: End index of the nugget. + - `interactiveCallTaskId`: Interactive call task ID. +- 401: No authorization provided. +- 200: Nugget confirmed successfully. + ```json + {"task_id": "task_id"} + ``` + + + diff --git a/WEBSERVER_STRUCTURE.md b/WEBSERVER_STRUCTURE.md new file mode 100644 index 00000000..c5ac67ef --- /dev/null +++ b/WEBSERVER_STRUCTURE.md @@ -0,0 +1,114 @@ +## File Structure + +``` +. +├── entrypoint.sh +├── util.py +├── __init__.py +├── .env +│ └── .dev +├── Postgres +│ ├── queries.py +│ ├── transactions.py +│ ├── util.py +│ └── __init__.py +├── Redis +│ ├── RedisCache.py +│ ├── util.py +│ └── __init__.py +├── routing +│ ├── core.py +│ ├── dev.py +│ ├── files.py +│ ├── user.py +│ └── __init__.py +├── SQLite +│ ├── Cache_DB.py +│ ├── util.py +│ └── __init__.py +└── worker + ├── data.py + ├── tasks.py + ├── util.py + ├── Web_API.py + └── __init__.py +``` + +### Web Root Directory + +``` +├── entrypoint.sh +├── util.py +├── __init__.py +└── .env + └── .dev +``` +this contains the entrypoint for the webserver, a utility file for general functionality for the whole project, +and a .env file for environment configuration. +a .prod should be added for production environment configuration if tis is needed. +for these changes needs also be done in the Dockerfile + +### Postgres +``` +Postgres +├── queries.py +├── transactions.py +├── util.py +└── __init__.py +``` +Directory related to PostgresSQL database functionality. +This is an abstraction layer for the database connection and queries. + +### Redis +``` +Redis +├── RedisCache.py +├── util.py +└── __init__.py +``` +Directory related to Redis cache functionality. +This is an abstraction layer for the Redis cache. +It also scopes the cache to a specific namespace for the users. 
+ +### Routing +``` +routing +├── core.py +├── dev.py +├── files.py +├── user.py +└── __init__.py +``` +Directory related to routing functionality for the flask webserver. +- core.py: contains the routes for the main application routes. +- dev.py: contains the routes for developers. +- files.py: contains the routes for file uploads and downloads. +- user.py: contains the routes for user authentication and management. + + +### SQLite +``` +SQLite +│ ├── Cache_DB.py +│ ├── util.py +│ └── __init__.py +``` +Directory related to SQLite database functionality. +This is an abstraction layer for the database connection and queries. +It also scopes the DB to a specific namespace for the users. + + +### Worker +``` +worker + ├── data.py + ├── tasks.py + ├── util.py + ├── Web_API.py + └── __init__.py +``` +Directory related to worker functionality, background tasks and asynchronous processing. +- data.py: contains all altert or new types. +- tasks.py: contains all the tasks that are to be run in the background. +- util.py: contains utility functions for the worker. +- Web_API.py: contains the API for the worker to communicate with the core. 
diff --git a/app.py b/app.py new file mode 100644 index 00000000..a25b21e9 --- /dev/null +++ b/app.py @@ -0,0 +1,61 @@ +import logging +import os + +from flask import Flask, make_response, render_template_string +from flask_cors import CORS +#from flask_debugtoolbar import DebugToolbarExtension +from wannadb_web.routing.core import core_routes +from wannadb_web.routing.dev import dev_routes +from wannadb_web.routing.user import user_management +from wannadb_web.routing.files import main_routes + +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + +app = Flask(__name__) + + +# Combine Flask and Celery configs +app.config.from_mapping( + SECRET_KEY='secret!', + DEBUG=True, + DEBUG_TB_ENABLED=True, + DEBUG_TB_PROFILER_ENABLED=True, + broker_url=os.environ.get("CELERY_BROKER_URL"), + task_ignore_result=True, + PREFERRED_URL_SCHEME='https', + #PROPAGATE_EXCEPTIONS=True +) +app.config['DEBUG'] = True +# Register the Extensions +CORS(app) +#toolbar = DebugToolbarExtension(app) + + + +# Register the blueprints +app.register_blueprint(main_routes) +app.register_blueprint(user_management) +app.register_blueprint(dev_routes) +app.register_blueprint(core_routes) + + +@app.errorhandler(404) +def not_found_error(error): + return make_response({'error': f'Not Found \n {error}'}, 404) + + + + +@app.route('/') +@app.route('/DEBUG') +def index(): + html_code = """ + + +
+

hello

+
+ + + """ + return render_template_string(html_code) diff --git a/backend-requirements.txt b/backend-requirements.txt new file mode 100644 index 00000000..59ecf0f4 --- /dev/null +++ b/backend-requirements.txt @@ -0,0 +1,19 @@ +pip==23.3.2 +flask==3.0.0 +Flask_Cors==4.0.0 +gunicorn==21.2.0 +psycopg2~=2.9.9 +bcrypt==4.1.2 +PyJWT~=2.8.0 +wheel==0.42.0 +tornado~=6.4 +setuptools~=69.0.2 +werkzeug~=3.0.1 +pylint~=3.0.3 +flask_profiler~=1.8.1 +flask-debugtoolbar~=0.14.1 +celery~=5.3.6 +flower~=2.0.1 +redis~=5.0.1 +pickle5~=0.0.11 +mypy==1.5.1 diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 00000000..73322a0c --- /dev/null +++ b/celery_app.py @@ -0,0 +1,26 @@ +import logging +import os + +from celery import Celery + +from wannadb_web.worker.tasks import BaseTask, DocumentBaseAddAttributes, DocumentBaseConfirmNugget, DocumentBaseForgetMatches, DocumentBaseForgetMatchesForAttribute, DocumentBaseGetOrderedNuggets, DocumentBaseInteractiveTablePopulation, DocumentBaseLoad, DocumentBaseRemoveAttributes, DocumentBaseUpdateAttributes, TestTask, InitManager, CreateDocumentBase + +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + +app = Celery(__name__) + +app.conf.broker_url = os.environ.get("CELERY_BROKER_URL") + +app.register_task(BaseTask) +app.register_task(TestTask) +app.register_task(InitManager) +app.register_task(CreateDocumentBase) +app.register_task(DocumentBaseLoad) +app.register_task(DocumentBaseAddAttributes) +app.register_task(DocumentBaseUpdateAttributes) +app.register_task(DocumentBaseRemoveAttributes) +app.register_task(DocumentBaseForgetMatches) +app.register_task(DocumentBaseForgetMatchesForAttribute) +app.register_task(DocumentBaseInteractiveTablePopulation) +app.register_task(DocumentBaseGetOrderedNuggets) +app.register_task(DocumentBaseConfirmNugget) diff --git a/requirements.txt b/core-requirements.txt similarity index 99% rename from requirements.txt rename to core-requirements.txt 
index 19f3bad0..42da5b8c 100644 --- a/requirements.txt +++ b/core-requirements.txt @@ -230,4 +230,4 @@ wasabi==0.10.1 # thinc # The following packages are considered to be unsafe in a requirements file: -# setuptools +# setuptools \ No newline at end of file diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml new file mode 100644 index 00000000..d8558da1 --- /dev/null +++ b/docker-compose-prod.yaml @@ -0,0 +1,83 @@ +version: "3.6" +services: + wannadb: + build: + context: . + dockerfile: Dockerfile + target: prod + restart: always + tty: true + ports: + - "8000:8000" + env_file: + - wannadb_web/.env/.dev + depends_on: + - postgres + - redis + networks: + - mynetwork + + worker: + build: + context: . + dockerfile: Dockerfile + target: worker + tty: true + command: ["celery", "-A", "celery_app", "worker", "-l", "info"] + env_file: + - wannadb_web/.env/.dev + volumes: + - ./:/home/wannadb + networks: + - mynetwork + depends_on: + - wannadb + - redis + + flower: + build: + context: . + dockerfile: Dockerfile + target: worker + tty: true + command: ['celery', '-A', 'celery_app', 'flower'] + env_file: + - wannadb_web/.env/.dev + volumes: + - ./:/home/wannadb + networks: + - mynetwork + ports: + - "5555:5555" + depends_on: + - wannadb + - redis + + + postgres: + image: postgres + container_name: postgres-container + environment: + POSTGRES_PASSWORD: 0 + POSTGRES_DB: userManagement + networks: + - mynetwork + ports: + - "5432:5432" + volumes: + - pgdata:/var/lib/postgresql/data + + redis: + image: redis:alpine + container_name: redis-container + ports: + - "6379:6379" + networks: + - mynetwork + +networks: + mynetwork: + driver: bridge + +volumes: + pgdata: \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 00000000..a43b9ab6 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,84 @@ +version: '3.6' +services: + wannadb: + build: + context: . 
+ dockerfile: Dockerfile + target: dev + restart: always + tty: true + ports: + - '8000:8000' + env_file: + - wannadb_web/.env/.dev + depends_on: + - postgres + - redis + volumes: + - ./:/home/wannadb + networks: + - mynetwork + + worker: + build: + context: . + dockerfile: Dockerfile + target: worker + tty: true + command: ['celery', '-A', 'celery_app', 'worker', '-l', 'info'] + env_file: + - wannadb_web/.env/.dev + volumes: + - ./:/home/wannadb + networks: + - mynetwork + depends_on: + - wannadb + - redis + + flower: + build: + context: . + dockerfile: Dockerfile + target: worker + tty: true + command: ['celery', '-A', 'celery_app', 'flower'] + env_file: + - wannadb_web/.env/.dev + volumes: + - ./:/home/wannadb + networks: + - mynetwork + ports: + - '5555:5555' + depends_on: + - wannadb + - redis + + postgres: + image: postgres + container_name: postgres-container + environment: + POSTGRES_PASSWORD: 0 + POSTGRES_DB: userManagement + networks: + - mynetwork + ports: + - '5432:5432' + volumes: + - pgdata:/var/lib/postgresql/data + + redis: + image: redis:alpine + container_name: redis-container + ports: + - '6379:6379' + networks: + - mynetwork + +networks: + mynetwork: + driver: bridge + +volumes: + pgdata: diff --git a/prod/build.sh b/prod/build.sh new file mode 100755 index 00000000..d4f29d8d --- /dev/null +++ b/prod/build.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker compose -f docker-compose-prod.yaml build \ No newline at end of file diff --git a/prod/rebuild.sh b/prod/rebuild.sh new file mode 100644 index 00000000..8ba67ab1 --- /dev/null +++ b/prod/rebuild.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +docker stop $(docker ps -a -q) +docker rm $(docker ps -a -q) + +docker compose -f docker-compose-prod.yaml build + +docker compose -f docker-compose-prod.yaml up -d diff --git a/prod/up.sh b/prod/up.sh new file mode 100755 index 00000000..55152aa8 --- /dev/null +++ b/prod/up.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker compose -f docker-compose-prod.yaml up -d \ No newline at 
import logging
import threading
from datetime import datetime
from enum import Enum

from wannadb.data.data import Attribute, Document
from wannadb.statistics import Statistics
from wannadb.resources import ResourceManager
from wannadb_web.worker.Web_API import Web_API

logger = logging.getLogger(__name__)


class Status(Enum):
    """Gives the status of the application."""
    IDLE = 1
    RUNNING = 2
    CREATED = 3
    DEAD = 98
    ERROR = 99


class Web_API_Thread(threading.Thread):
    """Worker thread that runs one queued Web_API operation at a time.

    Callers schedule work through the public helpers (e.g.
    :meth:`create_document_base`), which store a callable in
    ``self.function`` and set ``self.event``; the thread wakes up, runs the
    callable, then goes back to waiting.
    """

    def __init__(self, thread_id):
        super().__init__()
        self.function = None                 # next callable to execute, set by schedulers
        self.thread_id = thread_id
        self.wannadb_web_api = Web_API()
        self.event = threading.Event()       # signals that self.function is ready
        self.status = Status.IDLE
        self.last_call = datetime.now()      # read by the manager for idle cleanup
        self.exit_flag = False               # set by the manager to stop this thread

    def run(self):
        ResourceManager()
        self.status = Status.RUNNING
        while True:
            if self.exit_flag:
                self.status = Status.DEAD
                logger.info(f"Thread {self.thread_id} exited")
                return
            self.event.wait()
            self.event.clear()
            # Re-check the exit flag: the manager may have set the event
            # only to wake the thread up for shutdown.
            if self.exit_flag:
                continue
            if self.function is None:
                # A set event without a scheduled callable is a programming
                # error; record the error state before propagating.
                self.status = Status.ERROR
                raise Exception("No function set")
            try:
                self.function()
                self.last_call = datetime.now()
            finally:
                # Always clear the slot so a failed call does not block
                # the thread from accepting new work.
                self.function = None

    def create_document_base(self, documents: list[Document], attributes: list[Attribute], statistics: Statistics):
        """Schedule creation of a document base.

        Raises:
            Exception: if another function is already scheduled/running.
        """
        if self.function is not None:
            raise Exception("Function running")
        self.function = lambda: self.wannadb_web_api.create_document_base_task(documents, attributes, statistics)
        self.event.set()
class Web_Thread_Manager(threading.Thread):
    """Owns and supervises the pool of :class:`Web_API_Thread` workers.

    Periodically reaps finished threads and flags long-idle ones for
    shutdown. Publishes itself as the module-level ``web_Thread_Manager``
    singleton on construction.
    """

    def __init__(self, idle_time=60):
        super().__init__()
        logger.info("Web_Thread_Manager initialized")
        self.idle_time = idle_time                      # seconds between sweeps and max allowed idle time
        self.threads: dict[int, Web_API_Thread] = {}
        self.thread_limit = 2
        global web_Thread_Manager
        web_Thread_Manager = self

    def run(self):
        logger.info("Web_Thread_Manager running")
        while True:
            time.sleep(self.idle_time)
            # Iterate over a snapshot: deleting from a dict while iterating
            # it raises RuntimeError.
            for thread_id, thread in list(self.threads.items()):
                if not thread.is_alive():
                    logger.info(f"Thread {thread_id} cleaned")
                    del self.threads[thread_id]
                elif (datetime.now() - thread.last_call).total_seconds() > self.idle_time:
                    thread.exit_flag = True
                    # Wake the worker so it can observe the flag; otherwise
                    # it stays blocked on its event forever.
                    thread.event.set()

    def access_thread(self, thread_id):
        """Return the registered thread for *thread_id*.

        Raises:
            threading.ThreadError: if no such thread is registered.
        """
        if thread_id not in self.threads:
            logger.error("Thread not found")
            raise threading.ThreadError("Thread not found")
        logger.debug(f"Thread {thread_id} accessed")
        return self.threads[thread_id]

    def new_thread(self, thread_id):
        """Create, register and start a new thread, or reuse an existing one.

        Raises:
            threading.ThreadError: if the thread limit is reached.
        """
        if thread_id in self.threads:
            logger.debug(f"Thread {thread_id} already exists")
            return self.threads[thread_id]
        if len(self.threads) >= self.thread_limit:
            logger.error("Thread limit reached")
            raise threading.ThreadError("Thread limit reached")
        thread = Web_API_Thread(thread_id)
        # Register the thread; previously it was never stored, so
        # access_thread() could not find it and the limit never applied.
        self.threads[thread_id] = thread
        thread.start()
        logger.debug(f"Thread {thread_id} created and started")
        return thread
status_callback, statistics[f"pipeline-element-{str(i)}"]) status_callback("Running the pipeline...", 1) tack: float = time.time() diff --git a/wannadb/data/data.py b/wannadb/data/data.py index 3a802048..a791e8ce 100644 --- a/wannadb/data/data.py +++ b/wannadb/data/data.py @@ -152,6 +152,12 @@ def __hash__(self) -> int: def __eq__(self, other) -> bool: return isinstance(other, Attribute) and self._name == other._name and self._signals == other._signals + + def toJSON(self): + print("toJSON") + return { + "name": self._name + } @property def name(self) -> str: @@ -600,6 +606,7 @@ def to_bson(self) -> bytes: logger.info("Convert to BSON bytes.") bson_bytes: bytes = bson.encode(serializable_base) + #bson_bytes: bytes = bson.tokenEncode(serializable_base) tack: float = time.time() logger.info(f"Serialized document base in {tack - tick} seconds.") @@ -620,6 +627,7 @@ def from_bson(cls, bson_bytes: bytes) -> "DocumentBase": logger.info("Convert from BSON bytes.") serialized_base: Dict[str, Any] = bson.decode(bson_bytes) + #serialized_base: Dict[str, Any] = bson.tokenDecode(bson_bytes) # deserialize the document base document_base: "DocumentBase" = cls([], []) diff --git a/wannadb_web/.env/.dev b/wannadb_web/.env/.dev new file mode 100644 index 00000000..d257e7ce --- /dev/null +++ b/wannadb_web/.env/.dev @@ -0,0 +1,19 @@ +FLASK_DEBUG=1 +FLASK_CONFIG=development +DATABASE_HOST=postgres +DATABASE_PORT=5432 +DATABASE_NAME=userManagement +DATABASE_USER=postgres +DATABASE_PASSWORD=0 + +CACHE_HOST=redis +CACHE_PORT=6379 +CACHE_DB=0 +CACHE_PASSWORD=0 + +DATABASE_URL=postgresql://${DATABASE_USER}:${DATABASE_PASSWORD}@localhost:5432/postgres +SECRET_KEY=my_precious +CELERY_BROKER_URL=redis://redis-container:6379/0 +CELERY_RESULT_BACKEND=redis://redis-container:6379/0 + + diff --git a/wannadb_web/Redis/RedisCache.py b/wannadb_web/Redis/RedisCache.py new file mode 100644 index 00000000..e8c25c9f --- /dev/null +++ b/wannadb_web/Redis/RedisCache.py @@ -0,0 +1,58 @@ +from typing import 
from typing import Optional, Union
import logging

from wannadb_web.Redis import util

logger = logging.getLogger(__name__)


class RedisCache:
    """Thin wrapper around Redis that namespaces every key per user.

    All keys are prefixed with ``user:<user_id>:`` so multiple users can
    share one Redis database without collisions.
    """

    def __init__(self, user_id: str) -> None:
        """Initialize the RedisCache instance for a specific user."""
        self.redis_client = util.connectRedis()
        self.user_space_key = f"user:{user_id}"

    def _user_key(self, key: str) -> str:
        """Qualify *key* with this user's namespace prefix."""
        return f"{self.user_space_key}:{key}"

    def set(self, key: str, value: Union[str, bytes, int, float]) -> None:
        """Set a key-value pair in the user-specific space."""
        self.redis_client.set(name=self._user_key(key), value=value)

    def sadd(self, key: str, *values: Union[str, bytes, int, float]) -> None:
        """Add *values* as members of the set stored under *key*.

        redis-py's ``sadd`` takes the members as positional arguments; the
        previous ``value=values`` keyword call raised TypeError and would
        have stored the whole tuple as a single member.
        """
        self.redis_client.sadd(self._user_key(key), *values)

    def spop(self, key: str) -> Optional[set]:
        """Return all members of the set stored under *key*.

        NOTE(review): despite the name, this delegates to SMEMBERS and does
        not remove anything — confirm whether a real SPOP was intended.
        """
        return self.redis_client.smembers(name=self._user_key(key))

    def get(self, key: str) -> Optional[Union[str, bytes, int, float]]:
        """Get the value associated with a key in the user-specific space."""
        return self.redis_client.get(self._user_key(key))

    def delete(self, key: str) -> None:
        """Delete the key-value pair associated with a key in the user-specific space."""
        self.redis_client.delete(self._user_key(key))

    def delete_user_space(self) -> None:
        """Delete all entries associated with the user-specific space."""
        user_space_pattern = f"{self.user_space_key}:*"
        # scan_iter walks the keyspace incrementally (non-blocking SCAN),
        # replacing the hand-rolled cursor loop that mixed '0' and 0.
        keys_to_delete = list(self.redis_client.scan_iter(match=user_space_pattern))
        if keys_to_delete:
            self.redis_client.delete(*keys_to_delete)

    def close(self) -> None:
        """Close the Redis connection for the user-specific space."""
        self.redis_client.close()
        self.redis_client = None
self.redis_client.close() + self.redis_client = None diff --git a/wannadb_web/Redis/__init__.py b/wannadb_web/Redis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wannadb_web/Redis/util.py b/wannadb_web/Redis/util.py new file mode 100644 index 00000000..f2bd5d3d --- /dev/null +++ b/wannadb_web/Redis/util.py @@ -0,0 +1,26 @@ +import logging +import os + +import redis + +CACHE_HOST = os.environ.get("CACHE_HOST", "127.0.0.1") +CACHE_PORT = int(os.environ.get("CACHE_PORT", 6379)) +CACHE_DB = int(os.environ.get("CACHE_DB", 0)) +CACHE_PASSWORD = int(os.environ.get("CACHE_PASSWORD", 0)) + +print(CACHE_HOST, CACHE_PORT, CACHE_DB, CACHE_PASSWORD) + +logger = logging.getLogger(__name__) + + +def connectRedis(): + try: + redis_client = redis.Redis( + host=CACHE_HOST, + port=CACHE_PORT, + db=CACHE_DB, + password=CACHE_PASSWORD, + ) + return redis_client + except Exception as e: + raise Exception("Redis connection failed because:", e) diff --git a/wannadb_web/SQLite/Cache_DB.py b/wannadb_web/SQLite/Cache_DB.py new file mode 100644 index 00000000..8ae96af6 --- /dev/null +++ b/wannadb_web/SQLite/Cache_DB.py @@ -0,0 +1,46 @@ +import logging +from typing import Optional + +from wannadb_parsql.cache_db import SQLiteCacheDB + +logger = logging.getLogger(__name__) + + +class SQLiteCacheDBWrapper: + __cache_db: Optional[SQLiteCacheDB] + + def __init__(self, user_id: int, db_file="wannadb_cache.db"): + """Initialize the RedisCache instance for a specific user.""" + if db_file == ":memory:": + self.db_identifier = db_file + else: + self.db_identifier = f"{user_id}_{db_file}" + self.__cache_db = SQLiteCacheDB(db_file=self.db_identifier) + if self.cache_db.conn is None: + raise Exception("Cache db could not be initialized") + + @property + def cache_db(self): + if self.__cache_db is None: + raise Exception("Cache db is not initialized") + return self.__cache_db + + def delete(self): + self.cache_db.conn.close() + self.__cache_db = None + self.db_identifier = None + 
import sqlite3
from sqlite3 import Error


def create_connection(db_file, user_id):
    """Create a database connection to a user-specific SQLite database.

    The actual database file is ``"{db_file}_{user_id}"`` so every user
    gets an isolated database.

    :param db_file: general database file name
    :param user_id: user-specific identifier appended to the file name
    :return: Connection object (with ``sqlite3.Row`` row factory), or None on error
    """
    conn = None
    try:
        db_identifier = f"{db_file}_{user_id}"
        # check_same_thread=False: the connection is shared across threads
        # by the web layer.
        conn = sqlite3.connect(db_identifier, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        return conn
    except Error as e:
        print(e)

    return conn


def alter_table(conn, entry):
    """Add a column to an existing table.

    :param conn: open SQLite connection
    :param entry: dict with keys ``table``, ``attribute`` and ``type``
                  (``type`` defaults to ``text`` when None)
    :return: ``lastrowid`` of the executing cursor
    :raises ValueError: if table/column/type are not plain identifiers.
        Identifiers cannot be bound as SQL parameters, so they are
        validated before string interpolation to prevent SQL injection.
    """
    if entry["type"] is None:
        entry["type"] = 'text'
    for part in (entry["table"], entry["attribute"], entry["type"]):
        if not str(part).replace("_", "").isalnum():
            raise ValueError(f"invalid SQL identifier: {part!r}")
    sql = ''' ALTER TABLE {} ADD COLUMN {} {}'''.format(entry["table"], entry["attribute"], entry["type"])
    cur = conn.cursor()
    cur.execute(sql)
    conn.commit()
    return cur.lastrowid
from typing import Union

import bcrypt
from psycopg2 import sql

from wannadb_web.postgres.util import execute_query, execute_transaction


def getUserID(user: str):
    """Return the id of *user*, or None if the username is unknown."""
    select_query = sql.SQL("SELECT id FROM users WHERE username = %s;")
    result = execute_query(select_query, (user,))
    # Guard against an empty result set: the old code indexed result[0]
    # unconditionally and raised IndexError for unknown users.
    if result and isinstance(result[0], int):
        return int(result[0])
    return None


def getOrganisationID(organisation_name: str):
    """Return the raw query result for the organisation's id."""
    select_query = sql.SQL("SELECT id FROM organisations WHERE name = %s;")
    return execute_query(select_query, (organisation_name,))


def getOrganisationName(organisation_id: int):
    """Return the organisation's name, or -1 when it does not exist."""
    select_query = sql.SQL("SELECT name FROM organisations WHERE id = %s;")
    response = execute_query(select_query, (organisation_id,))
    # "not response" also covers an empty result list, which previously
    # crashed on response[0].
    if not response:
        return -1
    return str(response[0])


def getMembersOfOrganisation(organisation_id: int):
    """Return the usernames of all members of the organisation."""
    select_query = sql.SQL(
        "SELECT username FROM users WHERE id IN (SELECT userid FROM membership WHERE organisationid = %s);")
    return execute_query(select_query, (organisation_id,))


def getMemberIDsFromOrganisationID(organisationID: int):
    """Return the user ids of all members of the organisation."""
    select_query = sql.SQL("SELECT userid FROM membership WHERE organisationid = %s;")
    return execute_query(select_query, (organisationID,))


def getUserNameSuggestion(prefix: str):
    """Return usernames starting with *prefix* (SQL LIKE 'prefix%')."""
    select_query = sql.SQL("SELECT username FROM users WHERE username LIKE %s;")
    return execute_query(select_query, (prefix + "%",))


def getOrganisationIDsFromUserId(userID: int):
    """Return ``(organisation_ids, error)`` for the given user.

    ``([-1], None)`` signals that the user is in no organisation.
    """
    try:
        select_query = sql.SQL("SELECT organisationid FROM membership WHERE userid = %s;")
        response = execute_query(select_query, (userID,))
        if isinstance(response, list):
            return response[0], None
        elif response is None:
            return [-1], None
        else:
            return None, "Unexpected response format"

    except Exception as e:
        return None, e


def getOrganisationFromUserId(user_id: int):
    """Return ``(organisations, error)`` where each organisation is
    ``{"id": int, "name": str}``."""
    try:
        select_query = sql.SQL(""" SELECT organisationid, o.name
                                   FROM membership
                                   JOIN organisations o ON membership.organisationid = o.id
                                   WHERE userid = %s;""")
        response = execute_query(select_query, (user_id,))
        if isinstance(response, list):
            organisations: list[dict[str, Union[str, int]]] = []
            for org in response:
                organisations.append({"id": int(org[0]), "name": str(org[1])})
            return organisations, None
        if response is None:
            return [], None
        return None, "Unexpected response format"
    except Exception as e:
        return None, e


def checkPassword(user: str, password: str):
    """Checks if the password is correct for the given user

    Returns:
        user_id: int (if password is correct)
        False: bool (if the user is unknown or password is incorrect)
    Raises:
        None
    """
    select_query = sql.SQL("SELECT password,id as pw FROM users WHERE username = %s ")
    result = execute_query(select_query, (user,))
    # Unknown user: previously this crashed with IndexError on result[0].
    if not result:
        return False
    _password, _id = result[0]

    if _password:
        stored_password = bytes(_password)
        if bcrypt.checkpw(password.encode('utf-8'), stored_password):
            return int(_id)

    return False


def checkOrganisationAuthorisation(organisationName: str, userName: str):
    """Return the authorisation level of *userName* inside *organisationName*,
    or None when no membership exists."""
    select_query = sql.SQL("SELECT authorisation from membership "
                           "where userid = (SELECT id from users where username = (%s)) "
                           "and "
                           "organisationid = (Select id from organisations where name = (%s))")
    # Placeholder order is (username, organisation name); the old call
    # passed the two arguments swapped, so lookups always missed.
    result = execute_query(select_query, (userName, organisationName))
    if result and isinstance(result[0], int):
        return int(result[0])
    return None


def _getDocument(documentId: int):
    """Return a document's text content (str) or binary content (bytes),
    or None when the document has neither."""
    select_query = sql.SQL("""SELECT content,content_byte
                              from documents
                              where id = (%s)""")

    result = execute_query(select_query, (documentId,))

    if result[0]:
        if result[0][0]:
            return str(result[0][0])
        return bytes(result[0][1])
    return None


def getDocument_by_name(document_name: str, organisation_id: int, user_id: int) -> tuple[str, Union[str, bytes]]:
    """
    Returns:
        id: str
        content: str or bytes

    Raises:
        Exception: if no document with that name is found
        Exception: if multiple documents with that name are found
    """
    select_query = sql.SQL("""SELECT id,content,content_byte
                              FROM documents d
                              JOIN membership m ON d.organisationid = m.organisationid
                              WHERE d.name = (%s) AND m.userid = (%s) AND m.organisationid = (%s)
                              """)

    result = execute_query(select_query, (document_name, user_id, organisation_id,))
    if len(result) == 1:
        document = result[0]
        doc_id = document[0]
        if document[1]:
            return str(doc_id), str(document[1])
        elif document[2]:
            return str(doc_id), bytes(document[2])
    elif len(result) > 1:
        raise Exception("Multiple documents with the same name found")
    raise Exception("No document with that name found")


def getDocument(document_id: int, user_id: int):
    """Return ``(name, content)`` of the first matching document readable by
    *user_id*, or None when nothing is found."""
    select_query = sql.SQL("""SELECT name,content,content_byte
                              FROM documents
                              JOIN membership m ON documents.organisationid = m.organisationid
                              WHERE id = (%s) AND m.userid = (%s)
                              """)

    result = execute_query(select_query, (document_id, user_id,))
    if len(result) > 0:
        for document in result:
            name = document[0]
            if document[1]:
                return str(name), str(document[1])
            elif document[2]:
                return str(name), bytes(document[2])
    else:
        return None
def getDocumentByNameAndContent(doc_name: str, doc_content: str, user_id: int):
    """Return ``(name, content)`` of the user's document named *doc_name*.

    NOTE(review): despite the signature, *doc_content* is currently unused —
    the content filter below is commented out; confirm intended behaviour.
    """
    select_query = sql.SQL(""" SELECT name,content,content_byte
                               FROM documents
                               JOIN membership m ON documents.organisationid = m.organisationid
                               WHERE name = (%s) AND m.userid = (%s)
                               """)
    # AND content LIKE %(%s)%

    result = execute_query(select_query, (doc_name, user_id,))
    if len(result) > 0:
        for document in result:
            name = document[0]
            if document[1]:
                return str(name), str(document[1])
            elif document[2]:
                return str(name), bytes(document[2])
    else:
        return None


def getDocumentsForOrganization(organisation_id: int):
    """Return all text documents of the organisation as
    ``[{"id", "name", "content"}, ...]`` (binary-only documents are skipped)."""
    select_query = sql.SQL("""SELECT id, name,content,content_byte
                              FROM documents
                              WHERE organisationid = (%s)
                              """)
    result = execute_query(select_query, (organisation_id,))

    if result is None or len(result) == 0:
        return []

    doc_array = []
    for document in result:
        if document[2] is None:  # no text content -> skip
            continue
        doc_array.append({
            "id": document[0],
            "name": document[1],
            "content": document[2]
        })
    return doc_array


def getDocumentBaseForOrganization(organisation_id: int):
    """Return ``[{"id", "name"}, ...]`` for documents that have binary
    (``content_byte``) data."""
    select_query = sql.SQL("""SELECT id, name,content,content_byte
                              FROM documents
                              WHERE organisationid = (%s)
                              """)
    result = execute_query(select_query, (organisation_id,))

    if result is None or len(result) == 0:
        return []

    doc_array = []
    for document in result:
        if document[3] is None:  # no binary content -> skip
            continue
        doc_array.append({
            "id": document[0],
            "name": document[1],
        })
    return doc_array


def updateDocumentContent(doc_id: int, new_content):
    """Overwrite a document's content in whichever column it already uses.

    Returns True on success, False when the document does not exist or the
    update fails.
    """
    try:
        select_query = sql.SQL("""SELECT content, content_byte
                                  FROM documents
                                  WHERE id = (%s)
                                  """)
        result = execute_query(select_query, (doc_id,))
        if result is None or len(result) == 0:
            return False
        # content_type is one of two hard-coded column names, so the string
        # concatenation below cannot inject SQL.
        content_type = "content"
        if result[0][0] is None:
            content_type = "content_byte"
        update_query = sql.SQL("UPDATE documents SET " + content_type + " = (%s) WHERE id = (%s)")
        execute_transaction(update_query, (new_content, doc_id,), commit=True, fetch=False)
        return True
    except Exception as e:
        print("updateDocumentContent failed because:\n", e)
        return False


def deleteDocumentContent(doc_id: int):
    """Delete a document row; returns True on success, False otherwise."""
    try:
        delete_query = sql.SQL("""DELETE
                                  FROM documents
                                  WHERE id = (%s)
                                  """)
        execute_transaction(delete_query, (doc_id,), commit=True, fetch=False)
        return True
    except Exception as e:
        # The old message named the wrong function.
        print("deleteDocumentContent failed because:\n", e)
        return False


def getDocuments(document_ids: list[int], user_id: int):
    """Return ``[(name, content), ...]`` for the given document ids readable
    by *user_id*; ``[(None, None)]`` when nothing matches."""
    if not document_ids:
        # An empty IN () clause is a SQL syntax error.
        return [(None, None)]
    # Bind the id list as parameters instead of interpolating it into the
    # query text (the old f-string was injection-prone).
    select_query = sql.SQL("""SELECT name,content,content_byte
                              FROM documents
                              JOIN membership m ON documents.organisationid = m.organisationid
                              WHERE m.userid = (%s) and documents.id in ({})
                              """).format(sql.SQL(",").join(sql.Placeholder() * len(document_ids)))
    result = execute_query(select_query, (user_id, *document_ids))
    if isinstance(result, list) and result and isinstance(result[0], tuple):
        if result[0][1]:
            return [(str(doc[0]), str(doc[1])) for doc in result]
        elif result[0][2]:
            return [(str(doc[0]), bytes(doc[2])) for doc in result]
    return [(None, None)]


def getDocument_ids(organisation_id: int, user_id: int):
    """Return ``[(name, content), ...]`` for all documents of the
    organisation readable by *user_id*; ``[]`` when nothing matches.

    NOTE(review): despite the name, this returns names and contents, not
    ids — confirm against the callers before renaming.
    """
    select_query = sql.SQL("""SELECT name,content,content_byte
                              from documents
                              join membership m on documents.organisationid = m.organisationid
                              where m.organisationid = (%s) and m.userid = (%s)
                              """)

    result = execute_query(select_query, (organisation_id, user_id,))
    if isinstance(result, list) and result and isinstance(result[0], tuple):
        if result[0][1]:
            return [(str(doc[0]), str(doc[1])) for doc in result]
        elif result[0][2]:
            return [(str(doc[0]), bytes(doc[2])) for doc in result]
    return []
document[0] + content = document[2] + b_documents.append((str(name), bytes(content))) + return b_documents + return [] diff --git a/wannadb_web/postgres/transactions.py b/wannadb_web/postgres/transactions.py new file mode 100644 index 00000000..69a6bb41 --- /dev/null +++ b/wannadb_web/postgres/transactions.py @@ -0,0 +1,376 @@ +import logging +from typing import Union + +import bcrypt +from psycopg2 import sql, IntegrityError +from wannadb_web.util import Token, Authorisation, tokenDecode +from wannadb_web.postgres.queries import checkPassword +from wannadb_web.postgres.util import execute_transaction + +logger: logging.Logger = logging.getLogger(__name__) + + +# WARNING: This is only for development purposes! + +def createSchema(schema): + """ + Returns: None + """ + create_schema_query = sql.SQL(f"CREATE SCHEMA IF NOT EXISTS {schema};") + execute_transaction(create_schema_query, commit=True, fetch=False) + logger.info(f"Schema {schema} created successfully.") + + +def dropSchema(schema): + """ + Returns: None + """ + drop_schema_query = sql.SQL(f"DROP SCHEMA IF EXISTS {schema} CASCADE;") + execute_transaction(drop_schema_query, commit=True, fetch=False) + logger.info(f"Schema {schema} dropped successfully.") + + +def dropTables(schema): + """ + Returns: None + """ + drop_table_query = sql.SQL(f"DROP TABLE IF EXISTS {schema}.users CASCADE;\n" + f"DROP TABLE IF EXISTS {schema}.documents CASCADE;\n" + f"DROP TABLE IF EXISTS {schema}.membership CASCADE;\n" + f"DROP TABLE IF EXISTS {schema}.organisations CASCADE;") + execute_transaction(drop_table_query, commit=True) + + +def createUserTable(schema): + create_table_query = sql.SQL(f"""CREATE TABLE IF NOT EXISTS {schema}.users + ( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ), + username text COLLATE pg_catalog."default" NOT NULL, + password bytea NOT NULL, + CONSTRAINT userid PRIMARY KEY (id), + CONSTRAINT unique_username UNIQUE (username) + ) 
+ + TABLESPACE pg_default; + """) + execute_transaction(create_table_query, commit=True, fetch=False) + + +def createDocumentsTable(schema): + create_table_query = sql.SQL(f"""CREATE TABLE IF NOT EXISTS {schema}.documents + ( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ), + name text NOT NULL, + content text , + content_byte bytea, + organisationid bigint NOT NULL, + userid bigint NOT NULL, + CONSTRAINT dokumentid PRIMARY KEY (id), + CONSTRAINT documents_organisationid_fkey FOREIGN KEY (organisationid) + REFERENCES {schema}.organisations (id) MATCH SIMPLE + ON UPDATE CASCADE + ON DELETE CASCADE + NOT VALID, + CONSTRAINT documents_userid_fkey FOREIGN KEY (userid) + REFERENCES {schema}.users (id) MATCH SIMPLE + ON UPDATE CASCADE + ON DELETE CASCADE + NOT VALID + ) + + TABLESPACE pg_default;""") + execute_transaction(create_table_query, commit=True, fetch=False) + + +def createMembershipTable(schema): + create_table_query = sql.SQL(f"""CREATE TABLE IF NOT EXISTS {schema}.membership +( + userid bigint NOT NULL, + organisationid bigint NOT NULL, + authorisation bigint NOT NULL DEFAULT 0, + CONSTRAINT membership_pkey PRIMARY KEY (userid, organisationid), + CONSTRAINT membership_organisationid_fkey FOREIGN KEY (organisationid) + REFERENCES {schema}.organisations (id) MATCH SIMPLE + ON UPDATE CASCADE + ON DELETE CASCADE + NOT VALID, + CONSTRAINT membership_userid_fkey FOREIGN KEY (userid) + REFERENCES {schema}.users (id) MATCH SIMPLE + ON UPDATE CASCADE + ON DELETE CASCADE + NOT VALID +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS {schema}.membership + OWNER to postgres; +-- Index: fki_organisationid + +-- DROP INDEX IF EXISTS {schema}.fki_organisationid; + +CREATE INDEX IF NOT EXISTS fki_organisationid + ON {schema}.membership USING btree + (organisationid ASC NULLS LAST) + TABLESPACE pg_default;""") + execute_transaction(create_table_query, commit=True, fetch=False) + + +def 
def createOrganisationTable(schema):
    """Create the ``organisations`` table in *schema* (id, unique name)."""
    create_table_query = sql.SQL(f"""CREATE TABLE IF NOT EXISTS {schema}.organisations
(
    id bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    name text COLLATE pg_catalog."default" NOT NULL,
    CONSTRAINT organisationid PRIMARY KEY (id),
    CONSTRAINT organisations_name_key UNIQUE (name)
)

TABLESPACE pg_default;

""")
    execute_transaction(create_table_query, commit=True, fetch=False)


def addUser(user: str, password: str):
    """Insert a new user with a bcrypt-hashed password.

    Returns: int (user id)

    Raises: IntegrityError if the username already exists,
            Exception for any other unexpected response.
    """
    pwBytes = password.encode('utf-8')
    salt = bcrypt.gensalt()
    pwHash = bcrypt.hashpw(pwBytes, salt)
    # Store the hash decoded to text; checkPassword() re-encodes it via
    # bytes() before bcrypt.checkpw.
    pwHashcode = pwHash.decode('utf-8')

    insert_data_query = sql.SQL("INSERT INTO users (username, password) VALUES (%s, %s) returning id;")
    data_to_insert = (user, pwHashcode)
    response = execute_transaction(insert_data_query, data_to_insert, commit=True)
    # NOTE(review): execute_transaction appears to hand back the exception
    # object on failure; the old `response is IntegrityError` compared an
    # instance to the class and was always False.
    if isinstance(response, IntegrityError):
        raise IntegrityError("User already exists")
    if response and isinstance(response[0][0], int):
        return int(response[0][0])
    raise Exception("addUser failed because: \n", response)


def changePassword(user: str, old_password: str, new_password: str):
    """Change a user's password after verifying the old one.

    Returns True on success, False when the old password is wrong or the
    new password equals the old one; prints and returns None on error.
    """
    try:
        if old_password == new_password:
            return False

        pwcheck = checkPassword(user, old_password)
        if isinstance(pwcheck, Exception):
            raise pwcheck
        if isinstance(pwcheck, bool):
            # checkPassword returns False for a wrong password.
            return bool(pwcheck)

        pwBytes = new_password.encode('utf-8')
        salt = bcrypt.gensalt()
        pwHash = bcrypt.hashpw(pwBytes, salt)
        # Store decoded text like addUser does; the old code stored raw
        # bytes here, inconsistent with how passwords are written on signup.
        pwHashcode = pwHash.decode('utf-8')

        update_query = sql.SQL("UPDATE users SET password = %s WHERE username = %s;")
        execute_transaction(update_query, (pwHashcode, user), commit=True)
        # The old code fell through returning None, so callers could not
        # distinguish success from failure.
        return True

    except Exception as e:
        print("changePassword failed because: \n", e)
isinstance(pwcheck, Exception): + raise pwcheck + if isinstance(pwcheck, bool): + return bool(pwcheck) + if isinstance(pwcheck, int): + user_id = int(pwcheck) + delete_query = sql.SQL("""DELETE FROM users WHERE id = %s""") + response = execute_transaction(delete_query, (user_id,), commit=True, fetch=False) + if isinstance(response, bool): + return response + + +def addOrganisation(organisationName: str, sessionToken: str): + try: + token: Token = tokenDecode(sessionToken) + userid = token.id + insert_query = sql.SQL("with a as (INSERT INTO organisations (name) VALUES (%s) returning id) " + "INSERT INTO membership (userid,organisationid) select (%s),id from a returning organisationid") + organisation_id = execute_transaction(insert_query, (organisationName, userid), commit=True) + organisation_id = int(organisation_id[0][0]) + return organisation_id, None + + except IntegrityError: + return None, "name already exists." + + except Exception as e: + print("addOrganisation failed because: \n", e) + + +def leaveOrganisation(organisationId: int, sessionToken: str): + try: + token: Token = tokenDecode(sessionToken) + userid = token.id + + count_query = sql.SQL("SELECT COUNT(*) FROM membership WHERE userid = (%s) AND organisationid = (%s)") + count = execute_transaction(count_query, (userid, organisationId,), commit=True) + count = int(count[0][0]) + if count != 1: + return False, "You are not in this organisation" + + delete_query = sql.SQL( + "DELETE FROM membership WHERE userid = (%s) AND organisationid = (%s) returning organisationid") + execute_transaction(delete_query, (userid, organisationId,), commit=True) + + count_query = sql.SQL("SELECT COUNT(*) FROM membership WHERE organisationid = (%s)") + count = execute_transaction(count_query, [organisationId], commit=True) + count = int(count[0][0]) + if count > 0: + return True, None + + delete_query = sql.SQL("DELETE FROM organisations WHERE id = (%s)") + execute_transaction(delete_query, [organisationId], commit=True, 
def addUserToOrganisation(organisationName: str, sessionToken: str, newUser: str):
    """Add *newUser* to the organisation *organisationName*.

    The requesting user (from *sessionToken*) must already be a member with
    at least Admin authorisation; the CTE below enforces that in SQL.

    Returns:
        (organisation_id, None) on success,
        (None, error_message) on failure.
    """
    try:
        token: Token = tokenDecode(sessionToken)
        userid = token.id

        insert_query = sql.SQL("""WITH addUser AS (
                SELECT id
                FROM users
                WHERE username = (%s) -- new User string
            ),
            ismemberandadmin as (
                SELECT organisationid
                from membership
                WHERE organisationid = (SELECT id FROM organisations WHERE name = (%s)) -- org name string
                and userid = (%s) -- user id int
                and authorisation < (%s) -- is minimum permission
            )
        INSERT INTO membership (userid, organisationid)
            SELECT a.id, m.organisationid
            FROM addUser a, ismemberandadmin m
            returning organisationid""")

        organisation_id = execute_transaction(insert_query,
                                              (newUser, organisationName, userid,
                                               str(Authorisation.Admin.value)), commit=True)
        if organisation_id is None:
            return None, "you have no privileges in this organisation"

        # BUG FIX: execute_transaction returns a list of row tuples, so the id
        # must be unwrapped first -- int(<list>) raises TypeError.
        return int(organisation_id[0][0]), None

    except IntegrityError:
        return None, "name already exists."

    except Exception as e:
        print("addUserToOrganisation failed because: \n", e)
        # BUG FIX: return an explicit (value, error) tuple instead of
        # implicitly returning None to tuple-unpacking callers.
        return None, str(e)
def addDocument(name: str, content: Union[str, bytes], organisationId: int, userid: int):
    """Store a document for an organisation.

    Text payloads go into the ``content`` column, binary payloads into
    ``content_byte``.

    Args:
        name: document name.
        content: document payload, either ``str`` or ``bytes``.
        organisationId: owning organisation id.
        userid: uploading user id.

    Returns:
        int: id of the new document row on success,
        -1 on integrity errors (e.g. duplicate name),
        None on any other failure or an unsupported content type.
    """
    try:
        # Pick the target column from the payload type; the two original
        # branches were identical apart from this column name.
        if isinstance(content, str):
            column = "content"
        elif isinstance(content, bytes):
            column = "content_byte"
        else:
            logger.error("addDocument: unsupported content type %s", type(content))
            return None

        insert_data_query = sql.SQL("INSERT INTO documents (name, {}, organisationid, userid) "
                                    "VALUES (%s, %s, %s, %s) returning id;").format(sql.Identifier(column))
        response = execute_transaction(insert_data_query, (name, content, organisationId, userid), commit=True)
        return int(response[0][0])

    except IntegrityError as i:
        logger.error(str(i))
        return -1

    except Exception as e:
        logger.error(str(e))
        # Explicit None keeps the original behaviour visible to readers.
        return None
def execute_transaction(query, params=None, commit=False, fetch=True):
    """Execute a query on a fresh connection and return the result.

    Args:
        query: SQL statement (psycopg2 ``sql.SQL`` object or string).
        params: query parameters, or None.
        commit: commit the transaction after executing.
        fetch: fetch and return the result rows.

    Returns:
        list of tuples (if ``fetch`` and rows were returned),
        None (if ``fetch`` and no rows),
        True (if ``not fetch`` and execution succeeded),
        False (on any non-integrity error).

    Raises:
        IntegrityError: on constraint violations, chained to the original.
    """
    conn = None
    cur = None
    try:
        conn = connectPG()
        cur = conn.cursor()

        cur.execute(query, params)

        if commit:
            conn.commit()

        if fetch:
            result = cur.fetchall()
            return result if result else None
        return True

    except IntegrityError as e:
        # Chain the original error so pgcode/diagnostics stay reachable.
        raise IntegrityError(f"Query execution failed for transaction: {query} \nParams: {params} \nError: {e}") from e

    except Exception as e:
        logging.error(f"Query execution failed for query:\n"
                      f"{query} \n"
                      f"Params: {params} \n"
                      f"Error: {e}")
        return False
    finally:
        # BUG FIX: close the cursor before its connection (reverse
        # acquisition order); the original closed the connection first.
        if cur:
            cur.close()
        if conn:
            conn.close()
cur.execute(query, params) + result = cur.fetchall() + if not result: + return None + + return result + + except Exception as e: + logging.error(f"Query execution failed for query:\n" + f"{query} \n" + f"Params: {params} \n" + f"Error: {e}") + return False + + finally: + if conn: + conn.close() + if cur: + cur.close() diff --git a/wannadb_web/routing/__init__.py b/wannadb_web/routing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wannadb_web/routing/__pycache__/__init__.cpython-39.pyc b/wannadb_web/routing/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 00000000..7bafeb70 Binary files /dev/null and b/wannadb_web/routing/__pycache__/__init__.cpython-39.pyc differ diff --git a/wannadb_web/routing/__pycache__/core.cpython-39.pyc b/wannadb_web/routing/__pycache__/core.cpython-39.pyc new file mode 100644 index 00000000..04687dd2 Binary files /dev/null and b/wannadb_web/routing/__pycache__/core.cpython-39.pyc differ diff --git a/wannadb_web/routing/core.py b/wannadb_web/routing/core.py new file mode 100644 index 00000000..9581800e --- /dev/null +++ b/wannadb_web/routing/core.py @@ -0,0 +1,468 @@ +""" +core_routes Module + +This module defines Flask routes for the 'core' functionality of the Wannadb UI. + +It includes a Blueprint named 'core_routes' with routes related to creating document bases. + +Routes: + - /core/create_document_base (POST): Endpoint for creating a document base. + + +Dependencies: + - Flask: Web framework for handling HTTP requests and responses. + - config.tokenDecode: Function for decoding authorization tokens. + - wannadb_ui.wannadb_api.WannaDBAPI: API for interacting with Wannadb. 
@core_routes.route('/document_base', methods=['POST'])
def create_document_base():
    """
    Endpoint for creating a document base.

    This endpoint is used to create a document base from a list of document ids
    and a list of attributes.

    Example Form Payload:
    {
        "authorization": "your_authorization_token"
        "organisationId": "your_organisation_id",
        "baseName": "your_document_base_name",
        "document_ids": "1, 2, 3",
        "attributes": "plane,car,bike"
    }
    """
    form = request.form
    authorization = form.get("authorization")
    organisation_id: Optional[int] = form.get("organisationId")
    base_name = form.get("baseName")
    document_ids: Optional[list[int]] = form.get("document_ids")
    attributes_strings = form.get("attributes")
    if (organisation_id is None or base_name is None or document_ids is None or attributes_strings is None
            or authorization is None):
        return make_response({"error": "missing parameters"}, 400)
    _token = tokenDecode(authorization)

    # BUG FIX: tokenDecode returns None on an invalid token (never False),
    # so the previous `is False` check let invalid tokens through to
    # `_token.id` and crashed with AttributeError.
    if _token is None:
        return make_response({"error": "invalid token"}, 401)

    attributes_strings = attributes_strings.split(",")
    document_ids = document_ids.split(",")

    statistics = Statistics(False)
    user_id = _token.id

    statisticsDump = pickle.dumps(statistics)
    task = CreateDocumentBase().apply_async(args=(user_id, document_ids, attributes_strings, statisticsDump,
                                                  base_name, organisation_id))

    return make_response({'task_id': task.id}, 202)
@core_routes.route('/document_base/interactive', methods=['POST'])
def interactive_document_base():
    """
    Endpoint for interactive document-base table population.

    Example Form Payload:
    {
        "authorization": "your_authorization_token"
        "organisationId": "your_organisation_id",
        "baseName": "your_document_base_name",
    }
    """
    form = request.form
    authorization = form.get("authorization")
    organisation_id: Optional[int] = form.get("organisationId")
    base_name = form.get("baseName")

    if (organisation_id is None or base_name is None
            or authorization is None):
        return make_response({"error": "missing parameters"}, 400)
    _token = tokenDecode(authorization)

    # BUG FIX: tokenDecode returns None for invalid tokens (never False);
    # the old `is False` check could never reject a request.
    if _token is None:
        return make_response({"error": "invalid token"}, 401)

    user_id = _token.id

    task = DocumentBaseInteractiveTablePopulation().apply_async(args=(user_id, base_name, organisation_id))

    return make_response({'task_id': task.id}, 202)
@core_routes.route('/document_base/attributes/update', methods=['POST'])
def document_base_attribute_update():
    """
    Endpoint for updating the attributes of a document base.

    The comma-separated "attributes" string replaces the document base's
    attribute list.

    Example Form Payload:
    {
        "authorization": "your_authorization_token"
        "organisationId": "your_organisation_id",
        "baseName": "your_document_base_name",
        "attributes": "plane,car,bike"
    }
    """
    form = request.form
    authorization = form.get("authorization")
    organisation_id = form.get("organisationId")
    base_name = form.get("baseName")
    attributes_string = form.get("attributes")
    if (organisation_id is None or base_name is None or attributes_string is None
            or authorization is None):
        return make_response({"error": "missing parameters"}, 400)
    _token = tokenDecode(authorization)

    # BUG FIX: tokenDecode returns None for invalid tokens (never False);
    # the old `is False` check could never reject a request.
    if _token is None:
        return make_response({"error": "invalid token"}, 401)

    attributes_string = attributes_string.split(",")
    user_id = _token.id

    task = DocumentBaseUpdateAttributes().apply_async(args=(
        user_id,
        attributes_string,
        base_name,
        organisation_id
    ))

    return make_response({'task_id': task.id}, 202)
@core_routes.route('/document_base/order/nugget', methods=['POST'])
def sort_nuggets():
    """
    Endpoint for retrieving the ordered nuggets of a document.

    (Docstring previously copy-pasted from create_document_base.)

    Example Form Payload:
    {
        "authorization": "your_authorization_token"
        "organisationId": "your_organisation_id",
        "baseName": "your_document_base_name",
        "documentName": "your_document_name",
        "documentContent": "your_document_content",
    }
    """
    form = request.form
    authorization = form.get("authorization")
    organisation_id: Optional[int] = form.get("organisationId")
    base_name = form.get("baseName")
    document_name = form.get("documentName")
    document_content = form.get("documentContent")
    if organisation_id is None or base_name is None or document_name is None or document_content is None or authorization is None:
        return make_response({"error": "missing parameters"}, 400)

    _token = tokenDecode(authorization)

    # BUG FIX: tokenDecode returns None for invalid tokens (never False);
    # the old `is False` check could never reject a request.
    if _token is None:
        return make_response({"error": "invalid token"}, 401)

    user_id = _token.id

    task = DocumentBaseGetOrderedNuggets().apply_async(args=(
        user_id,
        base_name,
        organisation_id,
        document_name,
        document_content
    ))

    return make_response({'task_id': task.id}, 202)
@core_routes.route('/document_base/confirm/nugget/match', methods=['POST'])
def confirm_nugget_match():
    """
    Endpoint to confirm a match nugget.

    Example Form Payload:
    {
        "authorization": "your_authorization_token"
        "organisationId": "your_organisation_id",
        "baseName": "your_document_base_name",
        "documentName": "your_document_name",
        "documentContent": "your_document_content",
        "nuggetText": "nugget_as_text",
        "startIndex": "start_index_of_nugget",
        "endIndex": "end_index_of_nugget",
        "interactiveCallTaskId": "interactive_call_task_id"
    }
    """
    form = request.form

    authorization = form.get("authorization")
    organisation_id: Optional[int] = form.get("organisationId")
    base_name = form.get("baseName")

    document_name = form.get("documentName")
    document_content = form.get("documentContent")
    nugget_text = form.get("nuggetText")
    start_index: Optional[int] = form.get("startIndex")
    end_index: Optional[int] = form.get("endIndex")

    i_task_id = form.get("interactiveCallTaskId")

    if (organisation_id is None
            or base_name is None
            or document_name is None
            or document_content is None
            or authorization is None
            or nugget_text is None
            or start_index is None
            or end_index is None
            or i_task_id is None):
        return make_response({"error": "missing parameters"}, 400)

    _token = tokenDecode(authorization)

    # BUG FIX: tokenDecode returns None for invalid tokens (never False);
    # the old `is False` check could never reject a request.
    if _token is None:
        return make_response({"error": "invalid token"}, 401)

    user_id = _token.id

    # BUG FIX: form values arrive as strings; the nugget span must be numeric.
    # assumes InformationNugget expects integer character offsets -- TODO confirm
    start_index = int(start_index)
    end_index = int(end_index)

    document = Document(document_name, document_content)
    nugget = InformationNugget(document, start_index, end_index)

    task = DocumentBaseConfirmNugget().apply_async(args=(
        user_id,
        base_name,
        organisation_id,
        document_name,
        document_content,
        nugget,
        start_index,
        end_index,
        i_task_id
    ))

    return make_response({'task_id': task.id}, 202)
@main_routes.route('/upload/file', methods=['POST'])
def upload_files():
    """Upload one or more plain-text files for an organisation.

    Returns the list of new document ids with status 201 when every file was
    stored, 207 when only some were, and 400 when none were; rejected files
    get an error string instead of an id.
    """
    files = request.files.getlist('file')
    form = request.form

    authorization = request.headers.get("authorization")
    organisation_id = int(form.get("organisationId"))

    token = tokenDecode(authorization)
    if token is None:
        return make_response({'error': 'no authorization'}, 401)

    document_ids: list = []

    for file in files:
        content_type = file.content_type
        if 'text/plain' in content_type:
            filename = file.filename
            print("name:" + filename)
            content = str(file.stream.read().decode('utf-8'))
            dokument_id = addDocument(filename, content, organisation_id, token.id)
            document_ids.append(dokument_id)
        else:
            document_ids.append(f"wrong type {content_type}")

    # BUG FIX: the previous checks tested isinstance() on the list object
    # itself instead of on each element, so the 400/207 branches never fired
    # and every upload answered 201.
    if document_ids and all(isinstance(d, str) for d in document_ids):
        return make_response(document_ids, 400)
    if any(isinstance(d, str) for d in document_ids):
        return make_response(document_ids, 207)
    return make_response(document_ids, 201)
@main_routes.route('/get/file/<_id>', methods=['GET'])
def get_file(_id):
    """Return the content of document *_id*.

    200 with the text content, 206 with binary content, 404 when the
    document does not exist, 401 without authorization.
    """
    authorization = request.headers.get("authorization")
    document_id = int(_id)

    token = tokenDecode(authorization)
    if token is None:
        return make_response({'error': 'no authorization'}, 401)

    document = getDocument(document_id, token.id)

    if document is None:
        return make_response([], 404)
    if isinstance(document, str):
        return make_response(document, 200)
    if isinstance(document, bytes):
        return make_response(document, 206)
    # BUG FIX: previously fell through and returned None (a 500 in Flask)
    # for any other payload type.
    return make_response({'error': 'unexpected document type'}, 500)
@user_management.route('/login', methods=['POST'])
def login():
    """Authenticate a user and return a fresh session token."""
    data = request.get_json()
    username = data.get('username')
    password = data.get('password')

    pwcheck = checkPassword(username, password)
    if isinstance(pwcheck, Exception):
        raise pwcheck
    # NOTE: bool must be tested before int because bool subclasses int;
    # a bool result means the password check failed.
    if isinstance(pwcheck, bool):
        return make_response({'message': 'Wrong Password'}, 401)
    if isinstance(pwcheck, int):
        _id = pwcheck
        user = Token(username, _id)
        token = tokenEncode(user.json())

        return make_response({'message': 'Log in successfully',
                              'token': token}, 200)
    # BUG FIX: previously fell through and returned None (a 500 in Flask)
    # for any unexpected checkPassword result.
    return make_response({'message': 'Login failed'}, 500)
@user_management.route('/getOrganisations', methods=['GET'])
def get_organisations():
    """Return the ids of all organisations the requesting user belongs to."""
    authorization = request.headers.get("authorization")
    token = tokenDecode(authorization)
    if token is None:
        return make_response({}, 401)

    organisation_ids, error = getOrganisationIDsFromUserId(token.id)
    if error is None:
        return make_response({'organisation_ids': organisation_ids}, 200)
    if organisation_ids[0] < 0:
        # BUG FIX: the 404 payload was a set literal, which is not
        # JSON-serializable and crashed the response.
        return make_response({'error': 'user is in no organisation'}, 404)
    return make_response({"error": error}, 409)
@user_management.route('/addUserToOrganisation', methods=['POST'])
def add_user_to_organisation():
    """Add *newUser* (by username) to the organisation given by its id."""
    authorization = request.headers.get("authorization")
    token = tokenDecode(authorization)
    if token is None:
        return make_response({'error': 'no authorization'}, 401)

    data = request.get_json()
    # Renamed from `organisation_name`: the "organisationId" payload field
    # holds an id, and addUserToOrganisation2 expects an id.
    organisation_id = data.get("organisationId")
    new_user = data.get("newUser")

    new_organisation_id, error = addUserToOrganisation2(organisation_id, new_user)

    if error:
        return make_response({"error": error}, 409)
    return make_response({'organisation_id': new_organisation_id}, 200)
00000000..569e5364 --- /dev/null +++ b/wannadb_web/util.py @@ -0,0 +1,51 @@ +import datetime +import json +import logging +from enum import Enum +from typing import Any + +import jwt + +logger: logging.Logger = logging.getLogger(__name__) + + +class Authorisation(Enum): + Owner = 0 + Admin = 1 + Member = 10 + + +_jwtkey = "secret" + + +def tokenEncode(obj: dict[str, Any]): + obj["exp"] = datetime.datetime.now() + datetime.timedelta(hours=1) + return jwt.encode(obj, _jwtkey, algorithm="HS256") + + +def tokenDecode(string: str): + if string is None or len(string) < 2: + raise ValueError("string value is: ", string) + try: + decoded_token = jwt.decode(string, _jwtkey, leeway=datetime.timedelta(minutes=1), algorithms="HS256", + verify=True) + #except jwt.ExpiredSignatureError: + except: + return None + user = decoded_token.get('user') + _id = int(decoded_token.get('id')) + exp = decoded_token.get('exp') + return Token(user, _id, exp) + + +class Token: + user: str + id: int + + def __init__(self, user: str, _id: int, exp=datetime.datetime.now() + datetime.timedelta(hours=1)): + self.user = user + self.id = _id + self.exp = exp + + def json(self): + return {"user": self.user, "id": self.id} diff --git a/wannadb_web/worker/Web_API.py b/wannadb_web/worker/Web_API.py new file mode 100644 index 00000000..dbde9840 --- /dev/null +++ b/wannadb_web/worker/Web_API.py @@ -0,0 +1,492 @@ +import csv +import io +import json +import logging +import time +from typing import Optional, Any + +import wannadb +from wannadb import resources +from wannadb.configuration import Pipeline +from wannadb.data.data import Attribute, Document, DocumentBase, InformationNugget +from wannadb.data.signals import CachedDistanceSignal +from wannadb.interaction import EmptyInteractionCallback, InteractionCallback +from wannadb.matching.distance import SignalsMeanDistance +from wannadb.matching.matching import RankingBasedMatcher +from wannadb.preprocessing.embedding import BERTContextSentenceEmbedder, 
RelativePositionEmbedder, \ + SBERTTextEmbedder, SBERTLabelEmbedder +from wannadb.preprocessing.extraction import StanzaNERExtractor, SpacyNERExtractor +from wannadb.preprocessing.label_paraphrasing import OntoNotesLabelParaphraser, \ + SplitAttributeNameLabelParaphraser +from wannadb.preprocessing.normalization import CopyNormalizer +from wannadb.preprocessing.other_processing import ContextSentenceCacher +from wannadb.statistics import Statistics +from wannadb.status import StatusCallback +from wannadb_web.SQLite.Cache_DB import SQLiteCacheDBWrapper +from wannadb_web.postgres.queries import getDocument_by_name, getDocumentByNameAndContent, updateDocumentContent, \ + getDocument +from wannadb_web.postgres.transactions import addDocument +from wannadb_web.worker.data import Signals, CustomMatchFeedback, NuggetMatchFeedback, NoMatchFeedback + +logger = logging.getLogger(__name__) + + +class WannaDB_WebAPI: + + def __init__(self, user_id: int, document_base_name: str, organisation_id: int): + self._document_id: Optional[int] = None + self._document_base: Optional[DocumentBase] = None + self.user_id = user_id + self._feedback: Optional[dict[str, Any]] = None + + self.signals = Signals(str(self.user_id)) + self.signals.reset() + self.sqLiteCacheDBWrapper = SQLiteCacheDBWrapper(user_id, db_file=":memory:") + self.document_base_name = document_base_name + self.organisation_id = organisation_id + + def status_callback_fn(message, progress): + self.signals.status.emit(str(message) + " " + str(progress)) + + self.status_callback = StatusCallback(status_callback_fn) + + def interaction_callback_fn(pipeline_element_identifier, feedback_request): + + feedback_request["identifier"] = pipeline_element_identifier + + start_time = time.time() + while (time.time() - start_time) < 300: + msg = self.signals.match_feedback.msg + + + self.signals.feedback_request_to_ui.emit(feedback_request) + + if msg is not None: + self.signals.status.emit("Feedback received from UI") + 
self.signals.match_feedback.emit(None) + if isinstance(msg, CustomMatchFeedback): + return {"message": "custom-match", "document": msg.document, "start": msg.start} + elif isinstance(msg, NuggetMatchFeedback): + return {"message": "is-match", "nugget": msg.nugget, "not_a_match": msg.not_a_match} + elif isinstance(msg, NoMatchFeedback): + return {"message": "no-match-in-document", "nugget": msg.nugget, "not_a_match": msg.not_a_match} + else: + raise TypeError("Unknown match_feedback type!") + time.sleep(1) + raise TimeoutError("no match_feedback in time provided") + + self.interaction_callback = InteractionCallback(interaction_callback_fn) + + if wannadb.resources.MANAGER is None: + self.signals.error.emit(Exception("Resource Manager not initialized!")) + raise Exception("Resource Manager not initialized!") + if self.sqLiteCacheDBWrapper.cache_db.conn is None: + self.signals.error.emit(Exception("Cache db could not be initialized!")) + raise Exception("Cache db could not be initialized!") + logger.info("WannaDB_WebAPI initialized") + + @property + def feedback(self): + if self._feedback is None: + raise Exception("Feedback is not set!") + return self._feedback + + @feedback.setter + def feedback(self, value: dict): + self._feedback = value + + @property + def document_id(self): + if self._document_id is None: + raise Exception("Document ID not set!") + return self._document_id + + @document_id.setter + def document_id(self, value: int): + self._document_id = value + + @property + def document_base(self): + if self._document_base is None: + raise Exception("Document base not loaded!") + return self._document_base + + @document_base.setter + def document_base(self, value: DocumentBase): + if not isinstance(value, DocumentBase): + raise TypeError("Document base must be of type DocumentBase!") + self._document_base = value + self.signals.document_base_to_ui.emit(value) + return + + def get_ordert_nuggets(self, document_id: int): + document = getDocument(document_id, 
self.user_id) + if document is None: + logger.error(f"Document with id {document_id} not found!") + self.signals.error.emit(Exception(f"Document with id {document_id} not found!")) + return + document_name = document[0] + logger.debug("get_ordert_nuggets") + self.signals.status.emit("get_ordert_nuggets") + for document in self.document_base.documents: + if document.name == document_name: + self.signals.ordert_nuggets.emit(list(sorted(document.nuggets, key=lambda x: x[CachedDistanceSignal]))) + return + logger.error(f"Document \"{document_name}\" not found in document base!") + self.signals.error.emit(Exception(f"Document \"{document_name}\" not found in document base!")) + + def get_ordered_nuggets_by_doc_name(self, document_name: str, document_content: str): + document = getDocumentByNameAndContent(document_name, document_content, self.user_id) + if document is None: + logger.error(f"Document {document_name} not found!") + self.signals.error.emit(Exception(f"Document {document_name} not found!")) + return + logger.debug("get_ordered_nuggets_by_doc_name") + self.signals.status.emit("get_ordered_nuggets_by_doc_name") + for document in self.document_base.documents: + if document.name == document_name: + document_obj = Document(document_name, document_content) + self.signals.ordert_nuggets.emit( + list(sorted(document_obj.nuggets, key=lambda x: x[CachedDistanceSignal]))) + return + logger.error(f"Document \"{document_name}\" not found in document base!") + self.signals.error.emit(Exception(f"Document \"{document_name}\" not found in document base!")) + + def create_document_base(self, documents: list[Document], attributes: list[Attribute], statistics: Statistics): + logger.debug("Called slot 'create_document_base'.") + self.signals.status.emit("create_document_base") + try: + self.sqLiteCacheDBWrapper.reset_cache_db() + + document_base = DocumentBase(documents, attributes) + self.sqLiteCacheDBWrapper.cache_db.create_input_docs_table("input_document", 
document_base.documents) + + if not document_base.validate_consistency(): + logger.error("Document base is inconsistent!") + self.signals.error.emit(Exception("Document base is inconsistent!")) + + # load default preprocessing phase + self.signals.status.emit("Loading preprocessing phase...") + + # noinspection PyTypeChecker + preprocessing_phase = Pipeline([ + StanzaNERExtractor(), + SpacyNERExtractor("SpacyEnCoreWebLg"), + ContextSentenceCacher(), + CopyNormalizer(), + OntoNotesLabelParaphraser(), + SplitAttributeNameLabelParaphraser(do_lowercase=True, splitters=[" ", "_"]), + SBERTLabelEmbedder("SBERTBertLargeNliMeanTokensResource"), + SBERTTextEmbedder("SBERTBertLargeNliMeanTokensResource"), + BERTContextSentenceEmbedder("BertLargeCasedResource"), + RelativePositionEmbedder() + ]) + + preprocessing_phase(document_base, EmptyInteractionCallback(), self.status_callback, statistics) + + self.document_base = document_base + + self.signals.statistics.emit(statistics) + self.signals.finished.emit(1) + self.signals.status.emit("Finished!") + + + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def load_document_base_from_bson(self): + logger.debug("Called function 'load_document_base_from_bson'.") + try: + self.sqLiteCacheDBWrapper.reset_cache_db() + self.signals.reset() + + document_id, document = getDocument_by_name(self.document_base_name, self.organisation_id, self.user_id) + if not isinstance(document, bytes): + logger.error("document is not a DocumentBase!") + self.signals.error.emit(Exception("document is not a DocumentBase!")) + return + + document_base = DocumentBase.from_bson(document) + + if not document_base.validate_consistency(): + logger.error("Document base is inconsistent!") + self.signals.error.emit(Exception("Document base is inconsistent!")) + return + + for attribute in document_base.attributes: + self.sqLiteCacheDBWrapper.cache_db.create_table_by_name(attribute.name) + 
self.sqLiteCacheDBWrapper.cache_db.create_input_docs_table("input_document", document_base.documents) + + logger.info(f"Document base loaded from BSON with id {document_id}.") + self.document_base = document_base + self.document_id = document_id + + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def save_document_base_to_bson(self): + logger.debug("Called function 'save_document_base_to_bson'.") + + try: + document_id = addDocument(self.document_base_name, self.document_base.to_bson(), self.organisation_id, + self.user_id) + + if document_id is None: + logger.error("Document base could not be saved to BSON!") + elif document_id == -1: + logger.error( + f"Document base could not be saved to BSON! Document {self.document_base_name} already exists!") + self.signals.error.emit( + Exception( + f"Document base could not be saved to BSON! Document {self.document_base_name} already exists!")) + elif document_id > 0: + logger.info(f"Document base saved to BSON with ID {document_id}.") + self.signals.status.emit(f"Document base saved to BSON with ID {document_id}.") + self.document_id = document_id + return + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def update_document_base_to_bson(self): + logger.debug("Called function 'save_document_base_to_bson'.") + + if self.document_id is None: + logger.error("Document ID not set!") + self.signals.error.emit(Exception("Document ID not set!")) + return + try: + print("BASE") + print(self.document_base) + print("ID") + print(self.document_id) + print("ATT") + print(self.document_base.attributes) + + status = updateDocumentContent(self.document_id, self.document_base.to_bson()) + if status is False: + logger.error(f"Document base could not be saved to BSON! 
Document {self.document_id} does not exist!") + elif status is True: + logger.info(f"Document base saved to BSON with ID {self.document_id}.") + self.signals.status.emit(f"Document base saved to BSON with ID {self.document_id}.") + logger.error("Document base could not be saved to BSON!") + return + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + # todo: below not implemented yet + def save_table_to_csv(self): + logger.debug("Called function 'save_table_to_csv'.") + + try: + buffer = io.StringIO() + + # check that the table is complete + for attribute in self.document_base.attributes: + for document in self.document_base.documents: + if attribute.name not in document.attribute_mappings.keys(): + logger.error("Cannot save a table with unpopulated attributes!") + self.signals.error.emit( + Exception("Cannot save a table with unpopulated attributes!")) + + # TODO: currently stores the text of the first matching nugget (if there is one) + table_dict = self.document_base.to_table_dict("text") + headers = list(table_dict.keys()) + rows = [] + for ix in range(len(table_dict[headers[0]])): + row = [] + for header in headers: + if header == "document-name": + row.append(table_dict[header][ix]) + elif not table_dict[header][ix]: + row.append(None) + else: + row.append(table_dict[header][ix][0]) # type: ignore + rows.append(row) + writer = csv.writer(buffer, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL) + writer.writerow(headers) + writer.writerows(rows) + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def add_attribute(self, attribute: Attribute): + logger.debug("Called function 'add_attribute'.") + if attribute in self.document_base.attributes: + logger.error("Attribute name already exists!") + self.signals.error.emit(Exception("Attribute name already exists!")) + else: + self.document_base.attributes.append(attribute) + logger.debug(f"Attribute '{attribute.name}' added.") + 
self.signals.status.emit(f"Attribute '{attribute.name}' added.") + self.sqLiteCacheDBWrapper.cache_db.create_table_by_name(attribute.name) + + def add_attributes(self, attributes: list[Attribute]): + logger.debug("Called function 'add_attributes'.") + already_existing_names = [] + for attribute in attributes: + if attribute in self.document_base.attributes: + logger.info(f"Attribute name '{attribute.name}' already exists and was thus not added.") + already_existing_names.append(attribute) + elif attribute is None: + logger.info("Attribute name must not be empty and was thus ignored.") + else: + self.document_base.attributes.append(attribute) + self.sqLiteCacheDBWrapper.cache_db.create_table_by_name(attribute.name) + logger.debug(f"Attribute '{attribute.name}' added.") + return already_existing_names + + def remove_attributes(self, attributes: list[Attribute]): + logger.debug("Called function 'remove_attribute'.") + for attribute in attributes: + if attribute in self.document_base.attributes: + for document in self.document_base.documents: + if attribute.name in document.attribute_mappings.keys(): + del document.attribute_mappings[attribute.name] + + for old_attribute in self.document_base.attributes: + if old_attribute == attribute: + self.document_base.attributes.remove(attribute) + break + self.signals.status.emit(f"Attribute '{attribute.name}' removed.") + else: + logger.error("Attribute name does not exist!") + self.signals.error.emit(Exception("Attribute name does not exist!")) + + def update_attributes(self, attributes: list[Attribute]): + logger.debug("Called function 'update_attributes'.") + self.document_base.attributes.clear() + for attribute in attributes: + if attribute is None: + logger.info("Attribute name must not be empty and was thus ignored.") + else: + self.document_base.attributes.append(attribute) + self.sqLiteCacheDBWrapper.cache_db.create_table_by_name(attribute.name) + logger.debug(f"Attribute '{attribute.name}' added.") + + def 
forget_matches_for_attribute(self, attribute: Attribute): + logger.debug("Called function 'forget_matches_for_attribute'.") + + self.sqLiteCacheDBWrapper.cache_db.delete_table(attribute.name) + try: + if attribute in self.document_base.attributes: + for document in self.document_base.documents: + if attribute.name in document.attribute_mappings.keys(): + del document.attribute_mappings[attribute.name] + self.signals.status.emit(f"Matches for attribute '{attribute.name}' forgotten.") + self.signals.document_base_to_ui.emit(self.document_base) + else: + logger.error("Attribute name does not exist!") + self.signals.error.emit(Exception("Attribute name does not exist!")) + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def forget_matches(self): + logger.debug("Called function 'forget_matches'.") + for attribute in self.document_base.attributes: + self.sqLiteCacheDBWrapper.cache_db.delete_table(attribute.name) + self.sqLiteCacheDBWrapper.cache_db.create_table_by_name(attribute.name) + try: + for document in self.document_base.documents: + document.attribute_mappings.clear() + self.signals.document_base_to_ui.emit(self.document_base) + self.signals.finished.emit(1) + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + ## todo: below not implemented yet + + def save_statistics_to_json(self): + logger.debug("Called function 'save_statistics_to_json'.") + try: + return json.dumps(self.signals.statistics.to_json(), indent=2) + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e + + def interactive_table_population(self): + logger.debug("Called slot 'interactive_table_population'.") + + try: + if self.document_base is None: + logger.error("Document base not loaded!") + self.signals.error.emit(Exception("Document base not loaded!")) + return + + # load default matching phase + self.signals.status.emit("Loading matching phase...") + + # TODO: this should not be 
implemented here! + def find_additional_nuggets(nugget, documents): + new_nuggets = [] + for document in documents: + doc_text = document.text.lower() + nug_text = nugget.text.lower() + start = 0 + while True: + start = doc_text.find(nug_text, start) + if start == -1: + break + else: + new_nuggets.append((document, start, start + len(nug_text))) + start += len(nug_text) + return new_nuggets + + matching_phase = Pipeline( + [ + SplitAttributeNameLabelParaphraser(do_lowercase=True, splitters=[" ", "_"]), + ContextSentenceCacher(), + SBERTLabelEmbedder("SBERTBertLargeNliMeanTokensResource"), + RankingBasedMatcher( + distance=SignalsMeanDistance( + signal_identifiers=[ + "LabelEmbeddingSignal", + "TextEmbeddingSignal", + "ContextSentenceEmbeddingSignal", + "RelativePositionSignal" + ] + ), + max_num_feedback=100, + len_ranked_list=10, + max_distance=0.2, + num_random_docs=1, + sampling_mode="AT_MAX_DISTANCE_THRESHOLD", + adjust_threshold=True, + nugget_pipeline=Pipeline( + [ + ContextSentenceCacher(), + CopyNormalizer(), + OntoNotesLabelParaphraser(), + SplitAttributeNameLabelParaphraser(do_lowercase=True, splitters=[" ", "_"]), + SBERTLabelEmbedder("SBERTBertLargeNliMeanTokensResource"), + SBERTTextEmbedder("SBERTBertLargeNliMeanTokensResource"), + BERTContextSentenceEmbedder("BertLargeCasedResource"), + RelativePositionEmbedder() + ] + ), + find_additional_nuggets=find_additional_nuggets + ) + ] + ) + + matching_phase(self.document_base, self.interaction_callback, self.status_callback, + Statistics(False)) + self.signals.document_base_to_ui.emit(self.document_base) + self.signals.finished.emit(1) + except Exception as e: + logger.error(str(e)) + self.signals.error.emit(e) + raise e diff --git a/wannadb_web/worker/__init__.py b/wannadb_web/worker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wannadb_web/worker/data.py b/wannadb_web/worker/data.py new file mode 100644 index 00000000..9947ffd6 --- /dev/null +++ b/wannadb_web/worker/data.py @@ 
-0,0 +1,363 @@ +import abc +import json +import pickle +from abc import abstractmethod, ABC +from dataclasses import dataclass +from typing import Any, Union, Optional +import logging + +from wannadb.data.data import DocumentBase, InformationNugget, Document, Attribute +from wannadb.data.signals import BaseSignal +from wannadb.statistics import Statistics +from wannadb_web.Redis.RedisCache import RedisCache + +logger: logging.Logger = logging.getLogger(__name__) + +@dataclass +class _BaseSignal: + identifier:str + signal="not serializable" + + def to_json(self): + return { + "identifier": self.identifier, + "signal":self.signal + } + +def convert_signal(signal: BaseSignal) -> _BaseSignal: + return _BaseSignal(signal.identifier) + +@dataclass +class _InformationNugget: + text:str + signals:dict[str,BaseSignal] + document:Document + end_char:int + start_char:int + + def to_json(self): + return { + "text": self.text, + "signals": [{"name": name, "signal": convert_signal(signal).to_json()} for name, signal in + self.signals.items()], + "document": {"name": self.document.name, "text": self.document.text}, + "end_char": str(self.end_char), + "start_char": str(self.start_char)} + + + +def convert_to_nugget(nugget: InformationNugget): + return _InformationNugget(nugget.text,nugget.signals,nugget.document,nugget.end_char,nugget.start_char) + + +@dataclass +class _InformationNuggets: + nuggets: list[InformationNugget] + + def to_json(self): + return { + str(i): convert_to_nugget(nugget).to_json() for i, nugget in enumerate(self.nuggets) + } + +def convert_to_nuggets(nuggets: list[InformationNugget]): + return _InformationNuggets(nuggets) + + +@dataclass +class _Document: + name:str + text:str + attribute_mappings = "not implemented yet" + signals:dict[str,BaseSignal] + nuggets:list[InformationNugget] + + def to_json(self): + return { + "name": self.name, + "text": self.text, + "attribute_mappings": "not implemented yet", + "signals": [{"name": name, "signal": 
convert_signal(signal)} for name, signal in + self.signals.items()], + "nuggets": [convert_to_nugget(nugget).to_json() for nugget in self.nuggets] + } + + +def convert_to_document(document: Document): + return _Document(document.name,document.text,document.signals,document.nuggets) + + +@dataclass +class _Attribute: + name:str + signals = "not_implemented" + + def to_json(self): + return { + "name": self.name, + "signals": self.signals + } + +def convert_to_attribute(attribute: Attribute): + return _Attribute(attribute.name) + + +@dataclass +class _DocumentBase: + attributes:list[Attribute] + nuggets:list[InformationNugget] + documents:list[Document] + + def to_json(self): + return { + "attributes": [attribute.name for attribute in self.attributes], + "nuggets": [convert_to_nugget(nugget).to_json() for nugget in self.nuggets], + "documents": [convert_to_document(document).to_json() for document in self.documents] + } + +def convert_to_document_base(document_base: DocumentBase): + return _DocumentBase(document_base.attributes,document_base.nuggets,document_base.documents) + + +class Signals: + def __init__(self, user_id: str): + self.__user_id = user_id + self.pipeline = _State("pipeline", user_id) + self.feedback = _Signal("feedback", user_id) + self.status = _State("status", user_id) + self.finished = _Signal("finished", user_id) + self.error = _Error("error", user_id) + self.document_base_to_ui = _DocumentBaseToUi("document_base_to_ui", user_id) + self.statistics = _Statistics("statistics_to_ui", user_id) + self.feedback_request_to_ui = _Feedback("feedback_request_to_ui", user_id) + self.feedback_request_from_ui = _Feedback("feedback_request_from_ui", user_id) + self.cache_db_to_ui = _Dump("cache_db_to_ui", user_id) + self.ordert_nuggets = _Nuggets("ordert_nuggets", user_id) + self.match_feedback = _MatchFeedback("match_feedback", user_id) + + def to_json(self) -> dict[str, str]: + return {"user_id": self.__user_id, + self.feedback.type: self.feedback.to_json(), 
+ self.error.type: self.error.to_json(), + self.status.type: self.status.to_json(), + self.finished.type: self.finished.to_json(), + self.document_base_to_ui.type: self.document_base_to_ui.to_json(), + self.statistics.type: self.statistics.to_json(), + self.feedback_request_to_ui.type: self.feedback_request_to_ui.to_json(), + self.cache_db_to_ui.type: self.cache_db_to_ui.to_json(), + self.ordert_nuggets.type: self.ordert_nuggets.to_json() + } + + def reset(self): + RedisCache(self.__user_id).delete_user_space() + + +class Emitable(abc.ABC): + + def __init__(self, emitable_type: str, user_id: str): + self.type = emitable_type + self.redis = RedisCache(user_id) + + @property + def msg(self): + msg = self.redis.get(self.type) + if msg is None: + return None + return msg + + @abstractmethod + def to_json(self): + raise NotImplementedError + + @abstractmethod + def emit(self, status: Any): + raise NotImplementedError + + +@dataclass +class CustomMatchFeedback: + message = "custom-match" + document: Document + start: int + end: int + + def to_json(self): + return {"message": self.message, "document": convert_to_document(self.document).to_json(), "start": self.start, + "end": self.end} + + +@dataclass +class NuggetMatchFeedback: + message = "is-match" + nugget: InformationNugget + not_a_match: None + + def to_json(self): + return {"message": self.message, "nugget": convert_to_nugget(self.nugget).to_json(), "not_a_match": self.not_a_match} + + +@dataclass +class NoMatchFeedback: + message = "no-match-in-document" + nugget: InformationNugget + not_a_match: InformationNugget + + def to_json(self): + return {"message": self.message, "nugget": convert_to_nugget(self.nugget).to_json(), + "not_a_match": convert_to_nugget(self.not_a_match).to_json()} + + +class _MatchFeedback(Emitable): + + @property + def msg(self) -> Union[CustomMatchFeedback, NuggetMatchFeedback, NoMatchFeedback, None]: + msg = self.redis.get(self.type) + if isinstance(msg, str) and msg.startswith("{"): + m = 
json.loads(msg) + if "message" in m and m["message"] == "custom-match": + return CustomMatchFeedback(m["document"], m["start"], m["end"]) + elif "message" in m and m["message"] == "is-match": + return NuggetMatchFeedback(m["nugget"], None) + elif "message" in m and m["message"] == "no-match-in-document": + return NoMatchFeedback(m["nugget"], m["not_a_match"]) + return None + + def to_json(self): + if self.msg is None: + return {} + return self.msg.to_json() + + def emit(self, status: Union[CustomMatchFeedback, NuggetMatchFeedback, NoMatchFeedback, None]): + if status is None: + self.redis.delete(self.type) + return + if isinstance(status, CustomMatchFeedback): + self.redis.set(self.type, json.dumps( + {"message": status.message, "document": convert_to_document(status.document).to_json(), "start": status.start, + "end": status.end})) + elif isinstance(status, NuggetMatchFeedback): + self.redis.set(self.type, json.dumps({"message": status.message, "nugget": convert_to_nugget(status.nugget).to_json()})) + elif isinstance(status, NoMatchFeedback): + self.redis.set(self.type, json.dumps( + {"message": status.message, "nugget": convert_to_nugget(status.nugget).to_json(), + "not_a_match": convert_to_nugget(status.not_a_match).to_json()})) + else: + raise TypeError("status must be of type CustomMatchFeedback or NuggetMatchFeedback or NoMatchFeedback or None") + + +class _State(Emitable): + + def to_json(self): + if self.msg is None: + return "" + return self.msg.decode("utf-8") + + def emit(self, status: str): + self.redis.set(self.type, status) + + +class _Signal(Emitable): + + def to_json(self): + return str(self.msg) + + def emit(self, status: float): + self.redis.set(self.type, str(status)) + + +class _Error(Emitable): + + def to_json(self): + if self.msg is None: + return "" + return self.msg.decode("utf-8") + + def emit(self, exception: BaseException): + self.redis.set(self.type, str(exception)) + + +class _Nuggets(Emitable): + + @property + def msg(self) -> 
Optional[list[InformationNugget]]: + msg = self.redis.get(self.type) + if msg is None: + return None + if isinstance(msg,bytes): + return pickle.loads(msg) + else: + raise TypeError("msg is not bytes") + + + def to_json(self): + if self.msg is None: + return {} + return convert_to_nuggets(self.msg).to_json() + + def emit(self, status: list[InformationNugget]): + logger.info("emitting Nuggets") + print("emitting Nuggets") + b:bytes = pickle.dumps(status) + if isinstance(b,bytes): + self.redis.set(self.type, b) + elif len(status)< 2: + raise TypeError("status smaller than 2") + else: + raise TypeError("b is not bytes") + + +class _DocumentBaseToUi(Emitable): + + @property + def msg(self) -> Optional[DocumentBase]: + msg = self.redis.get(self.type) + if msg is None: + return None + if isinstance(msg,bytes): + return pickle.loads(msg) + else: + raise TypeError("msg is not bytes") + + def to_json(self): + if self.msg is None: + return {} + return convert_to_document_base(self.msg).to_json() + + def emit(self, status: DocumentBase): + self.redis.set(self.type, pickle.dumps(status)) + + +class _Statistics(Emitable): + + @property + def msg(self): + return "not implemented" + + def to_json(self): + return Statistics(False).to_serializable() + + def emit(self, statistic: Statistics): + pass + + +class _Feedback(Emitable): + + def to_json(self): + if self.msg is None: + return {} + return json.loads(self.msg) + + def emit(self, status: dict[str, Any]): + print("Status: " + str(status)) + for key, value in status.items(): + if isinstance(value, Attribute): + status[key] = value.toJSON() + self.redis.set(self.type, json.dumps(status)) + + +class _Dump(Emitable): + + def to_json(self): + return self.msg + + def emit(self, status): + self.redis.set(self.type, json.dumps(status)) diff --git a/wannadb_web/worker/tasks.py b/wannadb_web/worker/tasks.py new file mode 100644 index 00000000..1bc01125 --- /dev/null +++ b/wannadb_web/worker/tasks.py @@ -0,0 +1,395 @@ +import logging 
+import pickle +import time +from typing import Optional, Any, Union + +from celery import Task + +import wannadb.resources +from wannadb.data.data import Document, Attribute, InformationNugget +from wannadb.resources import ResourceManager +from wannadb.statistics import Statistics +from wannadb_web.Redis.RedisCache import RedisCache +from wannadb_web.postgres.queries import getDocuments +from wannadb_web.worker.Web_API import WannaDB_WebAPI +from wannadb_web.worker.data import Signals, NoMatchFeedback, NuggetMatchFeedback, CustomMatchFeedback +from wannadb_web.worker.util import State + +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger() + + +class InitManager(Task): + name = "InitManager" + + def run(self, *args, **kwargs): + ResourceManager() + if wannadb.resources.MANAGER is None: + raise RuntimeError("Resource_Manager is None!") + manager = pickle.dumps(wannadb.resources.MANAGER) + RedisCache("0").set("manager", manager) + + +class BaseTask(Task): + name = "BaseTask" + _signals: Optional[Signals] = None + _redis_client: Optional[RedisCache] = None + + def __init__(self): + super().__init__() + + def run(self, *args, **kwargs): + raise NotImplementedError("BaseTask is abstract") + + @staticmethod + def load(): + if wannadb.resources.MANAGER is None: + wannadb.resources.ResourceManager() + BaseTask.load() + return + logging.info("loaded") + + def update(self, + state: State, + meta: Optional[dict[str, Any]] = None, + ) -> None: + super().update_state(state=state.value, meta=meta) + + def update_state(self, + task_id: Optional[str] = None, + state: Optional[str] = None, + meta: Any = None, + **kwargs: Any + ) -> None: + raise NotImplementedError("user update() instead") + + def get_new_input(self): + if self._redis_client is None: + raise RuntimeError("self._redis_client is None!") + _input = self._redis_client.get("input") + if _input is not None: + pass + + return _input + + 
class TestTask(BaseTask):
    """Development task: echo any new redis input forever (never returns)."""
    name = "TestTask"

    def run(self, *args, **kwargs):
        # Fix: the original called super().run(), which unconditionally raises
        # NotImplementedError in BaseTask, so the loop below never executed.
        self.update(state=State.PENDING)
        while True:
            _input = self.get_new_input()
            if _input is not None:
                print(_input)
                self.update(state=State.SUCCESS, meta={"msg": _input})
                time.sleep(2)
            self.update(state=State.WAITING, meta={"msg": "waiting"})
            time.sleep(2)


class CreateDocumentBase(BaseTask):
    name = "CreateDocumentBase"

    def run(self, user_id: int, document_ids: list[int], attributes_strings: list[str], statistics_dump: bytes,
            base_name: str, organisation_id: int):
        """Build a new document base from the user's documents and persist it."""
        self.load()
        attributes: list[Attribute] = []
        statistics: Statistics = pickle.loads(statistics_dump)
        for attribute_string in attributes_strings:
            if attribute_string == "":
                logger.error("Attribute names cannot be empty!")
                raise Exception("Attribute names cannot be empty!")
            if attribute_string in [attribute.name for attribute in attributes]:
                logger.error("Attribute names must be unique!")
                raise Exception("Attribute names must be unique!")
            attributes.append(Attribute(attribute_string))

        # init api
        api = WannaDB_WebAPI(user_id, base_name, organisation_id)

        # Creating document base.
        # Fix: guard the empty list before indexing attributes[0].
        if not attributes or not isinstance(attributes[0], Attribute):
            self.update(State.ERROR)
            raise Exception("Invalid attributes")

        if not isinstance(statistics, Statistics):
            self.update(State.ERROR)
            raise Exception("Invalid statistics")

        docs = getDocuments(document_ids, user_id)
        # Fix: the original wrote `docs[0] is tuple[None,None]`, an identity
        # comparison against a typing generic that is always False; compare
        # against the (None, None) sentinel value instead.
        if docs and docs[0] == (None, None):
            raise Exception(f"user with user id:{user_id} has no document with the document_ids: {document_ids}")

        self.update(State.PENDING)
        documents: list[Document] = []
        if docs:
            for doc in docs:
                name = doc[0]
                text = doc[1]
                if name is None:
                    raise Exception("Document Name is none")
                if text is None:
                    raise Exception("Document text is none")
                documents.append(Document(name, text))
        else:
            self.update(State.ERROR)
            raise Exception("No documents found")

        api.create_document_base(documents, attributes, statistics)

        api.save_document_base_to_bson()
        if api.signals.error.msg is None:
            api.update_document_base_to_bson()
            self.update(State.SUCCESS)
            return self
        self.update(State.ERROR)
        return self


class DocumentBaseLoad(BaseTask):
    name = "DocumentBaseLoad"

    def run(self, user_id: int, base_name: str, organisation_id: int):
        """Load an existing document base from its BSON row."""
        self.load()
        api = WannaDB_WebAPI(user_id, base_name, organisation_id)
        api.load_document_base_from_bson()
        if api.signals.error.msg is None:
            self.update(State.SUCCESS)
            return self
        self.update(State.ERROR)
        return self


class DocumentBaseAddAttributes(BaseTask):
    name = "DocumentBaseAddAttributes"

    def run(self, user_id: int, attributes_strings: list[str], base_name: str, organisation_id: int):
        """Add new attributes to an existing document base and persist it."""
        self.load()
        attributes: list[Attribute] = []

        for attribute_string in attributes_strings:
            if attribute_string == "":
                logger.error("Attribute names cannot be empty!")
                raise Exception("Attribute names cannot be empty!")
            if attribute_string in [attribute.name for attribute in attributes]:
                logger.error("Attribute names must be unique!")
                raise Exception("Attribute names must be unique!")
            attributes.append(Attribute(attribute_string))

        api = WannaDB_WebAPI(user_id, base_name, organisation_id)
        api.load_document_base_from_bson()
        api.add_attributes(attributes)
        if api.signals.error.msg is None:
            api.update_document_base_to_bson()
            self.update(State.SUCCESS)
            return self
        self.update(State.ERROR)
        return self


class DocumentBaseUpdateAttributes(BaseTask):
    # NOTE(review): this duplicates DocumentBaseAddAttributes' task name —
    # almost certainly a copy-paste slip; with celery one registration shadows
    # the other. Confirm downstream senders before renaming.
    name = "DocumentBaseAddAttributes"

    def run(self, user_id: int, attributes_strings: list[str], base_name: str, organisation_id: int):
        """Replace the document base's attributes with the given list."""
        self.load()
        attributes: list[Attribute] = []

        for attribute_string in attributes_strings:
            if attribute_string == "":
                logger.error("Attribute names cannot be empty!")
                raise
Exception("Attribute names cannot be empty!") + if attribute_string in [attribute.name for attribute in attributes]: + logger.error("Attribute names must be unique!") + raise Exception("Attribute names must be unique!") + attributes.append(Attribute(attribute_string)) + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + api.update_attributes(attributes) + if api.signals.error.msg is None: + api.update_document_base_to_bson() + self.update(State.SUCCESS) + return self + self.update(State.ERROR) + return self + + +class DocumentBaseRemoveAttributes(BaseTask): + name = "DocumentBaseRemoveAttributes" + + def run(self, user_id: int, attributes_strings: list[str], base_name: str, organisation_id: int): + self.load() + attributes: list[Attribute] = [] + + for attribute_string in attributes_strings: + if attribute_string == "": + logger.error("Attribute names cannot be empty!") + raise Exception("Attribute names cannot be empty!") + if attribute_string in [attribute.name for attribute in attributes]: + logger.error("Attribute names must be unique!") + raise Exception("Attribute names must be unique!") + attributes.append(Attribute(attribute_string)) + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + api.remove_attributes(attributes) + if api.signals.error.msg is None: + api.update_document_base_to_bson() + self.update(State.SUCCESS) + return self + self.update(State.ERROR) + return self + + +class DocumentBaseForgetMatches(BaseTask): + name = "DocumentBaseForgetMatches" + + def run(self, user_id: int, attributes_strings: list[str], base_name: str, organisation_id: int): + self.load() + attributes: list[Attribute] = [] + + for attribute_string in attributes_strings: + if attribute_string == "": + logger.error("Attribute names cannot be empty!") + raise Exception("Attribute names cannot be empty!") + if attribute_string in [attribute.name for attribute in attributes]: + 
logger.error("Attribute names must be unique!") + raise Exception("Attribute names must be unique!") + attributes.append(Attribute(attribute_string)) + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + api.forget_matches() + if api.signals.error.msg is None: + api.update_document_base_to_bson() + self.update(State.SUCCESS) + return self + self.update(State.ERROR) + return self + + +class DocumentBaseForgetMatchesForAttribute(BaseTask): + name = "DocumentBaseForgetMatches" + + def run(self, user_id: int, attribute_string: str, base_name: str, organisation_id: int): + self.load() + + attribute = (Attribute(attribute_string)) + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + api.forget_matches_for_attribute(attribute) + if api.signals.error.msg is None: + api.update_document_base_to_bson() + self.update(State.SUCCESS) + return self + self.update(State.ERROR) + return self + + +class DocumentBaseInteractiveTablePopulation(BaseTask): + name = "DocumentBaseInteractiveTablePopulation" + + def run(self, user_id: int, base_name: str, organisation_id: int): + self._signals = Signals(str(user_id)) + self._redis_client = RedisCache(str(user_id)) + self.load() + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + api.interactive_table_population() + if api.signals.error.msg is None: + api.update_document_base_to_bson() + self.update(State.SUCCESS) + return self + + +class DocumentBaseGetOrderedNuggets(BaseTask): + name = "DocumentBaseGetOrderedNuggets" + + def run(self, user_id: int, base_name: str, organisation_id: int, document_name: str, document_content: str): + self._signals = Signals(str(user_id)) + self._redis_client = RedisCache(str(user_id)) + self.load() + + api = WannaDB_WebAPI(user_id, base_name, organisation_id) + api.load_document_base_from_bson() + # api.get_ordert_nuggets(document_id) + 
api.get_ordered_nuggets_by_doc_name(document_name, document_content) + # no need to update the document base + self.update(State.SUCCESS) + return self + + +class DocumentBaseConfirmNugget(BaseTask): + name = "DocumentBaseConfirmNugget" + + def run(self, user_id: int, base_name: str, organisation_id: int, + document_name: str, document_text: str, nugget: Union[str, InformationNugget], + start_index: Union[int, None], end_index: Union[int, None], interactive_call_task_id: str): + """ + :param user_id: user id + :param base_name: name of base document + :param organisation_id: organisation id of the document base + :param document_name: name of the document + :param document_text: text of the document + :param nugget: the Nugget that gets confirmed + :param start_index: start of the nugget in the document (optional) if start and end is None the nugget is not in the document + :param end_index: end of the nugget in the document (optional) if start and end is None the nugget is not in the document + :param interactive_call_task_id: the same task id that's used for interactive call + """ + self._signals = Signals(str(user_id)) + self._redis_client = RedisCache(str(user_id)) + self.load() + + document = Document(document_name, document_text) + if start_index is None and end_index is None and isinstance(nugget, InformationNugget): + self._signals.match_feedback.emit(no_match(nugget)) + else: + self._signals.match_feedback.emit(match_feedback(nugget, document, start_index, end_index)) + # no need to update the document base the doc will be saved in the interactive call + self.update(State.SUCCESS) + return self + + +def nugget_exist(nugget: str, document: Document, start_index: int, end_index: int): + print("start: ", start_index, "end: ", end_index) + try: + print("doc "+document.text[start_index:end_index]) + print("nug "+nugget) + if document.text[start_index:end_index] == nugget: + return True + except IndexError: + logger.error("Nugget does not exist in the given 
Text") + raise Exception("Nugget does not exist in the given Text") + logger.error("Nugget does not exist in the given Text") + raise Exception("Nugget does not exist in the given Text") + + +def match_feedback(nugget: Union[str, InformationNugget], document: Document, + start_index: Optional[int] = None, end_index: Optional[int] = None) -> Union[NuggetMatchFeedback, CustomMatchFeedback]: + logger.debug("match_feedback") + if isinstance(nugget, str): + if document is None: + logger.error("The document is missing in document base") + raise Exception("The document is missing in document base") + if start_index is None or end_index is None: + logger.error("Start-index or end-index are missing to find the custom nugget") + raise Exception("Start-index or end-index are missing to find the custom nugget") + return CustomMatchFeedback(document, start_index, end_index) + if isinstance(nugget, InformationNugget): + return NuggetMatchFeedback(nugget, None) + raise Exception("Invalid nugget type") + + +def no_match(nugget: InformationNugget) -> NoMatchFeedback: + return NoMatchFeedback(nugget, nugget) diff --git a/wannadb_web/worker/util.py b/wannadb_web/worker/util.py new file mode 100644 index 00000000..48278c6a --- /dev/null +++ b/wannadb_web/worker/util.py @@ -0,0 +1,10 @@ +import enum + + +class State(enum.Enum): + STARTED = 'STARTED' + WAITING = 'WAITING' + PENDING = 'PENDING' + SUCCESS = 'SUCCESS' + ERROR = 'ERROR' +