-
-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathc_seabreeze_wrapper.pyx
3760 lines (3232 loc) · 138 KB
/
c_seabreeze_wrapper.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""This is the cython wrapper for the seabreeze library
Author: Andreas Poehlmann
"""
cimport cython
cimport c_seabreeze as csb
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from libcpp cimport bool as bool_t
import numpy as np
# from libseabreeze api/SeaBreezeConstants.h
class _ErrorCode(object):
SUCCESS = 0
INVALID_ERROR = 1
NO_DEVICE = 2
FAILED_TO_CLOSE = 3
NOT_IMPLEMENTED = 4
FEATURE_NOT_FOUND = 5
TRANSFER_ERROR = 6
BAD_USER_BUFFER = 7
INPUT_OUT_OF_BOUNDS = 8
SPECTROMETER_SATURATED = 9
VALUE_NOT_FOUND = 10
VALUE_NOT_EXPECTED = 11
INVALID_TRIGGER_MODE = 12
# Compile-time constants (Cython DEF) giving the maximum length of some
# fixed-size C string buffers.
# _MAXBUFLEN sizes the `char c_buffer[...]` arrays that receive model names
# and serial numbers from the backend.
DEF _MAXBUFLEN = 32
# NOTE(review): presumably the size for longer strings; no user of this
# constant is visible in this part of the file - confirm before changing.
DEF _MAXDBUFLEN = 256
class SeaBreezeError(Exception):
    """Base exception for errors reported by the seabreeze backend.

    Parameters
    ----------
    message : str, optional
        free-form error message. Only used when no `error_code` is given.
    error_code : int, optional
        numeric error code. When provided it takes precedence over
        `message` and is translated into a message:

        * ``-99999 < error_code < 0`` is reported as a system error
        * a valid index into ``_error_msgs`` yields the matching message
        * everything else yields the "undefined error" message

    Attributes
    ----------
    error_code : int or None
        the error code this exception was created with
    """

    # the position in this tuple is the numeric error code (see _ErrorCode)
    _error_msgs = (
        "Success",
        "Error: Undefined error",
        "Error: No device found",
        "Error: Could not close device",
        "Error: Feature not implemented",
        "Error: No such feature on device",
        "Error: Data transfer error",
        "Error: Invalid user buffer provided",
        "Error: Input was out of bounds",
        "Error: Spectrometer was saturated",
        "Error: Value not found",
        "Error: Value not expected",
        "Error: Invalid trigger mode"
    )

    def __init__(self, message=None, error_code=None):
        if error_code is not None:
            if -99999 < error_code < 0:
                # as defined in libseabreeze api/SeaBreezeAPI.cpp
                message = "System Error: {:d}".format(error_code)
            elif 0 <= error_code < len(self._error_msgs):
                # a known seabreeze error code
                message = self._error_msgs[error_code]
            else:
                # not a valid seabreeze error code. This branch also covers
                # error_code <= -99999, which would otherwise be used as a
                # (far out of range) negative tuple index.
                message = self._error_msgs[_ErrorCode.INVALID_ERROR]
        elif message is None:
            message = ""

        # Call the base class constructor with the parameters it needs
        super(SeaBreezeError, self).__init__(message)
        self.error_code = error_code
class SeaBreezeNumFeaturesError(SeaBreezeError):
    """Raised when a device reports an unexpected number of features.

    Parameters
    ----------
    feature : str
        human readable name of the feature kind
    received_num : int
        number of features the device reported
    expected_num : int, default=1
        number of features the code expects
    """

    def __init__(self, feature, received_num, expected_num=1):
        template = (
            "This should not have happened. Apparently this device has "
            "{received_num:d} {feature:s} features. The code expects it "
            "to have {expected_num:d}. Please file a bug report including "
            "a description of your device."
        )
        details = dict(
            feature=feature,
            received_num=received_num,
            expected_num=expected_num,
        )
        super(SeaBreezeNumFeaturesError, self).__init__(template.format(**details))
class SeaBreezeNotSupported(SeaBreezeError):
    """SeaBreezeError subclass that adds no behaviour of its own."""
    # ... yes, subclassing exceptions for everything is not always a good idea ...
cdef class SeaBreezeAPI(object):
    """SeaBreeze API interface

    Holds a pointer to the libseabreeze ``SeaBreezeAPI`` instance and exposes
    device registration and device discovery to Python.
    """

    cdef csb.SeaBreezeAPI *sbapi

    def __init__(self, initialize=True):
        """create the api interface

        Parameters
        ----------
        initialize : bool, default=True
            acquire the backend instance right away. If False,
            :meth:`initialize` has to be called before any other method is
            used, because they use the backend pointer without a NULL check.
        """
        self.sbapi = NULL
        if initialize:
            self.initialize()

    def initialize(self):
        """initialize the api backend

        normally this function does not have to be called directly by the user
        """
        self.sbapi = csb.SeaBreezeAPI.getInstance()

    def shutdown(self):
        """shutdown the api backend

        normally this function does not have to be called directly by the user
        """
        # The python level SeaBreezeAPI shutdown is disabled on purpose.
        # The disabled implementation was:
        #   csb.SeaBreezeAPI.shutdown()
        #   self.sbapi = NULL
        pass

    def add_rs232_device_location(self, device_type, bus_path, baudrate):
        """add RS232 device location

        Parameters
        ----------
        device_type : str
        bus_path : str
            This will be a platform-specific location. Under Windows, this may
            be COM1, COM2, etc. Under Linux, this might be /dev/ttyS0, /dev/ttyS1,
            etc.
        baudrate : int

        Returns
        -------
        success : bool
        """
        cdef int output
        cdef bytes c_devtype
        cdef bytes c_buspath
        cdef unsigned int c_baudrate
        cdef char* p_devtype
        cdef char* p_buspath

        # bytes(<text>) raises a TypeError on Python 3, so text has to be
        # encoded explicitly. Byte strings are passed through unchanged.
        if isinstance(device_type, unicode):
            device_type = device_type.encode("utf-8")
        if isinstance(bus_path, unicode):
            bus_path = bus_path.encode("utf-8")
        c_devtype = bytes(device_type)
        c_buspath = bytes(bus_path)
        c_baudrate = int(baudrate)

        # the char pointers borrow the memory of the bytes objects above,
        # which stay alive until the end of this function
        p_devtype = c_devtype
        p_buspath = c_buspath
        output = self.sbapi.addRS232DeviceLocation(p_devtype, p_buspath, c_baudrate)
        # the backend call returns 0 on success
        return output == 0

    def add_ipv4_device_location(self, device_type, ip_address, port):
        """add ipv4 device location

        Parameters
        ----------
        device_type : str
        ip_address : str
            format XXX.XXX.XXX.XXX
        port : int

        Returns
        -------
        success : bool
        """
        cdef int output
        cdef bytes c_devtype
        cdef bytes c_ipaddr
        cdef int c_port
        cdef char* p_devtype
        cdef char* p_ipaddr

        # bytes(<text>) raises a TypeError on Python 3, so text has to be
        # encoded explicitly. Byte strings are passed through unchanged.
        if isinstance(device_type, unicode):
            device_type = device_type.encode("utf-8")
        if isinstance(ip_address, unicode):
            ip_address = ip_address.encode("utf-8")
        c_devtype = bytes(device_type)
        c_ipaddr = bytes(ip_address)
        c_port = int(port)

        # the char pointers borrow the memory of the bytes objects above,
        # which stay alive until the end of this function
        p_devtype = c_devtype
        p_ipaddr = c_ipaddr
        output = self.sbapi.addTCPIPv4DeviceLocation(p_devtype, p_ipaddr, c_port)
        # the backend call returns 0 on success
        return output == 0

    def _list_device_ids(self):
        """list device ids for all available spectrometers

        Note: this includes spectrometers that are currently opened in other
        processes on the machine.

        Returns
        -------
        device_ids : list of ints
            unique device_ids for each available spectrometer
        """
        cdef int num_devices
        cdef long* c_device_ids
        cdef int found_devices

        self.sbapi.probeDevices()
        num_devices = self.sbapi.getNumberOfDeviceIDs()
        c_device_ids = <long*> PyMem_Malloc(num_devices * sizeof(long))
        if not c_device_ids:
            raise MemoryError("could not allocate memory for device_ids")
        try:
            found_devices = self.sbapi.getDeviceIDs(c_device_ids, num_devices)
            # only the first `found_devices` entries have been written
            return [int(c_device_ids[i]) for i in range(found_devices)]
        finally:
            PyMem_Free(c_device_ids)

    def list_devices(self):
        """returns available SeaBreezeDevices

        list all connected Ocean Optics devices supported
        by libseabreeze.

        Returns
        -------
        devices: list of SeaBreezeDevice
            connected Spectrometer instances
        """
        # Probe Devices on all Buses
        device_ids = self._list_device_ids()

        devices = []
        for handle in device_ids:
            dev = SeaBreezeDevice(handle)
            was_open_before = dev.is_open
            if not was_open_before:
                # open the device once (SeaBreezeDevice.open refreshes the
                # cached model and serial number) and close it again below
                try:
                    dev.open()
                except SeaBreezeError as err:
                    if err.error_code == _ErrorCode.NO_DEVICE:
                        # device used by another thread?
                        continue
                    # NOTE(review): every other error while opening is
                    # ignored here and the device is still listed. This
                    # mirrors the previous behaviour - confirm it is intended.
                dev.close()
            devices.append(dev)
        return devices

    def supported_models(self):
        """returns SeaBreezeDevices supported by the backend

        models supported by the backend

        Returns
        -------
        devices: list of str
            list of model names that are supported by this backend
        """
        cdef int num_models
        cdef int error_code
        cdef char c_buffer[_MAXBUFLEN]
        cdef int bytes_written

        num_models = self.sbapi.getNumberOfSupportedModels()

        output = []
        for i in range(num_models):
            bytes_written = self.sbapi.getSupportedModelName(i, &error_code, c_buffer, _MAXBUFLEN)
            if error_code != 0:
                raise SeaBreezeError(error_code=error_code)
            model_name = c_buffer[:bytes_written]
            # strip the NUL padding that may be part of the written bytes
            output.append(model_name.decode("utf-8").rstrip('\x00'))
        return output
cdef class SeaBreezeDevice(object):
    """SeaBreezeDevice class for handling all spectrometers

    This is the default cseabreeze class interface for all supported spectrometers.
    Users don't instantiate it directly, but retrieve instances via
    :func:`seabreeze.cseabreeze.SeaBreezeAPI.list_devices`
    """

    cdef readonly long handle
    cdef readonly str _model, _serial_number
    cdef csb.SeaBreezeAPI *sbapi

    def __cinit__(self, handle):
        self.sbapi = csb.SeaBreezeAPI.getInstance()

    def __init__(self, handle=None):
        if handle is None:
            raise SeaBreezeError("Don't instantiate SeaBreezeDevice directly. Use `SeabreezeAPI.list_devices()`.")
        self.handle = handle
        try:
            self._get_info()
        except SeaBreezeError:
            # fall back to placeholders for whatever could not be queried
            if not self._model:
                # TODO: warn, getting the model string should always succeed...
                self._model = "?"
            if not self._serial_number:
                self._serial_number = "?"

    def __dealloc__(self):
        cdef int error_code
        # always returns 1
        self.sbapi.closeDevice(self.handle, &error_code)

    cdef _get_info(self):
        """populate model and serial_number attributes (internal)"""
        model = self.get_model()
        try:
            self._model = model
        except TypeError:
            # the attribute is typed as `str`: retry with an encoded value
            # when the decoded text is rejected by the typed assignment
            self._model = model.encode("utf-8")
        serial_number = self.get_serial_number()
        try:
            self._serial_number = serial_number
        except TypeError:
            self._serial_number = serial_number.encode("utf-8")

    def __repr__(self):
        return "<SeaBreezeDevice %s:%s>" % (self.model, self.serial_number)

    def open(self):
        """open the spectrometer usb connection

        Returns
        -------
        None

        Raises
        ------
        SeaBreezeError
            if the backend reports a failure while opening the device
        """
        cdef int error_code
        cdef int ret

        ret = self.sbapi.openDevice(self.handle, &error_code)
        if int(ret) > 0 or error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        # refresh the cached model and serial number now that the device is open
        self._get_info()

    def close(self):
        """close the spectrometer usb connection

        Returns
        -------
        None

        Raises
        ------
        SeaBreezeError
            if the backend reports a failure while closing the device
        """
        cdef int error_code
        # always returns 1
        self.sbapi.closeDevice(self.handle, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    @property
    def model(self):
        # format and strip the trailing "." again to hand out a new native string
        return "{}.".format(self._model)[:-1]

    @property
    def serial_number(self):
        # format and strip the trailing "." again to hand out a new native string
        return "{}.".format(self._serial_number)[:-1]

    @property
    def is_open(self):
        """returns if the spectrometer device usb connection is opened

        Returns
        -------
        bool
        """
        try:
            # this is a hack to figure out if the spectrometer is connected
            self.get_serial_number()
        except SeaBreezeNumFeaturesError:
            return False
        except SeaBreezeError as err:
            if err.error_code == _ErrorCode.TRANSFER_ERROR:
                return False
            raise
        else:
            return True

    def get_serial_number(self):
        """return the serial number string of the spectrometer

        Returns
        -------
        serial_number: str

        Raises
        ------
        SeaBreezeNumFeaturesError
            if the device does not report exactly one serial number feature
        SeaBreezeError
            if the backend reports an error
        """
        cdef int error_code
        cdef int num_serial_number_features
        cdef long feature_id
        cdef unsigned char max_length
        cdef int buffer_length
        cdef char c_buffer[_MAXBUFLEN]
        cdef int bytes_written

        num_serial_number_features = self.sbapi.getNumberOfSerialNumberFeatures(self.handle, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        if num_serial_number_features != 1:
            raise SeaBreezeNumFeaturesError("serial number", received_num=num_serial_number_features)

        self.sbapi.getSerialNumberFeatures(self.handle, &error_code, &feature_id, 1)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

        max_length = self.sbapi.getSerialNumberMaximumLength(self.handle, feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

        # The device may report a maximum length of up to 255, but c_buffer
        # only holds _MAXBUFLEN bytes. Never announce more room than exists.
        buffer_length = max_length if max_length < _MAXBUFLEN else _MAXBUFLEN
        bytes_written = self.sbapi.getSerialNumber(self.handle, feature_id, &error_code, c_buffer, buffer_length)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        serial = c_buffer[:bytes_written]
        # strip the NUL padding that may be part of the written bytes
        return serial.decode("utf-8").rstrip('\x00')

    def get_model(self):
        """return the model string of the spectrometer

        Returns
        -------
        model: str

        Raises
        ------
        SeaBreezeError
            if the backend reports an error or the model is reported as "NONE"
        """
        cdef int error_code
        cdef char c_buffer[_MAXBUFLEN]
        cdef int bytes_written

        bytes_written = self.sbapi.getDeviceType(self.handle, &error_code, c_buffer, _MAXBUFLEN)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        model = c_buffer[:bytes_written]
        # `model` is a byte string: comparing it to a text literal is never
        # equal on Python 3, so compare against bytes.
        if model == b"NONE":
            # error_code is 0 at this point, which would render as "Success".
            # NOTE(review): "NONE" is treated as "no device" - confirm.
            raise SeaBreezeError(error_code=_ErrorCode.NO_DEVICE)
        return model.decode("utf-8")

    @property
    def features(self):
        """return a dictionary of all supported features

        this returns a dictionary with all supported Features of the spectrometer

        Returns
        -------
        features : `dict` [`str`, `list` of `seabreeze.cseabreeze.SeaBreezeFeature`]
            maps each feature identifier to the (possibly empty) list of
            feature instances available on this device
        """
        # TODO: make this a cached property
        features = {}
        # noinspection PyProtectedMember
        feature_registry = SeaBreezeFeature.get_feature_class_registry()
        for identifier, feature_class in feature_registry.items():
            feature_ids = feature_class._get_feature_ids_from_device(self)
            features[identifier] = [feature_class(self, feature_id) for feature_id in feature_ids]
        return features

    @property
    def f(self):
        """convenience access to features via attributes

        this allows you to access a feature like this::

            # via .features
            device.features['spectrometer'][0].get_intensities()
            # via .f
            device.f.spectrometer.get_intensities()

        Only the first feature of each kind is exposed; the attribute is
        None when the device has no feature of that kind.
        """
        class FeatureAccessHandler(object):
            def __init__(self, feature_dict):
                for identifier, features in feature_dict.items():
                    setattr(self, identifier, features[0] if features else None)  # TODO: raise FeatureNotAvailable?
        return FeatureAccessHandler(self.features)
cdef class SeaBreezeFeature(object):
    """BaseClass for SeaBreezeFeatures

    defines the minimum class interface for all features. Derived classes
    override `identifier` and `_get_feature_ids_from_device`.
    """

    cdef SeaBreezeDevice device
    cdef long device_id
    cdef readonly long feature_id
    cdef csb.SeaBreezeAPI *sbapi

    # key under which a feature class is listed in the feature registry
    identifier = "base_feature"

    def __cinit__(self, SeaBreezeDevice device, int feature_id):
        self.sbapi = csb.SeaBreezeAPI.getInstance()

    def __init__(self, SeaBreezeDevice device, int feature_id):
        # the base class keeps the default identifier and must not be used directly
        if self.identifier == "base_feature":
            raise SeaBreezeError("Don't instantiate SeaBreezeFeature directly. Use derived feature classes.")
        self.device = device
        self.device_id = device.handle
        self.feature_id = feature_id

    def __repr__(self):
        class_name = self.__class__.__name__
        model = self.device.model
        serial_number = self.device.serial_number
        return "<{}:{}:{} id={}>".format(class_name, model, serial_number, self.feature_id)

    @classmethod
    def get_feature_class_registry(cls):
        """return a dict mapping identifiers to the direct subclasses of SeaBreezeFeature"""
        registry = {}
        # noinspection PyUnresolvedReferences
        for feature_class in SeaBreezeFeature.__subclasses__():
            registry[feature_class.identifier] = feature_class
        return registry

    @classmethod
    def _get_feature_ids_from_device(cls, SeaBreezeDevice device):
        """return the feature ids of this feature kind (none for the base class)"""
        return []

    @classmethod
    def _raise_if_error(cls, error_code, num_features):
        """raise a SeaBreezeError if `error_code` is not 0"""
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        # num_features == 0 is accepted silently for now
        # TODO: raise SeaBreezeNumFeaturesError(cls.identifier, num_features) ?
cdef class SeaBreezeRawUSBBusAccessFeature(SeaBreezeFeature):
    """Feature for reading and writing raw data via the usb endpoints of a device"""

    identifier = "raw_usb_bus_access"

    @classmethod
    def _get_feature_ids_from_device(cls, SeaBreezeDevice device):  # autogenerated
        """return the ids of all raw usb bus access features of `device`"""
        cdef int num_features, error_code
        cdef long* feature_ids
        cdef csb.SeaBreezeAPI* sbapi = csb.SeaBreezeAPI.getInstance()

        num_features = sbapi.getNumberOfRawUSBBusAccessFeatures(device.handle, &error_code)
        cls._raise_if_error(error_code, num_features)
        py_feature_ids = []
        if num_features != 0:
            feature_ids = <long*> PyMem_Malloc(num_features * sizeof(long))
            if not feature_ids:
                raise MemoryError("could not allocate memory for feature_ids")
            try:
                sbapi.getRawUSBBusAccessFeatures(device.handle, &error_code, feature_ids, num_features)
                cls._raise_if_error(error_code, num_features)
                py_feature_ids = [feature_ids[i] for i in range(num_features)]
            finally:
                PyMem_Free(feature_ids)
        return py_feature_ids

    # `except *` is required here: a cdef function with a C return type and
    # no exception clause cannot propagate the exceptions raised below with
    # Cython < 3 (they would be printed and ignored).
    cdef unsigned char _get_device_endpoint(self, endpoint) except *:
        """translate an endpoint name into the endpoint number of the device

        Raises ValueError for an unknown endpoint name and SeaBreezeError
        if the backend reports an error.
        """
        cdef csb.usbEndpointType ep
        cdef int error_code
        cdef unsigned char out

        ep_map = {
            'primary_out': csb.kEndpointTypePrimaryOut,  # slow speed
            'primary_in': csb.kEndpointTypePrimaryIn,  # slow speed
            'secondary_out': csb.kEndpointTypeSecondaryOut,  # could be high speed
            'secondary_in': csb.kEndpointTypeSecondaryIn,  # could be high speed
            'secondary_in2': csb.kEndpointTypeSecondaryIn2  # generally high speed
        }
        if endpoint not in ep_map:
            raise ValueError("endpoint not in %s" % str(ep_map.keys()))
        ep = ep_map[endpoint]
        out = self.sbapi.getDeviceEndpoint(self.device_id, &error_code, ep)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return out

    def raw_usb_read(self, endpoint, buffer_length=1024):
        """read raw data from usb

        Parameters
        ----------
        endpoint : str
            one of {'primary_out', 'primary_in', 'secondary_out', 'secondary_in', 'secondary_in2'}
        buffer_length : int, default=1024
            length of the allocated outputbuffer

        Returns
        -------
        data: bytes
            raw readout from usb
        """
        cdef unsigned char* c_buffer = NULL
        cdef unsigned int buflen = int(buffer_length)
        cdef int error_code
        cdef int bytes_written
        cdef unsigned char ep = self._get_device_endpoint(endpoint)

        # size the allocation with the validated C value, so that the buffer
        # and the length announced to the backend can never disagree
        c_buffer = <unsigned char*> PyMem_Malloc(buflen * sizeof(unsigned char))
        if not c_buffer:
            raise MemoryError("could not allocate memory for buffer")
        try:
            bytes_written = self.sbapi.rawUSBBusAccessRead(self.device_id, self.feature_id, &error_code,
                                                           &c_buffer[0], buflen, ep)
            if error_code != 0:
                raise SeaBreezeError(error_code=error_code)
            # copy the received bytes into a python object before freeing
            data = c_buffer[:bytes_written]
        finally:
            PyMem_Free(c_buffer)
        return data

    def raw_usb_write(self, data, endpoint):
        """send raw data to usb

        Parameters
        ----------
        data : bytes
            raw data that should be transferred to spectrometer
        endpoint : str
            one of {'primary_out', 'primary_in', 'secondary_out', 'secondary_in', 'secondary_in2'}

        Returns
        -------
        bytes_written : int
        """
        cdef unsigned char* c_buffer
        cdef unsigned int c_buffer_length
        cdef int error_code
        cdef int bytes_written
        cdef unsigned char ep = self._get_device_endpoint(endpoint)

        bdata = bytes(data)
        # the pointer borrows the memory of `bdata`, which stays alive until
        # the end of this function
        c_buffer = bdata
        # take the length from the converted bytes object: that is the
        # buffer the pointer refers to, not the original argument
        c_buffer_length = len(bdata)
        bytes_written = self.sbapi.rawUSBBusAccessWrite(self.device_id, self.feature_id, &error_code,
                                                        &c_buffer[0], c_buffer_length, ep)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return int(bytes_written)
cdef class SeaBreezeSpectrometerFeature(SeaBreezeFeature):
    """Feature for configuring a spectrometer and acquiring spectra"""

    identifier = "spectrometer"
    cdef readonly int _cached_spectrum_length

    def __cinit__(self, SeaBreezeDevice device, int feature_id):
        # a negative value marks the spectrum length as "not queried yet"
        self._cached_spectrum_length = -1

    @classmethod
    def _get_feature_ids_from_device(cls, SeaBreezeDevice device):  # autogenerated
        """return the ids of all spectrometer features of `device`"""
        cdef int num_features, error_code
        cdef long* feature_ids
        cdef csb.SeaBreezeAPI* sbapi = csb.SeaBreezeAPI.getInstance()

        num_features = sbapi.getNumberOfSpectrometerFeatures(device.handle, &error_code)
        cls._raise_if_error(error_code, num_features)
        py_feature_ids = []
        if num_features != 0:
            feature_ids = <long*> PyMem_Malloc(num_features * sizeof(long))
            if not feature_ids:
                raise MemoryError("could not allocate memory for feature_ids")
            try:
                sbapi.getSpectrometerFeatures(device.handle, &error_code, feature_ids, num_features)
                cls._raise_if_error(error_code, num_features)
                py_feature_ids = [feature_ids[i] for i in range(num_features)]
            finally:
                PyMem_Free(feature_ids)
        return py_feature_ids

    def set_trigger_mode(self, mode):
        """sets the trigger mode for the spectrometer

        Parameters
        ----------
        mode : int
            trigger mode for spectrometer. Note that requesting an
            unsupported mode will result in an error.

        Returns
        -------
        None
        """
        cdef int error_code
        cdef int c_mode = int(mode)

        # the backend call runs without holding the GIL
        with nogil:
            self.sbapi.spectrometerSetTriggerMode(self.device_id, self.feature_id, &error_code, c_mode)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    def set_integration_time_micros(self, unsigned long integration_time_micros):
        """sets the integration time for the specified device

        Parameters
        ----------
        integration_time_micros : int
            the integration time in micro seconds

        Returns
        -------
        None
        """
        cdef int error_code
        cdef unsigned long c_micros = int(integration_time_micros)

        # the backend call runs without holding the GIL
        with nogil:
            self.sbapi.spectrometerSetIntegrationTimeMicros(self.device_id, self.feature_id, &error_code, c_micros)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    def get_integration_time_micros_limits(self):
        """returns the smallest and largest valid integration time setting, in microseconds

        Returns
        -------
        micros_low: int
            smallest supported integration time
        micros_high: int
            largest supported integration time
        """
        cdef int error_code
        cdef unsigned long micros_low, micros_high

        micros_low = self.sbapi.spectrometerGetMinimumIntegrationTimeMicros(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        micros_high = self.sbapi.spectrometerGetMaximumIntegrationTimeMicros(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return int(micros_low), int(micros_high)

    def get_maximum_intensity(self):
        """returns the maximum pixel intensity for the spectrometer

        Returns
        -------
        max_intensity: float
        """
        cdef int error_code
        cdef double c_max

        c_max = self.sbapi.spectrometerGetMaximumIntensity(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return float(c_max)

    def get_electric_dark_pixel_indices(self):
        """returns the electric dark pixel indices for the spectrometer

        This returns a list of indices of the pixels that are electrically active
        but optically masked (a.k.a. electric dark pixels). Note that not all
        detectors have optically masked pixels; in that case, an empty list is returned

        Returns
        -------
        dark_pixel_idxs: list of int
        """
        cdef int error_code
        cdef int dp_count, written
        cdef int* cindices

        dp_count = self.sbapi.spectrometerGetElectricDarkPixelCount(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        if dp_count == 0:
            # nothing to fetch
            return []

        cindices = <int*> PyMem_Malloc(dp_count * sizeof(int))
        if not cindices:
            raise MemoryError("could not allocate memory for cindices")
        try:
            written = self.sbapi.spectrometerGetElectricDarkPixelIndices(self.device_id, self.feature_id, &error_code,
                                                                         &cindices[0], dp_count)
            if error_code != 0:
                raise SeaBreezeError(error_code=error_code)
            # the backend is expected to fill the whole buffer
            assert int(written) == int(dp_count)
            indices = [int(cindices[i]) for i in range(dp_count)]
        finally:
            PyMem_Free(cindices)
        return indices

    @property
    def _spectrum_length(self):
        """cached spectrum length

        The length is requested from the backend on first access only.

        Returns
        -------
        spectrum_length: int
        """
        cdef int error_code
        cdef int spec_length

        if self._cached_spectrum_length < 0:
            spec_length = self.sbapi.spectrometerGetFormattedSpectrumLength(self.device_id, self.feature_id, &error_code)
            if error_code != 0:
                raise SeaBreezeError(error_code=error_code)
            self._cached_spectrum_length = int(spec_length)
        return self._cached_spectrum_length

    @cython.boundscheck(False)
    def get_wavelengths(self):
        """computes the wavelengths for the spectrometer

        Returns
        -------
        wavelengths: `np.ndarray`
        """
        cdef int error_code
        cdef int bytes_written
        cdef double[::1] out
        cdef int out_length

        # the backend writes directly into the memory of the numpy array
        wavelengths = np.zeros((self._spectrum_length, ), dtype=np.double)
        out = wavelengths
        out_length = wavelengths.size
        with nogil:
            bytes_written = self.sbapi.spectrometerGetWavelengths(self.device_id, self.feature_id, &error_code,
                                                                  &out[0], out_length)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return wavelengths

    @cython.boundscheck(False)
    def get_intensities(self):
        """acquires a spectrum and returns the measured intensities

        In this mode, auto-nulling should be automatically performed
        for devices that support it.

        Returns
        -------
        intensities: `np.ndarray`
        """
        cdef int error_code
        cdef int bytes_written
        cdef double[::1] out
        cdef int out_length

        # the backend writes directly into the memory of the numpy array
        intensities = np.zeros((self._spectrum_length, ), dtype=np.double)
        out = intensities
        out_length = intensities.size
        with nogil:
            bytes_written = self.sbapi.spectrometerGetFormattedSpectrum(self.device_id, self.feature_id, &error_code,
                                                                        &out[0], out_length)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        # the returned count is expected to match the spectrum length
        assert bytes_written == self._spectrum_length
        return intensities

    def _get_spectrum_raw(self):
        """not implemented: access to the unformatted spectrum"""
        # backend functions that would have to be wrapped:
        # int spectrometerGetUnformattedSpectrumLength(long deviceID, long spectrometerFeatureID, int *errorCode)
        # int spectrometerGetUnformattedSpectrum(long deviceID, long spectrometerFeatureID, int *errorCode,
        #                                        unsigned char *buffer, int bufferLength)
        raise NotImplementedError("unformatted spectrum")

    def get_fast_buffer_spectrum(self):
        """not implemented: access to fast buffer spectra"""
        # TODO: requires wrapping of OBP command GetRawSpectrumWithMetadata
        #       which returns N raw spectra each with a 64 byte metadata prefix
        # int spectrometerGetFastBufferSpectrum(long deviceID, long spectrometerFeatureID, int *errorCode,
        #                                       unsigned char *dataBuffer, int dataMaxLength,
        #                                       unsigned int numberOfSampleToRetrieve) # currently 15 max
        raise NotImplementedError("Not yet supported via cseabreeze. Requires changes to libseabreeze.")
cdef class SeaBreezePixelBinningFeature(SeaBreezeFeature):
    """Feature for reading and changing the pixel binning factor of a device"""

    identifier = "pixel_binning"

    @classmethod
    def _get_feature_ids_from_device(cls, SeaBreezeDevice device):  # autogenerated
        """return the ids of all pixel binning features of `device`"""
        cdef int num_features, error_code
        cdef long* feature_ids
        cdef csb.SeaBreezeAPI* sbapi = csb.SeaBreezeAPI.getInstance()

        num_features = sbapi.getNumberOfPixelBinningFeatures(device.handle, &error_code)
        cls._raise_if_error(error_code, num_features)
        py_feature_ids = []
        if num_features != 0:
            feature_ids = <long*> PyMem_Malloc(num_features * sizeof(long))
            if not feature_ids:
                raise MemoryError("could not allocate memory for feature_ids")
            try:
                sbapi.getPixelBinningFeatures(device.handle, &error_code, feature_ids, num_features)
                cls._raise_if_error(error_code, num_features)
                py_feature_ids = [feature_ids[i] for i in range(num_features)]
            finally:
                PyMem_Free(feature_ids)
        return py_feature_ids

    def set_binning_factor(self, factor):
        """sets the pixel binning factor on the device

        Parameters
        ----------
        factor : int
            the desired pixel binning factor

        Returns
        -------
        None
        """
        cdef int error_code
        cdef unsigned char c_factor = int(factor)

        self.sbapi.binningSetPixelBinningFactor(self.device_id, self.feature_id, &error_code, c_factor)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    def get_binning_factor(self):
        """gets the pixel binning factor on the device

        Returns
        -------
        binning_factor: int
        """
        cdef int error_code
        cdef unsigned char c_factor

        c_factor = self.sbapi.binningGetPixelBinningFactor(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return int(c_factor)

    def get_max_binning_factor(self):
        """gets the max pixel binning factor on the device

        Returns
        -------
        binning_factor: int
        """
        cdef int error_code
        cdef unsigned char c_factor

        c_factor = self.sbapi.binningGetMaxPixelBinningFactor(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return int(c_factor)

    # attribute style access to the getters and setters above
    binning_factor = property(get_binning_factor, set_binning_factor)
    max_binning_factor = property(get_max_binning_factor)

    def set_default_binning_factor(self, factor):
        """sets the default pixel binning factor on the device

        Parameters
        ----------
        factor : int or None
            the desired default pixel binning factor. If None, resets the default.

        Returns
        -------
        None
        """
        cdef int error_code
        cdef unsigned char c_factor

        if factor is not None:
            c_factor = int(factor)
            self.sbapi.binningSetDefaultPixelBinningFactor(self.device_id, self.feature_id, &error_code, c_factor)
        else:
            # the overload without a factor resets the default
            self.sbapi.binningSetDefaultPixelBinningFactor(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    def get_default_binning_factor(self):
        """gets the default pixel binning factor on the device

        Returns
        -------
        binning_factor: int
        """
        cdef int error_code
        cdef unsigned char c_factor

        c_factor = self.sbapi.binningGetDefaultPixelBinningFactor(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return int(c_factor)

    # attribute style access to the default binning factor
    default_binning_factor = property(get_default_binning_factor, set_default_binning_factor)
cdef class SeaBreezeThermoElectricFeature(SeaBreezeFeature):
    """Feature for controlling the thermo electric cooler (TEC) of a device"""

    identifier = "thermo_electric"

    @classmethod
    def _get_feature_ids_from_device(cls, SeaBreezeDevice device):  # autogenerated
        """return the ids of all thermo electric features of `device`"""
        cdef int num_features, error_code
        cdef long* feature_ids
        cdef csb.SeaBreezeAPI* sbapi = csb.SeaBreezeAPI.getInstance()

        num_features = sbapi.getNumberOfThermoElectricFeatures(device.handle, &error_code)
        cls._raise_if_error(error_code, num_features)
        py_feature_ids = []
        if num_features != 0:
            feature_ids = <long*> PyMem_Malloc(num_features * sizeof(long))
            if not feature_ids:
                raise MemoryError("could not allocate memory for feature_ids")
            try:
                sbapi.getThermoElectricFeatures(device.handle, &error_code, feature_ids, num_features)
                cls._raise_if_error(error_code, num_features)
                py_feature_ids = [feature_ids[i] for i in range(num_features)]
            finally:
                PyMem_Free(feature_ids)
        return py_feature_ids

    def read_temperature_degrees_celsius(self):
        """reads the actual temperature of the TEC in degrees Celsius

        Returns
        -------
        temperature: float
            tec temperature in degrees Celsius
        """
        cdef int error_code
        cdef double c_temperature

        c_temperature = self.sbapi.tecReadTemperatureDegreesC(self.device_id, self.feature_id, &error_code)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
        return float(c_temperature)

    def set_temperature_setpoint_degrees_celsius(self, temperature):
        """sets the target (setpoint) TEC temperature

        Parameters
        ----------
        temperature : float
            the setpoint temperature in degrees Celsius

        Returns
        -------
        None
        """
        cdef int error_code
        cdef double c_setpoint = float(temperature)

        self.sbapi.tecSetTemperatureSetpointDegreesC(self.device_id, self.feature_id, &error_code,
                                                     c_setpoint)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)

    def enable_tec(self, state):
        """enables the TEC feature on the device

        Parameters
        ----------
        state : bool
            on or off

        Returns
        -------
        None
        """
        cdef int error_code
        cdef unsigned char c_enable

        # the backend takes the flag as an unsigned char (1 = on, 0 = off)
        if bool(state):
            c_enable = 1
        else:
            c_enable = 0
        self.sbapi.tecSetEnable(self.device_id, self.feature_id, &error_code, c_enable)
        if error_code != 0:
            raise SeaBreezeError(error_code=error_code)
cdef class SeaBreezeIrradCalFeature(SeaBreezeFeature):
identifier = "irrad_cal"