Skip to content

Worker

worker

engine

Engine

Bases: ABC

Source code in src/qgis_server_light/worker/engine.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class Engine(ABC):
    """Base worker engine: owns the QGIS application and dispatches jobs.

    On construction the engine starts QGIS, imports the configured runner
    plugin classes and indexes them by the name of their ``job_info_class``
    so that an incoming job parameter object can be mapped to its runner.
    """

    def __init__(
        self,
        context: EngineContext,
        runner_plugins: list[str],
        svg_paths: Optional[List[str]] = None,
        log_level=logging.WARNING,
    ):
        """Start QGIS and load the runner plugins.

        Args:
            context: Engine wide settings; its ``base_path`` is handed to
                every job context created in ``process``.
            runner_plugins: Dotted paths (``"package.module.Class"``) of the
                runner classes to load.
            svg_paths: Optional additional SVG paths passed on to ``Qgis``.
            log_level: Log level passed on to ``Qgis``.
        """
        self.qgis = Qgis(svg_paths, log_level)
        self.context = context
        # shared with every runner instance created in ``process``
        self.layer_cache: dict[Any, Any] = {}
        # runner classes keyed by their dotted import path
        self.available_runner_classes: dict[str, Type[Runner]] = {}
        # runner classes keyed by the class name of their ``job_info_class``
        self.available_runner_classes_by_job_info: dict[str, Type[Runner]] = {}
        # job info classes keyed by their own class name
        self.available_job_info_classes: dict[str, Type[QslJobInfoParameter]] = {}
        self._load_runner_plugins(runner_plugins)
        logging.debug(self.available_runner_classes)
        logging.debug(self.available_runner_classes_by_job_info)
        logging.debug(self.available_job_info_classes)
        self.info = self._initialize_infos()

    def __del__(self):
        """Shut down QGIS when the engine is garbage collected."""
        # ``__del__`` also runs for partially constructed instances (e.g. when
        # ``Qgis(...)`` raised inside ``__init__``); in that case ``qgis`` was
        # never assigned and there is nothing to shut down.
        qgis = getattr(self, "qgis", None)
        if qgis is not None:
            qgis.exitQgis()

    def _load_runner_plugins(self, worker_plugins: list[str]):
        """Import every runner class in ``worker_plugins`` and register it.

        Each class is stored in the three lookup dictionaries: by import
        path, by job info class name (runner) and by job info class name
        (job info class).
        """
        for path in worker_plugins:
            loaded_class = self._load_runner_class(path)
            if loaded_class is not None:
                self.available_runner_classes[path] = loaded_class
                self.available_runner_classes_by_job_info[
                    loaded_class.job_info_class.__name__
                ] = loaded_class
                self.available_job_info_classes[
                    loaded_class.job_info_class.__name__
                ] = loaded_class.job_info_class

    @staticmethod
    def _load_runner_class(path: str) -> Type[Runner] | None:
        """
        Loads a class dynamically at runtime, like:
        "mypackage.mymodule.MyClass"

        Args:
            path: Dotted path consisting of the module path and the class name.

        Returns:
            The loaded runner class.

        Raises:
            ValueError: If ``path`` contains no ``.`` separating module and class.
            ImportError: If the module has no attribute with the class name
                (or the module itself cannot be imported).
            TypeError: If the attribute is not a class or not a ``Runner`` subclass.
        """

        module_path, separator, class_name = path.rpartition(".")
        if not separator:
            # same exception type the tuple unpacking of ``rsplit`` produced,
            # but with a message that tells the operator what is wrong
            raise ValueError(
                f"Runner plugin path '{path}' has to be of the form "
                f"'package.module.ClassName'."
            )
        module = importlib.import_module(module_path)
        cls = getattr(module, class_name, None)

        # Ensure the class was loaded correctly
        if cls is None:
            raise ImportError(
                f"Class '{class_name}' not found in module '{module_path}'."
            )
        if not inspect.isclass(cls):
            raise TypeError(f"Passed '{class_name}' is not a class.")

        if not issubclass(cls, Runner):
            # name the base class that is actually checked instead of a
            # hard-coded (and outdated) dotted path
            raise TypeError(
                f"{cls.__name__} is not a plugin as expected (each plugin has to "
                f"inherit from {Runner.__module__}.{Runner.__qualname__})."
            )

        return cls

    def _initialize_infos(self):
        """Build the ``EngineInfo`` describing this worker.

        The info gets a fresh UUID4 string as id, the QGIS version details
        and prefix path, the status ``Status.STARTING`` and the current time
        as POSIX timestamp. It is also logged as JSON on debug level.
        """
        worker_info = EngineInfo(
            id=str(uuid.uuid4()),
            qgis_info=QgisInfo(
                version=version(),
                version_name=version_name(),
                path=self.qgis.prefixPath(),
            ),
            status=Status.STARTING,
            started=datetime.datetime.now().timestamp(),
        )
        logging.debug(json.dumps(asdict(worker_info), indent=2))
        return worker_info

    def runner_plugin_by_job_info(self, job_info: QslJobInfoParameter) -> Type[Runner]:
        """
        Here we decide which plugin we load dynamically out of the available ones.

        Args:
            job_info: The job parameter instance; its class name is looked up
                among the ``job_info_class`` names of the registered runners.

        Returns:
            The selected runner class

        Raises:
            RuntimeError: If no runner is registered for the type of ``job_info``.
        """
        try:
            return self.available_runner_classes_by_job_info[
                job_info.__class__.__name__
            ]
        except KeyError as err:
            raise RuntimeError(f"Type {type(job_info)} not supported") from err

    def process(self, job_info: QslJobInfoParameter) -> JobResult:
        """Run ``job_info`` with the matching runner and return its result.

        A new runner instance is created per job; it receives the QGIS
        application, a ``JobContext`` built from ``context.base_path`` and the
        engine's shared ``layer_cache``.
        """
        runner_class = self.runner_plugin_by_job_info(job_info)
        runner = runner_class(
            self.qgis,
            JobContext(self.context.base_path),
            job_info,
            layer_cache=self.layer_cache,
        )
        return runner.run()

    @property
    def status(self):
        """The ``value`` of the current ``info.status`` enum member."""
        return self.info.status.value

    def set_waiting(self):
        """Set ``info.status`` to ``Status.WAITING``."""
        self.info.status = Status.WAITING

    def set_crashed(self):
        """Set ``info.status`` to ``Status.CRASHED``."""
        self.info.status = Status.CRASHED

    def set_processing(self):
        """Set ``info.status`` to ``Status.PROCESSING``."""
        self.info.status = Status.PROCESSING
available_job_info_classes: dict[str, Type[QslJobInfoParameter]] = {} instance-attribute
available_runner_classes: dict[str, Type[Runner]] = {} instance-attribute
available_runner_classes_by_job_info: dict[str, Type[Runner]] = {} instance-attribute
context = context instance-attribute
info = self._initialize_infos() instance-attribute
layer_cache: dict[Any, Any] = {} instance-attribute
qgis = Qgis(svg_paths, log_level) instance-attribute
status property
__del__()
Source code in src/qgis_server_light/worker/engine.py
48
49
def __del__(self):
    """Shut down QGIS when the engine is garbage collected."""
    # ``__del__`` also runs for partially constructed instances (e.g. when
    # ``Qgis(...)`` raised inside ``__init__``); in that case ``qgis`` was
    # never assigned and there is nothing to shut down.
    qgis = getattr(self, "qgis", None)
    if qgis is not None:
        qgis.exitQgis()
__init__(context: EngineContext, runner_plugins: list[str], svg_paths: Optional[List[str]] = None, log_level=logging.WARNING)
Source code in src/qgis_server_light/worker/engine.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    context: EngineContext,
    runner_plugins: list[str],
    svg_paths: Optional[List[str]] = None,
    log_level=logging.WARNING,
):
    """Start QGIS, load the runner plugins and create the worker info.

    Args:
        context: Engine wide settings object stored as ``self.context``.
        runner_plugins: Dotted paths of the runner classes to load.
        svg_paths: Optional additional SVG paths passed on to ``Qgis``.
        log_level: Log level passed on to ``Qgis``.
    """
    # QGIS has to exist before anything else: the worker info created at the
    # end reads the prefix path from it.
    self.qgis = Qgis(svg_paths, log_level)
    self.context = context
    self.layer_cache: dict[Any, Any] = {}

    # The three registries are filled by ``_load_runner_plugins`` below.
    self.available_runner_classes: dict[str, Type[Runner]] = {}
    self.available_runner_classes_by_job_info: dict[str, Type[Runner]] = {}
    self.available_job_info_classes: dict[str, Type[QslJobInfoParameter]] = {}
    self._load_runner_plugins(runner_plugins)
    for registry in (
        self.available_runner_classes,
        self.available_runner_classes_by_job_info,
        self.available_job_info_classes,
    ):
        logging.debug(registry)

    self.info = self._initialize_infos()
process(job_info: QslJobInfoParameter) -> JobResult
Source code in src/qgis_server_light/worker/engine.py
121
122
123
124
125
126
127
128
129
def process(self, job_info: QslJobInfoParameter) -> JobResult:
    """Execute ``job_info`` with the runner registered for its type.

    A fresh runner is instantiated for every job. It gets the QGIS
    application, a ``JobContext`` built from ``context.base_path``, the job
    parameters and the engine's shared ``layer_cache``.

    Returns:
        Whatever the runner's ``run()`` returns.
    """
    selected_runner = self.runner_plugin_by_job_info(job_info)
    job_context = JobContext(self.context.base_path)
    return selected_runner(
        self.qgis, job_context, job_info, layer_cache=self.layer_cache
    ).run()
runner_plugin_by_job_info(job_info: QslJobInfoParameter) -> Type[Runner]

Here we decide which plugin we load dynamically out of the available ones.

Parameters:

  • job_info (QslJobInfoParameter) –

    The job parameter instance. Its class name is looked up among the job_info_class names of the available runner classes.

Returns:

  • Type[Runner]

    The selected runner class

Source code in src/qgis_server_light/worker/engine.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def runner_plugin_by_job_info(self, job_info: QslJobInfoParameter) -> Type[Runner]:
    """
    Pick the runner class that is registered for the type of ``job_info``.

    Args:
        job_info: The job parameter instance; its class name is the lookup
            key into ``available_runner_classes_by_job_info``.

    Returns:
        The selected runner class

    Raises:
        RuntimeError: If no runner is registered under that class name.
    """
    registry = self.available_runner_classes_by_job_info
    lookup_key = job_info.__class__.__name__
    try:
        runner_class = registry[lookup_key]
    except KeyError:
        raise RuntimeError(f"Type {type(job_info)} not supported")
    return runner_class
set_crashed()
Source code in src/qgis_server_light/worker/engine.py
138
139
def set_crashed(self):
    """Mark the engine as crashed by setting ``info.status`` to ``Status.CRASHED``."""
    self.info.status = Status.CRASHED
set_processing()
Source code in src/qgis_server_light/worker/engine.py
141
142
def set_processing(self):
    """Mark the engine as busy by setting ``info.status`` to ``Status.PROCESSING``."""
    self.info.status = Status.PROCESSING
set_waiting()
Source code in src/qgis_server_light/worker/engine.py
135
136
def set_waiting(self):
    """Mark the engine as idle by setting ``info.status`` to ``Status.WAITING``."""
    self.info.status = Status.WAITING

EngineContext dataclass

Source code in src/qgis_server_light/worker/engine.py
23
24
25
@dataclass
class EngineContext:
    """Engine wide settings handed to an ``Engine`` on construction."""

    # Passed on to the ``JobContext`` that ``Engine.process`` creates per job.
    base_path: Union[str, pathlib.Path]
base_path: Union[str, pathlib.Path] instance-attribute
__init__(base_path: Union[str, pathlib.Path]) -> None

image_utils

qgis

CredentialsHelper

Bases: QgsCredentials

Source code in src/qgis_server_light/worker/qgis.py
 9
10
11
12
13
14
15
16
17
18
19
class CredentialsHelper(QgsCredentials):
    """Credentials handler that only logs requests instead of answering them.

    NOTE(review): presumably installed so that QGIS never waits for an
    interactive credentials prompt in a headless worker - confirm.
    """

    def __init__(self):
        """Initialise the base class and register this object via ``setInstance``."""
        super().__init__()
        self.setInstance(self)

    def request(self, realm, username, password, message):
        """Log ``message`` as a warning and return ``(True, None, None)``.

        ``realm``, ``username`` and ``password`` are ignored; no credentials
        are looked up.
        """
        logging.warning(message)
        return True, None, None

    def requestMasterPassword(self, password, stored):
        """Log that the master password was requested.

        Both arguments are ignored and ``None`` is returned implicitly.
        NOTE(review): check whether QGIS expects a return value here.
        """
        logging.warning("Master password requested")
__init__()
Source code in src/qgis_server_light/worker/qgis.py
10
11
12
def __init__(self):
    """Initialise the base class and register this object via ``setInstance``."""
    super().__init__()
    self.setInstance(self)
request(realm, username, password, message)
Source code in src/qgis_server_light/worker/qgis.py
14
15
16
def request(self, realm, username, password, message):
    """Log ``message`` as a warning and return ``(True, None, None)``.

    ``realm``, ``username`` and ``password`` are ignored; no credentials are
    looked up.
    """
    logging.warning(message)
    return True, None, None
requestMasterPassword(password, stored)
Source code in src/qgis_server_light/worker/qgis.py
18
19
def requestMasterPassword(self, password, stored):
    """Log that the master password was requested.

    Both arguments are ignored and ``None`` is returned implicitly.
    NOTE(review): check whether QGIS expects a return value here.
    """
    logging.warning("Master password requested")

Qgis(svg_paths: Optional[List[str]], log_level)

Source code in src/qgis_server_light/worker/qgis.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def Qgis(svg_paths: Optional[List[str]], log_level):
    """Create and initialise the ``QgsApplication`` used by the worker.

    Sets ``QT_QPA_PLATFORM`` to ``offscreen`` in the process environment
    (overwriting any existing value) before the application is created.

    Args:
        svg_paths: Additional SVG paths. They are appended to the paths QGIS
            already knows; duplicates are dropped. ``None`` or an empty list
            leaves the QGIS paths untouched.
        log_level: When equal to ``logging.DEBUG`` the QGIS message log is
            forwarded to Python logging and a ``CredentialsHelper`` is
            attached to the application.

    Returns:
        The initialised ``QgsApplication`` instance.
    """
    os.environ["QT_QPA_PLATFORM"] = "offscreen"
    qgs = QgsApplication([], False)
    qgs.initQgis()
    if svg_paths:
        # De-duplicate while keeping first-seen order (QGIS defaults first,
        # then the configured paths). A plain set union would make the order
        # depend on string hashing and therefore differ between processes.
        merged_paths = list(dict.fromkeys([*qgs.svgPaths(), *svg_paths]))
        qgs.setSvgPaths(merged_paths)
    logging.debug(f"Application Path: {qgs.prefixPath()}")
    logging.info(f"QGIS Version {Qgis_.version()}")

    if log_level == logging.DEBUG:
        logging.debug("QGIS Debugging enabled")

        def write_log_message(message, tag, level):
            logging.debug(f"{tag}({level}): {message}")

        QgsApplication.messageLog().messageReceived.connect(write_log_message)

        # The helper is stored on the application object, which keeps a
        # reference to it for as long as the application object lives.
        # NOTE(review): the helper is only installed in DEBUG mode - confirm
        # this is intended and not an indentation slip.
        qgs.credentialsHelper = CredentialsHelper()

    return qgs

version() -> int

Source code in src/qgis_server_light/worker/qgis.py
47
48
def version() -> int:
    """Return the QGIS version as reported by ``Qgis_.versionInt()``."""
    return Qgis_.versionInt()

version_name() -> str

Source code in src/qgis_server_light/worker/qgis.py
51
52
def version_name() -> str:
    """Return the QGIS version string as reported by ``Qgis_.version()``."""
    return Qgis_.version()

qgis_type_serializer

QDateConverter

Bases: Converter

Source code in src/qgis_server_light/worker/qgis_type_serializer.py
10
11
12
13
14
15
16
17
18
19
20
class QDateConverter(Converter):
    """Converter between ``QDate`` objects and ``yyyy-MM-dd`` strings."""

    # format string handed to ``QDate.fromString`` / ``QDate.toString``
    format = "yyyy-MM-dd"

    def deserialize(self, value: str, **kwargs: Any) -> QDate:
        """Parse ``value`` with ``QDate.fromString`` using ``self.format``."""
        parsed = QDate.fromString(value, self.format)
        return parsed

    def serialize(self, value: QDate, **kwargs: Any) -> Optional[str]:
        """Format ``value`` with ``self.format``; falsy values give ``None``."""
        return value.toString(self.format) if value else None
format = 'yyyy-MM-dd' class-attribute instance-attribute
deserialize(value: str, **kwargs: Any) -> QDate
Source code in src/qgis_server_light/worker/qgis_type_serializer.py
13
14
def deserialize(self, value: str, **kwargs: Any) -> QDate:
    """Parse ``value`` with ``QDate.fromString`` using ``self.format``.

    Extra keyword arguments are accepted and ignored.
    """
    return QDate.fromString(value, self.format)
serialize(value: QDate, **kwargs: Any) -> Optional[str]
Source code in src/qgis_server_light/worker/qgis_type_serializer.py
16
17
18
19
20
def serialize(self, value: QDate, **kwargs: Any) -> Optional[str]:
    """Format ``value`` with ``self.format``; falsy values give ``None``.

    Extra keyword arguments are accepted and ignored.
    """
    return value.toString(self.format) if value else None

register_converters_at_runtime()

Source code in src/qgis_server_light/worker/qgis_type_serializer.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@contextmanager
def register_converters_at_runtime():
    """Register the custom type converters for the duration of the block.

    On entry a ``QDateConverter`` is registered for ``QDate`` on the global
    ``converter``. On exit every type that was registered successfully is
    unregistered again; a ``KeyError`` during unregistration is ignored.
    """
    active_types = []
    try:
        # register further types here
        for custom_type, converter_instance in ((QDate, QDateConverter()),):
            converter.register_converter(custom_type, converter_instance)
            # remembered only after the registration succeeded
            active_types.append(custom_type)
        yield
    finally:
        for custom_type in active_types:
            try:
                converter.unregister_converter(custom_type)
            except KeyError:
                pass

redis

DEFAULT_DATA_ROOT = '/io/data' module-attribute

DEFAULT_SVG_PATH = '/io/svg' module-attribute

RedisEngine

Bases: Engine

Source code in src/qgis_server_light/worker/redis.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class RedisEngine(Engine):
    """Engine that takes job ids from a Redis list and publishes the results.

    The worker registers itself in Redis, blocks on the job queue, runs each
    job through ``Engine.process`` and publishes the pickled ``JobResult`` on
    a job specific channel.
    """

    def __init__(
        self,
        context: EngineContext,
        runner_plugins: list[str],
        svg_paths: Optional[List] = None,
        log_level=logging.WARNING,
    ) -> None:
        """Start the base engine and set the Redis related defaults.

        Args:
            context: Engine wide settings, forwarded to ``Engine``.
            runner_plugins: Dotted paths of runner classes, forwarded to ``Engine``.
            svg_paths: Optional additional SVG paths, forwarded to ``Engine``.
            log_level: Log level forwarded to ``Engine``; defaults to the
                same value ``Engine`` uses when it is not passed.
        """
        self.boot_start = time.time()
        super().__init__(context, runner_plugins, svg_paths, log_level)
        # checked by the loop in ``run``
        self.shutdown = False
        # base delay in seconds for the exponential retry back-off
        self.retry_wait = 0.01
        # highest retry count that still sleeps instead of exiting
        self.max_retries = 11
        # time to live (seconds) of the ``worker:<id>`` key in Redis
        self.info_expire: int = 300

    def retry_handling_with_jitter(self, count: int):
        """Sleep before the next connection attempt or give up.

        For ``count <= max_retries`` the method sleeps
        ``2 ** count * retry_wait`` seconds, otherwise it calls
        ``exit_connection_error``. Despite the name no random jitter is
        applied to the delay.
        """
        if count <= self.max_retries:
            sleep = math.pow(2, count) * self.retry_wait
            logging.warning(f"Retrying in {sleep} seconds...")
            time.sleep(sleep)
        else:
            self.exit_connection_error()

    @staticmethod
    def exit_connection_error():
        """Log that Redis is unreachable and terminate the process."""
        logging.error("Shutting down => no connection to Redis")
        # NOTE(review): on POSIX only the low 8 bits of an exit status reach
        # the parent process, so 404 is observed as 148.
        exit(404)

    def exit_gracefully(self, signum, frame):
        """Signal handler: log the signal number, flag shutdown and exit with 0."""
        logging.error(f"Received: {signum}")
        self.shutdown = True
        exit(0)

    @staticmethod
    def set_job_runtime_status(
        job_id: str,
        pipeline: Pipeline,
        status: str,
        start_time: float,
    ):
        """Write status, timestamps and duration of a job to its Redis hash.

        The four fields are queued on ``pipeline`` for the hash
        ``job:<job_id>`` and the pipeline is executed immediately.

        Args:
            job_id: Id of the job, used to build the hash key.
            pipeline: Redis pipeline used to send the updates.
            status: New status value; also used as suffix of the timestamp field.
            start_time: ``time.time()`` value taken when the job was picked
                up; the stored duration is measured from it.
        """
        duration = time.time() - start_time
        ts = datetime.datetime.now().isoformat()
        pipeline.hset(f"job:{job_id}", RedisQueue.job_status_key, status)
        pipeline.hset(
            f"job:{job_id}",
            f"{RedisQueue.job_timestamp_key}.{status}",
            ts,
        )
        pipeline.hset(f"job:{job_id}", RedisQueue.job_last_update_key, ts)
        pipeline.hset(f"job:{job_id}", RedisQueue.job_duration_key, str(duration))
        pipeline.execute()

    def heartbeat(self, client: Redis) -> datetime.datetime:
        """Store the current time as ``last_seen`` of this worker and return it."""
        now = datetime.datetime.now()
        client.hset(f"worker:{self.info.id}", "last_seen", now.isoformat())
        return now

    def register_worker(self, client: Redis):
        """Publish this worker's info in Redis and add it to the ``workers`` set."""
        # writing worker info to redis
        client.hset(
            f"worker:{self.info.id}", "info", JsonSerializer().render(self.info)
        )
        # set timer to automatically remove worker info from list
        client.expire(f"worker:{self.info.id}", self.info_expire)
        # add worker to list of workers in redis
        client.sadd("workers", self.info.id)
        self.heartbeat(client)
        logging.info("Worker was registered in Redis")

    def retry_connection(self, redis_url: str, count: int):
        """Log the failed connection to ``redis_url`` and back off (or exit)."""
        logging.warning(f"Could not connect to redis on `{redis_url}`.")
        self.retry_handling_with_jitter(count)

    def start(self, redis_url) -> Redis:
        """Install the signal handlers and connect to Redis.

        ``SIGINT`` and ``SIGTERM`` are routed to ``exit_gracefully``. The
        connection is probed with ``ping`` until it succeeds; every failure
        goes through ``retry_connection``, which eventually terminates the
        process.

        Returns:
            The connected Redis client.
        """
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
        r = Redis.from_url(
            redis_url, decode_responses=True, retry=Retry(ExponentialBackoff(), 0)
        )
        retry_count = 0
        while True:
            try:
                retry_count += 1
                logging.debug(f"Looking up redis: {redis_url}")
                r.ping()
            except RedisConnectionError as e:
                logging.debug(f"Connection on Redis not successful => {e}")
                self.retry_connection(redis_url, retry_count)
            else:
                break
        logging.info(f"Connection to redis on `{redis_url}`successful.")
        return r

    def run(self, redis_url):
        """Main worker loop: wait for job ids on the Redis queue and process them.

        Every iteration (re-)registers the worker, then blocks on the job
        queue. A timeout only renews the heartbeat and the expiry of the
        worker key. For a received job id the job parameters are read from
        the ``job:<id>`` hash, parsed into the registered job info class and
        processed; the pickled ``JobResult`` is published on the job's
        channel. If anything fails, a ``JobResult`` carrying the error text
        is published instead and the error is logged.

        Args:
            redis_url: URL of the Redis server to connect to.
        """
        r = self.start(redis_url)
        p = r.pipeline()
        # the blocking pop times out shortly before the worker key would
        # expire, so the expiry can be renewed in time
        expire_limit = self.info_expire * 0.95
        retry_count = 0
        while not self.shutdown:
            try:
                self.register_worker(r)
                logging.debug("Waiting for jobs")
                self.set_waiting()
                # this is blocking the loop until a job is found in the redis
                # list/queue, if there is one we take it, we have a timeout here, to
                # renew the workers heartbeat in redis
                result = r.blpop([RedisQueue.job_queue_name], int(expire_limit))
                # Redis answered, so the connection works again. Without this
                # reset the counter would accumulate over the whole lifetime
                # of the worker and unrelated short outages would eventually
                # shut it down.
                retry_count = 0
                if result is None:
                    now = self.heartbeat(r)
                    logging.debug(
                        f"Worker heartbeat renewed in queue {now.isoformat()}"
                    )
                    r.expire(f"worker:{self.info.id}", self.info_expire)
                    continue
                else:
                    _, job_id = result
            except RedisConnectionError:
                retry_count += 1
                self.retry_connection(redis_url, retry_count)
                continue
            start_time = time.time()
            try:
                # we inform, that the job is running.
                self.set_job_runtime_status(job_id, p, Status.RUNNING.value, start_time)

                job_info_json = r.hget(f"job:{job_id}", RedisQueue.job_info_key)
                job_info_class_name = r.hget(
                    f"job:{job_id}", RedisQueue.job_info_type_key
                )
                job_info_class = self.available_job_info_classes[job_info_class_name]
                job_info = JsonParser().from_string(job_info_json, job_info_class)
                result: JobResult = self.process(job_info)
                result.worker_id = self.info.id
                result.worker_host_name = socket.gethostname()
                # NOTE(review): subscribers have to unpickle this payload, so
                # they must only read from a trusted Redis instance.
                data = pickle.dumps(result)

                # we inform, that the job was finished successful
                self.set_job_runtime_status(job_id, p, Status.SUCCESS.value, start_time)

                # we publish the result to any subscribers
                p.publish(f"{RedisQueue.job_channel_name}:{job_id}", data)

            except Exception as e:
                # preparation of the result, containing error information
                result = JobResult(id=job_id, data=str(e), content_type="text")
                result.worker_id = self.info.id
                result.worker_host_name = socket.gethostname()
                data = pickle.dumps(result)

                # we inform, that the job has failed with errors
                # NOTE(review): the status update below is disabled, so a
                # failed job keeps the last status written to its hash.
                # self.set_job_runtime_status(job_id, p, Status.FAILURE.value,
                # start_time)

                # we publish the result to any subscribers
                p.publish(f"{RedisQueue.job_channel_name}:{job_id}", data)

                # we provide error information to the logs
                logging.error(e, exc_info=True)
            finally:
                # sends the publish command queued above
                p.execute()
            logging.debug(f"Job duration: {time.time() - start_time}")
        exit(0)
boot_start = time.time() instance-attribute
info_expire: int = 300 instance-attribute
max_retries = 11 instance-attribute
retry_wait = 0.01 instance-attribute
shutdown = False instance-attribute
__init__(context: EngineContext, runner_plugins: list[str], svg_paths: Optional[List] = None) -> None
Source code in src/qgis_server_light/worker/redis.py
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    context: EngineContext,
    runner_plugins: list[str],
    svg_paths: Optional[List] = None,
    log_level=logging.WARNING,
) -> None:
    """Start the base engine and set the Redis related defaults.

    Args:
        context: Engine wide settings, forwarded to ``Engine``.
        runner_plugins: Dotted paths of runner classes, forwarded to ``Engine``.
        svg_paths: Optional additional SVG paths, forwarded to ``Engine``.
        log_level: Log level forwarded to ``Engine``; defaults to the same
            value ``Engine`` uses when it is not passed, so existing callers
            behave as before.
    """
    self.boot_start = time.time()
    super().__init__(context, runner_plugins, svg_paths, log_level)
    # checked by the loop in ``run``
    self.shutdown = False
    # base delay in seconds for the exponential retry back-off
    self.retry_wait = 0.01
    # highest retry count that still sleeps instead of exiting
    self.max_retries = 11
    # time to live (seconds) of the ``worker:<id>`` key in Redis
    self.info_expire: int = 300
exit_connection_error() staticmethod
Source code in src/qgis_server_light/worker/redis.py
49
50
51
52
@staticmethod
def exit_connection_error():
    """Log that Redis is unreachable and terminate the process."""
    logging.error("Shutting down => no connection to Redis")
    # NOTE(review): on POSIX only the low 8 bits of an exit status reach the
    # parent process, so 404 is observed as 148.
    exit(404)
exit_gracefully(signum, frame)
Source code in src/qgis_server_light/worker/redis.py
54
55
56
57
def exit_gracefully(self, signum, frame):
    """Signal handler: log the signal number, flag shutdown and exit with 0.

    ``frame`` is part of the signal handler signature and is not used.
    """
    received = f"Received: {signum}"
    logging.error(received)
    self.shutdown = True
    exit(0)
heartbeat(client: Redis) -> datetime.datetime
Source code in src/qgis_server_light/worker/redis.py
78
79
80
81
def heartbeat(self, client: Redis) -> datetime.datetime:
    """Store the current time as ``last_seen`` of this worker.

    The ISO formatted timestamp is written to the hash ``worker:<id>``.

    Returns:
        The ``datetime`` that was written.
    """
    seen_at = datetime.datetime.now()
    worker_key = f"worker:{self.info.id}"
    client.hset(worker_key, "last_seen", seen_at.isoformat())
    return seen_at
register_worker(client: Redis)
Source code in src/qgis_server_light/worker/redis.py
83
84
85
86
87
88
89
90
91
92
93
def register_worker(self, client: Redis):
    """Publish this worker in Redis.

    Writes the serialized worker info to the hash ``worker:<id>``, lets that
    key expire after ``info_expire`` seconds, adds the worker id to the set
    ``workers`` and finally sends a heartbeat.
    """
    worker_id = self.info.id
    worker_key = f"worker:{worker_id}"

    # worker info goes into the worker's own hash ...
    serialized_info = JsonSerializer().render(self.info)
    client.hset(worker_key, "info", serialized_info)
    # ... which removes itself unless the expiry is renewed
    client.expire(worker_key, self.info_expire)
    # the set lists all known worker ids
    client.sadd("workers", worker_id)

    self.heartbeat(client)
    logging.info("Worker was registered in Redis")
retry_connection(redis_url: str, count: int)
Source code in src/qgis_server_light/worker/redis.py
95
96
97
def retry_connection(self, redis_url: str, count: int):
    """Log the failed connection to ``redis_url`` and back off (or exit).

    Args:
        redis_url: URL that could not be reached; only used for the log message.
        count: Number of the current retry, passed on to
            ``retry_handling_with_jitter``.
    """
    message = f"Could not connect to redis on `{redis_url}`."
    logging.warning(message)
    self.retry_handling_with_jitter(count)
retry_handling_with_jitter(count: int)
Source code in src/qgis_server_light/worker/redis.py
41
42
43
44
45
46
47
def retry_handling_with_jitter(self, count: int):
    """Sleep before the next connection attempt or give up.

    Once ``count`` exceeds ``max_retries`` the method calls
    ``exit_connection_error``. Otherwise it sleeps
    ``2 ** count * retry_wait`` seconds. Despite the name no random jitter
    is applied to the delay.
    """
    if count > self.max_retries:
        self.exit_connection_error()
        return
    delay = math.pow(2, count) * self.retry_wait
    logging.warning(f"Retrying in {delay} seconds...")
    time.sleep(delay)
run(redis_url)
Source code in src/qgis_server_light/worker/redis.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def run(self, redis_url):
    """Main worker loop: wait for job ids on the Redis queue and process them.

    Every iteration (re-)registers the worker, then blocks on the job queue.
    A timeout only renews the heartbeat and the expiry of the worker key.
    For a received job id the job parameters are read from the ``job:<id>``
    hash, parsed into the registered job info class and processed; the
    pickled ``JobResult`` is published on the job's channel. If anything
    fails, a ``JobResult`` carrying the error text is published instead and
    the error is logged.

    Args:
        redis_url: URL of the Redis server to connect to.
    """
    r = self.start(redis_url)
    p = r.pipeline()
    # the blocking pop times out shortly before the worker key would expire,
    # so the expiry can be renewed in time
    expire_limit = self.info_expire * 0.95
    retry_count = 0
    while not self.shutdown:
        try:
            self.register_worker(r)
            logging.debug("Waiting for jobs")
            self.set_waiting()
            # this is blocking the loop until a job is found in the redis
            # list/queue, if there is one we take it, we have a timeout here, to
            # renew the workers heartbeat in redis
            result = r.blpop([RedisQueue.job_queue_name], int(expire_limit))
            # Redis answered, so the connection works again. Without this
            # reset the counter would accumulate over the whole lifetime of
            # the worker and unrelated short outages would eventually shut
            # it down.
            retry_count = 0
            if result is None:
                now = self.heartbeat(r)
                logging.debug(
                    f"Worker heartbeat renewed in queue {now.isoformat()}"
                )
                r.expire(f"worker:{self.info.id}", self.info_expire)
                continue
            else:
                _, job_id = result
        except RedisConnectionError:
            retry_count += 1
            self.retry_connection(redis_url, retry_count)
            continue
        start_time = time.time()
        try:
            # we inform, that the job is running.
            self.set_job_runtime_status(job_id, p, Status.RUNNING.value, start_time)

            job_info_json = r.hget(f"job:{job_id}", RedisQueue.job_info_key)
            job_info_class_name = r.hget(
                f"job:{job_id}", RedisQueue.job_info_type_key
            )
            job_info_class = self.available_job_info_classes[job_info_class_name]
            job_info = JsonParser().from_string(job_info_json, job_info_class)
            result: JobResult = self.process(job_info)
            result.worker_id = self.info.id
            result.worker_host_name = socket.gethostname()
            # NOTE(review): subscribers have to unpickle this payload, so
            # they must only read from a trusted Redis instance.
            data = pickle.dumps(result)

            # we inform, that the job was finished successful
            self.set_job_runtime_status(job_id, p, Status.SUCCESS.value, start_time)

            # we publish the result to any subscribers
            p.publish(f"{RedisQueue.job_channel_name}:{job_id}", data)

        except Exception as e:
            # preparation of the result, containing error information
            result = JobResult(id=job_id, data=str(e), content_type="text")
            result.worker_id = self.info.id
            result.worker_host_name = socket.gethostname()
            data = pickle.dumps(result)

            # we inform, that the job has failed with errors
            # NOTE(review): the status update below is disabled, so a failed
            # job keeps the last status written to its hash.
            # self.set_job_runtime_status(job_id, p, Status.FAILURE.value,
            # start_time)

            # we publish the result to any subscribers
            p.publish(f"{RedisQueue.job_channel_name}:{job_id}", data)

            # we provide error information to the logs
            logging.error(e, exc_info=True)
        finally:
            # sends the publish command queued above
            p.execute()
        logging.debug(f"Job duration: {time.time() - start_time}")
    exit(0)
set_job_runtime_status(job_id: str, pipeline: Pipeline, status: str, start_time: float) staticmethod
Source code in src/qgis_server_light/worker/redis.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@staticmethod
def set_job_runtime_status(
    job_id: str,
    pipeline: Pipeline,
    status: str,
    start_time: float,
):
    """Write status, timestamps and duration of a job to its Redis hash.

    The four fields are queued on ``pipeline`` for the hash ``job:<job_id>``
    and the pipeline is executed immediately.

    Args:
        job_id: Id of the job, used to build the hash key.
        pipeline: Redis pipeline used to send the updates.
        status: New status value; also used as suffix of the timestamp field.
        start_time: ``time.time()`` value taken when the job was picked up;
            the stored duration is measured from it.
    """
    elapsed = time.time() - start_time
    timestamp = datetime.datetime.now().isoformat()
    job_key = f"job:{job_id}"
    field_updates = (
        (RedisQueue.job_status_key, status),
        (f"{RedisQueue.job_timestamp_key}.{status}", timestamp),
        (RedisQueue.job_last_update_key, timestamp),
        (RedisQueue.job_duration_key, str(elapsed)),
    )
    for field_name, field_value in field_updates:
        pipeline.hset(job_key, field_name, field_value)
    pipeline.execute()
start(redis_url) -> Redis
Source code in src/qgis_server_light/worker/redis.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def start(self, redis_url) -> Redis:
    """Install the signal handlers and connect to Redis.

    ``SIGINT`` and ``SIGTERM`` are routed to ``exit_gracefully``. The
    connection is probed with ``ping`` until it succeeds; every failure goes
    through ``retry_connection``, which eventually terminates the process.

    Args:
        redis_url: URL of the Redis server.

    Returns:
        The connected Redis client.
    """
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, self.exit_gracefully)

    connection = Redis.from_url(
        redis_url, decode_responses=True, retry=Retry(ExponentialBackoff(), 0)
    )

    attempt = 0
    while True:
        attempt += 1
        logging.debug(f"Looking up redis: {redis_url}")
        try:
            connection.ping()
        except RedisConnectionError as e:
            logging.debug(f"Connection on Redis not successful => {e}")
            self.retry_connection(redis_url, attempt)
            continue
        break

    logging.info(f"Connection to redis on `{redis_url}`successful.")
    return connection

main() -> None

Source code in src/qgis_server_light/worker/redis.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def main() -> None:
    """Command line entry point of the Redis worker.

    Parses ``--redis-url``, ``--log-level``, ``--data-root`` and
    ``--svg-path``, configures logging, creates a ``RedisEngine`` with the
    built-in runner plugins and starts its job loop.

    Raises:
        AssertionError: If no redis url was passed.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--redis-url", type=str, help="redis url")
    cli.add_argument(
        "--log-level",
        type=str,
        default="info",
        help="log level (debug, info, warning or error)",
    )
    cli.add_argument(
        "--data-root",
        type=str,
        default=DEFAULT_DATA_ROOT,
        help=f"Absolute path to the data dir. Defaults to {DEFAULT_DATA_ROOT}",
    )
    cli.add_argument(
        "--svg-path",
        type=str,
        default=DEFAULT_SVG_PATH,
        help=f"Absolute path to additional svg files. Multiple paths "
        f"can be separated by `:`. Defaults to {DEFAULT_SVG_PATH}",
    )
    options = cli.parse_args()

    logging.basicConfig(
        level=options.log_level.upper(),
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    if not options.redis_url:
        raise AssertionError(
            "no redis host specified: start qgis-server-light "
            "with '--redis-url <QSL_REDIS_URL>'"
        )

    runner_plugins = [
        "qgis_server_light.worker.runner.render.RenderRunner",
        "qgis_server_light.worker.runner.feature.GetFeatureRunner",
        # Not fully functional yet
        # "qgis_server_light.worker.runner.feature_info.GetFeatureInfoRunner",
    ]
    engine = RedisEngine(
        EngineContext(options.data_root),
        runner_plugins,
        # several paths may be given in one argument, separated by ``:``
        svg_paths=options.svg_path.split(":"),
    )
    engine.run(options.redis_url)

runner

common

JobContext dataclass
Source code in src/qgis_server_light/worker/runner/common.py
38
39
40
@dataclass
class JobContext:
    """Per-job settings handed to a runner on construction."""

    # NOTE(review): presumably the directory that job data paths are resolved
    # against - confirm against the runners that read it.
    base_path: str | Path
base_path: str | Path instance-attribute
__init__(base_path: str | Path) -> None
MapRunner

Bases: Runner

Base class for any runner that interacts with a map. Not runnable by itself.

Source code in src/qgis_server_light/worker/runner/common.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class MapRunner(Runner):
    """Base class for any runner that interacts with a map.
    Not runnable by itself.

    It turns the layer definitions of a job into QGIS layers (optionally
    re-using layers from a shared cache), applies their styles and collects
    them in ``map_layers`` for the concrete runner to work with.
    """

    map_layers: List[QgsMapLayer]
    # Drivers which are initialized as QgsVectorLayer.
    vector_layer_drivers = [
        "ogr",
        "postgres",
        "spatialite",
        "mssql",
        "oracle",
        "wfs",
        "delimitedtext",
        "gpx",
        "arcgisfeatureserver",
    ]
    # Drivers which are initialized as QgsRasterLayer.
    raster_layer_drivers = [
        "gdal",
        "wms",
        "xyz",
        "arcgismapserver",
        "wcs",
    ]
    # Drivers which are initialized as QgsVectorTileLayer.
    custom_layer_drivers = ["xyzvectortiles", "mbtilesvectortiles"]
    default_style_name = "default"

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoParameter,
        layer_cache: Optional[Dict] = None,
    ) -> None:
        """Stores the collaborators and starts with an empty layer list.

        Args:
            qgis: The QGIS application the runner works with.
            context: The job context (provides the data base path).
            job_info: The job description to execute.
            layer_cache: Optional mapping used to re-use already initialized
                layers. When ``None`` no caching takes place.
        """
        self.qgis = qgis
        self.context = context
        self.job_info = job_info
        self.map_layers = list()
        self.layer_cache = layer_cache

    def _get_map_settings(self, layers: List[QgsMapLayer]) -> QgsMapSettings:
        """Produces a QgsMapSettings object from a set of layers

        Size, dpi, extent and destination CRS are taken from
        ``self.job_info.job``; the background is transparent.
        """
        expression_context_scope = QgsExpressionContextScope()
        # every settings object gets its own unique `map_id` variable
        expression_context_scope.setVariable("map_id", str(uuid.uuid4()))
        expression_context = QgsExpressionContext()
        expression_context.appendScope(expression_context_scope)
        settings = QgsMapSettings()
        settings.setExpressionContext(expression_context)

        def preprocessor(path):
            # identity: paths are passed through unchanged
            return path

        settings.pathResolver().setPathPreprocessor(preprocessor)
        settings.setOutputSize(
            QSize(int(self.job_info.job.width), int(self.job_info.job.height))
        )
        if self.job_info.job.dpi:
            settings.setOutputDpi(self.job_info.job.dpi)
        minx, miny, maxx, maxy = self.job_info.job.bbox.to_2d_list()
        bbox = QgsRectangle(float(minx), float(miny), float(maxx), float(maxy))
        settings.setExtent(bbox)
        settings.setLayers(layers)
        settings.setBackgroundColor(QColor(Qt.transparent))
        crs = self.job_info.job.crs
        destination_crs = QgsCoordinateReferenceSystem.fromOgcWmsCrs(crs)
        settings.setDestinationCrs(destination_crs)
        return settings

    def _load_style(self, qgs_layer: QgsMapLayer, job_layer_definition: QslJobLayer):
        """Applies the style shipped with the job layer to the QGIS layer.

        The style definition is expected to be urlsafe-base64 encoded,
        zlib-compressed style XML. The import result is only logged.
        """
        logging.info(
            f"Preparing job_layer_definition Style: {job_layer_definition.style.name}"
        )
        style_doc = QDomDocument()
        style_xml = zlib.decompress(
            urlsafe_b64decode(job_layer_definition.style.definition)
        )
        style_doc.setContent(style_xml)
        success, _ = qgs_layer.importNamedStyle(style_doc)

        logging.info(f" ✓ Style loaded: {success}")

    def get_cache_name(self, job_layer_definition: QslJobLayer) -> str:
        """Central method to decide which name is used in the cache to
        identify a layer.
        """
        return job_layer_definition.id

    def _decide_drivers(self, job_layer_definition: QslJobLayer) -> QgsMapLayer:
        """Decides which type of layer we are dealing with and delegates initialization
        to the right method.

        Args:
            job_layer_definition: The job_layer_definition containing all
                information to initialize a QgsMapLayer.
        Returns:
            The newly created layer.
        Raises:
            LookupError: When the driver is not in the expected ranges.
        """
        if job_layer_definition.driver in self.vector_layer_drivers:
            qgs_layer = self._prepare_vector_layer(job_layer_definition)
        elif job_layer_definition.driver in self.raster_layer_drivers:
            qgs_layer = self._prepare_raster_layer(job_layer_definition)
        elif job_layer_definition.driver in self.custom_layer_drivers:
            qgs_layer = self._prepare_custom_layer(job_layer_definition)
        else:
            raise LookupError(f"Type not implemented: {job_layer_definition}")
        return qgs_layer

    def _handle_layer_cache(self, job_layer_definition: QslJobLayer) -> QgsMapLayer:
        """Checks if layer can be fetched directly from the cache or initiates the
        creation of a new layer otherwise.

        Args:
            job_layer_definition: The job_layer_definition containing all
                information to initialize a QgsMapLayer.
        Returns:
            The layer (from cache or newly created).
        Raises:
            RuntimeError: When the newly created layer is not valid.
        """
        cache_name = self.get_cache_name(job_layer_definition)
        if self.layer_cache is not None and cache_name in self.layer_cache:
            logging.debug(
                f"Using cached job_layer_definition {job_layer_definition.name} (identifier: {cache_name})"
            )
            qgs_layer = self.layer_cache[cache_name]
        else:
            qgs_layer = self._decide_drivers(job_layer_definition)
            if qgs_layer.isValid():
                logging.debug(
                    f"Newly initialized layer {job_layer_definition.name} is valid: {qgs_layer.isValid()}"
                )
                # only valid layers are ever put into the cache
                if self.layer_cache is not None:
                    self.layer_cache[cache_name] = qgs_layer
            else:
                logging.error(qgs_layer.error().message())
                # An invalid layer may come without a data provider. Guard the
                # access so the RuntimeError below is not masked by an
                # AttributeError raised while logging.
                data_provider = qgs_layer.dataProvider()
                if data_provider is not None:
                    logging.error(data_provider.error().message())
                raise RuntimeError(
                    f"Newly initialized layer {job_layer_definition.name} is not valid. JobLayerDefinition: {job_layer_definition}"
                )
        return qgs_layer

    def _provide_layer(self, job_layer_definition: QslJobLayer) -> None:
        """Fetches the QGIS layer relevant for the requested job layer.

        The layer is styled and appended to ``self.map_layers``.

        Args:
            job_layer_definition: The job_layer_definition containing all
                information to initialize a QgsMapLayer.
        Returns:
            None
        """
        qgs_layer = self._handle_layer_cache(job_layer_definition)
        # applying the style to the job_layer_definition
        self._load_style(qgs_layer, job_layer_definition)
        self.map_layers.append(qgs_layer)

    def _handle_datasource_definition(self, job_layer_definition: QslJobLayer) -> dict:
        """Decodes the JSON datasource of a job layer.

        For non-remote layers the ``path`` entry is rewritten to an absolute
        path below the configured base path and the layer's folder.

        Returns:
            The decoded (and possibly path-adjusted) datasource parts.
        """
        layer_source = json.loads(job_layer_definition.source)
        if not job_layer_definition.remote:
            # we make the relative path an absolute one with the configured base path
            layer_source["path"] = os.path.join(
                self.context.base_path,
                job_layer_definition.folder_name,
                layer_source["path"],
            )
        return layer_source

    def _decoded_layer_source_to_connection_string(
        self, driver: str, layer_source: dict
    ) -> str:
        """Encodes datasource parts into the URI string of the given driver."""
        return QgsProviderRegistry.instance().encodeUri(driver, layer_source)

    def _prepare_vector_layer(
        self, job_layer_definition: QslJobLayer
    ) -> QgsVectorLayer:
        """
        Initializes a QgsVectorLayer from a job_layer_definition.

        An ``OgcFilter110`` filter on the definition is translated into a
        subset string; it is AND-combined with a subset string the layer
        already carries. The validity of the layer is not checked here, that
        is done by ``_handle_layer_cache``.

        Args:
            job_layer_definition: The job_layer_definition definition as
                received from the runner.

        Returns:
            The QgsVectorLayer instance.
        Raises:
            RuntimeError: In case the OGC filter of the definition could not
                be converted into a QGIS expression.
        """

        layer_source = self._handle_datasource_definition(job_layer_definition)
        layer_source_path = self._decoded_layer_source_to_connection_string(
            job_layer_definition.driver, layer_source
        )

        # removed loadDefaultStyle=False because it seems to have no effect anymore
        options = QgsVectorLayer.LayerOptions(readExtentFromXml=False)
        # The option is spelled `skipCrsValidation`; a misspelled name is only
        # stored as a plain Python attribute and never reaches QGIS.
        options.skipCrsValidation = True
        options.forceReadOnly = True

        qgs_layer = QgsVectorLayer(
            layer_source_path,
            job_layer_definition.name,
            job_layer_definition.driver,
            options,
        )
        if job_layer_definition.filter:
            if isinstance(job_layer_definition.filter, OgcFilter110):
                # TODO: This is potentially bad: We always get all features from datasource. However, QGIS
                #   does not seem to support sliding window feature filter out of the box...
                logging.info(" QslJobLayer is filtered by:")
                logging.info(job_layer_definition.filter.definition)
                filter_doc = QDomDocument()
                filter_doc.setContent(job_layer_definition.filter.definition)
                filter_expression = QgsOgcUtils.expressionFromOgcFilter(
                    filter_doc.documentElement(),
                    QgsOgcUtils.FilterVersion.FILTER_OGC_1_1,
                    qgs_layer,
                )
                if filter_expression is None:
                    # no expression is returned when the filter can not be parsed
                    raise RuntimeError(
                        f"The OGC filter of layer {job_layer_definition.name} could not be converted into a QGIS expression."
                    )
                existing_expression = qgs_layer.subsetString()
                if existing_expression:
                    # Combining with AND the originally defined expression always takes precedence
                    expression = f"({existing_expression}) AND ({filter_expression.expression()})"
                else:
                    expression = filter_expression.expression()
                qgs_layer.setSubsetString(expression)
        return qgs_layer

    def _prepare_custom_layer(
        self, job_layer_definition: QslJobLayer
    ) -> QgsVectorTileLayer:
        """Initializes a custom job_layer_definition"""
        layer_source = self._handle_datasource_definition(job_layer_definition)
        layer_source_path = self._decoded_layer_source_to_connection_string(
            job_layer_definition.driver, layer_source
        )
        qgs_layer = QgsVectorTileLayer(layer_source_path, job_layer_definition.name)
        return qgs_layer

    def _prepare_raster_layer(
        self, job_layer_definition: QslJobLayer
    ) -> QgsRasterLayer:
        """Initializes a raster job_layer_definition"""
        layer_source = self._handle_datasource_definition(job_layer_definition)
        layer_source_path = self._decoded_layer_source_to_connection_string(
            job_layer_definition.driver, layer_source
        )
        qgs_layer = QgsRasterLayer(
            layer_source_path,
            job_layer_definition.name,
            job_layer_definition.driver,
        )
        return qgs_layer
context = context instance-attribute
custom_layer_drivers = ['xyzvectortiles', 'mbtilesvectortiles'] class-attribute instance-attribute
default_style_name = 'default' class-attribute instance-attribute
job_info = job_info instance-attribute
layer_cache = layer_cache instance-attribute
map_layers: List[QgsMapLayer] = list() instance-attribute
qgis = qgis instance-attribute
raster_layer_drivers = ['gdal', 'wms', 'xyz', 'arcgismapserver', 'wcs'] class-attribute instance-attribute
vector_layer_drivers = ['ogr', 'postgres', 'spatialite', 'mssql', 'oracle', 'wfs', 'delimitedtext', 'gpx', 'arcgisfeatureserver'] class-attribute instance-attribute
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoParameter, layer_cache: Optional[Dict] = None) -> None
Source code in src/qgis_server_light/worker/runner/common.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoParameter,
    layer_cache: Optional[Dict] = None,
) -> None:
    """Keep references to the collaborators and start without any layers.

    Args:
        qgis: The QGIS application the runner works with.
        context: The job context.
        job_info: The job description to execute.
        layer_cache: Optional mapping for re-using initialized layers.
    """
    self.qgis, self.context, self.job_info = qgis, context, job_info
    # layers are collected per run, so every instance starts empty
    self.map_layers = []
    self.layer_cache = layer_cache
get_cache_name(job_layer_definition: QslJobLayer) -> str

Central method to decide which name is used in the cache to identify a layer.

Source code in src/qgis_server_light/worker/runner/common.py
146
147
148
149
150
def get_cache_name(self, job_layer_definition: QslJobLayer) -> str:
    """Return the key under which a layer is stored in the layer cache.

    This is the single place deciding the cache identity of a layer;
    currently it is the id of the job layer definition.
    """
    cache_key = job_layer_definition.id
    return cache_key
Runner

Bases: ABC

Source code in src/qgis_server_light/worker/runner/common.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Runner(ABC):
    """Common base of all runners.

    The base class is not runnable: both the constructor and ``run`` raise
    ``NotImplementedError``. Only ``deserialize_job_info`` carries shared
    behaviour, driven by the ``job_info_class`` attribute.
    """

    # The job info type that serialized job infos are parsed into.
    job_info_class: Type[QslJobInfoParameter]

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoParameter,
        layer_cache: Optional[Dict],
    ):
        """Always raises, the abstract base class can not be instantiated."""
        raise NotImplementedError()

    def run(self):
        """Always raises, the abstract base class can not be run."""
        raise NotImplementedError()

    @classmethod
    def deserialize_job_info(cls, job_info: bytes):
        """Parse serialized job info bytes into ``cls.job_info_class``."""
        parser = JsonParser()
        return parser.from_bytes(job_info, cls.job_info_class)
job_info_class: Type[QslJobInfoParameter] instance-attribute
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoParameter, layer_cache: Optional[Dict])
Source code in src/qgis_server_light/worker/runner/common.py
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoParameter,
    layer_cache: Optional[Dict],
):
    """Defines the constructor signature of runners; always raises.

    Raises:
        NotImplementedError: Always.
    """
    # This is an abstract base class which is not runnable itself
    raise NotImplementedError()
deserialize_job_info(job_info: bytes) classmethod
Source code in src/qgis_server_light/worker/runner/common.py
60
61
62
@classmethod
def deserialize_job_info(cls, job_info: bytes):
    """Parse serialized job info bytes into ``cls.job_info_class``."""
    parser = JsonParser()
    return parser.from_bytes(job_info, cls.job_info_class)
run()
Source code in src/qgis_server_light/worker/runner/common.py
56
57
58
def run(self):
    """Entry point of a runner; the base implementation always raises.

    Raises:
        NotImplementedError: Always.
    """
    # This is an abstract base class which is not runnable itself
    raise NotImplementedError()

feature

GetFeatureRunner

Bases: MapRunner

Source code in src/qgis_server_light/worker/runner/feature.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class GetFeatureRunner(MapRunner):
    """Runner which collects the features of the queried vector layers.

    The result is a serialized ``QueryCollection`` containing one
    ``FeatureCollection`` per queried layer.
    """

    job_info_class = QslJobInfoFeature

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoFeature,
        layer_cache: Optional[Dict] = None,
        layer_style_cache: Optional[set] = None,
    ) -> None:
        """Delegates to ``MapRunner``; ``layer_style_cache`` is accepted but
        not used.
        """
        super().__init__(qgis, context, job_info, layer_cache)

    def _clean_attribute(self, attribute_value: Any, idx: int, layer: QgsVectorLayer):
        """Maps the QGIS ``NULL`` value to ``None``, other values pass unchanged."""
        if attribute_value == NULL:
            return None
        return attribute_value

    def _clean_attributes(self, attributes, layer):
        """Cleans all attribute values of a feature, keeping their order."""
        return [
            self._clean_attribute(attr, idx, layer)
            for idx, attr in enumerate(attributes)
        ]

    def _load_style(self, qgs_layer: QgsMapLayer, job_layer_definition: QslJobLayer):
        """Overrides style loading with a no-op (only a log entry is written)."""
        logging.info(" ✓ Omit style loading on WFS layer operation.")

    def run(self):
        """Executes all queries of the job.

        Returns:
            A JobResult carrying the JSON serialized QueryCollection.
        Raises:
            RuntimeError: When a queried layer is not a vector layer or the
                filter of a query can not be converted to a QGIS expression.
        """
        query_collection = QueryCollection()
        numbers_matched = 0
        # Paging window. A missing start index means "from the first feature",
        # a missing/zero count means "no upper limit". The start index is
        # honoured even when no count was requested.
        start_index = self.job_info.job.start_index or 0
        count = self.job_info.job.count
        stop_index = start_index + count if count else None
        for query in self.job_info.job.queries:
            # we need to reset this because we want always only the layers related to the current query
            self.map_layers = []
            wfs_filter = query.filter
            for job_layer_definition in query.layers:
                self._provide_layer(job_layer_definition)

            for layer in self.map_layers:
                feature_collection = FeatureCollection(layer.name())
                query_collection.feature_collections.append(feature_collection)
                if not isinstance(layer, QgsVectorLayer):
                    raise RuntimeError(
                        f"QslJobLayer type `{layer.type().name}` of layer `{layer.shortName()}` not supported by GetFeature"
                    )
                if wfs_filter is not None and wfs_filter.definition is not None:
                    # TODO: This is potentially bad: We always get all features from datasource. However, QGIS
                    #   does not seem to support sliding window feature filter out of the box...
                    logging.info(" QslJobLayer is filtered by:")
                    logging.info(f" {wfs_filter.definition}")
                    filter_doc = QDomDocument()
                    filter_doc.setContent(wfs_filter.definition)
                    # This is not correct in the WFS 2.0 way. We apply a filter to a job_layer_definition. But WFS 2.0
                    # allows filters on multiple layers.
                    expression = QgsOgcUtils.expressionFromOgcFilter(
                        filter_doc.documentElement(),
                        QgsOgcUtils.FilterVersion.FILTER_FES_2_0,
                    )
                    if expression is None:
                        # no expression is returned when the filter can not be parsed
                        raise RuntimeError(
                            f"The filter for layer `{layer.name()}` could not be converted into a QGIS expression."
                        )
                    logging.info(
                        f" This was transformed to the QGIS expression (valid: {expression.isValid()})"
                    )
                    logging.info(f" '{expression.dump()}'")
                    feature_request = QgsFeatureRequest(expression)
                else:
                    feature_request = QgsFeatureRequest()
                layer_features = list(layer.getFeatures(feature_request))
                # the matched number always counts all hits, not only the page
                numbers_matched += len(layer_features)
                logging.info(f" Found {len(layer_features)} features")
                if start_index or stop_index is not None:
                    layer_features = layer_features[start_index:stop_index]
                for layer_feature in layer_features:
                    property_list = zip(
                        layer_feature.fields().names(),
                        self._clean_attributes(layer_feature.attributes(), layer),
                    )
                    feature = Feature(
                        geometry=Geometry(
                            value=bytes(layer_feature.geometry().asWkb()),
                        )
                    )
                    feature_collection.features.append(feature)
                    for name, value in property_list:
                        feature.attributes.append(Attribute(name=name, value=value))
        if numbers_matched > 0:
            query_collection.numbers_matched = numbers_matched
        with register_converters_at_runtime():
            data = JsonSerializer().render(query_collection).encode()
            return JobResult(
                id=self.job_info.id,
                data=data,
                content_type="application/qgis-server-light.interface.qgis.QueryCollection",
            )
job_info_class = QslJobInfoFeature class-attribute instance-attribute
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoFeature, layer_cache: Optional[Dict] = None, layer_style_cache: Optional[set] = None) -> None
Source code in src/qgis_server_light/worker/runner/feature.py
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoFeature,
    layer_cache: Optional[Dict] = None,
    layer_style_cache: Optional[set] = None,
) -> None:
    """Delegates to the parent constructor.

    ``layer_style_cache`` is accepted but not forwarded or stored.
    """
    super().__init__(qgis, context, job_info, layer_cache)
run()
Source code in src/qgis_server_light/worker/runner/feature.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def run(self):
    """Executes all queries of the job.

    Returns:
        A JobResult carrying the JSON serialized QueryCollection.
    Raises:
        RuntimeError: When a queried layer is not a vector layer or the
            filter of a query can not be converted to a QGIS expression.
    """
    query_collection = QueryCollection()
    numbers_matched = 0
    # Paging window. A missing start index means "from the first feature",
    # a missing/zero count means "no upper limit". The start index is
    # honoured even when no count was requested.
    start_index = self.job_info.job.start_index or 0
    count = self.job_info.job.count
    stop_index = start_index + count if count else None
    for query in self.job_info.job.queries:
        # we need to reset this because we want always only the layers related to the current query
        self.map_layers = []
        wfs_filter = query.filter
        for job_layer_definition in query.layers:
            self._provide_layer(job_layer_definition)

        for layer in self.map_layers:
            feature_collection = FeatureCollection(layer.name())
            query_collection.feature_collections.append(feature_collection)
            if not isinstance(layer, QgsVectorLayer):
                raise RuntimeError(
                    f"QslJobLayer type `{layer.type().name}` of layer `{layer.shortName()}` not supported by GetFeature"
                )
            if wfs_filter is not None and wfs_filter.definition is not None:
                # TODO: This is potentially bad: We always get all features from datasource. However, QGIS
                #   does not seem to support sliding window feature filter out of the box...
                logging.info(" QslJobLayer is filtered by:")
                logging.info(f" {wfs_filter.definition}")
                filter_doc = QDomDocument()
                filter_doc.setContent(wfs_filter.definition)
                # This is not correct in the WFS 2.0 way. We apply a filter to a job_layer_definition. But WFS 2.0
                # allows filters on multiple layers.
                expression = QgsOgcUtils.expressionFromOgcFilter(
                    filter_doc.documentElement(),
                    QgsOgcUtils.FilterVersion.FILTER_FES_2_0,
                )
                if expression is None:
                    # no expression is returned when the filter can not be parsed
                    raise RuntimeError(
                        f"The filter for layer `{layer.name()}` could not be converted into a QGIS expression."
                    )
                logging.info(
                    f" This was transformed to the QGIS expression (valid: {expression.isValid()})"
                )
                logging.info(f" '{expression.dump()}'")
                feature_request = QgsFeatureRequest(expression)
            else:
                feature_request = QgsFeatureRequest()
            layer_features = list(layer.getFeatures(feature_request))
            # the matched number always counts all hits, not only the page
            numbers_matched += len(layer_features)
            logging.info(f" Found {len(layer_features)} features")
            if start_index or stop_index is not None:
                layer_features = layer_features[start_index:stop_index]
            for layer_feature in layer_features:
                property_list = zip(
                    layer_feature.fields().names(),
                    self._clean_attributes(layer_feature.attributes(), layer),
                )
                feature = Feature(
                    geometry=Geometry(
                        value=bytes(layer_feature.geometry().asWkb()),
                    )
                )
                feature_collection.features.append(feature)
                for name, value in property_list:
                    feature.attributes.append(Attribute(name=name, value=value))
    if numbers_matched > 0:
        query_collection.numbers_matched = numbers_matched
    with register_converters_at_runtime():
        data = JsonSerializer().render(query_collection).encode()
        return JobResult(
            id=self.job_info.id,
            data=data,
            content_type="application/qgis-server-light.interface.qgis.QueryCollection",
        )

feature_info

GetFeatureInfoRunner

Bases: MapRunner

Source code in src/qgis_server_light/worker/runner/feature_info.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class GetFeatureInfoRunner(MapRunner):
    """Runner which identifies the features rendered around a pixel position
    of the requested map.
    """

    job_info_class = QslJobInfoFeatureInfo

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoFeatureInfo,
        layer_cache: Optional[Dict] = None,
    ) -> None:
        """Delegates to ``MapRunner``."""
        super().__init__(qgis, context, job_info, layer_cache)

    def _clean_attribute(self, attribute, idx, layer):
        """Returns the display representation of an attribute value.

        ``NULL`` becomes ``None``; any other value is formatted by the field
        formatter matching the editor widget configured for the field.
        """
        if attribute == NULL:
            return None
        setup = layer.editorWidgetSetup(idx)
        fieldFormatter = QgsApplication.fieldFormatterRegistry().fieldFormatter(
            setup.type()
        )
        return fieldFormatter.representValue(
            layer, idx, setup.config(), None, attribute
        )

    def _clean_attributes(self, attributes, layer):
        """Cleans all attribute values of a feature, keeping their order."""
        return [
            self._clean_attribute(attr, idx, layer)
            for idx, attr in enumerate(attributes)
        ]

    def run(self):
        """Identifies the features around the requested pixel position.

        Returns:
            A JobResult carrying a JSON feature collection (properties only).
        Raises:
            RuntimeError: When one of the layers is not a vector layer.
        """
        for job_layer_definition in self.job_info.job.layers:
            self._provide_layer(job_layer_definition)
        map_settings = self._get_map_settings(self.map_layers)
        # Estimate queryable bbox (2mm)
        map_to_pixel = map_settings.mapToPixel()
        map_point = map_to_pixel.toMapCoordinates(
            self.job_info.job.x, self.job_info.job.y
        )
        # Create identifiable bbox in map coordinates, ±2mm
        # 0.002 m * 39.37 inch/m * dpi gives the tolerance in pixels; it has to
        # be scaled by the map units per pixel before it is applied to map
        # coordinates.
        tolerance_px = 0.002 * 39.37 * map_settings.outputDpi()
        tolerance = tolerance_px * map_to_pixel.mapUnitsPerPixel()
        tl = QgsPointXY(map_point.x() - tolerance, map_point.y() - tolerance)
        br = QgsPointXY(map_point.x() + tolerance, map_point.y() + tolerance)
        rect = QgsRectangle(tl, br)
        render_context = QgsRenderContext.fromMapSettings(map_settings)

        features = list()
        for layer in self.map_layers:
            # Reject unsupported layers before touching their renderer, so the
            # intended error is raised instead of a renderer related one.
            if layer.type() != QgsMapLayerType.VectorLayer:
                raise RuntimeError(
                    f"Layer type `{layer.type().name}` of layer `{layer.shortName()}` not supported by GetFeatureInfo"
                )
            renderer = layer.renderer().clone() if layer.renderer() else None
            if renderer:
                renderer.startRender(render_context, layer.fields())
            try:
                layer_rect = map_settings.mapToLayerCoordinates(layer, rect)
                request = (
                    QgsFeatureRequest()
                    .setFilterRect(layer_rect)
                    .setFlags(QgsFeatureRequest.ExactIntersect)
                )
                for feature in layer.getFeatures(request):
                    # Without a renderer there is nothing to decide on, so the
                    # visibility check is only applied when one exists.
                    if renderer and not renderer.willRenderFeature(
                        feature, render_context
                    ):
                        continue
                    properties = OrderedDict(
                        zip(
                            feature.fields().names(),
                            self._clean_attributes(feature.attributes(), layer),
                        )
                    )
                    features.append({"type": "Feature", "properties": properties})
            finally:
                # always pair startRender with stopRender, also on errors
                if renderer:
                    renderer.stopRender(render_context)

        featurecollection = {"features": features, "type": "FeatureCollection"}
        return JobResult(
            id=self.job_info.id,
            data=json.dumps(featurecollection).encode("utf-8"),
            content_type="application/json",
        )
job_info_class = QslJobInfoFeatureInfo class-attribute instance-attribute
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoFeatureInfo, layer_cache: Optional[Dict] = None) -> None
Source code in src/qgis_server_light/worker/runner/feature_info.py
22
23
24
25
26
27
28
29
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoFeatureInfo,
    layer_cache: Optional[Dict] = None,
) -> None:
    """Delegates to the parent constructor with all arguments unchanged."""
    super().__init__(qgis, context, job_info, layer_cache)
run()
Source code in src/qgis_server_light/worker/runner/feature_info.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def run(self):
    """Identifies the features around the requested pixel position.

    Returns:
        A JobResult carrying a JSON feature collection (properties only).
    Raises:
        RuntimeError: When one of the layers is not a vector layer.
    """
    for job_layer_definition in self.job_info.job.layers:
        self._provide_layer(job_layer_definition)
    map_settings = self._get_map_settings(self.map_layers)
    # Estimate queryable bbox (2mm)
    map_to_pixel = map_settings.mapToPixel()
    map_point = map_to_pixel.toMapCoordinates(
        self.job_info.job.x, self.job_info.job.y
    )
    # Create identifiable bbox in map coordinates, ±2mm
    # 0.002 m * 39.37 inch/m * dpi gives the tolerance in pixels; it has to
    # be scaled by the map units per pixel before it is applied to map
    # coordinates.
    tolerance_px = 0.002 * 39.37 * map_settings.outputDpi()
    tolerance = tolerance_px * map_to_pixel.mapUnitsPerPixel()
    tl = QgsPointXY(map_point.x() - tolerance, map_point.y() - tolerance)
    br = QgsPointXY(map_point.x() + tolerance, map_point.y() + tolerance)
    rect = QgsRectangle(tl, br)
    render_context = QgsRenderContext.fromMapSettings(map_settings)

    features = list()
    for layer in self.map_layers:
        # Reject unsupported layers before touching their renderer, so the
        # intended error is raised instead of a renderer related one.
        if layer.type() != QgsMapLayerType.VectorLayer:
            raise RuntimeError(
                f"Layer type `{layer.type().name}` of layer `{layer.shortName()}` not supported by GetFeatureInfo"
            )
        renderer = layer.renderer().clone() if layer.renderer() else None
        if renderer:
            renderer.startRender(render_context, layer.fields())
        try:
            layer_rect = map_settings.mapToLayerCoordinates(layer, rect)
            request = (
                QgsFeatureRequest()
                .setFilterRect(layer_rect)
                .setFlags(QgsFeatureRequest.ExactIntersect)
            )
            for feature in layer.getFeatures(request):
                # Without a renderer there is nothing to decide on, so the
                # visibility check is only applied when one exists.
                if renderer and not renderer.willRenderFeature(
                    feature, render_context
                ):
                    continue
                properties = OrderedDict(
                    zip(
                        feature.fields().names(),
                        self._clean_attributes(feature.attributes(), layer),
                    )
                )
                features.append({"type": "Feature", "properties": properties})
        finally:
            # always pair startRender with stopRender, also on errors
            if renderer:
                renderer.stopRender(render_context)

    featurecollection = {"features": features, "type": "FeatureCollection"}
    return JobResult(
        id=self.job_info.id,
        data=json.dumps(featurecollection).encode("utf-8"),
        content_type="application/json",
    )

legend

GetLegendRunner

Bases: MapRunner

Source code in src/qgis_server_light/worker/runner/legend.py
 5
 6
 7
 8
 9
10
11
class GetLegendRunner(MapRunner):
    """Placeholder runner for legend jobs; ``run`` is not implemented yet."""

    def __init__(self, qgis, context: JobContext, job_info: QslJobInfoLegend) -> None:
        """Delegates to the parent constructor.

        No layer cache is accepted or forwarded here.
        """
        super().__init__(qgis, context, job_info)

    def run(self):
        """Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        # TODO Implement ....
        raise NotImplementedError()
__init__(qgis, context: JobContext, job_info: QslJobInfoLegend) -> None
Source code in src/qgis_server_light/worker/runner/legend.py
6
7
def __init__(self, qgis, context: JobContext, job_info: QslJobInfoLegend) -> None:
    """Delegates to the parent constructor.

    No layer cache is accepted or forwarded here.
    """
    super().__init__(qgis, context, job_info)
run()
Source code in src/qgis_server_light/worker/runner/legend.py
 9
10
11
def run(self):
    """Not implemented yet.

    Raises:
        NotImplementedError: Always.
    """
    # TODO Implement ....
    raise NotImplementedError()

process

ProcessRunner

Bases: MapRunner

Source code in src/qgis_server_light/worker/runner/process.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ProcessRunner(MapRunner):
    """Runner for processing jobs.

    On construction it instantiates ``ProcessingAlgFactory`` and logs the
    entries of the provider registry's plugin list.
    """

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoParameter,
        layer_cache: Optional[dict],
    ):
        """Initializes the map runner part and logs the found providers."""
        super().__init__(qgis, context, job_info, layer_cache)

        ProcessingAlgFactory()
        # the plugin list is one string with one entry per line
        plugin_list = QgsProviderRegistry.instance().pluginList()
        provider_names = plugin_list.split("\n")
        logging.info("Found Providers:")
        for provider_name in provider_names:
            logging.info(f" - {provider_name}")

    def load_providers(self, qgis: Qgis):
        """Adds the native and the PDAL algorithms to the processing registry."""
        native_algorithms = QgsNativeAlgorithms()
        qgis.processingRegistry().addProvider(native_algorithms)
        pdal_algorithms = QgsPdalAlgorithms()
        qgis.processingRegistry().addProvider(pdal_algorithms)
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoParameter, layer_cache: Optional[dict])
Source code in src/qgis_server_light/worker/runner/process.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoParameter,
    layer_cache: Optional[dict],
):
    """Initialise the parent runner and log each entry of the provider plugin list.

    Args:
        qgis: Forwarded to the parent initializer.
        context: Forwarded to the parent initializer.
        job_info: Forwarded to the parent initializer.
        layer_cache: Forwarded to the parent initializer.
    """
    super().__init__(qgis, context, job_info, layer_cache)

    # Instantiated for its side effects only; the instance is discarded.
    # NOTE(review): presumably this prepares the processing algorithm
    # factory -- confirm against the QGIS processing API.
    ProcessingAlgFactory()
    plugin_listing = QgsProviderRegistry.instance().pluginList()
    provider_names = plugin_listing.split("\n")
    logging.info("Found Providers:")
    for provider_name in provider_names:
        logging.info(f" - {provider_name}")
load_providers(qgis: Qgis)
Source code in src/qgis_server_light/worker/runner/process.py
28
29
30
def load_providers(self, qgis: Qgis):
    """Add a native and a PDAL algorithm provider to the processing registry of ``qgis``."""
    for provider_class in (QgsNativeAlgorithms, QgsPdalAlgorithms):
        qgis.processingRegistry().addProvider(provider_class())

render

RenderRunner

Bases: MapRunner

Responsible for rendering a QslRenderJob to an image.

Source code in src/qgis_server_light/worker/runner/render.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class RenderRunner(MapRunner):
    """Responsible for rendering a QslRenderJob to an image."""

    job_info_class = QslJobInfoRender

    def __init__(
        self,
        qgis: QgsApplication,
        context: JobContext,
        job_info: QslJobInfoRender,
        layer_cache: Optional[Dict] = None,
    ) -> None:
        """Pass all arguments unchanged to ``MapRunner.__init__``."""
        super().__init__(qgis, context, job_info, layer_cache)

    @classmethod
    def image_formats(cls):
        """Return the supported mime types mapped to their encoder functions."""
        return {"image/png": cls._encode_png, "image/jpeg": cls._encode_jpg}

    def run(self):
        """Run this runner.

        Returns:
            A JobResult with the content_type and image_data (bytes) of the rendered image.
        """
        logging.info(f"Executing job: {self.job_info}")
        feature_filter = QgsFeatureFilter()
        for job_layer_definition in self.job_info.job.layers:
            self._provide_layer(job_layer_definition)
        map_settings = self._get_map_settings(self.map_layers)
        filter_providers = QgsFeatureFilterProviderGroup()
        filter_providers.addProvider(feature_filter)
        renderer = QgsMapRendererParallelJob(map_settings)
        renderer.setFeatureFilterProvider(filter_providers)
        # Block in a local event loop until the render job emits `finished`.
        event_loop = QEventLoop(self.qgis)
        renderer.finished.connect(event_loop.quit)
        renderer.start()
        event_loop.exec_()
        img = renderer.renderedImage()
        # 39.37 inches per metre converts the output DPI into dots per metre.
        dots_per_meter = int(map_settings.outputDpi() * 39.37)
        img.setDotsPerMeterX(dots_per_meter)
        img.setDotsPerMeterY(dots_per_meter)
        content_type, image_data = self._encode_image(img, self.job_info.job.format)
        return JobResult(
            id=self.job_info.id, data=image_data, content_type=content_type
        )

    def _encode_image(self, image: QImage, fmt: str) -> Tuple[str, bytearray]:
        """Encodes an image in a specific mime type

        Args:
            image (QImage): The image to encode
            fmt (str): The mime type of the format (matched case-insensitively)

        Returns:
            A tuple with mime type and bytes-like object of an encoded image in the desired format

        Raises:
            RuntimeError: If ``fmt`` is not one of the keys of ``image_formats()``.
        """
        fmt = fmt.lower()
        encoders = self.image_formats()
        # Only the lookup is guarded, so a KeyError raised inside an encoder
        # is not misreported as an unsupported mime type.
        try:
            encoding_method = encoders[fmt]
        except KeyError:
            raise RuntimeError(
                f"Requested mimetype '{fmt}' was not found in {list(encoders)}."
            ) from None
        return fmt, encoding_method(image)

    @staticmethod
    def _encode_png(image: QImage):
        """Encode ``image`` as PNG via ``fpng_encode_image_to_memory``.

        The image is converted in place to ``QImage.Format_RGBA8888`` first and
        its raw bytes are handed to the encoder with ``CompressionFlags.NONE``.
        """
        image.convertTo(QImage.Format_RGBA8888)
        image_data = fpng_encode_image_to_memory(
            image.constBits().asstring(image.sizeInBytes()),
            image.width(),
            image.height(),
            0,
            CompressionFlags.NONE,
        )
        return image_data

    @staticmethod
    def _encode_jpg(image: QImage):
        """Encode ``image`` as JPG by saving it into an in-memory ``QByteArray``.

        Returns:
            The ``QByteArray`` the image was written to.
        """
        image_data = QByteArray()
        buf = QBuffer(image_data)
        buf.open(QIODevice.WriteOnly)
        image.save(buf, "JPG")
        return image_data
job_info_class = QslJobInfoRender class-attribute instance-attribute
__init__(qgis: QgsApplication, context: JobContext, job_info: QslJobInfoRender, layer_cache: Optional[Dict] = None) -> None
Source code in src/qgis_server_light/worker/runner/render.py
20
21
22
23
24
25
26
27
def __init__(
    self,
    qgis: QgsApplication,
    context: JobContext,
    job_info: QslJobInfoRender,
    layer_cache: Optional[Dict] = None,
) -> None:
    """Pass all arguments unchanged to the parent initializer."""
    super().__init__(qgis, context, job_info, layer_cache)
image_formats() classmethod
Source code in src/qgis_server_light/worker/runner/render.py
29
30
31
@classmethod
def image_formats(cls):
    """Return the supported mime types mapped to their encoder functions."""
    encoders = {}
    encoders["image/png"] = cls._encode_png
    encoders["image/jpeg"] = cls._encode_jpg
    return encoders
run()

Run this runner. Returns: A JobResult with the content_type and image_data (bytes) of the rendered image.

Source code in src/qgis_server_light/worker/runner/render.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def run(self):
    """Render the layers of the job into a single encoded image.

    Returns:
        A JobResult carrying the job id, the encoded image data and the
        content type the image was encoded as.
    """
    logging.info(f"Executing job: {self.job_info}")
    layer_filter = QgsFeatureFilter()
    for layer_definition in self.job_info.job.layers:
        self._provide_layer(layer_definition)
    settings = self._get_map_settings(self.map_layers)
    filter_group = QgsFeatureFilterProviderGroup()
    filter_group.addProvider(layer_filter)
    render_job = QgsMapRendererParallelJob(settings)
    render_job.setFeatureFilterProvider(filter_group)
    # Block in a local event loop until the render job emits `finished`.
    loop = QEventLoop(self.qgis)
    render_job.finished.connect(loop.quit)
    render_job.start()
    loop.exec_()
    rendered = render_job.renderedImage()
    # 39.37 inches per metre converts the output DPI into dots per metre.
    dots_per_meter = int(settings.outputDpi() * 39.37)
    rendered.setDotsPerMeterX(dots_per_meter)
    rendered.setDotsPerMeterY(dots_per_meter)
    content_type, image_data = self._encode_image(rendered, self.job_info.job.format)
    return JobResult(
        id=self.job_info.id,
        data=image_data,
        content_type=content_type,
    )