o
    ðù‹i-  ã                   @   sà   d Z ddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZ ddlmZ ddlmZ ddlZddlmZ ejejejdd ddlZe e¡ZG d	d
„ d
eƒZdd„ Zdd„ Zedkrneƒ  dS dS )z
@group:waditu
@author: DY
é    N)Údefaultdict)Úwraps)ÚProcess)Ú"WebSocketConnectionClosedExceptionz4%(asctime)s - %(name)s - %(levelname)s - %(message)s)ÚstreamÚlevelÚformatc                   @   sV   e Zd Zddd„Zdd„ Zdd	„ Zd
d„ Zdd„ Zdd„ Zdd„ Z	dd„ Z
dd„ ZdS )ÚTsSubscribeÚ úmulti-threadFc                 C   s>   d| _ || _|| _|| _tdd„ ƒ| _tdd„ ƒ| _d | _d S )Nzwss://ws.tushare.pro/listeningc                   S   ó   t ƒ S ©N©Úlist© r   r   úQ/opt/alphahud/venv/lib/python3.10/site-packages/tushare/subs/ts_subs/subscribe.pyÚ<lambda>$   ó    z&TsSubscribe.__init__.<locals>.<lambda>c                   S   r   r   r   r   r   r   r   r   %   r   )ÚurlÚtokenÚdebugÚcallback_moder   ÚtopicsÚcallback_funcsÚ	websocket)Úselfr   r   r   r   r   r   Ú__init__   s   
zTsSubscribe.__init__c                    s    ‡ fdd„}t j|d ¡  d S )Nc                     s2   t  d¡ ddi} ˆ j t | ¡¡ t d¡ d S )Né   ÚactionÚpingzsend ping message)ÚtimeÚsleepr   ÚsendÚjsonÚdumpsÚloggerr   )Úreq_data©r   r   r   r   *   s
   
ÿz2TsSubscribe.threading_keepalive_ping.<locals>.ping)Útarget)Ú	threadingÚThreadÚstart)r   r   r   r'   r   Úthreading_keepalive_ping)   s   z$TsSubscribe.threading_keepalive_pingc                 O   s8   d| j | jdœ}| j t |¡¡ t d¡ |  ¡  d S )NÚ	listening)r   r   Údatazapplication starting...)	r   r   r   r"   r#   r$   r%   Úinfor,   )r   ÚargsÚkwargsr&   r   r   r   Úon_open3   s   ý
zTsSubscribe.on_openc           	      O   sà   t |d tjƒr|d }n|d }t |¡ t |tttfƒr3t 	|¡}| 
d¡s2t | 
d¡¡ d S nt |¡ d S | 
di ¡}|rGt |tƒsId S | 
d¡}| 
d¡}| 
d¡}|r^|r^|sgt d	| ¡ d S |  |||¡ d S )
Nr   é   ÚstatusÚmessager.   ÚtopicÚcodeÚrecordzget invalid response-data(%s))Ú
isinstancer   ÚWebSocketAppr%   r   ÚstrÚbytesÚ	bytearrayr#   ÚloadsÚgetÚerrorr/   ÚdictÚwarningÚ_do_callback_function)	r   r0   r1   r5   Ú	resp_datar.   r6   r7   r8   r   r   r   Ú
on_message=   s,   



þ



zTsSubscribe.on_messagec                 O   s    | j rtjt|ƒdd d S d S )NT)Úexc_info)r   Úloggingr@   r;   )r   r@   r0   r1   r   r   r   Úon_errorX   s   ÿzTsSubscribe.on_errorc                 O   s>   t  d¡ t ¡ \}}}|ttfv rt d¡ |  ¡  d S d S )NÚcloser3   )	r%   r@   ÚsysrF   r   ÚConnectionRefusedErrorr    r!   Úrun)r   r0   r1   Ú_typeÚ_valueÚ
_tracebackr   r   r   Úon_close\   s   

þzTsSubscribe.on_closec                 C   sf   | j s
t d¡ d S tj| j| j| j| j| j	d| _| j 
d¡r,| jjdtjid d S | j ¡  d S )Nzno data.)rE   rH   rP   r2   zwss:Ú	cert_reqs)Ússlopt)r   r%   r@   r   r:   r   rE   rH   rP   r2   Ú
startswithÚrun_foreverÚsslÚ	CERT_NONEr'   r   r   r   rL   c   s   
ûzTsSubscribe.runc                    s   t ˆ ƒ‰ ‡ ‡‡fdd„}|S )Nc                    s  t ƒ ˆ _t ƒ ˆ _ˆD ]g}t d|¡st d¡ tdƒ d|v rVˆ jD ]}t ||¡s0t ||¡r9t d¡ tdƒ q"ˆ jD ]}t ||¡rNt d¡ tdƒ q=ˆ j 	|¡ q
ˆ jD ]}t ||¡rjt d¡ tdƒ qYˆ j 	|¡ q
ˆj
ˆ  ˆ7  < ˆjˆ  ˆ ¡ tˆ ƒ‡ fdd„ƒ}|S )Nz[\d\w\.\*]+z
error coder3   Ú*zduplicate codec                     s   ˆ | i |¤ŽS )z* should receive a message-value parameter r   )r0   r1   ©Úfuncr   r   Úinner“   s   z6TsSubscribe.register.<locals>.decorator.<locals>.inner)ÚsetÚcodesÚpcodesÚreÚmatchr%   r@   ÚexitÚfnmatchÚaddr   r   Úappendr   )rY   r7   Úcode1rZ   ©r\   r   r6   rX   r   Ú	decoratorw   s:   


€

€

€z'TsSubscribe.register.<locals>.decorator)r[   )r   r6   r\   rf   r   re   r   Úregistert   s   #zTsSubscribe.registerc                 C   sœ   | j | D ]F}d}||jv rd}n|jD ]}t ||¡r d} nq|s$q| jdkr.||ƒ q| jdkr;t ||f¡ q| jdkrKt||fd}| ¡  qd S )NFTzsingle-threadr   zmulti-process)r(   r0   )	r   r\   r]   ra   r   ÚthreadÚstart_new_threadr   r+   )r   r6   r7   ÚvaluerY   ÚcheckedÚpcodeÚpr   r   r   rC   œ   s(   

þ



€íz!TsSubscribe._do_callback_functionN)r
   r   F)Ú__name__Ú
__module__Ú__qualname__r   r,   r2   rE   rH   rP   rL   rg   rC   r   r   r   r   r	      s    


(r	   c                  C   s,   t dƒ} | jddgddd„ ƒ}|  ¡  d S )NÚxxxÚ
HQ_STK_MINz	1MIN:*.SH©r6   r\   c                 S   ó   t dt| ƒ ƒ dS ©u°   
        è®¢é˜…ä¸»é¢˜topicï¼Œå¹¶æŒ‡å®šcodesåˆ—è¡¨ï¼Œåœ¨æŽ¥æ”¶åˆ°topicçš„æŽ¨é€æ¶ˆæ¯æ—¶ï¼Œç¬¦åˆcodeæ¡ä»¶ï¼Œå°±ä¼šæ‰§è¡Œå›žè°ƒ
        :param record:
        :return:
        u0   ç”¨æˆ·å®šä¹‰ä¸šåŠ¡ä»£ç è¾“å‡º print_message(%s)N©Úprintr;   ©r8   r   r   r   Úprint_message·   ó   ztest_min.<locals>.print_message©r	   rg   rL   ©Úappry   r   r   r   Útest_min³   s   
r~   c                  C   s.   t dd} | jddgddd„ ƒ}|  ¡  d S )Nrq   )r   ÚHQ_STK_TICKz*.SHrs   c                 S   rt   ru   rv   rx   r   r   r   ry   Ç   rz   z test_tick.<locals>.print_messager{   r|   r   r   r   Ú	test_tickÃ   s   

r€   Ú__main__)Ú__doc__Ú_threadrh   ra   r#   rG   rJ   r^   rU   r)   r    Úcollectionsr   Ú	functoolsr   Úmultiprocessing.contextr   r   r   ÚbasicConfigÚstdoutÚDEBUGÚ	getLoggerrn   r%   Úobjectr	   r~   r€   r   r   r   r   Ú<module>   s0   
 
ÿ